Files
missbleue/api_woocomerce_04042025.py
2025-04-07 12:20:48 +02:00

1150 lines
48 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from woocommerce import API as WoocommerceApi
from pathlib import Path
import pandas as pd
import ezodf
import requests
import pprint
import base64
import time
import json
import pyexcel_ods3
import unicodedata
import logging
import os
import time
logger = logging.getLogger(__name__)

# Configure logging: every record from DEBUG upwards goes to a local file.
logging.basicConfig(
    filename="woocommerce.log",  # file the log records are written to
    level=logging.DEBUG,  # minimum level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
    format="%(asctime)s - %(levelname)s - %(message)s",  # record layout
    datefmt="%Y-%m-%d %H:%M:%S",  # timestamp layout
)

# WooCommerce REST client authenticated with a consumer key / consumer secret.
# The same pair can also be passed as query parameters, e.g.
#   <site>/wp-json/wc/v3/products?consumer_key=<key>&consumer_secret=<secret>
#
# NOTE(review): the credentials below are hard-coded in source control. The
# WC_CONSUMER_KEY / WC_CONSUMER_SECRET environment variables take precedence
# when they are set; the literals are only kept as a fallback so existing runs
# keep working. They should be rotated and removed from this file.
wcapi = WoocommerceApi(
    url="https://lescreationsdemissbleue.local",
    consumer_key=os.environ.get(
        "WC_CONSUMER_KEY", "ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e"
    ),
    consumer_secret=os.environ.get(
        "WC_CONSUMER_SECRET", "cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768"
    ),
    wp_api=True,
    version="wc/v3",
    verify_ssl=False,  # SSL verification is disabled for local development
    timeout=30,
)
class AuthentificationWpApi:
    """HTTP Basic credentials for the WordPress REST API (not WooCommerce).

    The class attributes are computed once, when the class body is executed.
    ``auth_base64`` is the value placed after ``Basic `` in the
    ``Authorization`` header by the managers of this file.
    """

    # WordPress user name and application password (generated in
    # WordPress > Users).
    # NOTE(review): both values are hard-coded in source control. The
    # WP_USERNAME / WP_APPLICATION_PASSWORD environment variables take
    # precedence when set; the literals remain only as a fallback and should
    # be rotated and removed.
    wordpress_username = os.environ.get("WP_USERNAME", "admin_lcdm")
    wordpress_application_password = os.environ.get(
        "WP_APPLICATION_PASSWORD", "yTW8 Mc6J FUCN tPSq bnuJ 0Sdw"
    )

    # Build the Basic authentication token: base64("<user>:<password>").
    auth_str = f"{wordpress_username}:{wordpress_application_password}"
    auth_bytes = auth_str.encode("utf-8")
    auth_base64 = base64.b64encode(auth_bytes).decode("utf-8")
# Shared WordPress credentials object handed to the managers below.
ath = AuthentificationWpApi()

# Base URL of the WordPress / WooCommerce site.
WEBSITE_URL = "https://lescreationsdemissbleue.local"

# Spreadsheet that holds the media, category, product and attribute sheets.
# Alternative workbook used previously: ...\\site_missbleue\\infos_site.ods
FILENAME_ODS = (
    "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\"
    "donnees_site_internet_missbleue_corrige.ods"
)

# Folder prefix prepended to the 'Chemin' column when opening image files.
BASE_PATH = (
    "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\"
    "photos\\photos_site\\Photos_site\\"
)
class OdsReader:
    """Read the sheets of an ODS workbook as lists of row dictionaries.

    Each sheet is addressed by position: 0 = media, 1 = categories,
    2 = products, 3 = attributes and tabs. The first row of a sheet is used
    as the column header.
    """

    def __init__(self, filename_ods=FILENAME_ODS):
        """Remember the path of the workbook to read."""
        self.filename_ods = filename_ods

    def get_all_product_lines(self):
        """Return the rows of the product sheet (sheet index 2)."""
        return self.get_doc_ods(2)

    def get_product_by_slug_from_ods(self, slug):
        """Return the first product row whose 'Slug' equals *slug*, else None."""
        return next(
            (row for row in self.get_all_product_lines() if row['Slug'] == slug),
            None,
        )

    def get_all_media_lines(self):
        """Return the rows of the media sheet (sheet index 0)."""
        return self.get_doc_ods(0)

    def get_all_attribute_and_tab_lines(self):
        """Return the rows of the attribute/tab sheet (sheet index 3)."""
        return self.get_doc_ods(3)

    def get_all_category_lines(self):
        """Return the rows of the category sheet (sheet index 1)."""
        return self.get_doc_ods(1)

    def get_doc_ods(self, number_sheet):
        """Load one sheet and return its data rows as a list of dicts.

        The first row provides the dictionary keys; rows that are entirely
        empty are dropped. The workbook is re-opened on every call.

        :param number_sheet: zero-based index of the sheet in the workbook.
        :return: list of ``{column header: cell value}`` dictionaries; an
            empty list when the sheet has no rows at all.
        """
        doc = ezodf.opendoc(self.filename_ods)
        sheet = doc.sheets[number_sheet]
        data = [[cell.value for cell in row] for row in sheet.rows()]
        if not data:
            # Without a header row ``df.iloc[0]`` below would raise IndexError.
            return []
        df = pd.DataFrame(data)
        df.columns = df.iloc[0]  # promote the first row to column headers
        df = df[1:].reset_index(drop=True)
        df = df.dropna(how='all')
        return df.to_dict(orient="records")
