495 lines
17 KiB
Python
495 lines
17 KiB
Python
import logging
|
|
import re
|
|
import sys
|
|
from datetime import datetime
|
|
from typing import Dict, List, Union
|
|
from bs4.element import Tag
|
|
import csv
|
|
import sys
|
|
import pandas as pd
|
|
import xlsxwriter
|
|
from tqdm import tqdm
|
|
from aenum import Enum
|
|
|
|
from item import Item
|
|
from utils import (
|
|
create_bs_object,
|
|
create_items_dict,
|
|
get_float_from_tag,
|
|
log_message_and_time_if_debug,
|
|
xml_file_gen,
|
|
)
|
|
from supermarket_chain import SupermarketChain
|
|
from utils import (create_bs_object, create_items_dict, get_float_from_tag,
|
|
xml_file_gen)
|
|
|
|
# The XML file categories that carry promotions. get_all_promos_tags loads one
# file per category listed here, in this order.
XML_FILES_PROMOTIONS_CATEGORIES = [
    SupermarketChain.XMLFilesCategory.PromosFull,
    SupermarketChain.XMLFilesCategory.Promos,
]
|
|
|
|
# Number of columns in one promotions-table row, i.e. the length of the list
# returned by get_promotion_row_for_table.
PROMOTION_COLS_NUM = 15

# Sentinel "price" returned by a promotion function when the promotion's
# pricing rule is invalid or not recognised.
INVALID_OR_UNKNOWN_PROMOTION_FUNCTION = -1
|
|
|
|
# Column headers (in Hebrew) of the promotions output table. The order matches
# the values produced by get_promotion_row_for_table, one header per value.
PROMOTIONS_TABLE_HEADERS = [
    "תיאור מבצע",  # promotion description
    "הפריט המשתתף במבצע",  # the item participating in the promotion
    "מחיר לפני מבצע",  # price before the promotion
    "מחיר אחרי מבצע",  # price after the promotion
    "אחוז הנחה",  # discount percentage
    "סוג מבצע",  # promotion type
    "כמות מקס",  # maximum quantity
    "כפל הנחות",  # multiple discounts allowed
    "המבצע החל",  # the promotion has started
    "זמן תחילת מבצע",  # promotion start time
    "זמן סיום מבצע",  # promotion end time
    "זמן עדכון אחרון",  # last update time
    "יצרן",  # manufacturer
    "ברקוד פריט",  # item barcode
    "סוג מבצע לפי תקנות שקיפות מחירים",  # promotion type per the price transparency regulations
]
|
|
|
|
|
|
class ClubID(Enum):
    """
    The audience a promotion applies to, keyed by the numeric club ID found in
    the promotions data. Each member also carries a Hebrew display label.
    """

    # aenum feature: names the fields of each member's tuple, so every member
    # exposes `.value` (the numeric ID) and `.string` (the Hebrew label).
    _init_ = "value string"

    REGULAR = 0, "מבצע רגיל"  # regular promotion
    CLUB = 1, "מועדון"  # club
    CREDIT_CARD = 2, "כרטיס אשראי"  # credit card
    OTHER = 3, "אחר"  # other

    @classmethod
    def _missing_(cls, value):
        """Map any club ID that has no member of its own to OTHER."""
        return ClubID.OTHER

    def __str__(self):
        """Return the member's Hebrew display label."""
        return self.string
|
|
|
|
|
|
class RewardType(Enum):
    """
    Promotion reward-type codes, as read from a promotion's RewardType tag.

    The numeric value is also what is written to the last column of the
    promotions table. Codes 4 and 5 have no member here.
    NOTE(review): a tag carrying an undefined code presumably makes the
    RewardType(...) lookup fail - confirm against real data.
    """

    NO_PROMOTION = 0
    DISCOUNT_IN_AMOUNT = 1
    DISCOUNT_IN_PERCENTAGE = 2
    DISCOUNT_BY_THRESHOLD = 3
    DISCOUNT_IN_ITEM_IF_PURCHASING_OTHER_ITEMS = 6
    SECOND_OR_THIRD_INSTANCE_FOR_FREE = 7
    SECOND_INSTANCE_SAME_DISCOUNT = 8
    SECOND_INSTANCE_DIFFERENT_DISCOUNT = 9
    DISCOUNT_IN_MULTIPLE_INSTANCES = 10
    OTHER = 11
