diff --git a/chains/bareket.py b/chains/bareket.py index c20590d..3d8d066 100644 --- a/chains/bareket.py +++ b/chains/bareket.py @@ -1,6 +1,5 @@ from chains.mahsaneiHashook import MahsaneiHashook -from supermarket_chain import SupermarketChain -class Bareket(MahsaneiHashook, SupermarketChain): +class Bareket(MahsaneiHashook): pass diff --git a/chains/binaproject_web_client.py b/chains/binaproject_web_client.py index ec23e4e..971fe4e 100644 --- a/chains/binaproject_web_client.py +++ b/chains/binaproject_web_client.py @@ -8,14 +8,16 @@ from supermarket_chain import SupermarketChain FNAME_KEY = "FileNm" -class BinaProjectWebClient: +class BinaProjectWebClient(SupermarketChain): _date_hour_format = '%Y-%m-%d %H:%M:%S' _update_date_format = '%Y-%m-%d %H:%M:%S' _path_prefix = "" _hostname_suffix = ".binaprojects.com" - def get_download_url(self, store_id: int, category: SupermarketChain.XMLFilesCategory, session: requests.Session) \ + def get_download_url_or_path(self, store_id: int, category: SupermarketChain.XMLFilesCategory, session: requests.Session) \ -> str: + if not SupermarketChain.is_valid_store_id(store_id): + raise ValueError(f"Invalid {store_id=} (store id must be a natural number)") hostname = f"http://{self.hostname_prefix}{self.hostname_suffix}" url = '/'.join([hostname, self.path_prefix, "MainIO_Hok.aspx"]) req_res: requests.Response = session.get(url) @@ -27,7 +29,7 @@ class BinaProjectWebClient: if not any(filter_func(cur_json[FNAME_KEY]) for cur_json in jsons_files): return "" # Could not find non-full Promos/Prices file else: - filter_func = lambda fname: f'-{store_id:03d}-20' in fname and category.name.replace('s', '') in fname + filter_func = lambda fname: f'-{store_id:03d}-20' in fname and category.name.replace('s', '') in fname and 'null' not in fname suffix = next( cur_json[FNAME_KEY] for cur_json in jsons_files if filter_func(cur_json[FNAME_KEY])) down_url: str = '/'.join([hostname, self.path_prefix, "Download", suffix]) diff --git a/chains/cerberus_web_client.py b/chains/cerberus_web_client.py index f02f75e..c5e3499 100644 --- a/chains/cerberus_web_client.py +++ b/chains/cerberus_web_client.py @@ -1,35 +1,99 @@ -import json -import re +import logging +import os +import shutil +import time +from abc import abstractmethod import requests +from selenium import webdriver +from selenium.webdriver.chrome.service import Service +from selenium.webdriver.common.by import By +from webdriver_manager.chrome import ChromeDriverManager from supermarket_chain import SupermarketChain -class CerberusWebClient: - - def get_download_url(self, store_id: int, category: SupermarketChain.XMLFilesCategory, session: requests.Session) \ - -> str: - hostname: str = "https://publishedprices.co.il" - - # Post the payload to the site to log in - session.post(hostname + "/login/user", data={'username': self.username}) - - # Scrape the data - ajax_dir_payload: dict = {'iDisplayLength': 100000, 'sSearch': category.name.replace('s', '')} - s: requests.Response = session.post(hostname + "/file/ajax_dir", data=ajax_dir_payload) - s_json: dict = json.loads(s.text) - if category in [SupermarketChain.XMLFilesCategory.Promos, SupermarketChain.XMLFilesCategory.Prices]: - filter_func = lambda d, id: f'-{id:03d}-20' in d['name'] and not re.search('full', d['name'], re.IGNORECASE) - if not any(filter_func(d, store_id) for d in s_json['aaData']): - return "" # Could not find non-full Prices/Promos file - else: - filter_func = lambda d, id: f'-{id:03d}-20' in d['name'] - suffix: str = next(d['name'] for d in s_json['aaData'] if filter_func(d, store_id)) - - download_url: str = hostname + "/file/d/" + suffix - return download_url - +class CerberusWebClient(SupermarketChain): @property + @abstractmethod def username(self): - return repr(type(self)) + pass + + def get_download_url_or_path( + self, + store_id: int, + category: SupermarketChain.XMLFilesCategory, + session: requests.Session, + ) -> str: + options = webdriver.ChromeOptions() + options.add_argument("ignore-certificate-errors") + options.add_argument("--ignore-ssl-errors=yes") + + driver = webdriver.Chrome( + service=Service(ChromeDriverManager().install()), options=options + ) + + driver.get("https://url.retail.publishedprices.co.il/login#") + time.sleep(2) + userElem = driver.find_element(By.NAME, "username") + userElem.send_keys(self.username) + driver.find_element(By.NAME, "Submit").click() + time.sleep(2) + + searchElem = driver.find_element(By.CLASS_NAME, "form-control") + searchElem.send_keys(category.value) + time.sleep(5) + + conns = driver.find_elements(By.CLASS_NAME, "f") + best_link = "" + for conn in conns: + link = conn.get_attribute("href").lower() + if category == SupermarketChain.XMLFilesCategory.Promos: + filter_func = ( + lambda l: "promo" in l + and "full" not in l + and f"-{store_id:03d}-20" in l + ) + elif category == SupermarketChain.XMLFilesCategory.PromosFull: + filter_func = ( + lambda l: "promo" in l + and "full" in l + and f"-{store_id:03d}-20" in l + ) + elif category == SupermarketChain.XMLFilesCategory.Prices: + filter_func = ( + lambda l: "price" in l + and "full" not in l + and f"-{store_id:03d}-20" in l + ) + elif category == SupermarketChain.XMLFilesCategory.PricesFull: + filter_func = ( + lambda l: "price" in l + and "full" in l + and f"-{store_id:03d}-20" in l + ) + elif category == SupermarketChain.XMLFilesCategory.Stores: + filter_func = lambda l: "store" in l and "full" in l and f"-000-20" in l + else: + raise ValueError(f"Unknown category type: {category=}") + + if filter_func(link): + if not best_link or int(link[-7:-3]) > int(best_link[-7:-3]): + best_link = link + + if not best_link: + return "" + driver.get(best_link) + time.sleep(3) + download_dir = "/Users/korenlazar/Downloads" + filename = best_link[48:] + path_download = os.path.join(download_dir, filename) + logging.info(f"{path_download=}") + path_to_save = f"raw_files/{self.username}-{filename}" + try: + shutil.move(path_download, path_to_save) + print(f"Downloaded {filename} and moved file to {path_to_save}") + except: + print(f"{filename} already exists in {path_to_save}") + + return path_to_save diff --git a/chains/co_op.py b/chains/co_op.py index 429fc6e..d03d972 100644 --- a/chains/co_op.py +++ b/chains/co_op.py @@ -1,6 +1,5 @@ from chains.mahsaneiHashook import MahsaneiHashook -from supermarket_chain import SupermarketChain -class CoOp(MahsaneiHashook, SupermarketChain): +class CoOp(MahsaneiHashook): pass diff --git a/chains/dor_alon.py b/chains/dor_alon.py index 171fa31..e9d2a8d 100644 --- a/chains/dor_alon.py +++ b/chains/dor_alon.py @@ -1,6 +1,9 @@ from chains.cerberus_web_client import CerberusWebClient -from supermarket_chain import SupermarketChain -class DorAlon(CerberusWebClient, SupermarketChain): - _date_hour_format = '%Y-%m-%d %H:%M:%S' +class DorAlon(CerberusWebClient): + @property + def username(self): + return "doralon" + + _date_hour_format = "%Y-%m-%d %H:%M:%S" diff --git a/chains/freshmarket.py b/chains/freshmarket.py index cab96c3..127f0b2 100644 --- a/chains/freshmarket.py +++ b/chains/freshmarket.py @@ -1,6 +1,9 @@ from chains.cerberus_web_client import CerberusWebClient -from supermarket_chain import SupermarketChain -class Freshmarket(CerberusWebClient, SupermarketChain): - _date_hour_format = '%Y-%m-%d %H:%M:%S' +class Freshmarket(CerberusWebClient): + _date_hour_format = "%Y-%m-%d %H:%M:%S" + + @property + def username(self): + return "freshmarket" diff --git a/chains/hazi_hinam.py b/chains/hazi_hinam.py index e4a8731..7d88064 100644 --- a/chains/hazi_hinam.py +++ b/chains/hazi_hinam.py @@ -1,6 +1,9 @@ from chains.cerberus_web_client import CerberusWebClient -from supermarket_chain import SupermarketChain -class HaziHinam(CerberusWebClient, SupermarketChain): - _date_hour_format = '%Y-%m-%d %H:%M:%S' +class HaziHinam(CerberusWebClient): + @property + def username(self): + return "HaziHinam" + + _date_hour_format = "%Y-%m-%d %H:%M:%S" diff --git a/chains/keshet.py b/chains/keshet.py index bebfd86..57f4f45 100644 --- a/chains/keshet.py +++ b/chains/keshet.py @@ -1,6 +1,9 @@ from chains.cerberus_web_client import CerberusWebClient -from supermarket_chain import SupermarketChain -class Keshet(CerberusWebClient, SupermarketChain): - _date_hour_format = '%Y-%m-%d %H:%M:%S' +class Keshet(CerberusWebClient): + @property + def username(self): + return "Keshet" + + _date_hour_format = "%Y-%m-%d %H:%M:%S" diff --git a/chains/king_store.py b/chains/king_store.py index 882a7ca..9fa9f66 100644 --- a/chains/king_store.py +++ b/chains/king_store.py @@ -1,7 +1,6 @@ from chains.binaproject_web_client import BinaProjectWebClient -from supermarket_chain import SupermarketChain -class KingStore(BinaProjectWebClient, SupermarketChain): +class KingStore(BinaProjectWebClient): _path_prefix = "Food_Law" _hostname_suffix = ".co.il" diff --git a/chains/maayan2000.py b/chains/maayan2000.py index 6d47ae3..13c1c50 100644 --- a/chains/maayan2000.py +++ b/chains/maayan2000.py @@ -1,6 +1,5 @@ from chains.binaproject_web_client import BinaProjectWebClient -from supermarket_chain import SupermarketChain -class Maayan2000(BinaProjectWebClient, SupermarketChain): - pass \ No newline at end of file +class Maayan2000(BinaProjectWebClient): + pass diff --git a/chains/mahsaneiHashook.py b/chains/mahsaneiHashook.py index b11e387..e9019ce 100644 --- a/chains/mahsaneiHashook.py +++ b/chains/mahsaneiHashook.py @@ -1,5 +1,6 @@ import re from typing import Dict, List + import requests from bs4 import BeautifulSoup from bs4.element import Tag @@ -9,33 +10,46 @@ from supermarket_chain import SupermarketChain class MahsaneiHashook(SupermarketChain): - _promotion_tag_name = 'Sale' - _promotion_update_tag_name = 'PriceUpdateDate' - _date_format = '%Y/%m/%d' - _date_hour_format = '%Y/%m/%d %H:%M:%S' - _update_date_format = '%Y/%m/%d %H:%M:%S' - _item_tag_name = 'Product' + _promotion_tag_name = "Sale" + _promotion_update_tag_name = "PriceUpdateDate" + _date_format = "%Y/%m/%d" + _date_hour_format = "%Y/%m/%d %H:%M:%S" + _update_date_format = "%Y/%m/%d %H:%M:%S" + _item_tag_name = "Product" @staticmethod - def get_download_url(store_id: int, category: SupermarketChain.XMLFilesCategory, session: requests.Session) -> str: + def get_download_url_or_path( + store_id: int, + category: SupermarketChain.XMLFilesCategory, + session: requests.Session, + ) -> str: prefix = "http://matrixcatalog.co.il/" url = prefix + "NBCompetitionRegulations.aspx" req_res: requests.Response = requests.get(url) - soup = BeautifulSoup(req_res.text, features='lxml') - if category in [SupermarketChain.XMLFilesCategory.Promos, SupermarketChain.XMLFilesCategory.Prices]: - fname_filter_func = lambda fname: fname and category.name.replace('s', '') in fname \ - and f'-{store_id:03d}-20' in fname \ - and not re.search('full', fname, re.IGNORECASE) - if soup.find('a', href=fname_filter_func) is None: + soup = BeautifulSoup(req_res.text, features="lxml") + if category in [ + SupermarketChain.XMLFilesCategory.Promos, + SupermarketChain.XMLFilesCategory.Prices, + ]: + fname_filter_func = ( + lambda fname: fname + and category.name.replace("s", "") in fname + and f"-{store_id:03d}-20" in fname + and not re.search("full", fname, re.IGNORECASE) + ) + if soup.find("a", href=fname_filter_func) is None: return "" # Could not find non-full Promos/Prices file else: - fname_filter_func = lambda fname: fname and category.name.replace('s', '') in fname \ - and f'-{store_id:03d}-20' in fname - suffix: str = soup.find('a', href=fname_filter_func).attrs['href'] + fname_filter_func = ( + lambda fname: fname + and category.name.replace("s", "") in fname + and f"-{store_id:03d}-20" in fname + ) + suffix: str = soup.find("a", href=fname_filter_func).attrs["href"] down_url: str = prefix + suffix return down_url @staticmethod def get_items(promo: Tag, items_dict: Dict[str, Item]) -> List[Item]: - promo_item = items_dict.get(promo.find('ItemCode').text) + promo_item = items_dict.get(promo.find("ItemCode").text) return [promo_item] if promo_item else [] diff --git a/chains/osher_ad.py b/chains/osher_ad.py index b367920..8054f55 100644 --- a/chains/osher_ad.py +++ b/chains/osher_ad.py @@ -1,6 +1,9 @@ from chains.cerberus_web_client import CerberusWebClient -from supermarket_chain import SupermarketChain -class OsherAd(CerberusWebClient, SupermarketChain): - _date_hour_format = '%Y-%m-%d %H:%M:%S' +class OsherAd(CerberusWebClient): + @property + def username(self): + return "osherad" + + _date_hour_format = "%Y-%m-%d %H:%M:%S" diff --git a/chains/rami_levi.py b/chains/rami_levi.py index 94224b5..1ac4b9a 100644 --- a/chains/rami_levi.py +++ b/chains/rami_levi.py @@ -1,6 +1,9 @@ from chains.cerberus_web_client import CerberusWebClient -from supermarket_chain import SupermarketChain -class RamiLevi(CerberusWebClient, SupermarketChain): - _date_hour_format = '%Y-%m-%d %H:%M:%S' +class RamiLevi(CerberusWebClient): + @property + def username(self): + return "RamiLevi" + + _date_hour_format = "%Y-%m-%d %H:%M:%S" diff --git a/chains/shefa_birkat_hashem.py b/chains/shefa_birkat_hashem.py index dc39a9f..60633f7 100644 --- a/chains/shefa_birkat_hashem.py +++ b/chains/shefa_birkat_hashem.py @@ -1,6 +1,5 @@ from chains.binaproject_web_client import BinaProjectWebClient -from supermarket_chain import SupermarketChain -class ShefaBirkatHashem(BinaProjectWebClient, SupermarketChain): - pass \ No newline at end of file +class ShefaBirkatHashem(BinaProjectWebClient): + pass diff --git a/chains/shuk_hayir.py b/chains/shuk_hayir.py index 8a81511..16fdbd6 100644 --- a/chains/shuk_hayir.py +++ b/chains/shuk_hayir.py @@ -1,7 +1,7 @@ from chains.binaproject_web_client import BinaProjectWebClient -from supermarket_chain import SupermarketChain -class ShukHayir(BinaProjectWebClient, SupermarketChain): +class ShukHayir(BinaProjectWebClient): @property - def hostname_prefix(self): return "shuk-hayir" + def hostname_prefix(self): + return "shuk-hayir" diff --git a/chains/stop_market.py b/chains/stop_market.py index baf85dc..3c8071c 100644 --- a/chains/stop_market.py +++ b/chains/stop_market.py @@ -1,9 +1,9 @@ from chains.cerberus_web_client import CerberusWebClient -from supermarket_chain import SupermarketChain -class StopMarket(CerberusWebClient, SupermarketChain): - _date_hour_format = '%Y-%m-%d %H:%M:%S' +class StopMarket(CerberusWebClient): + _date_hour_format = "%Y-%m-%d %H:%M:%S" + @property def username(self): - return 'Stop_Market' + return "Stop_Market" diff --git a/chains/tiv_taam.py b/chains/tiv_taam.py index 9d29d0a..8045ec6 100644 --- a/chains/tiv_taam.py +++ b/chains/tiv_taam.py @@ -1,6 +1,7 @@ from chains.cerberus_web_client import CerberusWebClient -from supermarket_chain import SupermarketChain -class TivTaam(CerberusWebClient, SupermarketChain): - pass +class TivTaam(CerberusWebClient): + @property + def username(self): + return "TivTaam" diff --git a/chains/victory.py b/chains/victory.py index 77de5a7..7181876 100644 --- a/chains/victory.py +++ b/chains/victory.py @@ -1,6 +1,5 @@ from chains.mahsaneiHashook import MahsaneiHashook -from supermarket_chain import SupermarketChain -class Victory(MahsaneiHashook, SupermarketChain): +class Victory(MahsaneiHashook): pass diff --git a/chains/yeinot_bitan.py b/chains/yeinot_bitan.py new file mode 100644 index 0000000..e69de29 diff --git a/chains/yohananof.py b/chains/yohananof.py index 6810c52..84e3151 100644 --- a/chains/yohananof.py +++ b/chains/yohananof.py @@ -1,6 +1,9 @@ from chains.cerberus_web_client import CerberusWebClient -from supermarket_chain import SupermarketChain -class Yohananof(CerberusWebClient, SupermarketChain): - _date_hour_format = '%Y-%m-%d %H:%M:%S' +class Yohananof(CerberusWebClient): + @property + def username(self): + return "yohananof" + + _date_hour_format = "%Y-%m-%d %H:%M:%S" diff --git a/chains/zol_vebegadol.py b/chains/zol_vebegadol.py index baa1f15..2d6e0f5 100644 --- a/chains/zol_vebegadol.py +++ b/chains/zol_vebegadol.py @@ -1,6 +1,5 @@ from chains.binaproject_web_client import BinaProjectWebClient -from supermarket_chain import SupermarketChain -class ZolVebegadol(BinaProjectWebClient, SupermarketChain): +class ZolVebegadol(BinaProjectWebClient): pass diff --git a/item.py b/item.py index d0c9251..ece9fc5 100644 --- a/item.py +++ b/item.py @@ -1,14 +1,44 @@ +import json +import re + +from bs4.element import Tag + + class Item: """ A class representing a product in some supermarket. """ - def __init__(self, name: str, price: float, price_by_measure: float, code: str, manufacturer: str): + def __init__( + self, + name: str, + price: float, + price_by_measure: float, + code: str, + manufacturer: str, + ): self.name: str = name self.price: float = price + self.final_price: float = price self.price_by_measure = price_by_measure self.manufacturer: str = manufacturer self.code: str = code + @classmethod + def from_tag(cls, item: Tag): + """ + This method creates an Item instance from an xml tag. + """ + return cls( + name=item.find(re.compile(r"ItemN[a]?m[e]?")).text, + price=float(item.find("ItemPrice").text), + price_by_measure=float(item.find("UnitOfMeasurePrice").text), + code=item.find("ItemCode").text, + manufacturer=item.find(re.compile(r"Manufacture[r]?Name")).text, + ) + + def to_json(self): + return json.dumps(self, default=lambda o: o.__dict__) + def __repr__(self): return f"\nשם: {self.name}\nמחיר: {self.price}\nיצרן: {self.manufacturer}\nקוד: {self.code}\n" diff --git a/main.py b/main.py index 9251f15..af5c803 100644 --- a/main.py +++ b/main.py @@ -1,127 +1,212 @@ -import os -from argparse import ArgumentParser -from datetime import datetime -from pathlib import Path +import json import logging +import os +import subprocess +import sys +from argparse import ArgumentParser +from datetime import datetime, date +from pathlib import Path -from promotion import main_latest_promos, log_promos_by_name +from chains.bareket import Bareket +from chains.co_op import CoOp +from chains.dor_alon import DorAlon +from chains.freshmarket import Freshmarket +from chains.hazi_hinam import HaziHinam +from chains.keshet import Keshet +from chains.king_store import KingStore +from chains.maayan2000 import Maayan2000 +from chains.mahsaneiHashook import MahsaneiHashook +from chains.osher_ad import OsherAd +from chains.rami_levi import RamiLevi +from chains.shefa_birkat_hashem import ShefaBirkatHashem +from chains.shufersal import Shufersal +from chains.shuk_hayir import ShukHayir +from chains.stop_market import StopMarket +from chains.tiv_taam import TivTaam +from chains.victory import Victory +from chains.yohananof import Yohananof +from chains.zol_vebegadol import ZolVebegadol +from promotion import main_latest_promos, log_promos_by_name, get_all_prices from store_utils import log_stores_ids -from utils import RESULTS_DIRNAME, RAW_FILES_DIRNAME, VALID_PROMOTION_FILE_EXTENSIONS, log_products_prices, \ - valid_promotion_output_file from supermarket_chain import SupermarketChain -from chains import ( - bareket, - mahsaneiHashook, - dor_alon, - freshmarket, - hazi_hinam, - keshet, - stop_market, - tiv_taam, - shufersal, - co_op, - victory, - yohananof, - zol_vebegadol, - rami_levi, - osher_ad, - maayan2000, - shuk_hayir, - king_store, - shefa_birkat_hashem, +from utils import ( + RESULTS_DIRNAME, + RAW_FILES_DIRNAME, + VALID_PROMOTION_FILE_EXTENSIONS, + log_products_prices, + valid_promotion_output_file, + is_valid_promotion_output_file, ) +CHAINS_LIST = [ + Bareket, + MahsaneiHashook, + DorAlon, + Freshmarket, + HaziHinam, + Keshet, + StopMarket, + TivTaam, + Shufersal, + CoOp, + Victory, + Yohananof, + ZolVebegadol, + RamiLevi, + OsherAd, + Maayan2000, + ShukHayir, + KingStore, + ShefaBirkatHashem, +] Path(RESULTS_DIRNAME).mkdir(exist_ok=True) Path(RAW_FILES_DIRNAME).mkdir(exist_ok=True) -chain_dict = {repr(chain): chain() if callable(chain) else None for chain in SupermarketChain.__subclasses__()} +CHAINS_DICT = { + repr(chain): chain() if callable(chain) else None for chain in CHAINS_LIST +} # TODO: change functions arguments to include all necessary parameters (e.g. chain) or split arguments -if __name__ == '__main__': +if __name__ == "__main__": parser = ArgumentParser() - parser.add_argument('--promos', - help="generates a CSV file with all the promotions in the requested store", - metavar='store_id', - nargs=1, - type=SupermarketChain.store_id_type, - ) - parser.add_argument('--find_promos_by_name', - help="prints all promos containing the given promo_name in the given store", - metavar=('store_id', 'promo_name'), - nargs=2, - # type=store_id_type, # TODO: add type-checking of first parameter - ) - parser.add_argument('--price', - help='prints all products that contain the given name in the requested store', - metavar=('store_id', 'product_name'), - nargs=2, - ) - parser.add_argument('--find_store_id', - help='prints all Shufersal stores in a given city. Input should be a city name in Hebrew', - metavar='city', - nargs=1, - ) - parser.add_argument('--load_prices', - help='boolean flag representing whether to load an existing price XML file', - action='store_true', - ) - parser.add_argument('--load_promos', - help='boolean flag representing whether to load an existing promo XML file', - action='store_true', - ) - parser.add_argument('--load_stores', - help='boolean flag representing whether to load an existing stores XML file', - action='store_true', - ) - parser.add_argument('--chain', - required=True, - help='The name of the requested chain', - choices=chain_dict.keys(), - ) - parser.add_argument('--file_extension', - help='The extension of the promotions output file', - choices=VALID_PROMOTION_FILE_EXTENSIONS, - default='.xlsx', - ) - parser.add_argument('--output_filename', - help='The path to write the promotions table to', - type=valid_promotion_output_file, - ) - parser.add_argument('--only_export_to_file', - help='Boolean flag representing whether only export or also open the promotion output file', - action='store_true', - ) - parser.add_argument('--debug', - help='Boolean flag representing whether to run in debug mode', - action='store_true', - ) + parser.add_argument( + "--promos", + help="generates a CSV file with all the promotions in the requested store", + metavar="store_id", + nargs=1, + type=SupermarketChain.store_id_type, + ) + parser.add_argument( + "--find_promos_by_name", + help="prints all promos containing the given promo_name in the given store", + metavar=("store_id", "promo_name"), + nargs=2, + ) + parser.add_argument( + "--price", + help="prints all products that contain the given name in the requested store", + metavar=("store_id", "product_name"), + nargs=2, + ) + parser.add_argument( + "--prices-with-promos", + help="logs all products with prices updated by promos", + metavar="store_id", + nargs=1, + type=SupermarketChain.store_id_type, + ) + parser.add_argument( + "--find_store_id", + help="prints all Shufersal stores in a given city. Input should be a city name in Hebrew", + metavar="city", + nargs=1, + ) + parser.add_argument( + "--load_prices", + help="boolean flag representing whether to load an existing price XML file", + action="store_true", + ) + parser.add_argument( + "--load_promos", + help="boolean flag representing whether to load an existing promo XML file", + action="store_true", + ) + parser.add_argument( + "--load_stores", + help="boolean flag representing whether to load an existing stores XML file", + action="store_true", + ) + parser.add_argument( + "--chain", + required=True, + help="The name of the requested chain", + choices=CHAINS_DICT.keys(), + ) + parser.add_argument( + "--output_filename", + help="The path to write the promotions/prices to", + type=valid_promotion_output_file, + ) + parser.add_argument( + "--only_export_to_file", + help="Boolean flag representing whether only export or also open the promotion output file", + action="store_true", + ) + parser.add_argument( + "--debug", + help="Boolean flag representing whether to run in debug mode", + action="store_true", + ) args = parser.parse_args() if args.debug: logging.basicConfig(level=logging.DEBUG) else: - logging.basicConfig(level=logging.INFO, format='%(levelname)s:%(message)s') + logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(message)s") - chain: SupermarketChain = chain_dict[args.chain] + chain: SupermarketChain = CHAINS_DICT[args.chain] - if args.promos: - arg_store_id = int(args.promos[0]) + if args.promos or args.prices_with_promos: + arg_store_id = ( + int(args.promos[0]) if args.promos else int(args.prices_with_promos[0]) + ) if args.output_filename: output_filename = args.output_filename + if args.promos and not is_valid_promotion_output_file(output_filename): + raise ValueError( + f"Output filename for promos must end with: {VALID_PROMOTION_FILE_EXTENSIONS}" + ) + if args.prices_with_promos and not output_filename.endswith(".json"): + raise ValueError(f"Output filename for promos must be a json file") directory = os.path.dirname(output_filename) Path(directory).mkdir(parents=True, exist_ok=True) else: Path(RESULTS_DIRNAME).mkdir(exist_ok=True) - output_filename = f'{RESULTS_DIRNAME}/{repr(type(chain))}_promos_{arg_store_id}{args.file_extension}' + file_extension = ".xlsx" if args.promos else ".json" + file_type = "promos" if args.promos else "prices" + output_filename = f"{RESULTS_DIRNAME}/{repr(type(chain))}-{file_type}-{arg_store_id}-{date.today()}{file_extension}" + + if args.promos: + main_latest_promos( + store_id=arg_store_id, + output_filename=output_filename, + chain=chain, + load_promos=args.load_promos, + load_prices=args.load_prices, + ) + else: + items_dict = get_all_prices( + store_id=arg_store_id, + output_filename=output_filename, + chain=chain, + load_promos=args.load_promos, + load_prices=args.load_prices, + ) + items_dict_to_json = { + item_code: { + k: v + for k, v in item.__dict__.items() + if not k.startswith("__") and not callable(k) + } + for item_code, item in items_dict.items() + } + + with open(output_filename, "w") as fOut: + json.dump(items_dict_to_json, fOut) - main_latest_promos(store_id=arg_store_id, output_filename=output_filename, chain=chain, - load_promos=args.load_promos, load_xml=args.load_prices) if not args.only_export_to_file: - os.startfile(Path(output_filename)) - logging.debug(f'Process finished at: {datetime.now()}') + opener = "open" if sys.platform == "darwin" else "xdg-open" + subprocess.call([opener, Path(output_filename)]) + # os.startfile(Path(output_filename)) + logging.debug(f"Process finished at: {datetime.now()}") elif args.price: - log_products_prices(chain, store_id=args.price[0], load_xml=args.load_prices, product_name=args.price[1]) + log_products_prices( + chain, + store_id=args.price[0], + load_xml=args.load_prices, + product_name=args.price[1], + ) elif args.find_store_id: arg_city = args.find_store_id[0] @@ -129,5 +214,10 @@ if __name__ == '__main__': elif args.find_promos_by_name: arg_store_id = int(args.find_promos_by_name[0]) - log_promos_by_name(store_id=arg_store_id, chain=chain, promo_name=args.find_promos_by_name[1], - load_prices=args.load_prices, load_promos=args.load_promos) + log_promos_by_name( + store_id=arg_store_id, + chain=chain, + promo_name=args.find_promos_by_name[1], + load_prices=args.load_prices, + load_promos=args.load_promos, + ) diff --git a/promotion.py b/promotion.py index 4023185..947524e 100644 --- a/promotion.py +++ b/promotion.py @@ -12,45 +12,51 @@ from aenum import Enum from item import Item from utils import ( - create_bs_object, create_items_dict, + create_bs_object, + create_items_dict, get_float_from_tag, - log_message_and_time_if_debug, xml_file_gen, + log_message_and_time_if_debug, + xml_file_gen, ) from supermarket_chain import SupermarketChain -XML_FILES_PROMOTIONS_CATEGORIES = [SupermarketChain.XMLFilesCategory.PromosFull, - SupermarketChain.XMLFilesCategory.Promos] +XML_FILES_PROMOTIONS_CATEGORIES = [ + SupermarketChain.XMLFilesCategory.PromosFull, + SupermarketChain.XMLFilesCategory.Promos, +] -PROMOTION_COLS_NUM = 15 # The length of the list returned by get_promotion_row_for_table function +PROMOTION_COLS_NUM = ( + 15 # The length of the list returned by get_promotion_row_for_table function +) INVALID_OR_UNKNOWN_PROMOTION_FUNCTION = -1 PROMOTIONS_TABLE_HEADERS = [ - 'תיאור מבצע', - 'הפריט המשתתף במבצע', - 'מחיר לפני מבצע', - 'מחיר אחרי מבצע', - 'אחוז הנחה', - 'סוג מבצע', - 'כמות מקס', - 'כפל הנחות', - 'המבצע החל', - 'זמן תחילת מבצע', - 'זמן סיום מבצע', - 'זמן עדכון אחרון', - 'יצרן', - 'ברקוד פריט', - 'סוג מבצע לפי תקנות שקיפות מחירים', + "תיאור מבצע", + "הפריט המשתתף במבצע", + "מחיר לפני מבצע", + "מחיר אחרי מבצע", + "אחוז הנחה", + "סוג מבצע", + "כמות מקס", + "כפל הנחות", + "המבצע החל", + "זמן תחילת מבצע", + "זמן סיום מבצע", + "זמן עדכון אחרון", + "יצרן", + "ברקוד פריט", + "סוג מבצע לפי תקנות שקיפות מחירים", ] class ClubID(Enum): - _init_ = 'value string' + _init_ = "value string" - REGULAR = 0, 'מבצע רגיל' - CLUB = 1, 'מועדון' - CREDIT_CARD = 2, 'כרטיס אשראי' - OTHER = 3, 'אחר' + REGULAR = 0, "מבצע רגיל" + CLUB = 1, "מועדון" + CREDIT_CARD = 2, "כרטיס אשראי" + OTHER = 3, "אחר" @classmethod def _missing_(cls, value): @@ -79,9 +85,20 @@ class Promotion: It contains only part of the available information in Shufersal's data. """ - def __init__(self, content: str, start_date: datetime, end_date: datetime, update_date: datetime, items: List[Item], - promo_func: callable, club_id: ClubID, promotion_id: int, max_qty: int, - allow_multiple_discounts: bool, reward_type: RewardType): + def __init__( + self, + content: str, + start_date: datetime, + end_date: datetime, + update_date: datetime, + items: List[Item], + promo_func: callable, + club_id: ClubID, + promotion_id: int, + max_qty: int, + allow_multiple_discounts: bool, + reward_type: RewardType, + ): self.content: str = content self.start_date: datetime = start_date self.end_date: datetime = end_date @@ -98,41 +115,49 @@ class Promotion: title = self.content dates_range = f"Between {self.start_date} and {self.end_date}" update_line = f"Updated at {self.update_date}" - return '\n'.join([title, dates_range, update_line, str(self.items)]) + '\n' + return "\n".join([title, dates_range, update_line, str(self.items)]) + "\n" def __eq__(self, other): return self.promotion_id == other.promotion_id -def write_promotions_to_table(promotions: List[Promotion], output_filename: str) -> None: +def write_promotions_to_table( + promotions: List[Promotion], output_filename: str +) -> None: """ This function writes a List of promotions to a csv or xlsx output file. :param promotions: A given list of promotions :param output_filename: A given file to write to """ - log_message_and_time_if_debug('Writing promotions to output file') - rows = [get_promotion_row_for_table(promo, item) for promo in promotions for item in promo.items] - if output_filename.endswith('.csv'): + log_message_and_time_if_debug("Writing promotions to output file") + rows = [ + get_promotion_row_for_table(promo, item) + for promo in promotions + for item in promo.items + ] + if output_filename.endswith(".csv"): encoding_file = "utf_8_sig" if sys.platform == "win32" else "utf_8" - with open(output_filename, mode='w', newline='', encoding=encoding_file) as f_out: + with open( + output_filename, mode="w", newline="", encoding=encoding_file + ) as f_out: promos_writer = csv.writer(f_out) promos_writer.writerow(PROMOTIONS_TABLE_HEADERS) promos_writer.writerows(rows) - elif output_filename.endswith('.xlsx'): + elif output_filename.endswith(".xlsx"): df = pd.DataFrame(rows, columns=PROMOTIONS_TABLE_HEADERS) workbook = xlsxwriter.Workbook(output_filename) worksheet1 = workbook.add_worksheet() worksheet1.right_to_left() - date_time_format = workbook.add_format({'num_format': 'm/d/yy h:mm;@'}) - number_format = workbook.add_format({'num_format': '0.00'}) - percentage_format = workbook.add_format({'num_format': '0.00%'}) - worksheet1.set_column('A:A', width=35) - worksheet1.set_column('B:B', width=25) - worksheet1.set_column('C:D', cell_format=number_format) - worksheet1.set_column('E:E', cell_format=percentage_format) - worksheet1.set_column('J:L', width=15, cell_format=date_time_format) + date_time_format = workbook.add_format({"num_format": "m/d/yy h:mm;@"}) + number_format = workbook.add_format({"num_format": "0.00"}) + percentage_format = workbook.add_format({"num_format": "0.00%"}) + worksheet1.set_column("A:A", width=35) + worksheet1.set_column("B:B", width=25) + worksheet1.set_column("C:D", cell_format=number_format) + worksheet1.set_column("E:E", cell_format=percentage_format) + worksheet1.set_column("J:L", width=15, cell_format=date_time_format) worksheet1.add_table( first_row=0, first_col=0, @@ -141,12 +166,15 @@ def write_promotions_to_table(promotions: List[Promotion], output_filename: str) options={ "columns": [{"header": i} for i in PROMOTIONS_TABLE_HEADERS], "data": df.values.tolist(), - 'style': 'Table Style Medium 11', - }, ) + "style": "Table Style Medium 11", + }, + ) workbook.close() else: - raise ValueError(f"The given output file has an invalid extension:\n{output_filename}") + raise ValueError( + f"The given output file has an invalid extension:\n{output_filename}" + ) def get_promotion_row_for_table(promo: Promotion, item: Item) -> List: @@ -175,8 +203,9 @@ def get_promotion_row_for_table(promo: Promotion, item: Item) -> List: ] -def get_available_promos(chain: SupermarketChain, store_id: int, load_prices: bool, load_promos: bool) \ - -> List[Promotion]: +def get_available_promos( + chain: SupermarketChain, store_id: int, load_prices: bool, load_promos: bool +) -> List[Promotion]: """ This function return the available promotions given a BeautifulSoup object. @@ -186,15 +215,15 @@ def get_available_promos(chain: SupermarketChain, store_id: int, load_prices: bo :param load_promos: A boolean representing whether to load an existing promotion file or download it :return: Promotions that are not included in PRODUCTS_TO_IGNORE and are currently available """ - log_message_and_time_if_debug('Importing prices XML file') + log_message_and_time_if_debug("Importing prices XML file") items_dict: Dict[str, Item] = create_items_dict(chain, store_id, load_prices) - log_message_and_time_if_debug('Importing promotions XML file') + log_message_and_time_if_debug("Importing promotions XML file") promo_tags = get_all_promos_tags(chain, store_id, load_promos) - log_message_and_time_if_debug('Creating promotions objects') + log_message_and_time_if_debug("Creating promotions objects") promo_objs = list() - for promo in tqdm(promo_tags, desc='creating_promotions'): - promotion_id = int(promo.find(re.compile('PromotionId', re.IGNORECASE)).text) + for promo in tqdm(promo_tags, desc="creating_promotions"): + promotion_id = int(promo.find(re.compile("PromotionId", re.IGNORECASE)).text) if promo_objs and promo_objs[-1].promotion_id == promotion_id: promo_objs[-1].items.extend(chain.get_items(promo, items_dict)) continue @@ -206,8 +235,9 @@ def get_available_promos(chain: SupermarketChain, store_id: int, load_prices: bo return promo_objs -def create_new_promo_instance(chain: SupermarketChain, items_dict: Dict[str, Item], promo: Tag, promotion_id: int) \ - -> Union[Promotion, None]: +def create_new_promo_instance( + chain: SupermarketChain, items_dict: Dict[str, Item], promo: Tag, promotion_id: int +) -> Union[Promotion, None]: """ This function generates a Promotion object from a promotion tag. @@ -217,41 +247,64 @@ def create_new_promo_instance(chain: SupermarketChain, items_dict: Dict[str, Ite :param promotion_id: An integer representing the promotion ID :return: If the promotion expired - return None, else return the Promotion object """ - promo_end_time = datetime.strptime(promo.find('PromotionEndDate').text + ' ' + - promo.find('PromotionEndHour').text, - chain.date_hour_format) + promo_end_time = datetime.strptime( + promo.find("PromotionEndDate").text + " " + promo.find("PromotionEndHour").text, + chain.date_hour_format, + ) if promo_end_time < datetime.now(): return None reward_type = RewardType(int(promo.find("RewardType").text)) discounted_price = get_discounted_price(promo) - promo_description = promo.find('PromotionDescription').text - is_discount_in_percentage = reward_type == RewardType.DISCOUNT_IN_PERCENTAGE or not discounted_price - raw_discount_rate = promo.find('DiscountRate').text if promo.find('DiscountRate') else None + promo_description = promo.find("PromotionDescription").text + is_discount_in_percentage = ( + reward_type == RewardType.DISCOUNT_IN_PERCENTAGE or not discounted_price + ) + raw_discount_rate = ( + promo.find("DiscountRate").text if promo.find("DiscountRate") else None + ) discount_rate = get_discount_rate(raw_discount_rate, is_discount_in_percentage) - min_qty = get_float_from_tag(promo, 'MinQty') - max_qty = get_float_from_tag(promo, 'MaxQty') + min_qty = get_float_from_tag(promo, "MinQty") + max_qty = get_float_from_tag(promo, "MaxQty") remark = promo.find("Remark") - promo_func = find_promo_function(reward_type=reward_type, remark=remark.text if remark else '', - promo_description=promo_description, min_qty=min_qty, - discount_rate=discount_rate, discounted_price=discounted_price) - promo_start_time = datetime.strptime(promo.find('PromotionStartDate').text + ' ' + - promo.find('PromotionStartHour').text, - chain.date_hour_format) - promo_update_time = datetime.strptime(promo.find(chain.promotion_update_tag_name).text, - chain.update_date_format) - club_id = ClubID(int(promo.find(re.compile('ClubId', re.IGNORECASE)).text)) - multiple_discounts_allowed = bool(int(promo.find('AllowMultipleDiscounts').text)) + promo_func = find_promo_function( + reward_type=reward_type, + remark=remark.text if remark else "", + promo_description=promo_description, + min_qty=min_qty, + discount_rate=discount_rate, + discounted_price=discounted_price, + ) + promo_start_time = datetime.strptime( + promo.find("PromotionStartDate").text + + " " + + promo.find("PromotionStartHour").text, + chain.date_hour_format, + ) + promo_update_time = datetime.strptime( + promo.find(chain.promotion_update_tag_name).text, chain.update_date_format + ) + club_id = ClubID(int(promo.find(re.compile("ClubId", re.IGNORECASE)).text)) + multiple_discounts_allowed = bool(int(promo.find("AllowMultipleDiscounts").text)) items = chain.get_items(promo, items_dict) - return Promotion(content=promo_description, start_date=promo_start_time, end_date=promo_end_time, - update_date=promo_update_time, items=items, promo_func=promo_func, - club_id=club_id, promotion_id=promotion_id, max_qty=max_qty, - allow_multiple_discounts=multiple_discounts_allowed, reward_type=reward_type) + return Promotion( + content=promo_description, + start_date=promo_start_time, + end_date=promo_end_time, + update_date=promo_update_time, + items=items, + promo_func=promo_func, + club_id=club_id, + promotion_id=promotion_id, + max_qty=max_qty, + allow_multiple_discounts=multiple_discounts_allowed, + reward_type=reward_type, + ) def get_discounted_price(promo): - discounted_price = promo.find('DiscountedPrice') + discounted_price = promo.find("DiscountedPrice") if discounted_price: return float(discounted_price.text) @@ -263,8 +316,14 @@ def get_discount_rate(discount_rate: Union[float, None], discount_in_percentage: return float(discount_rate) -def find_promo_function(reward_type: RewardType, remark: str, promo_description: str, min_qty: float, - discount_rate: Union[float, None], discounted_price: Union[float, None]): +def find_promo_function( + reward_type: RewardType, + remark: str, + promo_description: str, + min_qty: float, + discount_rate: Union[float, None], + discounted_price: Union[float, None], +): if reward_type == RewardType.SECOND_INSTANCE_DIFFERENT_DISCOUNT: if not discounted_price: return lambda item: item.price * (1 - (discount_rate / min_qty)) @@ -277,7 +336,9 @@ def find_promo_function(reward_type: RewardType, remark: str, promo_description: return lambda item: item.price * (1 - (1 / min_qty)) if reward_type == RewardType.DISCOUNT_IN_PERCENTAGE: - return lambda item: item.price * (1 - discount_rate / (2 if "השני ב" in promo_description else 1)) + return lambda item: item.price * ( + 1 - discount_rate / (2 if "השני ב" in promo_description else 1) + ) if reward_type == RewardType.SECOND_INSTANCE_SAME_DISCOUNT: if "השני ב" in promo_description: @@ -299,24 +360,73 @@ def find_promo_function(reward_type: RewardType, remark: str, promo_description: return lambda item: INVALID_OR_UNKNOWN_PROMOTION_FUNCTION -def main_latest_promos(store_id: int, output_filename, chain: SupermarketChain, load_promos: bool, - load_xml: bool) -> None: +def main_latest_promos( + store_id: int, + output_filename, + chain: SupermarketChain, + load_promos: bool, + load_prices: bool, +) -> None: """ This function writes to a file the available promotions in a store with a given id sorted by their update date. :param chain: The name of the requested supermarket chain :param store_id: A given store id - :param load_xml: A boolean representing whether to load an existing prices xml file + :param load_prices: A boolean representing whether to load an existing prices xml file :param load_promos: A boolean representing whether to load an existing promos xml file :param output_filename: A path to write the promotions table """ - promotions: List[Promotion] = get_available_promos(chain, store_id, load_xml, load_promos) - promotions.sort(key=lambda promo: (max(promo.update_date.date(), promo.start_date.date()), promo.start_date - - promo.end_date), reverse=True) + promotions: List[Promotion] = get_available_promos( + chain, store_id, load_prices, load_promos + ) + promotions.sort( + key=lambda promo: ( + max(promo.update_date.date(), promo.start_date.date()), + promo.start_date - promo.end_date, + ), + reverse=True, + ) write_promotions_to_table(promotions, output_filename) -def log_promos_by_name(store_id: int, chain: SupermarketChain, promo_name: str, load_prices: bool, load_promos: bool): +def get_all_prices( + store_id: int, + output_filename, + chain: SupermarketChain, + load_promos: bool, + load_prices: bool, +): + log_message_and_time_if_debug("Importing prices XML file") + items_dict: Dict[str, Item] = create_items_dict(chain, store_id, load_prices) + log_message_and_time_if_debug("Importing promotions XML file") + promo_tags = get_all_promos_tags(chain, store_id, load_promos) + + log_message_and_time_if_debug("Creating promotions objects") + promo_obj = None + for promo in tqdm(promo_tags, desc="creating_promotions"): + promotion_id = int(promo.find(re.compile("PromotionId", re.IGNORECASE)).text) + if promo_obj is None or promo_obj.promotion_id != promotion_id: + promo_obj = create_new_promo_instance( + chain, items_dict, promo, promotion_id + ) + for item in promo.find_all("Item"): + item_code = item.find("ItemCode").text + cur_item = items_dict.get(item_code) + if cur_item is not None: + discounted_price = promo_obj.promo_func(cur_item) + if cur_item.price > discounted_price: + cur_item.final_price = discounted_price + + return items_dict + + +def log_promos_by_name( + store_id: int, + chain: SupermarketChain, + promo_name: str, + load_prices: bool, + load_promos: bool, +): """ This function prints all promotions in a given chain and store_id containing a given promo_name. @@ -326,7 +436,9 @@ def log_promos_by_name(store_id: int, chain: SupermarketChain, promo_name: str, :param load_prices: A boolean representing whether to load an saved prices XML file or scrape a new one :param load_promos: A boolean representing whether to load an saved XML file or scrape a new one """ - promotions: List[Promotion] = get_available_promos(chain, store_id, load_prices, load_promos) + promotions: List[Promotion] = get_available_promos( + chain, store_id, load_prices, load_promos + ) for promo in promotions: if promo_name in promo.content: logging.info(promo.repr_ltr()) @@ -339,10 +451,16 @@ def get_all_null_items_in_promos(chain, store_id) -> List[str]: """ items_dict: Dict[str, Item] = create_items_dict(chain, store_id, load_xml=True) promo_tags = get_all_promos_tags(chain, store_id, load_xml=True) - return [item for promo_tag in promo_tags for item in chain.get_null_items(promo_tag, items_dict)] + return [ + item + for promo_tag in promo_tags + for item in chain.get_null_items(promo_tag, items_dict) + ] -def get_all_promos_tags(chain: SupermarketChain, store_id: int, load_xml: bool) -> List[Tag]: +def get_all_promos_tags( + chain: SupermarketChain, store_id: int, load_xml: bool +) -> List[Tag]: """ This function gets all the promotions tags for a given store in a given chain. It includes both the full and not full promotions files. @@ -353,8 +471,14 @@ def get_all_promos_tags(chain: SupermarketChain, store_id: int, load_xml: bool) :return: A list of promotions tags """ bs_objects = list() - for category in tqdm(XML_FILES_PROMOTIONS_CATEGORIES, desc='promotions_files'): + for category in tqdm(XML_FILES_PROMOTIONS_CATEGORIES, desc="promotions_files"): xml_path = xml_file_gen(chain, store_id, category.name) - bs_objects.append(create_bs_object(chain, store_id, category, load_xml, xml_path)) + bs_objects.append( + create_bs_object(chain, store_id, category, load_xml, xml_path) + ) - return [promo for bs_obj in bs_objects for promo in bs_obj.find_all(chain.promotion_tag_name)] + return [ + promo + for bs_obj in bs_objects + for promo in bs_obj.find_all(chain.promotion_tag_name) + ] diff --git a/requirements.txt b/requirements.txt index 35740e8..588def0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,4 +12,6 @@ pytest~=6.2.2 pandas~=1.2.0 argparse~=1.4.0 XlsxWriter~=1.4.3 -aenum \ No newline at end of file +aenum +selenium +webdriver-manager \ No newline at end of file diff --git a/supermarket_chain.py b/supermarket_chain.py index 9668710..f721776 100644 --- a/supermarket_chain.py +++ b/supermarket_chain.py @@ -1,10 +1,9 @@ -import re from abc import abstractmethod -from enum import Enum from argparse import ArgumentTypeError from typing import Dict, List import requests +from aenum import StrEnum from bs4.element import Tag from item import Item @@ -20,18 +19,24 @@ class SupermarketChain(object, metaclass=Meta): A class representing a supermarket chain. """ - class XMLFilesCategory(Enum): + class XMLFilesCategory(StrEnum): """ An enum class of different XML files produced by a supermarket chain """ - All, Prices, PricesFull, Promos, PromosFull, Stores = range(6) - _promotion_tag_name = 'Promotion' - _promotion_update_tag_name = 'PromotionUpdateDate' - _date_format = '%Y-%m-%d' - _date_hour_format = '%Y-%m-%d %H:%M' - _update_date_format = '%Y-%m-%d %H:%M' - _item_tag_name = 'Item' + All = ("All",) + Prices = ("price",) + PricesFull = ("pricefull",) + Promos = ("promo",) + PromosFull = ("promofull",) + Stores = "store" + + _promotion_tag_name = "Promotion" + _promotion_update_tag_name = "PromotionUpdateDate" + _date_format = "%Y-%m-%d" + _date_hour_format = "%Y-%m-%d %H:%M" + _update_date_format = "%Y-%m-%d %H:%M" + _item_tag_name = "Item" @property def promotion_tag_name(self): @@ -75,14 +80,19 @@ class SupermarketChain(object, metaclass=Meta): :return: The given store_id if valid, else raise an ArgumentTypeError. """ if not SupermarketChain.is_valid_store_id(int(store_id)): - raise ArgumentTypeError(f"Given store_id: {store_id} is not a valid store_id.") + raise ArgumentTypeError( + f"Given store_id: {store_id} is not a valid store_id." + ) return store_id @staticmethod @abstractmethod - def get_download_url(store_id: int, category: XMLFilesCategory, session: requests.Session) -> str: + def get_download_url_or_path( + store_id: int, category: XMLFilesCategory, session: requests.Session + ) -> str: """ - This method scrapes supermarket's website and returns a url containing the data for a given store and category. + This method scrapes the supermarket's website and according to the given store id and category, + it returns a url containing the data or or a path to a gz file containing the data. :param store_id: A given ID of a store :param category: A given category @@ -100,8 +110,8 @@ class SupermarketChain(object, metaclass=Meta): :param items_dict: A given dictionary of products """ items = list() - for item in promo.find_all('Item'): - item_code = item.find('ItemCode').text + for item in promo.find_all("Item"): + item_code = item.find("ItemCode").text full_item_info = items_dict.get(item_code) if full_item_info: items.append(full_item_info) @@ -112,14 +122,8 @@ class SupermarketChain(object, metaclass=Meta): """ This function returns all the items in a given promotion which do not appear in the given items_dict. """ - return [item.find('ItemCode').text for item in promo.find_all('Item') - if not items_dict.get(item.find('ItemCode').text)] - - @staticmethod - def get_item_info(item: Tag) -> Item: - """ - This function returns a string containing important information about a given supermarket's product. - """ - return Item(name=item.find(re.compile(r'ItemN[a]?m[e]?')).text, price=float(item.find('ItemPrice').text), - price_by_measure=float(item.find('UnitOfMeasurePrice').text), code=item.find('ItemCode').text, - manufacturer=item.find(re.compile(r'Manufacture[r]?Name')).text) + return [ + item.find("ItemCode").text + for item in promo.find_all("Item") + if not items_dict.get(item.find("ItemCode").text) + ] diff --git a/tests/test_scraping.py b/tests/test_scraping.py index f2e3d9d..1160bc4 100644 --- a/tests/test_scraping.py +++ b/tests/test_scraping.py @@ -1,10 +1,11 @@ import logging import os +import re +import tempfile + +import pandas as pd import pytest import requests -from tqdm import tqdm -import pandas as pd -import re from chains.bareket import Bareket from chains.co_op import CoOp @@ -14,89 +15,90 @@ from chains.shuk_hayir import ShukHayir from chains.stop_market import StopMarket from chains.tiv_taam import TivTaam from chains.zol_vebegadol import ZolVebegadol +from main import CHAINS_DICT from promotion import PROMOTION_COLS_NUM, main_latest_promos from supermarket_chain import SupermarketChain -from chains import ( - bareket, - mahsaneiHashook, - dor_alon, - freshmarket, - hazi_hinam, - keshet, - stop_market, - tiv_taam, - shufersal, - co_op, - victory, - yohananof, - zol_vebegadol, - rami_levi, - osher_ad, - maayan2000, - shuk_hayir, - king_store, - shefa_birkat_hashem, -) -pytest.main(args=['-s', os.path.abspath(__file__)]) +pytest.main(args=["-s", os.path.abspath(__file__)]) -chain_dict = {repr(chain): chain() if callable(chain) else None for chain in SupermarketChain.__subclasses__()} +session = requests.Session() MIN_NUM_OF_PROMOS = 3 -def test_searching_for_download_urls(): +@pytest.mark.parametrize("chain_tuple", CHAINS_DICT.items()) +def test_searching_for_download_urls(chain_tuple): """ Test that get_download_url of each chain returns the correct download url for each category in every chain. """ - session = requests.Session() - for chain_name, chain in tqdm(chain_dict.items(), desc='chains'): + chain_name, chain = chain_tuple + # for chain_name, chain in tqdm(chain_dict.items(), desc='chains'): - logging.info(f'Checking download urls in chain {chain_name}') - store_id: int = valid_store_id_by_chain(chain_name) + logging.info(f"Checking download urls in chain {chain_name}") + store_id: int = valid_store_id_by_chain(chain_name) - _test_download_url_helper(chain, store_id, chain.XMLFilesCategory.PromosFull, r'promo[s]?full', session) - _test_download_url_helper(chain, store_id, chain.XMLFilesCategory.Promos, r'promo[s]?', session) - _test_download_url_helper(chain, store_id, chain.XMLFilesCategory.PricesFull, r'price[s]?full', session) - _test_download_url_helper(chain, store_id, chain.XMLFilesCategory.Prices, r'price[s]?', session) + _test_download_url_helper( + chain, store_id, chain.XMLFilesCategory.PromosFull, r"promo[s]?full", session + ) + _test_download_url_helper( + chain, store_id, chain.XMLFilesCategory.Promos, r"promo[s]?", session + ) + _test_download_url_helper( + chain, store_id, chain.XMLFilesCategory.PricesFull, r"price[s]?full", session + ) + _test_download_url_helper( + chain, store_id, chain.XMLFilesCategory.Prices, r"price[s]?", session + ) -def _test_download_url_helper(chain: SupermarketChain, store_id: int, category: SupermarketChain.XMLFilesCategory, - regex_pat: str, session: requests.session): - download_url: str = chain.get_download_url(store_id, category, session) +def _test_download_url_helper( + chain: SupermarketChain, + store_id: int, + category: SupermarketChain.XMLFilesCategory, + regex_pat: str, + session: requests.session, +): + download_url: str = chain.get_download_url_or_path(store_id, category, session) if not download_url: # Not found non-full Promos/Prices file return logging.debug(download_url) - assert re.search(regex_pat, download_url, re.IGNORECASE), f'Invalid {category.name} url in {repr(type(chain))}' + assert re.search( + regex_pat, download_url, re.IGNORECASE + ), f"Invalid {category.name} url in {repr(type(chain))}" if category in [chain.XMLFilesCategory.Prices, chain.XMLFilesCategory.Promos]: - assert not re.search('full', download_url, re.IGNORECASE), \ - f'Downloaded the full {category.name} file mistakenly in {repr(type(chain))}' + assert not re.search( + "full", download_url, re.IGNORECASE + ), f"Downloaded the full {category.name} file mistakenly in {repr(type(chain))}" -def test_promotions_scraping(): +@pytest.mark.parametrize("chain_tuple", CHAINS_DICT.items()) +def test_promotions_scraping(chain_tuple): """ Test scraping of promotions is completed successfully and a valid xlsx file is generated as an output. """ - filename = 'temp.xlsx' - for chain_name, chain in tqdm(chain_dict.items(), desc='chains'): - logging.info(f'Test scraping promotions from {chain_name}') + chain_name, chain = chain_tuple + tf = tempfile.NamedTemporaryFile(suffix=".xlsx") - store_id: int = valid_store_id_by_chain(chain_name) - try: - main_latest_promos( - store_id=store_id, - output_filename=filename, - chain=chain, - load_promos=False, - load_xml=False, - ) - df = pd.read_excel(filename) - except Exception as e: - logging.error(e) - logging.error(f"Failed loading excel of {chain_name}") - raise + logging.info(f"Test scraping promotions from {chain_name}") - assert df.shape[0] > MIN_NUM_OF_PROMOS and df.shape[1] == PROMOTION_COLS_NUM, f"Failed scraping {chain_name}" + store_id: int = valid_store_id_by_chain(chain_name) + try: + main_latest_promos( + store_id=store_id, + output_filename=tf.name, + chain=chain, + load_promos=False, + load_prices=False, + ) + df = pd.read_excel(tf.name) + except Exception as e: + logging.error(e) + logging.error(f"Failed loading excel of {chain_name}") + raise + + assert ( + df.shape[0] > MIN_NUM_OF_PROMOS and df.shape[1] == PROMOTION_COLS_NUM + ), f"Failed scraping {chain_name}" def valid_store_id_by_chain(chain_name) -> int: @@ -108,11 +110,11 @@ def valid_store_id_by_chain(chain_name) -> int: """ if chain_name == repr(DorAlon): store_id = 501 - elif chain_name in [repr(TivTaam), repr(Bareket), repr(ZolVebegadol)]: + elif chain_name in [repr(TivTaam), repr(Bareket)]: store_id = 2 elif chain_name == repr(CoOp): store_id = 202 - elif chain_name == repr(ShukHayir): + elif chain_name == [repr(ShukHayir), repr(ZolVebegadol)]: store_id = 4 elif chain_name in [repr(StopMarket), repr(Keshet)]: store_id = 5 diff --git a/utils.py b/utils.py index 4d3e3b2..0aa911a 100644 --- a/utils.py +++ b/utils.py @@ -1,14 +1,16 @@ import gzip import io import logging +import os.path import zipfile from argparse import ArgumentTypeError +from datetime import date from datetime import datetime +from os import path from typing import AnyStr, Dict + import requests from bs4 import BeautifulSoup -from os import path - from tqdm import tqdm from item import Item @@ -29,12 +31,22 @@ def xml_file_gen(chain: SupermarketChain, store_id: int, category_name: str) -> :param category_name: A given category name :return: An xml filename """ - store_id_str: str = f"-{str(store_id)}" if SupermarketChain.is_valid_store_id(store_id) else "" - return path.join(RAW_FILES_DIRNAME, f"{repr(type(chain))}-{category_name}{store_id_str}.xml") + store_id_str: str = ( + f"-{str(store_id)}" if SupermarketChain.is_valid_store_id(store_id) else "" + ) + return path.join( + RAW_FILES_DIRNAME, + f"{repr(type(chain))}-{category_name}{store_id_str}-{date.today()}.xml", + ) -def create_bs_object(chain: SupermarketChain, store_id: int, category: SupermarketChain.XMLFilesCategory, - load_xml: bool, xml_path: str) -> BeautifulSoup: +def create_bs_object( + chain: SupermarketChain, + store_id: int, + category: SupermarketChain.XMLFilesCategory, + load_xml: bool, + xml_path: str, +) -> BeautifulSoup: """ This function creates a BeautifulSoup (BS) object according to the given parameters. In case the given load_xml is True and the XML file exists, the function creates the BS object from the given @@ -53,8 +65,12 @@ def create_bs_object(chain: SupermarketChain, store_id: int, category: Supermark return get_bs_object_from_link(chain, store_id, category, xml_path) -def get_bs_object_from_link(chain: SupermarketChain, store_id: int, category: SupermarketChain.XMLFilesCategory, - xml_path: str) -> BeautifulSoup: +def get_bs_object_from_link( + chain: SupermarketChain, + store_id: int, + category: SupermarketChain.XMLFilesCategory, + xml_path: str, +) -> BeautifulSoup: """ This function creates a BeautifulSoup (BS) object by generating a download link the given chain's API. @@ -65,20 +81,25 @@ def get_bs_object_from_link(chain: SupermarketChain, store_id: int, category: Su :return: A BeautifulSoup object with xml content. """ session = requests.Session() - download_url: str = chain.get_download_url(store_id, category, session) - if not download_url: + download_url_or_path: str = chain.get_download_url_or_path(store_id, category, session) + if not download_url_or_path: return BeautifulSoup() - response_content = session.get(download_url).content - try: - xml_content: AnyStr = gzip.decompress(response_content) - except gzip.BadGzipFile: - with zipfile.ZipFile(io.BytesIO(response_content)) as the_zip: - zip_info = the_zip.infolist()[0] - with the_zip.open(zip_info) as the_file: - xml_content = the_file.read() - with open(xml_path, 'wb') as f_out: + if os.path.isfile(download_url_or_path): + with gzip.open(download_url_or_path) as fIn: + xml_content = fIn.read() + os.remove(download_url_or_path) # Delete gz file + else: + response_content = session.get(download_url_or_path).content + try: + xml_content: AnyStr = gzip.decompress(response_content) + except gzip.BadGzipFile: + with zipfile.ZipFile(io.BytesIO(response_content)) as the_zip: + zip_info = the_zip.infolist()[0] + with the_zip.open(zip_info) as the_file: + xml_content = the_file.read() + with open(xml_path, "wb") as f_out: f_out.write(xml_content) - return BeautifulSoup(xml_content, features='xml') + return BeautifulSoup(xml_content, features="xml") def get_bs_object_from_xml(xml_path: str) -> BeautifulSoup: @@ -88,11 +109,13 @@ def get_bs_object_from_xml(xml_path: str) -> BeautifulSoup: :param xml_path: A given path to an xml file to load/save the BS object from/to. :return: A BeautifulSoup object with xml content. """ - with open(xml_path, 'rb') as f_in: - return BeautifulSoup(f_in, features='xml') + with open(xml_path, "rb") as f_in: + return BeautifulSoup(f_in, features="xml") -def create_items_dict(chain: SupermarketChain, store_id: int, load_xml) -> Dict[str, Item]: +def create_items_dict( + chain: SupermarketChain, store_id: int, load_xml +) -> Dict[str, Item]: """ This function creates a dictionary where every key is an item code and its value is its corresponding Item instance. We take both full and not full prices files, and assume that the no full is more updated (in case of overwriting). @@ -102,16 +125,28 @@ def create_items_dict(chain: SupermarketChain, store_id: int, load_xml) -> Dict[ :param store_id: A given store id """ items_dict = dict() - for category in tqdm([chain.XMLFilesCategory.PricesFull, chain.XMLFilesCategory.Prices], desc='prices_files'): + for category in tqdm( + [chain.XMLFilesCategory.PricesFull, chain.XMLFilesCategory.Prices], + desc="prices_files", + ): xml_path: str = xml_file_gen(chain, store_id, category.name) - bs_prices: BeautifulSoup = create_bs_object(chain, store_id, category, load_xml, xml_path) + bs_prices: BeautifulSoup = create_bs_object( + chain, store_id, category, load_xml, xml_path + ) items_tags = bs_prices.find_all(chain.item_tag_name) - items_dict.update({item.find('ItemCode').text: chain.get_item_info(item) for item in items_tags}) + items_dict.update( + { + item_tag.find("ItemCode").text: Item.from_tag(item_tag) + for item_tag in items_tags + } + ) return items_dict -def log_products_prices(chain: SupermarketChain, store_id: int, load_xml: bool, product_name: str) -> None: +def log_products_prices( + chain: SupermarketChain, store_id: int, load_xml: bool, product_name: str +) -> None: """ This function prints the products in a given store which contains a given product_name. @@ -121,8 +156,12 @@ def log_products_prices(chain: SupermarketChain, store_id: int, load_xml: bool, :param load_xml: A boolean representing whether to load an existing xml or load an already saved one """ items_dict: Dict[str, Item] = create_items_dict(chain, store_id, load_xml) - products_by_name = [item for item in items_dict.values() if product_name in item.name] - products_by_name_sorted_by_price = sorted(products_by_name, key=lambda item: item.price_by_measure) + products_by_name = [ + item for item in items_dict.values() if product_name in item.name + ] + products_by_name_sorted_by_price = sorted( + products_by_name, key=lambda item: item.price_by_measure + ) for prod in products_by_name_sorted_by_price: logging.info(prod) @@ -134,12 +173,16 @@ def get_float_from_tag(tag, int_tag) -> int: def is_valid_promotion_output_file(output_file: str) -> bool: - return any(output_file.endswith(extension) for extension in VALID_PROMOTION_FILE_EXTENSIONS) + return any( + output_file.endswith(extension) for extension in VALID_PROMOTION_FILE_EXTENSIONS + ) def valid_promotion_output_file(output_file: str) -> str: if not is_valid_promotion_output_file(output_file): - raise ArgumentTypeError(f"Given output file is not a natural number:\n{output_file}") + raise ArgumentTypeError( + f"Given output file has an invalid extension is invalid: {output_file}" + ) return output_file