import logging
import os
import platform
import shutil
import sys
import time
from abc import abstractmethod
from typing import Callable

import requests
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager

from supermarket_chain import SupermarketChain


class CerberusWebClient(SupermarketChain):
    """Supermarket chain whose files are fetched through a Chrome browser session.

    Subclasses supply ``username``; it is typed into the login form of the
    published-prices site before the file listing is searched.
    """

    @property
    @abstractmethod
    def username(self):
        """Login name typed into the site's ``username`` field."""
        pass

    # Directory the browser is asked to download into and from which the
    # downloaded file is later moved. Computed once, at class-definition time.
    download_dir = f"{os.path.abspath(os.path.curdir)}/raw_files"

    def is_system_headless(self) -> bool:
        """Return True on Linux when the ``DISPLAY`` variable is unset or empty."""
        return sys.platform == "linux" and not os.environ.get("DISPLAY")

    def set_browser_options(self) -> webdriver.ChromeOptions:
        """Build the Chrome options used for every browser session.

        Returns:
            Options that point downloads at ``download_dir``, ignore
            certificate/SSL errors and enable headless mode when
            ``is_system_headless()`` is true.
        """
        options = webdriver.ChromeOptions()
        # Use the same directory the download is later moved from, so the two
        # cannot drift apart if the working directory changes after import.
        # NOTE(review): Chrome preferences are usually passed through
        # add_experimental_option("prefs", {...}); confirm that setting this
        # key as a capability is honoured by the pinned Selenium/Chrome.
        options.set_capability("download.default_directory", self.download_dir)
        options.add_argument("ignore-certificate-errors")
        options.add_argument("--ignore-ssl-errors=yes")
        # NOTE(review): the `headless` setter is deprecated in newer Selenium
        # releases; confirm against the pinned version.
        options.headless = self.is_system_headless()
        return options

    def set_browser(self, options: webdriver.ChromeOptions) -> webdriver.Chrome:
        """Start a Chrome driver configured with ``options``.

        On a headless aarch64 machine the driver binary at
        ``/usr/bin/chromedriver`` is used; otherwise the binary is installed
        through ``ChromeDriverManager``.
        """
        if self.is_system_headless() and platform.machine() == 'aarch64':
            return webdriver.Chrome(
                service=Service('/usr/bin/chromedriver'), options=options
            )
        return webdriver.Chrome(
            service=Service(ChromeDriverManager().install()), options=options
        )

    @staticmethod
    def _link_filter(
        category: SupermarketChain.XMLFilesCategory, store_id: int
    ) -> Callable[[str], bool]:
        """Return a predicate that accepts lower-cased links of ``category``.

        Raises:
            ValueError: if ``category`` is not one of the handled categories.
        """
        categories = SupermarketChain.XMLFilesCategory
        if category == categories.Promos:
            keyword, want_full, store_tag = "promo", False, f"-{store_id:03d}-20"
        elif category == categories.PromosFull:
            keyword, want_full, store_tag = "promo", True, f"-{store_id:03d}-20"
        elif category == categories.Prices:
            keyword, want_full, store_tag = "price", False, f"-{store_id:03d}-20"
        elif category == categories.PricesFull:
            keyword, want_full, store_tag = "price", True, f"-{store_id:03d}-20"
        elif category == categories.Stores:
            # Store listings are matched against store number 000, not store_id.
            keyword, want_full, store_tag = "store", True, "-000-20"
        else:
            raise ValueError(f"Unknown category type: {category=}")

        def matches(link: str) -> bool:
            return (
                keyword in link
                and ("full" in link) == want_full
                and store_tag in link
            )

        return matches

    def get_download_url_or_path(
        self,
        store_id: int,
        category: SupermarketChain.XMLFilesCategory,
        session: requests.Session,
    ) -> str:
        """Download the best matching file for a store and return its local path.

        Logs in with ``username``, searches the listing for the category,
        picks one matching link, downloads it through the browser and moves
        the file to ``raw_files/<username>-<filename>``.

        Args:
            store_id: Store number; matched in links as a zero-padded
                three-digit field (ignored for the ``Stores`` category).
            category: Which kind of file to look for.
            session: Not used by this implementation.

        Returns:
            The relative path the file was moved to, or ``""`` when no link
            matched.

        Raises:
            ValueError: if ``category`` is not a handled category. This is
                checked before the browser is started.
        """
        # Build the filter once; it does not depend on the individual link.
        link_matches = self._link_filter(category, store_id)

        driver = self.set_browser(self.set_browser_options())
        try:
            driver.get("https://url.retail.publishedprices.co.il/login#")
            time.sleep(2)
            driver.find_element(By.NAME, "username").send_keys(self.username)
            driver.find_element(By.NAME, "Submit").click()
            time.sleep(2)

            search_box = driver.find_element(By.CLASS_NAME, "form-control")
            # Every 's' is stripped from the lower-cased category name
            # (e.g. "promos" -> "promo") before it is typed as the search term.
            search_box.send_keys(category.name.lower().replace('s', ''))
            time.sleep(5)

            best_link = ""
            for anchor in driver.find_elements(By.CLASS_NAME, "f"):
                href = anchor.get_attribute("href")
                if not href:
                    # Elements without an href cannot be downloaded.
                    continue
                link = href.lower()
                if not link_matches(link):
                    continue
                # Links are ranked by the four characters that precede the
                # final three characters of the URL, read as an integer.
                # NOTE(review): this relies on a fixed-width link suffix;
                # parsing the file name would be more robust.
                if not best_link or int(link[-7:-3]) > int(best_link[-7:-3]):
                    best_link = link

            if not best_link:
                return ""

            driver.get(best_link)
            time.sleep(3)
        finally:
            # Always release the browser process, including on early return
            # and on errors.
            driver.quit()

        filename = best_link.split("/")[-1]
        path_download = os.path.join(self.download_dir, filename)
        logging.info(f"{path_download=}")
        path_to_save = f"raw_files/{self.username}-{filename}"
        try:
            shutil.move(path_download, path_to_save)
            print(f"Downloaded {filename} and moved file to {path_to_save}")
        except OSError as err:
            # Best effort: the target path is still returned, but the real
            # cause is logged instead of being silently swallowed.
            logging.warning(
                "Could not move %s to %s: %s", path_download, path_to_save, err
            )
            print(f"{filename} already exists in {path_to_save}")
        return path_to_save