# DH/scrapping.py
import json
from typing import Dict, List
import requests
from bs4 import BeautifulSoup, ResultSet
import os
from pathlib import Path
import re
import glob
import logging
# parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# os.chdir(parent_dir)
import datetime
logging.basicConfig(
    level=logging.INFO, filename=f'{datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}.log', filemode='w',
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S')
JSONS_DIR = "jsons_unzipped"
SUB_PROJECTS_IN_NEO_ASS = ["01", "05", "06", "07", "09", "11", "12", "14", "15", "16"]
CORPUS_DIRNAME = "corpusjson"
def _load_json_from_path(json_path: str) -> Dict:
    """
    This helper function loads a JSON file from a given path. Empty files are skipped and yield None.
    :param json_path: A given string representing a path to a JSON file.
    :return: The parsed JSON content, or None if the file is empty.
    """
    with open(json_path, "r", encoding='utf-8') as json_file:
        if os.stat(json_path).st_size != 0:  # If the file is not empty:
            return json.load(json_file)
def _download_data_from_website(url: str) -> ResultSet:
    """
    Fetches the HTML page at the given URL and returns all "span" elements with class "cell".
    On any request or parsing error, the error is logged and an empty list is returned.
    """
    try:
        res = requests.get(url)
        soup = BeautifulSoup(res.text, "html.parser")
        return soup.find_all("span", {"class": "cell"})
    except Exception as e:
        logging.error(e)
        return list()
def _clean_raw_text(results: ResultSet) -> str:
    """Joins the text content of the scraped result spans into a single space-separated line."""
    return " ".join(["".join([content if isinstance(content, str) else content.text
                              for content in result.contents]) for result in results]).replace('\n', ' ')
def get_raw_english_texts_of_project(project_dirname: str, oracc_site: str = 'oracc.museum.upenn.edu') -> List[Dict]:
    """
    Scrapes the raw English text of each tablet of a project from its ORACC HTML page.
    :return: A list of dicts containing the raw English text of each tablet and basic metadata.
    """
    raw_jsons = list()
    all_paths = glob.glob(f'{JSONS_DIR}/{project_dirname}/**/corpusjson/*.json', recursive=True) + glob.glob(
        f'{JSONS_DIR}/{project_dirname}/corpusjson/*.json', recursive=True)
# path = Path(os.path.join(JSONS_DIR, project_dirname, 'catalogue.json'))
# if not os.path.isfile(path):
# return raw_jsons
# d = _load_json_to_dict(str(path))
# if d and d.get('members'):
# for member in d.get('members').values():
for filename in all_paths:
cur_json = _load_json_from_path(filename)
try:
project_name = cur_json['project']
except TypeError:
logging.error(f"Error in {filename}")
continue
# # Skip in case we are in saa project and the current sub project is not in neo-assyrian
# if project_dirname == "saao" and project_name[-2:] not in SUB_PROJECTS_IN_NEO_ASS: # TODO: validate
# continue
# id_text = member.get('id_text', "") + member.get('id_composite', "")
# html_dir = "/".join(path.parts[1:-1])
url = f"http://{oracc_site}/{project_name}/{cur_json['textid']}/html"
# print(url)
logging.info(url)
try:
raw_text = _clean_raw_text(_download_data_from_website(url))
if raw_text:
raw_jsons.append({
"id_text": cur_json['textid'],
"project_name": project_name,
"raw_text": raw_text,
})
except Exception as e:
logging.error(e)
return raw_jsons
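# Illustrative sketch (not part of the original pipeline): dump the scraped English texts to a JSONL
# file in the format that num_words_in_english below expects. The helper name and the default output
# filename are assumptions made for this example only.
def _write_english_texts_to_jsonl(project_dirname: str, out_path: str = "english_texts.jsonl") -> None:
    raw_jsons = get_raw_english_texts_of_project(project_dirname)
    with open(out_path, "w", encoding='utf-8') as f_out:
        for cur_json in raw_jsons:
            f_out.write(json.dumps(cur_json, ensure_ascii=False) + "\n")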
def num_words_in_english(jsonl_file):
    """
    Counts the number of English words in a JSONL file of scraped texts.
    For "saa" projects, parenthesized editorial additions are stripped before counting.
    :param jsonl_file: Path to a JSONL file whose lines contain "project_name" and "raw_text" fields.
    :return: The total word count.
    """
    words_counter = 0
    with open(jsonl_file, "r", encoding='utf-8') as f_in:
        for line in f_in:
            cur_json = json.loads(line)
            if cur_json["project_name"].startswith("saa"):
                cur_json["raw_text"] = re.sub(r'\([^)]*\)', '', cur_json["raw_text"])
            words_counter += len(cur_json["raw_text"].split())
    print(words_counter)
    return words_counter
def get_raw_text_akk_from_html(id_text, project_name):
    """
    Scrapes the raw Akkadian transliteration of a single tablet from its ORACC HTML page.
    Used as a fallback when the tablet's JSON file is not available locally.
    """
    url = f'http://oracc.iaas.upenn.edu/{project_name}/{id_text}/html'
    res = requests.get(url)
    if res.status_code != 200:
        logging.error(f"Request for {url} failed with status code {res.status_code}")
        return ""
    soup = BeautifulSoup(res.text, "html.parser")
    raw_text = _get_raw_text_html(soup)
    return raw_text
def _get_raw_text_html1(soup):
    # Legacy helper kept for reference; the module uses _get_raw_text_html instead.
    return ' '.join(tag.text for tag in soup.find_all('p', class_='tt'))
    # Alternative approach (previously unreachable): join the text of the 'a.cbd' word links.
    # words = soup.find_all("a", class_="cbd")
    # return ' '.join([word.text for word in words])
def _get_raw_text_html(soup):
    """Extracts the transliterated signs from an ORACC HTML page; superscript determinatives are wrapped in braces."""
    tags = soup.find_all('span', class_=lambda value: value and value.startswith('w '))
    signs = list()
for tag in tags:
temp_tag = tag.find('a')
if temp_tag:
tag = temp_tag
for sign in tag.contents:
if isinstance(sign, str):
signs.append(sign)
elif sign.name == 'span':
signs.append(sign.text)
elif sign.name == 'sup':
signs.append("{" + sign.text + "}")
return ' '.join(signs)
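# Example of the markup handled above (hypothetical snippet, for illustration only):
#   <span class="w ..."><a href="...">LUGAL<sup>meš</sup></a></span>
# would be emitted as: LUGAL {meš}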
def get_raw_akk_texts_of_project(project_dirname: str) -> List[Dict]:
    """
    This function parses the raw Akkadian texts of a project in ORACC from its local JSON files.
    :param project_dirname: A given string representing the project's directory name under the jsons directory.
    :return: A list of dicts containing the raw texts of the given project and basic metadata.
    """
    raw_jsons = list()
    all_paths = glob.glob(f'{JSONS_DIR}/{project_dirname}/**/corpusjson/*.json', recursive=True) + glob.glob(
        f'{JSONS_DIR}/{project_dirname}/corpusjson/*.json', recursive=True)
for filename in all_paths:
cur_json = _load_json_from_path(filename)
try:
            # Navigate the nested ORACC 'cdl' tree down to the list of sentence nodes:
            sents_dicts = cur_json['cdl'][0]['cdl'][-1]['cdl']
except Exception as e:
print(f"In file {filename} failed because of {e}")
continue
raw_text = get_raw_akk_text_from_json(sents_dicts)
raw_jsons.append({
"id_text": cur_json['textid'],
"raw_text": raw_text
})
# if not texts_jsons or not texts_jsons.get('members'):
# return raw_jsons
# for member in texts_jsons.get('members').values(): # Iterate over different tablets:
# project_name = member['project'].split("/")[-1]
#
# # Skip in case we are in saa project and the current sub project is not in neo-assyrian
# if project_dirname == "saao" and project_name[-2:] not in SUB_PROJECTS_IN_NEO_ASS: # TODO: validate
# continue
#
# id_text = member.get("id_text", "") + member.get("id_composite", "")
# json_file_path = os.path.join(JSONS_DIR, Path(member['project']), CORPUS_DIRNAME, f'{id_text}.json')
#
# if os.path.isfile(json_file_path): # If file exists in the jsons
# d = _load_json_to_dict(json_file_path)
#
# try:
# sents_dicts = d['cdl'][0]['cdl'][-1]['cdl']
# except Exception as e:
# print(f"In file {json_file_path} failed because of {e}")
# continue
#
# raw_text = " ".join([_get_raw_text(sent_dict['cdl']) for sent_dict in sents_dicts if _is_sent(sent_dict)])
# cur_json = {
# "id_text": id_text,
# "project_name": project_name,
# "raw_text": raw_text,
# }
# raw_jsons.append(cur_json)
return raw_jsons
def get_raw_akk_text_from_json(sents_dicts):
    """Concatenates the raw text of every sentence node in the given list of ORACC 'cdl' dictionaries."""
    return " ".join([_get_raw_text(sent_dict['cdl']) for sent_dict in sents_dicts if _is_sent(sent_dict)])
def _get_raw_text(json_dict: dict) -> str:
"""
This function gets the raw text of a given transliterated tablet in ORACC recursively.
It appends each instance in the tablet only once (even if there are multiple possible meanings).
:param json_dict: A given dictionary representing some portion of the words in the tablet.
:return: The aforementioned raw text.
"""
previous_ref: str = ""
raw_texts = list()
for d in json_dict:
if _is_sent(d): # If node represents a sentence -> call recursively to the inner dictionary
raw_texts.extend(_get_raw_text(d['cdl']).split())
elif _is_word(d): # If node represents a word
if previous_ref != d.get('ref'): # If encountered new instance:
cur_text = d['f']['norm'] if d['f'].get('norm') else d['f']['form']
raw_texts.append(cur_text + _get_addition(d))
previous_ref = d.get('ref')
return " ".join(raw_texts)
def _is_sent(d: Dict) -> bool:
    """Returns True if the given ORACC node represents a sentence ('c' node)."""
    return d.get('node') == 'c'
def _is_word(d: Dict) -> bool:
    """Returns True if the given ORACC node represents a word instance ('l' node)."""
    return d.get('node') == 'l'
def _get_addition(d: Dict) -> str:
    """
    This function looks for an asterisk or a question mark in a dictionary representing a word in a tablet from ORACC.
    :param d: A given dictionary as described above.
    :return: An asterisk or a question mark if one of the word's signs has one, otherwise an empty string.
    """
    has_signs_dicts = 'f' in d and 'gdl' in d['f']
if has_signs_dicts:
for sign_dict in d['f']['gdl']:
if 'gdl_collated' in sign_dict: # If cur sign has an asterisk
return "*"
if 'queried' in sign_dict: # If cur sign has a question mark
return "?"
return ""