Fixed the bug in cerberus_web_client.py by switching to Selenium. To log in with Selenium, each chain that uses this client must have a username. With this mechanism, a path to a gz file is returned instead of a URL.

Added the option to output a prices json file in main.py under --prices-with-promos, where the prices are updated by the latest promotions (under the 'final_price' key, where 'price' represents the price before promotions).

Fixed a small bug in BinaWebClient by checking that the filename does not contain 'null'.

Changed the hierarchy of chains so that it includes the web clients.

Added the date to the output filenames to start storing the data over time.

Black formatting (according to PEP 8 guidelines).

Changed the chains_dict in main to a constant one.
This commit is contained in:
korenlazar
2022-10-04 11:42:36 +03:00
parent b5db721a3d
commit ceff48dbd9
28 changed files with 796 additions and 406 deletions

View File

@@ -12,45 +12,51 @@ from aenum import Enum
from item import Item
from utils import (
create_bs_object, create_items_dict,
create_bs_object,
create_items_dict,
get_float_from_tag,
log_message_and_time_if_debug, xml_file_gen,
log_message_and_time_if_debug,
xml_file_gen,
)
from supermarket_chain import SupermarketChain
XML_FILES_PROMOTIONS_CATEGORIES = [SupermarketChain.XMLFilesCategory.PromosFull,
SupermarketChain.XMLFilesCategory.Promos]
XML_FILES_PROMOTIONS_CATEGORIES = [
SupermarketChain.XMLFilesCategory.PromosFull,
SupermarketChain.XMLFilesCategory.Promos,
]
PROMOTION_COLS_NUM = 15 # The length of the list returned by get_promotion_row_for_table function
PROMOTION_COLS_NUM = (
15 # The length of the list returned by get_promotion_row_for_table function
)
INVALID_OR_UNKNOWN_PROMOTION_FUNCTION = -1
PROMOTIONS_TABLE_HEADERS = [
'תיאור מבצע',
'הפריט המשתתף במבצע',
'מחיר לפני מבצע',
'מחיר אחרי מבצע',
'אחוז הנחה',
'סוג מבצע',
'כמות מקס',
'כפל הנחות',
'המבצע החל',
'זמן תחילת מבצע',
'זמן סיום מבצע',
'זמן עדכון אחרון',
'יצרן',
'ברקוד פריט',
'סוג מבצע לפי תקנות שקיפות מחירים',
"תיאור מבצע",
"הפריט המשתתף במבצע",
"מחיר לפני מבצע",
"מחיר אחרי מבצע",
"אחוז הנחה",
"סוג מבצע",
"כמות מקס",
"כפל הנחות",
"המבצע החל",
"זמן תחילת מבצע",
"זמן סיום מבצע",
"זמן עדכון אחרון",
"יצרן",
"ברקוד פריט",
"סוג מבצע לפי תקנות שקיפות מחירים",
]
class ClubID(Enum):
_init_ = 'value string'
_init_ = "value string"
REGULAR = 0, 'מבצע רגיל'
CLUB = 1, 'מועדון'
CREDIT_CARD = 2, 'כרטיס אשראי'
OTHER = 3, 'אחר'
REGULAR = 0, "מבצע רגיל"
CLUB = 1, "מועדון"
CREDIT_CARD = 2, "כרטיס אשראי"
OTHER = 3, "אחר"
@classmethod
def _missing_(cls, value):
@@ -79,9 +85,20 @@ class Promotion:
It contains only part of the available information in Shufersal's data.
"""
def __init__(self, content: str, start_date: datetime, end_date: datetime, update_date: datetime, items: List[Item],
promo_func: callable, club_id: ClubID, promotion_id: int, max_qty: int,
allow_multiple_discounts: bool, reward_type: RewardType):
def __init__(
self,
content: str,
start_date: datetime,
end_date: datetime,
update_date: datetime,
items: List[Item],
promo_func: callable,
club_id: ClubID,
promotion_id: int,
max_qty: int,
allow_multiple_discounts: bool,
reward_type: RewardType,
):
self.content: str = content
self.start_date: datetime = start_date
self.end_date: datetime = end_date
@@ -98,41 +115,49 @@ class Promotion:
title = self.content
dates_range = f"Between {self.start_date} and {self.end_date}"
update_line = f"Updated at {self.update_date}"
return '\n'.join([title, dates_range, update_line, str(self.items)]) + '\n'
return "\n".join([title, dates_range, update_line, str(self.items)]) + "\n"
def __eq__(self, other):
return self.promotion_id == other.promotion_id
def write_promotions_to_table(promotions: List[Promotion], output_filename: str) -> None:
def write_promotions_to_table(
promotions: List[Promotion], output_filename: str
) -> None:
"""
This function writes a List of promotions to a csv or xlsx output file.
:param promotions: A given list of promotions
:param output_filename: A given file to write to
"""
log_message_and_time_if_debug('Writing promotions to output file')
rows = [get_promotion_row_for_table(promo, item) for promo in promotions for item in promo.items]
if output_filename.endswith('.csv'):
log_message_and_time_if_debug("Writing promotions to output file")
rows = [
get_promotion_row_for_table(promo, item)
for promo in promotions
for item in promo.items
]
if output_filename.endswith(".csv"):
encoding_file = "utf_8_sig" if sys.platform == "win32" else "utf_8"
with open(output_filename, mode='w', newline='', encoding=encoding_file) as f_out:
with open(
output_filename, mode="w", newline="", encoding=encoding_file
) as f_out:
promos_writer = csv.writer(f_out)
promos_writer.writerow(PROMOTIONS_TABLE_HEADERS)
promos_writer.writerows(rows)
elif output_filename.endswith('.xlsx'):
elif output_filename.endswith(".xlsx"):
df = pd.DataFrame(rows, columns=PROMOTIONS_TABLE_HEADERS)
workbook = xlsxwriter.Workbook(output_filename)
worksheet1 = workbook.add_worksheet()
worksheet1.right_to_left()
date_time_format = workbook.add_format({'num_format': 'm/d/yy h:mm;@'})
number_format = workbook.add_format({'num_format': '0.00'})
percentage_format = workbook.add_format({'num_format': '0.00%'})
worksheet1.set_column('A:A', width=35)
worksheet1.set_column('B:B', width=25)
worksheet1.set_column('C:D', cell_format=number_format)
worksheet1.set_column('E:E', cell_format=percentage_format)
worksheet1.set_column('J:L', width=15, cell_format=date_time_format)
date_time_format = workbook.add_format({"num_format": "m/d/yy h:mm;@"})
number_format = workbook.add_format({"num_format": "0.00"})
percentage_format = workbook.add_format({"num_format": "0.00%"})
worksheet1.set_column("A:A", width=35)
worksheet1.set_column("B:B", width=25)
worksheet1.set_column("C:D", cell_format=number_format)
worksheet1.set_column("E:E", cell_format=percentage_format)
worksheet1.set_column("J:L", width=15, cell_format=date_time_format)
worksheet1.add_table(
first_row=0,
first_col=0,
@@ -141,12 +166,15 @@ def write_promotions_to_table(promotions: List[Promotion], output_filename: str)
options={
"columns": [{"header": i} for i in PROMOTIONS_TABLE_HEADERS],
"data": df.values.tolist(),
'style': 'Table Style Medium 11',
}, )
"style": "Table Style Medium 11",
},
)
workbook.close()
else:
raise ValueError(f"The given output file has an invalid extension:\n{output_filename}")
raise ValueError(
f"The given output file has an invalid extension:\n{output_filename}"
)
def get_promotion_row_for_table(promo: Promotion, item: Item) -> List:
@@ -175,8 +203,9 @@ def get_promotion_row_for_table(promo: Promotion, item: Item) -> List:
]
def get_available_promos(chain: SupermarketChain, store_id: int, load_prices: bool, load_promos: bool) \
-> List[Promotion]:
def get_available_promos(
chain: SupermarketChain, store_id: int, load_prices: bool, load_promos: bool
) -> List[Promotion]:
"""
This function return the available promotions given a BeautifulSoup object.
@@ -186,15 +215,15 @@ def get_available_promos(chain: SupermarketChain, store_id: int, load_prices: bo
:param load_promos: A boolean representing whether to load an existing promotion file or download it
:return: Promotions that are not included in PRODUCTS_TO_IGNORE and are currently available
"""
log_message_and_time_if_debug('Importing prices XML file')
log_message_and_time_if_debug("Importing prices XML file")
items_dict: Dict[str, Item] = create_items_dict(chain, store_id, load_prices)
log_message_and_time_if_debug('Importing promotions XML file')
log_message_and_time_if_debug("Importing promotions XML file")
promo_tags = get_all_promos_tags(chain, store_id, load_promos)
log_message_and_time_if_debug('Creating promotions objects')
log_message_and_time_if_debug("Creating promotions objects")
promo_objs = list()
for promo in tqdm(promo_tags, desc='creating_promotions'):
promotion_id = int(promo.find(re.compile('PromotionId', re.IGNORECASE)).text)
for promo in tqdm(promo_tags, desc="creating_promotions"):
promotion_id = int(promo.find(re.compile("PromotionId", re.IGNORECASE)).text)
if promo_objs and promo_objs[-1].promotion_id == promotion_id:
promo_objs[-1].items.extend(chain.get_items(promo, items_dict))
continue
@@ -206,8 +235,9 @@ def get_available_promos(chain: SupermarketChain, store_id: int, load_prices: bo
return promo_objs
def create_new_promo_instance(chain: SupermarketChain, items_dict: Dict[str, Item], promo: Tag, promotion_id: int) \
-> Union[Promotion, None]:
def create_new_promo_instance(
chain: SupermarketChain, items_dict: Dict[str, Item], promo: Tag, promotion_id: int
) -> Union[Promotion, None]:
"""
This function generates a Promotion object from a promotion tag.
@@ -217,41 +247,64 @@ def create_new_promo_instance(chain: SupermarketChain, items_dict: Dict[str, Ite
:param promotion_id: An integer representing the promotion ID
:return: If the promotion expired - return None, else return the Promotion object
"""
promo_end_time = datetime.strptime(promo.find('PromotionEndDate').text + ' ' +
promo.find('PromotionEndHour').text,
chain.date_hour_format)
promo_end_time = datetime.strptime(
promo.find("PromotionEndDate").text + " " + promo.find("PromotionEndHour").text,
chain.date_hour_format,
)
if promo_end_time < datetime.now():
return None
reward_type = RewardType(int(promo.find("RewardType").text))
discounted_price = get_discounted_price(promo)
promo_description = promo.find('PromotionDescription').text
is_discount_in_percentage = reward_type == RewardType.DISCOUNT_IN_PERCENTAGE or not discounted_price
raw_discount_rate = promo.find('DiscountRate').text if promo.find('DiscountRate') else None
promo_description = promo.find("PromotionDescription").text
is_discount_in_percentage = (
reward_type == RewardType.DISCOUNT_IN_PERCENTAGE or not discounted_price
)
raw_discount_rate = (
promo.find("DiscountRate").text if promo.find("DiscountRate") else None
)
discount_rate = get_discount_rate(raw_discount_rate, is_discount_in_percentage)
min_qty = get_float_from_tag(promo, 'MinQty')
max_qty = get_float_from_tag(promo, 'MaxQty')
min_qty = get_float_from_tag(promo, "MinQty")
max_qty = get_float_from_tag(promo, "MaxQty")
remark = promo.find("Remark")
promo_func = find_promo_function(reward_type=reward_type, remark=remark.text if remark else '',
promo_description=promo_description, min_qty=min_qty,
discount_rate=discount_rate, discounted_price=discounted_price)
promo_start_time = datetime.strptime(promo.find('PromotionStartDate').text + ' ' +
promo.find('PromotionStartHour').text,
chain.date_hour_format)
promo_update_time = datetime.strptime(promo.find(chain.promotion_update_tag_name).text,
chain.update_date_format)
club_id = ClubID(int(promo.find(re.compile('ClubId', re.IGNORECASE)).text))
multiple_discounts_allowed = bool(int(promo.find('AllowMultipleDiscounts').text))
promo_func = find_promo_function(
reward_type=reward_type,
remark=remark.text if remark else "",
promo_description=promo_description,
min_qty=min_qty,
discount_rate=discount_rate,
discounted_price=discounted_price,
)
promo_start_time = datetime.strptime(
promo.find("PromotionStartDate").text
+ " "
+ promo.find("PromotionStartHour").text,
chain.date_hour_format,
)
promo_update_time = datetime.strptime(
promo.find(chain.promotion_update_tag_name).text, chain.update_date_format
)
club_id = ClubID(int(promo.find(re.compile("ClubId", re.IGNORECASE)).text))
multiple_discounts_allowed = bool(int(promo.find("AllowMultipleDiscounts").text))
items = chain.get_items(promo, items_dict)
return Promotion(content=promo_description, start_date=promo_start_time, end_date=promo_end_time,
update_date=promo_update_time, items=items, promo_func=promo_func,
club_id=club_id, promotion_id=promotion_id, max_qty=max_qty,
allow_multiple_discounts=multiple_discounts_allowed, reward_type=reward_type)
return Promotion(
content=promo_description,
start_date=promo_start_time,
end_date=promo_end_time,
update_date=promo_update_time,
items=items,
promo_func=promo_func,
club_id=club_id,
promotion_id=promotion_id,
max_qty=max_qty,
allow_multiple_discounts=multiple_discounts_allowed,
reward_type=reward_type,
)
def get_discounted_price(promo):
discounted_price = promo.find('DiscountedPrice')
discounted_price = promo.find("DiscountedPrice")
if discounted_price:
return float(discounted_price.text)
@@ -263,8 +316,14 @@ def get_discount_rate(discount_rate: Union[float, None], discount_in_percentage:
return float(discount_rate)
def find_promo_function(reward_type: RewardType, remark: str, promo_description: str, min_qty: float,
discount_rate: Union[float, None], discounted_price: Union[float, None]):
def find_promo_function(
reward_type: RewardType,
remark: str,
promo_description: str,
min_qty: float,
discount_rate: Union[float, None],
discounted_price: Union[float, None],
):
if reward_type == RewardType.SECOND_INSTANCE_DIFFERENT_DISCOUNT:
if not discounted_price:
return lambda item: item.price * (1 - (discount_rate / min_qty))
@@ -277,7 +336,9 @@ def find_promo_function(reward_type: RewardType, remark: str, promo_description:
return lambda item: item.price * (1 - (1 / min_qty))
if reward_type == RewardType.DISCOUNT_IN_PERCENTAGE:
return lambda item: item.price * (1 - discount_rate / (2 if "השני ב" in promo_description else 1))
return lambda item: item.price * (
1 - discount_rate / (2 if "השני ב" in promo_description else 1)
)
if reward_type == RewardType.SECOND_INSTANCE_SAME_DISCOUNT:
if "השני ב" in promo_description:
@@ -299,24 +360,73 @@ def find_promo_function(reward_type: RewardType, remark: str, promo_description:
return lambda item: INVALID_OR_UNKNOWN_PROMOTION_FUNCTION
def main_latest_promos(store_id: int, output_filename, chain: SupermarketChain, load_promos: bool,
load_xml: bool) -> None:
def main_latest_promos(
store_id: int,
output_filename,
chain: SupermarketChain,
load_promos: bool,
load_prices: bool,
) -> None:
"""
This function writes to a file the available promotions in a store with a given id sorted by their update date.
:param chain: The name of the requested supermarket chain
:param store_id: A given store id
:param load_xml: A boolean representing whether to load an existing prices xml file
:param load_prices: A boolean representing whether to load an existing prices xml file
:param load_promos: A boolean representing whether to load an existing promos xml file
:param output_filename: A path to write the promotions table
"""
promotions: List[Promotion] = get_available_promos(chain, store_id, load_xml, load_promos)
promotions.sort(key=lambda promo: (max(promo.update_date.date(), promo.start_date.date()), promo.start_date -
promo.end_date), reverse=True)
promotions: List[Promotion] = get_available_promos(
chain, store_id, load_prices, load_promos
)
promotions.sort(
key=lambda promo: (
max(promo.update_date.date(), promo.start_date.date()),
promo.start_date - promo.end_date,
),
reverse=True,
)
write_promotions_to_table(promotions, output_filename)
def log_promos_by_name(store_id: int, chain: SupermarketChain, promo_name: str, load_prices: bool, load_promos: bool):
def get_all_prices(
    store_id: int,
    output_filename,
    chain: SupermarketChain,
    load_promos: bool,
    load_prices: bool,
):
    """
    This function returns the items of a given store, where each item that participates in an available promotion
    gets its 'final_price' attribute set to the discounted price computed by the promotion.

    :param store_id: A given store id
    :param output_filename: Not used inside this function; kept so existing callers keep working
    :param chain: The requested supermarket chain
    :param load_promos: A boolean representing whether to load an existing promos xml file
    :param load_prices: A boolean representing whether to load an existing prices xml file
    :return: A dictionary mapping an item code to its Item object (possibly with an updated 'final_price')
    """
    log_message_and_time_if_debug("Importing prices XML file")
    items_dict: Dict[str, Item] = create_items_dict(chain, store_id, load_prices)

    log_message_and_time_if_debug("Importing promotions XML file")
    promo_tags = get_all_promos_tags(chain, store_id, load_promos)

    log_message_and_time_if_debug("Creating promotions objects")
    # The pattern does not change between iterations, so it is built once.
    promotion_id_pattern = re.compile("PromotionId", re.IGNORECASE)
    promo_obj = None
    for promo in tqdm(promo_tags, desc="creating_promotions"):
        promotion_id = int(promo.find(promotion_id_pattern).text)
        # Consecutive tags sharing a promotion id reuse the previously created promotion object.
        if promo_obj is None or promo_obj.promotion_id != promotion_id:
            promo_obj = create_new_promo_instance(
                chain, items_dict, promo, promotion_id
            )
        if promo_obj is None:
            # create_new_promo_instance returns None for an expired promotion,
            # so there is no promotion function to apply to this tag's items.
            continue

        for item in promo.find_all("Item"):
            item_code = item.find("ItemCode").text
            cur_item = items_dict.get(item_code)
            if cur_item is None:
                continue  # The promotion refers to an item missing from the prices file

            discounted_price = promo_obj.promo_func(cur_item)
            if discounted_price == INVALID_OR_UNKNOWN_PROMOTION_FUNCTION:
                # Unknown promotion type: the sentinel is not a real price and must not be stored.
                continue
            if cur_item.price > discounted_price:
                cur_item.final_price = discounted_price

    return items_dict
def log_promos_by_name(
store_id: int,
chain: SupermarketChain,
promo_name: str,
load_prices: bool,
load_promos: bool,
):
"""
This function prints all promotions in a given chain and store_id containing a given promo_name.
@@ -326,7 +436,9 @@ def log_promos_by_name(store_id: int, chain: SupermarketChain, promo_name: str,
:param load_prices: A boolean representing whether to load an saved prices XML file or scrape a new one
:param load_promos: A boolean representing whether to load an saved XML file or scrape a new one
"""
promotions: List[Promotion] = get_available_promos(chain, store_id, load_prices, load_promos)
promotions: List[Promotion] = get_available_promos(
chain, store_id, load_prices, load_promos
)
for promo in promotions:
if promo_name in promo.content:
logging.info(promo.repr_ltr())
@@ -339,10 +451,16 @@ def get_all_null_items_in_promos(chain, store_id) -> List[str]:
"""
items_dict: Dict[str, Item] = create_items_dict(chain, store_id, load_xml=True)
promo_tags = get_all_promos_tags(chain, store_id, load_xml=True)
return [item for promo_tag in promo_tags for item in chain.get_null_items(promo_tag, items_dict)]
return [
item
for promo_tag in promo_tags
for item in chain.get_null_items(promo_tag, items_dict)
]
def get_all_promos_tags(chain: SupermarketChain, store_id: int, load_xml: bool) -> List[Tag]:
def get_all_promos_tags(
chain: SupermarketChain, store_id: int, load_xml: bool
) -> List[Tag]:
"""
This function gets all the promotions tags for a given store in a given chain.
It includes both the full and not full promotions files.
@@ -353,8 +471,14 @@ def get_all_promos_tags(chain: SupermarketChain, store_id: int, load_xml: bool)
:return: A list of promotions tags
"""
bs_objects = list()
for category in tqdm(XML_FILES_PROMOTIONS_CATEGORIES, desc='promotions_files'):
for category in tqdm(XML_FILES_PROMOTIONS_CATEGORIES, desc="promotions_files"):
xml_path = xml_file_gen(chain, store_id, category.name)
bs_objects.append(create_bs_object(chain, store_id, category, load_xml, xml_path))
bs_objects.append(
create_bs_object(chain, store_id, category, load_xml, xml_path)
)
return [promo for bs_obj in bs_objects for promo in bs_obj.find_all(chain.promotion_tag_name)]
return [
promo
for bs_obj in bs_objects
for promo in bs_obj.find_all(chain.promotion_tag_name)
]