Files
DH/scrapping.py
2023-04-12 12:34:56 +03:00

232 lines
8.9 KiB
Python

import json
from typing import Dict, List
import requests
from bs4 import BeautifulSoup
import os
from pathlib import Path
import re
import glob
import logging
# parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# os.chdir(parent_dir)
import datetime
logging.basicConfig(
level=logging.INFO, filename=f'{datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}.log', filemode='w',
format='%(name)s - %(levelname)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S')
JSONS_DIR = "jsons_unzipped"
SUB_PROJECTS_IN_NEO_ASS = ["01", "05", "06", "07", "09", "11", "12", "14", "15", "16"]
CORPUS_DIRNAME = "corpusjson"
def _load_json_from_path(json_path: str) -> Dict:
"""
This helper function loads a json from a given path, with the exception of empty files.
:param json_path: A given string representing a path to a json file.
:return: The json file (if it is a valid json file).
"""
with open(json_path, "r", encoding='utf-8') as json_file:
if os.stat(json_path).st_size != 0: # If the file is not empty:
return json.load(json_file)
def get_raw_english_texts_of_project(project_dirname: str, oracc_site: str = 'oracc.museum.upenn.edu') -> List[Dict]:
raw_jsons = list()
all_paths = glob.glob(f'jsons_unzipped/{project_dirname}/**/corpusjson/*.json', recursive=True) + glob.glob(
f'jsons_unzipped/{project_dirname}/corpusjson/*.json', recursive=True)
# path = Path(os.path.join(JSONS_DIR, project_dirname, 'catalogue.json'))
# if not os.path.isfile(path):
# return raw_jsons
# d = _load_json_to_dict(str(path))
# if d and d.get('members'):
# for member in d.get('members').values():
for filename in all_paths:
cur_json = _load_json_from_path(filename)
project_name = cur_json['project']
# # Skip in case we are in saa project and the current sub project is not in neo-assyrian
# if project_dirname == "saao" and project_name[-2:] not in SUB_PROJECTS_IN_NEO_ASS: # TODO: validate
# continue
# id_text = member.get('id_text', "") + member.get('id_composite', "")
# html_dir = "/".join(path.parts[1:-1])
url = f"http://{oracc_site}/{project_name}/{cur_json['textid']}/html"
# print(url)
logging.info(url)
try:
res = requests.get(url)
soup = BeautifulSoup(res.text, "html.parser")
results = soup.find_all("span", {"class": "cell"})
raw_text = " ".join(["".join([content if isinstance(content, str) else content.text
for content in result.contents]) for result in results])
raw_text = raw_text.replace('\n', ' ')
if raw_text:
raw_jsons.append({
"id_text": cur_json['textid'],
"project_name": project_name,
"raw_text": raw_text,
})
except Exception as e:
logging.error(e)
return raw_jsons
def num_words_in_english(jsonl_file):
words_counter = 0
with open(jsonl_file, "r", encoding='utf-8') as f_in:
for line in f_in:
cur_json = json.loads(line)
if cur_json["project_name"].startswith("saa"):
cur_json["raw_text"] = re.sub(r'\([^)]*\)', '', cur_json["raw_text"])
words_counter += len(cur_json["raw_text"].split())
print(words_counter)
def get_raw_text_akk_from_html(id_text, project_name):
# else: # If the file doesn't exist in the jsons -> look for it online
url = f'http://oracc.iaas.upenn.edu/{project_name}/{id_text}/html'
res = requests.get(url)
if res.status_code != 200:
print("******STATUS CODE IS NOT 200***********")
return ""
soup = BeautifulSoup(res.text, "html.parser")
# print(f"Check this out:\n{url}")
raw_text = _get_raw_text_html(soup)
# cur_json = {"id_text": id_text, "project_name": project_name, "raw_text": raw_text}
return raw_text
def _get_raw_text_html1(soup):
words = soup.find_all("a", class_="cbd")
return ' '.join(tag.text for tag in soup.find_all('p', class_='tt'))
return ' '.join([word.text for word in words])
def _get_raw_text_html(soup):
tags = soup.find_all('span', class_=lambda value: value and value.startswith('w '))
signs = list()
for tag in tags:
temp_tag = tag.find('a')
if temp_tag:
tag = temp_tag
for sign in tag.contents:
if isinstance(sign, str):
signs.append(sign)
elif sign.name == 'span':
signs.append(sign.text)
elif sign.name == 'sup':
signs.append("{" + sign.text + "}")
return ' '.join(signs)
def get_raw_akk_texts_of_project(project_dirname: str) -> List[Dict]:
"""
This function parses the raw texts of a project in ORACC.
:param project_dirname: A given string representing the path to the project's directory.
:return: A list of jsons containing the raw texts of the given project and basic metadata.
"""
raw_jsons = list()
all_paths = glob.glob(f'jsons_unzipped/{project_dirname}/**/corpusjson/*.json', recursive=True)
for filename in all_paths:
cur_json = _load_json_from_path(filename)
try:
project_name = cur_json['project']
sents_dicts = cur_json['cdl'][0]['cdl'][-1]['cdl']
except Exception as e:
print(f"In file {filename} failed because of {e}")
continue
raw_text = get_raw_akk_text_from_json(sents_dicts)
raw_jsons.append({
"id_text": cur_json['textid'],
"project_name": project_name,
"raw_text": raw_text,
})
# if not texts_jsons or not texts_jsons.get('members'):
# return raw_jsons
# for member in texts_jsons.get('members').values(): # Iterate over different tablets:
# project_name = member['project'].split("/")[-1]
#
# # Skip in case we are in saa project and the current sub project is not in neo-assyrian
# if project_dirname == "saao" and project_name[-2:] not in SUB_PROJECTS_IN_NEO_ASS: # TODO: validate
# continue
#
# id_text = member.get("id_text", "") + member.get("id_composite", "")
# json_file_path = os.path.join(JSONS_DIR, Path(member['project']), CORPUS_DIRNAME, f'{id_text}.json')
#
# if os.path.isfile(json_file_path): # If file exists in the jsons
# d = _load_json_to_dict(json_file_path)
#
# try:
# sents_dicts = d['cdl'][0]['cdl'][-1]['cdl']
# except Exception as e:
# print(f"In file {json_file_path} failed because of {e}")
# continue
#
# raw_text = " ".join([_get_raw_text(sent_dict['cdl']) for sent_dict in sents_dicts if _is_sent(sent_dict)])
# cur_json = {
# "id_text": id_text,
# "project_name": project_name,
# "raw_text": raw_text,
# }
# raw_jsons.append(cur_json)
return raw_jsons
def get_raw_akk_text_from_json(sents_dicts):
return " ".join([_get_raw_text(sent_dict['cdl']) for sent_dict in sents_dicts if _is_sent(sent_dict)])
def _get_raw_text(json_dict: dict) -> str:
"""
This function gets the raw text of a given transliterated tablet in ORACC recursively.
It appends each instance in the tablet only once (even if there are multiple possible meanings).
:param json_dict: A given dictionary representing some portion of the words in the tablet.
:return: The aforementioned raw text.
"""
previous_ref: str = ""
raw_texts = list()
for d in json_dict:
if _is_sent(d): # If node represents a sentence -> call recursively to the inner dictionary
raw_texts.extend(_get_raw_text(d['cdl']).split())
elif _is_word(d): # If node represents a word
if previous_ref != d.get('ref'): # If encountered new instance:
cur_text = d['frag'] if d.get('frag') else d['f']['form']
raw_texts.append(cur_text + _get_addition(d))
previous_ref = d.get('ref')
return " ".join(raw_texts)
def _is_sent(d: Dict) -> bool:
return d.get('node') == 'c'
def _is_word(d: Dict) -> bool:
return d.get('node') == 'l'
def _get_addition(d: Dict) -> str:
"""
This function looks for an asterisk or a question mark in a dictionary representing a word in a tablet from ORACC.
:param d: A given dictionary as described above.
:return An asterisk or a question mark if one of the word's signs has one, otherwise an empty string.
"""
has_signs_dicts = 'f' in d and 'gdl' in d.get('f')
if has_signs_dicts:
for sign_dict in d['f']['gdl']:
if 'gdl_collated' in sign_dict: # If cur sign has an asterisk
return "*"
if 'queried' in sign_dict: # If cur sign has a question mark
return "?"
return ""