Compare commits

37 Commits
0.1 ... master

Author SHA1 Message Date
6f527b12d5 prefs 2022-10-05 12:42:53 +03:00
b8ccd6dcf7 fix of download directory 2022-10-05 12:38:52 +03:00
Koren Lazar
6755ff5caf Merge pull request #3 from 1kamma/master
mistake in the requierments fixed
2022-10-05 07:51:58 +03:00
42fac846aa Merge branch 'master' of https://github.com/1kamma/supermarket-scraping 2022-10-05 03:52:43 +03:00
d047ffdcc2 added options for headless computers, changed the downloa path to raw_files 2022-10-05 03:37:43 +03:00
korenlazar
9b6f63a7f0 Added the chain Yeinot Bitan (also to tests).
Changed price with promos to include only regular promotions.
Added filtering of promotions including too many items.
2022-10-04 13:36:29 +03:00
korenlazar
86ff2ca7b7 Fixed small bug in valid_store_id_by_chain function 2022-10-04 12:11:44 +03:00
korenlazar
b1737839ce Fixed bug with Shufersal Scraping by changing xml files category back to normal Enum. 2022-10-04 12:09:42 +03:00
korenlazar
7b63eab7bd leftover from last commit 2022-10-04 11:42:57 +03:00
korenlazar
ceff48dbd9 Fixed the bug with cerberus_web_client.py by working with Selenium. To login each chain working with it must have a username for login with Selenium. in this mechanism, a path to a gz file is returned instead of url
Added the option to output a prices json file in main.py under --prices-with-promos, where the prices are updated by the latest promotions (under the 'final_price' key, where 'price' represents the price before promotions).

Fixed small bug of BinaWebCleint by checking that filename does not contain 'null'.

Changed Hierarchy of chains such that it includes the webclients.

Added the date to the output filenames to start storing the data over time.

Black formatting (according to pip 8 guidelines).

Changed the chains_dict in main to a constant one.
2022-10-04 11:42:36 +03:00
korenLazar
b5db721a3d Merge pull request #6 from korenLazar/test-scraping
Test scraping
2021-08-18 12:26:23 +03:00
KorenLazar
90cab0a2e1 Minor changes 2021-08-18 11:32:04 +03:00
KorenLazar
87b6fbe2b0 Changed ClubID enum class to include a string field used for printing, and define ClubID.OTHER as a default value for the class to handle invalid inputs. 2021-08-18 11:30:31 +03:00
KorenLazar
322995ba15 Added TODO for ordering the argparse 2021-08-18 11:16:25 +03:00
KorenLazar
294dee8cc2 Added test for searching different files' urls. Specifically, asserting the searching non-full files does not yield urls of full files. 2021-08-17 13:08:39 +03:00
KorenLazar
cffdd84086 Added specific searching for the download url of non-full promotions and prices files. Changed return value of get_download_url accordingly. 2021-08-17 13:06:42 +03:00
KorenLazar
3770352d04 Added new requirements to requirements.txt 2021-08-17 09:35:20 +03:00
KorenLazar
63fec1490c Added new requirements to requirements.txt 2021-08-17 09:18:45 +03:00
KorenLazar
c1281cb312 Added a test for scraping the promotions and exporting them to xlsx files. 2021-08-16 23:09:10 +03:00
KorenLazar
1a88ed6e01 minor changes 2021-08-16 23:08:04 +03:00
KorenLazar
9b0ab013c9 Added requirements to requirements.txt 2021-08-16 23:07:32 +03:00
KorenLazar
1a6707341d Logical fixes in promotions scraping and calculation. 2021-08-16 23:07:07 +03:00
KorenLazar
844a106c57 Added tqdm 2021-08-16 23:05:16 +03:00
KorenLazar
c793057623 Documentation and minor changes 2021-08-16 14:06:54 +03:00
KorenLazar
13991aaa40 Documentation and minor changes 2021-08-16 14:05:22 +03:00
KorenLazar
b3d410306d Removed filtering by PRODUCTS_TO_IGNORE 2021-08-16 14:04:46 +03:00
korenLazar
62089dd538 Merge pull request #5 from korenLazar/export-promotions-to-xlsx-table
Export promotions to xlsx table
2021-08-16 12:51:48 +03:00
KorenLazar
03ff6d5281 Changed create_items_dict function to included non-full prices file in the items dictionary.
Changed log_products_prices to work with an items dictionary and a __repr__ function of the Item class.
2021-08-16 12:44:32 +03:00
KorenLazar
e09b2da4a1 removed get_all_deals function 2021-08-16 12:43:01 +03:00
KorenLazar
58bb04f1dd Added get_all_promos_tags function and included the non-full promotions file in the promotions collection. 2021-08-16 12:42:38 +03:00
KorenLazar
ebb1e912b9 Change INFO logging format 2021-08-16 12:40:06 +03:00
KorenLazar
98dcc1c33d Add price_by_measure member to Item object 2021-08-16 12:39:28 +03:00
korenLazar
8a726ff605 Merge pull request #4 from korenLazar/export-promotions-to-xlsx-table
finished implementing exporting promotion to xlsx table and automatic…
2021-06-17 10:36:20 +03:00
KorenLazar
27b45a4999 finished implementing exporting promotion to xlsx table and automatically opening the xlsx file 2021-06-01 21:00:40 +03:00
KorenLazar
ec505dba67 minor rephrasing in documentation 2021-05-18 14:34:11 +03:00
3ae8d02836 correction, by comments and suggestions of Koren 2021-04-29 17:55:21 +03:00
korenLazar
e740b122ff Merge pull request #1 from 1kamma/master
this will be better for the windows and unix-bases
2021-04-17 18:34:11 +03:00
32 changed files with 1117 additions and 409 deletions

3
.gitignore vendored
View File

@@ -6,3 +6,6 @@ raw_files/
results/
all_deals.py
unknown_items.csv
helper_*
.vscode/
desktop.ini

View File

@@ -1,6 +1,5 @@
from chains.mahsaneiHashook import MahsaneiHashook
from supermarket_chain import SupermarketChain
class Bareket(MahsaneiHashook, SupermarketChain):
class Bareket(MahsaneiHashook):
pass

View File

@@ -1,25 +1,38 @@
import json
import re
import requests
from supermarket_chain import SupermarketChain
FNAME_KEY = "FileNm"
class BinaProjectWebClient:
class BinaProjectWebClient(SupermarketChain):
_date_hour_format = '%Y-%m-%d %H:%M:%S'
_update_date_format = '%Y-%m-%d %H:%M:%S'
_path_prefix = ""
_hostname_suffix = ".binaprojects.com"
def get_download_url(self, store_id: int, category: SupermarketChain.XMLFilesCategory, session: requests.Session) \
def get_download_url_or_path(self, store_id: int, category: SupermarketChain.XMLFilesCategory, session: requests.Session) \
-> str:
if not SupermarketChain.is_valid_store_id(store_id):
raise ValueError(f"Invalid {store_id=} (store id must be a natural number)")
hostname = f"http://{self.hostname_prefix}{self.hostname_suffix}"
url = '/'.join([hostname, self.path_prefix, "MainIO_Hok.aspx"])
req_res: requests.Response = session.get(url)
jsons_files = json.loads(req_res.text)
suffix = next(cur_json["FileNm"] for cur_json in jsons_files if f'-{store_id:03d}-20' in cur_json["FileNm"]
and category.name.replace('s', '') in cur_json["FileNm"])
if category in [SupermarketChain.XMLFilesCategory.Promos, SupermarketChain.XMLFilesCategory.Prices]:
filter_func = lambda fname: f'-{store_id:03d}-20' in fname and category.name.replace('s', '') in fname \
and not re.search('full', fname, re.IGNORECASE)
if not any(filter_func(cur_json[FNAME_KEY]) for cur_json in jsons_files):
return "" # Could not find non-full Promos/Prices file
else:
filter_func = lambda fname: f'-{store_id:03d}-20' in fname and category.name.replace('s', '') in fname and 'null' not in fname
suffix = next(
cur_json[FNAME_KEY] for cur_json in jsons_files if filter_func(cur_json[FNAME_KEY]))
down_url: str = '/'.join([hostname, self.path_prefix, "Download", suffix])
print(down_url)
return down_url
@property

View File

