diff --git a/api.py b/api.py index 062ce3f..fa6c3b8 100644 --- a/api.py +++ b/api.py @@ -3,7 +3,33 @@ import base64 import requests import ssl from requests.adapters import HTTPAdapter +import logging +import os +from logging.handlers import TimedRotatingFileHandler +log_directory = "logs" + +# 🔧 Configuration du handler avec rotation quotidienne +log_file = os.path.join(log_directory, "woocommerce.log") +handler = TimedRotatingFileHandler( + filename=log_file, + when="midnight", # ⏰ Rotation tous les jours à minuit + interval=1, # 📅 Chaque 1 jour + backupCount=7, # ♻️ Garde les 7 derniers fichiers de log + encoding='utf-8' # 🧾 Pour supporter tous les caractères +) + +# 📋 Format du log +formatter = logging.Formatter( + fmt="%(asctime)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S" +) +handler.setFormatter(formatter) + +# 🔌 Récupère le logger +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) # 👁 Niveau minimum à capturer +logger.addHandler(handler) class WoocommerceApiClient(API): def __init__(self, url, wc_consumer_key, wc_consumer_secret, wp_application_user, wp_application_password, verify_ssl=False, **kwargs): @@ -59,8 +85,8 @@ class WoocommerceApiClient(API): try: if self._is_wc_api(endpoint): kwargs.update(self._get_requests_general_kwargs(endpoint)) - print(kwargs) # ✅ Montre tout le dict - print(kwargs["headers"]) # ✅ Si tu veux un champ en particulier + #print(kwargs) # ✅ Montre tout le dict + #print(kwargs["headers"]) # ✅ Si tu veux un champ en particulier return requests.get(**kwargs) else: return super().get(endpoint, **kwargs) @@ -83,7 +109,7 @@ class WoocommerceApiClient(API): kwargs['files'] = files if data: kwargs['json'] = data - print("kwargs envoyés à requests.post:", kwargs) + #print("kwargs envoyés à requests.post:", kwargs) response = requests.post(**kwargs) elif self._is_wp_api(endpoint): kwargs.update(self._get_requests_general_kwargs(endpoint)) @@ -91,10 +117,14 @@ class WoocommerceApiClient(API): response = requests.post(**kwargs) else: response = super().post(endpoint, data, **kwargs) - if response and response.status_code not in (500, 400): - return response + if response.status_code not in (500, ) or retry >= self.max_retry: + return response + else: + retry += 1 + logger.debug(f"status_code={response.status_code} - retry #{retry}") except requests.exceptions.ConnectionError: if retry < self.max_retry: + logger.debug(f"got requests.exceptions.ConnectionError - retry #{retry}") print(".", end="") retry += 1 else: raise diff --git a/api_woocommerce.py b/api_woocommerce.py index 7b0f1ed..f185674 100644 --- a/api_woocommerce.py +++ b/api_woocommerce.py @@ -5,46 +5,16 @@ import ezodf import requests import pprint import base64 -import time -import json -import pyexcel_ods3 import unicodedata -import logging import os -import time -import argparse -from logging.handlers import TimedRotatingFileHandler -from watermark import create_watermark_image -import ssl -import urllib3 +from watermark import create_watermark_image from base64 import b64encode -# Créer un dossier 'logs' s'il n'existe pas -log_directory = "logs" -os.makedirs(log_directory, exist_ok=True) - -# 🔧 Configuration du handler avec rotation quotidienne -log_file = os.path.join(log_directory, "woocommerce.log") -handler = TimedRotatingFileHandler( - filename=log_file, - when="midnight", # ⏰ Rotation tous les jours à minuit - interval=1, # 📅 Chaque 1 jour - backupCount=7, # ♻️ Garde les 7 derniers fichiers de log - encoding='utf-8' # 🧾 Pour supporter tous les caractères -) - -# 📋 Format du log -formatter = logging.Formatter( - fmt="%(asctime)s - %(levelname)s - %(message)s", - datefmt="%Y-%m-%d %H:%M:%S" -) -handler.setFormatter(formatter) - -# 🔌 Récupère le logger +import logging logger = logging.getLogger(__name__) -logger.setLevel(logging.DEBUG) # 👁 Niveau minimum à capturer -logger.addHandler(handler) + + # 🧪 Test """logger.debug("Démarrage du programme (DEBUG)") @@ -79,7 +49,8 @@ WEBSITE_URL = "https://les-creations-de-missbleue.local" #WEBSITE_URL = "https://les-creations-de-missbleue.com" #FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\donnees_site_internet_missbleue_corrige.ods" #BASE_PATH = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\photos\\photos_site\\Photos_site\\" -BASE_PATH = "C:\\Users\\beren\\Cloud\\beren\\site_missbleue\\photos\\photos_site\\Photos_site\\" +#BASE_PATH = "C:\\Users\\beren\\Cloud\\beren\\site_missbleue\\photos\\photos_site\\Photos_site\\" +BASE_PATH = "photos/photos_site/Photos_site/" #FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\infos_site.ods" FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\api_woocommerce\\final_api_woocommerce\\donnees_site_internet_missbleue_version_finale.ods" @@ -189,11 +160,11 @@ class OdsReader: #print(f"df après filtrage :\n{df}") except ValueError as ve: - print(f"Erreur : {ve}") + #print(f"Erreur : {ve}") logger.exception(f"🚫 Aucune correspondance trouvée pour '{search_value}' dans la colonne '{column_name}'") except Exception as e: - print(f"Erreur lors de la recherche : {e}") + #print(f"Erreur lors de la recherche : {e}") logger.exception(f"🚫 Erreur lors de la recherche de '{search_value}' dans la colonne '{column_name}'. Exception : {e}") else: print("Aucun search_value fourni") @@ -209,6 +180,7 @@ class OdsReader: for row in sheet.rows(): data.append([cell.value for cell in row]) + logger.debug(f'extract_ods_row: {len(data)} rows found in ods sheet #{number_sheet}') df = pd.DataFrame(data) df.columns = df.iloc[0] @@ -221,6 +193,14 @@ class OdsReader: elif end_row is not None: df = df.iloc[:end_row] + """# Voir tous les noms de colonnes + print(df.columns.tolist()) + # Compter les doublons de noms + from collections import Counter + print([item for item, count in Counter(df.columns).items() if count > 1]) + # Nettoyer les noms de colonnes (supprimer espaces, etc.) + df.columns = df.columns.str.strip()""" + df = df.dropna(how='all') return df.to_dict(orient="records") @@ -235,7 +215,7 @@ class MediaManager(OdsReader): self.dict_equivalence = {'ambre':'ambré', 'meringue':'meringué', 'givree':'givrée', 'sale':'salé', 'bresilien':'brésilien', 'epices':'épices', 'noel':'noël', 'a':'à', 'petales':'pétales', 'lumiere':'lumière', 'allumee':'allumée', 'eteinte':'éteinte', 'celebration':'célébration', 'argente':'argenté', 'dore':'doré', 'accroche':'accroché', 'pose':'posé', 'colore':'coloré', 'kevin': 'Kévin', 'interieur':'intérieur', 'cafe':'café', 'bresil':'Brésil', 'dagrumes': "d'agrumes", "iles":"îles", 'apero': 'apéro', 'quebecois':'québecois', 'defendu':'défendu', - 'tiare':'tiaré', 'mure':'mûre', 'allergenes':'allergènes', 'parfume':'parfumé' + 'tiare':'tiaré', 'mure':'mûre', 'allergenes':'allergènes', 'parfume':'parfumé', 'peche' : 'pêche' } @@ -246,11 +226,12 @@ class MediaManager(OdsReader): json_data = self.get_all_media_lines() for media in json_data: - path = Path(BASE_PATH + media['Chemin']) + media_chemin = media['Chemin'].replace("\\", "/") + path = Path(BASE_PATH + media_chemin) image_name = path.name try: if not self.is_exists(media, image_name): - image_path = BASE_PATH + media['Chemin'] + image_path = BASE_PATH + media_chemin # 👇 Tentative d'ouverture et d'envoi with open(image_path, "rb") as image_file: @@ -275,65 +256,55 @@ class MediaManager(OdsReader): def create_and_update_media(self, media, image_name, path, watermark=False): + #print(f"image_path = {path}") try: if not self.is_exists(media, image_name): if watermark: image_path = path else: - image_path = BASE_PATH + media['Chemin'] - #print(f"image_path = {image_path}") + media_chemin = media['Chemin'].replace("\\", "/") + image_path = BASE_PATH + media_chemin # 👇 Tentative d'ouverture et d'envoi with open(image_path, "rb") as image_file: response = self.wcapi.post("media",files={"file": image_file}) - print("kkkkkkkkkkkkkkkkkkkkkkkkkk") - """ response = self.wcapi.post( - "media", - headers={ - "Authorization": f"Basic {self.ath.auth_base64}", - "Content-Disposition": f"attachment; filename={image_name}", - "User-Agent": "Mozilla/5.0" - }, - files={"file": image_file}, - verify=False - )""" - print(f"response = {response.status_code}") + + #print(f"response = {response.status_code}") if response.status_code == 201: - print('____') - pprint.pprint(media) media_data = response.json() self.update_data_media(media, media_data['id']) logger.info(f"✅ Image uploadée : {image_name}") else: logger.error(f"❌ Échec de l'upload ({response.status_code}) pour : {image_name} - URL: {self.wcapi.url}") else: - logger.info(f"↪️ Image déjà existante (non uploadée) : {image_name}") + if self.is_txt_alt_exists(media, image_name): + media_id = self.is_txt_alt_exists(media, image_name) + self.update_data_media(media, media_id) + logger.info(f"✅ Image déjà existante et mise à jour : {image_name}") + else: + logger.info(f"↪️ Image déjà existante (non uploadée) : {image_name}") except FileNotFoundError: logger.exception(f"🚫 Fichier introuvable : {image_name} ({path})") - except requests.RequestException as e: logger.exception(f"🔌 Problème réseau/API lors de l'upload de {image_name} : {e}") - except Exception as e: logger.exception(f"🔥 Erreur inattendue lors de l'upload de {image_name} : {e}") def upload_media_from_to(self, range_start, range_end=None): - json_data = self.fetch_all_media_rows(range_start, range_end) + json_data = self.fetch_all_media_rows(range_start, range_end) + logger.debug(f'{len(json_data)} to process') for media in json_data: - pprint.pprint(media) - path = Path(BASE_PATH + media['Chemin']) + media_chemin = media['Chemin'].replace("\\", "/") + path = Path(BASE_PATH + media_chemin) image_name = path.name - first_folder = media['Chemin'].split("\\")[0] - print(f"first_folder = {first_folder}") + #first_folder = media['Chemin'].split("\\")[0] + first_folder = media_chemin.split("/")[0] watermarked_path = Path(create_watermark_image(str(path))) watermarked_name = watermarked_path.name - print('logo') if first_folder == 'Logo': - print("first_file") self.create_and_update_media(media,image_name,path) else: - print("pas logo") self.create_and_update_media(media, watermarked_name, watermarked_path, True) try: os.remove(watermarked_path) @@ -352,30 +323,35 @@ class MediaManager(OdsReader): pass return False + def is_txt_alt_exists(self, media, image_name): + all_images = self.get_all_images() + name_without_extension, extension = os.path.splitext(image_name) + for image in all_images: + if media['Slug'] == image['slug']: + if image['alt_text'] == "": + return image['id'] + else: + pass + return False + + def get_title_and_alt_text_media(self, media): sentence = media['Slug'].replace('-', ' ') sentence = sentence.replace('img', '').strip() - print(f"type = {type(sentence)}") title = sentence.capitalize() alt_text = title return title, alt_text def update_accent_in_sentence(self, sentence): - print(type(sentence)) - print(sentence) words = sentence.split() new_words = [self.dict_equivalence[word.lower()] if word.lower() in self.dict_equivalence else word for word in words] - pprint.pprint(new_words) new_sentence = " ".join(new_words) - print(f"new_sentence = {new_sentence}") return new_sentence def update_data_media(self, media, id_img): - print(f"nom = {media['Nom']}") - print(type(media['Nom'])) if media['Nom'] is None or media['Description'] is None: title, alt_text = self.get_title_and_alt_text_media(media) else: @@ -384,15 +360,14 @@ class MediaManager(OdsReader): title = self.update_accent_in_sentence(title) alt_text = self.update_accent_in_sentence(alt_text) - print(f"title = {title}") - print(f"alt_txt = {alt_text}") update_data = { "title" : title, "alt_text": alt_text, "slug": media['Slug'], } - path = Path(BASE_PATH + media['Chemin']) + media_chemin = media['Chemin'].replace("\\", "/") + path = Path(BASE_PATH + media_chemin) image_name = path.name response = self.wcapi.post(f"media/{id_img}", data=update_data) @@ -413,6 +388,13 @@ class MediaManager(OdsReader): else: return None + def find_id_by_name(self, name): + images = self.get_all_images() + for img in images: + #pprint.pprint(img) + if img['title']['rendered'] == name: + return img['id'] + def find_id_by_slug(self, slug): images = self.get_all_images() for img in images: @@ -436,7 +418,7 @@ class MediaManager(OdsReader): """Récupère toutes les images en gérant la pagination""" all_images = [] page = 1 - print(f"self.ath.auth_base64 = {self.ath.auth_base64}") + #print(f"self.ath.auth_base64 = {self.ath.auth_base64}") while True: """response = self.wcapi.get(f"wp-json/wp/v2/media?per_page=100&page={page}", headers={"Authorization": f"Basic {self.ath.auth_base64}", @@ -444,6 +426,7 @@ class MediaManager(OdsReader): verify=False )""" response = self.wcapi.get("media", params={"per_page": 100, "page": page}) + #print(f"response.status_code = {response.status_code}") if response.status_code != 200: break @@ -476,7 +459,6 @@ class MediaManager(OdsReader): images = self.get_all_images() for img in images: img_id = img['id'] - print(f"img_id = {img['id']}") response = self.wcapi.delete(f"media/{img_id}") if response.status_code in [200, 410]: # 410 = déjà supprimé @@ -521,6 +503,14 @@ class CategoryManager(OdsReader): for cat in categories: if cat['slug'] == slug: return cat['id'] + + def find_id_by_name(self, name): + response = self.wcapi.get("products/categories/",params={"per_page": 100}) + if response.status_code == 200: + categories = response.json() + for cat in categories: + if cat['name'] == name: + return cat['id'] def create_category(self, name, description, slug): category_data = { @@ -623,6 +613,24 @@ class ProductManager(OdsReader): } self.wcapi.put(f"products/{product_id}", product_data) + + def update_data_list_category_product(self, product_id, list_category_id): + #print(f"list_category_id = {list_category_id}") + categories = [{'id': cat_id} for cat_id in list_category_id if cat_id is not None] + product_data = { + 'categories': categories, + } + self.wcapi.put(f"products/{product_id}", product_data) + + def update_data_list_medias_product(self, product_id, list_medias_id): + images = [{'id': media_id} for media_id in list_medias_id if media_id is not None] + product_data = { + 'images':images, + } + logger.debug(f"{product_data} - {product_id}") + self.wcapi.put(f"products/{product_id}", product_data) + + def get_list_media_id_for_product(self, medias): list_media_id_for_product = [] for id, media_slug in self.medias.items(): @@ -635,32 +643,96 @@ class ProductManager(OdsReader): def get_list_category_for_product(self, categories): response = self.wcapi.get("products/categories",params={"per_page": 100}) list_category_for_product = [] - for category in response.json(): - for cat in categories: - if category['name'] == cat: - id_category = {'id':category['id']} - list_category_for_product.append(id_category) - return list_category_for_product + #logger.debug(f"response.json() = {response.json()}") + try : + for category in response.json(): + for cat in categories: + if category['name'] == cat: + id_category = {'id':category['id']} + list_category_for_product.append(id_category) + #logger.debug(f"list_category_for_product = {list_category_for_product}") + return list_category_for_product + except requests.exceptions.JSONDecodeError as e: + logger.debug(f"text = {response.text}") + raise e def find_product_by_id(self, id): response = self.wcapi.get(f"products/{id}") if response.status_code == 200: product = response.json() return product - + def find_id_by_slug(self, slug): - response = self.wcapi.get("products/",params={"per_page": 100}) + response = self.wcapi.get("products",params={"slug": slug}) if response.status_code == 200: products = response.json() for pro in products: if pro['slug'] == slug: return pro['id'] + return None + + """def find_id_by_slug(self, slug): + response = self.wcapi.get("products/",params={"per_page": 100}) + if response.status_code == 200: + products = response.json() + for pro in products: + if pro['slug'] == slug: + return pro['id']""" def find_media_id_by_slug(self, media_slug): for id, slug in self.medias.items(): if media_slug == slug: return id + + def is_exist_attribute_in_one_product(self, product_id): + response = self.wcapi.get(f"products/{product_id}") + if response: + products_info = response.json() + if products_info['attributes']: + return True + else: + return False + else: + return False + + + def is_exist_tabs_in_one_product(self, product_id): + response = self.wcapi.get(f"products/{product_id}") + if response: + #logger.info(pprint.pformat(response.json())) + is_exist_tab = False + products_info = response.json() + for data in products_info['meta_data']: + if data.get('key') == "wb_custom_tabs" and not data.get('value'): + is_exist_tab = True + break + + if is_exist_tab: + return True + else: + return False + else: + return False + + """def get_product_data_by_api(self, product_id, product_datas): + response = self.wcapi.get(f"products/{product_id}") + products_info = response.json() + new_datas = {} + for key, data in product_datas.items: + if products_info[key] == data: + pass + else: + products_info[key] = data + new_datas[key] = data + + self.wcapi.put(f"products/{product_id}", new_datas)""" + + def get_product_data_by_api(self, product_id): + response = self.wcapi.get(f"products/{product_id}") + products_info = response.json() + return products_info + def create_tabs_from_custom_dict(self, product_id, product): product_tabs_data = {} list_product_tabs_data = [] @@ -687,27 +759,36 @@ class ProductManager(OdsReader): } res = self.wcapi.post(f"products/{product_id}", meta_data_data) else: - print(f"error") + print(f"error - {product_id} - {response.status_code}") - def create_product(self, product_data): + def create_product(self, product_data): + logger.debug(f"ProductManager::create_product called (product_data = {product_data})") + if product_data is None: + logger.info(f"ProductManager::create_product called no product_data supplied - ignored") + return None try: - response = self.wcapi.post("products/", product_data) - if response.status_code == 201: - # Le produit a été créé avec succès - logger.info(f"Produit créé avec succès. ID: {response.json()['id']}") + product_id = self.find_id_by_slug(product_data['slug']) + if product_id: + return product_id else: - if self.find_id_by_slug(product_data['slug']): - pass + response = self.wcapi.post("products/", product_data) + if response.status_code == 201: + logger.debug(f"ProductManager::create_product product created {response.json()['id']} - end") + return response.json()['id'] else: + logger.error(f"Erreur lors de la création du produit. {product_data['name']} Code: {response.status_code}, Message: {response.text}") # Le produit n'a pas été créé, mais il y a une réponse avec un code d'erreur - logger.error(f"Erreur lors de la création du produit. Code: {response.status_code}, Message: {response.text}") + except Exception as e: logger.error(f"Erreur inattendue lors de l'envoi du produit à WooCommerce: {e}") - + logger.debug(f"ProductManager::create_product - end") + + def update_data_product(self, product_data, categories, medias, json_data): + #json_data = self.get_all_product_lines() - for product in json_data: - self.create_product(product_data) + for field in json_data: + #logger.info(f"update_data_product = {product}") product_id = self.find_id_by_slug(product_data['slug']) list_category_id = self.get_list_category_for_product(categories) list_img_id = self.get_list_media_id_for_product(medias) @@ -744,6 +825,30 @@ class ProductManager(OdsReader): return all_products + def get_all_categories_as_list(self, product_id): + response = self.wcapi.get(f"products/{product_id}") + if response: + product = response.json() + return product['categories'] + else: + return [] + + def get_all_medias_as_list(self, product_id): + response = self.wcapi.get(f"products/{product_id}") + if response: + product = response.json() + return product['images'] + else: + return [] + + def get_all_attributes_as_list(self, product_id): + response = self.wcapi.get(f"products/{product_id}") + if response: + product = response.json() + return product["attributes"] + else: + [] + def delete_product(self): json_data = self.get_all_product_lines() @@ -757,7 +862,6 @@ class ProductManager(OdsReader): products = self.get_all_products() if products: for pro in products: - print(f"pro['id'] = {pro['id']}") self.wcapi.delete(f"products/{pro['id']}", params={"force": True}) def delete_media_product(self, media_slug): @@ -871,31 +975,32 @@ class AttributeManager(OdsReader): def create_for_product(self, product_id, name, value, variation=False): - print(f"variation = {variation}") - print(f"value = {value}") data_attribute = { 'name': name, 'options':value } + #logger.info(f"name = {name} - value = {value}") #list_product_tabs_data.append(data_tab) response = self.wcapi.get(f"products/{product_id}") if response.status_code == 200: + print("passe d'abord ici 1") product_meta_data = response.json() existing_attributes_data = product_meta_data.get("attributes", []) already_exist = False - for data in existing_attributes_data: + """for data in existing_attributes_data: for key_data, value_data in data.items(): if key_data == "value": if isinstance(value_data, list): for value in value_data: if value['name'] == name: - already_exist = True + already_exist = True""" if already_exist == False: found = False - print(f"attributes_data = {existing_attributes_data}") - print(f"data_attribute = {data_attribute}") + #print(f"attributes_data = {existing_attributes_data}") + #print(f"data_attribute = {data_attribute}") for attribute in existing_attributes_data: + #logger.info(f"name = {name} - value = {value}") if attribute["name"] == name: attribute["options"].append(data_attribute) found = True @@ -903,11 +1008,10 @@ class AttributeManager(OdsReader): # Si l'onglet `wb_custom_tabs` n'existe pas, on le crée if not found: - print(f"value = {value}") if value is not None: value = [v.strip() for v in value.split(",")] - print(f"value = {value}") + logger.info(f"value = {value}") existing_attributes_data.append({ "name": name, "options": value, @@ -918,11 +1022,12 @@ class AttributeManager(OdsReader): attributes_data = { 'attributes': existing_attributes_data } + logger.debug(f"existing_attributes_data = {existing_attributes_data}") res = self.wcapi.put(f"products/{product_id}", attributes_data) else: print('already_exist') else: - print(f"error") + print(f"error 1") def delete_all_for_product(self): response_product = self.wcapi.get(f"products/", params={"per_page": 100}) @@ -987,7 +1092,9 @@ class TabManager(OdsReader): 'tab_type': 'local' } response = self.wcapi.get(f"products/{product_id}") + #logger.debug(response) if response.status_code == 200: + print("passe d'abord ici 2") product_meta_data = response.json() existing_meta_data = product_meta_data.get("meta_data", []) already_exist = False @@ -1026,7 +1133,7 @@ class TabManager(OdsReader): } res = self.wcapi.put(f"products/{product_id}", meta_data_data) else: - print(f"error") + print(f"error 2") def delete_by_product_id(self, product_id): response = self.wcapi.get(f"products/{product_id}") @@ -1152,8 +1259,6 @@ class VariationsManager(OdsReader): pprint.pprint(price_per_product_variable)""" response = self.wcapi.get(f"products/{product_id}") - print(f"product_id = {product_id}") - print(f"response = {response.status_code}") try: if response.status_code == 200: #existing_product = response.json() @@ -1173,8 +1278,6 @@ class VariationsManager(OdsReader): } print(f"Posting variation: {data}") response = self.wcapi.post(f"products/{product_id}/variations", data) - print(response.status_code) - print(response.json()) logger.info(f"Variation de parfums a bien été créé") if volumes is not None: for volume in volumes: @@ -1258,6 +1361,21 @@ class WooCommerceManager(OdsReader): self.variation_manager = variation_manager self.filename_ods = filename_ods + # access to managers + @property + def mm(self): return self.media_manager + @property + def cm(self): return self.category_manager + @property + def pm(self): return self.product_manager + @property + def tm(self): return self.tab_manager + @property + def am(self): return self.attribute_manager + @property + def vm(self): return self.variation_manager + + def tab_exists(self, product_id, name_tab): return self.product_manager.tab_exists(product_id, name_tab) @@ -1309,35 +1427,16 @@ class WooCommerceManager(OdsReader): variation=self.is_variable(product_ods['Type'])) def update_product(self): - #self.product_manager.update_data_product() self.update_product_tab() - #self.update_product_attribute() - - """def update_product_by_slug(self): - self.product_manager.update_data_product() - self.update_product_tab() - self.update_product_attribute()""" - - """def update_product_variation(self, product_id, product_data): - pass""" - + def update_product_by_slug(self, slug): self.product_manager.update_data_product_by_slug(slug) self.update_product_tab_by_slug(slug) #self.update_product_attribute_by_slug(slug) def create_all_informations(self): - #medias = self.media_manager.get_all_as_slug_dict() - #self.product_manager.medias = medias - #self.update_product_by_slug("chope-citron-meringue") - #self.media_manager.upload_media() - #self.media_manager.assign_image_logo() medias = self.media_manager.get_all_as_slug_dict() self.product_manager.medias = medias - #self.category_manager.medias = medias - #self.category_manager.update_data_categories() - #self.attribute_manager.create() - #self.attribute_manager.configure_term() self.process_file(FILENAME_ODS) self.update_product() @@ -1356,26 +1455,11 @@ class WooCommerceManager(OdsReader): list_media_by_doc.append(product['Image2']) list_media_by_doc.append(product['Image3']) list_media_by_doc.append(product['Pyramide']) + list_media_by_doc.append(product['Clp']) return list_media_by_doc - """def is_variable(self, type): - #print(f"type.lower = { type.lower}") - #print(f"type = {type}") - return type.lower() == "parfums" - #if type.lower() == 'Choix parfums' or type.lower() == 'Volume' or type.lower() == 'Prix pour': - # return True""" - - """def is_variable(self, type): - return type.lower() == "parfums""" - - """def is_variable(self, product_data): - if product_data['Choix parfums'] is not None or product_data['Volume'] is not None or product_data['Prix pour'] is not None: - return True""" def is_variable(self, name, value): - """print(f"typeee = {type}") - if type == 'variable': - return True""" if name == "Volume" or name =="Choix parfums": if value is not None: return True @@ -1388,7 +1472,6 @@ class WooCommerceManager(OdsReader): product_id = self.product_manager.find_id_by_slug(product_data['slug']) for name, value in attributes.items(): - print(f"self.is_variable(product_data['type'])) = {self.is_variable(name, value)}") self.attribute_manager.create_for_product(product_id=product_id, name=name, value=value, variation=self.is_variable(name, value)) #self.attribute_manager.create_for_product(product_id=product_id, name=name, value=value, variation=self.is_variable(product_data)) @@ -1399,32 +1482,38 @@ class WooCommerceManager(OdsReader): def update_product_tab(self, product_data): for product in product_data: self.update_product_tab_by_id(product['id']) + + + def create_product(self, product_data, attributes, tabs, categories, medias, json_data): + self.product_manager.create_product(product_data=product_data) + self.product_manager.update_data_product(product_data=product_data, categories=categories, medias=medias, json_data=json_data) + self.update_product_attribute(attributes=attributes, product_data=product_data) + product_id = self.product_manager.find_id_by_slug(product_data['slug']) + #self.update_product_variations(product_data) + self.tab_manager.create_or_update_for_product(product_id=product_id, tabs=tabs) def create_or_update_product(self, product_data, attributes, tabs, categories, medias, json_data): try: - self.product_manager.update_data_product(product_data=product_data, categories=categories, medias=medias, json_data=json_data) + product_id = self.product_manager.find_id_by_slug(product_data['slug']) + if product_id: + #self.product_manager.update_data_product(attributes=attributes, product_data=product_data) + self.product_manager.update_data_product(product_data=product_data, categories=categories, medias=medias, json_data=json_data) + else: + self.product_manager.create_product(product_data=product_data) + self.product_manager.update_data_product(product_data=product_data, categories=categories, medias=medias, json_data=json_data) + self.update_product_attribute(attributes=attributes, product_data=product_data) product_id = self.product_manager.find_id_by_slug(product_data['slug']) - self.update_product_variations(product_data) + #self.update_product_variations(product_data) self.tab_manager.create_or_update_for_product(product_id=product_id, tabs=tabs) except Exception as e: print(f"Erreur lors de la mise à jour du produit: {e}") logger.exception(f"Erreur lors de la mise à jour du produit: {e}") - - """def create_or_update_product(self, product_data, attributes, tabs, categories, medias): - try: - self.product_manager.update_data_product(product_data=product_data, categories=categories, medias=medias) - self.update_product_attribute(attributes=attributes, product_data=product_data) - product_id = self.product_manager.find_id_by_slug(product_data['slug']) - self.update_product_variations(product_data) - self.tab_manager.create_or_update_for_product(product_id=product_id, tabs=tabs) - except Exception as e: - print(f"Erreur lors de la mise à jour du produit: {e}") - logger.exception(f"Erreur lors de la mise à jour du produit: {e}")""" + logger.debug() + def get_product_lines(self, search_value=None): if search_value: - print('là') return self.get_product_line_by_value(search_value) else: return self.get_all_product_lines() @@ -1435,7 +1524,6 @@ class WooCommerceManager(OdsReader): self.product_manager.medias = medias # read provided file products_lines = self.get_product_lines(search_value) - print('yoooo') #pprint.pprint(products_lines) for product_line in products_lines: # standard product data @@ -1454,13 +1542,13 @@ class WooCommerceManager(OdsReader): if product_line['Promo'] is not None: product_data['sale_price'] = product_line['Promo'] - if product_line['Type'] == "parfums": + """if product_line['Type'] == "parfums": product_data['type'] = "variable" if product_line['Volume'] is not None: values_attributes = self.get_list_variable_attributes(product_line['Volume']) attributes['Volume'] = values_attributes else: - product_data['type'] = "simple" + product_data['type'] = "simple""" attributes = { "Temps de combustion" : product_line['Temps de combustion'], @@ -1487,84 +1575,273 @@ class WooCommerceManager(OdsReader): # create or update product self.create_or_update_product(product_data=product_data, attributes=attributes, tabs=tabs, categories=categories, medias=medias) + + def get_product_data_as_dict(self, product_line): + product_data = { + 'name' : product_line['Nom'], + 'price': product_line['Prix'], + 'regular_price': product_line['Prix'], + 'stock_quantity': product_line['Stock'], + 'manage_stock':True, + 'weight':str(product_line['Poids']), + 'sku':str(product_line['Numéro de référence']), + 'description': product_line['Description'], + 'short_description': product_line['Courte Description'], + 'slug':product_line['Slug'] + } + + if product_line['Promo'] is not None: + product_data['sale_price'] = product_line['Promo'] + + if product_line['Type'] is not None: + product_data['type'] = "variable" + + if product_line['Fabrication sur commande'] is not None: + #backoreded = [b.strip() for b in product_line['Fabrication sur commande'].split(",")] + product_data['backorders'] = "notify" + product_data['backorders_allowed'] = True + product_data['backordered'] = True + + if product_line['Status du produit']: + if product_line['Status du produit'] == 'P': + product_data['status'] = 'publish' + logger.info(f"Article publié {product_line['Nom']}") + elif product_line['Status du produit'] == 'B': + product_data['status'] = 'draft' + logger.info(f"Article brouillon {product_line['Nom']}") + else: + logger.debug(f"Article ignoré {product_line['Nom']}") + return None + + return product_data + + + def get_product_attributes_as_dict(self, product_line): + attributes = { + "Temps de combustion" : product_line['Temps de combustion'], + "Type de cire" : product_line['Type de cire'], + "Mèche" : product_line['Mèche'], + "Fabrication" : product_line['Fabrication'], + "Composition" : product_line['Composition'], + "Ingrédients et engagements" : product_line['Ingrédients et engagements'], + #"Parfums" : product_line['Choix parfums'] + #"Volume" : product_line["Volume"] + } + if product_line["Volume"]: + attributes["volume"] = product_line["Volume"] + if product_line['Type de cire']: + attributes['Type de cire'] = product_line['Type de cire'] + if product_line['Mèche']: + attributes['Mèche'] = product_line['Mèche'] + if product_line['Parfums']: + attributes["Parfums"] = product_line['Parfums'] + + return attributes + + def get_product_tabs_as_dict(self, product_line): + tabs ={ + #"Description" : product_line["Description"], + "Conseils d'utilisation" : product_line["Conseils d’utilisation"], + "Précautions articles" : product_line["Précautions articles"], + #"Allergènes" : product_line["Allergènes"] + } + return tabs + +# --------------------------------- Medias ----------------------------------------------- + + def has_product_all_medias(self, product_id, medias): + print(f"medias={medias}") + list_medias_by_api = self.product_manager.get_all_medias_as_list(product_id) + names_in_list_medias_by_api = [media['name'] for media in list_medias_by_api] + + if set(medias) != set(names_in_list_medias_by_api): + return False + else: + return True + + def assign_medias_to_product(self, id, media_slugs): + list_medias_id = [] + for slug in media_slugs: + list_medias_id.append(self.media_manager.find_id_by_slug(slug)) + logger.info(f"liste des medias id = {list_medias_id}") + self.product_manager.update_data_list_medias_product(id, list_medias_id) + + + +# --------------------------------- Category ---------------------------------------------- + + def has_product_all_categories(self, product_id, categories): + + list_categories_by_api = self.product_manager.get_all_categories_as_list(product_id) + names_in_list_categories_by_api = [category['name'] for category in list_categories_by_api] + if set(categories) != set(names_in_list_categories_by_api): + return False + else: + return True + + + def assign_categories_to_product(self, id, categorie_slugs): + list_categories_id = [] + for slug in categorie_slugs: + list_categories_id.append(self.category_manager.find_id_by_slug(slug)) + self.product_manager.update_data_list_category_product(id, list_categories_id) + + +# --------------------------------- Attributes ---------------------------------------------- + + + def is_exist_attribute_in_one_product(self, product_id): + response = self.wcapi.get(f"products/{product_id}") + if response: + products_info = response.json() + if products_info['attributes']: + return True + else: + return False + else: + return False + + def has_product_all_attributes(self, product_id, attributes): + #pprint.pprint(self.product_manager.get_all_attributes_as_list(product_id)) + pprint.pprint(attributes) + list_attributes_by_api = self.product_manager.get_all_attributes_as_list(product_id) + pprint.pprint(list_attributes_by_api) + + """if list_attributes_by_api == []: + return True + # Comparaison directe + not_found_same_value = False + for i, d in enumerate(list_attributes_by_api): + for key, value in attributes.items(): + for k, v in d.items(): + if k == 'options': + if value in d[k]: + not_found_same_value = False + print(f"v = {v}, value = {value} d[k] = {d[k]}") + print("trouvé") + else: + print(f"v = {v}, value = {value} d[k] = {d[k]}") + print("pas trouvé") + #return True""" + + + + """common_keys = list_attributes_by_api.keys() & attributes.keys() + + found_same_value = False + for key in common_keys: + pprint.pprint(common_keys) + if list_attributes_by_api[key] != attributes[key]: + found_same_value = True + break + else: + pass + + if found_same_value == True: + return True + else: + return False""" + + + def assign_attributes_to_product(self, product_id, attributes): + + for attr in attributes: + for key, value in attributes.items(): + #logger.debug(f"key = {key} - value = {value} - attr = {attr}") + if attr == key : + self.attribute_manager.create_for_product(product_id=product_id, name=key, value=value, variation=self.is_variable(key, value)) + +# --------------------------------- Product ---------------------------------------------- + + def has_product_all_datas(self, product_id, product_data): + + list_product_data_by_api = self.product_manager.get_product_data_by_api(product_id) + common_keys = list_product_data_by_api.keys() & product_data.keys() + + # Étape 2 : comparaison des valeurs + found_same_value = False + for key in common_keys: + if list_product_data_by_api[key] != product_data[key]: + found_same_value = True + break + else: + pass + + if found_same_value == True: + return True + else: + return False + + def assign_product_changement(self, product_id, product_data): + list_product_data_by_api = self.product_manager.get_product_data_by_api(product_id) + common_keys = list_product_data_by_api.keys() & product_data.keys() + + # Étape 2 : comparaison des valeurs + differences = {} + for key in common_keys: + if list_product_data_by_api[key] != product_data[key]: + differences[key] = product_data[key] + self.wcapi.put(f"products/{product_id}", differences) + +# -------------------------------------------- Tabs ------------------------------------------------------ + + #def missing_tabs_for_product(self, product_id, tabs): + +# --------------------------------------------------------------------------------------------------------- + + def assign_tabs_to_product(self, product_id, tabs): + self.tab_manager.create_or_update_for_product(product_id=product_id, tabs=tabs) + + + def process_file_from_to(self, range_start, range_end=None): - # refresh media cache + medias = self.media_manager.get_all_as_slug_dict() self.product_manager.medias = medias - # read provided file - #reader = OdsReader(filename) - #json_data = self.fetch_all_product_rows(range_start, range_end) - + #logger.info(f"self.fetch_all_product_rows(range_start, range_end): = {self.fetch_all_product_rows(range_start, range_end)}") for product_line in self.fetch_all_product_rows(range_start, range_end): - if self.product_manager.find_id_by_slug(product_line['Slug']): - logger.debug(f"Produit contenant comme slug '{product_line['Slug']}' existe déjà") - else: - print(f"process_file_from_to {product_line['Nom']}") - # standard product data - product_data = { - 'name' : product_line['Nom'], - 'price': product_line['Prix'], - 'regular_price': product_line['Prix'], - 'stock_quantity': product_line['Stock'], - 'manage_stock':True, - 'weight':str(product_line['Poids']), - 'sku':str(product_line['Numéro de référence']), - 'description': product_line['Description'], - 'short_description': product_line['Courte Description'], - 'slug':product_line['Slug'] - } - attributes = { - "Temps de combustion" : product_line['Temps de combustion'], - "Type de cire" : product_line['Type de cire'], - "Mèche" : product_line['Mèche'], - "Fabrication" : product_line['Fabrication'], - "Composition" : product_line['Composition'], - "Ingrédients et engagements" : product_line['Ingrédients et engagements'], - #"Parfums" : product_line['Choix parfums'] - #"Volume" : product_line["Volume"] - } + # define expected settings + expected_product_data = self.get_product_data_as_dict(product_line) + expected_categories = self.get_list_category_for_product(product_line['Catégories']) + expected_medias = self.get_list_media_id_for_product(product_line) + expected_attributes = self.get_product_attributes_as_dict(product_line) + expected_tabs = self.get_product_tabs_as_dict(product_line) - if product_line['Promo'] is not None: - product_data['sale_price'] = product_line['Promo'] + # step1: make sure product exists + product_id = self.product_manager.find_id_by_slug(product_line['Slug']) + if product_id is None: + #product_id = self.create_product(product_data=expected_product_data, attributes=expected_attributes, tabs=expected_tabs, categories=expected_categories, medias=expected_medias, json_data=product_line) + product_id = self.product_manager.create_product(product_data=expected_product_data) + logger.info('produit créé') + + # step2: make sure categories are assigned to product + if not self.has_product_all_categories(product_id=product_id, categories=expected_categories): + self.assign_categories_to_product(id=product_id, categorie_slugs=expected_categories) + logger.info('catégorie créée') - if product_line['Type'] == "Variable": - product_data['type'] = "variable" - if product_line['Choix parfums'] is not None: - values_attributes = self.get_list_variable_attributes(product_line['Choix parfums']) - print(f"values_attributes = {values_attributes}") - attributes['Choix parfums'] = values_attributes - if product_line['Volume'] is not None: - values_attributes = self.get_list_variable_attributes(product_line['Volume']) - print(f"values_attributes = {values_attributes}") - attributes['Volume'] = product_line['Volume'] - """if product_line['Prix pour'] is not None: - values_attributes = self.get_list_variable_attributes(product_line['Prix pour']) - print(f"values_attributes = {values_attributes}") - attributes['Prix pour'] = values_attributes""" - else: - product_data['type'] = "simple" + # step3: make sure attributes are assigned to product + #print(f"attr = {self.product_manager.is_exist_attribute_in_one_product(product_id)}") + # if self.product_manager.is_exist_attribute_in_one_product(product_id): + if self.has_product_all_attributes(product_id=product_id, attributes=expected_attributes): + #self.assign_attributes_to_product(product_id=product_id, attributes=expected_attributes) + logger.info('attributs créés') - print('attributes') - pprint.pprint(attributes) + # step4: make sure tabs are assigned to product + if not self.product_manager.is_exist_tabs_in_one_product(product_id): + self.assign_tabs_to_product(product_id=product_id, tabs=expected_tabs) + logger.info('onglets crées') - tabs ={ - #"Description" : product_line["Description"], - "Conseils d'utilisation" : product_line["Conseils d’utilisation"], - "Précautions articles" : product_line["Précautions articles"], - #"Allergènes" : product_line["Allergènes"] - } - # ... associated categories - categories = self.get_list_category_for_product(product_line['Catégories']) - - # ... associated medias - medias = self.get_list_media_id_for_product(product_line) - - # create or update product - self.create_or_update_product(product_data=product_data, attributes=attributes, tabs=tabs, categories=categories, medias=medias, json_data=product_line) - + # step5: make sure media are assigned to product + if not self.has_product_all_medias(product_id=product_id, medias=expected_medias): + self.assign_medias_to_product(id=product_id, media_slugs=expected_medias) + logger.info('medias crées') + + if not self.has_product_all_datas(product_id, expected_product_data): + logger.info('ici') + def delete_all_informations(self): self.media_manager.delete_all_images() @@ -1594,18 +1871,8 @@ class OrderManager: if response.status_code == 200: orders = response.json() for index, order in enumerate(orders): - #print(f"index = {index}") - #print(f"order = {order}") self.wcapi.delete(f"orders/{order['id']}", params={"force": True}).json() - - """def find_order_id_by_slug(self, slug): - response = self.wcapi.get("orders/",params={"per_page": 100}) - if response.status_code == 200: - orders = response.json() - for cat in categories: - if cat['slug'] == slug: - return cat['id']""" class SeoManager(OdsReader): @@ -1683,53 +1950,8 @@ class SeoManager(OdsReader): #ALL_TABS = ["Allergènes", "Conseils d’utilisation", "Description", "Précautions articles"] #ALL_ATTRIBUTES = ["Temps de combustion", "Type de cire", "Mèche", "Fabrication", "Composition", "Ingrédients et engagement"] if __name__ == "__main__": - #seo_manager = SeoManager(ath=ath, filename_ods=FILENAME_ODS) - #pages = seo_manager.get_all_pages() - #seo_manager.update_seo_page() media_manager = MediaManager(ath=ath, filename_ods=FILENAME_ODS) - #media_manager.delete_media_by_slug('fondtzel-perlimpinpin') - #media_manager.upload_media() - #media_manager.delete_all_images() - #media_manager.assign_image_logo() - #category_manager = CategoryManager(wcapi=wcapi,ath=ath) - #category_manager.delete_all_category() - #order_manager = OrderManager(wcapi=wcapi,ath=ath) - #order_manager.delete_all_orders() - #product_manager = ProductManager(wcapi=wcapi,ath=ath) - #product_manager.delete_all_product() - #medias=media_manager.get_all_as_slug_dict() - #media_manager.delete_media_by_slug('pyramide-olfactive-frangipanier') - #product_manager.delete_product_by_slug("citron-meringue") - #product_manager.update_data_product() - #tab_manager = TabManager(wcapi=wcapi) - #attribute_manager = AttributeManager(wcapi=wcapi) - #variation_manager = VariationsManager(wcapi=wcapi) - #attribute_manager.create(ALL_ATTRIBUTES) - #attribute_manager.create() - #attribute_manager.configure_term() - #attribute_manager.delete_all_term() - #product_id = product_manager.find_id_by_slug("citron-meringue")""" - #woocommerce_manager = WooCommerceManager(wcapi=wcapi, media_manager=media_manager,category_manager=category_manager,product_manager=product_manager, tab_manager=tab_manager, attribute_manager=attribute_manager, variation_manager=variation_manager) - ##woocommerce_manager.delete_all_informations() # - #woocommerce_manager.create_all_informations() - ##woocommerce_manager.process_file(FILENAME_ODS) - #category_manager.update_data_categories() - #woocommerce_manager.delete_all_informations() - #woocommerce_manager.delete_information_by_slug() - #woocommerce_manager.create_all_informations() - #woocommerce_manager.create_all_categories_and_products() - #woocommerce_manager.update_product_tab() - #woocommerce_manager.tab_manager.delete_by_product_id(1890) - #woocommerce_manager.tab_manager.delete_all() - #woocommerce_manager.update_product() - #woocommerce_manager.attribute_manager.delete_all_for_product() - #woocommerce_manager.update_product_attribute_by_slug('citron-meringue') - #woocommerce_manager.attribute_manager.delete_all_for_product() -"""tabs_in_product = [] -for tab in ALL_TABS: - tab_in_product = woocommerce_manager.tab_exists(1890, tab) - tabs_in_product.append(tab_in_product)""" """ utilisation diff --git a/wcctl.py b/wcctl.py index 4df38a4..c96c234 100644 --- a/wcctl.py +++ b/wcctl.py @@ -3,60 +3,151 @@ import argparse from woocommerce import API as WoocommerceApi from api import WoocommerceApiClient from api_woocommerce import AuthentificationWpApi, MediaManager, CategoryManager, ProductManager, AttributeManager, VariationsManager, TabManager, WooCommerceManager -import sys -from base64 import b64encode +import os, sys + +from logging.handlers import TimedRotatingFileHandler +import logging +logger = logging.getLogger(__name__) -def import_medias_ods(args, media_manager): - if args.media: - if args.media_regex: - try: - media_manager.upload_media(args.media_regex) - except ValueError: - print("error name product_regex") - elif args.media_range: - try: - parts = args.media_range.split(':') - start = int(parts[0]) -1 if parts[0] else 0 - end = int(parts[1]) if len(parts) > 1 and parts[1] else None - print(f"start = {start}, end = {end or 'fin'}") - media_manager.upload_media_from_to(start, end) - except ValueError: - print("❌ Mauvais format pour --media-range. Utilisez par exemple --media-range=1:40") - else: - start, end = 0, None - print("ℹ️ --media activé, mais aucune plage spécifiée.") +class WcCtlBase: + def __init__(self, args): + self.args = args + self._wcm = None + + @property + def wcm(self): + if self._wcm is None: + wcapi = WoocommerceApiClient( + url=self.args.wc_url, + wc_consumer_key=self.args.wc_key, + wc_consumer_secret=self.args.wc_secret, + wp_application_user="admin_lcdm", + wp_application_password= "Do4p tLYF 5uje PF2M 21Zo x3OR", #"eAB4 49W6 ZRBj zIZc fH62 tv5c", #"yTW8 Mc6J FUCN tPSq bnuJ 0Sdw", # + wp_api=True, + version="wc/v3", + verify_ssl=True, + timeout=30 + ) + + ath = AuthentificationWpApi() + media_manager = MediaManager(ath, wcapi, filename_ods=self.args.ods_path) + category_manager = CategoryManager(wcapi, ath, filename_ods=self.args.ods_path) + product_manager = ProductManager(wcapi, ath, filename_ods=self.args.ods_path) + attribute_manager = AttributeManager(wcapi, filename_ods=self.args.ods_path) + tab_manager = TabManager(wcapi, filename_ods=self.args.ods_path) + variation_manager = VariationsManager(wcapi, filename_ods=self.args.ods_path) + self._wcm = WooCommerceManager(wcapi=wcapi, + media_manager=media_manager, + category_manager=category_manager, + product_manager=product_manager, + tab_manager=tab_manager, + attribute_manager=attribute_manager, + variation_manager=variation_manager, + filename_ods=self.args.ods_path) -def import_products_ods(args, woocommerce_manager): - if args.product: - if args.product_regex: - try: - woocommerce_manager.process_file(args.product_regex) - except ValueError: - print("error name product_regex") - elif args.product_range: - try: - parts = args.product_range.split(':') - start = int(parts[0]) -1 if parts[0] else 0 - end = int(parts[1]) if len(parts) > 1 and parts[1] else None - print(f"start = {start}, end = {end or 'fin'}") - woocommerce_manager.process_file_from_to(start, end) - except ValueError: - print("❌ Mauvais format pour --product-range. Utilisez par exemple --product-range=1:40") - else: - start, end = 0, None - print("ℹ️ --product activé, mais aucune plage spécifiée.") + return self._wcm + +class DeleteWcCtl(WcCtlBase): + def action(self): + logger.info(f"DeleteWcCtl:action({self.args}) - begin") + if self.args.all: + return self.wcm.delete_all_informations() + + if self.args.attributes: + return None + +class ImportOdsCtl(WcCtlBase): + def action(self): + logger.info(f"ImportOdsCtl:action({self.args}) - begin") + + if self.args.media: + self.import_medias_ods() + + if self.args.category: + medias = self.wmc.mm.get_all_as_slug_dict() + self.wmc.cm.medias = medias + regex = self.args.category_regex if self.args.category_regex else None + self.wcm.cm.update_data_categories(regex) + + if self.args.attribute: + regex = self.args.attribute_regex if self.args.attribute_regex else None + self.wcm.am.create(regex) + self.wcm.am.configure_term() + + if self.args.product: + self.import_products_ods() + + if self.args.logo: + self.wcm.mm.assign_image_logo() + + + def import_medias_ods(self): + if self.args.media: + if self.args.media_regex: + try: + self.wcm.mm.upload_media(self.args.media_regex) + except ValueError: + logger.error("error name product_regex") + elif self.args.media_range: + try: + parts = self.args.media_range.split(':') + start = int(parts[0]) -1 if parts[0] else 0 + end = int(parts[1]) if len(parts) > 1 and parts[1] else None + logger.debug(f"start = {start}, end = {end or 'fin'}") + self.wcm.mm.upload_media_from_to(start, end) + except ValueError: + print("❌ Mauvais format pour --media-range. Utilisez par exemple --media-range=1:40") + else: + start, end = 0, None + print("ℹ️ --media activé, mais aucune plage spécifiée.") + + def import_products_ods(self): + if self.args.product: + if self.args.product_regex: + try: + self.wcm.process_file(self.args.product_regex) + except ValueError: + print("error name product_regex") + elif self.args.product_range: + try: + parts = self.args.product_range.split(':') + start = int(parts[0]) -1 if parts[0] else 0 + end = int(parts[1]) if len(parts) > 1 and parts[1] else None + print(f"start = {start}, end = {end or 'fin'}") + self.wcm.process_file_from_to(start, end) + except ValueError: + print("❌ Mauvais format pour --product-range. Utilisez par exemple --product-range=1:40") + else: + start, end = 0, None + print("ℹ️ --product activé, mais aucune plage spécifiée.") def main(): - ath = AuthentificationWpApi() + # initialize logging + log_directory = "logs" - #ctx = ssl.create_default_context() - #ctx.set_ciphers('@SECLEVEL=2:ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES:DHE+AES:AESGCM:!aNULL:!eNULL:!aDSS:!SHA1:!AESCCM:!PSK') - #http = urllib3.PoolManager(ssl_context=ctx) - #credentials = b64encode(f"{args.wc_key}:{args.wc_secret}").decode("utf-8") - - + # 🔧 Configuration du handler avec rotation quotidienne + log_file = os.path.join(log_directory, "woocommerce.log") + handler = TimedRotatingFileHandler( + filename=log_file, + when="midnight", # ⏰ Rotation tous les jours à minuit + interval=1, # 📅 Chaque 1 jour + backupCount=7, # ♻️ Garde les 7 derniers fichiers de log + encoding='utf-8' # 🧾 Pour supporter tous les caractères + ) + + # 📋 Format du log + formatter = logging.Formatter( + fmt="%(asctime)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S" + ) + handler.setFormatter(formatter) + + # 🔌 Récupère le logger + logger.setLevel(logging.DEBUG) # 👁 Niveau minimum à capturer + logger.addHandler(handler) + parser = argparse.ArgumentParser(prog='wcctl', description='WooCommerce CLI controller') # 🌐 Options globales @@ -70,6 +161,10 @@ def main(): # 🧱 Sous-commandes subparsers = parser.add_subparsers(dest='command', required=True) + delete_parser = subparsers.add_parser('delete', help='delete entities') + delete_parser.add_argument('--attributes', action="store_true", required=False, default=False, help='delete all attributes') + delete_parser.add_argument('--all', action='store_true', required=False, default=False, help='Delete media, categories, products, attributes, tabs') + # 📥 Commande : import-ods import_parser = subparsers.add_parser('import-ods', help='Import ODS file data') import_parser.add_argument('--ods-path', required=True, help='Path to the ODS file') @@ -95,11 +190,7 @@ def main(): # product import_parser.add_argument('--product', action='store_true', help='import all products') import_parser.add_argument('--product-regex', type=str, help='Regex to filter and import product by name') - import_parser.add_argument('--product-range', type=str, help='Range of product rows to process (e.g., 10:30)') - - - # delete all informations - import_parser.add_argument('--delete-all', action='store_true', help='Delete media, categories, products, attributes, tabs') + import_parser.add_argument('--product-range', type=str, help='Range of product rows to process (e.g., 10:30)') # 📥 Commande : import-ods checkssl_parser = subparsers.add_parser('checkssl', help='Check ssl connectivity') @@ -109,29 +200,13 @@ def main(): # Analyse des arguments args = parser.parse_args() - - """wcapi = WoocommerceApi( - url=args.wc_url, - consumer_key=args.wc_key, - consumer_secret=args.wc_secret, - wp_api=True, - version="wc/v3", - verify_ssl=False, - timeout=30 -)""" - wcapi = WoocommerceApiClient( - url=args.wc_url, - wc_consumer_key=args.wc_key, - wc_consumer_secret=args.wc_secret, - wp_application_user="admin_lcdm", - wp_application_password= "Do4p tLYF 5uje PF2M 21Zo x3OR", #"eAB4 49W6 ZRBj zIZc fH62 tv5c", #"yTW8 Mc6J FUCN tPSq bnuJ 0Sdw", # - wp_api=True, - version="wc/v3", - verify_ssl=True, - timeout=30 - ) - + if args.command == "delete": + sys.exit(DeleteWcCtl(args).action()) + + if args.command == "import-ods": + sys.exit(ImportOdsCtl(args).action()) + if args.command == "check": #wcctl-api-rw #class WoocommerceApiClient(API) def __init__(self, url, wc_consumer_key, wc_consumer_secret, wp_application_user, wp_application_password, verify_ssl=False, **kwargs): @@ -146,20 +221,6 @@ def main(): verify_ssl=True, timeout=30 ) - print("recupération d'un produit", end="") - response = wcapi.get("products", params={"per_page": 1, "page": 1}) - print(response) - print("OK") - - """wcapi = WoocommerceApi( - url=args.wc_url, - consumer_key="wcctl-api-rw", - consumer_secret="NUrp R7tI oDKr FSqT hhf8 KxOT", - wp_api=True, - version="wp/v2", - verify_ssl=False, - timeout=30 - )""" print("recupération d'un media", end="") response = wcapi.get("media", params={"per_page": 100, "page": 1}) print(response) @@ -173,53 +234,6 @@ def main(): print(f'connection OK') sys.exit(0) - - ath = AuthentificationWpApi() - media_manager = MediaManager(ath, wcapi, filename_ods=args.ods_path) - category_manager = CategoryManager(wcapi, ath, filename_ods=args.ods_path) - product_manager = ProductManager(wcapi, ath, filename_ods=args.ods_path) - attribute_manager = AttributeManager(wcapi, filename_ods=args.ods_path) - tab_manager = TabManager(wcapi, filename_ods=args.ods_path) - variation_manager = VariationsManager(wcapi, filename_ods=args.ods_path) - woocommerce_manager = WooCommerceManager(wcapi=wcapi, - media_manager=media_manager, - category_manager=category_manager, - product_manager=product_manager, - tab_manager=tab_manager, - attribute_manager=attribute_manager, - variation_manager=variation_manager, - filename_ods=args.ods_path) - - # Dispatch en fonction de la commande - #if args.command == 'import-ods': - # import_medias_ods(args) - #print(f"🔍 args.media = {args.media}") - #print(f"🔍 args.media_range = {args.media_range}") - print(f"args = {args}") - - if args.media: - import_medias_ods(args, media_manager) - - if args.delete_all: - woocommerce_manager.delete_all_informations() - - if args.category: - medias = media_manager.get_all_as_slug_dict() - category_manager.medias = medias - regex = args.category_regex if args.category_regex else None - category_manager.update_data_categories(regex) - - if args.attribute: - regex = args.attribute_regex if args.attribute_regex else None - attribute_manager.create(regex) - attribute_manager.configure_term() - - if args.product: - import_products_ods(args, woocommerce_manager) - - if args.logo: - media_manager.assign_image_logo() - if __name__ == "__main__": main()