from woocommerce import API as WoocommerceApi from pathlib import Path import pandas as pd import ezodf import requests import pprint import base64 import time import json import pyexcel_ods3 import unicodedata import logging logger = logging.getLogger(__name__) # 1️⃣ Configurer le logger logging.basicConfig( filename="woocommerce.log", # 📌 Fichier où les logs seront sauvegardés level=logging.DEBUG, # 📌 Niveau de log (DEBUG, INFO, WARNING, ERROR, CRITICAL) format="%(asctime)s - %(levelname)s - %(message)s", # 📌 Format du log datefmt="%Y-%m-%d %H:%M:%S" # 📌 Format de la date ) # via consumer key and consumer secret : # https://lescreationsdemissbleue.local/wp-json/wc/v3/products?consumer_key=ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e&consumer_secret=cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768 wcapi = WoocommerceApi( url="https://lescreationsdemissbleue.local", consumer_key="ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e", consumer_secret="cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768", wp_api=True, version="wc/v3", verify_ssl=False # Désactive la vérification SSL pour le développement ) class AuthentificationWpApi: # Identifiants WordPress (et non WooCommerce) wordpress_username = "admin_lcdm" # Remplace par ton username WordPress wordpress_application_password = "yTW8 Mc6J FUCN tPSq bnuJ 0Sdw" #"#8io_mb!55@Bis" # Généré dans WordPress > Utilisateurs # Générer l'authentification Basic en base64 auth_str = f"{wordpress_username}:{wordpress_application_password}" auth_bytes = auth_str.encode("utf-8") auth_base64 = base64.b64encode(auth_bytes).decode("utf-8") ath = AuthentificationWpApi() WEBSITE_URL = "https://lescreationsdemissbleue.local" FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\donnees_site_internet_missbleue.ods" BASE_PATH = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\photos\\Photos_Claudine\\Photos_bougies_Claudine\\" #FILENAME_ODS = 
"C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\infos_site.ods" class OdsReader: def __init__(self): self.filename_ods = FILENAME_ODS def get_doc_ods(self, number_sheet): # ezodf.config.set_table_expand_strategy('all') doc = ezodf.opendoc(self.filename_ods) #pprint.pprint(doc) #print(f"type = {type(doc.sheets)}") #print(f"doc.sheets = {doc.sheets}") sheets_list = list(doc.sheets) # 🔹 Vérification : Afficher toutes les feuilles disponibles print(f"Feuilles disponibles : {[sheet.name for sheet in sheets_list]}") sheet = doc.sheets[number_sheet] print(f"Feuilles disponibles : {[sheet.name for sheet in doc.sheets]}") # Debug data = [] for row in sheet.rows(): data.append([cell.value for cell in row]) df = pd.DataFrame(data) df.columns = df.iloc[0] df = df[1:].reset_index(drop=True) df = df.dropna(how='all') json_data = df.to_dict(orient="records") return json_data class MediaManager(OdsReader): def __init__(self, ath): super().__init__() self.ath = ath self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media" #[cat.strip().replace('"', '') for cat in product['Categorie'].split("/")] def upload_media(self): json_data = self.get_doc_ods(0) for media in json_data: path = Path(BASE_PATH + media['Chemin']) image_name = path.name #print(f'path = {path}') with open(BASE_PATH + media['Chemin'], "rb") as image_file: response = requests.post( self.media_api_url, headers={ "Authorization": f"Basic {self.ath.auth_base64}", "Content-Disposition": f"attachment; filename={image_name}" }, files={"file": image_file}, verify=False ) #print(f'response.status_code = {response.status_code}, response_text = {response.text}') if response.status_code == 201: media_data = response.json() #print(f"media_data['slug'] = {media_data['slug']}") self.update_data_media(media, media_data['id']) else: return None def update_data_media(self, media, id_img): update_data = { "title" : media['Nom'], "alt_text": media['Description'], "slug": media['Slug'], } path = Path(BASE_PATH + 
media['Chemin']) image_name = path.name response = requests.post( f"{self.media_api_url}/{id_img}", headers={ "Authorization": f"Basic {self.ath.auth_base64}", "Content-Disposition": f"attachment; filename={image_name}" }, json=update_data, verify=False ) if response.status_code == 200: return response.json() else: return None def find_id_by_slug(self, slug): images = self.get_all_images() for img in images: if img['slug'] == slug: return img['id'] def get_all_as_slug_dict(self): all_slug_dict = {} images = self.get_all_images() for img in images: all_slug_dict[img['id']] = img['slug'] return all_slug_dict def delete_media_by_slug(self, slug): images = self.get_all_images() for img in images: if img['slug'] == slug: delete_url = f"{self.media_api_url}/{img['id']}?force=true" response = requests.delete(delete_url, headers={"Authorization": f"Basic {self.ath.auth_base64}"}, verify=False) def get_all_images(self): """Récupère toutes les images en gérant la pagination""" all_images = [] page = 1 while True: response = requests.get(f"{self.media_api_url}?per_page=100&page={page}", headers={"Authorization": f"Basic {self.ath.auth_base64}"}, verify=False ) if response.status_code != 200: break images = response.json() if not images: break all_images.extend(images) page += 1 return all_images def delete_images(self, images): """Supprime toutes les images récupérées""" for img in images: img_id = img['id'] delete_url = f"{self.media_api_url}/{img_id}?force=true" response = requests.delete(delete_url, headers={"Authorization": f"Basic {self.ath.auth_base64}"}, verify=False) if response.status_code in [200, 410]: # 410 = déjà supprimé print(f"Image {img_id} supprimée.") else: print(f"Erreur suppression {img_id} :", response.status_code, response.text) """category_manager = CategoryManager(api=api, medias=media_manager.get_all_as_slug_dict()) category_manager.create(name, short_description, description, )""" class CategoryManager(OdsReader): def __init__(self, wcapi, ath, 
medias): super().__init__() self.wcapi = wcapi self.ath = ath self.medias = medias self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media" self.error_log = [] self.headers = { "Authorization": f"Basic {self.ath.auth_base64}", "Content-Type": "application/json" } def find_id_by_slug(self, slug): response = self.wcapi.get("products/categories/",params={"per_page": 100}) #print(f"response.status_code = {response.status_code}") if response.status_code == 200: categories = response.json() for cat in categories: if cat['slug'] == slug: #pprint.pprint(cat) return cat['id'] def create_category(self, name, description, slug): category_data = { "name": name, "description": description, "slug":slug } if self.find_id_by_slug(slug): self.error_log.append(f"Catégorie contenant comme slug '{slug}' existe déjà") else: self.wcapi.post("products/categories/", category_data) def assign_parent_category(self, parent_slug, slug): response = self.wcapi.get("products/categories/",params={"per_page": 100}) if response.status_code == 200: categories = response.json() for cat in categories: parent_id = self.find_id_by_parent_slug(parent_slug) if parent_id: if cat['slug'] == slug: self.wcapi.put(f"products/categories/{cat['id']}",{'parent': parent_id}) def find_id_by_parent_slug(self, parent_slug): response = self.wcapi.get("products/categories/",params={"per_page": 100}) if response.status_code == 200: categories = response.json() for cat in categories: if cat['slug'] == parent_slug: return cat['id'] def find_media_id_by_slug(self, media_slug): for id, slug in self.medias.items(): if media_slug == slug: return id def update_media_id_for_category(self, media_id, cat_id): update_category_data = { "image" : {'id':media_id}, } self.wcapi.put(f"products/categories/{cat_id}", update_category_data) def update_data_categories(self): json_data = self.get_doc_ods(1) for category in json_data: self.create_category(category['Nom'], category['Description'], category['Slug']) cat_id = 
self.find_id_by_slug(category['Slug']) media_id = self.find_media_id_by_slug(category['Media Slug']) self.assign_parent_category(category['Parent Slug'], category['Slug']) self.update_media_id_for_category(media_id,cat_id) def delete_all_category(self): response = self.wcapi.get(f"products/categories",params={"per_page": 100}) for cat in response.json(): self.wcapi.delete(f"products/categories/{cat['id']}", params={"force": True}) def delete_media_category(self, media_slug): media_id = self.find_media_id_by_slug(media_slug) requests.delete( f"{self.media_api_url}/{media_id['id']}", headers=self.headers, verify=False ) def delete_category_by_id(self, category_id): self.wcapi.delete(f"products/categories/{category_id}", params={"force": True}) def delete_category_by_slug(self, slug): category_id = self.find_id_by_slug(slug) #print(f"category_id = {category_id}") self.wcapi.delete(f"products/categories/{category_id}", params={"force": True}) def get_errors(self): return print(f"self.error_log = {self.error_log}") class ProductManager(OdsReader): def __init__(self, wcapi, ath, medias): super().__init__() self.wcapi = wcapi self.ath = ath self.medias = medias self.error_log = [] self.headers = { "Authorization": f"Basic {self.ath.auth_base64}", "Content-Type": "application/json" } self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media" def update_data_list_cat_product(self, list_category_id, list_img_id, product_id): product_data = { 'categories':list_category_id, 'images':list_img_id, } print(f"product_id = {product_id}") #print(f"product_data = {product_data}") self.wcapi.put(f"products/{product_id}", product_data) #self.wcapi.get(f"products/{product_id}") def get_list_media_id_for_product(self, product): media_id = {} list_media_id_for_product = [] list_media_by_doc = [img.strip().replace(' ', '') for img in product.split(",")] #pprint.pprint(self.uploaded_images_pro) #print(f'list_media_by_doc = {list_media_by_doc}') for id, media_slug in self.medias.items(): for 
media in list_media_by_doc: if media == media_slug: image_id = {'id':id} list_media_id_for_product.append(image_id) #print(f"list_image_id_for_product = {list_media_id_for_product}") return list_media_id_for_product[::-1] def get_list_category_for_product(self, product): response = self.wcapi.get("products/categories",params={"per_page": 100}) category_list_by_doc = [cat.strip().replace('"', '') for cat in product.split("/")] list_category_for_product = [] for category in response.json(): for cat in category_list_by_doc: #print(f"category['name'] = {category['name']}") #print(f"cat = {cat}") if category['name'] == cat: id_category = {'id':category['id']} list_category_for_product.append(id_category) #print(f'list_category_for_product = {list_category_for_product}') return list_category_for_product def find_id_by_slug(self, slug): response = self.wcapi.get("products/",params={"per_page": 100}) if response.status_code == 200: products = response.json() for pro in products: if pro['slug'] == slug: return pro['id'] def find_media_id_by_slug(self, media_slug): for id, slug in self.medias.items(): if media_slug == slug: return id def create_tabs_from_custom_dict(self, product_id, product): product_tabs_data = {} list_product_tabs_data = [] x = 1 for key in product.keys(): #key = key.replace("’", "'") if key == "Conseils d’utilisation" or key == "Précautions articles" or key == "Description" or key == "Allergènes": product_tabs_data['title'] = key product_tabs_data['content'] = product[key] product_tabs_data['nickname'] = '' product_tabs_data['position'] = x product_tabs_data['tab_type'] = 'local' list_product_tabs_data.append(product_tabs_data) product_tabs_data = {} x += 1 response = self.wcapi.get(f"products/{product_id}") if response.status_code == 200: #product_response = response.json() meta_data = [] meta_data.append( {'key': 'wb_custom_tabs', 'value': list_product_tabs_data} ) meta_data_data = { 'meta_data': meta_data } #pprint.pprint(meta_data_data) res = 
self.wcapi.post(f"products/{product_id}", meta_data_data) else: print(f"error") def create_product(self, product): product_data = { 'name' : product['Nom'], 'price': product['Prix'], 'regular_price': product['Prix'], 'stock_quantity': 1, 'manage_stock':True, #'description': product['Description'], 'short_description': product['Courte Description'], } #print(f"product['Prix'] = {product['Prix']}") if self.find_id_by_slug(product['Slug']): self.error_log.append(f"Produit contenant comme slug '{product['Slug']}' existe déjà") else: self.wcapi.post("products/", product_data) """if response.status_code == 201: product = response.json() return product["id"] else: return None""" def update_data_product(self): json_data = self.get_doc_ods(2) for product in json_data: self.create_product(product) product_id = self.find_id_by_slug(product['Slug']) list_category_id = self.get_list_category_for_product(product['Catégories']) list_img_id = self.get_list_media_id_for_product(product['Media Slugs']) self.update_data_list_cat_product(list_category_id, list_img_id, product_id) #self.create_tabs_from_custom_dict(product_id, product) def get_all_products(self): """Récupère tous les produits en gérant la pagination""" all_products = [] page = 1 while True: response = self.wcapi.get("products", params={"per_page": 100, "page": page}) if response.status_code != 200: print(f"⚠️ Erreur API WooCommerce: {response.status_code} - {response.json()}") break products = response.json() if not products: # Si la page est vide, on arrête la boucle break all_products.extend(products) page += 1 # On passe à la page suivante #pprint.pprint(all_products) return all_products def delete_product(self): json_data = self.get_doc_ods(2) for product in json_data: list_products = self.wcapi.get(f"products/") for pro in list_products.json(): #print(f"product['Nom'] = {product['Nom']}") #print(f"pro['name'] = {pro['name']}") if product['Nom'] == pro['name']: self.wcapi.delete(f"products/{pro['id']}") """def 
delete_all_product(self): response = self.wcapi.get(f"products/",params={"per_page": 100}) for pro in response.json(): self.wcapi.delete(f"products/{pro['id']}", params={"force": True})""" def delete_all_product(self): products = self.get_all_products() for pro in products: self.wcapi.delete(f"products/{pro['id']}", params={"force": True}) def delete_media_product(self, media_slug): media_id = self.find_media_id_by_slug(media_slug) requests.delete( f"{self.media_api_url}/{media_id['id']}", headers=self.headers, verify=False ) def delete_product_by_id(self, product_id): self.wcapi.delete(f"products/{product_id}", params={"force": True}) def delete_product_by_slug(self, slug): product_id = self.find_id_by_slug(slug) #print(f"product_id = {product_id}") self.wcapi.delete(f"products/{product_id}", params={"force": True}) def normalize_string(text): return unicodedata.normalize("NFKC", text).strip().lower() def tab_exists(self, product_id, name_tab): response = self.wcapi.get(f"products/{product_id}") if response.status_code == 200: response_json = self.wcapi.get(f"products/{product_id}").json() #pprint.pprint(response_json) #print(f"response_json['meta_data']= {response_json['meta_data']}") for meta_data in response_json['meta_data']: #pprint.pprint(meta_data) for key_meta_data, value_meta_data in meta_data.items(): #pprint.pprint(response_json['meta_data']) #print(f"tùype meta_data = {type(response_json['meta_data'][0])}") #print(f"value = {value}") #print('ici') #print(type(value_meta_data)) #print(f"key_meta_data = {key_meta_data}, value_meta_data = {value_meta_data}") if key_meta_data == "value": #print('loool') if isinstance(value_meta_data, list): for tab in value_meta_data: # Normalisation des chaînes pour éviter les erreurs subtiles """clean_name_tab = name_tab.strip().lower() clean_tab_title = tab['title'].strip().lower() print(f"Comparing: '{clean_name_tab}' == '{clean_tab_title}'") # Débugging if clean_name_tab == clean_tab_title: print("✅ Correspondance 
trouvée !") else: print("❌ Pas de correspondance")""" #print(f"name_tab = {name_tab} - tab = {tab}") if name_tab == tab['title']: return True return False class AttributeManager(OdsReader): def __init__(self, wcapi): super().__init__() self.wcapi = wcapi def get_attributes(self): attributes = self.wcapi.get(f"products/attributes").json() #pprint.pprint(attributes) one_attribute = self.wcapi.get(f"products/attributes/1/terms").json() #print('_____') #pprint.pprint(one_attribute) return attributes def get_by_name(self, name): attributes = self.wcapi.get(f"products/attributes").json() for attr in attributes: if attr['name'] == name: attribute = self.wcapi.get(f"products/attributes/{attr['id']}", params={"per_page": 100}).json() #print(f"attribute = {attribute}") return attribute def get_list_name_data(self): list_name_data = [] json_data = self.get_doc_ods(3) for item in json_data: if item['Onglet'].strip() == "Informations Complémentaires": list_name_data.append(item['Nom']) return list_name_data """def create(self, list_name_attributes): for name_attribute in list_name_attributes: attribute_data = { 'name' : name_attribute } self.wcapi.post(f"products/attributes", attribute_data) """ def create(self): features_json_data = self.get_doc_ods(3) for item in features_json_data: if item['Onglet'].strip() == "Informations Complémentaires": print(f"nom = {item['Nom']}") attribute_data = { 'name' : item["Nom"] } self.wcapi.post(f"products/attributes", attribute_data) def get_term(self): term_dict = {} list_item = [] term_json_data = self.get_doc_ods(3) for item in term_json_data: if item['Onglet'].strip() == "Informations Complémentaires": print(f"item['Valeurs'] = {item['Valeurs']}") if "," in item["Valeurs"]: list_item = [value_term.strip() for value_term in item['Valeurs'].split(",")] else: item['Valeurs'].strip() if list_item: term_dict[item['Nom']] = list_item else: term_dict[item['Nom']] = item['Valeurs'] return term_dict def configure_term(self): term_dict = 
self.get_term() response = self.wcapi.get(f"products/attributes", params={"per_page": 100}) if response.status_code == 200: attributes = response.json() for attribute in attributes: for name, value in term_dict.items(): print(f"term_dict = {term_dict}") if attribute['name'] == name: if isinstance(value, list): for v in value: term = { 'name' : v } self.wcapi.post(f"products/attributes/{attribute['id']}/terms", term) else: term = { 'name' : value } self.wcapi.post(f"products/attributes/{attribute['id']}/terms", term) def create_for_product(self, product_id, name, value): data_attribute = { 'name': name, 'options':value } print(f"ici = {name} et {value}") #list_product_tabs_data.append(data_tab) response = self.wcapi.get(f"products/{product_id}") if response.status_code == 200: product_meta_data = response.json() existing_attributes_data = product_meta_data.get("attributes", []) pprint.pprint(existing_attributes_data) already_exist = False for data in existing_attributes_data: for key_data, value_data in data.items(): if key_data == "value": if isinstance(value_data, list): for value in value_data: if value['name'] == name: already_exist = True if already_exist == False: found = False for attribute in existing_attributes_data: if attribute["name"] == name: attribute["options"].append(data_attribute) found = True break # Si l'onglet `wb_custom_tabs` n'existe pas, on le crée if not found: existing_attributes_data.append({ "name": name, "options": [value], "visible":True, }) attributes_data = { 'attributes': existing_attributes_data } pprint.pprint(attributes_data) res = self.wcapi.put(f"products/{product_id}", attributes_data) else: print('already_exist') else: print(f"error") def delete_all_for_product(self): response_product = self.wcapi.get(f"products/", params={"per_page": 100}) if response_product.status_code == 200: products = response_product.json() for product in products: existing_attributes_data = product.get("attributes", []) if existing_attributes_data == 
[]: pass else: attribute_data = { 'attributes': [] } res = self.wcapi.post(f"products/{product['id']}", attribute_data) def delete_all_term(self): response_attribute = self.wcapi.get(f"products/attributes", params={"per_page": 100}) if response_attribute.status_code == 200: attributes = response_attribute.json() for attribute in attributes: response_attribute_term = self.wcapi.get(f"products/attributes/{attribute['id']}/terms", params={"per_page": 100}) if response_attribute_term.status_code == 200: attributes_term = response_attribute_term.json() for term in attributes_term: self.wcapi.delete(f"products/attributes/{attribute['id']}/terms/{term['id']}",params={"force": True}) def delete_all(self): response = self.wcapi.get(f"products/attributes", params={"per_page": 100}) if response.status_code == 200: attributes = response.json() for attribute in attributes: self.wcapi.delete(f"products/attributes/{attribute['id']}",params={"force": True}) """json_data = self.get_doc_ods(3) for item in json_data: if item['Onglet'].strip() == "Informations Complémentaires": print('ici') self.wcapi.delete(f"products/attributes",params={"force": True})""" #def get_names_attributes(self attributes): """ def get_list_name_data(self): list_name_data = [] json_data = self.get_doc_ods(3) for item in json_data: if item['Onglet'].strip() != "Informations Complémentaires": list_name_data.append(item['Nom']) return list_name_data def create_new_name_attributes(self, attributes, list_name_attributes): for key, value in attributes.items(): for name_attribute in list_name_attributes: if key == name_attribute: attribute_data = { 'name' : value } self.wcapi.post(f"products/attributes", attribute_data) def delete_all(self): attributes = self.wcapi.get(f"products/attributes", params={"per_page": 100}) for attr in attributes: self.wcapi.delete(f"products/attributes/{attr['id']}", params={"force": True}) def delete_by_name(self, name): attributes = self.wcapi.get(f"products/attributes", 
params={"per_page": 100}).json() for attr in attributes: if attr['name'] == name: self.wcapi.delete(f"products/attributes/{attr['id']}", params={"force": True}) """ class TabManager(OdsReader): def __init__(self, wcapi): super().__init__() self.wcapi = wcapi def get_list_name_data(self): list_name_data = [] json_data = self.get_doc_ods(3) for item in json_data: if item['Onglet'].strip() != "Informations Complémentaires": list_name_data.append(item['Nom']) return list_name_data def create_for_product(self, product_id, title, content, nickname, position, tab_type): logger.info(f"create_for_product(product_id={product_id}, title={title}, content={str(content)[:20]}, nickname={nickname}, position={position}, tab_type={tab_type}) called") data_tab = { 'title': title, 'content':content, 'nickname':nickname, 'position':position, 'tab_type':tab_type } #list_product_tabs_data.append(data_tab) response = self.wcapi.get(f"products/{product_id}") if response.status_code == 200: product_meta_data = response.json() existing_meta_data = product_meta_data.get("meta_data", []) already_exist = False for data in existing_meta_data: for key_data, value_data in data.items(): if key_data == "value": if isinstance(value_data, list): for value in value_data: if value['title'] == title: already_exist = True if already_exist == False: found = False for meta in existing_meta_data: if meta["key"] == "wb_custom_tabs": meta["value"].append(data_tab) found = True break # Si l'onglet `wb_custom_tabs` n'existe pas, on le crée if not found: existing_meta_data.append({ "key": "wb_custom_tabs", "value": [data_tab] }) meta_data_data = { 'meta_data': existing_meta_data } res = self.wcapi.put(f"products/{product_id}", meta_data_data) else: print('already_exist') else: print(f"error") def delete_by_product_id(self, product_id): response = self.wcapi.get(f"products/{product_id}") if response.status_code == 200: product_meta_data = response.json() existing_meta_data = product_meta_data.get("meta_data", []) 
if existing_meta_data == []: pass else: meta_data = { 'meta_data': [{"key": "wb_custom_tabs","value":[]}] } res = self.wcapi.post(f"products/{product_id}", meta_data) def delete_all(self): response = self.wcapi.get(f"products/", params={"per_page": 100}) if response.status_code == 200: product_meta_data = response.json() for product in product_meta_data: existing_meta_data = product.get("meta_data", []) if existing_meta_data == []: pass else: meta_data = { 'meta_data': [{"key": "wb_custom_tabs","value":[]}] } res = self.wcapi.post(f"products/{product['id']}", meta_data) #products = self.product_manager.get_all_products() #for product in products: class WooCommerceManager(OdsReader): def __init__(self, wcapi, media_manager, category_manager, product_manager, tab_manager, attribute_manager): super().__init__() self.wcapi = wcapi self.media_manager = media_manager self.category_manager = category_manager self.product_manager = product_manager self.tab_manager = tab_manager self.attribute_manager = attribute_manager def tab_exists(self, product_id, name_tab): return self.product_manager.tab_exists(product_id, name_tab) """def get_product_tab_details_by_id(self,product_id): ret = [] #product = self.wcapi.get(f"products/{product_id}", params={"per_page": 100}) all_products_json = self.get_doc_ods(2) all_tabs = self.tab_manager.get_list_name_data() pprint.pprint(all_tabs) for product in all_products_json: for tab in all_tabs: ret.append([tab, product[tab]]) return ret """ def get_product_tab_details(self): ret = [] all_products_json = self.get_doc_ods(2) all_tabs = self.tab_manager.get_list_name_data() for product in all_products_json: for tab in all_tabs: ret.append([tab, product[tab]]) return ret def get_product_attributes_details(self): ret = [] all_products_json = self.get_doc_ods(2) all_attributes = self.attribute_manager.get_list_name_data() for product in all_products_json: for attributes in all_attributes: ret.append([attributes, product[attributes]]) return ret 
def update_product_tab(self): #self.product_manager.update_data_product() #product_id = self.product_manager.find_id_by_slug(slug) products_tab_details = self.get_product_tab_details() products = self.product_manager.get_all_products() x=1 for product in products: for title, content in products_tab_details: tab_manager.create_for_product(product_id=product['id'], title=title, content=content, nickname="", position=x, tab_type="local") x=x+1 def update_product_attribute(self): products_attributes_details = self.get_product_attributes_details() products = self.product_manager.get_all_products() #x=1 for product in products: for name, value in products_attributes_details: self.attribute_manager.create_for_product(product_id=product['id'], name=name, value=value) #x=x+1 def update_product_tab_by_slug(self, slug): #self.product_manager.update_data_product() product_id = self.product_manager.find_id_by_slug(slug) products_tab_details = self.get_product_tab_details() x=1 for title, content in products_tab_details: tab_manager.create_for_product(product_id=product_id, title=title, content=content, nickname="", position=x, tab_type="local") x=x+1 def update_product_attribute_by_slug(self, slug): #self.product_manager.update_data_product() product_id = self.product_manager.find_id_by_slug(slug) products_attribute_details = self.get_product_attributes_details() for name, value in products_attribute_details: print(f"name = {name}, valuee = {value}") self.attribute_manager.create_for_product(product_id=product_id, name=name, value=value) """def create_all_product(self): self.product_manager.update_data_product() x=1 products = self.product_manager.get_all_products() json_data = self.get_doc_ods(2) for product in products: print('aaaaaaaaaaaaa') #pprint.pprint(product) #print(f"product['title'] = {product['title']}") print(f"product['title']['rendered'] = {product['slug']}") print('aaaaaaaaaaaaa') products_tabs_data = self.get_content_product(product['id']) for p in json_data: 
print(f"p['Nom']= {p['Nom']}") print(f"product.id = {product['id']}") print(f"product.slug = {p['Slug']} - { product['slug']}") print(f"product.name = {product['name']}") if p['Slug'] == product['slug']: #pprint.pprint(products_tabs_data) print('______________________') for title, content in products_tabs_data: tab_manager.create_for_product(product_id=product['id'], title=title, content=content, nickname="", position=x, tab_type="local") x=x+1 """ #attributes = AttributeManager(wcapi) ALL_TABS = ["Allergènes", "Conseils d’utilisation", "Description", "Précautions articles"] ALL_ATTRIBUTES = ["Temps de combustion", "Type de cire", "Mèche", "Fabrication", "Composition", "Ingrédients et engagement"] #attributes.create_new_attributes(ALL_ATTRIBUTES) #attributes.get_by_name('Composition') """woocommerce_manager.create_product(product_full_dict) # une ligne du tableau "Produits" # prepare data product_data={} product_attributes_data=[] product_tabs_data=[] for k,v in product_dict.items(): if k in ALL_ATTRIBUTES: products_attributes_data.append((k,v)) else if k in ALL_TABS: products_tabs_data.append((k,v)) else: products_data[k]=v # create product and assign attributes id = product_manager.create(product_data) # assign tabs values x=1 for title, content in products_tabs_data: tab_manager.create_for_product(product_id=id, title=title, content=content, nickname="", position=x, tab_type="local") x=x+1 # assign attributes values for name, value in products_attributes_data: attribute_manager.create_for_product(product_id=id, name=name, value=value)""" media_manager = MediaManager(ath=ath) #media_manager.upload_media() category_manager = CategoryManager(wcapi=wcapi,ath=ath, medias=media_manager.get_all_as_slug_dict()) product_manager = ProductManager(wcapi=wcapi,ath=ath, medias=media_manager.get_all_as_slug_dict()) #product_manager.delete_product_by_slug("citron-meringue") tab_manager = TabManager(wcapi=wcapi) attribute_manager = AttributeManager(wcapi=wcapi) 
# Disabled one-off attribute maintenance calls.
# attribute_manager.create(ALL_ATTRIBUTES)
# attribute_manager.create()
# attribute_manager.configure_term()
# attribute_manager.delete_all_term()
# product_id = product_manager.find_id_by_slug("citron-meringue")

