Files
missbleue/api_woocommerce.py

1988 lines
84 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from woocommerce import API as WoocommerceApi
from pathlib import Path
import pandas as pd
import ezodf
import requests
import pprint
import base64
import unicodedata
import os
from watermark import create_watermark_image
from base64 import b64encode
import logging
logger = logging.getLogger(__name__)
# 🧪 Test
"""logger.debug("Démarrage du programme (DEBUG)")
logger.info("Traitement en cours (INFO)")
logger.warning("Avertissement (WARNING)")
logger.error("Erreur (ERROR)")
logger.critical("Erreur critique (CRITICAL)")"""
# via consumer key and consumer secret :
# https://lescreationsdemissbleue.local/wp-json/wc/v3/products?consumer_key=ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e&consumer_secret=cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768
#url="https://lescreationsdemissbleue.local",
#url="https://les-creations-de-missbleue.local",
#consumer_key="ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e",
#consumer_secret="cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768",
class AuthentificationWpApi():
# Identifiants WordPress (et non WooCommerce)
wordpress_username = "admin_lcdm" # Remplace par ton username WordPress
wordpress_application_password = "yTW8 Mc6J FUCN tPSq bnuJ 0Sdw" #'W6Zt N5CU 2Gj6 TlKm clGn LvIz' #"#8io_mb!55@Bis" # Généré dans WordPress > Utilisateurs
# Générer l'authentification Basic en base64
auth_str = f"{wordpress_username}:{wordpress_application_password}"
auth_bytes = auth_str.encode("utf-8")
auth_base64 = base64.b64encode(auth_bytes).decode("utf-8")
ath = AuthentificationWpApi()
WEBSITE_URL = "https://les-creations-de-missbleue.local"
#WEBSITE_URL = "https://les-creations-de-missbleue.com"
#FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\donnees_site_internet_missbleue_corrige.ods"
#BASE_PATH = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\photos\\photos_site\\Photos_site\\"
#BASE_PATH = "C:\\Users\\beren\\Cloud\\beren\\site_missbleue\\photos\\photos_site\\Photos_site\\"
BASE_PATH = "photos/photos_site/Photos_site/"
#FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\infos_site.ods"
FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\api_woocommerce\\final_api_woocommerce\\donnees_site_internet_missbleue_version_finale.ods"
class OdsReader:
def __init__(self, filename_ods=FILENAME_ODS):
self.filename_ods = filename_ods
def get_all_product_lines(self):
return self.get_doc_ods(2)
def fetch_all_product_rows(self, start, end=None):
return self.extract_ods_row(2, start, end)
def get_product_line_by_value(self, search_value):
return self.get_doc_ods_by_value(2, search_value)
def get_product_by_slug_from_ods(self, slug):
for product in self.get_all_product_lines():
if product['Slug'] == slug: return product
return None
def get_all_media_lines(self):
return self.get_doc_ods(0)
def fetch_all_media_rows(self, start, end=None):
return self.extract_ods_row(0, start, end)
def get_media_line_by_value(self, search_value):
return self.get_doc_ods_by_value(0, search_value)
def get_all_attribute_and_tab_lines(self):
return self.get_doc_ods(3)
def get_attribute_and_tab_lines(self, search_value):
return self.get_doc_ods_by_value(3, search_value)
def get_all_category_lines(self):
return self.get_doc_ods(1)
def get_category_line_by_value(self, search_value):
return self.get_doc_ods_by_value(1, search_value)
def get_all_seo_lines(self):
return self.get_doc_ods(6)
def get_doc_ods(self, number_sheet):
doc = ezodf.opendoc(self.filename_ods)
sheet = doc.sheets[number_sheet]
data = []
for row in sheet.rows():
data.append([cell.value for cell in row])
df = pd.DataFrame(data)
df.columns = df.iloc[0]
df = df[1:].reset_index(drop=True)
df = df.dropna(how='all')
json_data = df.to_dict(orient="records")
return json_data
def get_doc_ods_by_value(self, number_sheet, search_value=None):
doc = ezodf.opendoc(self.filename_ods)
sheet = doc.sheets[number_sheet]
data = []
for row in sheet.rows():
data.append([cell.value for cell in row])
df = pd.DataFrame(data)
df.columns = df.iloc[0]
df = df[1:].reset_index(drop=True)
df = df.dropna(how='all')
if search_value:
try:
print(f"Recherche de la valeur : {search_value}")
# Vérifier que le DataFrame n'est pas vide
if df.empty:
raise ValueError("Le DataFrame est vide")
# Nettoyer le search_value pour enlever les espaces superflus
search_value = str(search_value).strip()
# Dynamique sur la colonne à rechercher
column_name = 'Nom' # à modifier selon la situation
if column_name not in df.columns:
raise ValueError(f"La colonne '{column_name}' n'existe pas dans le DataFrame")
# Supprimer les espaces avant et après dans la colonne cible
df[column_name] = df[column_name].str.strip()
# Remplir les NaN par des chaînes vides
df[column_name] = df[column_name].fillna('')
# Recherche avec contains sur la colonne
mask = df[column_name].str.contains(str(search_value), case=False, na=False)
#print(f"Masque généré :\n{mask}")
if mask.sum() == 0: # Si aucune ligne ne correspond
raise ValueError(f"Aucune correspondance trouvée pour '{search_value}' dans la colonne '{column_name}'")
# Filtrage du DataFrame
df = df[mask]
#print(f"df après filtrage :\n{df}")
except ValueError as ve:
#print(f"Erreur : {ve}")
logger.exception(f"🚫 Aucune correspondance trouvée pour '{search_value}' dans la colonne '{column_name}'")
except Exception as e:
#print(f"Erreur lors de la recherche : {e}")
logger.exception(f"🚫 Erreur lors de la recherche de '{search_value}' dans la colonne '{column_name}'. Exception : {e}")
else:
print("Aucun search_value fourni")
# Convertir en json_data pour le retour
json_data = df.to_dict(orient="records")
return json_data
def extract_ods_row(self, number_sheet, start_row=None, end_row=None):
doc = ezodf.opendoc(self.filename_ods)
sheet = doc.sheets[number_sheet]
data = []
for row in sheet.rows():
data.append([cell.value for cell in row])
logger.debug(f'extract_ods_row: {len(data)} rows found in ods sheet #{number_sheet}')
df = pd.DataFrame(data)
df.columns = df.iloc[0]
df = df[1:].reset_index(drop=True)
if start_row is not None and end_row is not None:
df = df.iloc[start_row:end_row]
elif start_row is not None:
df = df.iloc[start_row:]
elif end_row is not None:
df = df.iloc[:end_row]
"""# Voir tous les noms de colonnes
print(df.columns.tolist())
# Compter les doublons de noms
from collections import Counter
print([item for item, count in Counter(df.columns).items() if count > 1])
# Nettoyer les noms de colonnes (supprimer espaces, etc.)
df.columns = df.columns.str.strip()"""
df = df.dropna(how='all')
return df.to_dict(orient="records")
class MediaManager(OdsReader):
def __init__(self, ath, wcapi, filename_ods):# filename_ods
super().__init__(filename_ods) # filename_ods
self.ath = ath
self.wcapi = wcapi
#self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media"
#self.media_api_settings = f"{WEBSITE_URL}/wp-json/wp/v2/settings"
self.dict_equivalence = {'ambre':'ambré', 'meringue':'meringué', 'givree':'givrée', 'sale':'salé', 'bresilien':'brésilien', 'epices':'épices', 'noel':'noël', 'a':'à', 'petales':'pétales',
'lumiere':'lumière', 'allumee':'allumée', 'eteinte':'éteinte', 'celebration':'célébration', 'argente':'argenté', 'dore':'doré', 'accroche':'accroché', 'pose':'posé', 'colore':'coloré',
'kevin': 'Kévin', 'interieur':'intérieur', 'cafe':'café', 'bresil':'Brésil', 'dagrumes': "d'agrumes", "iles":"îles", 'apero': 'apéro', 'quebecois':'québecois', 'defendu':'défendu',
'tiare':'tiaré', 'mure':'mûre', 'allergenes':'allergènes', 'parfume':'parfumé', 'peche' : 'pêche'
}
def upload_media(self, search_value=None):
if search_value:
json_data = self.get_media_line_by_value(search_value)
else:
json_data = self.get_all_media_lines()
for media in json_data:
media_chemin = media['Chemin'].replace("\\", "/")
path = Path(BASE_PATH + media_chemin)
image_name = path.name
try:
if not self.is_exists(media, image_name):
image_path = BASE_PATH + media_chemin
# 👇 Tentative d'ouverture et d'envoi
with open(image_path, "rb") as image_file:
response = self.wcapi.post("media",files={"file": image_file})
if response.status_code == 201:
media_data = response.json()
self.update_data_media(media, media_data['id'])
logger.info(f"✅ Image uploadée : {image_name}")
else:
logger.error(f"❌ Échec de l'upload ({response.status_code}) pour : {image_name} - URL: {self.wcapi.url}")
else:
logger.info(f"↪️ Image déjà existante (non uploadée) : {image_name}")
except FileNotFoundError:
logger.exception(f"🚫 Fichier introuvable : {image_name} ({path})")
except requests.RequestException as e:
logger.exception(f"🔌 Problème réseau/API lors de l'upload de {image_name} : {e}")
except Exception as e:
logger.exception(f"🔥 Erreur inattendue lors de l'upload de {image_name} : {e}")
def create_and_update_media(self, media, image_name, path, watermark=False):
#print(f"image_path = {path}")
try:
if not self.is_exists(media, image_name):
if watermark:
image_path = path
else:
media_chemin = media['Chemin'].replace("\\", "/")
image_path = BASE_PATH + media_chemin
# 👇 Tentative d'ouverture et d'envoi
with open(image_path, "rb") as image_file:
response = self.wcapi.post("media",files={"file": image_file})
#print(f"response = {response.status_code}")
if response.status_code == 201:
media_data = response.json()
self.update_data_media(media, media_data['id'])
logger.info(f"✅ Image uploadée : {image_name}")
else:
logger.error(f"❌ Échec de l'upload ({response.status_code}) pour : {image_name} - URL: {self.wcapi.url}")
else:
if self.is_txt_alt_exists(media, image_name):
media_id = self.is_txt_alt_exists(media, image_name)
self.update_data_media(media, media_id)
logger.info(f"✅ Image déjà existante et mise à jour : {image_name}")
else:
logger.info(f"↪️ Image déjà existante (non uploadée) : {image_name}")
except FileNotFoundError:
logger.exception(f"🚫 Fichier introuvable : {image_name} ({path})")
except requests.RequestException as e:
logger.exception(f"🔌 Problème réseau/API lors de l'upload de {image_name} : {e}")
except Exception as e:
logger.exception(f"🔥 Erreur inattendue lors de l'upload de {image_name} : {e}")
def upload_media_from_to(self, range_start, range_end=None):
json_data = self.fetch_all_media_rows(range_start, range_end)
logger.debug(f'{len(json_data)} to process')
for media in json_data:
media_chemin = media['Chemin'].replace("\\", "/")
path = Path(BASE_PATH + media_chemin)
image_name = path.name
#first_folder = media['Chemin'].split("\\")[0]
first_folder = media_chemin.split("/")[0]
watermarked_path = Path(create_watermark_image(str(path)))
watermarked_name = watermarked_path.name
if first_folder == 'Logo':
self.create_and_update_media(media,image_name,path)
else:
self.create_and_update_media(media, watermarked_name, watermarked_path, True)
try:
os.remove(watermarked_path)
except FileNotFoundError:
logger.exception(f"🚫 Fichier introuvable : {image_name} ({path})")
def is_exists(self, media, image_name):
all_images = self.get_all_images()
name_without_extension, extension = os.path.splitext(image_name)
for image in all_images:
if media['Slug'] == image['slug']:
return True
else:
pass
return False
def is_txt_alt_exists(self, media, image_name):
all_images = self.get_all_images()
name_without_extension, extension = os.path.splitext(image_name)
for image in all_images:
if media['Slug'] == image['slug']:
if image['alt_text'] == "":
return image['id']
else:
pass
return False
def get_title_and_alt_text_media(self, media):
sentence = media['Slug'].replace('-', ' ')
sentence = sentence.replace('img', '').strip()
title = sentence.capitalize()
alt_text = title
return title, alt_text
def update_accent_in_sentence(self, sentence):
words = sentence.split()
new_words = [self.dict_equivalence[word.lower()] if word.lower() in self.dict_equivalence else word for word in words]
new_sentence = " ".join(new_words)
return new_sentence
def update_data_media(self, media, id_img):
if media['Nom'] is None or media['Description'] is None:
title, alt_text = self.get_title_and_alt_text_media(media)
else:
title = media['Nom']
alt_text = media['Description']
title = self.update_accent_in_sentence(title)
alt_text = self.update_accent_in_sentence(alt_text)
update_data = {
"title" : title,
"alt_text": alt_text,
"slug": media['Slug'],
}
media_chemin = media['Chemin'].replace("\\", "/")
path = Path(BASE_PATH + media_chemin)
image_name = path.name
response = self.wcapi.post(f"media/{id_img}", data=update_data)
"""response = self.wcapi.post(
f"media/{id_img}",
headers={
"Authorization": f"Basic {self.ath.auth_base64}",
#"Authorization": f"Basic {self.ath['auth_base64']}",
"Content-Disposition": f"attachment; filename={image_name}",
"User-Agent": "Mozilla/5.0"
},
json=update_data,
verify=False
)"""
if response.status_code == 200:
return response.json()
else:
return None
def find_id_by_name(self, name):
images = self.get_all_images()
for img in images:
#pprint.pprint(img)
if img['title']['rendered'] == name:
return img['id']
def find_id_by_slug(self, slug):
images = self.get_all_images()
for img in images:
if img['slug'] == slug:
return img['id']
def get_all_as_slug_dict(self):
all_slug_dict = {}
images = self.get_all_images()
for img in images:
all_slug_dict[img['id']] = img['slug']
return all_slug_dict
def delete_media_by_slug(self, slug):
images = self.get_all_images()
for img in images:
if img['slug'] == slug:
self.wcapi.delete(f"media/{img['id']}", params = {"force": True})
def get_all_images(self):
"""Récupère toutes les images en gérant la pagination"""
all_images = []
page = 1
#print(f"self.ath.auth_base64 = {self.ath.auth_base64}")
while True:
"""response = self.wcapi.get(f"wp-json/wp/v2/media?per_page=100&page={page}",
headers={"Authorization": f"Basic {self.ath.auth_base64}",
"User-Agent": "Mozilla/5.0"},
verify=False
)"""
response = self.wcapi.get("media", params={"per_page": 100, "page": page})
#print(f"response.status_code = {response.status_code}")
if response.status_code != 200:
break
images = response.json()
if not images:
break
all_images.extend(images)
page += 1
return all_images
def delete_images(self, images):
"""Supprime toutes les images récupérées"""
for img in images:
img_id = img['id']
response = self.wcapi.delete(f"media/{img_id}")
"""delete_url = f"media/{img_id}?force=true"
response = requests.delete(delete_url,
headers={"Authorization": f"Basic {self.ath.auth_base64}",
"User-Agent": "Mozilla/5.0"},
#{"Authorization": f"Basic {self.ath['auth_base64']}"},
verify=False)"""
if response.status_code in [200, 410]: # 410 = déjà supprimé
print(f"Image {img_id} supprimée.")
else:
print(f"Erreur suppression {img_id} :", response.status_code, response.text)
def delete_all_images(self):
images = self.get_all_images()
for img in images:
img_id = img['id']
response = self.wcapi.delete(f"media/{img_id}")
if response.status_code in [200, 410]: # 410 = déjà supprimé
print(f"Image {img_id} supprimée.")
else:
print(f"Erreur suppression {img_id} :", response.status_code, response.text)
def assign_image_logo(self):
images = self.get_all_images()
for img in images:
if img['slug'] == "img-logo-lescreationsdemissbleue":
data = {
"site_logo":img['id'],
"site_icon" : img['id']
}
response = self.wcapi.post("settings", data=data)
if response.status_code == 200:
print("Logo mis à jour avec succès !")
else:
print(f"Erreur lors de la mise à jour du logo : {response.text}")
class CategoryManager(OdsReader):
def __init__(self, wcapi, ath, filename_ods, medias=None):
super().__init__(filename_ods)
self.wcapi = wcapi
self.ath = ath
self.medias = medias
self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media"
self.error_log = []
self.headers = {
"Authorization": f"Basic {self.ath.auth_base64}",
"Content-Type": "application/json"
}
def find_id_by_slug(self, slug):
response = self.wcapi.get("products/categories/",params={"per_page": 100})
if response.status_code == 200:
categories = response.json()
for cat in categories:
if cat['slug'] == slug:
return cat['id']
def find_id_by_name(self, name):
response = self.wcapi.get("products/categories/",params={"per_page": 100})
if response.status_code == 200:
categories = response.json()
for cat in categories:
if cat['name'] == name:
return cat['id']
def create_category(self, name, description, slug):
category_data = {
"name": name,
"description": description,
"slug":slug
}
if self.find_id_by_slug(slug):
logger.debug(f"Catégorie contenant comme slug '{slug}' existe déjà")
else:
try:
response = self.wcapi.post("products/categories/", category_data)
if response.status_code == 201:
logger.info(f"Catégorie créé avec succès. ID: {response.json()['id']}")
else:
logger.error(f"Erreur lors de la création de la catégorie. Code: {response.status_code}, Message: {response.text}")
except Exception as e:
logger.error(f"Erreur inattendue lors de l'envoi de la catégorie à WooCommerce: {e}")
def assign_parent_category(self, parent_slug, slug):
response = self.wcapi.get("products/categories/",params={"per_page": 100})
if response.status_code == 200:
categories = response.json()
for cat in categories:
parent_id = self.find_id_by_parent_slug(parent_slug)
if parent_id:
if cat['slug'] == slug:
self.wcapi.put(f"products/categories/{cat['id']}",{'parent': parent_id})
def find_id_by_parent_slug(self, parent_slug):
response = self.wcapi.get("products/categories/",params={"per_page": 100})
if response.status_code == 200:
categories = response.json()
for cat in categories:
if cat['slug'] == parent_slug:
return cat['id']
def find_media_id_by_slug(self, media_slug):
for id, slug in self.medias.items():
if media_slug == slug:
return id
def update_media_id_for_category(self, media_id, cat_id):
response = self.wcapi.get(f"media/{media_id}", params={"per_page": 1, "page": 1})
update_category_data = {
"image" : {'id':media_id},
}
self.wcapi.put(f"products/categories/{cat_id}", update_category_data)
def update_data_categories(self, search_value=None):
if search_value:
json_data = self.get_category_line_by_value(search_value)
else:
json_data = self.get_all_category_lines()
for category in json_data:
self.create_category(category['Nom'], category['Description'], category['Slug'])
cat_id = self.find_id_by_slug(category['Slug'])
media_id = self.find_media_id_by_slug(category['Media Slug'])
self.assign_parent_category(category['Parent Slug'], category['Slug'])
self.update_media_id_for_category(media_id,cat_id)
def delete_all_category(self):
response = self.wcapi.get(f"products/categories",params={"per_page": 100})
for cat in response.json():
self.wcapi.delete(f"products/categories/{cat['id']}", params={"force": True})
def delete_media_category(self, media_slug):
media_id = self.find_media_id_by_slug(media_slug)
self.wcapi.delete(f"media/{media_id}")
def delete_category_by_id(self, category_id):
self.wcapi.delete(f"products/categories/{category_id}", params={"force": True})
def delete_category_by_slug(self, slug):
category_id = self.find_id_by_slug(slug)
#print(f"category_id = {category_id}")
self.wcapi.delete(f"products/categories/{category_id}", params={"force": True})
def get_errors(self):
return print(f"self.error_log = {self.error_log}")
class ProductManager(OdsReader):
def __init__(self, wcapi, ath, filename_ods, medias=None):
super().__init__(filename_ods)
self.wcapi = wcapi
self.ath = ath
self.medias = medias
self.error_log = []
self.headers = {
"Authorization": f"Basic {self.ath.auth_base64}",
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0"
}
self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media"
def update_data_list_cat_product(self, list_category_id, list_img_id, product_id):
product_data = {
'categories':list_category_id,
'images':list_img_id,
}
self.wcapi.put(f"products/{product_id}", product_data)
def update_data_list_category_product(self, product_id, list_category_id):
#print(f"list_category_id = {list_category_id}")
categories = [{'id': cat_id} for cat_id in list_category_id if cat_id is not None]
product_data = {
'categories': categories,
}
self.wcapi.put(f"products/{product_id}", product_data)
def update_data_list_medias_product(self, product_id, list_medias_id):
images = [{'id': media_id} for media_id in list_medias_id if media_id is not None]
product_data = {
'images':images,
}
logger.debug(f"{product_data} - {product_id}")
self.wcapi.put(f"products/{product_id}", product_data)
def get_list_media_id_for_product(self, medias):
list_media_id_for_product = []
for id, media_slug in self.medias.items():
for media in medias:
if media == media_slug:
image_id = {'id':id}
list_media_id_for_product.append(image_id)
return list_media_id_for_product[::-1]
def get_list_category_for_product(self, categories):
response = self.wcapi.get("products/categories",params={"per_page": 100})
list_category_for_product = []
#logger.debug(f"response.json() = {response.json()}")
try :
for category in response.json():
for cat in categories:
if category['name'] == cat:
id_category = {'id':category['id']}
list_category_for_product.append(id_category)
#logger.debug(f"list_category_for_product = {list_category_for_product}")
return list_category_for_product
except requests.exceptions.JSONDecodeError as e:
logger.debug(f"text = {response.text}")
raise e
def find_product_by_id(self, id):
response = self.wcapi.get(f"products/{id}")
if response.status_code == 200:
product = response.json()
return product
def find_id_by_slug(self, slug):
response = self.wcapi.get("products",params={"slug": slug})
if response.status_code == 200:
products = response.json()
for pro in products:
if pro['slug'] == slug:
return pro['id']
return None
"""def find_id_by_slug(self, slug):
response = self.wcapi.get("products/",params={"per_page": 100})
if response.status_code == 200:
products = response.json()
for pro in products:
if pro['slug'] == slug:
return pro['id']"""
def find_media_id_by_slug(self, media_slug):
for id, slug in self.medias.items():
if media_slug == slug:
return id
def is_exist_attribute_in_one_product(self, product_id):
response = self.wcapi.get(f"products/{product_id}")
if response:
products_info = response.json()
if products_info['attributes']:
return True
else:
return False
else:
return False
def is_exist_tabs_in_one_product(self, product_id):
response = self.wcapi.get(f"products/{product_id}")
if response:
#logger.info(pprint.pformat(response.json()))
is_exist_tab = False
products_info = response.json()
for data in products_info['meta_data']:
if data.get('key') == "wb_custom_tabs" and not data.get('value'):
is_exist_tab = True
break
if is_exist_tab:
return True
else:
return False
else:
return False
"""def get_product_data_by_api(self, product_id, product_datas):
response = self.wcapi.get(f"products/{product_id}")
products_info = response.json()
new_datas = {}
for key, data in product_datas.items:
if products_info[key] == data:
pass
else:
products_info[key] = data
new_datas[key] = data
self.wcapi.put(f"products/{product_id}", new_datas)"""
def get_product_data_by_api(self, product_id):
response = self.wcapi.get(f"products/{product_id}")
products_info = response.json()
return products_info
def create_tabs_from_custom_dict(self, product_id, product):
product_tabs_data = {}
list_product_tabs_data = []
x = 1
for key in product.keys():
if key == "Conseils dutilisation" or key == "Précautions articles" or key == "Description": #or key == "Allergènes":
product_tabs_data['title'] = key
product_tabs_data['content'] = product[key]
product_tabs_data['nickname'] = ''
product_tabs_data['position'] = x
product_tabs_data['tab_type'] = 'local'
list_product_tabs_data.append(product_tabs_data)
product_tabs_data = {}
x += 1
response = self.wcapi.get(f"products/{product_id}")
if response.status_code == 200:
meta_data = []
meta_data.append(
{'key': 'wb_custom_tabs', 'value': list_product_tabs_data}
)
meta_data_data = {
'meta_data': meta_data
}
res = self.wcapi.post(f"products/{product_id}", meta_data_data)
else:
print(f"error - {product_id} - {response.status_code}")
def create_product(self, product_data):
logger.debug(f"ProductManager::create_product called (product_data = {product_data})")
if product_data is None:
logger.info(f"ProductManager::create_product called no product_data supplied - ignored")
return None
try:
product_id = self.find_id_by_slug(product_data['slug'])
if product_id:
return product_id
else:
response = self.wcapi.post("products/", product_data)
if response.status_code == 201:
logger.debug(f"ProductManager::create_product product created {response.json()['id']} - end")
return response.json()['id']
else:
logger.error(f"Erreur lors de la création du produit. {product_data['name']} Code: {response.status_code}, Message: {response.text}")
# Le produit n'a pas été créé, mais il y a une réponse avec un code d'erreur
except Exception as e:
logger.error(f"Erreur inattendue lors de l'envoi du produit à WooCommerce: {e}")
logger.debug(f"ProductManager::create_product - end")
def update_data_product(self, product_data, categories, medias, json_data):
#json_data = self.get_all_product_lines()
for field in json_data:
#logger.info(f"update_data_product = {product}")
product_id = self.find_id_by_slug(product_data['slug'])
list_category_id = self.get_list_category_for_product(categories)
list_img_id = self.get_list_media_id_for_product(medias)
self.update_data_list_cat_product(list_category_id, list_img_id, product_id)
def update_data_product_by_slug(self, slug):
json_data = self.get_all_product_lines()
for product in json_data:
if product['Slug'] == slug:
self.create_product(product)
product_id = self.find_id_by_slug(product['Slug'])
list_category_id = self.get_list_category_for_product(product['Catégories'])
list_img_id = self.get_list_media_id_for_product(product['Media Slugs'])
self.update_data_list_cat_product(list_category_id, list_img_id, product_id)
def get_all_products(self):
"""Récupère tous les produits en gérant la pagination"""
all_products = []
page = 1
while True:
response = self.wcapi.get("products", params={"per_page": 100, "page": page})
if response.status_code != 200:
print(f"⚠️ Erreur API WooCommerce: {response.status_code} - {response.json()}")
break
products = response.json()
if not products: # Si la page est vide, on arrête la boucle
break
all_products.extend(products)
page += 1 # On passe à la page suivante
return all_products
def get_all_categories_as_list(self, product_id):
response = self.wcapi.get(f"products/{product_id}")
if response:
product = response.json()
return product['categories']
else:
return []
def get_all_medias_as_list(self, product_id):
response = self.wcapi.get(f"products/{product_id}")
if response:
product = response.json()
return product['images']
else:
return []
def get_all_attributes_as_list(self, product_id):
response = self.wcapi.get(f"products/{product_id}")
if response:
product = response.json()
return product["attributes"]
else:
[]
def delete_product(self):
json_data = self.get_all_product_lines()
for product in json_data:
list_products = self.wcapi.get(f"products/")
for pro in list_products.json():
if product['Nom'] == pro['name']:
self.wcapi.delete(f"products/{pro['id']}")
def delete_all_product(self):
products = self.get_all_products()
if products:
for pro in products:
self.wcapi.delete(f"products/{pro['id']}", params={"force": True})
def delete_media_product(self, media_slug):
media_id = self.find_media_id_by_slug(media_slug)
self.wcapi.delete(f"media/{media_id['id']}")
def delete_product_by_id(self, product_id):
self.wcapi.delete(f"products/{product_id}", params={"force": True})
def delete_product_by_slug(self, slug):
product_id = self.find_id_by_slug(slug)
self.wcapi.delete(f"products/{product_id}", params={"force": True})
def normalize_string(text):
return unicodedata.normalize("NFKC", text).strip().lower()
def tab_exists(self, product_id, name_tab):
response = self.wcapi.get(f"products/{product_id}")
if response.status_code == 200:
response_json = self.wcapi.get(f"products/{product_id}").json()
for meta_data in response_json['meta_data']:
for key_meta_data, value_meta_data in meta_data.items():
if key_meta_data == "value":
if isinstance(value_meta_data, list):
for tab in value_meta_data:
if name_tab == tab['title']:
return True
return False
class AttributeManager(OdsReader):
def __init__(self, wcapi, filename_ods):
super().__init__(filename_ods)
self.wcapi = wcapi
def get_attributes(self):
attributes = self.wcapi.get(f"products/attributes").json()
one_attribute = self.wcapi.get(f"products/attributes/1/terms").json()
return attributes
def get_by_name(self, name):
attributes = self.wcapi.get(f"products/attributes").json()
for attr in attributes:
if attr['name'] == name:
attribute = self.wcapi.get(f"products/attributes/{attr['id']}", params={"per_page": 100}).json()
return attribute
def get_list_name_data(self):
list_name_data = []
json_data = self.get_all_attribute_and_tab_lines()
for item in json_data:
#if item['Onglet'].strip() == "Informations Complémentaires":
if item['Onglet'] == "Informations Complémentaires":
list_name_data.append(item['Nom'])
return list_name_data
def create(self, search_value=None):
if search_value:
features_json_data = self.get_attribute_and_tab_lines(search_value)
else:
features_json_data = self.get_all_attribute_and_tab_lines()
for item in features_json_data:
#if item['Onglet'].strip() == "Informations Complémentaires":
if item['Onglet'] == "Informations Complémentaires":
attribute_data = {
'name' : item["Nom"]
}
self.wcapi.post(f"products/attributes", attribute_data)
def get_term(self, search_value=None):
term_dict = {}
if search_value:
term_json_data = self.get_attribute_and_tab_lines(search_value)
else:
term_json_data = self.get_all_attribute_and_tab_lines()
for item in term_json_data:
list_item = []
#if item['Onglet'].strip() == "Informations Complémentaires":
if item['Onglet'] == "Informations Complémentaires":
if "," in item["Valeurs"]:
list_item = [value_term.strip() for value_term in item['Valeurs'].split(",")]
else:
item['Valeurs'].strip()
if list_item:
term_dict[item['Nom']] = list_item
else:
term_dict[item['Nom']] = item['Valeurs']
return term_dict
def configure_term(self):
term_dict = self.get_term()
response = self.wcapi.get(f"products/attributes", params={"per_page": 100})
if response.status_code == 200:
attributes = response.json()
for attribute in attributes:
for name, value in term_dict.items():
if attribute['name'] == name:
if isinstance(value, list):
for v in value:
term = {
'name' : v
}
self.wcapi.post(f"products/attributes/{attribute['id']}/terms", term)
else:
term = {
'name' : value
}
self.wcapi.post(f"products/attributes/{attribute['id']}/terms", term)
def create_for_product(self, product_id, name, value, variation=False):
data_attribute = {
'name': name,
'options':value
}
#logger.info(f"name = {name} - value = {value}")
#list_product_tabs_data.append(data_tab)
response = self.wcapi.get(f"products/{product_id}")
if response.status_code == 200:
print("passe d'abord ici 1")
product_meta_data = response.json()
existing_attributes_data = product_meta_data.get("attributes", [])
already_exist = False
"""for data in existing_attributes_data:
for key_data, value_data in data.items():
if key_data == "value":
if isinstance(value_data, list):
for value in value_data:
if value['name'] == name:
already_exist = True"""
if already_exist == False:
found = False
#print(f"attributes_data = {existing_attributes_data}")
#print(f"data_attribute = {data_attribute}")
for attribute in existing_attributes_data:
#logger.info(f"name = {name} - value = {value}")
if attribute["name"] == name:
attribute["options"].append(data_attribute)
found = True
break
# Si l'onglet `wb_custom_tabs` n'existe pas, on le crée
if not found:
if value is not None:
value = [v.strip() for v in value.split(",")]
logger.info(f"value = {value}")
existing_attributes_data.append({
"name": name,
"options": value,
"visible":True,
"variation": variation,
#"parent_id":product_id
})
attributes_data = {
'attributes': existing_attributes_data
}
logger.debug(f"existing_attributes_data = {existing_attributes_data}")
res = self.wcapi.put(f"products/{product_id}", attributes_data)
else:
print('already_exist')
else:
print(f"error 1")
def delete_all_for_product(self):
response_product = self.wcapi.get(f"products/", params={"per_page": 100})
if response_product.status_code == 200:
products = response_product.json()
for product in products:
existing_attributes_data = product.get("attributes", [])
if existing_attributes_data == []:
pass
else:
attribute_data = {
'attributes': []
}
res = self.wcapi.post(f"products/{product['id']}", attribute_data)
def delete_all_term(self):
response_attribute = self.wcapi.get(f"products/attributes", params={"per_page": 100})
if response_attribute.status_code == 200:
attributes = response_attribute.json()
for attribute in attributes:
response_attribute_term = self.wcapi.get(f"products/attributes/{attribute['id']}/terms", params={"per_page": 100})
if response_attribute_term.status_code == 200:
attributes_term = response_attribute_term.json()
for term in attributes_term:
self.wcapi.delete(f"products/attributes/{attribute['id']}/terms/{term['id']}",params={"force": True})
def delete_all(self):
response = self.wcapi.get(f"products/attributes", params={"per_page": 100})
if response.status_code == 200:
attributes = response.json()
for attribute in attributes:
self.wcapi.delete(f"products/attributes/{attribute['id']}",params={"force": True})
class TabManager(OdsReader):
def __init__(self, wcapi,filename_ods):
super().__init__(filename_ods)
self.wcapi = wcapi
def get_list_name_data(self, search_value=None):
list_name_data = []
"""if search_value:
json_data = self.get_attribute_and_tab_lines(search_value)
else:"""
json_data = self.get_all_attribute_and_tab_lines()
for item in json_data:
if item['Onglet'].strip() != "Informations Complémentaires":
list_name_data.append(item['Nom'])
return list_name_data
def create_or_update_for_product(self, product_id, tabs):
position = 1
for title, content in tabs.items():
position += 1
data_tab = {
'title': title,
'content':content,
'nickname':'',
'position':position,
'tab_type': 'local'
}
response = self.wcapi.get(f"products/{product_id}")
#logger.debug(response)
if response.status_code == 200:
print("passe d'abord ici 2")
product_meta_data = response.json()
existing_meta_data = product_meta_data.get("meta_data", [])
already_exist = False
for data in existing_meta_data:
for key_data, value_data in data.items():
if key_data == "value":
if isinstance(value_data, list):
for value in value_data:
if value['title'] == title:
already_exist = True
if already_exist == False:
found = False
for meta in existing_meta_data:
if meta["key"] == "wb_custom_tabs":
meta["value"].append(data_tab)
found = True
break
# Si l'onglet `wb_custom_tabs` n'existe pas, on le crée
if not found:
existing_meta_data.append({
"key": "wb_custom_tabs",
"value": [data_tab]
})
meta_data_data = {
'meta_data': existing_meta_data
}
res = self.wcapi.put(f"products/{product_id}", meta_data_data)
else:
#print('else')
data_tab = {
'content':content,
}
meta_data_data = {
'meta_data': existing_meta_data
}
res = self.wcapi.put(f"products/{product_id}", meta_data_data)
else:
print(f"error 2")
def delete_by_product_id(self, product_id):
response = self.wcapi.get(f"products/{product_id}")
if response.status_code == 200:
product_meta_data = response.json()
existing_meta_data = product_meta_data.get("meta_data", [])
if existing_meta_data == []:
pass
else:
meta_data = {
'meta_data': [{"key": "wb_custom_tabs","value":[]}]
}
res = self.wcapi.post(f"products/{product_id}", meta_data)
def delete_all(self):
response = self.wcapi.get(f"products/", params={"per_page": 100})
if response.status_code == 200:
product_meta_data = response.json()
for product in product_meta_data:
existing_meta_data = product.get("meta_data", [])
if existing_meta_data == []:
pass
else:
meta_data = {
'meta_data': [{"key": "wb_custom_tabs","value":[]}]
}
res = self.wcapi.post(f"products/{product['id']}", meta_data)
class VariationsManager(OdsReader):
def __init__(self, wcapi, filename_ods):
super().__init__(filename_ods)
self.wcapi = wcapi
def get_attribute_id(self, product_data):
response = self.wcapi.get(f"products/attributes")
if response.status_code == 200:
attributes = response.json()
for key, value in product_data.items():
for attr_key, attr_value in attributes.items():
if attr_value['name'] == key:
attribute_id = attr_value['id']
return attribute_id
def update_product_attributes_merged(self, wcapi, product_id, attribute_name, new_options):
"""
Met à jour l'attribut d'un produit WooCommerce en ajoutant de nouvelles options,
sans écraser les autres attributs existants.
:param wcapi: Instance API WooCommerce (wcapi = API(...))
:param product_id: ID du produit à mettre à jour
:param attribute_name: Nom de l'attribut à enrichir (ex: "Parfums")
:param new_options: Liste des nouvelles valeurs à ajouter (ex: ["Lavande", "Citron"])
"""
# Nettoyer les nouvelles options
new_options = [opt.strip() for opt in new_options.split('|') if opt.strip()]
# 1. Récupérer le produit existant
response = wcapi.get(f"products/{product_id}")
if response.status_code != 200:
print(f"❌ Impossible de récupérer le produit {product_id}")
return
product = response.json()
attributes = product.get("attributes", [])
# 2. Chercher l'attribut ciblé
found = False
for attr in attributes:
if attr["name"].lower() == attribute_name.lower():
existing_options = attr.get("options", [])
merged_options = list(set(existing_options + new_options))
attr["options"] = merged_options
attr["variation"] = True
attr["visible"] = True
attr["parent_id"] = product_id
attr["manage_stock"] = "parent"
found = True
break
# 3. Si l'attribut n'existe pas, on l'ajoute
if not found:
attributes.append({
"name": attribute_name,
"variation": True,
"visible": True,
"options": new_options
})
# 4. Mettre à jour le produit avec les attributs fusionnés
update_data = {
"attributes": attributes
}
update_res = wcapi.put(f"products/{product_id}", update_data)
if update_res.status_code == 200:
print(f"✅ Attribut '{attribute_name}' mis à jour avec succès.")
else:
print(f"❌ Erreur lors de la mise à jour : {update_res.status_code}")
print(update_res.json())
def create_variations_products(self, product_id, product_data):
products_lines = self.get_all_product_lines()
product_line = self.get_product_by_slug_from_ods(product_data['slug'])
parfums = None
volumes = None
price_per_product_variable = None
if product_line['Type'] == "Variable":
if product_line['Choix parfums'] is not None:
parfums = [p.strip() for p in product_line['Choix parfums'].split(",")]
print(f"parfums = {parfums}")
if product_line['Volume'] is not None:
volumes = [v.strip() for v in product_line['Volume'].split(",")]
print(f"volumes = {volumes}")
"""if product_line['Prix pour'] is not None:
#products = [v.strip() for v in product_line['Prix pour'].split(",")]
products = product_line['Prix pour'].split(",")
price_per_product_variable = {}
for p in products:
name, price = p.split("=")
price_per_product_variable[name.strip()] = price.strip()
pprint.pprint(price_per_product_variable)"""
response = self.wcapi.get(f"products/{product_id}")
try:
if response.status_code == 200:
#existing_product = response.json()
#self.update_product_attributes_merged(self.wcapi, product_id=product_id, attribute_name="Parfums", new_options=parfums)
if parfums is not None:
for parfum in parfums:
data = {
'attributes': [
{
'name': 'Choix parfums',
'option': parfum
}
],
'manage_stock': False,
'in_stock':True,
'regular_price': product_data['price'],
}
print(f"Posting variation: {data}")
response = self.wcapi.post(f"products/{product_id}/variations", data)
logger.info(f"Variation de parfums a bien été créé")
if volumes is not None:
for volume in volumes:
data = {
'attributes': [
{
'name': 'Volume',
'option': volume
}
],
'manage_stock': False,
'in_stock':True,
'regular_price': product_data['price'],
}
print(f"Posting variation: {data}")
result = self.wcapi.post(f"products/{product_id}/variations", data)
logger.info(f"Variation de volumes a bien été créé")
"""if price_per_product_variable is not None:
for name, price in price_per_product_variable.items():
data = {
'attributes': [
{
'name': 'Volume',
'option': name
}
],
'manage_stock': False,
'in_stock':True,
'regular_price': price,
}
result = self.wcapi.post(f"products/{product_id}/variations", data)
logger.info(f"Variation de prix selon objet bien créé")"""
except Exception as e:
logger.exception(f"Erreur lors de la création du produit de variation : {e}")
#logger.error(f"Erreur lors de la création de la catégorie. Code: {response.status_code}, Message: {response.text}")
"""
for product_line_key, products_line_value in product_line.items():
if product_line_key == "Choix parfums":
name_attribute = product_line_key
parfums = products_line_value
if product_line_key == "Type":
if product_data['type'] == "Variable":
response = self.wcapi.get(f"products/{product_id}")
if response.status_code == 200:
existing_product = response.json()
self.update_product_attributes_merged(self.wcapi, product_id=product_id, attribute_name="Parfums", new_options=parfums)
parfums = [p.strip() for p in parfums.split("|") if p.strip()]
response = self.wcapi.get(f"products/{product_id}/variations")
if response.status_code == 200:
for parfum in parfums:
data = {
'attributes': [
{
'name': name_attribute,
'option': parfum
}
],
'manage_stock': False,
'in_stock':True,
'regular_price': product_data['price'],
}
print(f"Posting variation: {data}")
result = self.wcapi.post(f"products/{product_id}/variations", data)
print(result.status_code)
pprint.pprint(result.json())
else:
return False"""
class WooCommerceManager(OdsReader):
def __init__(self, wcapi, media_manager, category_manager, product_manager, tab_manager, attribute_manager, variation_manager, filename_ods):
super().__init__(filename_ods)
self.wcapi = wcapi
self.media_manager = media_manager
self.category_manager = category_manager
self.product_manager = product_manager
self.tab_manager = tab_manager
self.attribute_manager = attribute_manager
self.variation_manager = variation_manager
self.filename_ods = filename_ods
# access to managers
@property
def mm(self): return self.media_manager
@property
def cm(self): return self.category_manager
@property
def pm(self): return self.product_manager
@property
def tm(self): return self.tab_manager
@property
def am(self): return self.attribute_manager
@property
def vm(self): return self.variation_manager
def tab_exists(self, product_id, name_tab):
return self.product_manager.tab_exists(product_id, name_tab)
def get_product_tab_details(self):
all_products_json = self.get_all_attribute_and_tab_lines()
all_tabs = self.tab_manager.get_list_name_data()
dict = {}
for product in all_products_json:
line = []
for tab in all_tabs:
line.append([tab, product[tab]])
dict[product["Parfum"]] = line
return dict
def get_product_attributes_details(self):
ret = []
all_products_json = self.get_all_product_lines()
all_attributes = self.attribute_manager.get_list_name_data()
for product in all_products_json:
for attribute in all_attributes:
ret.append([attribute, product[attribute]])
return ret
def update_product_tab_by_slug(self, slug):
product_id = self.product_manager.find_id_by_slug(slug)
product = self.product_manager.find_product_by_id(product_id)
products_tab_details = self.get_product_tab_details()
x=1
for value in products_tab_details.values():
for key in products_tab_details.keys():
for title, content in value:
if key:
if key in product['short_description']:
self.tab_manager.create_for_product(product_id=product_id, title=title, content=content, nickname="", position=x, tab_type="local")
x=x+1
else:
pass
else:
print('no key')
x=1
def update_product_attribute_by_slug(self, slug):
product_id = self.product_manager.find_id_by_slug(slug)
product_ods = self.get_product_by_slug_from_ods(slug)
products_attribute_details = self.get_product_attributes_details()
for name, value in products_attribute_details:
self.attribute_manager.create_for_product(product_id=product_id,
name=name, value=value,
variation=self.is_variable(product_ods['Type']))
def update_product(self):
self.update_product_tab()
def update_product_by_slug(self, slug):
self.product_manager.update_data_product_by_slug(slug)
self.update_product_tab_by_slug(slug)
#self.update_product_attribute_by_slug(slug)
def create_all_informations(self):
medias = self.media_manager.get_all_as_slug_dict()
self.product_manager.medias = medias
self.process_file(FILENAME_ODS)
self.update_product()
def get_list_category_for_product(self, category):
category_list_by_doc = [cat.strip().replace('"', '') for cat in category.split("/")]
return category_list_by_doc
def get_list_variable_attributes(self,attributes):
list_variable_attributes_by_doc = [attr.strip() for attr in attributes.split(",")]
return list_variable_attributes_by_doc
def get_list_media_id_for_product(self, product):
#list_media_by_doc = [img.strip().replace(' ', '') for img in media.split(",")]
list_media_by_doc = []
list_media_by_doc.append(product['Image1'])
list_media_by_doc.append(product['Image2'])
list_media_by_doc.append(product['Image3'])
list_media_by_doc.append(product['Pyramide'])
list_media_by_doc.append(product['Clp'])
return list_media_by_doc
def is_variable(self, name, value):
if name == "Volume" or name =="Choix parfums":
if value is not None:
return True
else:
return False
def update_product_attribute(self, attributes, product_data):
print(f"product_data = {product_data}")
product_id = self.product_manager.find_id_by_slug(product_data['slug'])
for name, value in attributes.items():
self.attribute_manager.create_for_product(product_id=product_id, name=name, value=value, variation=self.is_variable(name, value))
#self.attribute_manager.create_for_product(product_id=product_id, name=name, value=value, variation=self.is_variable(product_data))
def update_product_variations(self, product_data):
product_id = self.product_manager.find_id_by_slug(product_data['slug'])
self.variation_manager.create_variations_products(product_id, product_data)
def update_product_tab(self, product_data):
for product in product_data:
self.update_product_tab_by_id(product['id'])
def create_product(self, product_data, attributes, tabs, categories, medias, json_data):
self.product_manager.create_product(product_data=product_data)
self.product_manager.update_data_product(product_data=product_data, categories=categories, medias=medias, json_data=json_data)
self.update_product_attribute(attributes=attributes, product_data=product_data)
product_id = self.product_manager.find_id_by_slug(product_data['slug'])
#self.update_product_variations(product_data)
self.tab_manager.create_or_update_for_product(product_id=product_id, tabs=tabs)
def create_or_update_product(self, product_data, attributes, tabs, categories, medias, json_data):
try:
product_id = self.product_manager.find_id_by_slug(product_data['slug'])
if product_id:
#self.product_manager.update_data_product(attributes=attributes, product_data=product_data)
self.product_manager.update_data_product(product_data=product_data, categories=categories, medias=medias, json_data=json_data)
else:
self.product_manager.create_product(product_data=product_data)
self.product_manager.update_data_product(product_data=product_data, categories=categories, medias=medias, json_data=json_data)
self.update_product_attribute(attributes=attributes, product_data=product_data)
product_id = self.product_manager.find_id_by_slug(product_data['slug'])
#self.update_product_variations(product_data)
self.tab_manager.create_or_update_for_product(product_id=product_id, tabs=tabs)
except Exception as e:
print(f"Erreur lors de la mise à jour du produit: {e}")
logger.exception(f"Erreur lors de la mise à jour du produit: {e}")
logger.debug()
def get_product_lines(self, search_value=None):
if search_value:
return self.get_product_line_by_value(search_value)
else:
return self.get_all_product_lines()
def process_file(self, search_value=None):
# refresh media cache
medias = self.media_manager.get_all_as_slug_dict()
self.product_manager.medias = medias
# read provided file
products_lines = self.get_product_lines(search_value)
#pprint.pprint(products_lines)
for product_line in products_lines:
# standard product data
product_data = {
'name' : product_line['Nom'],
'price': product_line['Prix'],
'regular_price': product_line['Prix'],
'stock_quantity': product_line['Stock'],
'manage_stock':True,
'weight':str(product_line['Poids']),
'sku':str(product_line['Numéro de référence']),
'description': product_line['Description'],
'short_description': product_line['Courte Description'],
'slug':product_line['Slug']
}
if product_line['Promo'] is not None:
product_data['sale_price'] = product_line['Promo']
"""if product_line['Type'] == "parfums":
product_data['type'] = "variable"
if product_line['Volume'] is not None:
values_attributes = self.get_list_variable_attributes(product_line['Volume'])
attributes['Volume'] = values_attributes
else:
product_data['type'] = "simple"""
attributes = {
"Temps de combustion" : product_line['Temps de combustion'],
"Type de cire" : product_line['Type de cire'],
"Mèche" : product_line['Mèche'],
"Fabrication" : product_line['Fabrication'],
"Composition" : product_line['Composition'],
"Ingrédients et engagements" : product_line['Ingrédients et engagements'],
"Parfums" : product_line['Parfums'],
"Volume" : product_line["Volume"]
}
tabs ={
#"Description" : product_line["Description"],
"Conseils d'utilisation" : product_line["Conseils dutilisation"],
"Précautions articles" : product_line["Précautions articles"],
#"Allergènes" : product_line["Allergènes"]
}
# ... associated categories
categories = self.get_list_category_for_product(product_line['Catégories'])
# ... associated medias
medias = self.get_list_media_id_for_product(product_line['Media Slugs'])
# create or update product
self.create_or_update_product(product_data=product_data, attributes=attributes, tabs=tabs, categories=categories, medias=medias)
def get_product_data_as_dict(self, product_line):
product_data = {
'name' : product_line['Nom'],
'price': product_line['Prix'],
'regular_price': product_line['Prix'],
'stock_quantity': product_line['Stock'],
'manage_stock':True,
'weight':str(product_line['Poids']),
'sku':str(product_line['Numéro de référence']),
'description': product_line['Description'],
'short_description': product_line['Courte Description'],
'slug':product_line['Slug']
}
if product_line['Promo'] is not None:
product_data['sale_price'] = product_line['Promo']
if product_line['Type'] is not None:
product_data['type'] = "variable"
if product_line['Fabrication sur commande'] is not None:
#backoreded = [b.strip() for b in product_line['Fabrication sur commande'].split(",")]
product_data['backorders'] = "notify"
product_data['backorders_allowed'] = True
product_data['backordered'] = True
if product_line['Status du produit']:
if product_line['Status du produit'] == 'P':
product_data['status'] = 'publish'
logger.info(f"Article publié {product_line['Nom']}")
elif product_line['Status du produit'] == 'B':
product_data['status'] = 'draft'
logger.info(f"Article brouillon {product_line['Nom']}")
else:
logger.debug(f"Article ignoré {product_line['Nom']}")
return None
return product_data
def get_product_attributes_as_dict(self, product_line):
attributes = {
"Temps de combustion" : product_line['Temps de combustion'],
"Type de cire" : product_line['Type de cire'],
"Mèche" : product_line['Mèche'],
"Fabrication" : product_line['Fabrication'],
"Composition" : product_line['Composition'],
"Ingrédients et engagements" : product_line['Ingrédients et engagements'],
#"Parfums" : product_line['Choix parfums']
#"Volume" : product_line["Volume"]
}
if product_line["Volume"]:
attributes["volume"] = product_line["Volume"]
if product_line['Type de cire']:
attributes['Type de cire'] = product_line['Type de cire']
if product_line['Mèche']:
attributes['Mèche'] = product_line['Mèche']
if product_line['Parfums']:
attributes["Parfums"] = product_line['Parfums']
return attributes
def get_product_tabs_as_dict(self, product_line):
tabs ={
#"Description" : product_line["Description"],
"Conseils d'utilisation" : product_line["Conseils dutilisation"],
"Précautions articles" : product_line["Précautions articles"],
#"Allergènes" : product_line["Allergènes"]
}
return tabs
# --------------------------------- Medias -----------------------------------------------
def has_product_all_medias(self, product_id, medias):
print(f"medias={medias}")
list_medias_by_api = self.product_manager.get_all_medias_as_list(product_id)
names_in_list_medias_by_api = [media['name'] for media in list_medias_by_api]
if set(medias) != set(names_in_list_medias_by_api):
return False
else:
return True
def assign_medias_to_product(self, id, media_slugs):
list_medias_id = []
for slug in media_slugs:
list_medias_id.append(self.media_manager.find_id_by_slug(slug))
logger.info(f"liste des medias id = {list_medias_id}")
self.product_manager.update_data_list_medias_product(id, list_medias_id)
# --------------------------------- Category ----------------------------------------------
def has_product_all_categories(self, product_id, categories):
list_categories_by_api = self.product_manager.get_all_categories_as_list(product_id)
names_in_list_categories_by_api = [category['name'] for category in list_categories_by_api]
if set(categories) != set(names_in_list_categories_by_api):
return False
else:
return True
def assign_categories_to_product(self, id, categorie_slugs):
list_categories_id = []
for slug in categorie_slugs:
list_categories_id.append(self.category_manager.find_id_by_slug(slug))
self.product_manager.update_data_list_category_product(id, list_categories_id)
# --------------------------------- Attributes ----------------------------------------------
def is_exist_attribute_in_one_product(self, product_id):
response = self.wcapi.get(f"products/{product_id}")
if response:
products_info = response.json()
if products_info['attributes']:
return True
else:
return False
else:
return False
def has_product_all_attributes(self, product_id, attributes):
#pprint.pprint(self.product_manager.get_all_attributes_as_list(product_id))
pprint.pprint(attributes)
list_attributes_by_api = self.product_manager.get_all_attributes_as_list(product_id)
pprint.pprint(list_attributes_by_api)
"""if list_attributes_by_api == []:
return True
# Comparaison directe
not_found_same_value = False
for i, d in enumerate(list_attributes_by_api):
for key, value in attributes.items():
for k, v in d.items():
if k == 'options':
if value in d[k]:
not_found_same_value = False
print(f"v = {v}, value = {value} d[k] = {d[k]}")
print("trouvé")
else:
print(f"v = {v}, value = {value} d[k] = {d[k]}")
print("pas trouvé")
#return True"""
"""common_keys = list_attributes_by_api.keys() & attributes.keys()
found_same_value = False
for key in common_keys:
pprint.pprint(common_keys)
if list_attributes_by_api[key] != attributes[key]:
found_same_value = True
break
else:
pass
if found_same_value == True:
return True
else:
return False"""
def assign_attributes_to_product(self, product_id, attributes):
for attr in attributes:
for key, value in attributes.items():
#logger.debug(f"key = {key} - value = {value} - attr = {attr}")
if attr == key :
self.attribute_manager.create_for_product(product_id=product_id, name=key, value=value, variation=self.is_variable(key, value))
# --------------------------------- Product ----------------------------------------------
def has_product_all_datas(self, product_id, product_data):
list_product_data_by_api = self.product_manager.get_product_data_by_api(product_id)
common_keys = list_product_data_by_api.keys() & product_data.keys()
# Étape 2 : comparaison des valeurs
found_same_value = False
for key in common_keys:
if list_product_data_by_api[key] != product_data[key]:
found_same_value = True
break
else:
pass
if found_same_value == True:
return True
else:
return False
def assign_product_changement(self, product_id, product_data):
list_product_data_by_api = self.product_manager.get_product_data_by_api(product_id)
common_keys = list_product_data_by_api.keys() & product_data.keys()
# Étape 2 : comparaison des valeurs
differences = {}
for key in common_keys:
if list_product_data_by_api[key] != product_data[key]:
differences[key] = product_data[key]
self.wcapi.put(f"products/{product_id}", differences)
# -------------------------------------------- Tabs ------------------------------------------------------
#def missing_tabs_for_product(self, product_id, tabs):
# ---------------------------------------------------------------------------------------------------------
def assign_tabs_to_product(self, product_id, tabs):
self.tab_manager.create_or_update_for_product(product_id=product_id, tabs=tabs)
def process_file_from_to(self, range_start, range_end=None):
medias = self.media_manager.get_all_as_slug_dict()
self.product_manager.medias = medias
#logger.info(f"self.fetch_all_product_rows(range_start, range_end): = {self.fetch_all_product_rows(range_start, range_end)}")
for product_line in self.fetch_all_product_rows(range_start, range_end):
# define expected settings
expected_product_data = self.get_product_data_as_dict(product_line)
expected_categories = self.get_list_category_for_product(product_line['Catégories'])
expected_medias = self.get_list_media_id_for_product(product_line)
expected_attributes = self.get_product_attributes_as_dict(product_line)
expected_tabs = self.get_product_tabs_as_dict(product_line)
# step1: make sure product exists
product_id = self.product_manager.find_id_by_slug(product_line['Slug'])
if product_id is None:
#product_id = self.create_product(product_data=expected_product_data, attributes=expected_attributes, tabs=expected_tabs, categories=expected_categories, medias=expected_medias, json_data=product_line)
product_id = self.product_manager.create_product(product_data=expected_product_data)
logger.info('produit créé')
# step2: make sure categories are assigned to product
if not self.has_product_all_categories(product_id=product_id, categories=expected_categories):
self.assign_categories_to_product(id=product_id, categorie_slugs=expected_categories)
logger.info('catégorie créée')
# step3: make sure attributes are assigned to product
#print(f"attr = {self.product_manager.is_exist_attribute_in_one_product(product_id)}")
# if self.product_manager.is_exist_attribute_in_one_product(product_id):
if self.has_product_all_attributes(product_id=product_id, attributes=expected_attributes):
#self.assign_attributes_to_product(product_id=product_id, attributes=expected_attributes)
logger.info('attributs créés')
# step4: make sure tabs are assigned to product
if not self.product_manager.is_exist_tabs_in_one_product(product_id):
self.assign_tabs_to_product(product_id=product_id, tabs=expected_tabs)
logger.info('onglets crées')
# step5: make sure media are assigned to product
if not self.has_product_all_medias(product_id=product_id, medias=expected_medias):
self.assign_medias_to_product(id=product_id, media_slugs=expected_medias)
logger.info('medias crées')
if not self.has_product_all_datas(product_id, expected_product_data):
logger.info('ici')
def delete_all_informations(self):
self.media_manager.delete_all_images()
self.attribute_manager.delete_all()
self.product_manager.delete_all_product()
self.category_manager.delete_all_category()
def delete_information_by_slug(self):
self.product_manager.delete_product_by_slug("chope-adoucissant")
#category_manager.delete_all_category()
class OrderManager:
def __init__(self, wcapi, ath):
super().__init__()
self.wcapi = wcapi
self.ath = ath
self.error_log = []
self.headers = {
"Authorization": f"Basic {self.ath.auth_base64}",
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0"
}
def delete_all_orders(self):
response = self.wcapi.get("orders/",params={"per_page": 100})
print(f"response = {response.status_code}")
if response.status_code == 200:
orders = response.json()
for index, order in enumerate(orders):
self.wcapi.delete(f"orders/{order['id']}", params={"force": True}).json()
class SeoManager(OdsReader):
def __init__(self, ath, filename_ods):# filename_ods
super().__init__(filename_ods) # filename_ods
self.ath = ath
self.page_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/pages"
def get_all_pages(self):
print("coucou")
"""Récupère toutes les images en gérant la pagination"""
all_pages = []
dict_id_slug = {}
response = self.wcapi.get(f"seo", params={"per_page": 100})
if response.status_code != 200:
pass
list_pages = response.json()
if not list_pages:
pass
for index, page in enumerate(list_pages):
dict_id_slug[list_pages[index]['id']] = list_pages[index]['slug']
all_pages.append(dict_id_slug)
dict_id_slug = {}
return all_pages
def update_seo_page(self):
all_pages = self.get_all_pages()
pprint.pprint(all_pages)
seo_lines = self.get_all_seo_lines()
#pprint.pprint(seo_lines)
for page_id_slug in all_pages:
for key_page, slug_page in page_id_slug.items():
print(f"key_page = {key_page}")
for line in seo_lines:
#dict_seo = {}
if line['Slug'] == slug_page:
data = {
"meta": {
"og_title": line["Titre"],
"og_description": line["Description"],
#"_yoast_wpseo_opengraph-title": line["Titre"],
#"_yoast_wpseo_opengraph-description": line["Description"]
}
}
response = self.wcapi.post(f"seo/{key_page}")
""""meta": {
"_yoast_wpseo_title": line["Titre"],
"_yoast_wpseo_metadesc": line["Description"],
"_yoast_wpseo_opengraph-title": line["Titre"],
"_yoast_wpseo_opengraph-description": line["Description"]
}"""
"""dict_seo['yoast_head_json']['description'] = line['Description']
dict_seo['yoast_head_json']['og_description'] = line['Description']
dict_seo['yoast_head_json']['og_title'] = line['Titre']
response = requests.post(
f"{self.page_api_url}/{page['id']}",
headers={
"Authorization": f"Basic {self.ath.auth_base64}",
#"Authorization": f"Basic {self.ath['auth_base64']}",
#"Content-Disposition": f"attachment; filename={image_name}"
},
json=dict_seo,
verify=False
)"""
#page['yoast_head_json']['description']
#page['yoast_head_json']['og_description']
#page['yoast_head_json']['og_title']
#ALL_TABS = ["Allergènes", "Conseils dutilisation", "Description", "Précautions articles"]
#ALL_ATTRIBUTES = ["Temps de combustion", "Type de cire", "Mèche", "Fabrication", "Composition", "Ingrédients et engagement"]
if __name__ == "__main__":
media_manager = MediaManager(ath=ath, filename_ods=FILENAME_ODS)
"""
utilisation
module argparse
# on va appeler ça importation d'un fichier ods, d'où l'action import-ods
# on va appeler cette commande, "la commande de base"
wcctl --wc-url=https://lescreationsdemissbleue.local --wc-key=<consumer_key> --wc-secret=<consumer_secret> import-ods --ods-path=fichier.ods
# traitement de l'intégralité d'un fichier ods
... --all
# traitement des medias seulement, on peut en option spécifier une plage de média à importer
... --medias [--media-range=1:40]
plu tard ...
# traitement des catégories seulement, on peut en option spécifier une expression régulière qui va s'appliquer au nom de la catégorie
... --categories [--categories-regex=<regex>]
ex: traiter uniquement les catégories dont le nom contient le terme "bougie"
... --categories [--categories-regex=.*bougie.*]
# traitement des articles seulement, on peut en option spécifier une expression régulière qui va s'appliquer au nom de l'article'
# ... --products [--products-regex=<regex>]
ex: traiter uniquement les articles dont le nom contient le terme "bougie"
... --categories [--products-regex=.*bougie.*]
"""
#parser = argparse.ArgumentParser(description="Script de traitement WooCommerce")
#wcctl --wc-url=https://lescreationsdemissbleue.local --wc-key=<consumer_key> --wc-secret=<consumer_secret> import-ods --ods-path=fichier.ods