@@ -1,28 +1,113 @@
import json
import logging
import os
import shutil
import platform
import sys
import time
from abc import abstractmethod
import requests
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
from supermarket_chain import SupermarketChain
class CerberusWebClient:
def get_download_url(self, store_id: int, category: SupermarketChain.XMLFilesCategory, session: requests.Session) \
-> str:
hostname: str = "https://publishedprices.co.il"
# Post the payload to the site to log in
session.post(hostname + "/login/user", data={'username': self.username})
# Scrape the data
ajax_dir_payload: dict = {'iDisplayLength': 100000, 'sSearch': category.name.replace('s', '')}
s: requests.Response = session.post(hostname + "/file/ajax_dir", data=ajax_dir_payload)
s_json: dict = json.loads(s.text)
suffix: str = next(d['name'] for d in s_json['aaData'] if f'-{store_id:03d}-20' in d['name'])
download_url: str = hostname + "/file/d/" + suffix
print(download_url)
return download_url
class CerberusWebClient(SupermarketChain):
@property
@abstractmethod
def username(self):
return repr(type(self))
pass
download_dir = f"{os.path.abspath(os.path.curdir)}/raw_files"
def is_system_headless(self) -> bool:
return sys.platform == "linux" and not os.environ.get("DISPLAY")
def set_browser_options(self) -> webdriver.ChromeOptions:
options = webdriver.ChromeOptions()
options.add_experimental_option("prefs",{"download.default_directory": self.download_dir})
options.add_argument("ignore-certificate-errors")
options.add_argument("--ignore-ssl-errors=yes")
options.headless = self.is_system_headless()
return options
def set_browser(self,options: webdriver.ChromeOptions) -> webdriver.Chrome:
if self.is_system_headless() and platform.machine() == 'aarch64':
return webdriver.Chrome(service=Service('/usr/bin/chromedriver'), options=options)
return webdriver.Chrome(
service=Service(ChromeDriverManager().install()), options=options
)
def get_download_url_or_path(
self,
store_id: int,
category: SupermarketChain.XMLFilesCategory,
session: requests.Session,
) -> str:
options=self.set_browser_options()
driver = self.set_browser(options)
driver.get("https://url.retail.publishedprices.co.il/login#")
time.sleep(2)
userElem = driver.find_element(By.NAME, "username")
userElem.send_keys(self.username)
driver.find_element(By.NAME, "Submit").click()
time.sleep(2)
searchElem = driver.find_element(By.CLASS_NAME, "form-control")
searchElem.send_keys(category.name.lower().replace('s', ''))
time.sleep(5)
conns = driver.find_elements(By.CLASS_NAME, "f")
best_link = ""
for conn in conns:
link = conn.get_attribute("href").lower()
if category == SupermarketChain.XMLFilesCategory.Promos:
filter_func = (
lambda l: "promo" in l
and "full" not in l
and f"-{store_id:03d}-20" in l
)
elif category == SupermarketChain.XMLFilesCategory.PromosFull:
filter_func = (
lambda l: "promo" in l
and "full" in l
and f"-{store_id:03d}-20" in l
)
elif category == SupermarketChain.XMLFilesCategory.Prices:
filter_func = (
lambda l: "price" in l
and "full" not in l
and f"-{store_id:03d}-20" in l
)
elif category == SupermarketChain.XMLFilesCategory.PricesFull:
filter_func = (
lambda l: "price" in l
and "full" in l
and f"-{store_id:03d}-20" in l
)
elif category == SupermarketChain.XMLFilesCategory.Stores:
filter_func = lambda l: "store" in l and "full" in l and f"-000-20" in l
else:
raise ValueError(f"Unknown category type: {category=}")
if filter_func(link):
if not best_link or int(link[-7:-3]) > int(best_link[-7:-3]):
best_link = link
if not best_link:
return ""
driver.get(best_link)
time.sleep(3)
filename = best_link.split("/")[-1] # don't be an idiot. it is stupid to count letters
# split and grab, or rename it by yourself.
path_download = os.path.join(self.download_dir, filename)
logging.info(f"{path_download=}")
path_to_save = f"raw_files/{self.username}-{filename}"
try:
shutil.move(path_download, path_to_save)
print(f"Downloaded {filename} and moved file to {path_to_save}")
except:
print(f"{filename} already exists in {path_to_save}")
return path_to_save

View File

@@ -1,6 +1,5 @@
from chains.mahsaneiHashook import MahsaneiHashook
from supermarket_chain import SupermarketChain
class CoOp(MahsaneiHashook, SupermarketChain):
class CoOp(MahsaneiHashook):
pass

View File

@@ -1,6 +1,9 @@
from chains.cerberus_web_client import CerberusWebClient
from supermarket_chain import SupermarketChain
class DorAlon(CerberusWebClient, SupermarketChain):
_date_hour_format = '%Y-%m-%d %H:%M:%S'
class DorAlon(CerberusWebClient):
@property
def username(self):
return "doralon"
_date_hour_format = "%Y-%m-%d %H:%M:%S"

View File

@@ -1,6 +1,9 @@
from chains.cerberus_web_client import CerberusWebClient
from supermarket_chain import SupermarketChain
class Freshmarket(CerberusWebClient, SupermarketChain):
_date_hour_format = '%Y-%m-%d %H:%M:%S'
class Freshmarket(CerberusWebClient):
_date_hour_format = "%Y-%m-%d %H:%M:%S"
@property
def username(self):
return "freshmarket"

View File

@@ -1,6 +1,9 @@
from chains.cerberus_web_client import CerberusWebClient
from supermarket_chain import SupermarketChain
class HaziHinam(CerberusWebClient, SupermarketChain):
_date_hour_format = '%Y-%m-%d %H:%M:%S'
class HaziHinam(CerberusWebClient):
@property
def username(self):
return "HaziHinam"
_date_hour_format = "%Y-%m-%d %H:%M:%S"

View File

@@ -1,6 +1,9 @@
from chains.cerberus_web_client import CerberusWebClient
from supermarket_chain import SupermarketChain
class Keshet(CerberusWebClient, SupermarketChain):
_date_hour_format = '%Y-%m-%d %H:%M:%S'
class Keshet(CerberusWebClient):
@property
def username(self):
return "Keshet"
_date_hour_format = "%Y-%m-%d %H:%M:%S"

View File

@@ -1,7 +1,6 @@
from chains.binaproject_web_client import BinaProjectWebClient
from supermarket_chain import SupermarketChain
class KingStore(BinaProjectWebClient, SupermarketChain):
class KingStore(BinaProjectWebClient):
_path_prefix = "Food_Law"
_hostname_suffix = ".co.il"

View File

@@ -1,6 +1,5 @@
from chains.binaproject_web_client import BinaProjectWebClient
from supermarket_chain import SupermarketChain
class Maayan2000(BinaProjectWebClient, SupermarketChain):
pass
class Maayan2000(BinaProjectWebClient):
pass

View File

@@ -1,4 +1,6 @@
import re
from typing import Dict, List
import requests
from bs4 import BeautifulSoup
from bs4.element import Tag
@@ -8,26 +10,46 @@ from supermarket_chain import SupermarketChain
class MahsaneiHashook(SupermarketChain):
_promotion_tag_name = 'Sale'
_promotion_update_tag_name = 'PriceUpdateDate'
_date_format = '%Y/%m/%d'
_date_hour_format = '%Y/%m/%d %H:%M:%S'
_update_date_format = '%Y/%m/%d %H:%M:%S'
_item_tag_name = 'Product'
_promotion_tag_name = "Sale"
_promotion_update_tag_name = "PriceUpdateDate"
_date_format = "%Y/%m/%d"
_date_hour_format = "%Y/%m/%d %H:%M:%S"
_update_date_format = "%Y/%m/%d %H:%M:%S"
_item_tag_name = "Product"
@staticmethod
def get_download_url(store_id: int, category: SupermarketChain.XMLFilesCategory, session: requests.Session) -> str:
def get_download_url_or_path(
store_id: int,
category: SupermarketChain.XMLFilesCategory,
session: requests.Session,
) -> str:
prefix = "http://matrixcatalog.co.il/"
url = prefix + "NBCompetitionRegulations.aspx"
req_res: requests.Response = requests.get(url)
soup = BeautifulSoup(req_res.text, features='lxml')
suffix: str = soup.find('a', href=lambda value: value and category.name.replace('s', '') in value
and f'-{store_id:03d}-20' in value).attrs['href']
soup = BeautifulSoup(req_res.text, features="lxml")
if category in [
SupermarketChain.XMLFilesCategory.Promos,
SupermarketChain.XMLFilesCategory.Prices,
]:
fname_filter_func = (
lambda fname: fname
and category.name.replace("s", "") in fname
and f"-{store_id:03d}-20" in fname
and not re.search("full", fname, re.IGNORECASE)
)
if soup.find("a", href=fname_filter_func) is None:
return "" # Could not find non-full Promos/Prices file
else:
fname_filter_func = (
lambda fname: fname
and category.name.replace("s", "") in fname
and f"-{store_id:03d}-20" in fname
)
suffix: str = soup.find("a", href=fname_filter_func).attrs["href"]
down_url: str = prefix + suffix
print(down_url)
return down_url
@staticmethod
def get_items(promo: Tag, items_dict: Dict[str, Item]) -> List[Item]:
promo_item = items_dict.get(promo.find('ItemCode').text)
promo_item = items_dict.get(promo.find("ItemCode").text)
return [promo_item] if promo_item else []

View File

@@ -1,6 +1,9 @@
from chains.cerberus_web_client import CerberusWebClient
from supermarket_chain import SupermarketChain
class OsherAd(CerberusWebClient, SupermarketChain):
_date_hour_format = '%Y-%m-%d %H:%M:%S'
class OsherAd(CerberusWebClient):
@property
def username(self):
return "osherad"
_date_hour_format = "%Y-%m-%d %H:%M:%S"

View File

@@ -1,6 +1,9 @@
from chains.cerberus_web_client import CerberusWebClient
from supermarket_chain import SupermarketChain
class RamiLevi(CerberusWebClient, SupermarketChain):
_date_hour_format = '%Y-%m-%d %H:%M:%S'
class RamiLevi(CerberusWebClient):
@property
def username(self):
return "RamiLevi"
_date_hour_format = "%Y-%m-%d %H:%M:%S"

View File

@@ -1,6 +1,5 @@
from chains.binaproject_web_client import BinaProjectWebClient
from supermarket_chain import SupermarketChain
class ShefaBirkatHashem(BinaProjectWebClient, SupermarketChain):
pass
class ShefaBirkatHashem(BinaProjectWebClient):
pass

View File

@@ -7,12 +7,11 @@ from supermarket_chain import SupermarketChain
class Shufersal(SupermarketChain):
@staticmethod
def get_download_url(store_id: int, category: SupermarketChain.XMLFilesCategory, session: requests.Session) -> str:
def get_download_url_or_path(store_id: int, category: SupermarketChain.XMLFilesCategory, session: requests.Session) -> str:
url = f"http://prices.shufersal.co.il/FileObject/UpdateCategory?catID={category.value}"
if SupermarketChain.is_valid_store_id(int(store_id)):
url += f"&storeId={store_id}"
req_res: requests.Response = requests.get(url)
soup: BeautifulSoup = BeautifulSoup(req_res.text, features='lxml')
down_url: str = soup.find('a', text="לחץ להורדה")['href']
print(down_url)
return down_url

View File