|
|
|
|
|
|
class Promotion:
    """
    A promotion parsed from a supermarket chain's promotions data (the class
    was originally written for Shufersal).

    It holds only part of the information available in the chain's data.
    Two promotions are considered equal when they share a promotion ID.
    """

    def __init__(
        self,
        content: str,
        start_date: datetime,
        end_date: datetime,
        update_date: datetime,
        items: List[Item],
        promo_func: callable,
        club_id: ClubID,
        promotion_id: int,
        max_qty: int,
        allow_multiple_discounts: bool,
        reward_type: RewardType,
    ):
        """
        :param content: The promotion's description text
        :param start_date: When the promotion starts
        :param end_date: When the promotion ends
        :param update_date: When the promotion was last updated
        :param items: The items participating in the promotion
        :param promo_func: A callable taking an item and returning its price under the promotion
        :param club_id: The audience the promotion applies to
        :param promotion_id: The promotion's ID; the sole basis for equality and hashing
        :param max_qty: The maximum quantity the promotion applies to
        :param allow_multiple_discounts: Whether the promotion can be combined with other discounts
        :param reward_type: The promotion's reward type
        """
        self.content: str = content
        self.start_date: datetime = start_date
        self.end_date: datetime = end_date
        self.update_date: datetime = update_date
        self.promo_func: callable = promo_func
        self.items: List[Item] = items
        self.club_id: ClubID = club_id
        self.max_qty: int = max_qty
        self.allow_multiple_discounts: bool = allow_multiple_discounts
        self.reward_type: RewardType = reward_type
        self.promotion_id: int = promotion_id

    def repr_ltr(self):
        """
        Return a multi-line, left-to-right textual summary of the promotion:
        its description, its date range, its update time and its items.
        """
        title = self.content
        dates_range = f"Between {self.start_date} and {self.end_date}"
        update_line = f"Updated at {self.update_date}"
        return "\n".join([title, dates_range, update_line, str(self.items)]) + "\n"

    def __eq__(self, other):
        """Promotions are equal when their promotion IDs are equal."""
        # Let Python fall back to its default handling for foreign types
        # instead of failing on a missing `promotion_id` attribute.
        if not isinstance(other, Promotion):
            return NotImplemented
        return self.promotion_id == other.promotion_id

    def __hash__(self):
        """Hash by promotion ID, consistently with __eq__."""
        # Defining __eq__ alone would make instances unhashable.
        return hash(self.promotion_id)
|
|
|
|
|
|
def write_promotions_to_table(
    promotions: List[Promotion], output_filename: str
) -> None:
    """
    Write one table row per (promotion, participating item) pair to a file.

    The output format is chosen by the file extension: ".csv" or ".xlsx".

    :param promotions: The promotions to write
    :param output_filename: The path of the output file
    :raises ValueError: If the file extension is neither ".csv" nor ".xlsx"
    """
    log_message_and_time_if_debug("Writing promotions to output file")
    table_rows = []
    for promotion in promotions:
        for promo_item in promotion.items:
            table_rows.append(get_promotion_row_for_table(promotion, promo_item))

    if output_filename.endswith(".csv"):
        _write_promotions_csv(table_rows, output_filename)
        return
    if output_filename.endswith(".xlsx"):
        _write_promotions_xlsx(table_rows, output_filename)
        return
    raise ValueError(
        f"The given output file has an invalid extension:\n{output_filename}"
    )


def _write_promotions_csv(table_rows: List[List], output_filename: str) -> None:
    """Write the header line followed by the given rows as a CSV file."""
    # On Windows the file is written with a BOM-carrying UTF-8 codec.
    if sys.platform == "win32":
        csv_encoding = "utf_8_sig"
    else:
        csv_encoding = "utf_8"
    with open(
        output_filename, mode="w", newline="", encoding=csv_encoding
    ) as csv_file:
        table_writer = csv.writer(csv_file)
        table_writer.writerow(PROMOTIONS_TABLE_HEADERS)
        table_writer.writerows(table_rows)


