242 lines
9.2 KiB
Python
242 lines
9.2 KiB
Python
import json
|
|
from typing import Dict, List
|
|
import requests
|
|
from bs4 import BeautifulSoup, ResultSet
|
|
import os
|
|
from pathlib import Path
|
|
import re
|
|
import glob
|
|
import logging
|
|
# parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
# os.chdir(parent_dir)
|
|
import datetime
|
|
# Configure the root logger at import time: INFO level, written to a file in the
# current working directory whose name is the timestamp at which the module was
# imported. filemode='w' truncates an existing file with the same name.
# NOTE(review): `datefmt` only influences %(asctime)s, which the format string
# below does not contain - confirm whether timestamps were meant to be logged.
logging.basicConfig(
    level=logging.INFO, filename=f'{datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}.log', filemode='w',
    format='%(name)s - %(levelname)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S')

# Name of the directory at the root of the JSON file paths searched by this module.
JSONS_DIR = "jsons_unzipped"

# Two-digit sub-project suffixes; presumably the SAA sub-projects that count as
# Neo-Assyrian - TODO confirm (no active code in this view reads this list).
SUB_PROJECTS_IN_NEO_ASS = ["01", "05", "06", "07", "09", "11", "12", "14", "15", "16"]

# Name of the per-project sub-directory that holds the individual text JSON files.
CORPUS_DIRNAME = "corpusjson"
|
|
|
|
|
|
def _load_json_from_path(json_path: str) -> Dict:
    """
    Parse the JSON document stored at the given path, skipping zero-byte files.

    :param json_path: Path of the JSON file to read (opened as UTF-8 text).
    :return: The parsed JSON content, or None when the file is empty (0 bytes).
    """
    with open(json_path, "r", encoding='utf-8') as handle:
        # An empty file is not valid JSON; report it as None instead of parsing it.
        if os.path.getsize(json_path) == 0:
            return None
        return json.load(handle)
|
|
|
|
def _download_data_from_website(url: str, timeout: float = 30.0) -> ResultSet:
    """
    Download an HTML page and collect every <span class="cell"> element in it.

    This is a best-effort helper: any failure (network error, timeout, parsing
    problem) is logged and reported as an empty result instead of being raised.

    :param url: Address of the page to download.
    :param timeout: Seconds to wait for the server before giving up, so that a
        stalled connection cannot block the caller indefinitely.
    :return: The matching elements, or an empty ResultSet if anything went wrong.
    """
    try:
        res = requests.get(url, timeout=timeout)
        soup = BeautifulSoup(res.text, "html.parser")
        return soup.find_all("span", {"class": "cell"})
    except Exception as e:
        logging.error(e)
        # ResultSet requires its `source` argument; calling it with no arguments
        # raises TypeError instead of producing the intended empty result.
        return ResultSet(None)
|
|
|
|
def _clean_raw_text(results: ResultSet)->str:
    """
    Flatten a collection of parsed elements into a single line of text.

    The direct children of each element are concatenated without a separator
    (plain strings as they are, any other child through its ``text`` attribute),
    the per-element strings are joined with single spaces, and finally every
    newline is replaced by a space.

    :param results: Iterable of elements that expose a ``contents`` sequence.
    :return: The flattened text.
    """
    cell_texts = []
    for result in results:
        fragments = []
        for content in result.contents:
            if isinstance(content, str):
                fragments.append(content)
            else:
                fragments.append(content.text)
        cell_texts.append("".join(fragments))
    joined = " ".join(cell_texts)
    return joined.replace('\n', ' ')