@@ -1,7 +1,7 @@
from chains.binaproject_web_client import BinaProjectWebClient
from supermarket_chain import SupermarketChain
class ShukHayir(BinaProjectWebClient, SupermarketChain):
class ShukHayir(BinaProjectWebClient):
@property
def hostname_prefix(self): return "shuk-hayir"
def hostname_prefix(self):
return "shuk-hayir"

View File

@@ -1,9 +1,9 @@
from chains.cerberus_web_client import CerberusWebClient
from supermarket_chain import SupermarketChain
class StopMarket(CerberusWebClient, SupermarketChain):
_date_hour_format = '%Y-%m-%d %H:%M:%S'
class StopMarket(CerberusWebClient):
_date_hour_format = "%Y-%m-%d %H:%M:%S"
@property
def username(self):
return 'Stop_Market'
return "Stop_Market"

View File

@@ -1,6 +1,7 @@
from chains.cerberus_web_client import CerberusWebClient
from supermarket_chain import SupermarketChain
class TivTaam(CerberusWebClient, SupermarketChain):
pass
class TivTaam(CerberusWebClient):
@property
def username(self):
return "TivTaam"

View File

@@ -1,6 +1,5 @@
from chains.mahsaneiHashook import MahsaneiHashook
from supermarket_chain import SupermarketChain
class Victory(MahsaneiHashook, SupermarketChain):
class Victory(MahsaneiHashook):
pass

35
chains/yeinot_bitan.py Normal file
View File

@@ -0,0 +1,35 @@
import re
from datetime import datetime
import numpy as np
import requests
from bs4 import BeautifulSoup
from supermarket_chain import SupermarketChain
class YeinotBitan(SupermarketChain):
_date_hour_format = "%Y-%m-%d %H:%M:%S"
@staticmethod
def get_download_url_or_path(
store_id: int,
category: SupermarketChain.XMLFilesCategory,
session: requests.Session,
) -> str:
today_date_suffix = datetime.today().date().strftime("%Y%m%d")
url = f"http://publishprice.ybitan.co.il/{today_date_suffix}/"
req_res = requests.get(url)
soup = BeautifulSoup(req_res.text, features="lxml")
promo_tags = soup.findAll(
"a",
attrs={
"href": re.compile(
rf"^{category.name.replace('s', '')}.*-{store_id:04d}-"
)
},
)
most_recent_tag_ind = np.argmax(
[int(promo_tag["href"][-7:-3]) for promo_tag in promo_tags]
)
return url + promo_tags[most_recent_tag_ind]["href"]

View File

@@ -1,6 +1,9 @@
from chains.cerberus_web_client import CerberusWebClient
from supermarket_chain import SupermarketChain
class Yohananof(CerberusWebClient, SupermarketChain):
_date_hour_format = '%Y-%m-%d %H:%M:%S'
class Yohananof(CerberusWebClient):
@property
def username(self):
return "yohananof"
_date_hour_format = "%Y-%m-%d %H:%M:%S"

View File

@@ -1,6 +1,5 @@
from chains.binaproject_web_client import BinaProjectWebClient
from supermarket_chain import SupermarketChain
class ZolVebegadol(BinaProjectWebClient, SupermarketChain):
class ZolVebegadol(BinaProjectWebClient):
pass

35
item.py
View File

@@ -1,13 +1,44 @@
import json
import re
from bs4.element import Tag
class Item:
"""
A class representing a product in some supermarket.
"""
def __init__(self, name: str, price: float, manufacturer: str, code: str):
def __init__(
self,
name: str,
price: float,
price_by_measure: float,
code: str,
manufacturer: str,
):
self.name: str = name
self.price: float = price
self.final_price: float = price
self.price_by_measure = price_by_measure
self.manufacturer: str = manufacturer
self.code: str = code
@classmethod
def from_tag(cls, item: Tag):
"""
This method creates an Item instance from an xml tag.
"""
return cls(
name=item.find(re.compile(r"ItemN[a]?m[e]?")).text,
price=float(item.find("ItemPrice").text),
price_by_measure=float(item.find("UnitOfMeasurePrice").text),
code=item.find("ItemCode").text,
manufacturer=item.find(re.compile(r"Manufacture[r]?Name")).text,
)
def to_json(self):
return json.dumps(self, default=lambda o: o.__dict__)
def __repr__(self):
return str((self.name, self.price, self.manufacturer, self.code))
return f"\nשם: {self.name}\nמחיר: {self.price}\nיצרן: {self.manufacturer}\nקוד: {self.code}\n"

290
main.py
View File