def _write_promotions_xlsx(table_rows: List[List], output_filename: str) -> None:
    """Write the given rows as a styled, right-to-left Excel table."""
    frame = pd.DataFrame(table_rows, columns=PROMOTIONS_TABLE_HEADERS)
    book = xlsxwriter.Workbook(output_filename)
    sheet = book.add_worksheet()
    sheet.right_to_left()

    timestamp_format = book.add_format({"num_format": "m/d/yy h:mm;@"})
    price_format = book.add_format({"num_format": "0.00"})
    ratio_format = book.add_format({"num_format": "0.00%"})

    sheet.set_column("A:A", width=35)
    sheet.set_column("B:B", width=25)
    # C:D hold the prices before/after the promotion, E the discount ratio and
    # J:L the start, end and last-update timestamps.
    sheet.set_column("C:D", cell_format=price_format)
    sheet.set_column("E:E", cell_format=ratio_format)
    sheet.set_column("J:L", width=15, cell_format=timestamp_format)

    table_options = {
        "columns": [{"header": header} for header in PROMOTIONS_TABLE_HEADERS],
        "data": frame.values.tolist(),
        "style": "Table Style Medium 11",
    }
    sheet.add_table(
        first_row=0,
        first_col=0,
        last_row=len(frame),  # the header occupies row 0, data rows follow it
        last_col=len(frame.columns) - 1,
        options=table_options,
    )
    book.close()
|
|
|
|
|
|
def get_promotion_row_for_table(promo: Promotion, item: Item) -> List:
    """
    Build one row of the promotions table for an item taking part in a promotion.

    The values are ordered like the promotions table headers: description,
    item name, price before, price after, discount fraction, club label,
    max quantity, multiple discounts flag, whether the promotion has started,
    start/end/update times, manufacturer, item code and reward-type code.

    :param promo: A given Promotion object
    :param item: A given item object participating in the promotion
    :return: The list of the row's values
    """
    # Evaluate the promotion's pricing rule once and reuse the result.
    price_after_promo = promo.promo_func(item)

    # The discount is expressed as a fraction of the item's own price. Dividing
    # by max(price, 1) would understate the discount of items cheaper than 1,
    # so only a non-positive price (no meaningful fraction) is special-cased.
    if item.price > 0:
        discount_fraction = (item.price - price_after_promo) / item.price
    else:
        discount_fraction = 0.0

    return [
        promo.content,
        item.name,
        item.price,
        price_after_promo,
        discount_fraction,
        promo.club_id.string,
        promo.max_qty,
        promo.allow_multiple_discounts,
        promo.start_date <= datetime.now(),  # has the promotion started yet
        promo.start_date,
        promo.end_date,
        promo.update_date,
        item.manufacturer,
        item.code,
        promo.reward_type.value,
    ]
|
|
|
|
|
|
def get_available_promos(
    chain: SupermarketChain, store_id: int, load_prices: bool, load_promos: bool
) -> List[Promotion]:
    """
    Return the promotions of a given store that have not expired yet.

    Consecutive promotion tags sharing a promotion ID are merged into one
    Promotion object whose item list is extended.

    :param chain: The requested supermarket chain
    :param store_id: A given store ID
    :param load_prices: A boolean representing whether to load an existing prices file or download it
    :param load_promos: A boolean representing whether to load an existing promotion file or download it
    :return: The non-expired promotions, excluding those listing more than 1000 items
    """
    log_message_and_time_if_debug("Importing prices XML file")
    items_dict: Dict[str, Item] = create_items_dict(chain, store_id, load_prices)
    log_message_and_time_if_debug("Importing promotions XML file")
    promo_tags = get_all_promos_tags(chain, store_id, load_promos)

    log_message_and_time_if_debug("Creating promotions objects")
    # The tag name's letter case is matched case-insensitively; the pattern is
    # compiled once rather than per promotion.
    promotion_id_tag = re.compile("PromotionId", re.IGNORECASE)
    promo_objs: List[Promotion] = []
    for promo in tqdm(promo_tags, desc="creating_promotions"):
        promotion_id = int(promo.find(promotion_id_tag).text)
        if promo_objs and promo_objs[-1].promotion_id == promotion_id:
            # Same promotion as the previous tag: only collect its items.
            promo_objs[-1].items.extend(chain.get_items(promo, items_dict))
            continue

        promo_inst = create_new_promo_instance(chain, items_dict, promo, promotion_id)
        # An expired promotion yields None; it must be skipped before any of
        # its attributes is accessed.
        if promo_inst is None:
            continue
        if len(promo_inst.items) > 1000:  # Too many items -> probably illegal promotion
            continue
        promo_objs.append(promo_inst)

    return promo_objs