class MediaManager(OdsReader):
    """Upload, look up and delete WordPress media through the wp/v2 REST API."""

    def __init__(self, ath):
        """Store the credentials object and build the endpoint URLs.

        :param ath: object exposing ``auth_base64`` (Basic auth token).
        """
        super().__init__()
        self.ath = ath
        self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media"
        self.media_api_settings = f"{WEBSITE_URL}/wp-json/wp/v2/settings"

    def _auth_headers(self):
        """Return the Authorization header shared by every request."""
        return {"Authorization": f"Basic {self.ath.auth_base64}"}

    def upload_media(self):
        """Upload every image of the media sheet that is not on the site yet.

        An image is considered present when a media item with the row's
        'Slug' already exists. The list of existing media is fetched once,
        and a failed upload is logged instead of silently aborting the
        remaining rows.
        """
        known_slugs = {image['slug'] for image in self.get_all_images()}
        for media in self.get_all_media_lines():
            if media['Slug'] in known_slugs:
                continue
            image_path = BASE_PATH + media['Chemin']
            image_name = Path(image_path).name
            try:
                with open(image_path, "rb") as image_file:
                    response = requests.post(
                        self.media_api_url,
                        headers={
                            **self._auth_headers(),
                            "Content-Disposition": f"attachment; filename={image_name}",
                        },
                        files={"file": image_file},
                        verify=False,
                    )
            except OSError as err:
                logger.error("Cannot read image %s: %s", image_path, err)
                continue
            if response.status_code != 201:
                logger.error(
                    "Upload of %s failed: %s %s",
                    image_name, response.status_code, response.text,
                )
                continue
            # Only remember the slug once it has really been set on the site.
            if self.update_data_media(media, response.json()['id']) is not None:
                known_slugs.add(media['Slug'])

    def is_exists(self, media, image_name):
        """Return True when a media item with the row's 'Slug' exists.

        :param media: row of the media sheet (its 'Slug' key is read).
        :param image_name: unused; kept for backward compatibility.
        """
        return any(
            media['Slug'] == image['slug'] for image in self.get_all_images()
        )

    def update_data_media(self, media, id_img):
        """Set title, alt text and slug of an uploaded media item.

        :param media: row of the media sheet ('Nom', 'Description', 'Slug',
            'Chemin' are read).
        :param id_img: id of the media item on the site.
        :return: decoded JSON response on HTTP 200, otherwise None.
        """
        update_data = {
            "title": media['Nom'],
            "alt_text": media['Description'],
            "slug": media['Slug'],
        }
        image_name = Path(BASE_PATH + media['Chemin']).name
        response = requests.post(
            f"{self.media_api_url}/{id_img}",
            headers={
                **self._auth_headers(),
                "Content-Disposition": f"attachment; filename={image_name}",
            },
            json=update_data,
            verify=False,
        )
        if response.status_code == 200:
            return response.json()
        return None

    def find_id_by_slug(self, slug):
        """Return the id of the first media item with this slug, else None."""
        for img in self.get_all_images():
            if img['slug'] == slug:
                return img['id']
        return None

    def get_all_as_slug_dict(self):
        """Return a ``{media id: slug}`` dictionary for every media item."""
        return {img['id']: img['slug'] for img in self.get_all_images()}

    def delete_media_by_slug(self, slug):
        """Permanently delete every media item whose slug equals *slug*."""
        for img in self.get_all_images():
            if img['slug'] != slug:
                continue
            response = requests.delete(
                f"{self.media_api_url}/{img['id']}?force=true",
                headers=self._auth_headers(),
                verify=False,
            )
            if response.status_code not in (200, 410):
                logger.error(
                    "Deleting media %s failed: %s", img['id'], response.status_code
                )

    def get_all_images(self):
        """Fetch every media item, following the pagination.

        Pages of 100 items are requested until a request does not return
        HTTP 200 or a page comes back empty.
        """
        all_images = []
        page = 1
        while True:
            response = requests.get(
                f"{self.media_api_url}?per_page=100&page={page}",
                headers=self._auth_headers(),
                verify=False,
            )
            if response.status_code != 200:
                break
            images = response.json()
            if not images:
                break
            all_images.extend(images)
            page += 1
        return all_images

    def delete_images(self, images):
        """Permanently delete the given media items and print the outcome.

        :param images: iterable of media dictionaries (their 'id' is read).
        """
        for img in images:
            img_id = img['id']
            response = requests.delete(
                f"{self.media_api_url}/{img_id}?force=true",
                headers=self._auth_headers(),
                verify=False,
            )
            if response.status_code in [200, 410]:  # 410 = already deleted
                print(f"Image {img_id} supprimée.")
            else:
                print(f"Erreur suppression {img_id} :", response.status_code, response.text)

    def delete_all_images(self):
        """Permanently delete every media item of the site."""
        self.delete_images(self.get_all_images())

    def assign_image_logo(self):
        """Use the media item 'logo-lescreationsdemissbleue' as logo and icon."""
        for img in self.get_all_images():
            if img['slug'] != "logo-lescreationsdemissbleue":
                continue
            response = requests.post(
                self.media_api_settings,
                json={"site_logo": img['id'], "site_icon": img['id']},
                headers=self._auth_headers(),
                verify=False,
            )
            if response.status_code == 200:
                print("Logo mis à jour avec succès !")
            else:
                print(f"Erreur lors de la mise à jour du logo : {response.text}")
