diff --git a/chains/binaproject_web_client.py b/chains/binaproject_web_client.py index 8f5ecb7..ec23e4e 100644 --- a/chains/binaproject_web_client.py +++ b/chains/binaproject_web_client.py @@ -1,8 +1,12 @@ import json +import re + import requests from supermarket_chain import SupermarketChain +FNAME_KEY = "FileNm" + class BinaProjectWebClient: _date_hour_format = '%Y-%m-%d %H:%M:%S' @@ -16,8 +20,16 @@ class BinaProjectWebClient: url = '/'.join([hostname, self.path_prefix, "MainIO_Hok.aspx"]) req_res: requests.Response = session.get(url) jsons_files = json.loads(req_res.text) - suffix = next(cur_json["FileNm"] for cur_json in jsons_files if f'-{store_id:03d}-20' in cur_json["FileNm"] - and category.name.replace('s', '') in cur_json["FileNm"]) + + if category in [SupermarketChain.XMLFilesCategory.Promos, SupermarketChain.XMLFilesCategory.Prices]: + filter_func = lambda fname: f'-{store_id:03d}-20' in fname and category.name.replace('s', '') in fname \ + and not re.search('full', fname, re.IGNORECASE) + if not any(filter_func(cur_json[FNAME_KEY]) for cur_json in jsons_files): + return "" # Could not find non-full Promos/Prices file + else: + filter_func = lambda fname: f'-{store_id:03d}-20' in fname and category.name.replace('s', '') in fname + suffix = next( + cur_json[FNAME_KEY] for cur_json in jsons_files if filter_func(cur_json[FNAME_KEY])) down_url: str = '/'.join([hostname, self.path_prefix, "Download", suffix]) return down_url diff --git a/chains/cerberus_web_client.py b/chains/cerberus_web_client.py index ae8ef98..f02f75e 100644 --- a/chains/cerberus_web_client.py +++ b/chains/cerberus_web_client.py @@ -1,4 +1,6 @@ import json +import re + import requests from supermarket_chain import SupermarketChain @@ -17,7 +19,13 @@ class CerberusWebClient: ajax_dir_payload: dict = {'iDisplayLength': 100000, 'sSearch': category.name.replace('s', '')} s: requests.Response = session.post(hostname + "/file/ajax_dir", data=ajax_dir_payload) s_json: dict = json.loads(s.text) - suffix: str = next(d['name'] for d in s_json['aaData'] if f'-{store_id:03d}-20' in d['name']) + if category in [SupermarketChain.XMLFilesCategory.Promos, SupermarketChain.XMLFilesCategory.Prices]: + filter_func = lambda d, id: f'-{id:03d}-20' in d['name'] and not re.search('full', d['name'], re.IGNORECASE) + if not any(filter_func(d, store_id) for d in s_json['aaData']): + return "" # Could not find non-full Prices/Promos file + else: + filter_func = lambda d, id: f'-{id:03d}-20' in d['name'] + suffix: str = next(d['name'] for d in s_json['aaData'] if filter_func(d, store_id)) download_url: str = hostname + "/file/d/" + suffix return download_url diff --git a/chains/mahsaneiHashook.py b/chains/mahsaneiHashook.py index 9cb7c5b..b11e387 100644 --- a/chains/mahsaneiHashook.py +++ b/chains/mahsaneiHashook.py @@ -1,3 +1,4 @@ +import re from typing import Dict, List import requests from bs4 import BeautifulSoup @@ -21,8 +22,16 @@ class MahsaneiHashook(SupermarketChain): url = prefix + "NBCompetitionRegulations.aspx" req_res: requests.Response = requests.get(url) soup = BeautifulSoup(req_res.text, features='lxml') - suffix: str = soup.find('a', href=lambda value: value and category.name.replace('s', '') in value - and f'-{store_id:03d}-20' in value).attrs['href'] + if category in [SupermarketChain.XMLFilesCategory.Promos, SupermarketChain.XMLFilesCategory.Prices]: + fname_filter_func = lambda fname: fname and category.name.replace('s', '') in fname \ + and f'-{store_id:03d}-20' in fname \ + and not re.search('full', fname, re.IGNORECASE) + if soup.find('a', href=fname_filter_func) is None: + return "" # Could not find non-full Promos/Prices file + else: + fname_filter_func = lambda fname: fname and category.name.replace('s', '') in fname \ + and f'-{store_id:03d}-20' in fname + suffix: str = soup.find('a', href=fname_filter_func).attrs['href'] down_url: str = prefix + suffix return down_url diff --git a/main.py b/main.py index 84736ea..9251f15 100644 --- a/main.py +++ b/main.py @@ -1,6 +1,4 @@ import os -import sys -import time from argparse import ArgumentParser from datetime import datetime from pathlib import Path @@ -33,13 +31,12 @@ from chains import ( shefa_birkat_hashem, ) -# TODO: fix problem of left-to-right printing - Path(RESULTS_DIRNAME).mkdir(exist_ok=True) Path(RAW_FILES_DIRNAME).mkdir(exist_ok=True) chain_dict = {repr(chain): chain() if callable(chain) else None for chain in SupermarketChain.__subclasses__()} +# TODO: change functions arguments to include all necessary parameters (e.g. chain) or split arguments if __name__ == '__main__': parser = ArgumentParser() parser.add_argument('--promos', diff --git a/promotion.py b/promotion.py index 68e97ad..4023185 100644 --- a/promotion.py +++ b/promotion.py @@ -1,13 +1,15 @@ import logging import re from datetime import datetime -from enum import Enum from typing import Dict, List, Union from bs4.element import Tag import csv import sys import pandas as pd import xlsxwriter +from tqdm import tqdm +from aenum import Enum + from item import Item from utils import ( create_bs_object, create_items_dict, @@ -19,6 +21,8 @@ from supermarket_chain import SupermarketChain XML_FILES_PROMOTIONS_CATEGORIES = [SupermarketChain.XMLFilesCategory.PromosFull, SupermarketChain.XMLFilesCategory.Promos] +PROMOTION_COLS_NUM = 15 # The length of the list returned by get_promotion_row_for_table function + INVALID_OR_UNKNOWN_PROMOTION_FUNCTION = -1 PROMOTIONS_TABLE_HEADERS = [ @@ -41,10 +45,19 @@ PROMOTIONS_TABLE_HEADERS = [ class ClubID(Enum): - מבצע_רגיל = 0 - מועדון = 1 - כרטיס_אשראי = 2 - אחר = 3 + _init_ = 'value string' + + REGULAR = 0, 'מבצע רגיל' + CLUB = 1, 'מועדון' + CREDIT_CARD = 2, 'כרטיס אשראי' + OTHER = 3, 'אחר' + + @classmethod + def _missing_(cls, value): + return ClubID.OTHER + + def __str__(self): + return self.string class RewardType(Enum): @@ -57,6 +70,7 @@ class RewardType(Enum): SECOND_INSTANCE_SAME_DISCOUNT = 8 SECOND_INSTANCE_DIFFERENT_DISCOUNT = 9 DISCOUNT_IN_MULTIPLE_INSTANCES = 10 + OTHER = 11 class Promotion: @@ -90,15 +104,15 @@ class Promotion: return self.promotion_id == other.promotion_id -def write_promotions_to_csv(promotions: List[Promotion], output_filename: str) -> None: +def write_promotions_to_table(promotions: List[Promotion], output_filename: str) -> None: """ - This function writes a promotions table to a given CSV or XLSX output file. + This function writes a List of promotions to a csv or xlsx output file. :param promotions: A given list of promotions :param output_filename: A given file to write to """ log_message_and_time_if_debug('Writing promotions to output file') - rows = [get_promotion_row_for_csv(promo, item) for promo in promotions for item in promo.items] + rows = [get_promotion_row_for_table(promo, item) for promo in promotions for item in promo.items] if output_filename.endswith('.csv'): encoding_file = "utf_8_sig" if sys.platform == "win32" else "utf_8" with open(output_filename, mode='w', newline='', encoding=encoding_file) as f_out: @@ -135,28 +149,30 @@ def write_promotions_to_csv(promotions: List[Promotion], output_filename: str) - raise ValueError(f"The given output file has an invalid extension:\n{output_filename}") -def get_promotion_row_for_csv(promo: Promotion, item: Item): +def get_promotion_row_for_table(promo: Promotion, item: Item) -> List: """ This function returns a row in the promotions XLSX table. :param promo: A given Promotion object :param item: A given item object participating in the promotion """ - return [promo.content, - item.name, - item.price, - promo.promo_func(item), - (item.price - promo.promo_func(item)) / item.price, - promo.club_id.name.replace('_', ' '), - promo.max_qty, - promo.allow_multiple_discounts, - promo.start_date <= datetime.now(), - promo.start_date, - promo.end_date, - promo.update_date, - item.manufacturer, - item.code, - promo.reward_type.value] + return [ + promo.content, + item.name, + item.price, + promo.promo_func(item), + (item.price - promo.promo_func(item)) / max(item.price, 1), + promo.club_id.string, + promo.max_qty, + promo.allow_multiple_discounts, + promo.start_date <= datetime.now(), + promo.start_date, + promo.end_date, + promo.update_date, + item.manufacturer, + item.code, + promo.reward_type.value, + ] def get_available_promos(chain: SupermarketChain, store_id: int, load_prices: bool, load_promos: bool) \ @@ -177,7 +193,7 @@ def get_available_promos(chain: SupermarketChain, store_id: int, load_prices: bo log_message_and_time_if_debug('Creating promotions objects') promo_objs = list() - for promo in promo_tags: + for promo in tqdm(promo_tags, desc='creating_promotions'): promotion_id = int(promo.find(re.compile('PromotionId', re.IGNORECASE)).text) if promo_objs and promo_objs[-1].promotion_id == promotion_id: promo_objs[-1].items.extend(chain.get_items(promo, items_dict)) @@ -243,7 +259,7 @@ def get_discounted_price(promo): def get_discount_rate(discount_rate: Union[float, None], discount_in_percentage: bool): if discount_rate: if discount_in_percentage: - return int(discount_rate) * (10 ** -(len(str(discount_rate)))) + return float(discount_rate) * (10 ** -(len(str(discount_rate)))) return float(discount_rate) @@ -271,6 +287,9 @@ def find_promo_function(reward_type: RewardType, remark: str, promo_description: if reward_type == RewardType.DISCOUNT_BY_THRESHOLD: return lambda item: item.price - discount_rate + if reward_type == RewardType.OTHER: + return lambda item: item.price + if 'מחיר המבצע הינו המחיר לק"ג' in remark: return lambda item: discounted_price @@ -294,7 +313,7 @@ def main_latest_promos(store_id: int, output_filename, chain: SupermarketChain, promotions: List[Promotion] = get_available_promos(chain, store_id, load_xml, load_promos) promotions.sort(key=lambda promo: (max(promo.update_date.date(), promo.start_date.date()), promo.start_date - promo.end_date), reverse=True) - write_promotions_to_csv(promotions, output_filename) + write_promotions_to_table(promotions, output_filename) def log_promos_by_name(store_id: int, chain: SupermarketChain, promo_name: str, load_prices: bool, load_promos: bool): @@ -313,7 +332,6 @@ def log_promos_by_name(store_id: int, chain: SupermarketChain, promo_name: str, logging.info(promo.repr_ltr()) -# TODO: change to returning list of Items def get_all_null_items_in_promos(chain, store_id) -> List[str]: """ This function finds all items appearing in the chain's promotions file but not in the chain's prices file. @@ -335,7 +353,7 @@ def get_all_promos_tags(chain: SupermarketChain, store_id: int, load_xml: bool) :return: A list of promotions tags """ bs_objects = list() - for category in XML_FILES_PROMOTIONS_CATEGORIES: + for category in tqdm(XML_FILES_PROMOTIONS_CATEGORIES, desc='promotions_files'): xml_path = xml_file_gen(chain, store_id, category.name) bs_objects.append(create_bs_object(chain, store_id, category, load_xml, xml_path)) diff --git a/requirements.txt b/requirements.txt index 98bb28c..35740e8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,10 @@ lxml==4.6.1 requests==2.25.0 soupsieve==2.0.1 urllib3==1.26.2 +openpyxl +tqdm~=4.62.1 +pytest~=6.2.2 +pandas~=1.2.0 +argparse~=1.4.0 +XlsxWriter~=1.4.3 +aenum \ No newline at end of file diff --git a/tests/test_scraping.py b/tests/test_scraping.py index e69de29..f2e3d9d 100644 --- a/tests/test_scraping.py +++ b/tests/test_scraping.py @@ -0,0 +1,121 @@ +import logging +import os +import pytest +import requests +from tqdm import tqdm +import pandas as pd +import re + +from chains.bareket import Bareket +from chains.co_op import CoOp +from chains.dor_alon import DorAlon +from chains.keshet import Keshet +from chains.shuk_hayir import ShukHayir +from chains.stop_market import StopMarket +from chains.tiv_taam import TivTaam +from chains.zol_vebegadol import ZolVebegadol +from promotion import PROMOTION_COLS_NUM, main_latest_promos +from supermarket_chain import SupermarketChain +from chains import ( + bareket, + mahsaneiHashook, + dor_alon, + freshmarket, + hazi_hinam, + keshet, + stop_market, + tiv_taam, + shufersal, + co_op, + victory, + yohananof, + zol_vebegadol, + rami_levi, + osher_ad, + maayan2000, + shuk_hayir, + king_store, + shefa_birkat_hashem, +) + +pytest.main(args=['-s', os.path.abspath(__file__)]) + +chain_dict = {repr(chain): chain() if callable(chain) else None for chain in SupermarketChain.__subclasses__()} + +MIN_NUM_OF_PROMOS = 3 + + +def test_searching_for_download_urls(): + """ + Test that get_download_url of each chain returns the correct download url for each category in every chain. + """ + session = requests.Session() + for chain_name, chain in tqdm(chain_dict.items(), desc='chains'): + + logging.info(f'Checking download urls in chain {chain_name}') + store_id: int = valid_store_id_by_chain(chain_name) + + _test_download_url_helper(chain, store_id, chain.XMLFilesCategory.PromosFull, r'promo[s]?full', session) + _test_download_url_helper(chain, store_id, chain.XMLFilesCategory.Promos, r'promo[s]?', session) + _test_download_url_helper(chain, store_id, chain.XMLFilesCategory.PricesFull, r'price[s]?full', session) + _test_download_url_helper(chain, store_id, chain.XMLFilesCategory.Prices, r'price[s]?', session) + + +def _test_download_url_helper(chain: SupermarketChain, store_id: int, category: SupermarketChain.XMLFilesCategory, + regex_pat: str, session: requests.session): + download_url: str = chain.get_download_url(store_id, category, session) + if not download_url: # Not found non-full Promos/Prices file + return + logging.debug(download_url) + assert re.search(regex_pat, download_url, re.IGNORECASE), f'Invalid {category.name} url in {repr(type(chain))}' + if category in [chain.XMLFilesCategory.Prices, chain.XMLFilesCategory.Promos]: + assert not re.search('full', download_url, re.IGNORECASE), \ + f'Downloaded the full {category.name} file mistakenly in {repr(type(chain))}' + + +def test_promotions_scraping(): + """ + Test scraping of promotions is completed successfully and a valid xlsx file is generated as an output. + """ + filename = 'temp.xlsx' + for chain_name, chain in tqdm(chain_dict.items(), desc='chains'): + logging.info(f'Test scraping promotions from {chain_name}') + + store_id: int = valid_store_id_by_chain(chain_name) + try: + main_latest_promos( + store_id=store_id, + output_filename=filename, + chain=chain, + load_promos=False, + load_xml=False, + ) + df = pd.read_excel(filename) + except Exception as e: + logging.error(e) + logging.error(f"Failed loading excel of {chain_name}") + raise + + assert df.shape[0] > MIN_NUM_OF_PROMOS and df.shape[1] == PROMOTION_COLS_NUM, f"Failed scraping {chain_name}" + + +def valid_store_id_by_chain(chain_name) -> int: + """ + This function returns a valid store ID for a given chain. + + :param chain_name: The name of a chain as returned by repr(ChainClassName). + :return: An integer representing a valid store ID in the given chain + """ + if chain_name == repr(DorAlon): + store_id = 501 + elif chain_name in [repr(TivTaam), repr(Bareket), repr(ZolVebegadol)]: + store_id = 2 + elif chain_name == repr(CoOp): + store_id = 202 + elif chain_name == repr(ShukHayir): + store_id = 4 + elif chain_name in [repr(StopMarket), repr(Keshet)]: + store_id = 5 + else: + store_id = 1 + return store_id diff --git a/utils.py b/utils.py index f809ba3..4d3e3b2 100644 --- a/utils.py +++ b/utils.py @@ -9,6 +9,8 @@ import requests from bs4 import BeautifulSoup from os import path +from tqdm import tqdm + from item import Item from supermarket_chain import SupermarketChain @@ -64,6 +66,8 @@ def get_bs_object_from_link(chain: SupermarketChain, store_id: int, category: Su """ session = requests.Session() download_url: str = chain.get_download_url(store_id, category, session) + if not download_url: + return BeautifulSoup() response_content = session.get(download_url).content try: xml_content: AnyStr = gzip.decompress(response_content) @@ -98,7 +102,7 @@ def create_items_dict(chain: SupermarketChain, store_id: int, load_xml) -> Dict[ :param store_id: A given store id """ items_dict = dict() - for category in [chain.XMLFilesCategory.PricesFull, chain.XMLFilesCategory.Prices]: + for category in tqdm([chain.XMLFilesCategory.PricesFull, chain.XMLFilesCategory.Prices], desc='prices_files'): xml_path: str = xml_file_gen(chain, store_id, category.name) bs_prices: BeautifulSoup = create_bs_object(chain, store_id, category, load_xml, xml_path) items_tags = bs_prices.find_all(chain.item_tag_name)