|
|
|
|
|
|
def create_new_promo_instance(
    chain: SupermarketChain, items_dict: Dict[str, Item], promo: Tag, promotion_id: int
) -> Union[Promotion, None]:
    """
    Build a Promotion object out of a promotion XML tag.

    :param chain: The supermarket chain publishing the promotion
    :param items_dict: A dictionary of items that might participate in the promotion
    :param promo: An xml Tag representing the promotion
    :param promotion_id: An integer representing the promotion ID
    :return: None if the promotion has already ended, otherwise the Promotion object
    """
    end_text = " ".join(
        (promo.find("PromotionEndDate").text, promo.find("PromotionEndHour").text)
    )
    ends_at = datetime.strptime(end_text, chain.date_hour_format)
    if ends_at < datetime.now():
        # The promotion is over - nothing else is parsed.
        return None

    reward_type = RewardType(int(promo.find("RewardType").text))
    discounted_price = get_discounted_price(promo)
    description = promo.find("PromotionDescription").text

    # A promotion without a discounted price is treated as a percentage one.
    rate_is_percentage = (
        reward_type == RewardType.DISCOUNT_IN_PERCENTAGE or not discounted_price
    )
    rate_tag = promo.find("DiscountRate")
    raw_rate = rate_tag.text if rate_tag else None
    discount_rate = get_discount_rate(raw_rate, rate_is_percentage)

    min_qty = get_float_from_tag(promo, "MinQty")
    max_qty = get_float_from_tag(promo, "MaxQty")
    remark_tag = promo.find("Remark")
    remark_text = remark_tag.text if remark_tag else ""
    promo_func = find_promo_function(
        reward_type=reward_type,
        remark=remark_text,
        promo_description=description,
        min_qty=min_qty,
        discount_rate=discount_rate,
        discounted_price=discounted_price,
    )

    start_text = " ".join(
        (promo.find("PromotionStartDate").text, promo.find("PromotionStartHour").text)
    )
    starts_at = datetime.strptime(start_text, chain.date_hour_format)
    updated_at = datetime.strptime(
        promo.find(chain.promotion_update_tag_name).text, chain.update_date_format
    )
    # The club ID tag name is matched regardless of letter case.
    club_id = ClubID(int(promo.find(re.compile("ClubId", re.IGNORECASE)).text))
    allows_multiple = bool(int(promo.find("AllowMultipleDiscounts").text))
    participating_items = chain.get_items(promo, items_dict)

    return Promotion(
        content=description,
        start_date=starts_at,
        end_date=ends_at,
        update_date=updated_at,
        items=participating_items,
        promo_func=promo_func,
        club_id=club_id,
        promotion_id=promotion_id,
        max_qty=max_qty,
        allow_multiple_discounts=allows_multiple,
        reward_type=reward_type,
    )
|
|
|
|
|
|
def get_discounted_price(promo):
    """
    Read the promotion's discounted price.

    :param promo: A promotion tag (anything exposing find())
    :return: The DiscountedPrice tag's text as a float, or None when the lookup yields nothing truthy
    """
    price_tag = promo.find("DiscountedPrice")
    if not price_tag:
        return None
    return float(price_tag.text)
|
|
|
|
|
|
def get_discount_rate(discount_rate: Union[float, None], discount_in_percentage: bool):
    """
    Convert a raw discount rate into a float.

    :param discount_rate: The raw discount rate; a falsy value means "no rate"
    :param discount_in_percentage: Whether the rate denotes a percentage
    :return: None for a falsy rate. Otherwise the rate as a float; for a
        percentage it is additionally multiplied by 10 to the power of minus
        the length of the rate's textual form (e.g. "2000" -> 2000 * 10 ** -4)
    """
    if not discount_rate:
        return None
    numeric_rate = float(discount_rate)
    if not discount_in_percentage:
        return numeric_rate
    digits_count = len(str(discount_rate))
    return numeric_rate * (10 ** -digits_count)