class CategoryManager(OdsReader):
    """Create, link and delete WooCommerce product categories."""

    def __init__(self, wcapi, ath, medias=None):
        """Store the API client, the credentials and the media lookup table.

        :param wcapi: WooCommerce API client.
        :param ath: object exposing ``auth_base64`` (Basic auth token).
        :param medias: optional ``{media id: slug}`` dictionary.
        """
        super().__init__()
        self.wcapi = wcapi
        self.ath = ath
        self.medias = medias
        self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media"
        self.error_log = []
        self.headers = {
            "Authorization": f"Basic {self.ath.auth_base64}",
            "Content-Type": "application/json"
        }

    def _get_categories(self):
        """Return the first 100 categories, or an empty list on failure."""
        response = self.wcapi.get("products/categories/", params={"per_page": 100})
        if response.status_code != 200:
            return []
        return response.json()

    def find_id_by_slug(self, slug):
        """Return the id of the category with this slug, else None."""
        for cat in self._get_categories():
            if cat['slug'] == slug:
                return cat['id']
        return None

    def create_category(self, name, description, slug):
        """Create a category unless one with the same slug already exists.

        An already existing slug is recorded in ``self.error_log``.
        """
        if self.find_id_by_slug(slug):
            self.error_log.append(f"Catégorie contenant comme slug '{slug}' existe déjà")
            return
        category_data = {
            "name": name,
            "description": description,
            "slug": slug
        }
        self.wcapi.post("products/categories/", category_data)

    def assign_parent_category(self, parent_slug, slug):
        """Attach the category *slug* to the category *parent_slug*.

        Nothing happens when the parent cannot be found. The parent is
        resolved once, before the category list is walked.
        """
        if not parent_slug:
            return
        parent_id = self.find_id_by_parent_slug(parent_slug)
        if not parent_id:
            return
        for cat in self._get_categories():
            if cat['slug'] == slug:
                self.wcapi.put(f"products/categories/{cat['id']}", {'parent': parent_id})

    def find_id_by_parent_slug(self, parent_slug):
        """Return the id of the category with slug *parent_slug*, else None."""
        return self.find_id_by_slug(parent_slug)

    def find_media_id_by_slug(self, media_slug):
        """Return the media id registered for *media_slug*, else None."""
        for media_id, slug in (self.medias or {}).items():
            if media_slug == slug:
                return media_id
        return None

    def update_media_id_for_category(self, media_id, cat_id):
        """Set the image of category *cat_id* to media *media_id*.

        The update is skipped (and logged) when either id is missing, so no
        request is sent with a ``None`` id.
        """
        if media_id is None or cat_id is None:
            logger.warning(
                "Category image not updated (media_id=%s, cat_id=%s)", media_id, cat_id
            )
            return
        update_category_data = {
            "image": {'id': media_id},
        }
        self.wcapi.put(f"products/categories/{cat_id}", update_category_data)

    def update_data_categories(self):
        """Create every category of the sheet, then set its parent and image."""
        for category in self.get_all_category_lines():
            self.create_category(category['Nom'], category['Description'], category['Slug'])
            cat_id = self.find_id_by_slug(category['Slug'])
            media_id = self.find_media_id_by_slug(category['Media Slug'])
            self.assign_parent_category(category['Parent Slug'], category['Slug'])
            self.update_media_id_for_category(media_id, cat_id)

    def delete_all_category(self):
        """Permanently delete the first 100 categories returned by the API."""
        response = self.wcapi.get("products/categories", params={"per_page": 100})
        if response.status_code != 200:
            logger.error("Cannot list categories: %s", response.status_code)
            return
        for cat in response.json():
            self.wcapi.delete(f"products/categories/{cat['id']}", params={"force": True})

    def delete_media_category(self, media_slug):
        """Permanently delete the media item registered for *media_slug*.

        ``find_media_id_by_slug`` returns the id itself, so it is used
        directly in the URL. ``force=true`` matches the other media
        deletions of this file.
        """
        media_id = self.find_media_id_by_slug(media_slug)
        if media_id is None:
            logger.warning("No media found for slug %s", media_slug)
            return
        requests.delete(
            f"{self.media_api_url}/{media_id}?force=true",
            headers=self.headers,
            verify=False
        )

    def delete_category_by_id(self, category_id):
        """Permanently delete the category with this id."""
        self.wcapi.delete(f"products/categories/{category_id}", params={"force": True})

    def delete_category_by_slug(self, slug):
        """Permanently delete the category with this slug, if it exists."""
        category_id = self.find_id_by_slug(slug)
        if category_id is None:
            logger.warning("No category found for slug %s", slug)
            return
        self.wcapi.delete(f"products/categories/{category_id}", params={"force": True})

    def get_errors(self):
        """Print the collected error messages; returns None."""
        return print(f"self.error_log = {self.error_log}")