|
|
|
|
def get_raw_english_texts_of_project(project_dirname: str, oracc_site: str = 'oracc.museum.upenn.edu') -> List[Dict]:
    """
    Download the HTML text of every corpus JSON file found under a project directory.

    Each corpus JSON file supplies the project name and text id used to build the
    page URL on the given site; the page is downloaded and flattened to raw text.

    :param project_dirname: Name of the project directory under JSONS_DIR.
    :param oracc_site: Host name of the site the HTML pages are downloaded from.
    :return: A list of dictionaries with the keys "id_text", "project_name" and
        "raw_text", one per text whose download produced non-empty text.
    """
    raw_jsons = list()
    # With recursive=True, '**' also matches zero directories, so this single
    # pattern already covers <project>/corpusjson/*.json. A second, non-recursive
    # pattern would return those files again and each would be downloaded twice.
    all_paths = glob.glob(f'{JSONS_DIR}/{project_dirname}/**/{CORPUS_DIRNAME}/*.json', recursive=True)

    for filename in all_paths:
        cur_json = _load_json_from_path(filename)
        try:
            project_name = cur_json['project']
            text_id = cur_json['textid']
        except (TypeError, KeyError):
            # TypeError: the file was empty (loader returned None);
            # KeyError: the JSON lacks the fields needed to build the URL.
            logging.error(f"Error in {filename}")
            continue

        # NOTE: a filter that skipped "saao" sub-projects whose two-digit suffix is
        # not in SUB_PROJECTS_IN_NEO_ASS used to live here; it is disabled
        # (TODO: validate whether it is still needed).
        url = f"http://{oracc_site}/{project_name}/{text_id}/html"
        logging.info(url)

        try:
            raw_text = _clean_raw_text(_download_data_from_website(url))
        except Exception as e:
            # Best effort: a single bad page must not abort the whole project.
            logging.error(e)
            continue

        if raw_text:
            raw_jsons.append({
                "id_text": text_id,
                "project_name": project_name,
                "raw_text": raw_text,
            })

    return raw_jsons
|
|
|
|
|
|
def num_words_in_english(jsonl_file):
    """
    Count the whitespace-separated words of all "raw_text" fields in a JSON-lines file.

    For records whose "project_name" starts with "saa", parenthesised passages
    are removed from the text before counting. The total is printed and returned.

    :param jsonl_file: Path of a UTF-8 file holding one JSON object per line, each
        with the keys "project_name" and "raw_text".
    :return: The total number of words.
    """
    parenthesised = re.compile(r'\([^)]*\)')  # compiled once, reused for every line
    words_counter = 0
    with open(jsonl_file, "r", encoding='utf-8') as f_in:
        for line in f_in:
            if not line.strip():
                # Blank lines (e.g. a trailing newline) are not JSON documents.
                continue
            cur_json = json.loads(line)
            raw_text = cur_json["raw_text"]
            if cur_json["project_name"].startswith("saa"):
                raw_text = parenthesised.sub('', raw_text)
            words_counter += len(raw_text.split())
    print(words_counter)
    return words_counter
|
|
|
|
|
|
def get_raw_text_akk_from_html(id_text, project_name, oracc_site='oracc.iaas.upenn.edu'):
    """
    Download the HTML page of a single text and extract its raw text.

    :param id_text: Identifier of the text, used as a URL path component.
    :param project_name: Name of the project, used as a URL path component.
    :param oracc_site: Host name to download from; the default is the host this
        function has always used.
    :return: The text extracted by _get_raw_text_html, or an empty string when
        the server does not answer with status code 200.
    """
    url = f'http://{oracc_site}/{project_name}/{id_text}/html'
    res = requests.get(url)
    if res.status_code != 200:
        print("******STATUS CODE IS NOT 200***********")
        return ""
    soup = BeautifulSoup(res.text, "html.parser")
    return _get_raw_text_html(soup)
|
|
|
|
|
|
def _get_raw_text_html1(soup):
    """
    Join the text of every <p class="tt"> element of a parsed page with spaces.

    The original body also looked up <a class="cbd"> elements and had a second
    ``return`` built from them, but that statement came after the first
    ``return`` and could never run; the dead code has been removed.

    :param soup: Parsed page exposing ``find_all``.
    :return: The space-joined text of the matching elements.
    """
    return ' '.join(tag.text for tag in soup.find_all('p', class_='tt'))
|
|
|
|
|
|
def _get_raw_text_html(soup):
    """
    Collect the pieces of every <span> whose class value starts with 'w '.

    For each such span the first nested <a> element (when there is one) is used
    in place of the span itself. Its direct children are then visited in order:
    plain strings are taken as they are, <span> children contribute their text,
    <sup> children contribute their text wrapped in curly braces, and any other
    child is ignored.

    :param soup: Parsed page exposing ``find_all``.
    :return: All collected pieces joined with single spaces.
    """
    def _is_word_class(css_class):
        return css_class and css_class.startswith('w ')

    pieces = []
    for word_span in soup.find_all('span', class_=_is_word_class):
        anchor = word_span.find('a')
        holder = anchor if anchor else word_span
        for node in holder.contents:
            if isinstance(node, str):
                pieces.append(node)
                continue
            if node.name == 'span':
                pieces.append(node.text)
            elif node.name == 'sup':
                pieces.append("{" + node.text + "}")
    return ' '.join(pieces)