|
|
|
|
|
|
def find_promo_function(
    reward_type: RewardType,
    remark: str,
    promo_description: str,
    min_qty: float,
    discount_rate: Union[float, None],
    discounted_price: Union[float, None],
):
    """
    Pick the pricing rule of a promotion.

    The rule is chosen by the reward type first; reward types without a
    dedicated rule fall back to the remark text and then to the discounted
    price divided by the minimal quantity.

    :param reward_type: The promotion's reward type
    :param remark: The promotion's remark text (may be empty)
    :param promo_description: The promotion's description text
    :param min_qty: The minimal quantity required by the promotion
    :param discount_rate: The promotion's discount rate, if any
    :param discounted_price: The promotion's discounted price, if any
    :return: A callable taking an item and returning its price under the
        promotion, or INVALID_OR_UNKNOWN_PROMOTION_FUNCTION when no rule fits.
        The arithmetic runs only when the callable is invoked, so a missing
        rate/price/quantity surfaces at that point rather than here.
    """

    def unchanged_price(item):
        return item.price

    def bundle_price_per_unit(item):
        return discounted_price / min_qty

    if reward_type == RewardType.SECOND_INSTANCE_DIFFERENT_DISCOUNT:
        if discounted_price:

            def last_unit_at_discounted_price(item):
                return (item.price * (min_qty - 1) + discounted_price) / min_qty

            return last_unit_at_discounted_price

        def rate_spread_over_units(item):
            return item.price * (1 - (discount_rate / min_qty))

        return rate_spread_over_units

    if reward_type == RewardType.DISCOUNT_IN_ITEM_IF_PURCHASING_OTHER_ITEMS:
        return unchanged_price

    if reward_type == RewardType.SECOND_OR_THIRD_INSTANCE_FOR_FREE:

        def one_unit_free(item):
            return item.price * (1 - (1 / min_qty))

        return one_unit_free

    if reward_type == RewardType.DISCOUNT_IN_PERCENTAGE:

        def percentage_off(item):
            # When the description contains this phrase the rate is halved.
            return item.price * (
                1 - discount_rate / (2 if "השני ב" in promo_description else 1)
            )

        return percentage_off

    if reward_type == RewardType.SECOND_INSTANCE_SAME_DISCOUNT:
        if "השני ב" in promo_description:

            def average_with_discounted_price(item):
                return (item.price + discounted_price) / 2

            return average_with_discounted_price
        return bundle_price_per_unit

    if reward_type == RewardType.DISCOUNT_BY_THRESHOLD:

        def rate_subtracted(item):
            return item.price - discount_rate

        return rate_subtracted

    if reward_type == RewardType.OTHER:
        return unchanged_price

    # Remaining reward types: decide by the remark and the available numbers.
    if 'מחיר המבצע הינו המחיר לק"ג' in remark:

        def discounted_price_as_is(item):
            return discounted_price

        return discounted_price_as_is

    if discounted_price and min_qty:
        return bundle_price_per_unit

    def unknown_price(item):
        return INVALID_OR_UNKNOWN_PROMOTION_FUNCTION

    return unknown_price
|
|
|
|
|
|
def main_latest_promos(
    store_id: int,
    output_filename,
    chain: SupermarketChain,
    load_promos: bool,
    load_prices: bool,
) -> None:
    """
    Write the available promotions of a store to a file, most recent first.

    Promotions are ordered in descending order by the later of their update
    day and start day; ties are broken by (start time - end time), also in
    descending order.

    :param chain: The requested supermarket chain
    :param store_id: A given store id
    :param load_prices: A boolean representing whether to load an existing prices xml file
    :param load_promos: A boolean representing whether to load an existing promos xml file
    :param output_filename: A path to write the promotions table
    """

    def recency_key(promo):
        latest_day = max(promo.update_date.date(), promo.start_date.date())
        negated_duration = promo.start_date - promo.end_date
        return latest_day, negated_duration

    promotions: List[Promotion] = get_available_promos(
        chain, store_id, load_prices, load_promos
    )
    promotions.sort(key=recency_key, reverse=True)
    write_promotions_to_table(promotions, output_filename)