# Facade that bundles all the individual managers.
woocommerce_manager = WooCommerceManager(
    wcapi=wcapi,
    media_manager=media_manager,
    category_manager=category_manager,
    product_manager=product_manager,
    tab_manager=tab_manager,
    attribute_manager=attribute_manager,
)

# Disabled tab / product maintenance calls.
# woocommerce_manager.update_product_tab()
# woocommerce_manager.tab_manager.delete_by_product_id(1890)
# woocommerce_manager.tab_manager.delete_all()
# woocommerce_manager.update_product_by_slug('bougie-personnalisee')

# Reset the attributes, then recreate them for one product.
woocommerce_manager.attribute_manager.delete_all_for_product()
woocommerce_manager.update_product_attribute_by_slug('citron-meringue')
# woocommerce_manager.attribute_manager.delete_all_for_product()

# One tab_exists() result per entry of ALL_TABS, for product 1890.
tabs_in_product = [
    woocommerce_manager.tab_exists(1890, tab_title) for tab_title in ALL_TABS
]
# print(f"all_tabs = {tabs_in_product}")

# Disabled check on the results collected above:
# print("co !!!")
# if all(tabs_in_product):
#     print('true')
#     #return True
# else:
#     print("false")
#     #return False

# products_data = woocommerce_manager.get_product_content(519)