|
|
|
|
|
|
def get_raw_akk_texts_of_project(project_dirname: str) -> List[Dict]:
    """
    Extract the raw text of every corpus JSON file found under a project directory.

    :param project_dirname: Name of the project directory under JSONS_DIR.
    :return: A list of dictionaries with the keys "id_text", "project_name" and
        "raw_text", one per corpus file that has the expected structure.
    """
    raw_jsons = list()
    all_paths = glob.glob(f'{JSONS_DIR}/{project_dirname}/**/{CORPUS_DIRNAME}/*.json', recursive=True)

    for filename in all_paths:
        cur_json = _load_json_from_path(filename)

        # Read every required field inside the guarded section so that one
        # malformed file is skipped instead of aborting the whole project.
        try:
            project_name = cur_json['project']
            text_id = cur_json['textid']
            sents_dicts = cur_json['cdl'][0]['cdl'][-1]['cdl']
        except (TypeError, KeyError, IndexError) as e:
            # TypeError: empty file (loader returned None) or unexpected node type;
            # KeyError / IndexError: the expected structure is missing.
            logging.error(f"In file {filename} failed because of {e}")
            continue

        raw_jsons.append({
            "id_text": text_id,
            "project_name": project_name,
            "raw_text": get_raw_akk_text_from_json(sents_dicts),
        })

    return raw_jsons
|
|
|
|
|
|
def get_raw_akk_text_from_json(sents_dicts):
    """
    Build the raw text of a tablet from its top-level nodes.

    Only nodes recognised by _is_sent are used; the 'cdl' list of each one is
    converted to text by _get_raw_text.

    :param sents_dicts: Iterable of node dictionaries.
    :return: The texts of the selected nodes joined with single spaces.
    """
    sentence_texts = []
    for sent_dict in sents_dicts:
        if _is_sent(sent_dict):
            sentence_texts.append(_get_raw_text(sent_dict['cdl']))
    return " ".join(sentence_texts)
|
|
|
|
|
|
def _get_raw_text(json_dict: dict) -> str:
    """
    Recursively assemble the raw text of a portion of a transliterated tablet.

    The argument is iterated node by node. Nodes recognised by _is_sent are
    expanded recursively through their 'cdl' entry; nodes recognised by _is_word
    contribute their 'frag' value (or, when that is missing or empty, the 'form'
    of their 'f' entry) followed by the marker returned by _get_addition. A word
    whose 'ref' equals the 'ref' of the previously emitted word is skipped, so
    each instance appears once even if it has several possible readings.

    :param json_dict: The nodes (dictionaries) of some portion of the tablet.
    :return: The collected words joined with single spaces.
    """
    tokens = list()
    last_ref: str = ""
    for node in json_dict:
        if _is_sent(node):
            # Nested container: flatten its words into the current sequence.
            tokens += _get_raw_text(node['cdl']).split()
            continue
        if not _is_word(node):
            continue
        ref = node.get('ref')
        if ref == last_ref:
            # Same instance as the previous word - already emitted.
            continue
        surface = node['frag'] if node.get('frag') else node['f']['form']
        tokens.append(surface + _get_addition(node))
        last_ref = ref

    return " ".join(tokens)
|
|
|
|
|
|
def _is_sent(d: Dict) -> bool:
    """
    Tell whether a node dictionary is marked as a sentence-level node.

    :param d: A node dictionary.
    :return: True if the value stored under 'node' equals 'c', otherwise False.
    """
    node_kind = d.get('node')
    return node_kind == 'c'
|
|
|
|
|
|
def _is_word(d: Dict) -> bool:
    """
    Tell whether a node dictionary is marked as a word-level node.

    :param d: A node dictionary.
    :return: True if the value stored under 'node' equals 'l', otherwise False.
    """
    node_kind = d.get('node')
    return node_kind == 'l'
|
|
|
|
|
|
def _get_addition(d: Dict) -> str:
    """
    Look for a collation or query marker among the signs of a word.

    The signs listed under d['f']['gdl'] are scanned in order. The first sign
    that carries a marker decides the result; within a single sign the
    'gdl_collated' key takes precedence over the 'queried' key.

    :param d: A dictionary representing a word in a tablet from ORACC.
    :return: "*" for a sign with 'gdl_collated', "?" for a sign with 'queried',
        and an empty string when no sign has a marker or there are no signs.
    """
    if 'f' not in d or 'gdl' not in d.get('f'):
        return ""

    for sign in d['f']['gdl']:
        if 'gdl_collated' in sign:  # The sign has an asterisk
            return "*"
        if 'queried' in sign:  # The sign has a question mark
            return "?"
    return ""
|