|
|
|
|
|
|
def get_all_prices(
    store_id: int,
    output_filename,
    chain: SupermarketChain,
    load_promos: bool,
    load_prices: bool,
):
    """
    Load a store's items and apply its regular promotions to their final prices.

    For every non-expired promotion whose club ID is REGULAR, each listed item
    found in the prices data gets its final_price set to the promotion price
    when that price is lower than the item's price.

    :param store_id: A given store id
    :param output_filename: Not used by this function
    :param chain: The requested supermarket chain
    :param load_promos: A boolean representing whether to load an existing promos xml file
    :param load_prices: A boolean representing whether to load an existing prices xml file
    :return: The dictionary of the store's items, keyed by item code
    """
    log_message_and_time_if_debug("Importing prices XML file")
    items_dict: Dict[str, Item] = create_items_dict(chain, store_id, load_prices)
    log_message_and_time_if_debug("Importing promotions XML file")
    promo_tags = get_all_promos_tags(chain, store_id, load_promos)

    log_message_and_time_if_debug("Creating promotions objects")
    # Compiled once; the tag name is matched regardless of letter case.
    promotion_id_tag = re.compile("PromotionId", re.IGNORECASE)
    promo_obj = None
    for promo in tqdm(promo_tags, desc="creating_promotions"):
        promotion_id = int(promo.find(promotion_id_tag).text)

        # Consecutive tags of the same promotion reuse the parsed object.
        if promo_obj is None or promo_obj.promotion_id != promotion_id:
            promo_obj = create_new_promo_instance(
                chain, items_dict, promo, promotion_id
            )
        # An expired promotion yields None and has nothing to apply.
        if promo_obj is None:
            continue
        if promo_obj.club_id != ClubID.REGULAR:
            continue

        promo_items = promo.find_all("Item")
        if len(promo_items) > 1000:  # Too many items -> probably illegal promotion
            continue
        for item in promo_items:
            cur_item = items_dict.get(item.find("ItemCode").text)
            if cur_item is None:
                continue
            discounted_price = promo_obj.promo_func(cur_item)
            # The sentinel marks an unknown pricing rule, not a real price, so
            # it must never be stored as the item's final price.
            if discounted_price == INVALID_OR_UNKNOWN_PROMOTION_FUNCTION:
                continue
            if cur_item.price > discounted_price:
                cur_item.final_price = discounted_price

    return items_dict
|
|
|
|
|
|
def log_promos_by_name(
    store_id: int,
    chain: SupermarketChain,
    promo_name: str,
    load_prices: bool,
    load_promos: bool,
):
    """
    Log (at INFO level) every available promotion of a store whose
    description contains a given text.

    :param store_id: A given store ID
    :param chain: A given supermarket chain
    :param promo_name: A given name of a promo (or part of it)
    :param load_prices: A boolean representing whether to load a saved prices XML file or scrape a new one
    :param load_promos: A boolean representing whether to load a saved promotions XML file or scrape a new one
    """
    available: List[Promotion] = get_available_promos(
        chain, store_id, load_prices, load_promos
    )
    matching = (promo for promo in available if promo_name in promo.content)
    for promo in matching:
        logging.info(promo.repr_ltr())
|
|
|
|
|
|
def get_all_null_items_in_promos(chain, store_id) -> List[str]:
    """
    Collect the items that appear in the chain's promotions file but not in
    its prices file, always reading from existing XML files.

    Marked as outdated by its author.

    :param chain: A given supermarket chain
    :param store_id: A given store ID
    :return: The null items of all promotion tags, in tag order
    """
    items_dict: Dict[str, Item] = create_items_dict(chain, store_id, load_xml=True)
    promo_tags = get_all_promos_tags(chain, store_id, load_xml=True)

    null_items = []
    for promo_tag in promo_tags:
        null_items.extend(chain.get_null_items(promo_tag, items_dict))
    return null_items
|
|
|
|
|
|
def get_all_promos_tags(
    chain: SupermarketChain, store_id: int, load_xml: bool
) -> List[Tag]:
    """
    Collect the promotion tags of a given store in a given chain from every
    promotions file category (both the full and the non-full files).

    :param chain: A given supermarket chain
    :param store_id: A given store ID
    :param load_xml: A boolean representing whether to try loading the promotions from an existing XML file
    :return: A list of promotions tags
    """
    # First load one parsed document per promotions category...
    parsed_files = [
        create_bs_object(
            chain,
            store_id,
            category,
            load_xml,
            xml_file_gen(chain, store_id, category.name),
        )
        for category in tqdm(XML_FILES_PROMOTIONS_CATEGORIES, desc="promotions_files")
    ]

    # ...then gather the promotion tags of all documents, in category order.
    promo_tags = []
    for parsed_file in parsed_files:
        promo_tags.extend(parsed_file.find_all(chain.promotion_tag_name))
    return promo_tags
|