from woocommerce import API as WoocommerceApi from pathlib import Path import pandas as pd import ezodf import requests import pprint import base64 import unicodedata import os from watermark import create_watermark_image from base64 import b64encode import logging logger = logging.getLogger(__name__) # 🧪 Test """logger.debug("Démarrage du programme (DEBUG)") logger.info("Traitement en cours (INFO)") logger.warning("Avertissement (WARNING)") logger.error("Erreur (ERROR)") logger.critical("Erreur critique (CRITICAL)")""" # via consumer key and consumer secret : # https://lescreationsdemissbleue.local/wp-json/wc/v3/products?consumer_key=ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e&consumer_secret=cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768 #url="https://lescreationsdemissbleue.local", #url="https://les-creations-de-missbleue.local", #consumer_key="ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e", #consumer_secret="cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768", class AuthentificationWpApi(): # Identifiants WordPress (et non WooCommerce) wordpress_username = "admin_lcdm" # Remplace par ton username WordPress wordpress_application_password = "yTW8 Mc6J FUCN tPSq bnuJ 0Sdw" #'W6Zt N5CU 2Gj6 TlKm clGn LvIz' #"#8io_mb!55@Bis" # Généré dans WordPress > Utilisateurs # Générer l'authentification Basic en base64 auth_str = f"{wordpress_username}:{wordpress_application_password}" auth_bytes = auth_str.encode("utf-8") auth_base64 = base64.b64encode(auth_bytes).decode("utf-8") ath = AuthentificationWpApi() WEBSITE_URL = "https://les-creations-de-missbleue.local" #WEBSITE_URL = "https://les-creations-de-missbleue.com" #FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\donnees_site_internet_missbleue_corrige.ods" #BASE_PATH = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\photos\\photos_site\\Photos_site\\" #BASE_PATH = "C:\\Users\\beren\\Cloud\\beren\\site_missbleue\\photos\\photos_site\\Photos_site\\" BASE_PATH = "photos/photos_site/Photos_site/" #FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\infos_site.ods" FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\api_woocommerce\\final_api_woocommerce\\donnees_site_internet_missbleue_version_finale.ods" class OdsReader: def __init__(self, filename_ods=FILENAME_ODS): self.filename_ods = filename_ods def get_all_product_lines(self): return self.get_doc_ods(2) def fetch_all_product_rows(self, start, end=None): return self.extract_ods_row(2, start, end) def get_product_line_by_value(self, search_value): return self.get_doc_ods_by_value(2, search_value) def get_product_by_slug_from_ods(self, slug): for product in self.get_all_product_lines(): if product['Slug'] == slug: return product return None def get_all_media_lines(self): return self.get_doc_ods(0) def fetch_all_media_rows(self, start, end=None): return self.extract_ods_row(0, start, end) def get_media_line_by_value(self, search_value): return self.get_doc_ods_by_value(0, search_value) def get_all_attribute_and_tab_lines(self): return self.get_doc_ods(3) def get_attribute_and_tab_lines(self, search_value): return self.get_doc_ods_by_value(3, search_value) def get_all_category_lines(self): return self.get_doc_ods(1) def get_category_line_by_value(self, search_value): return self.get_doc_ods_by_value(1, search_value) def get_all_seo_lines(self): return self.get_doc_ods(6) def get_doc_ods(self, number_sheet): doc = ezodf.opendoc(self.filename_ods) sheet = doc.sheets[number_sheet] data = [] for row in sheet.rows(): data.append([cell.value for cell in row]) df = pd.DataFrame(data) df.columns = df.iloc[0] df = df[1:].reset_index(drop=True) df = df.dropna(how='all') json_data = df.to_dict(orient="records") return json_data def get_doc_ods_by_value(self, number_sheet, search_value=None): doc = ezodf.opendoc(self.filename_ods) sheet = doc.sheets[number_sheet] data = [] for row in sheet.rows(): data.append([cell.value for cell in row]) df = pd.DataFrame(data) df.columns = df.iloc[0] df = df[1:].reset_index(drop=True) df = df.dropna(how='all') if search_value: try: print(f"Recherche de la valeur : {search_value}") # Vérifier que le DataFrame n'est pas vide if df.empty: raise ValueError("Le DataFrame est vide") # Nettoyer le search_value pour enlever les espaces superflus search_value = str(search_value).strip() # Dynamique sur la colonne à rechercher column_name = 'Nom' # à modifier selon la situation if column_name not in df.columns: raise ValueError(f"La colonne '{column_name}' n'existe pas dans le DataFrame") # Supprimer les espaces avant et après dans la colonne cible df[column_name] = df[column_name].str.strip() # Remplir les NaN par des chaînes vides df[column_name] = df[column_name].fillna('') # Recherche avec contains sur la colonne mask = df[column_name].str.contains(str(search_value), case=False, na=False) #print(f"Masque généré :\n{mask}") if mask.sum() == 0: # Si aucune ligne ne correspond raise ValueError(f"Aucune correspondance trouvée pour '{search_value}' dans la colonne '{column_name}'") # Filtrage du DataFrame df = df[mask] #print(f"df après filtrage :\n{df}") except ValueError as ve: #print(f"Erreur : {ve}") logger.exception(f"🚫 Aucune correspondance trouvée pour '{search_value}' dans la colonne '{column_name}'") except Exception as e: #print(f"Erreur lors de la recherche : {e}") logger.exception(f"🚫 Erreur lors de la recherche de '{search_value}' dans la colonne '{column_name}'. Exception : {e}") else: print("Aucun search_value fourni") # Convertir en json_data pour le retour json_data = df.to_dict(orient="records") return json_data def extract_ods_row(self, number_sheet, start_row=None, end_row=None): doc = ezodf.opendoc(self.filename_ods) sheet = doc.sheets[number_sheet] data = [] for row in sheet.rows(): data.append([cell.value for cell in row]) logger.debug(f'extract_ods_row: {len(data)} rows found in ods sheet #{number_sheet}') df = pd.DataFrame(data) df.columns = df.iloc[0] df = df[1:].reset_index(drop=True) if start_row is not None and end_row is not None: df = df.iloc[start_row:end_row] elif start_row is not None: df = df.iloc[start_row:] elif end_row is not None: df = df.iloc[:end_row] """# Voir tous les noms de colonnes print(df.columns.tolist()) # Compter les doublons de noms from collections import Counter print([item for item, count in Counter(df.columns).items() if count > 1]) # Nettoyer les noms de colonnes (supprimer espaces, etc.) df.columns = df.columns.str.strip()""" df = df.dropna(how='all') return df.to_dict(orient="records") class MediaManager(OdsReader): def __init__(self, ath, wcapi, filename_ods):# filename_ods super().__init__(filename_ods) # filename_ods self.ath = ath self.wcapi = wcapi #self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media" #self.media_api_settings = f"{WEBSITE_URL}/wp-json/wp/v2/settings" self.dict_equivalence = {'ambre':'ambré', 'meringue':'meringué', 'givree':'givrée', 'sale':'salé', 'bresilien':'brésilien', 'epices':'épices', 'noel':'noël', 'a':'à', 'petales':'pétales', 'lumiere':'lumière', 'allumee':'allumée', 'eteinte':'éteinte', 'celebration':'célébration', 'argente':'argenté', 'dore':'doré', 'accroche':'accroché', 'pose':'posé', 'colore':'coloré', 'kevin': 'Kévin', 'interieur':'intérieur', 'cafe':'café', 'bresil':'Brésil', 'dagrumes': "d'agrumes", "iles":"îles", 'apero': 'apéro', 'quebecois':'québecois', 'defendu':'défendu', 'tiare':'tiaré', 'mure':'mûre', 'allergenes':'allergènes', 'parfume':'parfumé', 'peche' : 'pêche' } def upload_media(self, search_value=None): if search_value: json_data = self.get_media_line_by_value(search_value) else: json_data = self.get_all_media_lines() for media in json_data: media_chemin = media['Chemin'].replace("\\", "/") path = Path(BASE_PATH + media_chemin) image_name = path.name try: if not self.is_exists(media, image_name): image_path = BASE_PATH + media_chemin # 👇 Tentative d'ouverture et d'envoi with open(image_path, "rb") as image_file: response = self.wcapi.post("media",files={"file": image_file}) if response.status_code == 201: media_data = response.json() self.update_data_media(media, media_data['id']) logger.info(f"✅ Image uploadée : {image_name}") else: logger.error(f"❌ Échec de l'upload ({response.status_code}) pour : {image_name} - URL: {self.wcapi.url}") else: logger.info(f"↪️ Image déjà existante (non uploadée) : {image_name}") except FileNotFoundError: logger.exception(f"🚫 Fichier introuvable : {image_name} ({path})") except requests.RequestException as e: logger.exception(f"🔌 Problème réseau/API lors de l'upload de {image_name} : {e}") except Exception as e: logger.exception(f"🔥 Erreur inattendue lors de l'upload de {image_name} : {e}") def create_and_update_media(self, media, image_name, path, watermark=False): #print(f"image_path = {path}") try: if not self.is_exists(media, image_name): if watermark: image_path = path else: media_chemin = media['Chemin'].replace("\\", "/") image_path = BASE_PATH + media_chemin # 👇 Tentative d'ouverture et d'envoi with open(image_path, "rb") as image_file: response = self.wcapi.post("media",files={"file": image_file}) #print(f"response = {response.status_code}") if response.status_code == 201: media_data = response.json() self.update_data_media(media, media_data['id']) logger.info(f"✅ Image uploadée : {image_name}") else: logger.error(f"❌ Échec de l'upload ({response.status_code}) pour : {image_name} - URL: {self.wcapi.url}") else: if self.is_txt_alt_exists(media, image_name): media_id = self.is_txt_alt_exists(media, image_name) self.update_data_media(media, media_id) logger.info(f"✅ Image déjà existante et mise à jour : {image_name}") else: logger.info(f"↪️ Image déjà existante (non uploadée) : {image_name}") except FileNotFoundError: logger.exception(f"🚫 Fichier introuvable : {image_name} ({path})") except requests.RequestException as e: logger.exception(f"🔌 Problème réseau/API lors de l'upload de {image_name} : {e}") except Exception as e: logger.exception(f"🔥 Erreur inattendue lors de l'upload de {image_name} : {e}") def upload_media_from_to(self, range_start, range_end=None): json_data = self.fetch_all_media_rows(range_start, range_end) logger.debug(f'{len(json_data)} to process') for media in json_data: media_chemin = media['Chemin'].replace("\\", "/") path = Path(BASE_PATH + media_chemin) image_name = path.name #first_folder = media['Chemin'].split("\\")[0] first_folder = media_chemin.split("/")[0] watermarked_path = Path(create_watermark_image(str(path))) watermarked_name = watermarked_path.name if first_folder == 'Logo': self.create_and_update_media(media,image_name,path) else: self.create_and_update_media(media, watermarked_name, watermarked_path, True) try: os.remove(watermarked_path) except FileNotFoundError: logger.exception(f"🚫 Fichier introuvable : {image_name} ({path})") def is_exists(self, media, image_name): all_images = self.get_all_images() name_without_extension, extension = os.path.splitext(image_name) for image in all_images: if media['Slug'] == image['slug']: return True else: pass return False def is_txt_alt_exists(self, media, image_name): all_images = self.get_all_images() name_without_extension, extension = os.path.splitext(image_name) for image in all_images: if media['Slug'] == image['slug']: if image['alt_text'] == "": return image['id'] else: pass return False def get_title_and_alt_text_media(self, media): sentence = media['Slug'].replace('-', ' ') sentence = sentence.replace('img', '').strip() title = sentence.capitalize() alt_text = title return title, alt_text def update_accent_in_sentence(self, sentence): words = sentence.split() new_words = [self.dict_equivalence[word.lower()] if word.lower() in self.dict_equivalence else word for word in words] new_sentence = " ".join(new_words) return new_sentence def update_data_media(self, media, id_img): if media['Nom'] is None or media['Description'] is None: title, alt_text = self.get_title_and_alt_text_media(media) else: title = media['Nom'] alt_text = media['Description'] title = self.update_accent_in_sentence(title) alt_text = self.update_accent_in_sentence(alt_text) update_data = { "title" : title, "alt_text": alt_text, "slug": media['Slug'], } media_chemin = media['Chemin'].replace("\\", "/") path = Path(BASE_PATH + media_chemin) image_name = path.name response = self.wcapi.post(f"media/{id_img}", data=update_data) """response = self.wcapi.post( f"media/{id_img}", headers={ "Authorization": f"Basic {self.ath.auth_base64}", #"Authorization": f"Basic {self.ath['auth_base64']}", "Content-Disposition": f"attachment; filename={image_name}", "User-Agent": "Mozilla/5.0" }, json=update_data, verify=False )""" if response.status_code == 200: return response.json() else: return None def find_id_by_name(self, name): images = self.get_all_images() for img in images: #pprint.pprint(img) if img['title']['rendered'] == name: return img['id'] def find_id_by_slug(self, slug): images = self.get_all_images() for img in images: if img['slug'] == slug: return img['id'] def get_all_as_slug_dict(self): all_slug_dict = {} images = self.get_all_images() for img in images: all_slug_dict[img['id']] = img['slug'] return all_slug_dict def delete_media_by_slug(self, slug): images = self.get_all_images() for img in images: if img['slug'] == slug: self.wcapi.delete(f"media/{img['id']}", params = {"force": True}) def get_all_images(self): """Récupère toutes les images en gérant la pagination""" all_images = [] page = 1 #print(f"self.ath.auth_base64 = {self.ath.auth_base64}") while True: """response = self.wcapi.get(f"wp-json/wp/v2/media?per_page=100&page={page}", headers={"Authorization": f"Basic {self.ath.auth_base64}", "User-Agent": "Mozilla/5.0"}, verify=False )""" response = self.wcapi.get("media", params={"per_page": 100, "page": page}) #print(f"response.status_code = {response.status_code}") if response.status_code != 200: break images = response.json() if not images: break all_images.extend(images) page += 1 return all_images def delete_images(self, images): """Supprime toutes les images récupérées""" for img in images: img_id = img['id'] response = self.wcapi.delete(f"media/{img_id}") """delete_url = f"media/{img_id}?force=true" response = requests.delete(delete_url, headers={"Authorization": f"Basic {self.ath.auth_base64}", "User-Agent": "Mozilla/5.0"}, #{"Authorization": f"Basic {self.ath['auth_base64']}"}, verify=False)""" if response.status_code in [200, 410]: # 410 = déjà supprimé print(f"Image {img_id} supprimée.") else: print(f"Erreur suppression {img_id} :", response.status_code, response.text) def delete_all_images(self): images = self.get_all_images() for img in images: img_id = img['id'] response = self.wcapi.delete(f"media/{img_id}") if response.status_code in [200, 410]: # 410 = déjà supprimé print(f"Image {img_id} supprimée.") else: print(f"Erreur suppression {img_id} :", response.status_code, response.text) def assign_image_logo(self): images = self.get_all_images() for img in images: if img['slug'] == "img-logo-lescreationsdemissbleue": data = { "site_logo":img['id'], "site_icon" : img['id'] } response = self.wcapi.post("settings", data=data) if response.status_code == 200: print("Logo mis à jour avec succès !") else: print(f"Erreur lors de la mise à jour du logo : {response.text}") class CategoryManager(OdsReader): def __init__(self, wcapi, ath, filename_ods, medias=None): super().__init__(filename_ods) self.wcapi = wcapi self.ath = ath self.medias = medias self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media" self.error_log = [] self.headers = { "Authorization": f"Basic {self.ath.auth_base64}", "Content-Type": "application/json" } def find_id_by_slug(self, slug): response = self.wcapi.get("products/categories/",params={"per_page": 100}) if response.status_code == 200: categories = response.json() for cat in categories: if cat['slug'] == slug: return cat['id'] def find_id_by_name(self, name): response = self.wcapi.get("products/categories/",params={"per_page": 100}) if response.status_code == 200: categories = response.json() for cat in categories: if cat['name'] == name: return cat['id'] def create_category(self, name, description, slug): category_data = { "name": name, "description": description, "slug":slug } if self.find_id_by_slug(slug): logger.debug(f"Catégorie contenant comme slug '{slug}' existe déjà") else: try: response = self.wcapi.post("products/categories/", category_data) if response.status_code == 201: logger.info(f"Catégorie créé avec succès. ID: {response.json()['id']}") else: logger.error(f"Erreur lors de la création de la catégorie. Code: {response.status_code}, Message: {response.text}") except Exception as e: logger.error(f"Erreur inattendue lors de l'envoi de la catégorie à WooCommerce: {e}") def assign_parent_category(self, parent_slug, slug): response = self.wcapi.get("products/categories/",params={"per_page": 100}) if response.status_code == 200: categories = response.json() for cat in categories: parent_id = self.find_id_by_parent_slug(parent_slug) if parent_id: if cat['slug'] == slug: self.wcapi.put(f"products/categories/{cat['id']}",{'parent': parent_id}) def find_id_by_parent_slug(self, parent_slug): response = self.wcapi.get("products/categories/",params={"per_page": 100}) if response.status_code == 200: categories = response.json() for cat in categories: if cat['slug'] == parent_slug: return cat['id'] def find_media_id_by_slug(self, media_slug): for id, slug in self.medias.items(): if media_slug == slug: return id def update_media_id_for_category(self, media_id, cat_id): response = self.wcapi.get(f"media/{media_id}", params={"per_page": 1, "page": 1}) update_category_data = { "image" : {'id':media_id}, } self.wcapi.put(f"products/categories/{cat_id}", update_category_data) def update_data_categories(self, search_value=None): if search_value: json_data = self.get_category_line_by_value(search_value) else: json_data = self.get_all_category_lines() for category in json_data: self.create_category(category['Nom'], category['Description'], category['Slug']) cat_id = self.find_id_by_slug(category['Slug']) media_id = self.find_media_id_by_slug(category['Media Slug']) self.assign_parent_category(category['Parent Slug'], category['Slug']) self.update_media_id_for_category(media_id,cat_id) def delete_all_category(self): response = self.wcapi.get(f"products/categories",params={"per_page": 100}) for cat in response.json(): self.wcapi.delete(f"products/categories/{cat['id']}", params={"force": True}) def delete_media_category(self, media_slug): media_id = self.find_media_id_by_slug(media_slug) self.wcapi.delete(f"media/{media_id}") def delete_category_by_id(self, category_id): self.wcapi.delete(f"products/categories/{category_id}", params={"force": True}) def delete_category_by_slug(self, slug): category_id = self.find_id_by_slug(slug) #print(f"category_id = {category_id}") self.wcapi.delete(f"products/categories/{category_id}", params={"force": True}) def get_errors(self): return print(f"self.error_log = {self.error_log}") class ProductManager(OdsReader): def __init__(self, wcapi, ath, filename_ods, medias=None): super().__init__(filename_ods) self.wcapi = wcapi self.ath = ath self.medias = medias self.error_log = [] self.headers = { "Authorization": f"Basic {self.ath.auth_base64}", "Content-Type": "application/json", "User-Agent": "Mozilla/5.0" } self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media" def update_data_list_cat_product(self, list_category_id, list_img_id, product_id): product_data = { 'categories':list_category_id, 'images':list_img_id, } self.wcapi.put(f"products/{product_id}", product_data) def update_data_list_category_product(self, product_id, list_category_id): #print(f"list_category_id = {list_category_id}") categories = [{'id': cat_id} for cat_id in list_category_id if cat_id is not None] product_data = { 'categories': categories, } self.wcapi.put(f"products/{product_id}", product_data) def update_data_list_medias_product(self, product_id, list_medias_id): images = [{'id': media_id} for media_id in list_medias_id if media_id is not None] product_data = { 'images':images, } logger.debug(f"{product_data} - {product_id}") self.wcapi.put(f"products/{product_id}", product_data) def get_list_media_id_for_product(self, medias): list_media_id_for_product = [] for id, media_slug in self.medias.items(): for media in medias: if media == media_slug: image_id = {'id':id} list_media_id_for_product.append(image_id) return list_media_id_for_product[::-1] def get_list_category_for_product(self, categories): response = self.wcapi.get("products/categories",params={"per_page": 100}) list_category_for_product = [] #logger.debug(f"response.json() = {response.json()}") try : for category in response.json(): for cat in categories: if category['name'] == cat: id_category = {'id':category['id']} list_category_for_product.append(id_category) #logger.debug(f"list_category_for_product = {list_category_for_product}") return list_category_for_product except requests.exceptions.JSONDecodeError as e: logger.debug(f"text = {response.text}") raise e def find_product_by_id(self, id): response = self.wcapi.get(f"products/{id}") if response.status_code == 200: product = response.json() return product def find_id_by_slug(self, slug): response = self.wcapi.get("products",params={"slug": slug}) if response.status_code == 200: products = response.json() for pro in products: if pro['slug'] == slug: return pro['id'] return None """def find_id_by_slug(self, slug): response = self.wcapi.get("products/",params={"per_page": 100}) if response.status_code == 200: products = response.json() for pro in products: if pro['slug'] == slug: return pro['id']""" def find_media_id_by_slug(self, media_slug): for id, slug in self.medias.items(): if media_slug == slug: return id def is_exist_attribute_in_one_product(self, product_id): response = self.wcapi.get(f"products/{product_id}") if response: products_info = response.json() if products_info['attributes']: return True else: return False else: return False def is_exist_tabs_in_one_product(self, product_id): response = self.wcapi.get(f"products/{product_id}") if response: #logger.info(pprint.pformat(response.json())) is_exist_tab = False products_info = response.json() for data in products_info['meta_data']: if data.get('key') == "wb_custom_tabs" and not data.get('value'): is_exist_tab = True break if is_exist_tab: return True else: return False else: return False """def get_product_data_by_api(self, product_id, product_datas): response = self.wcapi.get(f"products/{product_id}") products_info = response.json() new_datas = {} for key, data in product_datas.items: if products_info[key] == data: pass else: products_info[key] = data new_datas[key] = data self.wcapi.put(f"products/{product_id}", new_datas)""" def get_product_data_by_api(self, product_id): response = self.wcapi.get(f"products/{product_id}") products_info = response.json() return products_info def create_tabs_from_custom_dict(self, product_id, product): product_tabs_data = {} list_product_tabs_data = [] x = 1 for key in product.keys(): if key == "Conseils d’utilisation" or key == "Précautions articles" or key == "Description": #or key == "Allergènes": product_tabs_data['title'] = key product_tabs_data['content'] = product[key] product_tabs_data['nickname'] = '' product_tabs_data['position'] = x product_tabs_data['tab_type'] = 'local' list_product_tabs_data.append(product_tabs_data) product_tabs_data = {} x += 1 response = self.wcapi.get(f"products/{product_id}") if response.status_code == 200: meta_data = [] meta_data.append( {'key': 'wb_custom_tabs', 'value': list_product_tabs_data} ) meta_data_data = { 'meta_data': meta_data } res = self.wcapi.post(f"products/{product_id}", meta_data_data) else: print(f"error - {product_id} - {response.status_code}") def create_product(self, product_data): logger.debug(f"ProductManager::create_product called (product_data = {product_data})") if product_data is None: logger.info(f"ProductManager::create_product called no product_data supplied - ignored") return None try: product_id = self.find_id_by_slug(product_data['slug']) if product_id: return product_id else: response = self.wcapi.post("products/", product_data) if response.status_code == 201: logger.debug(f"ProductManager::create_product product created {response.json()['id']} - end") return response.json()['id'] else: logger.error(f"Erreur lors de la création du produit. {product_data['name']} Code: {response.status_code}, Message: {response.text}") # Le produit n'a pas été créé, mais il y a une réponse avec un code d'erreur except Exception as e: logger.error(f"Erreur inattendue lors de l'envoi du produit à WooCommerce: {e}") logger.debug(f"ProductManager::create_product - end") def update_data_product(self, product_data, categories, medias, json_data): #json_data = self.get_all_product_lines() for field in json_data: #logger.info(f"update_data_product = {product}") product_id = self.find_id_by_slug(product_data['slug']) list_category_id = self.get_list_category_for_product(categories) list_img_id = self.get_list_media_id_for_product(medias) self.update_data_list_cat_product(list_category_id, list_img_id, product_id) def update_data_product_by_slug(self, slug): json_data = self.get_all_product_lines() for product in json_data: if product['Slug'] == slug: self.create_product(product) product_id = self.find_id_by_slug(product['Slug']) list_category_id = self.get_list_category_for_product(product['Catégories']) list_img_id = self.get_list_media_id_for_product(product['Media Slugs']) self.update_data_list_cat_product(list_category_id, list_img_id, product_id) def get_all_products(self): """Récupère tous les produits en gérant la pagination""" all_products = [] page = 1 while True: response = self.wcapi.get("products", params={"per_page": 100, "page": page}) if response.status_code != 200: print(f"⚠️ Erreur API WooCommerce: {response.status_code} - {response.json()}") break products = response.json() if not products: # Si la page est vide, on arrête la boucle break all_products.extend(products) page += 1 # On passe à la page suivante return all_products def get_all_categories_as_list(self, product_id): response = self.wcapi.get(f"products/{product_id}") if response: product = response.json() return product['categories'] else: return [] def get_all_medias_as_list(self, product_id): response = self.wcapi.get(f"products/{product_id}") if response: product = response.json() return product['images'] else: return [] def get_all_attributes_as_list(self, product_id): response = self.wcapi.get(f"products/{product_id}") if response: product = response.json() return product["attributes"] else: [] def delete_product(self): json_data = self.get_all_product_lines() for product in json_data: list_products = self.wcapi.get(f"products/") for pro in list_products.json(): if product['Nom'] == pro['name']: self.wcapi.delete(f"products/{pro['id']}") def delete_all_product(self): products = self.get_all_products() if products: for pro in products: self.wcapi.delete(f"products/{pro['id']}", params={"force": True}) def delete_media_product(self, media_slug): media_id = self.find_media_id_by_slug(media_slug) self.wcapi.delete(f"media/{media_id['id']}") def delete_product_by_id(self, product_id): self.wcapi.delete(f"products/{product_id}", params={"force": True}) def delete_product_by_slug(self, slug): product_id = self.find_id_by_slug(slug) self.wcapi.delete(f"products/{product_id}", params={"force": True}) def normalize_string(text): return unicodedata.normalize("NFKC", text).strip().lower() def tab_exists(self, product_id, name_tab): response = self.wcapi.get(f"products/{product_id}") if response.status_code == 200: response_json = self.wcapi.get(f"products/{product_id}").json() for meta_data in response_json['meta_data']: for key_meta_data, value_meta_data in meta_data.items(): if key_meta_data == "value": if isinstance(value_meta_data, list): for tab in value_meta_data: if name_tab == tab['title']: return True return False class AttributeManager(OdsReader): def __init__(self, wcapi, filename_ods): super().__init__(filename_ods) self.wcapi = wcapi def get_attributes(self): attributes = self.wcapi.get(f"products/attributes").json() one_attribute = self.wcapi.get(f"products/attributes/1/terms").json() return attributes def get_by_name(self, name): attributes = self.wcapi.get(f"products/attributes").json() for attr in attributes: if attr['name'] == name: attribute = self.wcapi.get(f"products/attributes/{attr['id']}", params={"per_page": 100}).json() return attribute def get_list_name_data(self): list_name_data = [] json_data = self.get_all_attribute_and_tab_lines() for item in json_data: #if item['Onglet'].strip() == "Informations Complémentaires": if item['Onglet'] == "Informations Complémentaires": list_name_data.append(item['Nom']) return list_name_data def create(self, search_value=None): if search_value: features_json_data = self.get_attribute_and_tab_lines(search_value) else: features_json_data = self.get_all_attribute_and_tab_lines() for item in features_json_data: #if item['Onglet'].strip() == "Informations Complémentaires": if item['Onglet'] == "Informations Complémentaires": attribute_data = { 'name' : item["Nom"] } self.wcapi.post(f"products/attributes", attribute_data) def get_term(self, search_value=None): term_dict = {} if search_value: term_json_data = self.get_attribute_and_tab_lines(search_value) else: term_json_data = self.get_all_attribute_and_tab_lines() for item in term_json_data: list_item = [] #if item['Onglet'].strip() == "Informations Complémentaires": if item['Onglet'] == "Informations Complémentaires": if "," in item["Valeurs"]: list_item = [value_term.strip() for value_term in item['Valeurs'].split(",")] else: item['Valeurs'].strip() if list_item: term_dict[item['Nom']] = list_item else: term_dict[item['Nom']] = item['Valeurs'] return term_dict def configure_term(self): term_dict = self.get_term() response = self.wcapi.get(f"products/attributes", params={"per_page": 100}) if response.status_code == 200: attributes = response.json() for attribute in attributes: for name, value in term_dict.items(): if attribute['name'] == name: if isinstance(value, list): for v in value: term = { 'name' : v } self.wcapi.post(f"products/attributes/{attribute['id']}/terms", term) else: term = { 'name' : value } self.wcapi.post(f"products/attributes/{attribute['id']}/terms", term) def create_for_product(self, product_id, name, value, variation=False): data_attribute = { 'name': name, 'options':value } #logger.info(f"name = {name} - value = {value}") #list_product_tabs_data.append(data_tab) response = self.wcapi.get(f"products/{product_id}") if response.status_code == 200: print("passe d'abord ici 1") product_meta_data = response.json() existing_attributes_data = product_meta_data.get("attributes", []) already_exist = False """for data in existing_attributes_data: for key_data, value_data in data.items(): if key_data == "value": if isinstance(value_data, list): for value in value_data: if value['name'] == name: already_exist = True""" if already_exist == False: found = False #print(f"attributes_data = {existing_attributes_data}") #print(f"data_attribute = {data_attribute}") for attribute in existing_attributes_data: #logger.info(f"name = {name} - value = {value}") if attribute["name"] == name: attribute["options"].append(data_attribute) found = True break # Si l'onglet `wb_custom_tabs` n'existe pas, on le crée if not found: if value is not None: value = [v.strip() for v in value.split(",")] logger.info(f"value = {value}") existing_attributes_data.append({ "name": name, "options": value, "visible":True, "variation": variation, #"parent_id":product_id }) attributes_data = { 'attributes': existing_attributes_data } logger.debug(f"existing_attributes_data = {existing_attributes_data}") res = self.wcapi.put(f"products/{product_id}", attributes_data) else: print('already_exist') else: print(f"error 1") def delete_all_for_product(self): response_product = self.wcapi.get(f"products/", params={"per_page": 100}) if response_product.status_code == 200: products = response_product.json() for product in products: existing_attributes_data = product.get("attributes", []) if existing_attributes_data == []: pass else: attribute_data = { 'attributes': [] } res = self.wcapi.post(f"products/{product['id']}", attribute_data) def delete_all_term(self): response_attribute = self.wcapi.get(f"products/attributes", params={"per_page": 100}) if response_attribute.status_code == 200: attributes = response_attribute.json() for attribute in attributes: response_attribute_term = self.wcapi.get(f"products/attributes/{attribute['id']}/terms", params={"per_page": 100}) if response_attribute_term.status_code == 200: attributes_term = response_attribute_term.json() for term in attributes_term: self.wcapi.delete(f"products/attributes/{attribute['id']}/terms/{term['id']}",params={"force": True}) def delete_all(self): response = self.wcapi.get(f"products/attributes", params={"per_page": 100}) if response.status_code == 200: attributes = response.json() for attribute in attributes: self.wcapi.delete(f"products/attributes/{attribute['id']}",params={"force": True}) class TabManager(OdsReader): def __init__(self, wcapi,filename_ods): super().__init__(filename_ods) self.wcapi = wcapi def get_list_name_data(self, search_value=None): list_name_data = [] """if search_value: json_data = self.get_attribute_and_tab_lines(search_value) else:""" json_data = self.get_all_attribute_and_tab_lines() for item in json_data: if item['Onglet'].strip() != "Informations Complémentaires": list_name_data.append(item['Nom']) return list_name_data def create_or_update_for_product(self, product_id, tabs): position = 1 for title, content in tabs.items(): position += 1 data_tab = { 'title': title, 'content':content, 'nickname':'', 'position':position, 'tab_type': 'local' } response = self.wcapi.get(f"products/{product_id}") #logger.debug(response) if response.status_code == 200: print("passe d'abord ici 2") product_meta_data = response.json() existing_meta_data = product_meta_data.get("meta_data", []) already_exist = False for data in existing_meta_data: for key_data, value_data in data.items(): if key_data == "value": if isinstance(value_data, list): for value in value_data: if value['title'] == title: already_exist = True if already_exist == False: found = False for meta in existing_meta_data: if meta["key"] == "wb_custom_tabs": meta["value"].append(data_tab) found = True break # Si l'onglet `wb_custom_tabs` n'existe pas, on le crée if not found: existing_meta_data.append({ "key": "wb_custom_tabs", "value": [data_tab] }) meta_data_data = { 'meta_data': existing_meta_data } res = self.wcapi.put(f"products/{product_id}", meta_data_data) else: #print('else') data_tab = { 'content':content, } meta_data_data = { 'meta_data': existing_meta_data } res = self.wcapi.put(f"products/{product_id}", meta_data_data) else: print(f"error 2") def delete_by_product_id(self, product_id): response = self.wcapi.get(f"products/{product_id}") if response.status_code == 200: product_meta_data = response.json() existing_meta_data = product_meta_data.get("meta_data", []) if existing_meta_data == []: pass else: meta_data = { 'meta_data': [{"key": "wb_custom_tabs","value":[]}] } res = self.wcapi.post(f"products/{product_id}", meta_data) def delete_all(self): response = self.wcapi.get(f"products/", params={"per_page": 100}) if response.status_code == 200: product_meta_data = response.json() for product in product_meta_data: existing_meta_data = product.get("meta_data", []) if existing_meta_data == []: pass else: meta_data = { 'meta_data': [{"key": "wb_custom_tabs","value":[]}] } res = self.wcapi.post(f"products/{product['id']}", meta_data) class VariationsManager(OdsReader): def __init__(self, wcapi, filename_ods): super().__init__(filename_ods) self.wcapi = wcapi def get_attribute_id(self, product_data): response = self.wcapi.get(f"products/attributes") if response.status_code == 200: attributes = response.json() for key, value in product_data.items(): for attr_key, attr_value in attributes.items(): if attr_value['name'] == key: attribute_id = attr_value['id'] return attribute_id def update_product_attributes_merged(self, wcapi, product_id, attribute_name, new_options): """ Met à jour l'attribut d'un produit WooCommerce en ajoutant de nouvelles options, sans écraser les autres attributs existants. :param wcapi: Instance API WooCommerce (wcapi = API(...)) :param product_id: ID du produit à mettre à jour :param attribute_name: Nom de l'attribut à enrichir (ex: "Parfums") :param new_options: Liste des nouvelles valeurs à ajouter (ex: ["Lavande", "Citron"]) """ # Nettoyer les nouvelles options new_options = [opt.strip() for opt in new_options.split('|') if opt.strip()] # 1. Récupérer le produit existant response = wcapi.get(f"products/{product_id}") if response.status_code != 200: print(f"❌ Impossible de récupérer le produit {product_id}") return product = response.json() attributes = product.get("attributes", []) # 2. Chercher l'attribut ciblé found = False for attr in attributes: if attr["name"].lower() == attribute_name.lower(): existing_options = attr.get("options", []) merged_options = list(set(existing_options + new_options)) attr["options"] = merged_options attr["variation"] = True attr["visible"] = True attr["parent_id"] = product_id attr["manage_stock"] = "parent" found = True break # 3. Si l'attribut n'existe pas, on l'ajoute if not found: attributes.append({ "name": attribute_name, "variation": True, "visible": True, "options": new_options }) # 4. Mettre à jour le produit avec les attributs fusionnés update_data = { "attributes": attributes } update_res = wcapi.put(f"products/{product_id}", update_data) if update_res.status_code == 200: print(f"✅ Attribut '{attribute_name}' mis à jour avec succès.") else: print(f"❌ Erreur lors de la mise à jour : {update_res.status_code}") print(update_res.json()) def create_variations_products(self, product_id, product_data): products_lines = self.get_all_product_lines() product_line = self.get_product_by_slug_from_ods(product_data['slug']) parfums = None volumes = None price_per_product_variable = None if product_line['Type'] == "Variable": if product_line['Choix parfums'] is not None: parfums = [p.strip() for p in product_line['Choix parfums'].split(",")] print(f"parfums = {parfums}") if product_line['Volume'] is not None: volumes = [v.strip() for v in product_line['Volume'].split(",")] print(f"volumes = {volumes}") """if product_line['Prix pour'] is not None: #products = [v.strip() for v in product_line['Prix pour'].split(",")] products = product_line['Prix pour'].split(",") price_per_product_variable = {} for p in products: name, price = p.split("=") price_per_product_variable[name.strip()] = price.strip() pprint.pprint(price_per_product_variable)""" response = self.wcapi.get(f"products/{product_id}") try: if response.status_code == 200: #existing_product = response.json() #self.update_product_attributes_merged(self.wcapi, product_id=product_id, attribute_name="Parfums", new_options=parfums) if parfums is not None: for parfum in parfums: data = { 'attributes': [ { 'name': 'Choix parfums', 'option': parfum } ], 'manage_stock': False, 'in_stock':True, 'regular_price': product_data['price'], } print(f"Posting variation: {data}") response = self.wcapi.post(f"products/{product_id}/variations", data) logger.info(f"Variation de parfums a bien été créé") if volumes is not None: for volume in volumes: data = { 'attributes': [ { 'name': 'Volume', 'option': volume } ], 'manage_stock': False, 'in_stock':True, 'regular_price': product_data['price'], } print(f"Posting variation: {data}") result = self.wcapi.post(f"products/{product_id}/variations", data) logger.info(f"Variation de volumes a bien été créé") """if price_per_product_variable is not None: for name, price in price_per_product_variable.items(): data = { 'attributes': [ { 'name': 'Volume', 'option': name } ], 'manage_stock': False, 'in_stock':True, 'regular_price': price, } result = self.wcapi.post(f"products/{product_id}/variations", data) logger.info(f"Variation de prix selon objet bien créé")""" except Exception as e: logger.exception(f"Erreur lors de la création du produit de variation : {e}") #logger.error(f"Erreur lors de la création de la catégorie. Code: {response.status_code}, Message: {response.text}") """ for product_line_key, products_line_value in product_line.items(): if product_line_key == "Choix parfums": name_attribute = product_line_key parfums = products_line_value if product_line_key == "Type": if product_data['type'] == "Variable": response = self.wcapi.get(f"products/{product_id}") if response.status_code == 200: existing_product = response.json() self.update_product_attributes_merged(self.wcapi, product_id=product_id, attribute_name="Parfums", new_options=parfums) parfums = [p.strip() for p in parfums.split("|") if p.strip()] response = self.wcapi.get(f"products/{product_id}/variations") if response.status_code == 200: for parfum in parfums: data = { 'attributes': [ { 'name': name_attribute, 'option': parfum } ], 'manage_stock': False, 'in_stock':True, 'regular_price': product_data['price'], } print(f"Posting variation: {data}") result = self.wcapi.post(f"products/{product_id}/variations", data) print(result.status_code) pprint.pprint(result.json()) else: return False""" class WooCommerceManager(OdsReader): def __init__(self, wcapi, media_manager, category_manager, product_manager, tab_manager, attribute_manager, variation_manager, filename_ods): super().__init__(filename_ods) self.wcapi = wcapi self.media_manager = media_manager self.category_manager = category_manager self.product_manager = product_manager self.tab_manager = tab_manager self.attribute_manager = attribute_manager self.variation_manager = variation_manager self.filename_ods = filename_ods # access to managers @property def mm(self): return self.media_manager @property def cm(self): return self.category_manager @property def pm(self): return self.product_manager @property def tm(self): return self.tab_manager @property def am(self): return self.attribute_manager @property def vm(self): return self.variation_manager def tab_exists(self, product_id, name_tab): return self.product_manager.tab_exists(product_id, name_tab) def get_product_tab_details(self): all_products_json = self.get_all_attribute_and_tab_lines() all_tabs = self.tab_manager.get_list_name_data() dict = {} for product in all_products_json: line = [] for tab in all_tabs: line.append([tab, product[tab]]) dict[product["Parfum"]] = line return dict def get_product_attributes_details(self): ret = [] all_products_json = self.get_all_product_lines() all_attributes = self.attribute_manager.get_list_name_data() for product in all_products_json: for attribute in all_attributes: ret.append([attribute, product[attribute]]) return ret def update_product_tab_by_slug(self, slug): product_id = self.product_manager.find_id_by_slug(slug) product = self.product_manager.find_product_by_id(product_id) products_tab_details = self.get_product_tab_details() x=1 for value in products_tab_details.values(): for key in products_tab_details.keys(): for title, content in value: if key: if key in product['short_description']: self.tab_manager.create_for_product(product_id=product_id, title=title, content=content, nickname="", position=x, tab_type="local") x=x+1 else: pass else: print('no key') x=1 def update_product_attribute_by_slug(self, slug): product_id = self.product_manager.find_id_by_slug(slug) product_ods = self.get_product_by_slug_from_ods(slug) products_attribute_details = self.get_product_attributes_details() for name, value in products_attribute_details: self.attribute_manager.create_for_product(product_id=product_id, name=name, value=value, variation=self.is_variable(product_ods['Type'])) def update_product(self): self.update_product_tab() def update_product_by_slug(self, slug): self.product_manager.update_data_product_by_slug(slug) self.update_product_tab_by_slug(slug) #self.update_product_attribute_by_slug(slug) def create_all_informations(self): medias = self.media_manager.get_all_as_slug_dict() self.product_manager.medias = medias self.process_file(FILENAME_ODS) self.update_product() def get_list_category_for_product(self, category): category_list_by_doc = [cat.strip().replace('"', '') for cat in category.split("/")] return category_list_by_doc def get_list_variable_attributes(self,attributes): list_variable_attributes_by_doc = [attr.strip() for attr in attributes.split(",")] return list_variable_attributes_by_doc def get_list_media_id_for_product(self, product): #list_media_by_doc = [img.strip().replace(' ', '') for img in media.split(",")] list_media_by_doc = [] list_media_by_doc.append(product['Image1']) list_media_by_doc.append(product['Image2']) list_media_by_doc.append(product['Image3']) list_media_by_doc.append(product['Pyramide']) list_media_by_doc.append(product['Clp']) return list_media_by_doc def is_variable(self, name, value): if name == "Volume" or name =="Choix parfums": if value is not None: return True else: return False def update_product_attribute(self, attributes, product_data): print(f"product_data = {product_data}") product_id = self.product_manager.find_id_by_slug(product_data['slug']) for name, value in attributes.items(): self.attribute_manager.create_for_product(product_id=product_id, name=name, value=value, variation=self.is_variable(name, value)) #self.attribute_manager.create_for_product(product_id=product_id, name=name, value=value, variation=self.is_variable(product_data)) def update_product_variations(self, product_data): product_id = self.product_manager.find_id_by_slug(product_data['slug']) self.variation_manager.create_variations_products(product_id, product_data) def update_product_tab(self, product_data): for product in product_data: self.update_product_tab_by_id(product['id']) def create_product(self, product_data, attributes, tabs, categories, medias, json_data): self.product_manager.create_product(product_data=product_data) self.product_manager.update_data_product(product_data=product_data, categories=categories, medias=medias, json_data=json_data) self.update_product_attribute(attributes=attributes, product_data=product_data) product_id = self.product_manager.find_id_by_slug(product_data['slug']) #self.update_product_variations(product_data) self.tab_manager.create_or_update_for_product(product_id=product_id, tabs=tabs) def create_or_update_product(self, product_data, attributes, tabs, categories, medias, json_data): try: product_id = self.product_manager.find_id_by_slug(product_data['slug']) if product_id: #self.product_manager.update_data_product(attributes=attributes, product_data=product_data) self.product_manager.update_data_product(product_data=product_data, categories=categories, medias=medias, json_data=json_data) else: self.product_manager.create_product(product_data=product_data) self.product_manager.update_data_product(product_data=product_data, categories=categories, medias=medias, json_data=json_data) self.update_product_attribute(attributes=attributes, product_data=product_data) product_id = self.product_manager.find_id_by_slug(product_data['slug']) #self.update_product_variations(product_data) self.tab_manager.create_or_update_for_product(product_id=product_id, tabs=tabs) except Exception as e: print(f"Erreur lors de la mise à jour du produit: {e}") logger.exception(f"Erreur lors de la mise à jour du produit: {e}") logger.debug() def get_product_lines(self, search_value=None): if search_value: return self.get_product_line_by_value(search_value) else: return self.get_all_product_lines() def process_file(self, search_value=None): # refresh media cache medias = self.media_manager.get_all_as_slug_dict() self.product_manager.medias = medias # read provided file products_lines = self.get_product_lines(search_value) #pprint.pprint(products_lines) for product_line in products_lines: # standard product data product_data = { 'name' : product_line['Nom'], 'price': product_line['Prix'], 'regular_price': product_line['Prix'], 'stock_quantity': product_line['Stock'], 'manage_stock':True, 'weight':str(product_line['Poids']), 'sku':str(product_line['Numéro de référence']), 'description': product_line['Description'], 'short_description': product_line['Courte Description'], 'slug':product_line['Slug'] } if product_line['Promo'] is not None: product_data['sale_price'] = product_line['Promo'] """if product_line['Type'] == "parfums": product_data['type'] = "variable" if product_line['Volume'] is not None: values_attributes = self.get_list_variable_attributes(product_line['Volume']) attributes['Volume'] = values_attributes else: product_data['type'] = "simple""" attributes = { "Temps de combustion" : product_line['Temps de combustion'], "Type de cire" : product_line['Type de cire'], "Mèche" : product_line['Mèche'], "Fabrication" : product_line['Fabrication'], "Composition" : product_line['Composition'], "Ingrédients et engagements" : product_line['Ingrédients et engagements'], "Parfums" : product_line['Parfums'], "Volume" : product_line["Volume"] } tabs ={ #"Description" : product_line["Description"], "Conseils d'utilisation" : product_line["Conseils d’utilisation"], "Précautions articles" : product_line["Précautions articles"], #"Allergènes" : product_line["Allergènes"] } # ... associated categories categories = self.get_list_category_for_product(product_line['Catégories']) # ... associated medias medias = self.get_list_media_id_for_product(product_line['Media Slugs']) # create or update product self.create_or_update_product(product_data=product_data, attributes=attributes, tabs=tabs, categories=categories, medias=medias) def get_product_data_as_dict(self, product_line): product_data = { 'name' : product_line['Nom'], 'price': product_line['Prix'], 'regular_price': product_line['Prix'], 'stock_quantity': product_line['Stock'], 'manage_stock':True, 'weight':str(product_line['Poids']), 'sku':str(product_line['Numéro de référence']), 'description': product_line['Description'], 'short_description': product_line['Courte Description'], 'slug':product_line['Slug'] } if product_line['Promo'] is not None: product_data['sale_price'] = product_line['Promo'] if product_line['Type'] is not None: product_data['type'] = "variable" if product_line['Fabrication sur commande'] is not None: #backoreded = [b.strip() for b in product_line['Fabrication sur commande'].split(",")] product_data['backorders'] = "notify" product_data['backorders_allowed'] = True product_data['backordered'] = True if product_line['Status du produit']: if product_line['Status du produit'] == 'P': product_data['status'] = 'publish' logger.info(f"Article publié {product_line['Nom']}") elif product_line['Status du produit'] == 'B': product_data['status'] = 'draft' logger.info(f"Article brouillon {product_line['Nom']}") else: logger.debug(f"Article ignoré {product_line['Nom']}") return None return product_data def get_product_attributes_as_dict(self, product_line): attributes = { "Temps de combustion" : product_line['Temps de combustion'], "Type de cire" : product_line['Type de cire'], "Mèche" : product_line['Mèche'], "Fabrication" : product_line['Fabrication'], "Composition" : product_line['Composition'], "Ingrédients et engagements" : product_line['Ingrédients et engagements'], #"Parfums" : product_line['Choix parfums'] #"Volume" : product_line["Volume"] } if product_line["Volume"]: attributes["volume"] = product_line["Volume"] if product_line['Type de cire']: attributes['Type de cire'] = product_line['Type de cire'] if product_line['Mèche']: attributes['Mèche'] = product_line['Mèche'] if product_line['Parfums']: attributes["Parfums"] = product_line['Parfums'] return attributes def get_product_tabs_as_dict(self, product_line): tabs ={ #"Description" : product_line["Description"], "Conseils d'utilisation" : product_line["Conseils d’utilisation"], "Précautions articles" : product_line["Précautions articles"], #"Allergènes" : product_line["Allergènes"] } return tabs # --------------------------------- Medias ----------------------------------------------- def has_product_all_medias(self, product_id, medias): print(f"medias={medias}") list_medias_by_api = self.product_manager.get_all_medias_as_list(product_id) names_in_list_medias_by_api = [media['name'] for media in list_medias_by_api] if set(medias) != set(names_in_list_medias_by_api): return False else: return True def assign_medias_to_product(self, id, media_slugs): list_medias_id = [] for slug in media_slugs: list_medias_id.append(self.media_manager.find_id_by_slug(slug)) logger.info(f"liste des medias id = {list_medias_id}") self.product_manager.update_data_list_medias_product(id, list_medias_id) # --------------------------------- Category ---------------------------------------------- def has_product_all_categories(self, product_id, categories): list_categories_by_api = self.product_manager.get_all_categories_as_list(product_id) names_in_list_categories_by_api = [category['name'] for category in list_categories_by_api] if set(categories) != set(names_in_list_categories_by_api): return False else: return True def assign_categories_to_product(self, id, categorie_slugs): list_categories_id = [] for slug in categorie_slugs: list_categories_id.append(self.category_manager.find_id_by_slug(slug)) self.product_manager.update_data_list_category_product(id, list_categories_id) # --------------------------------- Attributes ---------------------------------------------- def is_exist_attribute_in_one_product(self, product_id): response = self.wcapi.get(f"products/{product_id}") if response: products_info = response.json() if products_info['attributes']: return True else: return False else: return False def has_product_all_attributes(self, product_id, attributes): #pprint.pprint(self.product_manager.get_all_attributes_as_list(product_id)) pprint.pprint(attributes) list_attributes_by_api = self.product_manager.get_all_attributes_as_list(product_id) pprint.pprint(list_attributes_by_api) """if list_attributes_by_api == []: return True # Comparaison directe not_found_same_value = False for i, d in enumerate(list_attributes_by_api): for key, value in attributes.items(): for k, v in d.items(): if k == 'options': if value in d[k]: not_found_same_value = False print(f"v = {v}, value = {value} d[k] = {d[k]}") print("trouvé") else: print(f"v = {v}, value = {value} d[k] = {d[k]}") print("pas trouvé") #return True""" """common_keys = list_attributes_by_api.keys() & attributes.keys() found_same_value = False for key in common_keys: pprint.pprint(common_keys) if list_attributes_by_api[key] != attributes[key]: found_same_value = True break else: pass if found_same_value == True: return True else: return False""" def assign_attributes_to_product(self, product_id, attributes): for attr in attributes: for key, value in attributes.items(): #logger.debug(f"key = {key} - value = {value} - attr = {attr}") if attr == key : self.attribute_manager.create_for_product(product_id=product_id, name=key, value=value, variation=self.is_variable(key, value)) # --------------------------------- Product ---------------------------------------------- def has_product_all_datas(self, product_id, product_data): list_product_data_by_api = self.product_manager.get_product_data_by_api(product_id) common_keys = list_product_data_by_api.keys() & product_data.keys() # Étape 2 : comparaison des valeurs found_same_value = False for key in common_keys: if list_product_data_by_api[key] != product_data[key]: found_same_value = True break else: pass if found_same_value == True: return True else: return False def assign_product_changement(self, product_id, product_data): list_product_data_by_api = self.product_manager.get_product_data_by_api(product_id) common_keys = list_product_data_by_api.keys() & product_data.keys() # Étape 2 : comparaison des valeurs differences = {} for key in common_keys: if list_product_data_by_api[key] != product_data[key]: differences[key] = product_data[key] self.wcapi.put(f"products/{product_id}", differences) # -------------------------------------------- Tabs ------------------------------------------------------ #def missing_tabs_for_product(self, product_id, tabs): # --------------------------------------------------------------------------------------------------------- def assign_tabs_to_product(self, product_id, tabs): self.tab_manager.create_or_update_for_product(product_id=product_id, tabs=tabs) def process_file_from_to(self, range_start, range_end=None): medias = self.media_manager.get_all_as_slug_dict() self.product_manager.medias = medias #logger.info(f"self.fetch_all_product_rows(range_start, range_end): = {self.fetch_all_product_rows(range_start, range_end)}") for product_line in self.fetch_all_product_rows(range_start, range_end): # define expected settings expected_product_data = self.get_product_data_as_dict(product_line) expected_categories = self.get_list_category_for_product(product_line['Catégories']) expected_medias = self.get_list_media_id_for_product(product_line) expected_attributes = self.get_product_attributes_as_dict(product_line) expected_tabs = self.get_product_tabs_as_dict(product_line) # step1: make sure product exists product_id = self.product_manager.find_id_by_slug(product_line['Slug']) if product_id is None: #product_id = self.create_product(product_data=expected_product_data, attributes=expected_attributes, tabs=expected_tabs, categories=expected_categories, medias=expected_medias, json_data=product_line) product_id = self.product_manager.create_product(product_data=expected_product_data) logger.info('produit créé') # step2: make sure categories are assigned to product if not self.has_product_all_categories(product_id=product_id, categories=expected_categories): self.assign_categories_to_product(id=product_id, categorie_slugs=expected_categories) logger.info('catégorie créée') # step3: make sure attributes are assigned to product #print(f"attr = {self.product_manager.is_exist_attribute_in_one_product(product_id)}") # if self.product_manager.is_exist_attribute_in_one_product(product_id): if self.has_product_all_attributes(product_id=product_id, attributes=expected_attributes): #self.assign_attributes_to_product(product_id=product_id, attributes=expected_attributes) logger.info('attributs créés') # step4: make sure tabs are assigned to product if not self.product_manager.is_exist_tabs_in_one_product(product_id): self.assign_tabs_to_product(product_id=product_id, tabs=expected_tabs) logger.info('onglets crées') # step5: make sure media are assigned to product if not self.has_product_all_medias(product_id=product_id, medias=expected_medias): self.assign_medias_to_product(id=product_id, media_slugs=expected_medias) logger.info('medias crées') if not self.has_product_all_datas(product_id, expected_product_data): logger.info('ici') def delete_all_informations(self): self.media_manager.delete_all_images() self.attribute_manager.delete_all() self.product_manager.delete_all_product() self.category_manager.delete_all_category() def delete_information_by_slug(self): self.product_manager.delete_product_by_slug("chope-adoucissant") #category_manager.delete_all_category() class OrderManager: def __init__(self, wcapi, ath): super().__init__() self.wcapi = wcapi self.ath = ath self.error_log = [] self.headers = { "Authorization": f"Basic {self.ath.auth_base64}", "Content-Type": "application/json", "User-Agent": "Mozilla/5.0" } def delete_all_orders(self): response = self.wcapi.get("orders/",params={"per_page": 100}) print(f"response = {response.status_code}") if response.status_code == 200: orders = response.json() for index, order in enumerate(orders): self.wcapi.delete(f"orders/{order['id']}", params={"force": True}).json() class SeoManager(OdsReader): def __init__(self, ath, filename_ods):# filename_ods super().__init__(filename_ods) # filename_ods self.ath = ath self.page_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/pages" def get_all_pages(self): print("coucou") """Récupère toutes les images en gérant la pagination""" all_pages = [] dict_id_slug = {} response = self.wcapi.get(f"seo", params={"per_page": 100}) if response.status_code != 200: pass list_pages = response.json() if not list_pages: pass for index, page in enumerate(list_pages): dict_id_slug[list_pages[index]['id']] = list_pages[index]['slug'] all_pages.append(dict_id_slug) dict_id_slug = {} return all_pages def update_seo_page(self): all_pages = self.get_all_pages() pprint.pprint(all_pages) seo_lines = self.get_all_seo_lines() #pprint.pprint(seo_lines) for page_id_slug in all_pages: for key_page, slug_page in page_id_slug.items(): print(f"key_page = {key_page}") for line in seo_lines: #dict_seo = {} if line['Slug'] == slug_page: data = { "meta": { "og_title": line["Titre"], "og_description": line["Description"], #"_yoast_wpseo_opengraph-title": line["Titre"], #"_yoast_wpseo_opengraph-description": line["Description"] } } response = self.wcapi.post(f"seo/{key_page}") """"meta": { "_yoast_wpseo_title": line["Titre"], "_yoast_wpseo_metadesc": line["Description"], "_yoast_wpseo_opengraph-title": line["Titre"], "_yoast_wpseo_opengraph-description": line["Description"] }""" """dict_seo['yoast_head_json']['description'] = line['Description'] dict_seo['yoast_head_json']['og_description'] = line['Description'] dict_seo['yoast_head_json']['og_title'] = line['Titre'] response = requests.post( f"{self.page_api_url}/{page['id']}", headers={ "Authorization": f"Basic {self.ath.auth_base64}", #"Authorization": f"Basic {self.ath['auth_base64']}", #"Content-Disposition": f"attachment; filename={image_name}" }, json=dict_seo, verify=False )""" #page['yoast_head_json']['description'] #page['yoast_head_json']['og_description'] #page['yoast_head_json']['og_title'] #ALL_TABS = ["Allergènes", "Conseils d’utilisation", "Description", "Précautions articles"] #ALL_ATTRIBUTES = ["Temps de combustion", "Type de cire", "Mèche", "Fabrication", "Composition", "Ingrédients et engagement"] if __name__ == "__main__": media_manager = MediaManager(ath=ath, filename_ods=FILENAME_ODS) """ utilisation module argparse # on va appeler ça importation d'un fichier ods, d'où l'action import-ods # on va appeler cette commande, "la commande de base" wcctl --wc-url=https://lescreationsdemissbleue.local --wc-key= --wc-secret= import-ods --ods-path=fichier.ods # traitement de l'intégralité d'un fichier ods ... --all # traitement des medias seulement, on peut en option spécifier une plage de média à importer ... --medias [--media-range=1:40] plu tard ... # traitement des catégories seulement, on peut en option spécifier une expression régulière qui va s'appliquer au nom de la catégorie ... --categories [--categories-regex=] ex: traiter uniquement les catégories dont le nom contient le terme "bougie" ... --categories [--categories-regex=.*bougie.*] # traitement des articles seulement, on peut en option spécifier une expression régulière qui va s'appliquer au nom de l'article' # ... --products [--products-regex=] ex: traiter uniquement les articles dont le nom contient le terme "bougie" ... --categories [--products-regex=.*bougie.*] """ #parser = argparse.ArgumentParser(description="Script de traitement WooCommerce") #wcctl --wc-url=https://lescreationsdemissbleue.local --wc-key= --wc-secret= import-ods --ods-path=fichier.ods