@@ -1,105 +1,225 @@
import json
import logging
import os
import subprocess
import sys
from argparse import ArgumentParser
from datetime import datetime, date
from pathlib import Path
from promotion import main_latest_promos, get_promos_by_name
from store_utils import get_store_id
from utils import RESULTS_DIRNAME, RAW_FILES_DIRNAME, get_products_prices
from chains.bareket import Bareket
from chains.co_op import CoOp
from chains.dor_alon import DorAlon
from chains.freshmarket import Freshmarket
from chains.hazi_hinam import HaziHinam
from chains.keshet import Keshet
from chains.king_store import KingStore
from chains.maayan2000 import Maayan2000
from chains.mahsaneiHashook import MahsaneiHashook
from chains.osher_ad import OsherAd
from chains.rami_levi import RamiLevi
from chains.shefa_birkat_hashem import ShefaBirkatHashem
from chains.shufersal import Shufersal
from chains.shuk_hayir import ShukHayir
from chains.stop_market import StopMarket
from chains.tiv_taam import TivTaam
from chains.victory import Victory
from chains.yeinot_bitan import YeinotBitan
from chains.yohananof import Yohananof
from chains.zol_vebegadol import ZolVebegadol
from promotion import main_latest_promos, log_promos_by_name, get_all_prices
from store_utils import log_stores_ids
from supermarket_chain import SupermarketChain
from chains import (
bareket,
mahsaneiHashook,
dor_alon,
freshmarket,
hazi_hinam,
keshet,
stop_market,
tiv_taam,
shufersal,
co_op,
victory,
yohananof,
zol_vebegadol,
rami_levi,
osher_ad,
maayan2000,
shuk_hayir,
king_store,
shefa_birkat_hashem,
from utils import (
RESULTS_DIRNAME,
RAW_FILES_DIRNAME,
VALID_PROMOTION_FILE_EXTENSIONS,
log_products_prices,
valid_promotion_output_file,
is_valid_promotion_output_file,
)
# TODO: fix problem of left-to-right printing
CHAINS_LIST = [
Bareket,
MahsaneiHashook,
DorAlon,
Freshmarket,
HaziHinam,
Keshet,
StopMarket,
TivTaam,
Shufersal,
CoOp,
Victory,
Yohananof,
ZolVebegadol,
RamiLevi,
OsherAd,
Maayan2000,
ShukHayir,
KingStore,
ShefaBirkatHashem,
YeinotBitan,
]
Path(RESULTS_DIRNAME).mkdir(exist_ok=True)
Path(RAW_FILES_DIRNAME).mkdir(exist_ok=True)
chain_dict = {repr(chain): chain() if callable(chain) else None for chain in SupermarketChain.__subclasses__()}
CHAINS_DICT = {
repr(chain): chain() if callable(chain) else None for chain in CHAINS_LIST
}
if __name__ == '__main__':
# TODO: change functions arguments to include all necessary parameters (e.g. chain) or split arguments
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument('--promos',
help="generates a CSV file with all the promotions in the requested store",
metavar='store_id',
nargs=1,
type=SupermarketChain.store_id_type,
)
parser.add_argument('--find_promos_by_name',
help="prints all promos containing the given promo_name in the given store",
metavar=('store_id', 'promo_name'),
nargs=2,
# type=store_id_type, # TODO: add type-checking of first parameter
)
parser.add_argument('--price',
help='prints all products that contain the given name in the requested store',
metavar=('store_id', 'product_name'),
nargs=2,
)
parser.add_argument('--find_store_id',
help='prints all Shufersal stores in a given city. Input should be a city name in Hebrew',
metavar='city',
nargs=1,
)
# parser.add_argument('--all_deals',
# action='store_true',
# )
parser.add_argument('--load_prices',
help='boolean flag representing whether to load an existing price XML file',
action='store_true',
)
parser.add_argument('--load_promos',
help='boolean flag representing whether to load an existing promo XML file',
action='store_true',
)
parser.add_argument('--load_stores',
help='boolean flag representing whether to load an existing stores XML file',
action='store_true',
)
parser.add_argument('--chain',
required=True,
help='The name of the requested chain',
choices=chain_dict.keys(),
)
parser.add_argument('--type',
choices=("excel", "csv"),
default='excel',
help="a switch flag to set set the output file to a CSV file",
required=False,
)
parser.add_argument(
"--promos",
help="generates a CSV file with all the promotions in the requested store",
metavar="store_id",
nargs=1,
type=SupermarketChain.store_id_type,
)
parser.add_argument(
"--find_promos_by_name",
help="prints all promos containing the given promo_name in the given store",
metavar=("store_id", "promo_name"),
nargs=2,
)
parser.add_argument(
"--price",
help="prints all products that contain the given name in the requested store",
metavar=("store_id", "product_name"),
nargs=2,
)
parser.add_argument(
"--prices-with-promos",
help="logs all products with prices updated by promos",
metavar="store_id",
nargs=1,
type=SupermarketChain.store_id_type,
)
parser.add_argument(
"--find_store_id",
help="prints all Shufersal stores in a given city. Input should be a city name in Hebrew",
metavar="city",
nargs=1,
)
parser.add_argument(
"--load_prices",
help="boolean flag representing whether to load an existing price XML file",
action="store_true",
)
parser.add_argument(
"--load_promos",
help="boolean flag representing whether to load an existing promo XML file",
action="store_true",
)
parser.add_argument(
"--load_stores",
help="boolean flag representing whether to load an existing stores XML file",
action="store_true",
)
parser.add_argument(
"--chain",
required=True,
help="The name of the requested chain",
choices=CHAINS_DICT.keys(),
)
parser.add_argument(
"--output_filename",
help="The path to write the promotions/prices to",
type=valid_promotion_output_file,
)
parser.add_argument(
"--only_export_to_file",
help="Boolean flag representing whether only export or also open the promotion output file",
action="store_true",
)
parser.add_argument(
"--debug",
help="Boolean flag representing whether to run in debug mode",
action="store_true",
)
args = parser.parse_args()
file_type = '.xlsx' if not args.type or args.type == 'excel' else '.csv'
chain: SupermarketChain = chain_dict[args.chain]
if args.promos:
arg_store_id = int(args.promos[0])
main_latest_promos(store_id=arg_store_id, load_xml=args.load_prices, chain=chain, load_promos=args.load_promos, file_type=file_type)
if args.debug:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(message)s")
chain: SupermarketChain = CHAINS_DICT[args.chain]
if args.promos or args.prices_with_promos:
arg_store_id = (
int(args.promos[0]) if args.promos else int(args.prices_with_promos[0])
)
if args.output_filename:
output_filename = args.output_filename
if args.promos and not is_valid_promotion_output_file(output_filename):
raise ValueError(
f"Output filename for promos must end with: {VALID_PROMOTION_FILE_EXTENSIONS}"
)
if args.prices_with_promos and not output_filename.endswith(".json"):
raise ValueError(f"Output filename for promos must be a json file")
directory = os.path.dirname(output_filename)
Path(directory).mkdir(parents=True, exist_ok=True)
else:
Path(RESULTS_DIRNAME).mkdir(exist_ok=True)
file_extension = ".xlsx" if args.promos else ".json"
file_type = "promos" if args.promos else "prices"
output_filename = f"{RESULTS_DIRNAME}/{repr(type(chain))}-{file_type}-{arg_store_id}-{date.today()}{file_extension}"
if args.promos:
main_latest_promos(
store_id=arg_store_id,
output_filename=output_filename,
chain=chain,
load_promos=args.load_promos,
load_prices=args.load_prices,
)
else:
items_dict = get_all_prices(
store_id=arg_store_id,
output_filename=output_filename,
chain=chain,
load_promos=args.load_promos,
load_prices=args.load_prices,
)
items_dict_to_json = {
item_code: {
k: v
for k, v in item.__dict__.items()
if not k.startswith("__") and not callable(k)
}
for item_code, item in items_dict.items()
}
with open(output_filename, "w") as fOut:
json.dump(items_dict_to_json, fOut)
if not args.only_export_to_file:
opener = "open" if sys.platform == "darwin" else "xdg-open"
subprocess.call([opener, Path(output_filename)])
# os.startfile(Path(output_filename))
logging.debug(f"Process finished at: {datetime.now()}")
elif args.price:
get_products_prices(chain, store_id=args.price[0], load_xml=args.load_prices, product_name=args.price[1])
log_products_prices(
chain,
store_id=args.price[0],
load_xml=args.load_prices,
product_name=args.price[1],
)
elif args.find_store_id:
arg_city = args.find_store_id[0]
get_store_id(city=arg_city, load_xml=args.load_stores, chain=chain)
log_stores_ids(city=arg_city, load_xml=args.load_stores, chain=chain)
elif args.find_promos_by_name:
arg_store_id = int(args.find_promos_by_name[0])
get_promos_by_name(store_id=arg_store_id, chain=chain, promo_name=args.find_promos_by_name[1],
load_prices=args.load_prices, load_promos=args.load_promos)
log_promos_by_name(
store_id=arg_store_id,
chain=chain,
promo_name=args.find_promos_by_name[1],
load_prices=args.load_prices,
load_promos=args.load_promos,
)

View File

@@ -1,28 +1,72 @@
import logging
import re
import sys
from datetime import datetime
from enum import Enum
from typing import Dict, List, Union
from bs4.element import Tag
import csv
import sys
import pandas as pd
import xlsxwriter
from tqdm import tqdm
from aenum import Enum
from item import Item
from utils import (
create_items_dict,
get_float_from_tag, xml_file_gen,
create_bs_object,
create_items_dict,
get_float_from_tag,
log_message_and_time_if_debug,
xml_file_gen,
)
from supermarket_chain import SupermarketChain
import pandas as pd
from utils import (create_bs_object, create_items_dict, get_float_from_tag,
xml_file_gen)
XML_FILES_PROMOTIONS_CATEGORIES = [
SupermarketChain.XMLFilesCategory.PromosFull,
SupermarketChain.XMLFilesCategory.Promos,
]
PROMOTION_COLS_NUM = (
15 # The length of the list returned by get_promotion_row_for_table function
)
INVALID_OR_UNKNOWN_PROMOTION_FUNCTION = -1
PRODUCTS_TO_IGNORE = ['סירים', 'מגבות', 'מגבת', 'מפות', 'פסטיגל', 'ביגי']
PROMOTIONS_TABLE_HEADERS = [
"תיאור מבצע",
"הפריט המשתתף במבצע",
"מחיר לפני מבצע",
"מחיר אחרי מבצע",
"אחוז הנחה",
"סוג מבצע",
"כמות מקס",
"כפל הנחות",
"המבצע החל",
"זמן תחילת מבצע",
"זמן סיום מבצע",
"זמן עדכון אחרון",
"יצרן",
"ברקוד פריט",
"סוג מבצע לפי תקנות שקיפות מחירים",
]
class ClubID(Enum):
מבצע_רגיל = 0
מועדון = 1
כרטיס_אשראי = 2
אחר = 3
_init_ = "value string"
REGULAR = 0, "מבצע רגיל"
CLUB = 1, "מועדון"
CREDIT_CARD = 2, "כרטיס אשראי"
OTHER = 3, "אחר"
@classmethod
def _missing_(cls, value):
return ClubID.OTHER
def __str__(self):
return self.string
class RewardType(Enum):
@@ -35,6 +79,7 @@ class RewardType(Enum):
SECOND_INSTANCE_SAME_DISCOUNT = 8
SECOND_INSTANCE_DIFFERENT_DISCOUNT = 9
DISCOUNT_IN_MULTIPLE_INSTANCES = 10
OTHER = 11
class Promotion:
@@ -43,9 +88,20 @@ class Promotion:
It contains only part of the available information in Shufersal's data.
"""
def __init__(self, content: str, start_date: datetime, end_date: datetime, update_date: datetime, items: List[Item],
promo_func: callable, club_id: ClubID, promotion_id: float, max_qty: int,
allow_multiple_discounts: bool, reward_type: RewardType, type_file: str = "excel"):
def __init__(
self,
content: str,
start_date: datetime,
end_date: datetime,
update_date: datetime,
items: List[Item],
promo_func: callable,
club_id: ClubID,
promotion_id: int,
max_qty: int,
allow_multiple_discounts: bool,
reward_type: RewardType,
):
self.content: str = content
self.start_date: datetime = start_date
self.end_date: datetime = end_date
@@ -54,148 +110,206 @@ class Promotion:
self.items: List[Item] = items
self.club_id: ClubID = club_id
self.max_qty: int = max_qty
self.allow_multiple_discounts = allow_multiple_discounts
self.reward_type = reward_type
self.promotion_id = promotion_id
self.type_file = type_file
self.allow_multiple_discounts: bool = allow_multiple_discounts
self.reward_type: RewardType = reward_type
self.promotion_id: int = promotion_id
def repr_ltr(self):
title = self.content
dates_range = f"Between {self.start_date} and {self.end_date}"
update_line = f"Updated at {self.update_date}"
return '\n'.join([title, dates_range, update_line, str(self.items)]) + '\n'
return "\n".join([title, dates_range, update_line, str(self.items)]) + "\n"
def __eq__(self, other):
return self.promotion_id == other.promotion_id
def write_promotions_to_csv(promotions: List[Promotion], output_filename: str) -> None:
def write_promotions_to_table(
promotions: List[Promotion], output_filename: str
) -> None:
"""
This function writes a given list of promotions to a given output file in a CSV format.
This function writes a List of promotions to a csv or xlsx output file.
:param promotions: A given list of promotions
:param output_filename: A given file to write to
"""
encoding_file = "utf_8_sig" if sys.platform == "win32" else "utf_8"
columns = [
'תיאור מבצע',
'הפריט המשתתף במבצע',
'מחיר לפני מבצע',
'מחיר אחרי מבצע',
'אחוז הנחה',
'סוג מבצע',
'כמות מקס',
'כפל הנחות',
'המבצע החל',
'זמן תחילת מבצע',
'זמן סיום מבצע',
'זמן עדכון אחרון',
'יצרן',
'ברקוד פריט',
'סוג מבצע לפי תקנות שקיפות מחירים',
log_message_and_time_if_debug("Writing promotions to output file")
rows = [
get_promotion_row_for_table(promo, item)
for promo in promotions
for item in promo.items
]
if output_filename.endswith(".csv"):
with open(output_filename, mode='w', newline='', encoding=encoding_file) as f_out:
encoding_file = "utf_8_sig" if sys.platform == "win32" else "utf_8"
with open(
output_filename, mode="w", newline="", encoding=encoding_file
) as f_out:
promos_writer = csv.writer(f_out)
promos_writer.writerow(columns)
for promo in promotions:
promos_writer.writerows([get_promotion_row_in_csv(promo, item) for item in promo.items])
promos_writer.writerow(PROMOTIONS_TABLE_HEADERS)
promos_writer.writerows(rows)
elif output_filename.endswith(".xlsx"):
df = pd.DataFrame(rows, columns=PROMOTIONS_TABLE_HEADERS)
workbook = xlsxwriter.Workbook(output_filename)
worksheet1 = workbook.add_worksheet()
worksheet1.right_to_left()
date_time_format = workbook.add_format({"num_format": "m/d/yy h:mm;@"})
number_format = workbook.add_format({"num_format": "0.00"})
percentage_format = workbook.add_format({"num_format": "0.00%"})
worksheet1.set_column("A:A", width=35)
worksheet1.set_column("B:B", width=25)
worksheet1.set_column("C:D", cell_format=number_format)
worksheet1.set_column("E:E", cell_format=percentage_format)
worksheet1.set_column("J:L", width=15, cell_format=date_time_format)
worksheet1.add_table(
first_row=0,
first_col=0,
last_row=len(df),
last_col=len(df.columns) - 1,
options={
"columns": [{"header": i} for i in PROMOTIONS_TABLE_HEADERS],
"data": df.values.tolist(),
"style": "Table Style Medium 11",
},
)
workbook.close()
else:
with pd.ExcelWriter(output_filename, 'openpyxl', datetime_format='DD/MM/YYYY') as xl:
dt = pd.DataFrame(columns=columns)
for promo in promotions:
prms = dict_promos([get_promotion_row_in_csv(promo, item) for item in promo.items], columns)
if prms:
dt = dt.append(prms, True)
else:
continue
dt.to_excel(xl, index=False, sheet_name="name")
raise ValueError(
f"The given output file has an invalid extension:\n{output_filename}"
)
def dict_promos(promos: list, columns: list):
return {col: p for prom in promos for col, p in zip(columns, prom)}
def get_promotion_row_for_table(promo: Promotion, item: Item) -> List:
"""
This function returns a row in the promotions XLSX table.
:param promo: A given Promotion object
:param item: A given item object participating in the promotion
"""
return [
promo.content,
item.name,
item.price,
promo.promo_func(item),
(item.price - promo.promo_func(item)) / max(item.price, 1),
promo.club_id.string,
promo.max_qty,
promo.allow_multiple_discounts,
promo.start_date <= datetime.now(),
promo.start_date,
promo.end_date,
promo.update_date,
item.manufacturer,
item.code,
promo.reward_type.value,
]
def get_promotion_row_in_csv(promo: Promotion, item: Item):
return [promo.content,
item.name,
item.price,
f'{promo.promo_func(item):.3f}',
f'{(item.price - promo.promo_func(item)) / item.price:.3%}',
promo.club_id.name.replace('_', ' '),
promo.max_qty,
promo.allow_multiple_discounts,
promo.start_date <= datetime.now(),
promo.start_date,
promo.end_date,
promo.update_date,
item.manufacturer,
item.code,
promo.reward_type.value]
def get_available_promos(chain: SupermarketChain, store_id: int, load_prices: bool, load_promos) -> List[Promotion]:
def get_available_promos(
chain: SupermarketChain, store_id: int, load_prices: bool, load_promos: bool
) -> List[Promotion]:
"""
This function return the available promotions given a BeautifulSoup object.
:param load_promos:
:param chain: The name of the requested supermarket chain
:param store_id: A given store id
:param load_prices: A boolean representing whether to load an existing xml or load an already saved one
:param store_id: A given store ID
:param load_prices: A boolean representing whether to load an existing prices file or download it
:param load_promos: A boolean representing whether to load an existing promotion file or download it
:return: Promotions that are not included in PRODUCTS_TO_IGNORE and are currently available
"""
items_dict: Dict[str, Item] = create_items_dict(chain, load_prices, store_id)
xml_path: str = xml_file_gen(chain, store_id, chain.XMLFilesCategory.PromosFull.name)
bs_promos = create_bs_object(xml_path, chain, store_id, load_promos, chain.XMLFilesCategory.PromosFull)
log_message_and_time_if_debug("Importing prices XML file")
items_dict: Dict[str, Item] = create_items_dict(chain, store_id, load_prices)
log_message_and_time_if_debug("Importing promotions XML file")
promo_tags = get_all_promos_tags(chain, store_id, load_promos)
log_message_and_time_if_debug("Creating promotions objects")
promo_objs = list()
for promo in bs_promos.find_all(chain.promotion_tag_name):
promotion_id = promo.find(re.compile('PromotionId', re.IGNORECASE))
for promo in tqdm(promo_tags, desc="creating_promotions"):
promotion_id = int(promo.find(re.compile("PromotionId", re.IGNORECASE)).text)
if promo_objs and promo_objs[-1].promotion_id == promotion_id:
promo_objs[-1].items.extend(chain.get_items(promo, items_dict))
continue
promo_inst = create_new_promo_instance(chain, items_dict, promo, promotion_id)
if len(promo_inst.items) > 1000: # Too many items -> probably illegal promotion
continue
if promo_inst:
promo_objs.append(promo_inst)
return promo_objs
def create_new_promo_instance(chain, items_dict, promo, promotion_id):
def create_new_promo_instance(
chain: SupermarketChain, items_dict: Dict[str, Item], promo: Tag, promotion_id: int
) -> Union[Promotion, None]:
"""
This function generates a Promotion object from a promotion tag.
:param chain: The supermarket chain publishing the promotion
:param items_dict: A dictionary of items that might participate in the promotion
:param promo: An xml Tag representing the promotion
:param promotion_id: An integer representing the promotion ID
:return: If the promotion expired - return None, else return the Promotion object
"""
promo_end_time = datetime.strptime(
promo.find("PromotionEndDate").text + " " + promo.find("PromotionEndHour").text,
chain.date_hour_format,
)
if promo_end_time < datetime.now():
return None
reward_type = RewardType(int(promo.find("RewardType").text))
discounted_price = get_discounted_price(promo)
promo_description = promo.find('PromotionDescription').text
is_discount_in_percentage = reward_type == RewardType.DISCOUNT_IN_PERCENTAGE or not discounted_price
raw_discount_rate = promo.find('DiscountRate').text if promo.find('DiscountRate') else None
promo_description = promo.find("PromotionDescription").text
is_discount_in_percentage = (
reward_type == RewardType.DISCOUNT_IN_PERCENTAGE or not discounted_price
)
raw_discount_rate = (
promo.find("DiscountRate").text if promo.find("DiscountRate") else None
)
discount_rate = get_discount_rate(raw_discount_rate, is_discount_in_percentage)
min_qty = get_float_from_tag(promo, 'MinQty')
max_qty = get_float_from_tag(promo, 'MaxQty')
min_qty = get_float_from_tag(promo, "MinQty")
max_qty = get_float_from_tag(promo, "MaxQty")
remark = promo.find("Remark")
promo_func = find_promo_function(reward_type=reward_type, remark=remark.text if remark else '',
promo_description=promo_description, min_qty=min_qty,
discount_rate=discount_rate, discounted_price=discounted_price)
promo_start_time = datetime.strptime(promo.find('PromotionStartDate').text + ' ' +
promo.find('PromotionStartHour').text,
chain.date_hour_format)
promo_end_time = datetime.strptime(promo.find('PromotionEndDate').text + ' ' +
promo.find('PromotionEndHour').text,
chain.date_hour_format)
promo_update_time = datetime.strptime(promo.find(chain.promotion_update_tag_name).text,
chain.update_date_format)
club_id = ClubID(int(promo.find(re.compile('ClubId', re.IGNORECASE)).text))
multiple_discounts_allowed = bool(int(promo.find('AllowMultipleDiscounts').text))
promo_func = find_promo_function(
reward_type=reward_type,
remark=remark.text if remark else "",
promo_description=promo_description,
min_qty=min_qty,
discount_rate=discount_rate,
discounted_price=discounted_price,
)
promo_start_time = datetime.strptime(
promo.find("PromotionStartDate").text
+ " "
+ promo.find("PromotionStartHour").text,
chain.date_hour_format,
)
promo_update_time = datetime.strptime(
promo.find(chain.promotion_update_tag_name).text, chain.update_date_format
)
club_id = ClubID(int(promo.find(re.compile("ClubId", re.IGNORECASE)).text))
multiple_discounts_allowed = bool(int(promo.find("AllowMultipleDiscounts").text))
items = chain.get_items(promo, items_dict)
if is_valid_promo(end_time=promo_end_time, description=promo_description):
return Promotion(content=promo_description, start_date=promo_start_time, end_date=promo_end_time,
update_date=promo_update_time, items=items, promo_func=promo_func,
club_id=club_id, promotion_id=promotion_id, max_qty=max_qty,
allow_multiple_discounts=multiple_discounts_allowed, reward_type=reward_type)
return Promotion(
content=promo_description,
start_date=promo_start_time,
end_date=promo_end_time,
update_date=promo_update_time,
items=items,
promo_func=promo_func,
club_id=club_id,
promotion_id=promotion_id,
max_qty=max_qty,
allow_multiple_discounts=multiple_discounts_allowed,
reward_type=reward_type,
)
def get_discounted_price(promo):
discounted_price = promo.find('DiscountedPrice')
discounted_price = promo.find("DiscountedPrice")
if discounted_price:
return float(discounted_price.text)
@@ -203,12 +317,18 @@ def get_discounted_price(promo):
def get_discount_rate(discount_rate: Union[float, None], discount_in_percentage: bool):
if discount_rate:
if discount_in_percentage:
return int(discount_rate) * (10 ** -(len(str(discount_rate))))
return float(discount_rate) * (10 ** -(len(str(discount_rate))))
return float(discount_rate)
def find_promo_function(reward_type: RewardType, remark: str, promo_description: str, min_qty: float,
discount_rate: Union[float, None], discounted_price: Union[float, None]):
def find_promo_function(
reward_type: RewardType,
remark: str,
promo_description: str,
min_qty: float,
discount_rate: Union[float, None],
discounted_price: Union[float, None],
):
if reward_type == RewardType.SECOND_INSTANCE_DIFFERENT_DISCOUNT:
if not discounted_price:
return lambda item: item.price * (1 - (discount_rate / min_qty))
@@ -221,7 +341,9 @@ def find_promo_function(reward_type: RewardType, remark: str, promo_description:
return lambda item: item.price * (1 - (1 / min_qty))
if reward_type == RewardType.DISCOUNT_IN_PERCENTAGE:
return lambda item: item.price * (1 - discount_rate / (2 if "השני ב" in promo_description else 1))
return lambda item: item.price * (
1 - discount_rate / (2 if "השני ב" in promo_description else 1)
)
if reward_type == RewardType.SECOND_INSTANCE_SAME_DISCOUNT:
if "השני ב" in promo_description:
@@ -231,6 +353,9 @@ def find_promo_function(reward_type: RewardType, remark: str, promo_description:
if reward_type == RewardType.DISCOUNT_BY_THRESHOLD:
return lambda item: item.price - discount_rate
if reward_type == RewardType.OTHER:
return lambda item: item.price
if 'מחיר המבצע הינו המחיר לק"ג' in remark:
return lambda item: discounted_price
@@ -240,34 +365,78 @@ def find_promo_function(reward_type: RewardType, remark: str, promo_description:
return lambda item: INVALID_OR_UNKNOWN_PROMOTION_FUNCTION
def is_valid_promo(end_time: datetime, description) -> bool:
"""
This function returns whether a given Promotion object is currently valid.
"""
not_expired: bool = end_time >= datetime.now()
in_promo_ignore_list: bool = any(product in description for product in PRODUCTS_TO_IGNORE)
return not_expired and not in_promo_ignore_list
def main_latest_promos(
store_id: int, load_xml: bool, chain: SupermarketChain, load_promos: bool, file_type: str) -> None:
store_id: int,
output_filename,
chain: SupermarketChain,
load_promos: bool,
load_prices: bool,
) -> None:
"""
This function writes to a CSV file the available promotions in a store with a given id sorted by their update date.
This function writes to a file the available promotions in a store with a given id sorted by their update date.
:param chain: The name of the requested supermarket chain
:param store_id: A given store id
:param load_xml: A boolean representing whether to load an existing prices xml file
:param load_prices: A boolean representing whether to load an existing prices xml file
:param load_promos: A boolean representing whether to load an existing promos xml file
:param output_filename: A path to write the promotions table
"""
promotions: List[Promotion] = get_available_promos(chain, store_id, load_xml, load_promos)
promotions.sort(key=lambda promo: (max(promo.update_date.date(), promo.start_date.date()), promo.start_date -
promo.end_date), reverse=True)
ex_file = f'results/{repr(type(chain))}_promos_{store_id}{file_type}'
write_promotions_to_csv(promotions, ex_file)
promotions: List[Promotion] = get_available_promos(
chain, store_id, load_prices, load_promos
)
promotions.sort(
key=lambda promo: (
max(promo.update_date.date(), promo.start_date.date()),
promo.start_date - promo.end_date,
),
reverse=True,
)
write_promotions_to_table(promotions, output_filename)
def get_promos_by_name(store_id: int, chain: SupermarketChain, promo_name: str, load_prices: bool, load_promos: bool):
def get_all_prices(
store_id: int,
output_filename,
chain: SupermarketChain,
load_promos: bool,
load_prices: bool,
):
log_message_and_time_if_debug("Importing prices XML file")
items_dict: Dict[str, Item] = create_items_dict(chain, store_id, load_prices)
log_message_and_time_if_debug("Importing promotions XML file")
promo_tags = get_all_promos_tags(chain, store_id, load_promos)
log_message_and_time_if_debug("Creating promotions objects")
promo_obj = None
for promo in tqdm(promo_tags, desc="creating_promotions"):
promotion_id = int(promo.find(re.compile("PromotionId", re.IGNORECASE)).text)
if promo_obj is None or promo_obj.promotion_id != promotion_id:
promo_obj = create_new_promo_instance(
chain, items_dict, promo, promotion_id
)
if promo_obj.club_id == ClubID.REGULAR:
promo_items = promo.find_all("Item")
if len(promo_items) > 1000: # Too many items -> probably illegal promotion
continue
for item in promo_items:
item_code = item.find("ItemCode").text
cur_item = items_dict.get(item_code)
if cur_item is not None:
discounted_price = promo_obj.promo_func(cur_item)
if cur_item.price > discounted_price:
cur_item.final_price = discounted_price
return items_dict
def log_promos_by_name(
store_id: int,
chain: SupermarketChain,
promo_name: str,
load_prices: bool,
load_promos: bool,
):
"""
This function prints all promotions in a given chain and store_id containing a given promo_name.
@@ -277,23 +446,49 @@ def get_promos_by_name(store_id: int, chain: SupermarketChain, promo_name: str,
:param load_prices: A boolean representing whether to load an saved prices XML file or scrape a new one
:param load_promos: A boolean representing whether to load an saved XML file or scrape a new one
"""
promotions: List[Promotion] = get_available_promos(chain, store_id, load_prices, load_promos)
promotions: List[Promotion] = get_available_promos(
chain, store_id, load_prices, load_promos
)
for promo in promotions:
if promo_name in promo.content:
print(promo.repr_ltr())
logging.info(promo.repr_ltr())
# TODO: change to returning list of Items
def get_all_null_items_in_promos(chain, store_id) -> List[str]:
"""
This function finds all items appearing in the chain's promotions file but not in the chain's prices file.
Outdated.
"""
items_dict: Dict[str, Item] = create_items_dict(chain, True, store_id)
xml_path: str = xml_file_gen(chain, store_id, chain.XMLFilesCategory.PromosFull.name)
bs_promos = create_bs_object(xml_path, chain, store_id, True, chain.XMLFilesCategory.PromosFull)
items_dict: Dict[str, Item] = create_items_dict(chain, store_id, load_xml=True)
promo_tags = get_all_promos_tags(chain, store_id, load_xml=True)
return [
item
for promo_tag in promo_tags
for item in chain.get_null_items(promo_tag, items_dict)
]
null_items = list()
for promo in bs_promos.find_all(chain.promotion_tag_name):
null_items.extend(chain.get_null_items(promo, items_dict))
return null_items
def get_all_promos_tags(
chain: SupermarketChain, store_id: int, load_xml: bool
) -> List[Tag]:
"""
This function gets all the promotions tags for a given store in a given chain.
It includes both the full and not full promotions files.
:param chain: A given supermarket chain
:param store_id: A given store ID
:param load_xml: A boolean representing whether to try loading the promotions from an existing XML file
:return: A list of promotions tags
"""
bs_objects = list()
for category in tqdm(XML_FILES_PROMOTIONS_CATEGORIES, desc="promotions_files"):
xml_path = xml_file_gen(chain, store_id, category.name)
bs_objects.append(
create_bs_object(chain, store_id, category, load_xml, xml_path)
)
return [
promo
for bs_obj in bs_objects
for promo in bs_obj.find_all(chain.promotion_tag_name)
]

View File

@@ -6,5 +6,12 @@ lxml==4.6.1
requests==2.25.0
soupsieve==2.0.1
urllib3==1.26.2
pandas>=1.1
openpyxl>=3.0.1
openpyxl
tqdm~=4.62.1
pytest~=6.2.2
pandas~=1.2.0
argparse~=1.4.0
XlsxWriter~=1.4.3
aenum
selenium
webdriver-manager

View File

@@ -1,28 +1,22 @@
from utils import xml_file_gen, create_bs_object
from supermarket_chain import SupermarketChain
import logging
from bs4 import BeautifulSoup
from utils import xml_file_gen, create_bs_object
from supermarket_chain import SupermarketChain
def get_store_id(city: str, load_xml: bool, chain: SupermarketChain):
def log_stores_ids(city: str, load_xml: bool, chain: SupermarketChain):
"""
This function prints the store_ids of stores in a given city.
The city must match exactly to its spelling in Shufersal's website (hence it should be in Hebrew alphabet).
This function prints the stores IDs of stores in a given city.
The city must match its spelling in Shufersal's website (hence it should be in Hebrew).
:param chain: A given supermarket chain
:param load_xml: A boolean representing whether to load an existing xml or load an already saved one
:param city: A string representing the city of the requested store.
"""
xml_path: str = xml_file_gen(chain, -1, chain.XMLFilesCategory.Stores.name)
bs_stores: BeautifulSoup = create_bs_object(xml_path, chain, -1, load_xml, chain.XMLFilesCategory.Stores)
bs_stores: BeautifulSoup = create_bs_object(chain, -1, chain.XMLFilesCategory.Stores, load_xml, xml_path)
for store in bs_stores.find_all("STORE"):
if store.find("CITY").text == city:
print((store.find("ADDRESS").text, store.find("STOREID").text, store.find("SUBCHAINNAME").text))
def get_all_deals(chain):
xml_path: str = xml_file_gen(chain, -1, chain.XMLFilesCategory.Stores.name)
bs_stores: BeautifulSoup = create_bs_object(xml_path, chain, -1, True, chain.XMLFilesCategory.Stores)
return [int(store.find("STOREID").text) for store in bs_stores.find_all("STORE") if store.find("SUBCHAINID").text
== "2"]
logging.info((store.find("ADDRESS").text, store.find("STOREID").text, store.find("SUBCHAINNAME").text))

View File

@@ -1,10 +1,9 @@
import re
from abc import abstractmethod
from enum import Enum
from argparse import ArgumentTypeError
from typing import Dict, List
import requests
from aenum import Enum
from bs4.element import Tag
from item import Item
@@ -24,14 +23,15 @@ class SupermarketChain(object, metaclass=Meta):
"""
An enum class of different XML files produced by a supermarket chain
"""
All, Prices, PricesFull, Promos, PromosFull, Stores = range(6)
_promotion_tag_name = 'Promotion'
_promotion_update_tag_name = 'PromotionUpdateDate'
_date_format = '%Y-%m-%d'
_date_hour_format = '%Y-%m-%d %H:%M'
_update_date_format = '%Y-%m-%d %H:%M'
_item_tag_name = 'Item'
_promotion_tag_name = "Promotion"
_promotion_update_tag_name = "PromotionUpdateDate"
_date_format = "%Y-%m-%d"
_date_hour_format = "%Y-%m-%d %H:%M"
_update_date_format = "%Y-%m-%d %H:%M"
_item_tag_name = "Item"
@property
def promotion_tag_name(self):
@@ -75,19 +75,24 @@ class SupermarketChain(object, metaclass=Meta):
:return: The given store_id if valid, else raise an ArgumentTypeError.
"""
if not SupermarketChain.is_valid_store_id(int(store_id)):
raise ArgumentTypeError(f"Given store_id: {store_id} is not a valid store_id.")
raise ArgumentTypeError(
f"Given store_id: {store_id} is not a valid store_id."
)
return store_id
@staticmethod
@abstractmethod
def get_download_url(store_id: int, category: XMLFilesCategory, session: requests.Session) -> str:
def get_download_url_or_path(
store_id: int, category: XMLFilesCategory, session: requests.Session
) -> str:
"""
This method scrapes supermarket's website and returns a url containing the data for a given store and category.
This method scrapes the supermarket's website and according to the given store id and category,
it returns a url containing the data or or a path to a gz file containing the data.
:param session:
:param store_id: A given id of a store
:param store_id: A given ID of a store
:param category: A given category
:return: A downloadable link of the data for a given store and category
:param session: A given session object
"""
pass
@@ -100,8 +105,8 @@ class SupermarketChain(object, metaclass=Meta):
:param items_dict: A given dictionary of products
"""
items = list()
for item in promo.find_all('Item'):
item_code = item.find('ItemCode').text
for item in promo.find_all("Item"):
item_code = item.find("ItemCode").text
full_item_info = items_dict.get(item_code)
if full_item_info:
items.append(full_item_info)
@@ -112,17 +117,8 @@ class SupermarketChain(object, metaclass=Meta):
"""
This function returns all the items in a given promotion which do not appear in the given items_dict.
"""
return [item.find('ItemCode').text for item in promo.find_all('Item')
if not items_dict.get(item.find('ItemCode').text)]
@staticmethod
def get_item_info(item: Tag) -> Item:
"""
This function returns a string containing important information about a given supermarket's product.
"""
return Item(
name=item.find(re.compile(r'ItemN[a]?m[e]?')).text,
price=float(item.find('ItemPrice').text),
manufacturer=item.find(re.compile(r'Manufacture[r]?Name')).text,
code=item.find('ItemCode').text
)
return [
item.find("ItemCode").text
for item in promo.find_all("Item")
if not items_dict.get(item.find("ItemCode").text)
]

View File

@@ -1,3 +1,5 @@
import sys,os
sys.path.append(os.path.abspath(os.curdir))
from item import Item
from promotion import RewardType, find_promo_function, get_discount_rate
@@ -19,7 +21,7 @@ def test_shufersal_promo_type_1():
discount_rate=discount_rate,
discounted_price=discounted_price,
)
item = Item('פטה פיראוס 20%', 113, '', '')
item = Item('פטה פיראוס 20%', 113, 1, '', '')
assert promo_func(item) == 100
@@ -38,7 +40,7 @@ def test_shufersal_promo_type_2():
discount_rate=discount_rate,
discounted_price=discounted_price,
)
item = Item('חגיגת גרנולה פ.יבשים500ג', 26.9, '', '')
item = Item('חגיגת גרנולה פ.יבשים500ג', 26.9, 1, '', '')
assert promo_func(item) == 21.52
@@ -57,7 +59,7 @@ def test_shufersal_promo_type_6_1():
discount_rate=discount_rate,
discounted_price=discounted_price,
)
item = Item('פסטרמה מקסיקנית במשקל', 89, '', '')
item = Item('פסטרמה מקסיקנית במשקל', 89, 1, '', '')
assert promo_func(item) == 89
@@ -76,7 +78,7 @@ def test_shufersal_promo_type_6_2():
discount_rate=discount_rate,
discounted_price=discounted_price,
)
item = Item('מכונת לוואצה ג\'ולי אדומה', 449, '', '')
item = Item('מכונת לוואצה ג\'ולי אדומה', 449, 1, '', '')
assert promo_func(item) == 449
@@ -95,7 +97,7 @@ def test_shufersal_promo_type_7_1():
discount_rate=discount_rate,
discounted_price=discounted_price,
)
item = Item('פינצטה 2011 שחורה/כסופה', 14.9, '', '')
item = Item('פינצטה 2011 שחורה/כסופה', 14.9, 1, '', '')
assert promo_func(item) == 7.45
@@ -114,7 +116,7 @@ def test_shufersal_promo_type_7_2():
discount_rate=discount_rate,
discounted_price=discounted_price,
)
item = Item('יוגורט עיזים 500 גרם', 12.9, '', '')
item = Item('יוגורט עיזים 500 גרם', 12.9, 1, '', '')
assert promo_func(item) == 12.9 * 0.75
@@ -133,7 +135,7 @@ def test_shufersal_promo_type_9_1():
discount_rate=discount_rate,
discounted_price=discounted_price,
)
item = Item('זיתים מבוקעים פיקנטי540ג', 9.3, '', '')
item = Item('זיתים מבוקעים פיקנטי540ג', 9.3, 1, '', '')
assert promo_func(item) == 9.3 * 0.75
@@ -152,7 +154,7 @@ def test_shufersal_promo_type_9_2():
discount_rate=discount_rate,
discounted_price=discounted_price,
)
item = Item('שעועית לבנה שופרסל 800גר', 18.9, '', '')
item = Item('שעועית לבנה שופרסל 800גר', 18.9, 1, '', '')
assert promo_func(item) == (18.9 + 10) / 2
@@ -171,7 +173,7 @@ def test_shufersal_promo_type_9_3():
discount_rate=discount_rate,
discounted_price=discounted_price,
)
item = Item('גומיות שחורות 12 יח', 9.9, '', '')
item = Item('גומיות שחורות 12 יח', 9.9, 1, '', '')
assert promo_func(item) == 9.9 * 0.75
@@ -190,7 +192,7 @@ def test_shufersal_promo_type_10_1():
discount_rate=discount_rate,
discounted_price=discounted_price
)
item = Item('טופו טעם טבעי 300 גרם', 10.9, '', '7296073345763')
item = Item('טופו טעם טבעי 300 גרם', 10.9, 1, '7296073345763', '')
assert promo_func(item) == 5
@@ -209,7 +211,7 @@ def test_shufersal_promo_type_10_2():
discount_rate=discount_rate,
discounted_price=discounted_price
)
item = Item('טופו טעם טבעי 300 גרם', 10.9, 'כפרי בריא משק ויילר', '7296073345763')
item = Item('טופו טעם טבעי 300 גרם', 10.9, 1, '7296073345763', 'כפרי בריא משק ויילר')
assert promo_func(item) == 7
@@ -225,7 +227,7 @@ def assert_discount(discounted_price, item_barcode, item_manufacturer, item_name
discount_rate=discount_rate,
discounted_price=discounted_price
)
item = Item(item_name, orig_price, item_manufacturer, item_barcode)
item = Item(item_name, orig_price, 1, item_barcode, item_manufacturer)
assert abs(promo_func(item) - price_after_discount) <= 1e-5, promo_description

View File

@@ -0,0 +1,125 @@
import logging
import os
import re
import tempfile
import pandas as pd
import pytest
import requests
from chains.bareket import Bareket
from chains.co_op import CoOp
from chains.dor_alon import DorAlon
from chains.keshet import Keshet
from chains.shuk_hayir import ShukHayir
from chains.stop_market import StopMarket
from chains.tiv_taam import TivTaam
from chains.yeinot_bitan import YeinotBitan
from chains.zol_vebegadol import ZolVebegadol
from main import CHAINS_DICT
from promotion import PROMOTION_COLS_NUM, main_latest_promos
from supermarket_chain import SupermarketChain
pytest.main(args=["-s", os.path.abspath(__file__)])
session = requests.Session()
MIN_NUM_OF_PROMOS = 3
@pytest.mark.parametrize("chain_tuple", CHAINS_DICT.items())
def test_searching_for_download_urls(chain_tuple):
"""
Test that get_download_url of each chain returns the correct download url for each category in every chain.
"""
chain_name, chain = chain_tuple
logging.info(f"Checking download urls in chain {chain_name}")
store_id: int = valid_store_id_by_chain(chain_name)
_test_download_url_helper(
chain, store_id, chain.XMLFilesCategory.PromosFull, r"promo[s]?full", session
)
_test_download_url_helper(
chain, store_id, chain.XMLFilesCategory.Promos, r"promo[s]?", session
)
_test_download_url_helper(
chain, store_id, chain.XMLFilesCategory.PricesFull, r"price[s]?full", session
)
_test_download_url_helper(
chain, store_id, chain.XMLFilesCategory.Prices, r"price[s]?", session
)
def _test_download_url_helper(
chain: SupermarketChain,
store_id: int,
category: SupermarketChain.XMLFilesCategory,
regex_pat: str,
session: requests.session,
):
download_url: str = chain.get_download_url_or_path(store_id, category, session)
if not download_url: # Not found non-full Promos/Prices file
return
logging.debug(download_url)
assert re.search(
regex_pat, download_url, re.IGNORECASE
), f"Invalid {category.name} url in {repr(type(chain))}"
if category in [chain.XMLFilesCategory.Prices, chain.XMLFilesCategory.Promos]:
assert not re.search(
"full", download_url, re.IGNORECASE
), f"Downloaded the full {category.name} file mistakenly in {repr(type(chain))}"
@pytest.mark.parametrize("chain_tuple", CHAINS_DICT.items())
def test_promotions_scraping(chain_tuple):
"""
Test scraping of promotions is completed successfully and a valid xlsx file is generated as an output.
"""
chain_name, chain = chain_tuple
tf = tempfile.NamedTemporaryFile(suffix=".xlsx")
logging.info(f"Test scraping promotions from {chain_name}")
store_id: int = valid_store_id_by_chain(chain_name)
try:
main_latest_promos(
store_id=store_id,
output_filename=tf.name,
chain=chain,
load_promos=False,
load_prices=False,
)
df = pd.read_excel(tf.name)
except Exception as e:
logging.error(e)
logging.error(f"Failed loading excel of {chain_name}")
raise
assert (
df.shape[0] > MIN_NUM_OF_PROMOS and df.shape[1] == PROMOTION_COLS_NUM
), f"Failed scraping {chain_name}"
def valid_store_id_by_chain(chain_name) -> int:
"""
This function returns a valid store ID for a given chain.
:param chain_name: The name of a chain as returned by repr(ChainClassName).
:return: An integer representing a valid store ID in the given chain
"""
if chain_name == repr(DorAlon):
store_id = 501
elif chain_name in [repr(TivTaam), repr(Bareket)]:
store_id = 2
elif chain_name == repr(CoOp):
store_id = 202
elif chain_name in [repr(ShukHayir), repr(ZolVebegadol)]:
store_id = 4
elif chain_name in [repr(StopMarket), repr(Keshet)]:
store_id = 5
elif chain_name == repr(YeinotBitan):
store_id = 3700
else:
store_id = 1
return store_id

148
utils.py
View File

@@ -1,16 +1,24 @@
import gzip
import io
import logging
import os.path
import zipfile
from argparse import ArgumentTypeError
from datetime import date
from datetime import datetime
from os import path
from typing import AnyStr, Dict
import requests
from bs4 import BeautifulSoup
from os import path
from tqdm import tqdm
from item import Item
from supermarket_chain import SupermarketChain
RESULTS_DIRNAME = "results"
RAW_FILES_DIRNAME = "raw_files"
VALID_PROMOTION_FILE_EXTENSIONS = [".csv", ".xlsx"]
def xml_file_gen(chain: SupermarketChain, store_id: int, category_name: str) -> str:
@@ -23,12 +31,22 @@ def xml_file_gen(chain: SupermarketChain, store_id: int, category_name: str) ->
:param category_name: A given category name
:return: An xml filename
"""
store_id_str: str = f"-{str(store_id)}" if SupermarketChain.is_valid_store_id(store_id) else ""
return path.join(RAW_FILES_DIRNAME, f"{repr(type(chain))}-{category_name}{store_id_str}.xml")
store_id_str: str = (
f"-{str(store_id)}" if SupermarketChain.is_valid_store_id(store_id) else ""
)
return path.join(
RAW_FILES_DIRNAME,
f"{repr(type(chain))}-{category_name}{store_id_str}-{date.today()}.xml",
)
def create_bs_object(xml_path: str, chain: SupermarketChain, store_id: int, load_xml: bool,
category: SupermarketChain.XMLFilesCategory) -> BeautifulSoup:
def create_bs_object(
chain: SupermarketChain,
store_id: int,
category: SupermarketChain.XMLFilesCategory,
load_xml: bool,
xml_path: str,
) -> BeautifulSoup:
"""
This function creates a BeautifulSoup (BS) object according to the given parameters.
In case the given load_xml is True and the XML file exists, the function creates the BS object from the given
@@ -43,14 +61,18 @@ def create_bs_object(xml_path: str, chain: SupermarketChain, store_id: int, load
:return: A BeautifulSoup object with xml content.
"""
if load_xml and path.isfile(xml_path):
return create_bs_object_from_xml(xml_path)
return create_bs_object_from_link(xml_path, chain, category, store_id)
return get_bs_object_from_xml(xml_path)
return get_bs_object_from_link(chain, store_id, category, xml_path)
def create_bs_object_from_link(xml_path: str, chain: SupermarketChain, category: SupermarketChain.XMLFilesCategory,
store_id: int) -> BeautifulSoup:
def get_bs_object_from_link(
chain: SupermarketChain,
store_id: int,
category: SupermarketChain.XMLFilesCategory,
xml_path: str,
) -> BeautifulSoup:
"""
This function creates a BeautifulSoup (BS) object by generating a download link from Shufersal's API.
This function creates a BeautifulSoup (BS) object by generating a download link the given chain's API.
:param chain: A given supermarket chain
:param xml_path: A given path to an XML file to load/save the BS object from/to.
@@ -59,45 +81,72 @@ def create_bs_object_from_link(xml_path: str, chain: SupermarketChain, category:
:return: A BeautifulSoup object with xml content.
"""
session = requests.Session()
download_url: str = chain.get_download_url(store_id, category, session)
response_content = session.get(download_url).content
try:
xml_content: AnyStr = gzip.decompress(response_content)
except gzip.BadGzipFile:
with zipfile.ZipFile(io.BytesIO(response_content)) as the_zip:
zip_info = the_zip.infolist()[0]
with the_zip.open(zip_info) as the_file:
xml_content = the_file.read()
with open(xml_path, 'wb') as f_out:
download_url_or_path: str = chain.get_download_url_or_path(store_id, category, session)
if not download_url_or_path:
return BeautifulSoup()
if os.path.isfile(download_url_or_path):
with gzip.open(download_url_or_path) as fIn:
xml_content = fIn.read()
os.remove(download_url_or_path) # Delete gz file
else:
response_content = session.get(download_url_or_path).content
try:
xml_content: AnyStr = gzip.decompress(response_content)
except gzip.BadGzipFile:
with zipfile.ZipFile(io.BytesIO(response_content)) as the_zip:
zip_info = the_zip.infolist()[0]
with the_zip.open(zip_info) as the_file:
xml_content = the_file.read()
with open(xml_path, "wb") as f_out:
f_out.write(xml_content)
return BeautifulSoup(xml_content, features='xml')
return BeautifulSoup(xml_content, features="xml")
def create_bs_object_from_xml(xml_path: str) -> BeautifulSoup:
def get_bs_object_from_xml(xml_path: str) -> BeautifulSoup:
"""
This function creates a BeautifulSoup (BS) object from a given XML file.
:param xml_path: A given path to an xml file to load/save the BS object from/to.
:return: A BeautifulSoup object with xml content.
"""
with open(xml_path, 'rb') as f_in:
return BeautifulSoup(f_in, features='xml')
with open(xml_path, "rb") as f_in:
return BeautifulSoup(f_in, features="xml")
def create_items_dict(chain: SupermarketChain, load_xml, store_id: int) -> Dict[str, Item]:
def create_items_dict(
chain: SupermarketChain, store_id: int, load_xml
) -> Dict[str, Item]:
"""
This function creates a dictionary where every key is an item code and its value is its corresponding Item instance.
We take both full and not full prices files, and assume that the no full is more updated (in case of overwriting).
:param chain: A given supermarket chain
:param load_xml: A boolean representing whether to load an existing prices xml file
:param store_id: A given store id
"""
xml_path: str = xml_file_gen(chain, store_id, chain.XMLFilesCategory.PricesFull.name)
bs_prices: BeautifulSoup = create_bs_object(xml_path, chain, store_id, load_xml, chain.XMLFilesCategory.PricesFull)
return {item.find('ItemCode').text: chain.get_item_info(item) for item in bs_prices.find_all(chain.item_tag_name)}
items_dict = dict()
for category in tqdm(
[chain.XMLFilesCategory.PricesFull, chain.XMLFilesCategory.Prices],
desc="prices_files",
):
xml_path: str = xml_file_gen(chain, store_id, category.name)
bs_prices: BeautifulSoup = create_bs_object(
chain, store_id, category, load_xml, xml_path
)
items_tags = bs_prices.find_all(chain.item_tag_name)
items_dict.update(
{
item_tag.find("ItemCode").text: Item.from_tag(item_tag)
for item_tag in items_tags
}
)
return items_dict
def get_products_prices(chain: SupermarketChain, store_id: int, load_xml: bool, product_name: str) -> None:
def log_products_prices(
chain: SupermarketChain, store_id: int, load_xml: bool, product_name: str
) -> None:
"""
This function prints the products in a given store which contains a given product_name.
@@ -106,20 +155,37 @@ def get_products_prices(chain: SupermarketChain, store_id: int, load_xml: bool,
:param product_name: A given product name
:param load_xml: A boolean representing whether to load an existing xml or load an already saved one
"""
xml_path: str = xml_file_gen(chain, store_id, chain.XMLFilesCategory.PricesFull.name)
bs_prices: BeautifulSoup = create_bs_object(xml_path, chain, store_id, load_xml, chain.XMLFilesCategory.PricesFull)
prods = [item for item in bs_prices.find_all("Item") if product_name in item.find("ItemName").text]
prods.sort(key=lambda x: float(x.find("UnitOfMeasurePrice").text))
for prod in prods:
print(
(
prod.find('ItemName').text[::-1],
prod.find('ManufacturerName').text[::-1],
prod.find('ItemPrice').text
)
)
items_dict: Dict[str, Item] = create_items_dict(chain, store_id, load_xml)
products_by_name = [
item for item in items_dict.values() if product_name in item.name
]
products_by_name_sorted_by_price = sorted(
products_by_name, key=lambda item: item.price_by_measure
)
for prod in products_by_name_sorted_by_price:
logging.info(prod)
def get_float_from_tag(tag, int_tag) -> int:
content = tag.find(int_tag)
return float(content.text) if content else 0
def is_valid_promotion_output_file(output_file: str) -> bool:
return any(
output_file.endswith(extension) for extension in VALID_PROMOTION_FILE_EXTENSIONS
)
def valid_promotion_output_file(output_file: str) -> str:
if not is_valid_promotion_output_file(output_file):
raise ArgumentTypeError(
f"Given output file has an invalid extension is invalid: {output_file}"
)
return output_file
def log_message_and_time_if_debug(msg: str) -> None:
logging.info(msg)
logging.debug(datetime.now())