import json
from typing import Dict, List

import requests
from bs4 import BeautifulSoup, ResultSet
import os
from pathlib import Path
import re
import glob
import logging
# parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# os.chdir(parent_dir)
import datetime

logging.basicConfig(
    level=logging.INFO,
    filename=f'{datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}.log',
    filemode='w',
    format='%(name)s - %(levelname)s - %(message)s',
    datefmt='%d-%b-%y %H:%M:%S')

JSONS_DIR = "jsons_unzipped"
SUB_PROJECTS_IN_NEO_ASS = ["01", "05", "06", "07", "09", "11", "12", "14", "15", "16"]
CORPUS_DIRNAME = "corpusjson"


def _load_json_from_path(json_path: str) -> Dict:
    """
    This helper function loads a json from a given path, with the exception of empty files.
    :param json_path: A given string representing a path to a json file.
    :return: The parsed json content (or None if the file is empty).
    """
    with open(json_path, "r", encoding='utf-8') as json_file:
        if os.stat(json_path).st_size != 0:  # If the file is not empty:
            return json.load(json_file)


def _download_data_from_website(url: str) -> ResultSet:
    """
    This helper function downloads an ORACC page and returns all spans with class "cell".
    :param url: A given string representing the page's URL.
    :return: The matching spans, or an empty list if the request fails.
    """
    try:
        res = requests.get(url)
        soup = BeautifulSoup(res.text, "html.parser")
        return soup.find_all("span", {"class": "cell"})
    except Exception as e:
        logging.error(e)
        return list()


def _clean_raw_text(results: ResultSet) -> str:
    """
    This helper function joins the text content of the downloaded spans into a single string.
    """
    return " ".join(["".join([content if isinstance(content, str) else content.text
                              for content in result.contents])
                     for result in results]).replace('\n', ' ')


def get_raw_english_texts_of_project(project_dirname: str,
                                     oracc_site: str = 'oracc.museum.upenn.edu') -> List[Dict]:
    """
    This function downloads the English texts of a project in ORACC from its HTML pages.
    :param project_dirname: A given string representing the project's directory name.
    :param oracc_site: The ORACC host to download from.
    :return: A list of jsons containing the raw English texts of the given project and basic metadata.
    """
    raw_jsons = list()
    all_paths = glob.glob(f'{JSONS_DIR}/{project_dirname}/**/corpusjson/*.json', recursive=True) + glob.glob(
        f'{JSONS_DIR}/{project_dirname}/corpusjson/*.json', recursive=True)
    # path = Path(os.path.join(JSONS_DIR, project_dirname, 'catalogue.json'))
    # if not os.path.isfile(path):
    #     return raw_jsons
    # d = _load_json_to_dict(str(path))
    # if d and d.get('members'):
    #     for member in d.get('members').values():
    for filename in all_paths:
        cur_json = _load_json_from_path(filename)
        try:
            project_name = cur_json['project']
        except TypeError:
            logging.error(f"Error in {filename}")
            continue

        # # Skip in case we are in saa project and the current sub project is not in neo-assyrian
        # if project_dirname == "saao" and project_name[-2:] not in SUB_PROJECTS_IN_NEO_ASS:  # TODO: validate
        #     continue
        # id_text = member.get('id_text', "") + member.get('id_composite', "")
        # html_dir = "/".join(path.parts[1:-1])

        url = f"http://{oracc_site}/{project_name}/{cur_json['textid']}/html"
        # print(url)
        logging.info(url)
        try:
            raw_text = _clean_raw_text(_download_data_from_website(url))
            if raw_text:
                raw_jsons.append({
                    "id_text": cur_json['textid'],
                    "project_name": project_name,
                    "raw_text": raw_text,
                })
        except Exception as e:
            logging.error(e)
    return raw_jsons


def num_words_in_english(jsonl_file):
    """
    This function counts the words in the English texts stored in a JSONL file (one text per line).
    For texts of the saa projects, text in parentheses is removed before counting.
    """
    words_counter = 0
    with open(jsonl_file, "r", encoding='utf-8') as f_in:
        for line in f_in:
            cur_json = json.loads(line)
            if cur_json["project_name"].startswith("saa"):
                cur_json["raw_text"] = re.sub(r'\([^)]*\)', '', cur_json["raw_text"])
            words_counter += len(cur_json["raw_text"].split())
    print(words_counter)
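# get_raw_english_texts_of_project returns a list of dicts, while num_words_in_english reads a JSONL
# file with one such dict per line. The helper below is an illustrative sketch (not part of the
# original module) showing one way to bridge the two; its name and the out_path parameter are
# assumptions made here for the example.
def _write_texts_to_jsonl(texts: List[Dict], out_path: str) -> None:
    """
    Write a list of text dicts (as returned by get_raw_english_texts_of_project) to a JSONL file,
    one json object per line, so it can later be passed to num_words_in_english.
    """
    with open(out_path, "w", encoding='utf-8') as f_out:
        for text_json in texts:
            f_out.write(json.dumps(text_json, ensure_ascii=False) + "\n")
# Example (illustrative): _write_texts_to_jsonl(get_raw_english_texts_of_project("saao"), "saao_en.jsonl")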
"html.parser") # print(f"Check this out:\n{url}") raw_text = _get_raw_text_html(soup) # cur_json = {"id_text": id_text, "project_name": project_name, "raw_text": raw_text} return raw_text def _get_raw_text_html1(soup): words = soup.find_all("a", class_="cbd") return ' '.join(tag.text for tag in soup.find_all('p', class_='tt')) return ' '.join([word.text for word in words]) def _get_raw_text_html(soup): tags = soup.find_all('span', class_=lambda value: value and value.startswith('w ')) signs = list() for tag in tags: temp_tag = tag.find('a') if temp_tag: tag = temp_tag for sign in tag.contents: if isinstance(sign, str): signs.append(sign) elif sign.name == 'span': signs.append(sign.text) elif sign.name == 'sup': signs.append("{" + sign.text + "}") return ' '.join(signs) def get_raw_akk_texts_of_project(project_dirname: str) -> List[Dict]: """ This function parses the raw texts of a project in ORACC. :param project_dirname: A given string representing the path to the project's directory. :return: A list of jsons containing the raw texts of the given project and basic metadata. """ raw_jsons = list() all_paths = glob.glob(f'jsons_unzipped/{project_dirname}/**/corpusjson/*.json', recursive=True)+glob.glob( f'jsons_unzipped/{project_dirname}/corpusjson/*.json', recursive=True) for filename in all_paths: cur_json = _load_json_from_path(filename) try: sents_dicts = cur_json['cdl'][0]['cdl'][-1]['cdl'] except Exception as e: print(f"In file {filename} failed because of {e}") continue raw_text = get_raw_akk_text_from_json(sents_dicts) raw_jsons.append({ "id_text": cur_json['textid'], "raw_text": raw_text }) # if not texts_jsons or not texts_jsons.get('members'): # return raw_jsons # for member in texts_jsons.get('members').values(): # Iterate over different tablets: # project_name = member['project'].split("/")[-1] # # # Skip in case we are in saa project and the current sub project is not in neo-assyrian # if project_dirname == "saao" and project_name[-2:] not in SUB_PROJECTS_IN_NEO_ASS: # TODO: validate # continue # # id_text = member.get("id_text", "") + member.get("id_composite", "") # json_file_path = os.path.join(JSONS_DIR, Path(member['project']), CORPUS_DIRNAME, f'{id_text}.json') # # if os.path.isfile(json_file_path): # If file exists in the jsons # d = _load_json_to_dict(json_file_path) # # try: # sents_dicts = d['cdl'][0]['cdl'][-1]['cdl'] # except Exception as e: # print(f"In file {json_file_path} failed because of {e}") # continue # # raw_text = " ".join([_get_raw_text(sent_dict['cdl']) for sent_dict in sents_dicts if _is_sent(sent_dict)]) # cur_json = { # "id_text": id_text, # "project_name": project_name, # "raw_text": raw_text, # } # raw_jsons.append(cur_json) return raw_jsons def get_raw_akk_text_from_json(sents_dicts): return " ".join([_get_raw_text(sent_dict['cdl']) for sent_dict in sents_dicts if _is_sent(sent_dict)]) def _get_raw_text(json_dict: dict) -> str: """ This function gets the raw text of a given transliterated tablet in ORACC recursively. It appends each instance in the tablet only once (even if there are multiple possible meanings). :param json_dict: A given dictionary representing some portion of the words in the tablet. :return: The aforementioned raw text. 
""" previous_ref: str = "" raw_texts = list() for d in json_dict: if _is_sent(d): # If node represents a sentence -> call recursively to the inner dictionary raw_texts.extend(_get_raw_text(d['cdl']).split()) elif _is_word(d): # If node represents a word if previous_ref != d.get('ref'): # If encountered new instance: cur_text = d['f']['norm'] if d['f'].get('norm') else d['f']['form'] raw_texts.append(cur_text + _get_addition(d)) previous_ref = d.get('ref') return " ".join(raw_texts) def _is_sent(d: Dict) -> bool: return d.get('node') == 'c' def _is_word(d: Dict) -> bool: return d.get('node') == 'l' def _get_addition(d: Dict) -> str: """ This function looks for an asterisk or a question mark in a dictionary representing a word in a tablet from ORACC. :param d: A given dictionary as described above. :return An asterisk or a question mark if one of the word's signs has one, otherwise an empty string. """ has_signs_dicts = 'f' in d and 'gdl' in d.get('f') if has_signs_dicts: for sign_dict in d['f']['gdl']: if 'gdl_collated' in sign_dict: # If cur sign has an asterisk return "*" if 'queried' in sign_dict: # If cur sign has a question mark return "?" return ""