class ProductManager(OdsReader):
    """Create, update, query and delete WooCommerce products."""

    # Row keys (besides the usage-advice column) that become product tabs.
    _TAB_KEYS = ("Précautions articles", "Description", "Allergènes")

    def __init__(self, wcapi, ath, medias=None):
        """Store the API client, the credentials and the media lookup table.

        :param wcapi: WooCommerce API client.
        :param ath: object exposing ``auth_base64`` (Basic auth token).
        :param medias: optional ``{media id: slug}`` dictionary.
        """
        super().__init__()
        self.wcapi = wcapi
        self.ath = ath
        self.medias = medias
        self.error_log = []
        self.headers = {
            "Authorization": f"Basic {self.ath.auth_base64}",
            "Content-Type": "application/json"
        }
        self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media"

    def update_data_list_cat_product(self, list_category_id, list_img_id, product_id):
        """Replace the categories and images of a product."""
        product_data = {
            'categories': list_category_id,
            'images': list_img_id,
        }
        self.wcapi.put(f"products/{product_id}", product_data)

    def get_list_media_id_for_product(self, medias):
        """Return ``[{'id': media id}, ...]`` for the given media slugs.

        Matches are collected in the order of ``self.medias`` and the
        resulting list is returned reversed.
        """
        matches = []
        for media_id, media_slug in (self.medias or {}).items():
            for media in medias:
                if media == media_slug:
                    matches.append({'id': media_id})
        return matches[::-1]

    def get_list_category_for_product(self, categories):
        """Return ``[{'id': category id}, ...]`` for the given category names."""
        response = self.wcapi.get("products/categories", params={"per_page": 100})
        if response.status_code != 200:
            logger.error("Cannot list categories: %s", response.status_code)
            return []
        matches = []
        for category in response.json():
            for cat in categories:
                if category['name'] == cat:
                    matches.append({'id': category['id']})
        return matches

    def find_product_by_id(self, id):
        """Return the product JSON for this id, or None when not HTTP 200."""
        response = self.wcapi.get(f"products/{id}")
        if response.status_code == 200:
            return response.json()
        return None

    def find_id_by_slug(self, slug):
        """Return the id of the product with this slug, else None.

        The slug is also sent as a filter so the product is found even when
        the shop holds more than one page (100) of products. The equality
        check is kept on the returned rows.
        """
        response = self.wcapi.get("products/", params={"per_page": 100, "slug": slug})
        if response.status_code != 200:
            return None
        for pro in response.json():
            if pro['slug'] == slug:
                return pro['id']
        return None

    def find_media_id_by_slug(self, media_slug):
        """Return the media id registered for *media_slug*, else None."""
        for media_id, slug in (self.medias or {}).items():
            if media_slug == slug:
                return media_id
        return None

    @staticmethod
    def _is_usage_advice_key(key):
        """Tell whether *key* is the usage-advice column header.

        The header is matched with or without a character between the "d"
        and "utilisation" (plain, typographic or missing apostrophe).
        """
        return (
            isinstance(key, str)
            and key.startswith("Conseils d")
            and key.endswith("utilisation")
        )

    def create_tabs_from_custom_dict(self, product_id, product):
        """Store the tab columns of *product* as 'wb_custom_tabs' meta data.

        :param product_id: id of the WooCommerce product.
        :param product: row dictionary; the usage-advice, precautions,
            description and allergens columns become tabs, in row order,
            with positions starting at 1.
        """
        tabs = []
        for key in product.keys():
            if self._is_usage_advice_key(key) or key in self._TAB_KEYS:
                tabs.append({
                    'title': key,
                    'content': product[key],
                    'nickname': '',
                    'position': len(tabs) + 1,
                    'tab_type': 'local',
                })
        # The product is fetched first only to make sure it exists.
        response = self.wcapi.get(f"products/{product_id}")
        if response.status_code != 200:
            print("error")
            return
        meta_data_data = {
            'meta_data': [{'key': 'wb_custom_tabs', 'value': tabs}]
        }
        self.wcapi.post(f"products/{product_id}", meta_data_data)

    def create_product(self, product_data):
        """Create a product unless one with the same slug already exists.

        An already existing slug is recorded in ``self.error_log``.
        """
        if self.find_id_by_slug(product_data['slug']):
            self.error_log.append(f"Produit contenant comme slug '{product_data['slug']}' existe déjà")
        else:
            self.wcapi.post("products/", product_data)

    def update_data_product(self, product_data, categories, medias):
        """Create the product if needed, then set its categories and images.

        The work is done once per call; it no longer repeats for every row
        of the product sheet.

        :param product_data: WooCommerce product payload (needs 'slug').
        :param categories: list of category names.
        :param medias: list of media slugs.
        """
        self.create_product(product_data)
        product_id = self.find_id_by_slug(product_data['slug'])
        list_category_id = self.get_list_category_for_product(categories)
        list_img_id = self.get_list_media_id_for_product(medias)
        self.update_data_list_cat_product(list_category_id, list_img_id, product_id)

    @staticmethod
    def _row_to_product_data(row):
        """Build a product payload from a product-sheet row.

        Uses the same column-to-field mapping as
        ``WooCommerceManager.process_file``.
        """
        return {
            'name': row['Nom'],
            'price': row['Prix'],
            'regular_price': row['Prix'],
            'stock_quantity': row['Stock'],
            'manage_stock': True,
            'weight': str(row['Poids']),
            'sku': str(row['Numéro de référence']),
            'description': row['Description'],
            'short_description': row['Courte Description'],
            'slug': row['Slug'],
            'type': "variable" if row['Type'] == "parfums" else "simple",
        }

    def update_data_product_by_slug(self, slug):
        """Create/update the product(s) of the sheet whose 'Slug' is *slug*.

        The row is converted to a product payload, and its 'Catégories'
        ("/"-separated) and 'Media Slugs' (","-separated) cells are split
        into lists before being matched.
        """
        for row in self.get_all_product_lines():
            if row['Slug'] != slug:
                continue
            raw_categories = row['Catégories']
            raw_medias = row['Media Slugs']
            categories = (
                [cat.strip().replace('"', '') for cat in raw_categories.split("/")]
                if isinstance(raw_categories, str) else []
            )
            medias = (
                [img.strip().replace(' ', '') for img in raw_medias.split(",")]
                if isinstance(raw_medias, str) else []
            )
            self.update_data_product(self._row_to_product_data(row), categories, medias)

    def get_all_products(self):
        """Fetch every product, following the pagination."""
        all_products = []
        page = 1
        while True:
            response = self.wcapi.get("products", params={"per_page": 100, "page": page})
            if response.status_code != 200:
                print(f"⚠️ Erreur API WooCommerce: {response.status_code} - {response.json()}")
                break
            products = response.json()
            if not products:  # an empty page means everything has been read
                break
            all_products.extend(products)
            page += 1
        return all_products

    def delete_product(self):
        """Delete (without ``force``) every product named in the sheet.

        The shop's products are listed once, with pagination, instead of
        once per sheet row.
        """
        names = {row['Nom'] for row in self.get_all_product_lines()}
        for pro in self.get_all_products():
            if pro['name'] in names:
                self.wcapi.delete(f"products/{pro['id']}")

    def delete_all_product(self):
        """Permanently delete every product of the shop."""
        for pro in self.get_all_products():
            self.wcapi.delete(f"products/{pro['id']}", params={"force": True})

    def delete_media_product(self, media_slug):
        """Permanently delete the media item registered for *media_slug*.

        ``find_media_id_by_slug`` returns the id itself, so it is used
        directly in the URL. ``force=true`` matches the other media
        deletions of this file.
        """
        media_id = self.find_media_id_by_slug(media_slug)
        if media_id is None:
            logger.warning("No media found for slug %s", media_slug)
            return
        requests.delete(
            f"{self.media_api_url}/{media_id}?force=true",
            headers=self.headers,
            verify=False
        )

    def delete_product_by_id(self, product_id):
        """Permanently delete the product with this id."""
        self.wcapi.delete(f"products/{product_id}", params={"force": True})

    def delete_product_by_slug(self, slug):
        """Permanently delete the product with this slug, if it exists."""
        product_id = self.find_id_by_slug(slug)
        if product_id is None:
            logger.warning("No product found for slug %s", slug)
            return
        self.wcapi.delete(f"products/{product_id}", params={"force": True})

    @staticmethod
    def normalize_string(text):
        """Return *text* NFKC-normalized, stripped and lower-cased."""
        return unicodedata.normalize("NFKC", text).strip().lower()

    def tab_exists(self, product_id, name_tab):
        """Return True when a list-valued meta entry holds a tab titled *name_tab*."""
        response = self.wcapi.get(f"products/{product_id}")
        if response.status_code != 200:
            return False
        for meta_data in response.json()['meta_data']:
            value = meta_data.get("value")
            if not isinstance(value, list):
                continue
            for tab in value:
                if isinstance(tab, dict) and tab.get('title') == name_tab:
                    return True
        return False
