From 8760966e4c85148dc28a30d1aeab124c1c32ea9b Mon Sep 17 00:00:00 2001 From: Bene Date: Mon, 7 Apr 2025 12:33:25 +0200 Subject: [PATCH] add initial script api woocommerce --- final_api_woocommerce/api_woocommerce.py | 1150 ++++++++++++++++++++++ 1 file changed, 1150 insertions(+) create mode 100644 final_api_woocommerce/api_woocommerce.py diff --git a/final_api_woocommerce/api_woocommerce.py b/final_api_woocommerce/api_woocommerce.py new file mode 100644 index 0000000..ebbc99d --- /dev/null +++ b/final_api_woocommerce/api_woocommerce.py @@ -0,0 +1,1150 @@ +from woocommerce import API as WoocommerceApi +from pathlib import Path +import pandas as pd +import ezodf +import requests +import pprint +import base64 +import time +import json +import pyexcel_ods3 +import unicodedata +import logging +import os +import time + +logger = logging.getLogger(__name__) + +# 1️⃣ Configurer le logger +logging.basicConfig( + filename="woocommerce.log", # 📌 Fichier où les logs seront sauvegardés + level=logging.DEBUG, # 📌 Niveau de log (DEBUG, INFO, WARNING, ERROR, CRITICAL) + format="%(asctime)s - %(levelname)s - %(message)s", # 📌 Format du log + datefmt="%Y-%m-%d %H:%M:%S" # 📌 Format de la date +) + +# via consumer key and consumer secret : +# https://lescreationsdemissbleue.local/wp-json/wc/v3/products?consumer_key=ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e&consumer_secret=cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768 + +wcapi = WoocommerceApi( + url="https://lescreationsdemissbleue.local", + consumer_key="ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e", + consumer_secret="cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768", + wp_api=True, + version="wc/v3", + verify_ssl=False, # Désactive la vérification SSL pour le développement + timeout=30 +) + +class AuthentificationWpApi: + # Identifiants WordPress (et non WooCommerce) + wordpress_username = "admin_lcdm" # Remplace par ton username WordPress + wordpress_application_password = "yTW8 Mc6J FUCN tPSq bnuJ 0Sdw" #"#8io_mb!55@Bis" # Généré dans WordPress > Utilisateurs + + # Générer l'authentification Basic en base64 + auth_str = f"{wordpress_username}:{wordpress_application_password}" + auth_bytes = auth_str.encode("utf-8") + auth_base64 = base64.b64encode(auth_bytes).decode("utf-8") + +ath = AuthentificationWpApi() + +WEBSITE_URL = "https://lescreationsdemissbleue.local" +FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\donnees_site_internet_missbleue_corrige.ods" +BASE_PATH = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\photos\\photos_site\\Photos_site\\" +#FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\infos_site.ods" + +class OdsReader: + def __init__(self, filename_ods=FILENAME_ODS): + self.filename_ods = filename_ods + + def get_all_product_lines(self): + return self.get_doc_ods(2) + + def get_product_by_slug_from_ods(self, slug): + for product in self.get_all_product_lines(): + if product['Slug'] == slug: return product + return None + + def get_all_media_lines(self): + return self.get_doc_ods(0) + + def get_all_attribute_and_tab_lines(self): + return self.get_doc_ods(3) + + def get_all_category_lines(self): + return self.get_doc_ods(1) + + def get_doc_ods(self, number_sheet): + doc = ezodf.opendoc(self.filename_ods) + sheet = doc.sheets[number_sheet] + data = [] + for row in sheet.rows(): + data.append([cell.value for cell in row]) + + df = pd.DataFrame(data) + df.columns = df.iloc[0] + df = df[1:].reset_index(drop=True) + df = df.dropna(how='all') + json_data = df.to_dict(orient="records") + return json_data + +class MediaManager(OdsReader): + + def __init__(self, ath): + super().__init__() + self.ath = ath + self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media" + self.media_api_settings = f"{WEBSITE_URL}/wp-json/wp/v2/settings" + + def upload_media(self): + json_data = self.get_all_media_lines() + for media in json_data: + path = Path(BASE_PATH + media['Chemin']) + image_name = path.name + if not self.is_exists(media, image_name): + with open(BASE_PATH + media['Chemin'], "rb") as image_file: + response = requests.post( + self.media_api_url, + headers={ + "Authorization": f"Basic {self.ath.auth_base64}", + "Content-Disposition": f"attachment; filename={image_name}" + }, + files={"file": image_file}, + verify=False + ) + if response.status_code == 201: + media_data = response.json() + self.update_data_media(media, media_data['id']) + else: + return None + else: + pass + + def is_exists(self, media, image_name): + all_images = self.get_all_images() + name_without_extension, extension = os.path.splitext(image_name) + for image in all_images: + if media['Slug'] == image['slug']: + return True + else: + pass + return False + + + def update_data_media(self, media, id_img): + update_data = { + "title" : media['Nom'], + "alt_text": media['Description'], + "slug": media['Slug'], + } + path = Path(BASE_PATH + media['Chemin']) + image_name = path.name + response = requests.post( + f"{self.media_api_url}/{id_img}", + headers={ + "Authorization": f"Basic {self.ath.auth_base64}", + "Content-Disposition": f"attachment; filename={image_name}" + }, + json=update_data, + verify=False + ) + + if response.status_code == 200: + return response.json() + else: + return None + + def find_id_by_slug(self, slug): + images = self.get_all_images() + for img in images: + if img['slug'] == slug: + return img['id'] + + def get_all_as_slug_dict(self): + all_slug_dict = {} + images = self.get_all_images() + for img in images: + all_slug_dict[img['id']] = img['slug'] + return all_slug_dict + + def delete_media_by_slug(self, slug): + images = self.get_all_images() + for img in images: + if img['slug'] == slug: + delete_url = f"{self.media_api_url}/{img['id']}?force=true" + response = requests.delete(delete_url, + headers={"Authorization": f"Basic {self.ath.auth_base64}"}, + verify=False) + + def get_all_images(self): + """Récupère toutes les images en gérant la pagination""" + all_images = [] + page = 1 + while True: + response = requests.get(f"{self.media_api_url}?per_page=100&page={page}", + headers={"Authorization": f"Basic {self.ath.auth_base64}"}, + verify=False + ) + if response.status_code != 200: + break + + images = response.json() + if not images: + break + + all_images.extend(images) + page += 1 + + return all_images + + def delete_images(self, images): + """Supprime toutes les images récupérées""" + for img in images: + img_id = img['id'] + delete_url = f"{self.media_api_url}/{img_id}?force=true" + + response = requests.delete(delete_url, + headers={"Authorization": f"Basic {self.ath.auth_base64}"}, + verify=False) + if response.status_code in [200, 410]: # 410 = déjà supprimé + print(f"Image {img_id} supprimée.") + else: + print(f"Erreur suppression {img_id} :", response.status_code, response.text) + + def delete_all_images(self): + images = self.get_all_images() + for img in images: + img_id = img['id'] + delete_url = f"{self.media_api_url}/{img_id}?force=true" + + response = requests.delete(delete_url, + headers={"Authorization": f"Basic {self.ath.auth_base64}"}, + verify=False) + if response.status_code in [200, 410]: # 410 = déjà supprimé + print(f"Image {img_id} supprimée.") + else: + print(f"Erreur suppression {img_id} :", response.status_code, response.text) + + def assign_image_logo(self): + images = self.get_all_images() + for img in images: + if img['slug'] == "logo-lescreationsdemissbleue": + data = { + "site_logo":img['id'], + "site_icon" : img['id'] + } + response = requests.post( + self.media_api_settings, + json=data, + headers={"Authorization": f"Basic {self.ath.auth_base64}"}, + verify=False + ) + + if response.status_code == 200: + print("Logo mis à jour avec succès !") + else: + print(f"Erreur lors de la mise à jour du logo : {response.text}") + + +class CategoryManager(OdsReader): + + def __init__(self, wcapi, ath, medias=None): + super().__init__() + self.wcapi = wcapi + self.ath = ath + self.medias = medias + self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media" + self.error_log = [] + self.headers = { + "Authorization": f"Basic {self.ath.auth_base64}", + "Content-Type": "application/json" + } + + def find_id_by_slug(self, slug): + response = self.wcapi.get("products/categories/",params={"per_page": 100}) + if response.status_code == 200: + categories = response.json() + for cat in categories: + if cat['slug'] == slug: + return cat['id'] + + def create_category(self, name, description, slug): + category_data = { + "name": name, + "description": description, + "slug":slug + } + if self.find_id_by_slug(slug): + self.error_log.append(f"Catégorie contenant comme slug '{slug}' existe déjà") + else: + self.wcapi.post("products/categories/", category_data) + + def assign_parent_category(self, parent_slug, slug): + response = self.wcapi.get("products/categories/",params={"per_page": 100}) + if response.status_code == 200: + categories = response.json() + for cat in categories: + parent_id = self.find_id_by_parent_slug(parent_slug) + if parent_id: + if cat['slug'] == slug: + self.wcapi.put(f"products/categories/{cat['id']}",{'parent': parent_id}) + + def find_id_by_parent_slug(self, parent_slug): + response = self.wcapi.get("products/categories/",params={"per_page": 100}) + if response.status_code == 200: + categories = response.json() + for cat in categories: + if cat['slug'] == parent_slug: + return cat['id'] + + def find_media_id_by_slug(self, media_slug): + for id, slug in self.medias.items(): + if media_slug == slug: + return id + + def update_media_id_for_category(self, media_id, cat_id): + response = requests.get(f"{self.media_api_url}/{media_id}", + headers={"Authorization": f"Basic {self.ath.auth_base64}"}, + verify=False + ) + update_category_data = { + "image" : {'id':media_id}, + } + self.wcapi.put(f"products/categories/{cat_id}", update_category_data) + + def update_data_categories(self): + json_data = self.get_all_category_lines() + for category in json_data: + self.create_category(category['Nom'], category['Description'], category['Slug']) + cat_id = self.find_id_by_slug(category['Slug']) + media_id = self.find_media_id_by_slug(category['Media Slug']) + self.assign_parent_category(category['Parent Slug'], category['Slug']) + self.update_media_id_for_category(media_id,cat_id) + + def delete_all_category(self): + response = self.wcapi.get(f"products/categories",params={"per_page": 100}) + for cat in response.json(): + self.wcapi.delete(f"products/categories/{cat['id']}", params={"force": True}) + + def delete_media_category(self, media_slug): + media_id = self.find_media_id_by_slug(media_slug) + requests.delete( + f"{self.media_api_url}/{media_id['id']}", + headers=self.headers, + verify=False + ) + + def delete_category_by_id(self, category_id): + self.wcapi.delete(f"products/categories/{category_id}", params={"force": True}) + + def delete_category_by_slug(self, slug): + category_id = self.find_id_by_slug(slug) + #print(f"category_id = {category_id}") + self.wcapi.delete(f"products/categories/{category_id}", params={"force": True}) + + def get_errors(self): + return print(f"self.error_log = {self.error_log}") + +class ProductManager(OdsReader): + def __init__(self, wcapi, ath, medias=None): + super().__init__() + self.wcapi = wcapi + self.ath = ath + self.medias = medias + self.error_log = [] + self.headers = { + "Authorization": f"Basic {self.ath.auth_base64}", + "Content-Type": "application/json" + } + self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media" + + def update_data_list_cat_product(self, list_category_id, list_img_id, product_id): + product_data = { + 'categories':list_category_id, + 'images':list_img_id, + } + self.wcapi.put(f"products/{product_id}", product_data) + + def get_list_media_id_for_product(self, medias): + list_media_id_for_product = [] + for id, media_slug in self.medias.items(): + for media in medias: + if media == media_slug: + image_id = {'id':id} + list_media_id_for_product.append(image_id) + return list_media_id_for_product[::-1] + + def get_list_category_for_product(self, categories): + response = self.wcapi.get("products/categories",params={"per_page": 100}) + list_category_for_product = [] + for category in response.json(): + for cat in categories: + if category['name'] == cat: + id_category = {'id':category['id']} + list_category_for_product.append(id_category) + return list_category_for_product + + def find_product_by_id(self, id): + response = self.wcapi.get(f"products/{id}") + if response.status_code == 200: + product = response.json() + return product + + def find_id_by_slug(self, slug): + response = self.wcapi.get("products/",params={"per_page": 100}) + if response.status_code == 200: + products = response.json() + for pro in products: + if pro['slug'] == slug: + return pro['id'] + + def find_media_id_by_slug(self, media_slug): + for id, slug in self.medias.items(): + if media_slug == slug: + return id + + def create_tabs_from_custom_dict(self, product_id, product): + product_tabs_data = {} + list_product_tabs_data = [] + x = 1 + for key in product.keys(): + if key == "Conseils d’utilisation" or key == "Précautions articles" or key == "Description" or key == "Allergènes": + product_tabs_data['title'] = key + product_tabs_data['content'] = product[key] + product_tabs_data['nickname'] = '' + product_tabs_data['position'] = x + product_tabs_data['tab_type'] = 'local' + list_product_tabs_data.append(product_tabs_data) + product_tabs_data = {} + x += 1 + + response = self.wcapi.get(f"products/{product_id}") + if response.status_code == 200: + meta_data = [] + meta_data.append( + {'key': 'wb_custom_tabs', 'value': list_product_tabs_data} + ) + meta_data_data = { + 'meta_data': meta_data + } + res = self.wcapi.post(f"products/{product_id}", meta_data_data) + else: + print(f"error") + + def create_product(self, product_data): + if self.find_id_by_slug(product_data['slug']): + self.error_log.append(f"Produit contenant comme slug '{product_data['slug']}' existe déjà") + else: + response = self.wcapi.post("products/", product_data) + + def update_data_product(self, product_data, categories, medias): + json_data = self.get_all_product_lines() + for product in json_data: + self.create_product(product_data) + product_id = self.find_id_by_slug(product_data['slug']) + list_category_id = self.get_list_category_for_product(categories) + list_img_id = self.get_list_media_id_for_product(medias) + self.update_data_list_cat_product(list_category_id, list_img_id, product_id) + + def update_data_product_by_slug(self, slug): + json_data = self.get_all_product_lines() + for product in json_data: + if product['Slug'] == slug: + self.create_product(product) + product_id = self.find_id_by_slug(product['Slug']) + list_category_id = self.get_list_category_for_product(product['Catégories']) + list_img_id = self.get_list_media_id_for_product(product['Media Slugs']) + self.update_data_list_cat_product(list_category_id, list_img_id, product_id) + + def get_all_products(self): + """Récupère tous les produits en gérant la pagination""" + all_products = [] + page = 1 + + while True: + response = self.wcapi.get("products", params={"per_page": 100, "page": page}) + + if response.status_code != 200: + print(f"⚠️ Erreur API WooCommerce: {response.status_code} - {response.json()}") + break + + products = response.json() + if not products: # Si la page est vide, on arrête la boucle + break + + all_products.extend(products) + page += 1 # On passe à la page suivante + + return all_products + + + def delete_product(self): + json_data = self.get_all_product_lines() + for product in json_data: + list_products = self.wcapi.get(f"products/") + for pro in list_products.json(): + if product['Nom'] == pro['name']: + self.wcapi.delete(f"products/{pro['id']}") + + def delete_all_product(self): + products = self.get_all_products() + if products: + for pro in products: + self.wcapi.delete(f"products/{pro['id']}", params={"force": True}) + + def delete_media_product(self, media_slug): + media_id = self.find_media_id_by_slug(media_slug) + requests.delete( + f"{self.media_api_url}/{media_id['id']}", + headers=self.headers, + verify=False + ) + + def delete_product_by_id(self, product_id): + self.wcapi.delete(f"products/{product_id}", params={"force": True}) + + def delete_product_by_slug(self, slug): + product_id = self.find_id_by_slug(slug) + self.wcapi.delete(f"products/{product_id}", params={"force": True}) + + + def normalize_string(text): + return unicodedata.normalize("NFKC", text).strip().lower() + + def tab_exists(self, product_id, name_tab): + response = self.wcapi.get(f"products/{product_id}") + if response.status_code == 200: + response_json = self.wcapi.get(f"products/{product_id}").json() + for meta_data in response_json['meta_data']: + for key_meta_data, value_meta_data in meta_data.items(): + if key_meta_data == "value": + if isinstance(value_meta_data, list): + for tab in value_meta_data: + if name_tab == tab['title']: + return True + return False + +class AttributeManager(OdsReader): + + def __init__(self, wcapi): + super().__init__() + self.wcapi = wcapi + + def get_attributes(self): + attributes = self.wcapi.get(f"products/attributes").json() + one_attribute = self.wcapi.get(f"products/attributes/1/terms").json() + return attributes + + def get_by_name(self, name): + attributes = self.wcapi.get(f"products/attributes").json() + for attr in attributes: + if attr['name'] == name: + attribute = self.wcapi.get(f"products/attributes/{attr['id']}", params={"per_page": 100}).json() + return attribute + + def get_list_name_data(self): + list_name_data = [] + json_data = self.get_all_attribute_and_tab_lines() + for item in json_data: + if item['Onglet'].strip() == "Informations Complémentaires": + list_name_data.append(item['Nom']) + return list_name_data + + def create(self): + features_json_data = self.get_all_attribute_and_tab_lines() + for item in features_json_data: + if item['Onglet'].strip() == "Informations Complémentaires": + attribute_data = { + 'name' : item["Nom"] + } + self.wcapi.post(f"products/attributes", attribute_data) + + def get_term(self): + term_dict = {} + list_item = [] + term_json_data = self.get_all_attribute_and_tab_lines() + for item in term_json_data: + if item['Onglet'].strip() == "Informations Complémentaires": + if "," in item["Valeurs"]: + list_item = [value_term.strip() for value_term in item['Valeurs'].split(",")] + else: + item['Valeurs'].strip() + if list_item: + term_dict[item['Nom']] = list_item + else: + term_dict[item['Nom']] = item['Valeurs'] + return term_dict + + def configure_term(self): + term_dict = self.get_term() + response = self.wcapi.get(f"products/attributes", params={"per_page": 100}) + if response.status_code == 200: + attributes = response.json() + for attribute in attributes: + for name, value in term_dict.items(): + if attribute['name'] == name: + if isinstance(value, list): + for v in value: + term = { + 'name' : v + } + self.wcapi.post(f"products/attributes/{attribute['id']}/terms", term) + else: + term = { + 'name' : value + } + self.wcapi.post(f"products/attributes/{attribute['id']}/terms", term) + + + def create_for_product(self, product_id, name, value, variation=False): + data_attribute = { + 'name': name, + 'options':value + } + #list_product_tabs_data.append(data_tab) + response = self.wcapi.get(f"products/{product_id}") + if response.status_code == 200: + product_meta_data = response.json() + existing_attributes_data = product_meta_data.get("attributes", []) + already_exist = False + for data in existing_attributes_data: + for key_data, value_data in data.items(): + if key_data == "value": + if isinstance(value_data, list): + for value in value_data: + if value['name'] == name: + already_exist = True + + if already_exist == False: + found = False + for attribute in existing_attributes_data: + if attribute["name"] == name: + attribute["options"].append(data_attribute) + found = True + break + + # Si l'onglet `wb_custom_tabs` n'existe pas, on le crée + if not found: + existing_attributes_data.append({ + "name": name, + "options": [value], + "visible":True, + "variation": variation, + #"parent_id":product_id + }) + attributes_data = { + 'attributes': existing_attributes_data + } + res = self.wcapi.put(f"products/{product_id}", attributes_data) + else: + print('already_exist') + else: + print(f"error") + + def delete_all_for_product(self): + response_product = self.wcapi.get(f"products/", params={"per_page": 100}) + if response_product.status_code == 200: + products = response_product.json() + for product in products: + existing_attributes_data = product.get("attributes", []) + if existing_attributes_data == []: + pass + else: + attribute_data = { + 'attributes': [] + } + res = self.wcapi.post(f"products/{product['id']}", attribute_data) + + + def delete_all_term(self): + response_attribute = self.wcapi.get(f"products/attributes", params={"per_page": 100}) + if response_attribute.status_code == 200: + attributes = response_attribute.json() + for attribute in attributes: + response_attribute_term = self.wcapi.get(f"products/attributes/{attribute['id']}/terms", params={"per_page": 100}) + if response_attribute_term.status_code == 200: + attributes_term = response_attribute_term.json() + for term in attributes_term: + self.wcapi.delete(f"products/attributes/{attribute['id']}/terms/{term['id']}",params={"force": True}) + + def delete_all(self): + response = self.wcapi.get(f"products/attributes", params={"per_page": 100}) + if response.status_code == 200: + attributes = response.json() + for attribute in attributes: + self.wcapi.delete(f"products/attributes/{attribute['id']}",params={"force": True}) + + +class TabManager(OdsReader): + + def __init__(self, wcapi): + super().__init__() + self.wcapi = wcapi + + def get_list_name_data(self): + list_name_data = [] + json_data = self.get_all_attribute_and_tab_lines() + for item in json_data: + if item['Onglet'].strip() != "Informations Complémentaires": + list_name_data.append(item['Nom']) + return list_name_data + + def create_or_update_for_product(self, product_id, tabs): + position = 1 + for title, content in tabs.items(): + position += 1 + data_tab = { + 'title': title, + 'content':content, + 'nickname':'', + 'position':position, + 'tab_type': 'local' + } + response = self.wcapi.get(f"products/{product_id}") + if response.status_code == 200: + product_meta_data = response.json() + existing_meta_data = product_meta_data.get("meta_data", []) + already_exist = False + for data in existing_meta_data: + for key_data, value_data in data.items(): + if key_data == "value": + if isinstance(value_data, list): + for value in value_data: + if value['title'] == title: + already_exist = True + if already_exist == False: + found = False + for meta in existing_meta_data: + if meta["key"] == "wb_custom_tabs": + meta["value"].append(data_tab) + found = True + break + + # Si l'onglet `wb_custom_tabs` n'existe pas, on le crée + if not found: + existing_meta_data.append({ + "key": "wb_custom_tabs", + "value": [data_tab] + }) + meta_data_data = { + 'meta_data': existing_meta_data + } + res = self.wcapi.put(f"products/{product_id}", meta_data_data) + else: + print('else') + data_tab = { + 'content':content, + } + meta_data_data = { + 'meta_data': existing_meta_data + } + res = self.wcapi.put(f"products/{product_id}", meta_data_data) + else: + print(f"error") + + def delete_by_product_id(self, product_id): + response = self.wcapi.get(f"products/{product_id}") + if response.status_code == 200: + product_meta_data = response.json() + existing_meta_data = product_meta_data.get("meta_data", []) + if existing_meta_data == []: + pass + else: + meta_data = { + 'meta_data': [{"key": "wb_custom_tabs","value":[]}] + } + res = self.wcapi.post(f"products/{product_id}", meta_data) + + def delete_all(self): + response = self.wcapi.get(f"products/", params={"per_page": 100}) + if response.status_code == 200: + product_meta_data = response.json() + for product in product_meta_data: + existing_meta_data = product.get("meta_data", []) + if existing_meta_data == []: + pass + else: + meta_data = { + 'meta_data': [{"key": "wb_custom_tabs","value":[]}] + } + res = self.wcapi.post(f"products/{product['id']}", meta_data) + +class VariationsManager(OdsReader): + + def __init__(self, wcapi): + super().__init__() + self.wcapi = wcapi + + def get_attribute_id(self, product_data): + response = self.wcapi.get(f"products/attributes") + if response.status_code == 200: + attributes = response.json() + for key, value in product_data.items(): + for attr_key, attr_value in attributes.items(): + if attr_value['name'] == key: + attribute_id = attr_value['id'] + return attribute_id + + def update_product_attributes_merged(self, wcapi, product_id, attribute_name, new_options): + """ + Met à jour l'attribut d'un produit WooCommerce en ajoutant de nouvelles options, + sans écraser les autres attributs existants. + + :param wcapi: Instance API WooCommerce (wcapi = API(...)) + :param product_id: ID du produit à mettre à jour + :param attribute_name: Nom de l'attribut à enrichir (ex: "Parfums") + :param new_options: Liste des nouvelles valeurs à ajouter (ex: ["Lavande", "Citron"]) + """ + # Nettoyer les nouvelles options + new_options = [opt.strip() for opt in new_options.split('|') if opt.strip()] + # 1. Récupérer le produit existant + response = wcapi.get(f"products/{product_id}") + if response.status_code != 200: + print(f"❌ Impossible de récupérer le produit {product_id}") + return + + product = response.json() + attributes = product.get("attributes", []) + + # 2. Chercher l'attribut ciblé + found = False + for attr in attributes: + if attr["name"].lower() == attribute_name.lower(): + existing_options = attr.get("options", []) + merged_options = list(set(existing_options + new_options)) + attr["options"] = merged_options + attr["variation"] = True + attr["visible"] = True + attr["parent_id"] = product_id + attr["manage_stock"] = "parent" + found = True + break + + # 3. Si l'attribut n'existe pas, on l'ajoute + if not found: + attributes.append({ + "name": attribute_name, + "variation": True, + "visible": True, + "options": new_options + }) + + # 4. Mettre à jour le produit avec les attributs fusionnés + update_data = { + "attributes": attributes + } + + update_res = wcapi.put(f"products/{product_id}", update_data) + if update_res.status_code == 200: + print(f"✅ Attribut '{attribute_name}' mis à jour avec succès.") + else: + print(f"❌ Erreur lors de la mise à jour : {update_res.status_code}") + print(update_res.json()) + + def create_variations_products(self, product_id, product_data): + #products_lines = self.get_all_product_lines() + product_line = self.get_product_by_slug_from_ods(product_data['slug']) + for product_line_key, products_line_value in product_line.items(): + if product_line_key == "Parfums": + name_attribute = product_line_key + parfums = products_line_value + if product_line_key == "Type": + if product_data['type'] == "variable": + response = self.wcapi.get(f"products/{product_id}") + if response.status_code == 200: + existing_product = response.json() + self.update_product_attributes_merged(self.wcapi, product_id=product_id, attribute_name="Parfums", new_options=parfums) + + parfums = [p.strip() for p in parfums.split("|") if p.strip()] + + response = self.wcapi.get(f"products/{product_id}/variations") + + if response.status_code == 200: + for parfum in parfums: + data = { + 'attributes': [ + { + 'name': name_attribute, + 'option': parfum + } + ], + 'manage_stock': False, + 'in_stock':True, + 'regular_price': product_data['price'], + } + print(f"Posting variation: {data}") + result = self.wcapi.post(f"products/{product_id}/variations", data) + print(result.status_code) + pprint.pprint(result.json()) + else: + return False + +class WooCommerceManager(OdsReader): + def __init__(self, wcapi, media_manager, category_manager, product_manager, tab_manager, attribute_manager, variation_manager): + super().__init__() + self.wcapi = wcapi + self.media_manager = media_manager + self.category_manager = category_manager + self.product_manager = product_manager + self.tab_manager = tab_manager + self.attribute_manager = attribute_manager + self.variation_manager = variation_manager + + def tab_exists(self, product_id, name_tab): + return self.product_manager.tab_exists(product_id, name_tab) + + def get_product_tab_details(self): + all_products_json = self.get_all_attribute_and_tab_lines() + all_tabs = self.tab_manager.get_list_name_data() + dict = {} + for product in all_products_json: + line = [] + for tab in all_tabs: + line.append([tab, product[tab]]) + dict[product["Parfum"]] = line + return dict + + def get_product_attributes_details(self): + ret = [] + all_products_json = self.get_all_product_lines() + all_attributes = self.attribute_manager.get_list_name_data() + for product in all_products_json: + for attribute in all_attributes: + ret.append([attribute, product[attribute]]) + return ret + + def update_product_tab_by_slug(self, slug): + product_id = self.product_manager.find_id_by_slug(slug) + product = self.product_manager.find_product_by_id(product_id) + products_tab_details = self.get_product_tab_details() + x=1 + for value in products_tab_details.values(): + for key in products_tab_details.keys(): + for title, content in value: + if key: + if key in product['short_description']: + tab_manager.create_for_product(product_id=product_id, title=title, content=content, nickname="", position=x, tab_type="local") + x=x+1 + else: + pass + else: + print('no key') + x=1 + + def update_product_attribute_by_slug(self, slug): + product_id = self.product_manager.find_id_by_slug(slug) + product_ods = self.get_product_by_slug_from_ods(slug) + products_attribute_details = self.get_product_attributes_details() + for name, value in products_attribute_details: + self.attribute_manager.create_for_product(product_id=product_id, + name=name, value=value, + variation=self.is_variable(product_ods['Type'])) + + def update_product(self): + #self.product_manager.update_data_product() + self.update_product_tab() + #self.update_product_attribute() + + """def update_product_by_slug(self): + self.product_manager.update_data_product() + self.update_product_tab() + self.update_product_attribute()""" + + def update_product_variation(self, product_id, product_data): + pass + + def update_product_by_slug(self, slug): + self.product_manager.update_data_product_by_slug(slug) + self.update_product_tab_by_slug(slug) + #self.update_product_attribute_by_slug(slug) + + def create_all_informations(self): + #medias = self.media_manager.get_all_as_slug_dict() + #self.product_manager.medias = medias + #self.update_product_by_slug("chope-citron-meringue") + #self.media_manager.upload_media() + #self.media_manager.assign_image_logo() + medias = self.media_manager.get_all_as_slug_dict() + self.product_manager.medias = medias + #self.category_manager.medias = medias + #self.category_manager.update_data_categories() + #self.attribute_manager.create() + #self.attribute_manager.configure_term() + self.process_file(FILENAME_ODS) + self.update_product() + + def get_list_category_for_product(self, category): + category_list_by_doc = [cat.strip().replace('"', '') for cat in category.split("/")] + return category_list_by_doc + + def get_list_media_id_for_product(self, media): + list_media_by_doc = [img.strip().replace(' ', '') for img in media.split(",")] + return list_media_by_doc + + def is_variable(self, type): + return type.lower() == "parfums" + + def update_product_attribute(self, attributes, product_data): + product_id = self.product_manager.find_id_by_slug(product_data['slug']) + for name, value in attributes.items(): + self.attribute_manager.create_for_product(product_id=product_id, name=name, value=value, variation=self.is_variable(product_data['type'])) + + def update_product_variations(self, product_data): + product_id = self.product_manager.find_id_by_slug(product_data['slug']) + self.variation_manager.create_variations_products(product_id, product_data) + + def update_product_tab(self, product_data): + for product in product_data: + self.update_product_tab_by_id(product['id']) + + + def create_or_update_product(self, product_data, attributes, tabs, categories, medias): + self.product_manager.update_data_product(product_data=product_data, categories=categories, medias=medias) + self.update_product_attribute(attributes=attributes, product_data=product_data) + product_id = self.product_manager.find_id_by_slug(product_data['slug']) + self.update_product_variations(product_data) + self.tab_manager.create_or_update_for_product(product_id=product_id, tabs=tabs) + + + def process_file(self, filename): + # refresh media cache + medias = media_manager.get_all_as_slug_dict() + self.product_manager.medias = medias + # read provided file + reader = OdsReader(filename) + for product_line in reader.get_all_product_lines(): + # standard product data + product_data = { + 'name' : product_line['Nom'], + 'price': product_line['Prix'], + 'regular_price': product_line['Prix'], + 'stock_quantity': product_line['Stock'], + 'manage_stock':True, + 'weight':str(product_line['Poids']), + 'sku':str(product_line['Numéro de référence']), + 'description': product_line['Description'], + 'short_description': product_line['Courte Description'], + 'slug':product_line['Slug'] + } + if product_line['Type'] == "parfums": + product_data['type'] = "variable" + else: + product_data['type'] = "simple" + + attributes = { + "Temps de combustion" : product_line['Temps de combustion'], + "Type de cire" : product_line['Type de cire'], + "Mèche" : product_line['Mèche'], + "Fabrication" : product_line['Fabrication'], + "Composition" : product_line['Composition'], + "Ingrédients et engagements" : product_line['Ingrédients et engagements'], + "Parfums" : product_line['Parfums'] + } + + tabs ={ + #"Description" : product_line["Description"], + "Conseils d'utilisation" : product_line["Conseils d’utilisation"], + "Précautions articles" : product_line["Précautions articles"], + #"Allergènes" : product_line["Allergènes"] + } + # ... associated categories + categories = self.get_list_category_for_product(product_line['Catégories']) + + # ... associated medias + medias = self.get_list_media_id_for_product(product_line['Media Slugs']) + + # create or update product + self.create_or_update_product(product_data=product_data, attributes=attributes, tabs=tabs, categories=categories, medias=medias) + + + """def put_social_data(self): + response = requests.post(url, + auth=HTTPBasicAuth("consumer_key", "consumer_secret"), + json={ + "acf": { + "instagram_url": "https://instagram.com/ton_compte" + } + } + )""" + + def delete_all_informations(self): + self.media_manager.delete_all_images() + self.attribute_manager.delete_all() + self.product_manager.delete_all_product() + self.category_manager.delete_all_category() + + def delete_information_by_slug(self): + product_manager.delete_product_by_slug("chope-adoucissant") + #category_manager.delete_all_category() + +#ALL_TABS = ["Allergènes", "Conseils d’utilisation", "Description", "Précautions articles"] +#ALL_ATTRIBUTES = ["Temps de combustion", "Type de cire", "Mèche", "Fabrication", "Composition", "Ingrédients et engagement"] + +media_manager = MediaManager(ath=ath) +#media_manager.upload_media() +#media_manager.delete_all_images() +#media_manager.assign_image_logo() +category_manager = CategoryManager(wcapi=wcapi,ath=ath) +#category_manager.delete_all_category() +product_manager = ProductManager(wcapi=wcapi,ath=ath) +#product_manager.delete_all_product() +#medias=media_manager.get_all_as_slug_dict() +#media_manager.delete_media_by_slug('pyramide-olfactive-frangipanier') +#product_manager.delete_product_by_slug("citron-meringue") +#product_manager.update_data_product() +tab_manager = TabManager(wcapi=wcapi) +attribute_manager = AttributeManager(wcapi=wcapi) +variation_manager = VariationsManager(wcapi=wcapi) +#attribute_manager.create(ALL_ATTRIBUTES) +#attribute_manager.create() +#attribute_manager.configure_term() +#attribute_manager.delete_all_term() +#product_id = product_manager.find_id_by_slug("citron-meringue")""" +woocommerce_manager = WooCommerceManager(wcapi=wcapi, media_manager=media_manager,category_manager=category_manager,product_manager=product_manager, tab_manager=tab_manager, attribute_manager=attribute_manager, variation_manager=variation_manager) +##woocommerce_manager.delete_all_informations() # +woocommerce_manager.create_all_informations() +##woocommerce_manager.process_file(FILENAME_ODS) +#category_manager.update_data_categories() +#woocommerce_manager.delete_all_informations() +#woocommerce_manager.delete_information_by_slug() +#woocommerce_manager.create_all_informations() +#woocommerce_manager.create_all_categories_and_products() +#woocommerce_manager.update_product_tab() +#woocommerce_manager.tab_manager.delete_by_product_id(1890) +#woocommerce_manager.tab_manager.delete_all() +#woocommerce_manager.update_product() +#woocommerce_manager.attribute_manager.delete_all_for_product() +#woocommerce_manager.update_product_attribute_by_slug('citron-meringue') +#woocommerce_manager.attribute_manager.delete_all_for_product() + +"""tabs_in_product = [] +for tab in ALL_TABS: + tab_in_product = woocommerce_manager.tab_exists(1890, tab) + tabs_in_product.append(tab_in_product)""" + +""" +utilisation +module argparse +# on va appeler ça importation d'un fichier ods, d'où l'action import-ods +# on va appeler cette commande, "la commande de base" +wcctl --wc-url=https://lescreationsdemissbleue.local --wc-key= --wc-secret= import-ods --ods-path=fichier.ods + +# traitement de l'intégralité d'un fichier ods +... --all + +# traitement des medias seulement, on peut en option spécifier une plage de média à importer +... --medias [--media-range=1:40] + +plu tard ... +# traitement des catégories seulement, on peut en option spécifier une expression régulière qui va s'appliquer au nom de la catégorie +... --categories [--categories-regex=] +ex: traiter uniquement les catégories dont le nom contient le terme "bougie" +... --categories [--categories-regex=.*bougie.*] + +# traitement des articles seulement, on peut en option spécifier une expression régulière qui va s'appliquer au nom de l'article' +# ... --products [--products-regex=] +ex: traiter uniquement les articles dont le nom contient le terme "bougie" +... --categories [--products-regex=.*bougie.*] + + +""" \ No newline at end of file