# Disabled tab-creation sketch:
# x=1
# for title, content in products_tabs_data:
#     tab_manager.create_for_product(product_id=id, title=title, content=content, nickname="", position=x, tab_type="local")
#     x=x+1
#
# # create product with api
# # assign tabs values with api
# # assign attributes values with api

# Disabled media experiments.
# media_manager = MediaManager(ath=ath)
# media_manager.upload_media()
# media_manager.delete_media_by_slug(slug="chope-citron-meringue-face")
# id = media_manager.find_id_by_slug(slug="chope-citron-meringue-face")
# print(f'id = {id}')
# slug_dict = media_manager.get_all_as_slug_dict()
# pprint.pprint(slug_dict)

# Disabled category experiments.
# category_manager = CategoryManager(wcapi=wcapi,ath=ath, medias=media_manager.get_all_as_slug_dict())
# category_manager.find_id_by_slug(slug="chopes")
# category_manager.delete_category_by_slug("bougies-gourmandes")
"""category_manager.delete_category_by_slug("chopes") category_manager.delete_category_by_slug("gamme-prestige") category_manager.delete_category_by_slug("nouveau-test") category_manager.delete_category_by_slug("second-test") category_manager.delete_category_by_slug("yoyo") id = category_manager.update_data_categories() category_manager.get_errors()""" #product_manager = ProductManager(wcapi=wcapi,ath=ath, medias=media_manager.get_all_as_slug_dict()) #product_manager.update_data_product() #product_manager.find_id_by_slug('citron-meringue') #product_manager.find_id_by_slug('bougie-chope-de-biere-lavande') #product_manager.find_id_by_slug('bougie-personnalisee') #product_manager.delete_product_by_slug("citron-meringue") #product_manager.update_data_product() #attributes.get_value_attributes() """product = product_manager.get_all_products() for p in product: print(p['id'])""" """product_manager.create(product_list=...) manager = WooCommerceManager(media_manager, category_manager, product_manager) manager.delete_product(slug=...) 
""" """ #print(f'id = {id}') class ProductManager(OdsReader): def __init__(self, wcapi, ath, medias): super().__init__() self.wcapi = wcapi self.ath = ath self.medias = medias self.media_api_url = f"{url_website}/wp-json/wp/v2/media" self.headers = { "Authorization": f"Basic {self.ath.auth_base64}", "Content-Type": "application/json" } def update_data_list_cat_product(self, list_category_id, list_img_id, product_id): product_data = { 'categories':list_category_id, 'images':list_img_id, } print(f"product_id = {product_id}") self.wcapi.put(f"products/{product_id}", product_data) #self.wcapi.get(f"products/{product_id}") def get_list_media_id_for_product(self, product): media_id = {} list_media_id_for_product = [] list_media_by_doc = [img.strip().replace(' ', '') for img in product['Media Slugs'].split(",")] #pprint.pprint(self.uploaded_images_pro) for id, media_slug in self.medias.items(): for media in list_media_by_doc: if media == media_slug: image_id = id list_media_id_for_product.append(image_id) #print(f"list_image_id_for_product = {list_image_id_for_product}") return list_media_id_for_product def get_list_category_for_product(self, product): response = self.wcapi.get("products/categories") category_list_by_doc = [cat.strip().replace('"', '') for cat in product['Categorie'].split("/")] list_category_for_product = [] for category in response.json(): for cat in category_list_by_doc: #print(f"category['name'] = {category['name']}") #print(f"cat = {cat}") if category['name'] == cat: id_category = {'id':category['id']} list_category_for_product.append(id_category) return list_category_for_product def find_id_by_slug(self, media_slug): for id, slug in self.medias.items(): if media_slug == slug: return id def create_product(self, product): product_data = { 'name' : product['Nom'], #'price': product['Prix'], #'stock_quantity': product['Stock'], #'description': product['Description'], 'short_description': product['Courte Description'], } response = 
self.wcapi.post("products/", product_data) if response.status_code == 201: product = response.json() return product["id"] else: return None def update(self): json_data = self.get_doc_ods() for product in json_data: product_id = self.create_product(product) list_category_id = self.get_list_category_for_product(product) list_img_id = self.get_list_media_id_for_product(product) self.update_data_list_cat_product(list_category_id, list_img_id, product_id) def delete_product(self): json_data = self.get_doc_ods() for product in json_data: list_products = self.wcapi.get(f"products/") for pro in list_products.json(): print(f"product['Nom'] = {product['Nom']}") print(f"pro['name'] = {pro['name']}") if product['Nom'] == pro['name']: self.wcapi.delete(f"products/{pro['id']}") def delete_img_product(self): list_products = self.wcapi.get(f"products/") for pro in list_products.json(): #print(f"pro['name'] = {pro['name']}") for img_pro in self.uploaded_images_pro: #print(f"img_pro['title'] = {img_pro['title']}") if pro['name'] == img_pro['title']: #print(f"img_pro['id'] = {img_pro['id']}") #print(f"pro['id'] = {pro['id']}") response = requests.post( f"{self.media_api_url}/{img_pro['id']}", headers=self.headers, json={"force": True}, # 🔥 Forcer la suppressio verify=False ) def delete_all_img_product_by_name(self): response = requests.get( f"{self.media_api_url}?per_page=100", headers=self.headers, verify=False ) images = response.json() for img in images: for path_image, name_image, title_image, text_alt, category_name in self.uploaded_images_pro: if img['title']['rendered'] == title_image: print(f"img_id = {img['id']}") print('iiii') response = requests.delete( f"{self.media_api_url}/{img['id']}?force=true", headers=self.headers, verify=False ) print(f"Status Code: {response.status_code}") pprint.pprint(response.json()) def delete_all_img_product(self): response = requests.get( f"{self.media_api_url}?per_page=100", headers=self.headers, verify=False ) images = response.json() for 
img in images: for img_pro in self.uploaded_images_pro: if img['title']['rendered'] == img_pro['title']: print('coucou') print(f"img_pro['id'] = {img_pro['id']}") print(f"img['id'] = {img['id']}") response = requests.delete( #f"{self.media_api_url}/{img['id']}?force=true" f"{self.media_api_url}/{img_pro['id']}?force=true", headers=self.headers, verify=False ) print(f"Status Code: {response.status_code}") pprint.pprint(response.json()) def delete(self): #self.delete_product() #self.delete_img_product() #self.delete_all_img_product() self.delete_all_img_product_by_name() url_website = "https://lescreationsdemissbleue.local" filename_ods = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\infos_site.ods" image_manager = ImageManager(images_to_upload,ath,url_website) dict_image = image_manager.update() #category_manager = CategoryManager(wcapi,ath,categories_to_create,url_website,dict_image['category']) #category_manager.update() #category_manager = CategoryManager(wcapi,ath,categories_to_create,url_website,images_to_upload[0]["categories"]) #category_manager.delete() #product_manager = ProductManager(wcapi,ath,filename_ods,dict_image['product']) #product_manager.update() #product_manager = ProductManager(wcapi,ath,filename_ods, images_to_upload[1]["products"]) #product_manager.delete() ALL_MEDIAS=[ { 'slug': None, 'title': None, 'alt_text': None } ] ALL_CATEGORIES=[ { 'name': None, 'parent_name': 'None', 'description': None, 'media_slug': None } ] ALL_PRODUCTS=[ { 'name': None, 'parent_name': 'None', 'description': None, 'media_slug_list': [] } ] """ """api = WoocommerceApi(...) media_manager = MediaManager(api=api) media_manager.upload(media_list=...) media_manager.find_id_by_slug(slug=...) media_manager.delete(slug=...) 
category_manager = CategoryManager(api=api, medias=media_manager.get_all_as_slug_dict()) category_manager.create(name, short_description, description, ) product_manager = ProductManager(api=api, medias=media_manager.get_all()) product_manager.create(product_list=...) manager = WooCommerceManager(media_manager, category_manager, product_manager) manager.delete_product(slug=...) attribute_manager = AttributeManager(api=api) ALL_ATTRIBUTES = ("Mèche", "Fabrication") attribute_manager.register(ALL_ATTRIBUTES) product_manager.create(product_dict) woocommerce_manager = WooCommerceManager(api) woocommerce_manager.create_product(product_full_dict) # une ligne du tableau "Produits" # prepare data product_data={} product_attributes_data=[] product_tabs_data=[] for k,v in product_dict.items(): if k in ALL_ATTRIBUTES: products_attributes_data.append((k,v)) else if k in ALL_TABS: products_tabs_data.append((k,v)) else: products_data[k]=v # create product and assign attributes id = product_manager.create(product_data) # assign tabs values x=1 for title, content in products_tabs_data: tab_manager.create_for_product(product_id=id, title=title, content=content, nickname="", position=x, tab_type="local") x=x+1 # assign attributes values for name, value in products_attributes_data: attribute_manager.create_for_product(product_id=id, name=name, value=value) """