class AttributeManager(OdsReader):
    """Manage global product attributes, their terms and per-product values."""

    # Value of the 'Onglet' column that marks a row as an attribute.
    _INFO_TAB = "Informations Complémentaires"

    def __init__(self, wcapi):
        """Store the WooCommerce API client."""
        super().__init__()
        self.wcapi = wcapi

    def _is_attribute_row(self, item):
        """Tell whether a sheet row describes an attribute (not a tab)."""
        onglet = item['Onglet']
        return isinstance(onglet, str) and onglet.strip() == self._INFO_TAB

    def get_attributes(self):
        """Return the JSON list of global product attributes."""
        return self.wcapi.get("products/attributes").json()

    def get_by_name(self, name):
        """Return the attribute JSON whose name is *name*, else None."""
        attributes = self.wcapi.get("products/attributes").json()
        for attr in attributes:
            if attr['name'] == name:
                return self.wcapi.get(
                    f"products/attributes/{attr['id']}", params={"per_page": 100}
                ).json()
        return None

    def get_list_name_data(self):
        """Return the 'Nom' of every attribute row of the sheet."""
        return [
            item['Nom']
            for item in self.get_all_attribute_and_tab_lines()
            if self._is_attribute_row(item)
        ]

    def create(self):
        """Create one global attribute per attribute row of the sheet."""
        for item in self.get_all_attribute_and_tab_lines():
            if self._is_attribute_row(item):
                self.wcapi.post("products/attributes", {'name': item["Nom"]})

    def get_term(self):
        """Return ``{attribute name: value(s)}`` read from the sheet.

        A 'Valeurs' cell containing commas becomes a list of stripped
        strings; any other string cell becomes a single stripped string. The
        value is computed afresh for every row, so a single-valued attribute
        no longer inherits the list of a previous row.
        """
        term_dict = {}
        for item in self.get_all_attribute_and_tab_lines():
            if not self._is_attribute_row(item):
                continue
            raw = item['Valeurs']
            if isinstance(raw, str) and "," in raw:
                term_dict[item['Nom']] = [part.strip() for part in raw.split(",")]
            elif isinstance(raw, str):
                term_dict[item['Nom']] = raw.strip()
            else:
                term_dict[item['Nom']] = raw
        return term_dict

    def configure_term(self):
        """Create the terms of every global attribute listed in the sheet."""
        term_dict = self.get_term()
        response = self.wcapi.get("products/attributes", params={"per_page": 100})
        if response.status_code != 200:
            return
        for attribute in response.json():
            if attribute['name'] not in term_dict:
                continue
            value = term_dict[attribute['name']]
            names = value if isinstance(value, list) else [value]
            for term_name in names:
                if term_name is None:
                    continue  # empty cell: there is no term to create
                self.wcapi.post(
                    f"products/attributes/{attribute['id']}/terms", {'name': term_name}
                )

    def create_for_product(self, product_id, name, value, variation=False):
        """Add *value* to the attribute *name* of a product.

        When the product already has the attribute, *value* is appended to
        its options unless it is already listed (then 'already_exist' is
        printed and nothing is sent). Otherwise a new visible attribute is
        added.

        :param product_id: id of the WooCommerce product.
        :param name: attribute name.
        :param value: option value to add.
        :param variation: whether a newly created attribute is used for
            variations.
        """
        response = self.wcapi.get(f"products/{product_id}")
        if response.status_code != 200:
            print("error")
            return
        attributes = response.json().get("attributes", [])
        existing = next((attr for attr in attributes if attr["name"] == name), None)
        if existing is None:
            attributes.append({
                "name": name,
                "options": [value],
                "visible": True,
                "variation": variation,
            })
        else:
            options = existing.setdefault("options", [])
            if value in options:
                print('already_exist')
                return
            # Options are plain values, not {'name', 'options'} dictionaries.
            options.append(value)
        self.wcapi.put(f"products/{product_id}", {'attributes': attributes})

    def delete_all_for_product(self):
        """Remove all attributes from the first 100 products that have any."""
        response_product = self.wcapi.get("products/", params={"per_page": 100})
        if response_product.status_code != 200:
            return
        for product in response_product.json():
            if product.get("attributes", []):
                self.wcapi.post(f"products/{product['id']}", {'attributes': []})

    def delete_all_term(self):
        """Permanently delete the terms of every global attribute."""
        response_attribute = self.wcapi.get("products/attributes", params={"per_page": 100})
        if response_attribute.status_code != 200:
            return
        for attribute in response_attribute.json():
            terms_url = f"products/attributes/{attribute['id']}/terms"
            response_term = self.wcapi.get(terms_url, params={"per_page": 100})
            if response_term.status_code != 200:
                continue
            for term in response_term.json():
                self.wcapi.delete(f"{terms_url}/{term['id']}", params={"force": True})

    def delete_all(self):
        """Permanently delete every global attribute."""
        response = self.wcapi.get("products/attributes", params={"per_page": 100})
        if response.status_code != 200:
            return
        for attribute in response.json():
            self.wcapi.delete(f"products/attributes/{attribute['id']}", params={"force": True})
