From f9ce16baae20e0003ca4adc7132ba93c9dc7fc6b Mon Sep 17 00:00:00 2001 From: Bene Date: Mon, 7 Apr 2025 12:29:21 +0200 Subject: [PATCH] =?UTF-8?q?Remove:=20ancien=20script=20plus=20utilis=C3=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api_woocomerce_04042025.py | 1150 ------------------------------------ 1 file changed, 1150 deletions(-) delete mode 100644 api_woocomerce_04042025.py diff --git a/api_woocomerce_04042025.py b/api_woocomerce_04042025.py deleted file mode 100644 index ebbc99d..0000000 --- a/api_woocomerce_04042025.py +++ /dev/null @@ -1,1150 +0,0 @@ -from woocommerce import API as WoocommerceApi -from pathlib import Path -import pandas as pd -import ezodf -import requests -import pprint -import base64 -import time -import json -import pyexcel_ods3 -import unicodedata -import logging -import os -import time - -logger = logging.getLogger(__name__) - -# 1️⃣ Configurer le logger -logging.basicConfig( - filename="woocommerce.log", # 📌 Fichier où les logs seront sauvegardés - level=logging.DEBUG, # 📌 Niveau de log (DEBUG, INFO, WARNING, ERROR, CRITICAL) - format="%(asctime)s - %(levelname)s - %(message)s", # 📌 Format du log - datefmt="%Y-%m-%d %H:%M:%S" # 📌 Format de la date -) - -# via consumer key and consumer secret : -# https://lescreationsdemissbleue.local/wp-json/wc/v3/products?consumer_key=ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e&consumer_secret=cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768 - -wcapi = WoocommerceApi( - url="https://lescreationsdemissbleue.local", - consumer_key="ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e", - consumer_secret="cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768", - wp_api=True, - version="wc/v3", - verify_ssl=False, # Désactive la vérification SSL pour le développement - timeout=30 -) - -class AuthentificationWpApi: - # Identifiants WordPress (et non WooCommerce) - wordpress_username = "admin_lcdm" # Remplace par ton username WordPress - wordpress_application_password = "yTW8 Mc6J FUCN tPSq bnuJ 0Sdw" #"#8io_mb!55@Bis" # Généré dans WordPress > Utilisateurs - - # Générer l'authentification Basic en base64 - auth_str = f"{wordpress_username}:{wordpress_application_password}" - auth_bytes = auth_str.encode("utf-8") - auth_base64 = base64.b64encode(auth_bytes).decode("utf-8") - -ath = AuthentificationWpApi() - -WEBSITE_URL = "https://lescreationsdemissbleue.local" -FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\donnees_site_internet_missbleue_corrige.ods" -BASE_PATH = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\photos\\photos_site\\Photos_site\\" -#FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\infos_site.ods" - -class OdsReader: - def __init__(self, filename_ods=FILENAME_ODS): - self.filename_ods = filename_ods - - def get_all_product_lines(self): - return self.get_doc_ods(2) - - def get_product_by_slug_from_ods(self, slug): - for product in self.get_all_product_lines(): - if product['Slug'] == slug: return product - return None - - def get_all_media_lines(self): - return self.get_doc_ods(0) - - def get_all_attribute_and_tab_lines(self): - return self.get_doc_ods(3) - - def get_all_category_lines(self): - return self.get_doc_ods(1) - - def get_doc_ods(self, number_sheet): - doc = ezodf.opendoc(self.filename_ods) - sheet = doc.sheets[number_sheet] - data = [] - for row in sheet.rows(): - data.append([cell.value for cell in row]) - - df = pd.DataFrame(data) - df.columns = df.iloc[0] - df = df[1:].reset_index(drop=True) - df = df.dropna(how='all') - json_data = df.to_dict(orient="records") - return json_data - -class MediaManager(OdsReader): - - def __init__(self, ath): - super().__init__() - self.ath = ath - self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media" - self.media_api_settings = f"{WEBSITE_URL}/wp-json/wp/v2/settings" - - def upload_media(self): - json_data = self.get_all_media_lines() - for media in json_data: - path = Path(BASE_PATH + media['Chemin']) - image_name = path.name - if not self.is_exists(media, image_name): - with open(BASE_PATH + media['Chemin'], "rb") as image_file: - response = requests.post( - self.media_api_url, - headers={ - "Authorization": f"Basic {self.ath.auth_base64}", - "Content-Disposition": f"attachment; filename={image_name}" - }, - files={"file": image_file}, - verify=False - ) - if response.status_code == 201: - media_data = response.json() - self.update_data_media(media, media_data['id']) - else: - return None - else: - pass - - def is_exists(self, media, image_name): - all_images = self.get_all_images() - name_without_extension, extension = os.path.splitext(image_name) - for image in all_images: - if media['Slug'] == image['slug']: - return True - else: - pass - return False - - - def update_data_media(self, media, id_img): - update_data = { - "title" : media['Nom'], - "alt_text": media['Description'], - "slug": media['Slug'], - } - path = Path(BASE_PATH + media['Chemin']) - image_name = path.name - response = requests.post( - f"{self.media_api_url}/{id_img}", - headers={ - "Authorization": f"Basic {self.ath.auth_base64}", - "Content-Disposition": f"attachment; filename={image_name}" - }, - json=update_data, - verify=False - ) - - if response.status_code == 200: - return response.json() - else: - return None - - def find_id_by_slug(self, slug): - images = self.get_all_images() - for img in images: - if img['slug'] == slug: - return img['id'] - - def get_all_as_slug_dict(self): - all_slug_dict = {} - images = self.get_all_images() - for img in images: - all_slug_dict[img['id']] = img['slug'] - return all_slug_dict - - def delete_media_by_slug(self, slug): - images = self.get_all_images() - for img in images: - if img['slug'] == slug: - delete_url = f"{self.media_api_url}/{img['id']}?force=true" - response = requests.delete(delete_url, - headers={"Authorization": f"Basic {self.ath.auth_base64}"}, - verify=False) - - def get_all_images(self): - """Récupère toutes les images en gérant la pagination""" - all_images = [] - page = 1 - while True: - response = requests.get(f"{self.media_api_url}?per_page=100&page={page}", - headers={"Authorization": f"Basic {self.ath.auth_base64}"}, - verify=False - ) - if response.status_code != 200: - break - - images = response.json() - if not images: - break - - all_images.extend(images) - page += 1 - - return all_images - - def delete_images(self, images): - """Supprime toutes les images récupérées""" - for img in images: - img_id = img['id'] - delete_url = f"{self.media_api_url}/{img_id}?force=true" - - response = requests.delete(delete_url, - headers={"Authorization": f"Basic {self.ath.auth_base64}"}, - verify=False) - if response.status_code in [200, 410]: # 410 = déjà supprimé - print(f"Image {img_id} supprimée.") - else: - print(f"Erreur suppression {img_id} :", response.status_code, response.text) - - def delete_all_images(self): - images = self.get_all_images() - for img in images: - img_id = img['id'] - delete_url = f"{self.media_api_url}/{img_id}?force=true" - - response = requests.delete(delete_url, - headers={"Authorization": f"Basic {self.ath.auth_base64}"}, - verify=False) - if response.status_code in [200, 410]: # 410 = déjà supprimé - print(f"Image {img_id} supprimée.") - else: - print(f"Erreur suppression {img_id} :", response.status_code, response.text) - - def assign_image_logo(self): - images = self.get_all_images() - for img in images: - if img['slug'] == "logo-lescreationsdemissbleue": - data = { - "site_logo":img['id'], - "site_icon" : img['id'] - } - response = requests.post( - self.media_api_settings, - json=data, - headers={"Authorization": f"Basic {self.ath.auth_base64}"}, - verify=False - ) - - if response.status_code == 200: - print("Logo mis à jour avec succès !") - else: - print(f"Erreur lors de la mise à jour du logo : {response.text}") - - -class CategoryManager(OdsReader): - - def __init__(self, wcapi, ath, medias=None): - super().__init__() - self.wcapi = wcapi - self.ath = ath - self.medias = medias - self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media" - self.error_log = [] - self.headers = { - "Authorization": f"Basic {self.ath.auth_base64}", - "Content-Type": "application/json" - } - - def find_id_by_slug(self, slug): - response = self.wcapi.get("products/categories/",params={"per_page": 100}) - if response.status_code == 200: - categories = response.json() - for cat in categories: - if cat['slug'] == slug: - return cat['id'] - - def create_category(self, name, description, slug): - category_data = { - "name": name, - "description": description, - "slug":slug - } - if self.find_id_by_slug(slug): - self.error_log.append(f"Catégorie contenant comme slug '{slug}' existe déjà") - else: - self.wcapi.post("products/categories/", category_data) - - def assign_parent_category(self, parent_slug, slug): - response = self.wcapi.get("products/categories/",params={"per_page": 100}) - if response.status_code == 200: - categories = response.json() - for cat in categories: - parent_id = self.find_id_by_parent_slug(parent_slug) - if parent_id: - if cat['slug'] == slug: - self.wcapi.put(f"products/categories/{cat['id']}",{'parent': parent_id}) - - def find_id_by_parent_slug(self, parent_slug): - response = self.wcapi.get("products/categories/",params={"per_page": 100}) - if response.status_code == 200: - categories = response.json() - for cat in categories: - if cat['slug'] == parent_slug: - return cat['id'] - - def find_media_id_by_slug(self, media_slug): - for id, slug in self.medias.items(): - if media_slug == slug: - return id - - def update_media_id_for_category(self, media_id, cat_id): - response = requests.get(f"{self.media_api_url}/{media_id}", - headers={"Authorization": f"Basic {self.ath.auth_base64}"}, - verify=False - ) - update_category_data = { - "image" : {'id':media_id}, - } - self.wcapi.put(f"products/categories/{cat_id}", update_category_data) - - def update_data_categories(self): - json_data = self.get_all_category_lines() - for category in json_data: - self.create_category(category['Nom'], category['Description'], category['Slug']) - cat_id = self.find_id_by_slug(category['Slug']) - media_id = self.find_media_id_by_slug(category['Media Slug']) - self.assign_parent_category(category['Parent Slug'], category['Slug']) - self.update_media_id_for_category(media_id,cat_id) - - def delete_all_category(self): - response = self.wcapi.get(f"products/categories",params={"per_page": 100}) - for cat in response.json(): - self.wcapi.delete(f"products/categories/{cat['id']}", params={"force": True}) - - def delete_media_category(self, media_slug): - media_id = self.find_media_id_by_slug(media_slug) - requests.delete( - f"{self.media_api_url}/{media_id['id']}", - headers=self.headers, - verify=False - ) - - def delete_category_by_id(self, category_id): - self.wcapi.delete(f"products/categories/{category_id}", params={"force": True}) - - def delete_category_by_slug(self, slug): - category_id = self.find_id_by_slug(slug) - #print(f"category_id = {category_id}") - self.wcapi.delete(f"products/categories/{category_id}", params={"force": True}) - - def get_errors(self): - return print(f"self.error_log = {self.error_log}") - -class ProductManager(OdsReader): - def __init__(self, wcapi, ath, medias=None): - super().__init__() - self.wcapi = wcapi - self.ath = ath - self.medias = medias - self.error_log = [] - self.headers = { - "Authorization": f"Basic {self.ath.auth_base64}", - "Content-Type": "application/json" - } - self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media" - - def update_data_list_cat_product(self, list_category_id, list_img_id, product_id): - product_data = { - 'categories':list_category_id, - 'images':list_img_id, - } - self.wcapi.put(f"products/{product_id}", product_data) - - def get_list_media_id_for_product(self, medias): - list_media_id_for_product = [] - for id, media_slug in self.medias.items(): - for media in medias: - if media == media_slug: - image_id = {'id':id} - list_media_id_for_product.append(image_id) - return list_media_id_for_product[::-1] - - def get_list_category_for_product(self, categories): - response = self.wcapi.get("products/categories",params={"per_page": 100}) - list_category_for_product = [] - for category in response.json(): - for cat in categories: - if category['name'] == cat: - id_category = {'id':category['id']} - list_category_for_product.append(id_category) - return list_category_for_product - - def find_product_by_id(self, id): - response = self.wcapi.get(f"products/{id}") - if response.status_code == 200: - product = response.json() - return product - - def find_id_by_slug(self, slug): - response = self.wcapi.get("products/",params={"per_page": 100}) - if response.status_code == 200: - products = response.json() - for pro in products: - if pro['slug'] == slug: - return pro['id'] - - def find_media_id_by_slug(self, media_slug): - for id, slug in self.medias.items(): - if media_slug == slug: - return id - - def create_tabs_from_custom_dict(self, product_id, product): - product_tabs_data = {} - list_product_tabs_data = [] - x = 1 - for key in product.keys(): - if key == "Conseils d’utilisation" or key == "Précautions articles" or key == "Description" or key == "Allergènes": - product_tabs_data['title'] = key - product_tabs_data['content'] = product[key] - product_tabs_data['nickname'] = '' - product_tabs_data['position'] = x - product_tabs_data['tab_type'] = 'local' - list_product_tabs_data.append(product_tabs_data) - product_tabs_data = {} - x += 1 - - response = self.wcapi.get(f"products/{product_id}") - if response.status_code == 200: - meta_data = [] - meta_data.append( - {'key': 'wb_custom_tabs', 'value': list_product_tabs_data} - ) - meta_data_data = { - 'meta_data': meta_data - } - res = self.wcapi.post(f"products/{product_id}", meta_data_data) - else: - print(f"error") - - def create_product(self, product_data): - if self.find_id_by_slug(product_data['slug']): - self.error_log.append(f"Produit contenant comme slug '{product_data['slug']}' existe déjà") - else: - response = self.wcapi.post("products/", product_data) - - def update_data_product(self, product_data, categories, medias): - json_data = self.get_all_product_lines() - for product in json_data: - self.create_product(product_data) - product_id = self.find_id_by_slug(product_data['slug']) - list_category_id = self.get_list_category_for_product(categories) - list_img_id = self.get_list_media_id_for_product(medias) - self.update_data_list_cat_product(list_category_id, list_img_id, product_id) - - def update_data_product_by_slug(self, slug): - json_data = self.get_all_product_lines() - for product in json_data: - if product['Slug'] == slug: - self.create_product(product) - product_id = self.find_id_by_slug(product['Slug']) - list_category_id = self.get_list_category_for_product(product['Catégories']) - list_img_id = self.get_list_media_id_for_product(product['Media Slugs']) - self.update_data_list_cat_product(list_category_id, list_img_id, product_id) - - def get_all_products(self): - """Récupère tous les produits en gérant la pagination""" - all_products = [] - page = 1 - - while True: - response = self.wcapi.get("products", params={"per_page": 100, "page": page}) - - if response.status_code != 200: - print(f"⚠️ Erreur API WooCommerce: {response.status_code} - {response.json()}") - break - - products = response.json() - if not products: # Si la page est vide, on arrête la boucle - break - - all_products.extend(products) - page += 1 # On passe à la page suivante - - return all_products - - - def delete_product(self): - json_data = self.get_all_product_lines() - for product in json_data: - list_products = self.wcapi.get(f"products/") - for pro in list_products.json(): - if product['Nom'] == pro['name']: - self.wcapi.delete(f"products/{pro['id']}") - - def delete_all_product(self): - products = self.get_all_products() - if products: - for pro in products: - self.wcapi.delete(f"products/{pro['id']}", params={"force": True}) - - def delete_media_product(self, media_slug): - media_id = self.find_media_id_by_slug(media_slug) - requests.delete( - f"{self.media_api_url}/{media_id['id']}", - headers=self.headers, - verify=False - ) - - def delete_product_by_id(self, product_id): - self.wcapi.delete(f"products/{product_id}", params={"force": True}) - - def delete_product_by_slug(self, slug): - product_id = self.find_id_by_slug(slug) - self.wcapi.delete(f"products/{product_id}", params={"force": True}) - - - def normalize_string(text): - return unicodedata.normalize("NFKC", text).strip().lower() - - def tab_exists(self, product_id, name_tab): - response = self.wcapi.get(f"products/{product_id}") - if response.status_code == 200: - response_json = self.wcapi.get(f"products/{product_id}").json() - for meta_data in response_json['meta_data']: - for key_meta_data, value_meta_data in meta_data.items(): - if key_meta_data == "value": - if isinstance(value_meta_data, list): - for tab in value_meta_data: - if name_tab == tab['title']: - return True - return False - -class AttributeManager(OdsReader): - - def __init__(self, wcapi): - super().__init__() - self.wcapi = wcapi - - def get_attributes(self): - attributes = self.wcapi.get(f"products/attributes").json() - one_attribute = self.wcapi.get(f"products/attributes/1/terms").json() - return attributes - - def get_by_name(self, name): - attributes = self.wcapi.get(f"products/attributes").json() - for attr in attributes: - if attr['name'] == name: - attribute = self.wcapi.get(f"products/attributes/{attr['id']}", params={"per_page": 100}).json() - return attribute - - def get_list_name_data(self): - list_name_data = [] - json_data = self.get_all_attribute_and_tab_lines() - for item in json_data: - if item['Onglet'].strip() == "Informations Complémentaires": - list_name_data.append(item['Nom']) - return list_name_data - - def create(self): - features_json_data = self.get_all_attribute_and_tab_lines() - for item in features_json_data: - if item['Onglet'].strip() == "Informations Complémentaires": - attribute_data = { - 'name' : item["Nom"] - } - self.wcapi.post(f"products/attributes", attribute_data) - - def get_term(self): - term_dict = {} - list_item = [] - term_json_data = self.get_all_attribute_and_tab_lines() - for item in term_json_data: - if item['Onglet'].strip() == "Informations Complémentaires": - if "," in item["Valeurs"]: - list_item = [value_term.strip() for value_term in item['Valeurs'].split(",")] - else: - item['Valeurs'].strip() - if list_item: - term_dict[item['Nom']] = list_item - else: - term_dict[item['Nom']] = item['Valeurs'] - return term_dict - - def configure_term(self): - term_dict = self.get_term() - response = self.wcapi.get(f"products/attributes", params={"per_page": 100}) - if response.status_code == 200: - attributes = response.json() - for attribute in attributes: - for name, value in term_dict.items(): - if attribute['name'] == name: - if isinstance(value, list): - for v in value: - term = { - 'name' : v - } - self.wcapi.post(f"products/attributes/{attribute['id']}/terms", term) - else: - term = { - 'name' : value - } - self.wcapi.post(f"products/attributes/{attribute['id']}/terms", term) - - - def create_for_product(self, product_id, name, value, variation=False): - data_attribute = { - 'name': name, - 'options':value - } - #list_product_tabs_data.append(data_tab) - response = self.wcapi.get(f"products/{product_id}") - if response.status_code == 200: - product_meta_data = response.json() - existing_attributes_data = product_meta_data.get("attributes", []) - already_exist = False - for data in existing_attributes_data: - for key_data, value_data in data.items(): - if key_data == "value": - if isinstance(value_data, list): - for value in value_data: - if value['name'] == name: - already_exist = True - - if already_exist == False: - found = False - for attribute in existing_attributes_data: - if attribute["name"] == name: - attribute["options"].append(data_attribute) - found = True - break - - # Si l'onglet `wb_custom_tabs` n'existe pas, on le crée - if not found: - existing_attributes_data.append({ - "name": name, - "options": [value], - "visible":True, - "variation": variation, - #"parent_id":product_id - }) - attributes_data = { - 'attributes': existing_attributes_data - } - res = self.wcapi.put(f"products/{product_id}", attributes_data) - else: - print('already_exist') - else: - print(f"error") - - def delete_all_for_product(self): - response_product = self.wcapi.get(f"products/", params={"per_page": 100}) - if response_product.status_code == 200: - products = response_product.json() - for product in products: - existing_attributes_data = product.get("attributes", []) - if existing_attributes_data == []: - pass - else: - attribute_data = { - 'attributes': [] - } - res = self.wcapi.post(f"products/{product['id']}", attribute_data) - - - def delete_all_term(self): - response_attribute = self.wcapi.get(f"products/attributes", params={"per_page": 100}) - if response_attribute.status_code == 200: - attributes = response_attribute.json() - for attribute in attributes: - response_attribute_term = self.wcapi.get(f"products/attributes/{attribute['id']}/terms", params={"per_page": 100}) - if response_attribute_term.status_code == 200: - attributes_term = response_attribute_term.json() - for term in attributes_term: - self.wcapi.delete(f"products/attributes/{attribute['id']}/terms/{term['id']}",params={"force": True}) - - def delete_all(self): - response = self.wcapi.get(f"products/attributes", params={"per_page": 100}) - if response.status_code == 200: - attributes = response.json() - for attribute in attributes: - self.wcapi.delete(f"products/attributes/{attribute['id']}",params={"force": True}) - - -class TabManager(OdsReader): - - def __init__(self, wcapi): - super().__init__() - self.wcapi = wcapi - - def get_list_name_data(self): - list_name_data = [] - json_data = self.get_all_attribute_and_tab_lines() - for item in json_data: - if item['Onglet'].strip() != "Informations Complémentaires": - list_name_data.append(item['Nom']) - return list_name_data - - def create_or_update_for_product(self, product_id, tabs): - position = 1 - for title, content in tabs.items(): - position += 1 - data_tab = { - 'title': title, - 'content':content, - 'nickname':'', - 'position':position, - 'tab_type': 'local' - } - response = self.wcapi.get(f"products/{product_id}") - if response.status_code == 200: - product_meta_data = response.json() - existing_meta_data = product_meta_data.get("meta_data", []) - already_exist = False - for data in existing_meta_data: - for key_data, value_data in data.items(): - if key_data == "value": - if isinstance(value_data, list): - for value in value_data: - if value['title'] == title: - already_exist = True - if already_exist == False: - found = False - for meta in existing_meta_data: - if meta["key"] == "wb_custom_tabs": - meta["value"].append(data_tab) - found = True - break - - # Si l'onglet `wb_custom_tabs` n'existe pas, on le crée - if not found: - existing_meta_data.append({ - "key": "wb_custom_tabs", - "value": [data_tab] - }) - meta_data_data = { - 'meta_data': existing_meta_data - } - res = self.wcapi.put(f"products/{product_id}", meta_data_data) - else: - print('else') - data_tab = { - 'content':content, - } - meta_data_data = { - 'meta_data': existing_meta_data - } - res = self.wcapi.put(f"products/{product_id}", meta_data_data) - else: - print(f"error") - - def delete_by_product_id(self, product_id): - response = self.wcapi.get(f"products/{product_id}") - if response.status_code == 200: - product_meta_data = response.json() - existing_meta_data = product_meta_data.get("meta_data", []) - if existing_meta_data == []: - pass - else: - meta_data = { - 'meta_data': [{"key": "wb_custom_tabs","value":[]}] - } - res = self.wcapi.post(f"products/{product_id}", meta_data) - - def delete_all(self): - response = self.wcapi.get(f"products/", params={"per_page": 100}) - if response.status_code == 200: - product_meta_data = response.json() - for product in product_meta_data: - existing_meta_data = product.get("meta_data", []) - if existing_meta_data == []: - pass - else: - meta_data = { - 'meta_data': [{"key": "wb_custom_tabs","value":[]}] - } - res = self.wcapi.post(f"products/{product['id']}", meta_data) - -class VariationsManager(OdsReader): - - def __init__(self, wcapi): - super().__init__() - self.wcapi = wcapi - - def get_attribute_id(self, product_data): - response = self.wcapi.get(f"products/attributes") - if response.status_code == 200: - attributes = response.json() - for key, value in product_data.items(): - for attr_key, attr_value in attributes.items(): - if attr_value['name'] == key: - attribute_id = attr_value['id'] - return attribute_id - - def update_product_attributes_merged(self, wcapi, product_id, attribute_name, new_options): - """ - Met à jour l'attribut d'un produit WooCommerce en ajoutant de nouvelles options, - sans écraser les autres attributs existants. - - :param wcapi: Instance API WooCommerce (wcapi = API(...)) - :param product_id: ID du produit à mettre à jour - :param attribute_name: Nom de l'attribut à enrichir (ex: "Parfums") - :param new_options: Liste des nouvelles valeurs à ajouter (ex: ["Lavande", "Citron"]) - """ - # Nettoyer les nouvelles options - new_options = [opt.strip() for opt in new_options.split('|') if opt.strip()] - # 1. Récupérer le produit existant - response = wcapi.get(f"products/{product_id}") - if response.status_code != 200: - print(f"❌ Impossible de récupérer le produit {product_id}") - return - - product = response.json() - attributes = product.get("attributes", []) - - # 2. Chercher l'attribut ciblé - found = False - for attr in attributes: - if attr["name"].lower() == attribute_name.lower(): - existing_options = attr.get("options", []) - merged_options = list(set(existing_options + new_options)) - attr["options"] = merged_options - attr["variation"] = True - attr["visible"] = True - attr["parent_id"] = product_id - attr["manage_stock"] = "parent" - found = True - break - - # 3. Si l'attribut n'existe pas, on l'ajoute - if not found: - attributes.append({ - "name": attribute_name, - "variation": True, - "visible": True, - "options": new_options - }) - - # 4. Mettre à jour le produit avec les attributs fusionnés - update_data = { - "attributes": attributes - } - - update_res = wcapi.put(f"products/{product_id}", update_data) - if update_res.status_code == 200: - print(f"✅ Attribut '{attribute_name}' mis à jour avec succès.") - else: - print(f"❌ Erreur lors de la mise à jour : {update_res.status_code}") - print(update_res.json()) - - def create_variations_products(self, product_id, product_data): - #products_lines = self.get_all_product_lines() - product_line = self.get_product_by_slug_from_ods(product_data['slug']) - for product_line_key, products_line_value in product_line.items(): - if product_line_key == "Parfums": - name_attribute = product_line_key - parfums = products_line_value - if product_line_key == "Type": - if product_data['type'] == "variable": - response = self.wcapi.get(f"products/{product_id}") - if response.status_code == 200: - existing_product = response.json() - self.update_product_attributes_merged(self.wcapi, product_id=product_id, attribute_name="Parfums", new_options=parfums) - - parfums = [p.strip() for p in parfums.split("|") if p.strip()] - - response = self.wcapi.get(f"products/{product_id}/variations") - - if response.status_code == 200: - for parfum in parfums: - data = { - 'attributes': [ - { - 'name': name_attribute, - 'option': parfum - } - ], - 'manage_stock': False, - 'in_stock':True, - 'regular_price': product_data['price'], - } - print(f"Posting variation: {data}") - result = self.wcapi.post(f"products/{product_id}/variations", data) - print(result.status_code) - pprint.pprint(result.json()) - else: - return False - -class WooCommerceManager(OdsReader): - def __init__(self, wcapi, media_manager, category_manager, product_manager, tab_manager, attribute_manager, variation_manager): - super().__init__() - self.wcapi = wcapi - self.media_manager = media_manager - self.category_manager = category_manager - self.product_manager = product_manager - self.tab_manager = tab_manager - self.attribute_manager = attribute_manager - self.variation_manager = variation_manager - - def tab_exists(self, product_id, name_tab): - return self.product_manager.tab_exists(product_id, name_tab) - - def get_product_tab_details(self): - all_products_json = self.get_all_attribute_and_tab_lines() - all_tabs = self.tab_manager.get_list_name_data() - dict = {} - for product in all_products_json: - line = [] - for tab in all_tabs: - line.append([tab, product[tab]]) - dict[product["Parfum"]] = line - return dict - - def get_product_attributes_details(self): - ret = [] - all_products_json = self.get_all_product_lines() - all_attributes = self.attribute_manager.get_list_name_data() - for product in all_products_json: - for attribute in all_attributes: - ret.append([attribute, product[attribute]]) - return ret - - def update_product_tab_by_slug(self, slug): - product_id = self.product_manager.find_id_by_slug(slug) - product = self.product_manager.find_product_by_id(product_id) - products_tab_details = self.get_product_tab_details() - x=1 - for value in products_tab_details.values(): - for key in products_tab_details.keys(): - for title, content in value: - if key: - if key in product['short_description']: - tab_manager.create_for_product(product_id=product_id, title=title, content=content, nickname="", position=x, tab_type="local") - x=x+1 - else: - pass - else: - print('no key') - x=1 - - def update_product_attribute_by_slug(self, slug): - product_id = self.product_manager.find_id_by_slug(slug) - product_ods = self.get_product_by_slug_from_ods(slug) - products_attribute_details = self.get_product_attributes_details() - for name, value in products_attribute_details: - self.attribute_manager.create_for_product(product_id=product_id, - name=name, value=value, - variation=self.is_variable(product_ods['Type'])) - - def update_product(self): - #self.product_manager.update_data_product() - self.update_product_tab() - #self.update_product_attribute() - - """def update_product_by_slug(self): - self.product_manager.update_data_product() - self.update_product_tab() - self.update_product_attribute()""" - - def update_product_variation(self, product_id, product_data): - pass - - def update_product_by_slug(self, slug): - self.product_manager.update_data_product_by_slug(slug) - self.update_product_tab_by_slug(slug) - #self.update_product_attribute_by_slug(slug) - - def create_all_informations(self): - #medias = self.media_manager.get_all_as_slug_dict() - #self.product_manager.medias = medias - #self.update_product_by_slug("chope-citron-meringue") - #self.media_manager.upload_media() - #self.media_manager.assign_image_logo() - medias = self.media_manager.get_all_as_slug_dict() - self.product_manager.medias = medias - #self.category_manager.medias = medias - #self.category_manager.update_data_categories() - #self.attribute_manager.create() - #self.attribute_manager.configure_term() - self.process_file(FILENAME_ODS) - self.update_product() - - def get_list_category_for_product(self, category): - category_list_by_doc = [cat.strip().replace('"', '') for cat in category.split("/")] - return category_list_by_doc - - def get_list_media_id_for_product(self, media): - list_media_by_doc = [img.strip().replace(' ', '') for img in media.split(",")] - return list_media_by_doc - - def is_variable(self, type): - return type.lower() == "parfums" - - def update_product_attribute(self, attributes, product_data): - product_id = self.product_manager.find_id_by_slug(product_data['slug']) - for name, value in attributes.items(): - self.attribute_manager.create_for_product(product_id=product_id, name=name, value=value, variation=self.is_variable(product_data['type'])) - - def update_product_variations(self, product_data): - product_id = self.product_manager.find_id_by_slug(product_data['slug']) - self.variation_manager.create_variations_products(product_id, product_data) - - def update_product_tab(self, product_data): - for product in product_data: - self.update_product_tab_by_id(product['id']) - - - def create_or_update_product(self, product_data, attributes, tabs, categories, medias): - self.product_manager.update_data_product(product_data=product_data, categories=categories, medias=medias) - self.update_product_attribute(attributes=attributes, product_data=product_data) - product_id = self.product_manager.find_id_by_slug(product_data['slug']) - self.update_product_variations(product_data) - self.tab_manager.create_or_update_for_product(product_id=product_id, tabs=tabs) - - - def process_file(self, filename): - # refresh media cache - medias = media_manager.get_all_as_slug_dict() - self.product_manager.medias = medias - # read provided file - reader = OdsReader(filename) - for product_line in reader.get_all_product_lines(): - # standard product data - product_data = { - 'name' : product_line['Nom'], - 'price': product_line['Prix'], - 'regular_price': product_line['Prix'], - 'stock_quantity': product_line['Stock'], - 'manage_stock':True, - 'weight':str(product_line['Poids']), - 'sku':str(product_line['Numéro de référence']), - 'description': product_line['Description'], - 'short_description': product_line['Courte Description'], - 'slug':product_line['Slug'] - } - if product_line['Type'] == "parfums": - product_data['type'] = "variable" - else: - product_data['type'] = "simple" - - attributes = { - "Temps de combustion" : product_line['Temps de combustion'], - "Type de cire" : product_line['Type de cire'], - "Mèche" : product_line['Mèche'], - "Fabrication" : product_line['Fabrication'], - "Composition" : product_line['Composition'], - "Ingrédients et engagements" : product_line['Ingrédients et engagements'], - "Parfums" : product_line['Parfums'] - } - - tabs ={ - #"Description" : product_line["Description"], - "Conseils d'utilisation" : product_line["Conseils d’utilisation"], - "Précautions articles" : product_line["Précautions articles"], - #"Allergènes" : product_line["Allergènes"] - } - # ... associated categories - categories = self.get_list_category_for_product(product_line['Catégories']) - - # ... associated medias - medias = self.get_list_media_id_for_product(product_line['Media Slugs']) - - # create or update product - self.create_or_update_product(product_data=product_data, attributes=attributes, tabs=tabs, categories=categories, medias=medias) - - - """def put_social_data(self): - response = requests.post(url, - auth=HTTPBasicAuth("consumer_key", "consumer_secret"), - json={ - "acf": { - "instagram_url": "https://instagram.com/ton_compte" - } - } - )""" - - def delete_all_informations(self): - self.media_manager.delete_all_images() - self.attribute_manager.delete_all() - self.product_manager.delete_all_product() - self.category_manager.delete_all_category() - - def delete_information_by_slug(self): - product_manager.delete_product_by_slug("chope-adoucissant") - #category_manager.delete_all_category() - -#ALL_TABS = ["Allergènes", "Conseils d’utilisation", "Description", "Précautions articles"] -#ALL_ATTRIBUTES = ["Temps de combustion", "Type de cire", "Mèche", "Fabrication", "Composition", "Ingrédients et engagement"] - -media_manager = MediaManager(ath=ath) -#media_manager.upload_media() -#media_manager.delete_all_images() -#media_manager.assign_image_logo() -category_manager = CategoryManager(wcapi=wcapi,ath=ath) -#category_manager.delete_all_category() -product_manager = ProductManager(wcapi=wcapi,ath=ath) -#product_manager.delete_all_product() -#medias=media_manager.get_all_as_slug_dict() -#media_manager.delete_media_by_slug('pyramide-olfactive-frangipanier') -#product_manager.delete_product_by_slug("citron-meringue") -#product_manager.update_data_product() -tab_manager = TabManager(wcapi=wcapi) -attribute_manager = AttributeManager(wcapi=wcapi) -variation_manager = VariationsManager(wcapi=wcapi) -#attribute_manager.create(ALL_ATTRIBUTES) -#attribute_manager.create() -#attribute_manager.configure_term() -#attribute_manager.delete_all_term() -#product_id = product_manager.find_id_by_slug("citron-meringue")""" -woocommerce_manager = WooCommerceManager(wcapi=wcapi, media_manager=media_manager,category_manager=category_manager,product_manager=product_manager, tab_manager=tab_manager, attribute_manager=attribute_manager, variation_manager=variation_manager) -##woocommerce_manager.delete_all_informations() # -woocommerce_manager.create_all_informations() -##woocommerce_manager.process_file(FILENAME_ODS) -#category_manager.update_data_categories() -#woocommerce_manager.delete_all_informations() -#woocommerce_manager.delete_information_by_slug() -#woocommerce_manager.create_all_informations() -#woocommerce_manager.create_all_categories_and_products() -#woocommerce_manager.update_product_tab() -#woocommerce_manager.tab_manager.delete_by_product_id(1890) -#woocommerce_manager.tab_manager.delete_all() -#woocommerce_manager.update_product() -#woocommerce_manager.attribute_manager.delete_all_for_product() -#woocommerce_manager.update_product_attribute_by_slug('citron-meringue') -#woocommerce_manager.attribute_manager.delete_all_for_product() - -"""tabs_in_product = [] -for tab in ALL_TABS: - tab_in_product = woocommerce_manager.tab_exists(1890, tab) - tabs_in_product.append(tab_in_product)""" - -""" -utilisation -module argparse -# on va appeler ça importation d'un fichier ods, d'où l'action import-ods -# on va appeler cette commande, "la commande de base" -wcctl --wc-url=https://lescreationsdemissbleue.local --wc-key= --wc-secret= import-ods --ods-path=fichier.ods - -# traitement de l'intégralité d'un fichier ods -... --all - -# traitement des medias seulement, on peut en option spécifier une plage de média à importer -... --medias [--media-range=1:40] - -plu tard ... -# traitement des catégories seulement, on peut en option spécifier une expression régulière qui va s'appliquer au nom de la catégorie -... --categories [--categories-regex=] -ex: traiter uniquement les catégories dont le nom contient le terme "bougie" -... --categories [--categories-regex=.*bougie.*] - -# traitement des articles seulement, on peut en option spécifier une expression régulière qui va s'appliquer au nom de l'article' -# ... --products [--products-regex=] -ex: traiter uniquement les articles dont le nom contient le terme "bougie" -... --categories [--products-regex=.*bougie.*] - - -""" \ No newline at end of file