Compare commits
37 Commits
Author | SHA1 | Date | |
---|---|---|---|
6f527b12d5 | |||
b8ccd6dcf7 | |||
|
6755ff5caf | ||
42fac846aa | |||
d047ffdcc2 | |||
|
9b6f63a7f0 | ||
|
86ff2ca7b7 | ||
|
b1737839ce | ||
|
7b63eab7bd | ||
|
ceff48dbd9 | ||
|
b5db721a3d | ||
|
90cab0a2e1 | ||
|
87b6fbe2b0 | ||
|
322995ba15 | ||
|
294dee8cc2 | ||
|
cffdd84086 | ||
|
3770352d04 | ||
|
63fec1490c | ||
|
c1281cb312 | ||
|
1a88ed6e01 | ||
|
9b0ab013c9 | ||
|
1a6707341d | ||
|
844a106c57 | ||
|
c793057623 | ||
|
13991aaa40 | ||
|
b3d410306d | ||
|
62089dd538 | ||
|
03ff6d5281 | ||
|
e09b2da4a1 | ||
|
58bb04f1dd | ||
|
ebb1e912b9 | ||
|
98dcc1c33d | ||
|
8a726ff605 | ||
|
27b45a4999 | ||
|
ec505dba67 | ||
3ae8d02836 | |||
|
e740b122ff |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -6,3 +6,6 @@ raw_files/
|
||||
results/
|
||||
all_deals.py
|
||||
unknown_items.csv
|
||||
helper_*
|
||||
.vscode/
|
||||
desktop.ini
|
||||
|
@@ -1,6 +1,5 @@
|
||||
from chains.mahsaneiHashook import MahsaneiHashook
|
||||
from supermarket_chain import SupermarketChain
|
||||
|
||||
|
||||
class Bareket(MahsaneiHashook, SupermarketChain):
|
||||
class Bareket(MahsaneiHashook):
|
||||
pass
|
||||
|
@@ -1,25 +1,38 @@
|
||||
import json
|
||||
import re
|
||||
|
||||
import requests
|
||||
|
||||
from supermarket_chain import SupermarketChain
|
||||
|
||||
FNAME_KEY = "FileNm"
|
||||
|
||||
class BinaProjectWebClient:
|
||||
|
||||
class BinaProjectWebClient(SupermarketChain):
|
||||
_date_hour_format = '%Y-%m-%d %H:%M:%S'
|
||||
_update_date_format = '%Y-%m-%d %H:%M:%S'
|
||||
_path_prefix = ""
|
||||
_hostname_suffix = ".binaprojects.com"
|
||||
|
||||
def get_download_url(self, store_id: int, category: SupermarketChain.XMLFilesCategory, session: requests.Session) \
|
||||
def get_download_url_or_path(self, store_id: int, category: SupermarketChain.XMLFilesCategory, session: requests.Session) \
|
||||
-> str:
|
||||
if not SupermarketChain.is_valid_store_id(store_id):
|
||||
raise ValueError(f"Invalid {store_id=} (store id must be a natural number)")
|
||||
hostname = f"http://{self.hostname_prefix}{self.hostname_suffix}"
|
||||
url = '/'.join([hostname, self.path_prefix, "MainIO_Hok.aspx"])
|
||||
req_res: requests.Response = session.get(url)
|
||||
jsons_files = json.loads(req_res.text)
|
||||
suffix = next(cur_json["FileNm"] for cur_json in jsons_files if f'-{store_id:03d}-20' in cur_json["FileNm"]
|
||||
and category.name.replace('s', '') in cur_json["FileNm"])
|
||||
|
||||
if category in [SupermarketChain.XMLFilesCategory.Promos, SupermarketChain.XMLFilesCategory.Prices]:
|
||||
filter_func = lambda fname: f'-{store_id:03d}-20' in fname and category.name.replace('s', '') in fname \
|
||||
and not re.search('full', fname, re.IGNORECASE)
|
||||
if not any(filter_func(cur_json[FNAME_KEY]) for cur_json in jsons_files):
|
||||
return "" # Could not find non-full Promos/Prices file
|
||||
else:
|
||||
filter_func = lambda fname: f'-{store_id:03d}-20' in fname and category.name.replace('s', '') in fname and 'null' not in fname
|
||||
suffix = next(
|
||||
cur_json[FNAME_KEY] for cur_json in jsons_files if filter_func(cur_json[FNAME_KEY]))
|
||||
down_url: str = '/'.join([hostname, self.path_prefix, "Download", suffix])
|
||||
print(down_url)
|
||||
return down_url
|
||||
|
||||
@property
|
||||
|
@@ -1,28 +1,113 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import platform
|
||||
import sys
|
||||
import time
|
||||
from abc import abstractmethod
|
||||
|
||||
import requests
|
||||
from selenium import webdriver
|
||||
from selenium.webdriver.chrome.service import Service
|
||||
from selenium.webdriver.common.by import By
|
||||
from webdriver_manager.chrome import ChromeDriverManager
|
||||
|
||||
from supermarket_chain import SupermarketChain
|
||||
|
||||
|
||||
class CerberusWebClient:
|
||||
|
||||
def get_download_url(self, store_id: int, category: SupermarketChain.XMLFilesCategory, session: requests.Session) \
|
||||
-> str:
|
||||
hostname: str = "https://publishedprices.co.il"
|
||||
|
||||
# Post the payload to the site to log in
|
||||
session.post(hostname + "/login/user", data={'username': self.username})
|
||||
|
||||
# Scrape the data
|
||||
ajax_dir_payload: dict = {'iDisplayLength': 100000, 'sSearch': category.name.replace('s', '')}
|
||||
s: requests.Response = session.post(hostname + "/file/ajax_dir", data=ajax_dir_payload)
|
||||
s_json: dict = json.loads(s.text)
|
||||
suffix: str = next(d['name'] for d in s_json['aaData'] if f'-{store_id:03d}-20' in d['name'])
|
||||
|
||||
download_url: str = hostname + "/file/d/" + suffix
|
||||
print(download_url)
|
||||
return download_url
|
||||
|
||||
class CerberusWebClient(SupermarketChain):
|
||||
@property
|
||||
@abstractmethod
|
||||
def username(self):
|
||||
return repr(type(self))
|
||||
pass
|
||||
|
||||
download_dir = f"{os.path.abspath(os.path.curdir)}/raw_files"
|
||||
|
||||
def is_system_headless(self) -> bool:
|
||||
return sys.platform == "linux" and not os.environ.get("DISPLAY")
|
||||
|
||||
def set_browser_options(self) -> webdriver.ChromeOptions:
|
||||
options = webdriver.ChromeOptions()
|
||||
options.add_experimental_option("prefs",{"download.default_directory": self.download_dir})
|
||||
options.add_argument("ignore-certificate-errors")
|
||||
options.add_argument("--ignore-ssl-errors=yes")
|
||||
options.headless = self.is_system_headless()
|
||||
return options
|
||||
|
||||
def set_browser(self,options: webdriver.ChromeOptions) -> webdriver.Chrome:
|
||||
if self.is_system_headless() and platform.machine() == 'aarch64':
|
||||
return webdriver.Chrome(service=Service('/usr/bin/chromedriver'), options=options)
|
||||
return webdriver.Chrome(
|
||||
service=Service(ChromeDriverManager().install()), options=options
|
||||
)
|
||||
|
||||
def get_download_url_or_path(
|
||||
self,
|
||||
store_id: int,
|
||||
category: SupermarketChain.XMLFilesCategory,
|
||||
session: requests.Session,
|
||||
) -> str:
|
||||
options=self.set_browser_options()
|
||||
driver = self.set_browser(options)
|
||||
driver.get("https://url.retail.publishedprices.co.il/login#")
|
||||
time.sleep(2)
|
||||
userElem = driver.find_element(By.NAME, "username")
|
||||
userElem.send_keys(self.username)
|
||||
driver.find_element(By.NAME, "Submit").click()
|
||||
time.sleep(2)
|
||||
searchElem = driver.find_element(By.CLASS_NAME, "form-control")
|
||||
searchElem.send_keys(category.name.lower().replace('s', ''))
|
||||
time.sleep(5)
|
||||
conns = driver.find_elements(By.CLASS_NAME, "f")
|
||||
best_link = ""
|
||||
for conn in conns:
|
||||
link = conn.get_attribute("href").lower()
|
||||
if category == SupermarketChain.XMLFilesCategory.Promos:
|
||||
filter_func = (
|
||||
lambda l: "promo" in l
|
||||
and "full" not in l
|
||||
and f"-{store_id:03d}-20" in l
|
||||
)
|
||||
elif category == SupermarketChain.XMLFilesCategory.PromosFull:
|
||||
filter_func = (
|
||||
lambda l: "promo" in l
|
||||
and "full" in l
|
||||
and f"-{store_id:03d}-20" in l
|
||||
)
|
||||
elif category == SupermarketChain.XMLFilesCategory.Prices:
|
||||
filter_func = (
|
||||
lambda l: "price" in l
|
||||
and "full" not in l
|
||||
and f"-{store_id:03d}-20" in l
|
||||
)
|
||||
elif category == SupermarketChain.XMLFilesCategory.PricesFull:
|
||||
filter_func = (
|
||||
lambda l: "price" in l
|
||||
and "full" in l
|
||||
and f"-{store_id:03d}-20" in l
|
||||
)
|
||||
elif category == SupermarketChain.XMLFilesCategory.Stores:
|
||||
filter_func = lambda l: "store" in l and "full" in l and f"-000-20" in l
|
||||
else:
|
||||
raise ValueError(f"Unknown category type: {category=}")
|
||||
|
||||
if filter_func(link):
|
||||
if not best_link or int(link[-7:-3]) > int(best_link[-7:-3]):
|
||||
best_link = link
|
||||
|
||||
if not best_link:
|
||||
return ""
|
||||
driver.get(best_link)
|
||||
time.sleep(3)
|
||||
filename = best_link.split("/")[-1] # don't be an idiot. it is stupid to count letters
|
||||
# split and grab, or rename it by yourself.
|
||||
path_download = os.path.join(self.download_dir, filename)
|
||||
logging.info(f"{path_download=}")
|
||||
path_to_save = f"raw_files/{self.username}-{filename}"
|
||||
try:
|
||||
shutil.move(path_download, path_to_save)
|
||||
print(f"Downloaded {filename} and moved file to {path_to_save}")
|
||||
except:
|
||||
print(f"{filename} already exists in {path_to_save}")
|
||||
|
||||
return path_to_save
|
||||
|
@@ -1,6 +1,5 @@
|
||||
from chains.mahsaneiHashook import MahsaneiHashook
|
||||
from supermarket_chain import SupermarketChain
|
||||
|
||||
|
||||
class CoOp(MahsaneiHashook, SupermarketChain):
|
||||
class CoOp(MahsaneiHashook):
|
||||
pass
|
||||
|
@@ -1,6 +1,9 @@
|
||||
from chains.cerberus_web_client import CerberusWebClient
|
||||
from supermarket_chain import SupermarketChain
|
||||
|
||||
|
||||
class DorAlon(CerberusWebClient, SupermarketChain):
|
||||
_date_hour_format = '%Y-%m-%d %H:%M:%S'
|
||||
class DorAlon(CerberusWebClient):
|
||||
@property
|
||||
def username(self):
|
||||
return "doralon"
|
||||
|
||||
_date_hour_format = "%Y-%m-%d %H:%M:%S"
|
||||
|
@@ -1,6 +1,9 @@
|
||||
from chains.cerberus_web_client import CerberusWebClient
|
||||
from supermarket_chain import SupermarketChain
|
||||
|
||||
|
||||
class Freshmarket(CerberusWebClient, SupermarketChain):
|
||||
_date_hour_format = '%Y-%m-%d %H:%M:%S'
|
||||
class Freshmarket(CerberusWebClient):
|
||||
_date_hour_format = "%Y-%m-%d %H:%M:%S"
|
||||
|
||||
@property
|
||||
def username(self):
|
||||
return "freshmarket"
|
||||
|
@@ -1,6 +1,9 @@
|
||||
from chains.cerberus_web_client import CerberusWebClient
|
||||
from supermarket_chain import SupermarketChain
|
||||
|
||||
|
||||
class HaziHinam(CerberusWebClient, SupermarketChain):
|
||||
_date_hour_format = '%Y-%m-%d %H:%M:%S'
|
||||
class HaziHinam(CerberusWebClient):
|
||||
@property
|
||||
def username(self):
|
||||
return "HaziHinam"
|
||||
|
||||
_date_hour_format = "%Y-%m-%d %H:%M:%S"
|
||||
|
@@ -1,6 +1,9 @@
|
||||
from chains.cerberus_web_client import CerberusWebClient
|
||||
from supermarket_chain import SupermarketChain
|
||||
|
||||
|
||||
class Keshet(CerberusWebClient, SupermarketChain):
|
||||
_date_hour_format = '%Y-%m-%d %H:%M:%S'
|
||||
class Keshet(CerberusWebClient):
|
||||
@property
|
||||
def username(self):
|
||||
return "Keshet"
|
||||
|
||||
_date_hour_format = "%Y-%m-%d %H:%M:%S"
|
||||
|
@@ -1,7 +1,6 @@
|
||||
from chains.binaproject_web_client import BinaProjectWebClient
|
||||
from supermarket_chain import SupermarketChain
|
||||
|
||||
|
||||
class KingStore(BinaProjectWebClient, SupermarketChain):
|
||||
class KingStore(BinaProjectWebClient):
|
||||
_path_prefix = "Food_Law"
|
||||
_hostname_suffix = ".co.il"
|
||||
|
@@ -1,6 +1,5 @@
|
||||
from chains.binaproject_web_client import BinaProjectWebClient
|
||||
from supermarket_chain import SupermarketChain
|
||||
|
||||
|
||||
class Maayan2000(BinaProjectWebClient, SupermarketChain):
|
||||
pass
|
||||
class Maayan2000(BinaProjectWebClient):
|
||||
pass
|
||||
|
@@ -1,4 +1,6 @@
|
||||
import re
|
||||
from typing import Dict, List
|
||||
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
from bs4.element import Tag
|
||||
@@ -8,26 +10,46 @@ from supermarket_chain import SupermarketChain
|
||||
|
||||
|
||||
class MahsaneiHashook(SupermarketChain):
|
||||
_promotion_tag_name = 'Sale'
|
||||
_promotion_update_tag_name = 'PriceUpdateDate'
|
||||
_date_format = '%Y/%m/%d'
|
||||
_date_hour_format = '%Y/%m/%d %H:%M:%S'
|
||||
_update_date_format = '%Y/%m/%d %H:%M:%S'
|
||||
_item_tag_name = 'Product'
|
||||
_promotion_tag_name = "Sale"
|
||||
_promotion_update_tag_name = "PriceUpdateDate"
|
||||
_date_format = "%Y/%m/%d"
|
||||
_date_hour_format = "%Y/%m/%d %H:%M:%S"
|
||||
_update_date_format = "%Y/%m/%d %H:%M:%S"
|
||||
_item_tag_name = "Product"
|
||||
|
||||
@staticmethod
|
||||
def get_download_url(store_id: int, category: SupermarketChain.XMLFilesCategory, session: requests.Session) -> str:
|
||||
def get_download_url_or_path(
|
||||
store_id: int,
|
||||
category: SupermarketChain.XMLFilesCategory,
|
||||
session: requests.Session,
|
||||
) -> str:
|
||||
prefix = "http://matrixcatalog.co.il/"
|
||||
url = prefix + "NBCompetitionRegulations.aspx"
|
||||
req_res: requests.Response = requests.get(url)
|
||||
soup = BeautifulSoup(req_res.text, features='lxml')
|
||||
suffix: str = soup.find('a', href=lambda value: value and category.name.replace('s', '') in value
|
||||
and f'-{store_id:03d}-20' in value).attrs['href']
|
||||
soup = BeautifulSoup(req_res.text, features="lxml")
|
||||
if category in [
|
||||
SupermarketChain.XMLFilesCategory.Promos,
|
||||
SupermarketChain.XMLFilesCategory.Prices,
|
||||
]:
|
||||
fname_filter_func = (
|
||||
lambda fname: fname
|
||||
and category.name.replace("s", "") in fname
|
||||
and f"-{store_id:03d}-20" in fname
|
||||
and not re.search("full", fname, re.IGNORECASE)
|
||||
)
|
||||
if soup.find("a", href=fname_filter_func) is None:
|
||||
return "" # Could not find non-full Promos/Prices file
|
||||
else:
|
||||
fname_filter_func = (
|
||||
lambda fname: fname
|
||||
and category.name.replace("s", "") in fname
|
||||
and f"-{store_id:03d}-20" in fname
|
||||
)
|
||||
suffix: str = soup.find("a", href=fname_filter_func).attrs["href"]
|
||||
down_url: str = prefix + suffix
|
||||
print(down_url)
|
||||
return down_url
|
||||
|
||||
@staticmethod
|
||||
def get_items(promo: Tag, items_dict: Dict[str, Item]) -> List[Item]:
|
||||
promo_item = items_dict.get(promo.find('ItemCode').text)
|
||||
promo_item = items_dict.get(promo.find("ItemCode").text)
|
||||
return [promo_item] if promo_item else []
|
||||
|
@@ -1,6 +1,9 @@
|
||||
from chains.cerberus_web_client import CerberusWebClient
|
||||
from supermarket_chain import SupermarketChain
|
||||
|
||||
|
||||
class OsherAd(CerberusWebClient, SupermarketChain):
|
||||
_date_hour_format = '%Y-%m-%d %H:%M:%S'
|
||||
class OsherAd(CerberusWebClient):
|
||||
@property
|
||||
def username(self):
|
||||
return "osherad"
|
||||
|
||||
_date_hour_format = "%Y-%m-%d %H:%M:%S"
|
||||
|
@@ -1,6 +1,9 @@
|
||||
from chains.cerberus_web_client import CerberusWebClient
|
||||
from supermarket_chain import SupermarketChain
|
||||
|
||||
|
||||
class RamiLevi(CerberusWebClient, SupermarketChain):
|
||||
_date_hour_format = '%Y-%m-%d %H:%M:%S'
|
||||
class RamiLevi(CerberusWebClient):
|
||||
@property
|
||||
def username(self):
|
||||
return "RamiLevi"
|
||||
|
||||
_date_hour_format = "%Y-%m-%d %H:%M:%S"
|
||||
|
@@ -1,6 +1,5 @@
|
||||
from chains.binaproject_web_client import BinaProjectWebClient
|
||||
from supermarket_chain import SupermarketChain
|
||||
|
||||
|
||||
class ShefaBirkatHashem(BinaProjectWebClient, SupermarketChain):
|
||||
pass
|
||||
class ShefaBirkatHashem(BinaProjectWebClient):
|
||||
pass
|
||||
|
@@ -7,12 +7,11 @@ from supermarket_chain import SupermarketChain
|
||||
class Shufersal(SupermarketChain):
|
||||
|
||||
@staticmethod
|
||||
def get_download_url(store_id: int, category: SupermarketChain.XMLFilesCategory, session: requests.Session) -> str:
|
||||
def get_download_url_or_path(store_id: int, category: SupermarketChain.XMLFilesCategory, session: requests.Session) -> str:
|
||||
url = f"http://prices.shufersal.co.il/FileObject/UpdateCategory?catID={category.value}"
|
||||
if SupermarketChain.is_valid_store_id(int(store_id)):
|
||||
url += f"&storeId={store_id}"
|
||||
req_res: requests.Response = requests.get(url)
|
||||
soup: BeautifulSoup = BeautifulSoup(req_res.text, features='lxml')
|
||||
down_url: str = soup.find('a', text="לחץ להורדה")['href']
|
||||
print(down_url)
|
||||
return down_url
|
||||
|
@@ -1,7 +1,7 @@
|
||||
from chains.binaproject_web_client import BinaProjectWebClient
|
||||
from supermarket_chain import SupermarketChain
|
||||
|
||||
|
||||
class ShukHayir(BinaProjectWebClient, SupermarketChain):
|
||||
class ShukHayir(BinaProjectWebClient):
|
||||
@property
|
||||
def hostname_prefix(self): return "shuk-hayir"
|
||||
def hostname_prefix(self):
|
||||
return "shuk-hayir"
|
||||
|
@@ -1,9 +1,9 @@
|
||||
from chains.cerberus_web_client import CerberusWebClient
|
||||
from supermarket_chain import SupermarketChain
|
||||
|
||||
|
||||
class StopMarket(CerberusWebClient, SupermarketChain):
|
||||
_date_hour_format = '%Y-%m-%d %H:%M:%S'
|
||||
class StopMarket(CerberusWebClient):
|
||||
_date_hour_format = "%Y-%m-%d %H:%M:%S"
|
||||
|
||||
@property
|
||||
def username(self):
|
||||
return 'Stop_Market'
|
||||
return "Stop_Market"
|
||||
|
@@ -1,6 +1,7 @@
|
||||
from chains.cerberus_web_client import CerberusWebClient
|
||||
from supermarket_chain import SupermarketChain
|
||||
|
||||
|
||||
class TivTaam(CerberusWebClient, SupermarketChain):
|
||||
pass
|
||||
class TivTaam(CerberusWebClient):
|
||||
@property
|
||||
def username(self):
|
||||
return "TivTaam"
|
||||
|
@@ -1,6 +1,5 @@
|
||||
from chains.mahsaneiHashook import MahsaneiHashook
|
||||
from supermarket_chain import SupermarketChain
|
||||
|
||||
|
||||
class Victory(MahsaneiHashook, SupermarketChain):
|
||||
class Victory(MahsaneiHashook):
|
||||
pass
|
||||
|
35
chains/yeinot_bitan.py
Normal file
35
chains/yeinot_bitan.py
Normal file
@@ -0,0 +1,35 @@
|
||||
import re
|
||||
from datetime import datetime
|
||||
|
||||
import numpy as np
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from supermarket_chain import SupermarketChain
|
||||
|
||||
|
||||
class YeinotBitan(SupermarketChain):
|
||||
_date_hour_format = "%Y-%m-%d %H:%M:%S"
|
||||
|
||||
@staticmethod
|
||||
def get_download_url_or_path(
|
||||
store_id: int,
|
||||
category: SupermarketChain.XMLFilesCategory,
|
||||
session: requests.Session,
|
||||
) -> str:
|
||||
today_date_suffix = datetime.today().date().strftime("%Y%m%d")
|
||||
url = f"http://publishprice.ybitan.co.il/{today_date_suffix}/"
|
||||
req_res = requests.get(url)
|
||||
soup = BeautifulSoup(req_res.text, features="lxml")
|
||||
promo_tags = soup.findAll(
|
||||
"a",
|
||||
attrs={
|
||||
"href": re.compile(
|
||||
rf"^{category.name.replace('s', '')}.*-{store_id:04d}-"
|
||||
)
|
||||
},
|
||||
)
|
||||
most_recent_tag_ind = np.argmax(
|
||||
[int(promo_tag["href"][-7:-3]) for promo_tag in promo_tags]
|
||||
)
|
||||
return url + promo_tags[most_recent_tag_ind]["href"]
|
@@ -1,6 +1,9 @@
|
||||
from chains.cerberus_web_client import CerberusWebClient
|
||||
from supermarket_chain import SupermarketChain
|
||||
|
||||
|
||||
class Yohananof(CerberusWebClient, SupermarketChain):
|
||||
_date_hour_format = '%Y-%m-%d %H:%M:%S'
|
||||
class Yohananof(CerberusWebClient):
|
||||
@property
|
||||
def username(self):
|
||||
return "yohananof"
|
||||
|
||||
_date_hour_format = "%Y-%m-%d %H:%M:%S"
|
||||
|
@@ -1,6 +1,5 @@
|
||||
from chains.binaproject_web_client import BinaProjectWebClient
|
||||
from supermarket_chain import SupermarketChain
|
||||
|
||||
|
||||
class ZolVebegadol(BinaProjectWebClient, SupermarketChain):
|
||||
class ZolVebegadol(BinaProjectWebClient):
|
||||
pass
|
||||
|
35
item.py
35
item.py
@@ -1,13 +1,44 @@
|
||||
import json
|
||||
import re
|
||||
|
||||
from bs4.element import Tag
|
||||
|
||||
|
||||
class Item:
|
||||
"""
|
||||
A class representing a product in some supermarket.
|
||||
"""
|
||||
|
||||
def __init__(self, name: str, price: float, manufacturer: str, code: str):
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
price: float,
|
||||
price_by_measure: float,
|
||||
code: str,
|
||||
manufacturer: str,
|
||||
):
|
||||
self.name: str = name
|
||||
self.price: float = price
|
||||
self.final_price: float = price
|
||||
self.price_by_measure = price_by_measure
|
||||
self.manufacturer: str = manufacturer
|
||||
self.code: str = code
|
||||
|
||||
@classmethod
|
||||
def from_tag(cls, item: Tag):
|
||||
"""
|
||||
This method creates an Item instance from an xml tag.
|
||||
"""
|
||||
return cls(
|
||||
name=item.find(re.compile(r"ItemN[a]?m[e]?")).text,
|
||||
price=float(item.find("ItemPrice").text),
|
||||
price_by_measure=float(item.find("UnitOfMeasurePrice").text),
|
||||
code=item.find("ItemCode").text,
|
||||
manufacturer=item.find(re.compile(r"Manufacture[r]?Name")).text,
|
||||
)
|
||||
|
||||
def to_json(self):
|
||||
return json.dumps(self, default=lambda o: o.__dict__)
|
||||
|
||||
def __repr__(self):
|
||||
return str((self.name, self.price, self.manufacturer, self.code))
|
||||
return f"\nשם: {self.name}\nמחיר: {self.price}\nיצרן: {self.manufacturer}\nקוד: {self.code}\n"
|
||||
|
290
main.py
290
main.py
@@ -1,105 +1,225 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from argparse import ArgumentParser
|
||||
from datetime import datetime, date
|
||||
from pathlib import Path
|
||||
|
||||
from promotion import main_latest_promos, get_promos_by_name
|
||||
from store_utils import get_store_id
|
||||
from utils import RESULTS_DIRNAME, RAW_FILES_DIRNAME, get_products_prices
|
||||
from chains.bareket import Bareket
|
||||
from chains.co_op import CoOp
|
||||
from chains.dor_alon import DorAlon
|
||||
from chains.freshmarket import Freshmarket
|
||||
from chains.hazi_hinam import HaziHinam
|
||||
from chains.keshet import Keshet
|
||||
from chains.king_store import KingStore
|
||||
from chains.maayan2000 import Maayan2000
|
||||
from chains.mahsaneiHashook import MahsaneiHashook
|
||||
from chains.osher_ad import OsherAd
|
||||
from chains.rami_levi import RamiLevi
|
||||
from chains.shefa_birkat_hashem import ShefaBirkatHashem
|
||||
from chains.shufersal import Shufersal
|
||||
from chains.shuk_hayir import ShukHayir
|
||||
from chains.stop_market import StopMarket
|
||||
from chains.tiv_taam import TivTaam
|
||||
from chains.victory import Victory
|
||||
from chains.yeinot_bitan import YeinotBitan
|
||||
from chains.yohananof import Yohananof
|
||||
from chains.zol_vebegadol import ZolVebegadol
|
||||
from promotion import main_latest_promos, log_promos_by_name, get_all_prices
|
||||
from store_utils import log_stores_ids
|
||||
from supermarket_chain import SupermarketChain
|
||||
from chains import (
|
||||
bareket,
|
||||
mahsaneiHashook,
|
||||
dor_alon,
|
||||
freshmarket,
|
||||
hazi_hinam,
|
||||
keshet,
|
||||
stop_market,
|
||||
tiv_taam,
|
||||
shufersal,
|
||||
co_op,
|
||||
victory,
|
||||
yohananof,
|
||||
zol_vebegadol,
|
||||
rami_levi,
|
||||
osher_ad,
|
||||
maayan2000,
|
||||
shuk_hayir,
|
||||
king_store,
|
||||
shefa_birkat_hashem,
|
||||
from utils import (
|
||||
RESULTS_DIRNAME,
|
||||
RAW_FILES_DIRNAME,
|
||||
VALID_PROMOTION_FILE_EXTENSIONS,
|
||||
log_products_prices,
|
||||
valid_promotion_output_file,
|
||||
is_valid_promotion_output_file,
|
||||
)
|
||||
|
||||
# TODO: fix problem of left-to-right printing
|
||||
|
||||
CHAINS_LIST = [
|
||||
Bareket,
|
||||
MahsaneiHashook,
|
||||
DorAlon,
|
||||
Freshmarket,
|
||||
HaziHinam,
|
||||
Keshet,
|
||||
StopMarket,
|
||||
TivTaam,
|
||||
Shufersal,
|
||||
CoOp,
|
||||
Victory,
|
||||
Yohananof,
|
||||
ZolVebegadol,
|
||||
RamiLevi,
|
||||
OsherAd,
|
||||
Maayan2000,
|
||||
ShukHayir,
|
||||
KingStore,
|
||||
ShefaBirkatHashem,
|
||||
YeinotBitan,
|
||||
]
|
||||
Path(RESULTS_DIRNAME).mkdir(exist_ok=True)
|
||||
Path(RAW_FILES_DIRNAME).mkdir(exist_ok=True)
|
||||
|
||||
chain_dict = {repr(chain): chain() if callable(chain) else None for chain in SupermarketChain.__subclasses__()}
|
||||
CHAINS_DICT = {
|
||||
repr(chain): chain() if callable(chain) else None for chain in CHAINS_LIST
|
||||
}
|
||||
|
||||
if __name__ == '__main__':
|
||||
# TODO: change functions arguments to include all necessary parameters (e.g. chain) or split arguments
|
||||
if __name__ == "__main__":
|
||||
parser = ArgumentParser()
|
||||
parser.add_argument('--promos',
|
||||
help="generates a CSV file with all the promotions in the requested store",
|
||||
metavar='store_id',
|
||||
nargs=1,
|
||||
type=SupermarketChain.store_id_type,
|
||||
)
|
||||
parser.add_argument('--find_promos_by_name',
|
||||
help="prints all promos containing the given promo_name in the given store",
|
||||
metavar=('store_id', 'promo_name'),
|
||||
nargs=2,
|
||||
# type=store_id_type, # TODO: add type-checking of first parameter
|
||||
)
|
||||
parser.add_argument('--price',
|
||||
help='prints all products that contain the given name in the requested store',
|
||||
metavar=('store_id', 'product_name'),
|
||||
nargs=2,
|
||||
)
|
||||
parser.add_argument('--find_store_id',
|
||||
help='prints all Shufersal stores in a given city. Input should be a city name in Hebrew',
|
||||
metavar='city',
|
||||
nargs=1,
|
||||
)
|
||||
# parser.add_argument('--all_deals',
|
||||
# action='store_true',
|
||||
# )
|
||||
parser.add_argument('--load_prices',
|
||||
help='boolean flag representing whether to load an existing price XML file',
|
||||
action='store_true',
|
||||
)
|
||||
parser.add_argument('--load_promos',
|
||||
help='boolean flag representing whether to load an existing promo XML file',
|
||||
action='store_true',
|
||||
)
|
||||
parser.add_argument('--load_stores',
|
||||
help='boolean flag representing whether to load an existing stores XML file',
|
||||
action='store_true',
|
||||
)
|
||||
parser.add_argument('--chain',
|
||||
required=True,
|
||||
help='The name of the requested chain',
|
||||
choices=chain_dict.keys(),
|
||||
)
|
||||
parser.add_argument('--type',
|
||||
choices=("excel", "csv"),
|
||||
default='excel',
|
||||
|
||||
help="a switch flag to set set the output file to a CSV file",
|
||||
required=False,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--promos",
|
||||
help="generates a CSV file with all the promotions in the requested store",
|
||||
metavar="store_id",
|
||||
nargs=1,
|
||||
type=SupermarketChain.store_id_type,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--find_promos_by_name",
|
||||
help="prints all promos containing the given promo_name in the given store",
|
||||
metavar=("store_id", "promo_name"),
|
||||
nargs=2,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--price",
|
||||
help="prints all products that contain the given name in the requested store",
|
||||
metavar=("store_id", "product_name"),
|
||||
nargs=2,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--prices-with-promos",
|
||||
help="logs all products with prices updated by promos",
|
||||
metavar="store_id",
|
||||
nargs=1,
|
||||
type=SupermarketChain.store_id_type,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--find_store_id",
|
||||
help="prints all Shufersal stores in a given city. Input should be a city name in Hebrew",
|
||||
metavar="city",
|
||||
nargs=1,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--load_prices",
|
||||
help="boolean flag representing whether to load an existing price XML file",
|
||||
action="store_true",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--load_promos",
|
||||
help="boolean flag representing whether to load an existing promo XML file",
|
||||
action="store_true",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--load_stores",
|
||||
help="boolean flag representing whether to load an existing stores XML file",
|
||||
action="store_true",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--chain",
|
||||
required=True,
|
||||
help="The name of the requested chain",
|
||||
choices=CHAINS_DICT.keys(),
|
||||
)
|
||||
parser.add_argument(
|
||||
"--output_filename",
|
||||
help="The path to write the promotions/prices to",
|
||||
type=valid_promotion_output_file,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--only_export_to_file",
|
||||
help="Boolean flag representing whether only export or also open the promotion output file",
|
||||
action="store_true",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--debug",
|
||||
help="Boolean flag representing whether to run in debug mode",
|
||||
action="store_true",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
file_type = '.xlsx' if not args.type or args.type == 'excel' else '.csv'
|
||||
chain: SupermarketChain = chain_dict[args.chain]
|
||||
if args.promos:
|
||||
arg_store_id = int(args.promos[0])
|
||||
main_latest_promos(store_id=arg_store_id, load_xml=args.load_prices, chain=chain, load_promos=args.load_promos, file_type=file_type)
|
||||
if args.debug:
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
else:
|
||||
logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(message)s")
|
||||
|
||||
chain: SupermarketChain = CHAINS_DICT[args.chain]
|
||||
|
||||
if args.promos or args.prices_with_promos:
|
||||
arg_store_id = (
|
||||
int(args.promos[0]) if args.promos else int(args.prices_with_promos[0])
|
||||
)
|
||||
|
||||
if args.output_filename:
|
||||
output_filename = args.output_filename
|
||||
if args.promos and not is_valid_promotion_output_file(output_filename):
|
||||
raise ValueError(
|
||||
f"Output filename for promos must end with: {VALID_PROMOTION_FILE_EXTENSIONS}"
|
||||
)
|
||||
if args.prices_with_promos and not output_filename.endswith(".json"):
|
||||
raise ValueError(f"Output filename for promos must be a json file")
|
||||
directory = os.path.dirname(output_filename)
|
||||
Path(directory).mkdir(parents=True, exist_ok=True)
|
||||
else:
|
||||
Path(RESULTS_DIRNAME).mkdir(exist_ok=True)
|
||||
file_extension = ".xlsx" if args.promos else ".json"
|
||||
file_type = "promos" if args.promos else "prices"
|
||||
output_filename = f"{RESULTS_DIRNAME}/{repr(type(chain))}-{file_type}-{arg_store_id}-{date.today()}{file_extension}"
|
||||
|
||||
if args.promos:
|
||||
main_latest_promos(
|
||||
store_id=arg_store_id,
|
||||
output_filename=output_filename,
|
||||
chain=chain,
|
||||
load_promos=args.load_promos,
|
||||
load_prices=args.load_prices,
|
||||
)
|
||||
else:
|
||||
items_dict = get_all_prices(
|
||||
store_id=arg_store_id,
|
||||
output_filename=output_filename,
|
||||
chain=chain,
|
||||
load_promos=args.load_promos,
|
||||
load_prices=args.load_prices,
|
||||
)
|
||||
items_dict_to_json = {
|
||||
item_code: {
|
||||
k: v
|
||||
for k, v in item.__dict__.items()
|
||||
if not k.startswith("__") and not callable(k)
|
||||
}
|
||||
for item_code, item in items_dict.items()
|
||||
}
|
||||
|
||||
with open(output_filename, "w") as fOut:
|
||||
json.dump(items_dict_to_json, fOut)
|
||||
|
||||
if not args.only_export_to_file:
|
||||
opener = "open" if sys.platform == "darwin" else "xdg-open"
|
||||
subprocess.call([opener, Path(output_filename)])
|
||||
# os.startfile(Path(output_filename))
|
||||
logging.debug(f"Process finished at: {datetime.now()}")
|
||||
|
||||
elif args.price:
|
||||
get_products_prices(chain, store_id=args.price[0], load_xml=args.load_prices, product_name=args.price[1])
|
||||
log_products_prices(
|
||||
chain,
|
||||
store_id=args.price[0],
|
||||
load_xml=args.load_prices,
|
||||
product_name=args.price[1],
|
||||
)
|
||||
|
||||
elif args.find_store_id:
|
||||
arg_city = args.find_store_id[0]
|
||||
get_store_id(city=arg_city, load_xml=args.load_stores, chain=chain)
|
||||
log_stores_ids(city=arg_city, load_xml=args.load_stores, chain=chain)
|
||||
|
||||
elif args.find_promos_by_name:
|
||||
arg_store_id = int(args.find_promos_by_name[0])
|
||||
get_promos_by_name(store_id=arg_store_id, chain=chain, promo_name=args.find_promos_by_name[1],
|
||||
load_prices=args.load_prices, load_promos=args.load_promos)
|
||||
log_promos_by_name(
|
||||
store_id=arg_store_id,
|
||||
chain=chain,
|
||||
promo_name=args.find_promos_by_name[1],
|
||||
load_prices=args.load_prices,
|
||||
load_promos=args.load_promos,
|
||||
)
|
||||
|
467
promotion.py
467
promotion.py
@@ -1,28 +1,72 @@
|
||||
import logging
|
||||
import re
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from typing import Dict, List, Union
|
||||
from bs4.element import Tag
|
||||
import csv
|
||||
import sys
|
||||
import pandas as pd
|
||||
import xlsxwriter
|
||||
from tqdm import tqdm
|
||||
from aenum import Enum
|
||||
|
||||
from item import Item
|
||||
from utils import (
|
||||
create_items_dict,
|
||||
get_float_from_tag, xml_file_gen,
|
||||
create_bs_object,
|
||||
create_items_dict,
|
||||
get_float_from_tag,
|
||||
log_message_and_time_if_debug,
|
||||
xml_file_gen,
|
||||
)
|
||||
from supermarket_chain import SupermarketChain
|
||||
import pandas as pd
|
||||
from utils import (create_bs_object, create_items_dict, get_float_from_tag,
|
||||
xml_file_gen)
|
||||
|
||||
XML_FILES_PROMOTIONS_CATEGORIES = [
|
||||
SupermarketChain.XMLFilesCategory.PromosFull,
|
||||
SupermarketChain.XMLFilesCategory.Promos,
|
||||
]
|
||||
|
||||
PROMOTION_COLS_NUM = (
|
||||
15 # The length of the list returned by get_promotion_row_for_table function
|
||||
)
|
||||
|
||||
INVALID_OR_UNKNOWN_PROMOTION_FUNCTION = -1
|
||||
|
||||
PRODUCTS_TO_IGNORE = ['סירים', 'מגבות', 'מגבת', 'מפות', 'פסטיגל', 'ביגי']
|
||||
PROMOTIONS_TABLE_HEADERS = [
|
||||
"תיאור מבצע",
|
||||
"הפריט המשתתף במבצע",
|
||||
"מחיר לפני מבצע",
|
||||
"מחיר אחרי מבצע",
|
||||
"אחוז הנחה",
|
||||
"סוג מבצע",
|
||||
"כמות מקס",
|
||||
"כפל הנחות",
|
||||
"המבצע החל",
|
||||
"זמן תחילת מבצע",
|
||||
"זמן סיום מבצע",
|
||||
"זמן עדכון אחרון",
|
||||
"יצרן",
|
||||
"ברקוד פריט",
|
||||
"סוג מבצע לפי תקנות שקיפות מחירים",
|
||||
]
|
||||
|
||||
|
||||
class ClubID(Enum):
|
||||
מבצע_רגיל = 0
|
||||
מועדון = 1
|
||||
כרטיס_אשראי = 2
|
||||
אחר = 3
|
||||
_init_ = "value string"
|
||||
|
||||
REGULAR = 0, "מבצע רגיל"
|
||||
CLUB = 1, "מועדון"
|
||||
CREDIT_CARD = 2, "כרטיס אשראי"
|
||||
OTHER = 3, "אחר"
|
||||
|
||||
@classmethod
|
||||
def _missing_(cls, value):
|
||||
return ClubID.OTHER
|
||||
|
||||
def __str__(self):
|
||||
return self.string
|
||||
|
||||
|
||||
class RewardType(Enum):
|
||||
@@ -35,6 +79,7 @@ class RewardType(Enum):
|
||||
SECOND_INSTANCE_SAME_DISCOUNT = 8
|
||||
SECOND_INSTANCE_DIFFERENT_DISCOUNT = 9
|
||||
DISCOUNT_IN_MULTIPLE_INSTANCES = 10
|
||||
OTHER = 11
|
||||
|
||||
|
||||
class Promotion:
|
||||
@@ -43,9 +88,20 @@ class Promotion:
|
||||
It contains only part of the available information in Shufersal's data.
|
||||
"""
|
||||
|
||||
def __init__(self, content: str, start_date: datetime, end_date: datetime, update_date: datetime, items: List[Item],
|
||||
promo_func: callable, club_id: ClubID, promotion_id: float, max_qty: int,
|
||||
allow_multiple_discounts: bool, reward_type: RewardType, type_file: str = "excel"):
|
||||
def __init__(
|
||||
self,
|
||||
content: str,
|
||||
start_date: datetime,
|
||||
end_date: datetime,
|
||||
update_date: datetime,
|
||||
items: List[Item],
|
||||
promo_func: callable,
|
||||
club_id: ClubID,
|
||||
promotion_id: int,
|
||||
max_qty: int,
|
||||
allow_multiple_discounts: bool,
|
||||
reward_type: RewardType,
|
||||
):
|
||||
self.content: str = content
|
||||
self.start_date: datetime = start_date
|
||||
self.end_date: datetime = end_date
|
||||
@@ -54,148 +110,206 @@ class Promotion:
|
||||
self.items: List[Item] = items
|
||||
self.club_id: ClubID = club_id
|
||||
self.max_qty: int = max_qty
|
||||
self.allow_multiple_discounts = allow_multiple_discounts
|
||||
self.reward_type = reward_type
|
||||
self.promotion_id = promotion_id
|
||||
self.type_file = type_file
|
||||
self.allow_multiple_discounts: bool = allow_multiple_discounts
|
||||
self.reward_type: RewardType = reward_type
|
||||
self.promotion_id: int = promotion_id
|
||||
|
||||
def repr_ltr(self):
|
||||
title = self.content
|
||||
dates_range = f"Between {self.start_date} and {self.end_date}"
|
||||
update_line = f"Updated at {self.update_date}"
|
||||
return '\n'.join([title, dates_range, update_line, str(self.items)]) + '\n'
|
||||
return "\n".join([title, dates_range, update_line, str(self.items)]) + "\n"
|
||||
|
||||
def __eq__(self, other):
|
||||
return self.promotion_id == other.promotion_id
|
||||
|
||||
|
||||
def write_promotions_to_csv(promotions: List[Promotion], output_filename: str) -> None:
|
||||
def write_promotions_to_table(
|
||||
promotions: List[Promotion], output_filename: str
|
||||
) -> None:
|
||||
"""
|
||||
This function writes a given list of promotions to a given output file in a CSV format.
|
||||
This function writes a List of promotions to a csv or xlsx output file.
|
||||
|
||||
:param promotions: A given list of promotions
|
||||
:param output_filename: A given file to write to
|
||||
"""
|
||||
encoding_file = "utf_8_sig" if sys.platform == "win32" else "utf_8"
|
||||
columns = [
|
||||
'תיאור מבצע',
|
||||
'הפריט המשתתף במבצע',
|
||||
'מחיר לפני מבצע',
|
||||
'מחיר אחרי מבצע',
|
||||
'אחוז הנחה',
|
||||
'סוג מבצע',
|
||||
'כמות מקס',
|
||||
'כפל הנחות',
|
||||
'המבצע החל',
|
||||
'זמן תחילת מבצע',
|
||||
'זמן סיום מבצע',
|
||||
'זמן עדכון אחרון',
|
||||
'יצרן',
|
||||
'ברקוד פריט',
|
||||
'סוג מבצע לפי תקנות שקיפות מחירים',
|
||||
log_message_and_time_if_debug("Writing promotions to output file")
|
||||
rows = [
|
||||
get_promotion_row_for_table(promo, item)
|
||||
for promo in promotions
|
||||
for item in promo.items
|
||||
]
|
||||
if output_filename.endswith(".csv"):
|
||||
with open(output_filename, mode='w', newline='', encoding=encoding_file) as f_out:
|
||||
encoding_file = "utf_8_sig" if sys.platform == "win32" else "utf_8"
|
||||
with open(
|
||||
output_filename, mode="w", newline="", encoding=encoding_file
|
||||
) as f_out:
|
||||
promos_writer = csv.writer(f_out)
|
||||
promos_writer.writerow(columns)
|
||||
for promo in promotions:
|
||||
promos_writer.writerows([get_promotion_row_in_csv(promo, item) for item in promo.items])
|
||||
promos_writer.writerow(PROMOTIONS_TABLE_HEADERS)
|
||||
promos_writer.writerows(rows)
|
||||
|
||||
elif output_filename.endswith(".xlsx"):
|
||||
df = pd.DataFrame(rows, columns=PROMOTIONS_TABLE_HEADERS)
|
||||
workbook = xlsxwriter.Workbook(output_filename)
|
||||
worksheet1 = workbook.add_worksheet()
|
||||
worksheet1.right_to_left()
|
||||
date_time_format = workbook.add_format({"num_format": "m/d/yy h:mm;@"})
|
||||
number_format = workbook.add_format({"num_format": "0.00"})
|
||||
percentage_format = workbook.add_format({"num_format": "0.00%"})
|
||||
worksheet1.set_column("A:A", width=35)
|
||||
worksheet1.set_column("B:B", width=25)
|
||||
worksheet1.set_column("C:D", cell_format=number_format)
|
||||
worksheet1.set_column("E:E", cell_format=percentage_format)
|
||||
worksheet1.set_column("J:L", width=15, cell_format=date_time_format)
|
||||
worksheet1.add_table(
|
||||
first_row=0,
|
||||
first_col=0,
|
||||
last_row=len(df),
|
||||
last_col=len(df.columns) - 1,
|
||||
options={
|
||||
"columns": [{"header": i} for i in PROMOTIONS_TABLE_HEADERS],
|
||||
"data": df.values.tolist(),
|
||||
"style": "Table Style Medium 11",
|
||||
},
|
||||
)
|
||||
workbook.close()
|
||||
|
||||
else:
|
||||
with pd.ExcelWriter(output_filename, 'openpyxl', datetime_format='DD/MM/YYYY') as xl:
|
||||
dt = pd.DataFrame(columns=columns)
|
||||
for promo in promotions:
|
||||
prms = dict_promos([get_promotion_row_in_csv(promo, item) for item in promo.items], columns)
|
||||
if prms:
|
||||
dt = dt.append(prms, True)
|
||||
else:
|
||||
continue
|
||||
dt.to_excel(xl, index=False, sheet_name="name")
|
||||
raise ValueError(
|
||||
f"The given output file has an invalid extension:\n{output_filename}"
|
||||
)
|
||||
|
||||
|
||||
def dict_promos(promos: list, columns: list):
|
||||
return {col: p for prom in promos for col, p in zip(columns, prom)}
|
||||
def get_promotion_row_for_table(promo: Promotion, item: Item) -> List:
|
||||
"""
|
||||
This function returns a row in the promotions XLSX table.
|
||||
|
||||
:param promo: A given Promotion object
|
||||
:param item: A given item object participating in the promotion
|
||||
"""
|
||||
return [
|
||||
promo.content,
|
||||
item.name,
|
||||
item.price,
|
||||
promo.promo_func(item),
|
||||
(item.price - promo.promo_func(item)) / max(item.price, 1),
|
||||
promo.club_id.string,
|
||||
promo.max_qty,
|
||||
promo.allow_multiple_discounts,
|
||||
promo.start_date <= datetime.now(),
|
||||
promo.start_date,
|
||||
promo.end_date,
|
||||
promo.update_date,
|
||||
item.manufacturer,
|
||||
item.code,
|
||||
promo.reward_type.value,
|
||||
]
|
||||
|
||||
|
||||
def get_promotion_row_in_csv(promo: Promotion, item: Item):
|
||||
return [promo.content,
|
||||
item.name,
|
||||
item.price,
|
||||
f'{promo.promo_func(item):.3f}',
|
||||
f'{(item.price - promo.promo_func(item)) / item.price:.3%}',
|
||||
promo.club_id.name.replace('_', ' '),
|
||||
promo.max_qty,
|
||||
promo.allow_multiple_discounts,
|
||||
promo.start_date <= datetime.now(),
|
||||
promo.start_date,
|
||||
promo.end_date,
|
||||
promo.update_date,
|
||||
item.manufacturer,
|
||||
item.code,
|
||||
promo.reward_type.value]
|
||||
|
||||
|
||||
def get_available_promos(chain: SupermarketChain, store_id: int, load_prices: bool, load_promos) -> List[Promotion]:
|
||||
def get_available_promos(
|
||||
chain: SupermarketChain, store_id: int, load_prices: bool, load_promos: bool
|
||||
) -> List[Promotion]:
|
||||
"""
|
||||
This function return the available promotions given a BeautifulSoup object.
|
||||
|
||||
:param load_promos:
|
||||
:param chain: The name of the requested supermarket chain
|
||||
:param store_id: A given store id
|
||||
:param load_prices: A boolean representing whether to load an existing xml or load an already saved one
|
||||
:param store_id: A given store ID
|
||||
:param load_prices: A boolean representing whether to load an existing prices file or download it
|
||||
:param load_promos: A boolean representing whether to load an existing promotion file or download it
|
||||
:return: Promotions that are not included in PRODUCTS_TO_IGNORE and are currently available
|
||||
"""
|
||||
items_dict: Dict[str, Item] = create_items_dict(chain, load_prices, store_id)
|
||||
xml_path: str = xml_file_gen(chain, store_id, chain.XMLFilesCategory.PromosFull.name)
|
||||
bs_promos = create_bs_object(xml_path, chain, store_id, load_promos, chain.XMLFilesCategory.PromosFull)
|
||||
log_message_and_time_if_debug("Importing prices XML file")
|
||||
items_dict: Dict[str, Item] = create_items_dict(chain, store_id, load_prices)
|
||||
log_message_and_time_if_debug("Importing promotions XML file")
|
||||
promo_tags = get_all_promos_tags(chain, store_id, load_promos)
|
||||
|
||||
log_message_and_time_if_debug("Creating promotions objects")
|
||||
promo_objs = list()
|
||||
for promo in bs_promos.find_all(chain.promotion_tag_name):
|
||||
promotion_id = promo.find(re.compile('PromotionId', re.IGNORECASE))
|
||||
for promo in tqdm(promo_tags, desc="creating_promotions"):
|
||||
promotion_id = int(promo.find(re.compile("PromotionId", re.IGNORECASE)).text)
|
||||
if promo_objs and promo_objs[-1].promotion_id == promotion_id:
|
||||
promo_objs[-1].items.extend(chain.get_items(promo, items_dict))
|
||||
continue
|
||||
|
||||
promo_inst = create_new_promo_instance(chain, items_dict, promo, promotion_id)
|
||||
if len(promo_inst.items) > 1000: # Too many items -> probably illegal promotion
|
||||
continue
|
||||
if promo_inst:
|
||||
promo_objs.append(promo_inst)
|
||||
|
||||
return promo_objs
|
||||
|
||||
|
||||
def create_new_promo_instance(chain, items_dict, promo, promotion_id):
|
||||
def create_new_promo_instance(
|
||||
chain: SupermarketChain, items_dict: Dict[str, Item], promo: Tag, promotion_id: int
|
||||
) -> Union[Promotion, None]:
|
||||
"""
|
||||
This function generates a Promotion object from a promotion tag.
|
||||
|
||||
:param chain: The supermarket chain publishing the promotion
|
||||
:param items_dict: A dictionary of items that might participate in the promotion
|
||||
:param promo: An xml Tag representing the promotion
|
||||
:param promotion_id: An integer representing the promotion ID
|
||||
:return: If the promotion expired - return None, else return the Promotion object
|
||||
"""
|
||||
promo_end_time = datetime.strptime(
|
||||
promo.find("PromotionEndDate").text + " " + promo.find("PromotionEndHour").text,
|
||||
chain.date_hour_format,
|
||||
)
|
||||
if promo_end_time < datetime.now():
|
||||
return None
|
||||
|
||||
reward_type = RewardType(int(promo.find("RewardType").text))
|
||||
discounted_price = get_discounted_price(promo)
|
||||
promo_description = promo.find('PromotionDescription').text
|
||||
is_discount_in_percentage = reward_type == RewardType.DISCOUNT_IN_PERCENTAGE or not discounted_price
|
||||
raw_discount_rate = promo.find('DiscountRate').text if promo.find('DiscountRate') else None
|
||||
promo_description = promo.find("PromotionDescription").text
|
||||
is_discount_in_percentage = (
|
||||
reward_type == RewardType.DISCOUNT_IN_PERCENTAGE or not discounted_price
|
||||
)
|
||||
raw_discount_rate = (
|
||||
promo.find("DiscountRate").text if promo.find("DiscountRate") else None
|
||||
)
|
||||
discount_rate = get_discount_rate(raw_discount_rate, is_discount_in_percentage)
|
||||
min_qty = get_float_from_tag(promo, 'MinQty')
|
||||
max_qty = get_float_from_tag(promo, 'MaxQty')
|
||||
min_qty = get_float_from_tag(promo, "MinQty")
|
||||
max_qty = get_float_from_tag(promo, "MaxQty")
|
||||
remark = promo.find("Remark")
|
||||
promo_func = find_promo_function(reward_type=reward_type, remark=remark.text if remark else '',
|
||||
promo_description=promo_description, min_qty=min_qty,
|
||||
discount_rate=discount_rate, discounted_price=discounted_price)
|
||||
promo_start_time = datetime.strptime(promo.find('PromotionStartDate').text + ' ' +
|
||||
promo.find('PromotionStartHour').text,
|
||||
chain.date_hour_format)
|
||||
promo_end_time = datetime.strptime(promo.find('PromotionEndDate').text + ' ' +
|
||||
promo.find('PromotionEndHour').text,
|
||||
chain.date_hour_format)
|
||||
promo_update_time = datetime.strptime(promo.find(chain.promotion_update_tag_name).text,
|
||||
chain.update_date_format)
|
||||
club_id = ClubID(int(promo.find(re.compile('ClubId', re.IGNORECASE)).text))
|
||||
multiple_discounts_allowed = bool(int(promo.find('AllowMultipleDiscounts').text))
|
||||
promo_func = find_promo_function(
|
||||
reward_type=reward_type,
|
||||
remark=remark.text if remark else "",
|
||||
promo_description=promo_description,
|
||||
min_qty=min_qty,
|
||||
discount_rate=discount_rate,
|
||||
discounted_price=discounted_price,
|
||||
)
|
||||
promo_start_time = datetime.strptime(
|
||||
promo.find("PromotionStartDate").text
|
||||
+ " "
|
||||
+ promo.find("PromotionStartHour").text,
|
||||
chain.date_hour_format,
|
||||
)
|
||||
promo_update_time = datetime.strptime(
|
||||
promo.find(chain.promotion_update_tag_name).text, chain.update_date_format
|
||||
)
|
||||
club_id = ClubID(int(promo.find(re.compile("ClubId", re.IGNORECASE)).text))
|
||||
multiple_discounts_allowed = bool(int(promo.find("AllowMultipleDiscounts").text))
|
||||
items = chain.get_items(promo, items_dict)
|
||||
|
||||
if is_valid_promo(end_time=promo_end_time, description=promo_description):
|
||||
return Promotion(content=promo_description, start_date=promo_start_time, end_date=promo_end_time,
|
||||
update_date=promo_update_time, items=items, promo_func=promo_func,
|
||||
club_id=club_id, promotion_id=promotion_id, max_qty=max_qty,
|
||||
allow_multiple_discounts=multiple_discounts_allowed, reward_type=reward_type)
|
||||
return Promotion(
|
||||
content=promo_description,
|
||||
start_date=promo_start_time,
|
||||
end_date=promo_end_time,
|
||||
update_date=promo_update_time,
|
||||
items=items,
|
||||
promo_func=promo_func,
|
||||
club_id=club_id,
|
||||
promotion_id=promotion_id,
|
||||
max_qty=max_qty,
|
||||
allow_multiple_discounts=multiple_discounts_allowed,
|
||||
reward_type=reward_type,
|
||||
)
|
||||
|
||||
|
||||
def get_discounted_price(promo):
|
||||
discounted_price = promo.find('DiscountedPrice')
|
||||
discounted_price = promo.find("DiscountedPrice")
|
||||
if discounted_price:
|
||||
return float(discounted_price.text)
|
||||
|
||||
@@ -203,12 +317,18 @@ def get_discounted_price(promo):
|
||||
def get_discount_rate(discount_rate: Union[float, None], discount_in_percentage: bool):
|
||||
if discount_rate:
|
||||
if discount_in_percentage:
|
||||
return int(discount_rate) * (10 ** -(len(str(discount_rate))))
|
||||
return float(discount_rate) * (10 ** -(len(str(discount_rate))))
|
||||
return float(discount_rate)
|
||||
|
||||
|
||||
def find_promo_function(reward_type: RewardType, remark: str, promo_description: str, min_qty: float,
|
||||
discount_rate: Union[float, None], discounted_price: Union[float, None]):
|
||||
def find_promo_function(
|
||||
reward_type: RewardType,
|
||||
remark: str,
|
||||
promo_description: str,
|
||||
min_qty: float,
|
||||
discount_rate: Union[float, None],
|
||||
discounted_price: Union[float, None],
|
||||
):
|
||||
if reward_type == RewardType.SECOND_INSTANCE_DIFFERENT_DISCOUNT:
|
||||
if not discounted_price:
|
||||
return lambda item: item.price * (1 - (discount_rate / min_qty))
|
||||
@@ -221,7 +341,9 @@ def find_promo_function(reward_type: RewardType, remark: str, promo_description:
|
||||
return lambda item: item.price * (1 - (1 / min_qty))
|
||||
|
||||
if reward_type == RewardType.DISCOUNT_IN_PERCENTAGE:
|
||||
return lambda item: item.price * (1 - discount_rate / (2 if "השני ב" in promo_description else 1))
|
||||
return lambda item: item.price * (
|
||||
1 - discount_rate / (2 if "השני ב" in promo_description else 1)
|
||||
)
|
||||
|
||||
if reward_type == RewardType.SECOND_INSTANCE_SAME_DISCOUNT:
|
||||
if "השני ב" in promo_description:
|
||||
@@ -231,6 +353,9 @@ def find_promo_function(reward_type: RewardType, remark: str, promo_description:
|
||||
if reward_type == RewardType.DISCOUNT_BY_THRESHOLD:
|
||||
return lambda item: item.price - discount_rate
|
||||
|
||||
if reward_type == RewardType.OTHER:
|
||||
return lambda item: item.price
|
||||
|
||||
if 'מחיר המבצע הינו המחיר לק"ג' in remark:
|
||||
return lambda item: discounted_price
|
||||
|
||||
@@ -240,34 +365,78 @@ def find_promo_function(reward_type: RewardType, remark: str, promo_description:
|
||||
return lambda item: INVALID_OR_UNKNOWN_PROMOTION_FUNCTION
|
||||
|
||||
|
||||
def is_valid_promo(end_time: datetime, description) -> bool:
|
||||
"""
|
||||
This function returns whether a given Promotion object is currently valid.
|
||||
"""
|
||||
not_expired: bool = end_time >= datetime.now()
|
||||
in_promo_ignore_list: bool = any(product in description for product in PRODUCTS_TO_IGNORE)
|
||||
return not_expired and not in_promo_ignore_list
|
||||
|
||||
|
||||
def main_latest_promos(
|
||||
store_id: int, load_xml: bool, chain: SupermarketChain, load_promos: bool, file_type: str) -> None:
|
||||
store_id: int,
|
||||
output_filename,
|
||||
chain: SupermarketChain,
|
||||
load_promos: bool,
|
||||
load_prices: bool,
|
||||
) -> None:
|
||||
"""
|
||||
This function writes to a CSV file the available promotions in a store with a given id sorted by their update date.
|
||||
This function writes to a file the available promotions in a store with a given id sorted by their update date.
|
||||
|
||||
:param chain: The name of the requested supermarket chain
|
||||
:param store_id: A given store id
|
||||
:param load_xml: A boolean representing whether to load an existing prices xml file
|
||||
:param load_prices: A boolean representing whether to load an existing prices xml file
|
||||
:param load_promos: A boolean representing whether to load an existing promos xml file
|
||||
:param output_filename: A path to write the promotions table
|
||||
"""
|
||||
|
||||
promotions: List[Promotion] = get_available_promos(chain, store_id, load_xml, load_promos)
|
||||
promotions.sort(key=lambda promo: (max(promo.update_date.date(), promo.start_date.date()), promo.start_date -
|
||||
promo.end_date), reverse=True)
|
||||
ex_file = f'results/{repr(type(chain))}_promos_{store_id}{file_type}'
|
||||
write_promotions_to_csv(promotions, ex_file)
|
||||
promotions: List[Promotion] = get_available_promos(
|
||||
chain, store_id, load_prices, load_promos
|
||||
)
|
||||
promotions.sort(
|
||||
key=lambda promo: (
|
||||
max(promo.update_date.date(), promo.start_date.date()),
|
||||
promo.start_date - promo.end_date,
|
||||
),
|
||||
reverse=True,
|
||||
)
|
||||
write_promotions_to_table(promotions, output_filename)
|
||||
|
||||
|
||||
def get_promos_by_name(store_id: int, chain: SupermarketChain, promo_name: str, load_prices: bool, load_promos: bool):
|
||||
def get_all_prices(
|
||||
store_id: int,
|
||||
output_filename,
|
||||
chain: SupermarketChain,
|
||||
load_promos: bool,
|
||||
load_prices: bool,
|
||||
):
|
||||
log_message_and_time_if_debug("Importing prices XML file")
|
||||
items_dict: Dict[str, Item] = create_items_dict(chain, store_id, load_prices)
|
||||
log_message_and_time_if_debug("Importing promotions XML file")
|
||||
promo_tags = get_all_promos_tags(chain, store_id, load_promos)
|
||||
|
||||
log_message_and_time_if_debug("Creating promotions objects")
|
||||
promo_obj = None
|
||||
for promo in tqdm(promo_tags, desc="creating_promotions"):
|
||||
promotion_id = int(promo.find(re.compile("PromotionId", re.IGNORECASE)).text)
|
||||
|
||||
if promo_obj is None or promo_obj.promotion_id != promotion_id:
|
||||
promo_obj = create_new_promo_instance(
|
||||
chain, items_dict, promo, promotion_id
|
||||
)
|
||||
if promo_obj.club_id == ClubID.REGULAR:
|
||||
promo_items = promo.find_all("Item")
|
||||
if len(promo_items) > 1000: # Too many items -> probably illegal promotion
|
||||
continue
|
||||
for item in promo_items:
|
||||
item_code = item.find("ItemCode").text
|
||||
cur_item = items_dict.get(item_code)
|
||||
if cur_item is not None:
|
||||
discounted_price = promo_obj.promo_func(cur_item)
|
||||
if cur_item.price > discounted_price:
|
||||
cur_item.final_price = discounted_price
|
||||
|
||||
return items_dict
|
||||
|
||||
|
||||
def log_promos_by_name(
|
||||
store_id: int,
|
||||
chain: SupermarketChain,
|
||||
promo_name: str,
|
||||
load_prices: bool,
|
||||
load_promos: bool,
|
||||
):
|
||||
"""
|
||||
This function prints all promotions in a given chain and store_id containing a given promo_name.
|
||||
|
||||
@@ -277,23 +446,49 @@ def get_promos_by_name(store_id: int, chain: SupermarketChain, promo_name: str,
|
||||
:param load_prices: A boolean representing whether to load an saved prices XML file or scrape a new one
|
||||
:param load_promos: A boolean representing whether to load an saved XML file or scrape a new one
|
||||
"""
|
||||
promotions: List[Promotion] = get_available_promos(chain, store_id, load_prices, load_promos)
|
||||
promotions: List[Promotion] = get_available_promos(
|
||||
chain, store_id, load_prices, load_promos
|
||||
)
|
||||
for promo in promotions:
|
||||
if promo_name in promo.content:
|
||||
print(promo.repr_ltr())
|
||||
logging.info(promo.repr_ltr())
|
||||
|
||||
|
||||
# TODO: change to returning list of Items
|
||||
def get_all_null_items_in_promos(chain, store_id) -> List[str]:
|
||||
"""
|
||||
This function finds all items appearing in the chain's promotions file but not in the chain's prices file.
|
||||
Outdated.
|
||||
"""
|
||||
items_dict: Dict[str, Item] = create_items_dict(chain, True, store_id)
|
||||
xml_path: str = xml_file_gen(chain, store_id, chain.XMLFilesCategory.PromosFull.name)
|
||||
bs_promos = create_bs_object(xml_path, chain, store_id, True, chain.XMLFilesCategory.PromosFull)
|
||||
items_dict: Dict[str, Item] = create_items_dict(chain, store_id, load_xml=True)
|
||||
promo_tags = get_all_promos_tags(chain, store_id, load_xml=True)
|
||||
return [
|
||||
item
|
||||
for promo_tag in promo_tags
|
||||
for item in chain.get_null_items(promo_tag, items_dict)
|
||||
]
|
||||
|
||||
null_items = list()
|
||||
for promo in bs_promos.find_all(chain.promotion_tag_name):
|
||||
null_items.extend(chain.get_null_items(promo, items_dict))
|
||||
|
||||
return null_items
|
||||
def get_all_promos_tags(
|
||||
chain: SupermarketChain, store_id: int, load_xml: bool
|
||||
) -> List[Tag]:
|
||||
"""
|
||||
This function gets all the promotions tags for a given store in a given chain.
|
||||
It includes both the full and not full promotions files.
|
||||
|
||||
:param chain: A given supermarket chain
|
||||
:param store_id: A given store ID
|
||||
:param load_xml: A boolean representing whether to try loading the promotions from an existing XML file
|
||||
:return: A list of promotions tags
|
||||
"""
|
||||
bs_objects = list()
|
||||
for category in tqdm(XML_FILES_PROMOTIONS_CATEGORIES, desc="promotions_files"):
|
||||
xml_path = xml_file_gen(chain, store_id, category.name)
|
||||
bs_objects.append(
|
||||
create_bs_object(chain, store_id, category, load_xml, xml_path)
|
||||
)
|
||||
|
||||
return [
|
||||
promo
|
||||
for bs_obj in bs_objects
|
||||
for promo in bs_obj.find_all(chain.promotion_tag_name)
|
||||
]
|
||||
|
@@ -6,5 +6,12 @@ lxml==4.6.1
|
||||
requests==2.25.0
|
||||
soupsieve==2.0.1
|
||||
urllib3==1.26.2
|
||||
pandas>=1.1
|
||||
openpyxl>=3.0.1
|
||||
openpyxl
|
||||
tqdm~=4.62.1
|
||||
pytest~=6.2.2
|
||||
pandas~=1.2.0
|
||||
argparse~=1.4.0
|
||||
XlsxWriter~=1.4.3
|
||||
aenum
|
||||
selenium
|
||||
webdriver-manager
|
||||
|
@@ -1,28 +1,22 @@
|
||||
from utils import xml_file_gen, create_bs_object
|
||||
from supermarket_chain import SupermarketChain
|
||||
import logging
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from utils import xml_file_gen, create_bs_object
|
||||
from supermarket_chain import SupermarketChain
|
||||
|
||||
def get_store_id(city: str, load_xml: bool, chain: SupermarketChain):
|
||||
|
||||
def log_stores_ids(city: str, load_xml: bool, chain: SupermarketChain):
|
||||
"""
|
||||
This function prints the store_ids of stores in a given city.
|
||||
The city must match exactly to its spelling in Shufersal's website (hence it should be in Hebrew alphabet).
|
||||
This function prints the stores IDs of stores in a given city.
|
||||
The city must match its spelling in Shufersal's website (hence it should be in Hebrew).
|
||||
|
||||
:param chain: A given supermarket chain
|
||||
:param load_xml: A boolean representing whether to load an existing xml or load an already saved one
|
||||
:param city: A string representing the city of the requested store.
|
||||
"""
|
||||
xml_path: str = xml_file_gen(chain, -1, chain.XMLFilesCategory.Stores.name)
|
||||
bs_stores: BeautifulSoup = create_bs_object(xml_path, chain, -1, load_xml, chain.XMLFilesCategory.Stores)
|
||||
bs_stores: BeautifulSoup = create_bs_object(chain, -1, chain.XMLFilesCategory.Stores, load_xml, xml_path)
|
||||
|
||||
for store in bs_stores.find_all("STORE"):
|
||||
if store.find("CITY").text == city:
|
||||
print((store.find("ADDRESS").text, store.find("STOREID").text, store.find("SUBCHAINNAME").text))
|
||||
|
||||
|
||||
def get_all_deals(chain):
|
||||
xml_path: str = xml_file_gen(chain, -1, chain.XMLFilesCategory.Stores.name)
|
||||
bs_stores: BeautifulSoup = create_bs_object(xml_path, chain, -1, True, chain.XMLFilesCategory.Stores)
|
||||
|
||||
return [int(store.find("STOREID").text) for store in bs_stores.find_all("STORE") if store.find("SUBCHAINID").text
|
||||
== "2"]
|
||||
logging.info((store.find("ADDRESS").text, store.find("STOREID").text, store.find("SUBCHAINNAME").text))
|
||||
|
@@ -1,10 +1,9 @@
|
||||
import re
|
||||
from abc import abstractmethod
|
||||
from enum import Enum
|
||||
from argparse import ArgumentTypeError
|
||||
from typing import Dict, List
|
||||
|
||||
import requests
|
||||
from aenum import Enum
|
||||
from bs4.element import Tag
|
||||
|
||||
from item import Item
|
||||
@@ -24,14 +23,15 @@ class SupermarketChain(object, metaclass=Meta):
|
||||
"""
|
||||
An enum class of different XML files produced by a supermarket chain
|
||||
"""
|
||||
|
||||
All, Prices, PricesFull, Promos, PromosFull, Stores = range(6)
|
||||
|
||||
_promotion_tag_name = 'Promotion'
|
||||
_promotion_update_tag_name = 'PromotionUpdateDate'
|
||||
_date_format = '%Y-%m-%d'
|
||||
_date_hour_format = '%Y-%m-%d %H:%M'
|
||||
_update_date_format = '%Y-%m-%d %H:%M'
|
||||
_item_tag_name = 'Item'
|
||||
_promotion_tag_name = "Promotion"
|
||||
_promotion_update_tag_name = "PromotionUpdateDate"
|
||||
_date_format = "%Y-%m-%d"
|
||||
_date_hour_format = "%Y-%m-%d %H:%M"
|
||||
_update_date_format = "%Y-%m-%d %H:%M"
|
||||
_item_tag_name = "Item"
|
||||
|
||||
@property
|
||||
def promotion_tag_name(self):
|
||||
@@ -75,19 +75,24 @@ class SupermarketChain(object, metaclass=Meta):
|
||||
:return: The given store_id if valid, else raise an ArgumentTypeError.
|
||||
"""
|
||||
if not SupermarketChain.is_valid_store_id(int(store_id)):
|
||||
raise ArgumentTypeError(f"Given store_id: {store_id} is not a valid store_id.")
|
||||
raise ArgumentTypeError(
|
||||
f"Given store_id: {store_id} is not a valid store_id."
|
||||
)
|
||||
return store_id
|
||||
|
||||
@staticmethod
|
||||
@abstractmethod
|
||||
def get_download_url(store_id: int, category: XMLFilesCategory, session: requests.Session) -> str:
|
||||
def get_download_url_or_path(
|
||||
store_id: int, category: XMLFilesCategory, session: requests.Session
|
||||
) -> str:
|
||||
"""
|
||||
This method scrapes supermarket's website and returns a url containing the data for a given store and category.
|
||||
This method scrapes the supermarket's website and according to the given store id and category,
|
||||
it returns a url containing the data or or a path to a gz file containing the data.
|
||||
|
||||
:param session:
|
||||
:param store_id: A given id of a store
|
||||
:param store_id: A given ID of a store
|
||||
:param category: A given category
|
||||
:return: A downloadable link of the data for a given store and category
|
||||
:param session: A given session object
|
||||
"""
|
||||
pass
|
||||
|
||||
@@ -100,8 +105,8 @@ class SupermarketChain(object, metaclass=Meta):
|
||||
:param items_dict: A given dictionary of products
|
||||
"""
|
||||
items = list()
|
||||
for item in promo.find_all('Item'):
|
||||
item_code = item.find('ItemCode').text
|
||||
for item in promo.find_all("Item"):
|
||||
item_code = item.find("ItemCode").text
|
||||
full_item_info = items_dict.get(item_code)
|
||||
if full_item_info:
|
||||
items.append(full_item_info)
|
||||
@@ -112,17 +117,8 @@ class SupermarketChain(object, metaclass=Meta):
|
||||
"""
|
||||
This function returns all the items in a given promotion which do not appear in the given items_dict.
|
||||
"""
|
||||
return [item.find('ItemCode').text for item in promo.find_all('Item')
|
||||
if not items_dict.get(item.find('ItemCode').text)]
|
||||
|
||||
@staticmethod
|
||||
def get_item_info(item: Tag) -> Item:
|
||||
"""
|
||||
This function returns a string containing important information about a given supermarket's product.
|
||||
"""
|
||||
return Item(
|
||||
name=item.find(re.compile(r'ItemN[a]?m[e]?')).text,
|
||||
price=float(item.find('ItemPrice').text),
|
||||
manufacturer=item.find(re.compile(r'Manufacture[r]?Name')).text,
|
||||
code=item.find('ItemCode').text
|
||||
)
|
||||
return [
|
||||
item.find("ItemCode").text
|
||||
for item in promo.find_all("Item")
|
||||
if not items_dict.get(item.find("ItemCode").text)
|
||||
]
|
||||
|
@@ -1,3 +1,5 @@
|
||||
import sys,os
|
||||
sys.path.append(os.path.abspath(os.curdir))
|
||||
from item import Item
|
||||
from promotion import RewardType, find_promo_function, get_discount_rate
|
||||
|
||||
@@ -19,7 +21,7 @@ def test_shufersal_promo_type_1():
|
||||
discount_rate=discount_rate,
|
||||
discounted_price=discounted_price,
|
||||
)
|
||||
item = Item('פטה פיראוס 20%', 113, '', '')
|
||||
item = Item('פטה פיראוס 20%', 113, 1, '', '')
|
||||
assert promo_func(item) == 100
|
||||
|
||||
|
||||
@@ -38,7 +40,7 @@ def test_shufersal_promo_type_2():
|
||||
discount_rate=discount_rate,
|
||||
discounted_price=discounted_price,
|
||||
)
|
||||
item = Item('חגיגת גרנולה פ.יבשים500ג', 26.9, '', '')
|
||||
item = Item('חגיגת גרנולה פ.יבשים500ג', 26.9, 1, '', '')
|
||||
assert promo_func(item) == 21.52
|
||||
|
||||
|
||||
@@ -57,7 +59,7 @@ def test_shufersal_promo_type_6_1():
|
||||
discount_rate=discount_rate,
|
||||
discounted_price=discounted_price,
|
||||
)
|
||||
item = Item('פסטרמה מקסיקנית במשקל', 89, '', '')
|
||||
item = Item('פסטרמה מקסיקנית במשקל', 89, 1, '', '')
|
||||
assert promo_func(item) == 89
|
||||
|
||||
|
||||
@@ -76,7 +78,7 @@ def test_shufersal_promo_type_6_2():
|
||||
discount_rate=discount_rate,
|
||||
discounted_price=discounted_price,
|
||||
)
|
||||
item = Item('מכונת לוואצה ג\'ולי אדומה', 449, '', '')
|
||||
item = Item('מכונת לוואצה ג\'ולי אדומה', 449, 1, '', '')
|
||||
assert promo_func(item) == 449
|
||||
|
||||
|
||||
@@ -95,7 +97,7 @@ def test_shufersal_promo_type_7_1():
|
||||
discount_rate=discount_rate,
|
||||
discounted_price=discounted_price,
|
||||
)
|
||||
item = Item('פינצטה 2011 שחורה/כסופה', 14.9, '', '')
|
||||
item = Item('פינצטה 2011 שחורה/כסופה', 14.9, 1, '', '')
|
||||
assert promo_func(item) == 7.45
|
||||
|
||||
|
||||
@@ -114,7 +116,7 @@ def test_shufersal_promo_type_7_2():
|
||||
discount_rate=discount_rate,
|
||||
discounted_price=discounted_price,
|
||||
)
|
||||
item = Item('יוגורט עיזים 500 גרם', 12.9, '', '')
|
||||
item = Item('יוגורט עיזים 500 גרם', 12.9, 1, '', '')
|
||||
assert promo_func(item) == 12.9 * 0.75
|
||||
|
||||
|
||||
@@ -133,7 +135,7 @@ def test_shufersal_promo_type_9_1():
|
||||
discount_rate=discount_rate,
|
||||
discounted_price=discounted_price,
|
||||
)
|
||||
item = Item('זיתים מבוקעים פיקנטי540ג', 9.3, '', '')
|
||||
item = Item('זיתים מבוקעים פיקנטי540ג', 9.3, 1, '', '')
|
||||
assert promo_func(item) == 9.3 * 0.75
|
||||
|
||||
|
||||
@@ -152,7 +154,7 @@ def test_shufersal_promo_type_9_2():
|
||||
discount_rate=discount_rate,
|
||||
discounted_price=discounted_price,
|
||||
)
|
||||
item = Item('שעועית לבנה שופרסל 800גר', 18.9, '', '')
|
||||
item = Item('שעועית לבנה שופרסל 800גר', 18.9, 1, '', '')
|
||||
assert promo_func(item) == (18.9 + 10) / 2
|
||||
|
||||
|
||||
@@ -171,7 +173,7 @@ def test_shufersal_promo_type_9_3():
|
||||
discount_rate=discount_rate,
|
||||
discounted_price=discounted_price,
|
||||
)
|
||||
item = Item('גומיות שחורות 12 יח', 9.9, '', '')
|
||||
item = Item('גומיות שחורות 12 יח', 9.9, 1, '', '')
|
||||
assert promo_func(item) == 9.9 * 0.75
|
||||
|
||||
|
||||
@@ -190,7 +192,7 @@ def test_shufersal_promo_type_10_1():
|
||||
discount_rate=discount_rate,
|
||||
discounted_price=discounted_price
|
||||
)
|
||||
item = Item('טופו טעם טבעי 300 גרם', 10.9, '', '7296073345763')
|
||||
item = Item('טופו טעם טבעי 300 גרם', 10.9, 1, '7296073345763', '')
|
||||
assert promo_func(item) == 5
|
||||
|
||||
|
||||
@@ -209,7 +211,7 @@ def test_shufersal_promo_type_10_2():
|
||||
discount_rate=discount_rate,
|
||||
discounted_price=discounted_price
|
||||
)
|
||||
item = Item('טופו טעם טבעי 300 גרם', 10.9, 'כפרי בריא משק ויילר', '7296073345763')
|
||||
item = Item('טופו טעם טבעי 300 גרם', 10.9, 1, '7296073345763', 'כפרי בריא משק ויילר')
|
||||
assert promo_func(item) == 7
|
||||
|
||||
|
||||
@@ -225,7 +227,7 @@ def assert_discount(discounted_price, item_barcode, item_manufacturer, item_name
|
||||
discount_rate=discount_rate,
|
||||
discounted_price=discounted_price
|
||||
)
|
||||
item = Item(item_name, orig_price, item_manufacturer, item_barcode)
|
||||
item = Item(item_name, orig_price, 1, item_barcode, item_manufacturer)
|
||||
assert abs(promo_func(item) - price_after_discount) <= 1e-5, promo_description
|
||||
|
||||
|
||||
|
@@ -0,0 +1,125 @@
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import tempfile
|
||||
|
||||
import pandas as pd
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
from chains.bareket import Bareket
|
||||
from chains.co_op import CoOp
|
||||
from chains.dor_alon import DorAlon
|
||||
from chains.keshet import Keshet
|
||||
from chains.shuk_hayir import ShukHayir
|
||||
from chains.stop_market import StopMarket
|
||||
from chains.tiv_taam import TivTaam
|
||||
from chains.yeinot_bitan import YeinotBitan
|
||||
from chains.zol_vebegadol import ZolVebegadol
|
||||
from main import CHAINS_DICT
|
||||
from promotion import PROMOTION_COLS_NUM, main_latest_promos
|
||||
from supermarket_chain import SupermarketChain
|
||||
|
||||
# HTTP session passed to the download-url helper by the tests in this module.
session = requests.Session()

# A scraped promotions sheet must contain more rows than this to be accepted.
MIN_NUM_OF_PROMOS = 3

if __name__ == "__main__":
    # Start pytest only when this file is executed directly. Without the guard,
    # pytest importing this module during collection would launch a nested
    # pytest run on the same file.
    pytest.main(args=["-s", os.path.abspath(__file__)])
|
||||
|
||||
|
||||
@pytest.mark.parametrize("chain_tuple", CHAINS_DICT.items())
def test_searching_for_download_urls(chain_tuple):
    """
    Test that get_download_url_or_path of a chain returns a download url (or path) matching each XML files category.

    :param chain_tuple: A (chain name, chain) pair taken from CHAINS_DICT
    """
    chain_name, chain = chain_tuple

    logging.info(f"Checking download urls in chain {chain_name}")
    store_id: int = valid_store_id_by_chain(chain_name)

    # Each category is paired with the pattern its download url is expected to contain.
    categories = chain.XMLFilesCategory
    expected_patterns = (
        (categories.PromosFull, r"promo[s]?full"),
        (categories.Promos, r"promo[s]?"),
        (categories.PricesFull, r"price[s]?full"),
        (categories.Prices, r"price[s]?"),
    )
    for category, regex_pat in expected_patterns:
        _test_download_url_helper(chain, store_id, category, regex_pat, session)
|
||||
|
||||
|
||||
def _test_download_url_helper(
    chain: SupermarketChain,
    store_id: int,
    category: SupermarketChain.XMLFilesCategory,
    regex_pat: str,
    session: requests.Session,
):
    """
    This function asserts that the download url (or path) a chain returns for a given category matches that category.

    Nothing is checked when the chain returns an empty url/path.

    :param chain: A given supermarket chain
    :param store_id: A given store id
    :param category: The XML files category to look a download url for
    :param regex_pat: A case-insensitive regex the returned url must match
    :param session: The requests session handed to the chain's get_download_url_or_path
    """
    download_url: str = chain.get_download_url_or_path(store_id, category, session)
    if not download_url:  # Not found non-full Promos/Prices file
        return
    logging.debug(download_url)
    assert re.search(
        regex_pat, download_url, re.IGNORECASE
    ), f"Invalid {category.name} url in {repr(type(chain))}"
    # The non-full patterns also match the names of full files, so make sure a full file was not picked instead.
    if category in [chain.XMLFilesCategory.Prices, chain.XMLFilesCategory.Promos]:
        assert not re.search(
            "full", download_url, re.IGNORECASE
        ), f"Downloaded the full {category.name} file mistakenly in {repr(type(chain))}"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("chain_tuple", CHAINS_DICT.items())
def test_promotions_scraping(chain_tuple):
    """
    Test scraping of promotions is completed successfully and a valid xlsx file is generated as an output.

    :param chain_tuple: A (chain name, chain) pair taken from CHAINS_DICT
    """
    chain_name, chain = chain_tuple

    logging.info(f"Test scraping promotions from {chain_name}")

    store_id: int = valid_store_id_by_chain(chain_name)

    # The context manager closes the temporary output file even when scraping
    # or loading fails, instead of leaving it open until garbage collection.
    with tempfile.NamedTemporaryFile(suffix=".xlsx") as tf:
        try:
            main_latest_promos(
                store_id=store_id,
                output_filename=tf.name,
                chain=chain,
                load_promos=False,
                load_prices=False,
            )
            df = pd.read_excel(tf.name)
        except Exception as e:
            logging.error(e)
            logging.error(f"Failed loading excel of {chain_name}")
            raise

    assert (
        df.shape[0] > MIN_NUM_OF_PROMOS and df.shape[1] == PROMOTION_COLS_NUM
    ), f"Failed scraping {chain_name}"
|
||||
|
||||
|
||||
def valid_store_id_by_chain(chain_name) -> int:
    """
    This function looks up a store ID that is valid for a given chain.

    :param chain_name: The name of a chain as returned by repr(ChainClassName).
    :return: An integer representing a valid store ID in the given chain (1 for chains not listed below)
    """
    # Checked in order; the first group containing the chain determines the store ID.
    store_ids_by_chains = (
        ((DorAlon,), 501),
        ((TivTaam, Bareket), 2),
        ((CoOp,), 202),
        ((ShukHayir, ZolVebegadol), 4),
        ((StopMarket, Keshet), 5),
        ((YeinotBitan,), 3700),
    )
    for chain_classes, store_id in store_ids_by_chains:
        if any(chain_name == repr(chain_class) for chain_class in chain_classes):
            return store_id
    return 1
|
||||
|
148
utils.py
148
utils.py
@@ -1,16 +1,24 @@
|
||||
import gzip
|
||||
import io
|
||||
import logging
|
||||
import os.path
|
||||
import zipfile
|
||||
from argparse import ArgumentTypeError
|
||||
from datetime import date
|
||||
from datetime import datetime
|
||||
from os import path
|
||||
from typing import AnyStr, Dict
|
||||
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
from os import path
|
||||
from tqdm import tqdm
|
||||
|
||||
from item import Item
|
||||
from supermarket_chain import SupermarketChain
|
||||
|
||||
RESULTS_DIRNAME = "results"
|
||||
RAW_FILES_DIRNAME = "raw_files"
|
||||
VALID_PROMOTION_FILE_EXTENSIONS = [".csv", ".xlsx"]
|
||||
|
||||
|
||||
def xml_file_gen(chain: SupermarketChain, store_id: int, category_name: str) -> str:
|
||||
@@ -23,12 +31,22 @@ def xml_file_gen(chain: SupermarketChain, store_id: int, category_name: str) ->
|
||||
:param category_name: A given category name
|
||||
:return: An xml filename
|
||||
"""
|
||||
store_id_str: str = f"-{str(store_id)}" if SupermarketChain.is_valid_store_id(store_id) else ""
|
||||
return path.join(RAW_FILES_DIRNAME, f"{repr(type(chain))}-{category_name}{store_id_str}.xml")
|
||||
store_id_str: str = (
|
||||
f"-{str(store_id)}" if SupermarketChain.is_valid_store_id(store_id) else ""
|
||||
)
|
||||
return path.join(
|
||||
RAW_FILES_DIRNAME,
|
||||
f"{repr(type(chain))}-{category_name}{store_id_str}-{date.today()}.xml",
|
||||
)
|
||||
|
||||
|
||||
def create_bs_object(xml_path: str, chain: SupermarketChain, store_id: int, load_xml: bool,
|
||||
category: SupermarketChain.XMLFilesCategory) -> BeautifulSoup:
|
||||
def create_bs_object(
|
||||
chain: SupermarketChain,
|
||||
store_id: int,
|
||||
category: SupermarketChain.XMLFilesCategory,
|
||||
load_xml: bool,
|
||||
xml_path: str,
|
||||
) -> BeautifulSoup:
|
||||
"""
|
||||
This function creates a BeautifulSoup (BS) object according to the given parameters.
|
||||
In case the given load_xml is True and the XML file exists, the function creates the BS object from the given
|
||||
@@ -43,14 +61,18 @@ def create_bs_object(xml_path: str, chain: SupermarketChain, store_id: int, load
|
||||
:return: A BeautifulSoup object with xml content.
|
||||
"""
|
||||
if load_xml and path.isfile(xml_path):
|
||||
return create_bs_object_from_xml(xml_path)
|
||||
return create_bs_object_from_link(xml_path, chain, category, store_id)
|
||||
return get_bs_object_from_xml(xml_path)
|
||||
return get_bs_object_from_link(chain, store_id, category, xml_path)
|
||||
|
||||
|
||||
def create_bs_object_from_link(xml_path: str, chain: SupermarketChain, category: SupermarketChain.XMLFilesCategory,
|
||||
store_id: int) -> BeautifulSoup:
|
||||
def get_bs_object_from_link(
|
||||
chain: SupermarketChain,
|
||||
store_id: int,
|
||||
category: SupermarketChain.XMLFilesCategory,
|
||||
xml_path: str,
|
||||
) -> BeautifulSoup:
|
||||
"""
|
||||
This function creates a BeautifulSoup (BS) object by generating a download link from Shufersal's API.
|
||||
This function creates a BeautifulSoup (BS) object by generating a download link the given chain's API.
|
||||
|
||||
:param chain: A given supermarket chain
|
||||
:param xml_path: A given path to an XML file to load/save the BS object from/to.
|
||||
@@ -59,45 +81,72 @@ def create_bs_object_from_link(xml_path: str, chain: SupermarketChain, category:
|
||||
:return: A BeautifulSoup object with xml content.
|
||||
"""
|
||||
session = requests.Session()
|
||||
download_url: str = chain.get_download_url(store_id, category, session)
|
||||
response_content = session.get(download_url).content
|
||||
try:
|
||||
xml_content: AnyStr = gzip.decompress(response_content)
|
||||
except gzip.BadGzipFile:
|
||||
with zipfile.ZipFile(io.BytesIO(response_content)) as the_zip:
|
||||
zip_info = the_zip.infolist()[0]
|
||||
with the_zip.open(zip_info) as the_file:
|
||||
xml_content = the_file.read()
|
||||
with open(xml_path, 'wb') as f_out:
|
||||
download_url_or_path: str = chain.get_download_url_or_path(store_id, category, session)
|
||||
if not download_url_or_path:
|
||||
return BeautifulSoup()
|
||||
if os.path.isfile(download_url_or_path):
|
||||
with gzip.open(download_url_or_path) as fIn:
|
||||
xml_content = fIn.read()
|
||||
os.remove(download_url_or_path) # Delete gz file
|
||||
else:
|
||||
response_content = session.get(download_url_or_path).content
|
||||
try:
|
||||
xml_content: AnyStr = gzip.decompress(response_content)
|
||||
except gzip.BadGzipFile:
|
||||
with zipfile.ZipFile(io.BytesIO(response_content)) as the_zip:
|
||||
zip_info = the_zip.infolist()[0]
|
||||
with the_zip.open(zip_info) as the_file:
|
||||
xml_content = the_file.read()
|
||||
with open(xml_path, "wb") as f_out:
|
||||
f_out.write(xml_content)
|
||||
return BeautifulSoup(xml_content, features='xml')
|
||||
return BeautifulSoup(xml_content, features="xml")
|
||||
|
||||
|
||||
def create_bs_object_from_xml(xml_path: str) -> BeautifulSoup:
|
||||
def get_bs_object_from_xml(xml_path: str) -> BeautifulSoup:
|
||||
"""
|
||||
This function creates a BeautifulSoup (BS) object from a given XML file.
|
||||
|
||||
:param xml_path: A given path to an xml file to load/save the BS object from/to.
|
||||
:return: A BeautifulSoup object with xml content.
|
||||
"""
|
||||
with open(xml_path, 'rb') as f_in:
|
||||
return BeautifulSoup(f_in, features='xml')
|
||||
with open(xml_path, "rb") as f_in:
|
||||
return BeautifulSoup(f_in, features="xml")
|
||||
|
||||
|
||||
def create_items_dict(chain: SupermarketChain, load_xml, store_id: int) -> Dict[str, Item]:
|
||||
def create_items_dict(
|
||||
chain: SupermarketChain, store_id: int, load_xml
|
||||
) -> Dict[str, Item]:
|
||||
"""
|
||||
This function creates a dictionary where every key is an item code and its value is its corresponding Item instance.
|
||||
We take both full and not full prices files, and assume that the no full is more updated (in case of overwriting).
|
||||
|
||||
:param chain: A given supermarket chain
|
||||
:param load_xml: A boolean representing whether to load an existing prices xml file
|
||||
:param store_id: A given store id
|
||||
"""
|
||||
xml_path: str = xml_file_gen(chain, store_id, chain.XMLFilesCategory.PricesFull.name)
|
||||
bs_prices: BeautifulSoup = create_bs_object(xml_path, chain, store_id, load_xml, chain.XMLFilesCategory.PricesFull)
|
||||
return {item.find('ItemCode').text: chain.get_item_info(item) for item in bs_prices.find_all(chain.item_tag_name)}
|
||||
items_dict = dict()
|
||||
for category in tqdm(
|
||||
[chain.XMLFilesCategory.PricesFull, chain.XMLFilesCategory.Prices],
|
||||
desc="prices_files",
|
||||
):
|
||||
xml_path: str = xml_file_gen(chain, store_id, category.name)
|
||||
bs_prices: BeautifulSoup = create_bs_object(
|
||||
chain, store_id, category, load_xml, xml_path
|
||||
)
|
||||
items_tags = bs_prices.find_all(chain.item_tag_name)
|
||||
items_dict.update(
|
||||
{
|
||||
item_tag.find("ItemCode").text: Item.from_tag(item_tag)
|
||||
for item_tag in items_tags
|
||||
}
|
||||
)
|
||||
|
||||
return items_dict
|
||||
|
||||
|
||||
def get_products_prices(chain: SupermarketChain, store_id: int, load_xml: bool, product_name: str) -> None:
|
||||
def log_products_prices(
|
||||
chain: SupermarketChain, store_id: int, load_xml: bool, product_name: str
|
||||
) -> None:
|
||||
"""
|
||||
This function prints the products in a given store which contains a given product_name.
|
||||
|
||||
@@ -106,20 +155,37 @@ def get_products_prices(chain: SupermarketChain, store_id: int, load_xml: bool,
|
||||
:param product_name: A given product name
|
||||
:param load_xml: A boolean representing whether to load an existing xml or load an already saved one
|
||||
"""
|
||||
xml_path: str = xml_file_gen(chain, store_id, chain.XMLFilesCategory.PricesFull.name)
|
||||
bs_prices: BeautifulSoup = create_bs_object(xml_path, chain, store_id, load_xml, chain.XMLFilesCategory.PricesFull)
|
||||
prods = [item for item in bs_prices.find_all("Item") if product_name in item.find("ItemName").text]
|
||||
prods.sort(key=lambda x: float(x.find("UnitOfMeasurePrice").text))
|
||||
for prod in prods:
|
||||
print(
|
||||
(
|
||||
prod.find('ItemName').text[::-1],
|
||||
prod.find('ManufacturerName').text[::-1],
|
||||
prod.find('ItemPrice').text
|
||||
)
|
||||
)
|
||||
items_dict: Dict[str, Item] = create_items_dict(chain, store_id, load_xml)
|
||||
products_by_name = [
|
||||
item for item in items_dict.values() if product_name in item.name
|
||||
]
|
||||
products_by_name_sorted_by_price = sorted(
|
||||
products_by_name, key=lambda item: item.price_by_measure
|
||||
)
|
||||
|
||||
for prod in products_by_name_sorted_by_price:
|
||||
logging.info(prod)
|
||||
|
||||
|
||||
def get_float_from_tag(tag, int_tag) -> float:
    """
    This function returns the content of a given sub-tag converted to a float.

    :param tag: A tag whose find method is used to look up the sub-tag
    :param int_tag: The name of the sub-tag to look for
    :return: The sub-tag's text as a float, or 0.0 in case the sub-tag is not found
    """
    content = tag.find(int_tag)
    return float(content.text) if content else 0.0
|
||||
|
||||
|
||||
def is_valid_promotion_output_file(output_file: str) -> bool:
    """
    This function checks whether a given output file name ends with one of the valid promotion file extensions.

    :param output_file: A given output file name
    :return: True if the name ends with an extension listed in VALID_PROMOTION_FILE_EXTENSIONS, False otherwise
    """
    allowed_extensions = tuple(VALID_PROMOTION_FILE_EXTENSIONS)
    return output_file.endswith(allowed_extensions)
|
||||
|
||||
|
||||
def valid_promotion_output_file(output_file: str) -> str:
    """
    This function validates the extension of a given promotion output file name.

    :param output_file: A given output file name
    :return: The given output file name, unchanged, in case its extension is valid
    :raises ArgumentTypeError: In case is_valid_promotion_output_file rejects the given name
    """
    if not is_valid_promotion_output_file(output_file):
        raise ArgumentTypeError(
            f"Given output file has an invalid extension: {output_file}"
        )
    return output_file
|
||||
|
||||
|
||||
def log_message_and_time_if_debug(msg: str) -> None:
    """
    This function logs a given message at INFO level, followed by the current time at DEBUG level.

    :param msg: A given message to log
    """
    logging.log(logging.INFO, msg)
    logging.log(logging.DEBUG, datetime.now())
|
||||
|
Reference in New Issue
Block a user