class TabManager(OdsReader):
    """Manage the custom product tabs stored in the 'wb_custom_tabs' meta."""

    def __init__(self, wcapi):
        """Store the WooCommerce API client."""
        super().__init__()
        self.wcapi = wcapi

    def get_list_name_data(self):
        """Return the 'Nom' of every sheet row that is not an attribute row."""
        return [
            item['Nom']
            for item in self.get_all_attribute_and_tab_lines()
            if item['Onglet'].strip() != "Informations Complémentaires"
        ]

    def create_or_update_for_product(self, product_id, tabs):
        """Create the given tabs on a product, or refresh their content.

        The product is read once, every tab is merged into its
        'wb_custom_tabs' meta entry, and the meta data is written back once.
        A tab whose title already exists gets its content replaced; other
        tabs are appended.

        :param product_id: id of the WooCommerce product.
        :param tabs: ``{title: content}`` dictionary. Positions are assigned
            in iteration order, starting at 2.
        """
        if not tabs:
            return
        response = self.wcapi.get(f"products/{product_id}")
        if response.status_code != 200:
            print("error")
            return
        meta_data = response.json().get("meta_data", [])

        # Locate the meta entry holding the tabs, creating it when missing.
        tabs_meta = next(
            (meta for meta in meta_data if meta.get("key") == "wb_custom_tabs"), None
        )
        if tabs_meta is None:
            tabs_meta = {"key": "wb_custom_tabs", "value": []}
            meta_data.append(tabs_meta)
        if not isinstance(tabs_meta.get("value"), list):
            tabs_meta["value"] = []

        by_title = {
            tab.get('title'): tab
            for tab in tabs_meta["value"]
            if isinstance(tab, dict)
        }
        position = 1
        for title, content in tabs.items():
            position += 1
            current = by_title.get(title)
            if current is not None:
                current['content'] = content
                continue
            new_tab = {
                'title': title,
                'content': content,
                'nickname': '',
                'position': position,
                'tab_type': 'local'
            }
            tabs_meta["value"].append(new_tab)
            by_title[title] = new_tab

        self.wcapi.put(f"products/{product_id}", {'meta_data': meta_data})

    def delete_by_product_id(self, product_id):
        """Empty the custom tabs of one product (when it has meta data)."""
        response = self.wcapi.get(f"products/{product_id}")
        if response.status_code != 200:
            return
        if response.json().get("meta_data", []):
            meta_data = {
                'meta_data': [{"key": "wb_custom_tabs", "value": []}]
            }
            self.wcapi.post(f"products/{product_id}", meta_data)

    def delete_all(self):
        """Empty the custom tabs of the first 100 products that have meta data."""
        response = self.wcapi.get("products/", params={"per_page": 100})
        if response.status_code != 200:
            return
        for product in response.json():
            if product.get("meta_data", []):
                meta_data = {
                    'meta_data': [{"key": "wb_custom_tabs", "value": []}]
                }
                self.wcapi.post(f"products/{product['id']}", meta_data)
class VariationsManager(OdsReader):
    """Create product variations from the 'Parfums' column of the sheet."""

    def __init__(self, wcapi):
        """Store the WooCommerce API client."""
        super().__init__()
        self.wcapi = wcapi

    def get_attribute_id(self, product_data):
        """Return the id of the first global attribute named like a key of *product_data*.

        The attribute endpoint returns a JSON list, which is indexed by name
        here. Returns None when the request fails or nothing matches.
        """
        response = self.wcapi.get("products/attributes")
        if response.status_code != 200:
            return None
        ids_by_name = {attr['name']: attr['id'] for attr in response.json()}
        for key in product_data:
            if key in ids_by_name:
                return ids_by_name[key]
        return None

    def update_product_attributes_merged(self, wcapi, product_id, attribute_name, new_options):
        """Add options to one attribute of a product, keeping the other attributes.

        :param wcapi: WooCommerce API client used for the requests.
        :param product_id: id of the product to update.
        :param attribute_name: name of the attribute to extend (e.g. "Parfums").
        :param new_options: options to add, either as a "|"-separated string
            or as an iterable of strings.
        """
        # Normalize the new options to a list of non-empty stripped strings.
        if isinstance(new_options, str):
            candidates = new_options.split('|')
        else:
            candidates = list(new_options or [])
        new_options = [
            opt.strip() for opt in candidates if isinstance(opt, str) and opt.strip()
        ]

        # 1. Fetch the existing product.
        response = wcapi.get(f"products/{product_id}")
        if response.status_code != 200:
            print(f"❌ Impossible de récupérer le produit {product_id}")
            return
        attributes = response.json().get("attributes", [])

        # 2. Extend the targeted attribute when the product already has it.
        found = False
        for attr in attributes:
            if attr["name"].lower() == attribute_name.lower():
                existing_options = attr.get("options", [])
                # dict.fromkeys removes duplicates while keeping the order.
                attr["options"] = list(dict.fromkeys(existing_options + new_options))
                attr["variation"] = True
                attr["visible"] = True
                # NOTE(review): these two keys are sent as before; presumably
                # they are not attribute properties - TODO confirm.
                attr["parent_id"] = product_id
                attr["manage_stock"] = "parent"
                found = True
                break

        # 3. Otherwise add the attribute.
        if not found:
            attributes.append({
                "name": attribute_name,
                "variation": True,
                "visible": True,
                "options": new_options
            })

        # 4. Write the merged attributes back.
        update_res = wcapi.put(f"products/{product_id}", {"attributes": attributes})
        if update_res.status_code == 200:
            print(f"✅ Attribut '{attribute_name}' mis à jour avec succès.")
        else:
            print(f"❌ Erreur lors de la mise à jour : {update_res.status_code}")
            print(update_res.json())

    def create_variations_products(self, product_id, product_data):
        """Create one variation per value of the product's 'Parfums' cell.

        Only products whose ``product_data['type']`` is "variable" are
        handled. The 'Parfums' cell is read directly from the sheet row, so
        the column order no longer matters. A value that already has a
        variation is skipped, so re-running does not create duplicates.

        :param product_id: id of the WooCommerce product.
        :param product_data: product payload ('slug', 'type', 'price' are read).
        :return: False when the sheet row or the existing variations cannot
            be read, otherwise None.
        """
        product_line = self.get_product_by_slug_from_ods(product_data['slug'])
        if product_line is None:
            return False
        if product_data['type'] != "variable":
            return None
        name_attribute = "Parfums"
        raw_parfums = product_line.get(name_attribute)
        if not isinstance(raw_parfums, str):
            return None  # empty cell: nothing to create

        self.update_product_attributes_merged(
            self.wcapi, product_id=product_id,
            attribute_name="Parfums", new_options=raw_parfums,
        )
        parfums = [p.strip() for p in raw_parfums.split("|") if p.strip()]

        response = self.wcapi.get(
            f"products/{product_id}/variations", params={"per_page": 100}
        )
        if response.status_code != 200:
            return False
        existing_options = {
            attribute.get('option')
            for variation in response.json()
            for attribute in variation.get('attributes', [])
            if attribute.get('name') == name_attribute
        }
        for parfum in parfums:
            if parfum in existing_options:
                continue
            data = {
                'attributes': [
                    {
                        'name': name_attribute,
                        'option': parfum
                    }
                ],
                'manage_stock': False,
                'in_stock': True,
                'regular_price': product_data['price'],
            }
            print(f"Posting variation: {data}")
            result = self.wcapi.post(f"products/{product_id}/variations", data)
            print(result.status_code)
            pprint.pprint(result.json())
        return None
class WooCommerceManager(OdsReader):
def __init__(self, wcapi, media_manager, category_manager, product_manager, tab_manager, attribute_manager, variation_manager):
super().__init__()
self.wcapi = wcapi
self.media_manager = media_manager
self.category_manager = category_manager
self.product_manager = product_manager
self.tab_manager = tab_manager
self.attribute_manager = attribute_manager
self.variation_manager = variation_manager
def tab_exists(self, product_id, name_tab):
return self.product_manager.tab_exists(product_id, name_tab)
def get_product_tab_details(self):
all_products_json = self.get_all_attribute_and_tab_lines()
all_tabs = self.tab_manager.get_list_name_data()
dict = {}
for product in all_products_json:
line = []
for tab in all_tabs:
line.append([tab, product[tab]])
dict[product["Parfum"]] = line
return dict
def get_product_attributes_details(self):
ret = []
all_products_json = self.get_all_product_lines()
all_attributes = self.attribute_manager.get_list_name_data()
for product in all_products_json:
for attribute in all_attributes:
ret.append([attribute, product[attribute]])
return ret
def update_product_tab_by_slug(self, slug):
product_id = self.product_manager.find_id_by_slug(slug)
product = self.product_manager.find_product_by_id(product_id)
products_tab_details = self.get_product_tab_details()
x=1
for value in products_tab_details.values():
for key in products_tab_details.keys():
for title, content in value:
if key:
if key in product['short_description']:
tab_manager.create_for_product(product_id=product_id, title=title, content=content, nickname="", position=x, tab_type="local")
x=x+1
else:
pass
else:
print('no key')
x=1
def update_product_attribute_by_slug(self, slug):
product_id = self.product_manager.find_id_by_slug(slug)
product_ods = self.get_product_by_slug_from_ods(slug)
products_attribute_details = self.get_product_attributes_details()
for name, value in products_attribute_details:
self.attribute_manager.create_for_product(product_id=product_id,
name=name, value=value,
variation=self.is_variable(product_ods['Type']))
def update_product(self):
#self.product_manager.update_data_product()
self.update_product_tab()
#self.update_product_attribute()
"""def update_product_by_slug(self):
self.product_manager.update_data_product()
self.update_product_tab()
self.update_product_attribute()"""
def update_product_variation(self, product_id, product_data):
pass
def update_product_by_slug(self, slug):
self.product_manager.update_data_product_by_slug(slug)
self.update_product_tab_by_slug(slug)
#self.update_product_attribute_by_slug(slug)
def create_all_informations(self):
#medias = self.media_manager.get_all_as_slug_dict()
#self.product_manager.medias = medias
#self.update_product_by_slug("chope-citron-meringue")
#self.media_manager.upload_media()
#self.media_manager.assign_image_logo()
medias = self.media_manager.get_all_as_slug_dict()
self.product_manager.medias = medias
#self.category_manager.medias = medias
#self.category_manager.update_data_categories()
#self.attribute_manager.create()
#self.attribute_manager.configure_term()
self.process_file(FILENAME_ODS)
self.update_product()
def get_list_category_for_product(self, category):
category_list_by_doc = [cat.strip().replace('"', '') for cat in category.split("/")]
return category_list_by_doc
def get_list_media_id_for_product(self, media):
list_media_by_doc = [img.strip().replace(' ', '') for img in media.split(",")]
return list_media_by_doc
def is_variable(self, type):
return type.lower() == "parfums"
def update_product_attribute(self, attributes, product_data):
product_id = self.product_manager.find_id_by_slug(product_data['slug'])
for name, value in attributes.items():
self.attribute_manager.create_for_product(product_id=product_id, name=name, value=value, variation=self.is_variable(product_data['type']))
def update_product_variations(self, product_data):
product_id = self.product_manager.find_id_by_slug(product_data['slug'])
self.variation_manager.create_variations_products(product_id, product_data)
def update_product_tab(self, product_data):
for product in product_data:
self.update_product_tab_by_id(product['id'])
def create_or_update_product(self, product_data, attributes, tabs, categories, medias):
self.product_manager.update_data_product(product_data=product_data, categories=categories, medias=medias)
self.update_product_attribute(attributes=attributes, product_data=product_data)
product_id = self.product_manager.find_id_by_slug(product_data['slug'])
self.update_product_variations(product_data)
self.tab_manager.create_or_update_for_product(product_id=product_id, tabs=tabs)
def process_file(self, filename):
# refresh media cache
medias = media_manager.get_all_as_slug_dict()
self.product_manager.medias = medias
# read provided file
reader = OdsReader(filename)
for product_line in reader.get_all_product_lines():
# standard product data
product_data = {
'name' : product_line['Nom'],
'price': product_line['Prix'],
'regular_price': product_line['Prix'],
'stock_quantity': product_line['Stock'],
'manage_stock':True,
'weight':str(product_line['Poids']),
'sku':str(product_line['Numéro de référence']),
'description': product_line['Description'],
'short_description': product_line['Courte Description'],
'slug':product_line['Slug']
}
if product_line['Type'] == "parfums":
product_data['type'] = "variable"
else:
product_data['type'] = "simple"
attributes = {
"Temps de combustion" : product_line['Temps de combustion'],
"Type de cire" : product_line['Type de cire'],
"Mèche" : product_line['Mèche'],
"Fabrication" : product_line['Fabrication'],
"Composition" : product_line['Composition'],
"Ingrédients et engagements" : product_line['Ingrédients et engagements'],
"Parfums" : product_line['Parfums']
}
tabs ={
#"Description" : product_line["Description"],
"Conseils d'utilisation" : product_line["Conseils dutilisation"],
"Précautions articles" : product_line["Précautions articles"],
#"Allergènes" : product_line["Allergènes"]
}
# ... associated categories
categories = self.get_list_category_for_product(product_line['Catégories'])
# ... associated medias
medias = self.get_list_media_id_for_product(product_line['Media Slugs'])
# create or update product
self.create_or_update_product(product_data=product_data, attributes=attributes, tabs=tabs, categories=categories, medias=medias)
"""def put_social_data(self):
response = requests.post(url,
auth=HTTPBasicAuth("consumer_key", "consumer_secret"),
json={
"acf": {
"instagram_url": "https://instagram.com/ton_compte"
}
}
)"""
def delete_all_informations(self):
    """Run the bulk deletion of medias, attributes, products and categories.

    The four calls are made in that exact order, each on the manager held
    by this instance.
    """
    purge_plan = (
        ("media_manager", "delete_all_images"),
        ("attribute_manager", "delete_all"),
        ("product_manager", "delete_all_product"),
        ("category_manager", "delete_all_category"),
    )
    for manager_name, method_name in purge_plan:
        # Resolve each manager lazily so lookups and calls stay interleaved.
        manager = getattr(self, manager_name)
        getattr(manager, method_name)()
def delete_information_by_slug(self, slug="chope-adoucissant"):
    """Delete the product identified by ``slug``.

    Args:
        slug: slug of the product to delete. Defaults to the slug that was
            previously hard-coded, so existing no-argument calls behave the
            same.
    """
    # Use the product manager injected into this instance rather than the
    # module-level global of the same name.
    self.product_manager.delete_product_by_slug(slug)
#category_manager.delete_all_category()
# ---------------------------------------------------------------------------
# Script wiring: build every manager as a module-level global, then run the
# selected action. Lines that are commented out are one-off maintenance
# commands kept for manual runs.
# ---------------------------------------------------------------------------
# ALL_TABS = ["Allergènes", "Conseils dutilisation", "Description", "Précautions articles"]
# ALL_ATTRIBUTES = ["Temps de combustion", "Type de cire", "Mèche", "Fabrication", "Composition", "Ingrédients et engagement"]

media_manager = MediaManager(ath=ath)
# media_manager.upload_media()
# media_manager.delete_all_images()
# media_manager.assign_image_logo()

category_manager = CategoryManager(wcapi=wcapi, ath=ath)
# category_manager.delete_all_category()

product_manager = ProductManager(wcapi=wcapi, ath=ath)
# product_manager.delete_all_product()
# medias = media_manager.get_all_as_slug_dict()
# media_manager.delete_media_by_slug('pyramide-olfactive-frangipanier')
# product_manager.delete_product_by_slug("citron-meringue")
# product_manager.update_data_product()

tab_manager = TabManager(wcapi=wcapi)
attribute_manager = AttributeManager(wcapi=wcapi)
variation_manager = VariationsManager(wcapi=wcapi)
# attribute_manager.create(ALL_ATTRIBUTES)
# attribute_manager.create()
# attribute_manager.configure_term()
# attribute_manager.delete_all_term()
# product_id = product_manager.find_id_by_slug("citron-meringue")

# Facade that receives every manager built above.
woocommerce_manager = WooCommerceManager(
    wcapi=wcapi,
    media_manager=media_manager,
    category_manager=category_manager,
    product_manager=product_manager,
    tab_manager=tab_manager,
    attribute_manager=attribute_manager,
    variation_manager=variation_manager,
)

# Action executed when the file is run or imported.
## woocommerce_manager.delete_all_informations()
woocommerce_manager.create_all_informations()
## woocommerce_manager.process_file(FILENAME_ODS)

# Other maintenance commands:
# category_manager.update_data_categories()
# woocommerce_manager.delete_all_informations()
# woocommerce_manager.delete_information_by_slug()
# woocommerce_manager.create_all_informations()
# woocommerce_manager.create_all_categories_and_products()
# woocommerce_manager.update_product_tab()
# woocommerce_manager.tab_manager.delete_by_product_id(1890)
# woocommerce_manager.tab_manager.delete_all()
# woocommerce_manager.update_product()
# woocommerce_manager.attribute_manager.delete_all_for_product()
# woocommerce_manager.update_product_attribute_by_slug('citron-meringue')
# woocommerce_manager.attribute_manager.delete_all_for_product()

# Disabled check of which tabs exist on product 1890:
# tabs_in_product = []
# for tab in ALL_TABS:
#     tab_in_product = woocommerce_manager.tab_exists(1890, tab)
#     tabs_in_product.append(tab_in_product)
"""
utilisation
module argparse
# on va appeler ça importation d'un fichier ods, d'où l'action import-ods
# on va appeler cette commande, "la commande de base"
wcctl --wc-url=https://lescreationsdemissbleue.local --wc-key=<consumer_key> --wc-secret=<consumer_secret> import-ods --ods-path=fichier.ods
# traitement de l'intégralité d'un fichier ods
... --all
# traitement des medias seulement, on peut en option spécifier une plage de média à importer
... --medias [--media-range=1:40]
plus tard ...
# traitement des catégories seulement, on peut en option spécifier une expression régulière qui va s'appliquer au nom de la catégorie
... --categories [--categories-regex=<regex>]
ex: traiter uniquement les catégories dont le nom contient le terme "bougie"
... --categories [--categories-regex=.*bougie.*]
# traitement des articles seulement, on peut en option spécifier une expression régulière qui va s'appliquer au nom de l'article
... --products [--products-regex=<regex>]
ex: traiter uniquement les articles dont le nom contient le terme "bougie"
... --products [--products-regex=.*bougie.*]
"""