2025-04-07 12:20:48 +02:00
|
|
|
|
from woocommerce import API as WoocommerceApi
|
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
|
import pandas as pd
|
|
|
|
|
|
import ezodf
|
|
|
|
|
|
import requests
|
|
|
|
|
|
import pprint
|
|
|
|
|
|
import base64
|
|
|
|
|
|
import unicodedata
|
|
|
|
|
|
import os
|
|
|
|
|
|
|
2025-05-24 10:34:04 +00:00
|
|
|
|
from watermark import create_watermark_image
|
2025-05-18 21:19:20 +02:00
|
|
|
|
from base64 import b64encode
|
|
|
|
|
|
|
2025-05-24 10:34:04 +00:00
|
|
|
|
import logging
|
2025-05-08 12:15:42 +02:00
|
|
|
|
logger = logging.getLogger(__name__)
|
2025-05-24 10:34:04 +00:00
|
|
|
|
|
|
|
|
|
|
|
2025-05-08 12:15:42 +02:00
|
|
|
|
|
|
|
|
|
|
# 🧪 Test
|
|
|
|
|
|
"""logger.debug("Démarrage du programme (DEBUG)")
|
|
|
|
|
|
logger.info("Traitement en cours (INFO)")
|
|
|
|
|
|
logger.warning("Avertissement (WARNING)")
|
|
|
|
|
|
logger.error("Erreur (ERROR)")
|
|
|
|
|
|
logger.critical("Erreur critique (CRITICAL)")"""
|
|
|
|
|
|
|
2025-04-07 12:20:48 +02:00
|
|
|
|
|
|
|
|
|
|
# via consumer key and consumer secret :
|
|
|
|
|
|
# https://lescreationsdemissbleue.local/wp-json/wc/v3/products?consumer_key=ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e&consumer_secret=cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768
|
|
|
|
|
|
|
2025-05-08 12:15:42 +02:00
|
|
|
|
#url="https://lescreationsdemissbleue.local",
|
2025-05-18 21:19:20 +02:00
|
|
|
|
#url="https://les-creations-de-missbleue.local",
|
|
|
|
|
|
#consumer_key="ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e",
|
|
|
|
|
|
#consumer_secret="cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768",
|
|
|
|
|
|
|
2025-04-07 12:20:48 +02:00
|
|
|
|
|
2025-05-18 21:19:20 +02:00
|
|
|
|
class AuthentificationWpApi():
    """Hold the WordPress credentials and the derived HTTP Basic token.

    All attributes are class attributes computed once, when the class body
    runs at import time. ``auth_base64`` is the base64 form of
    ``"<username>:<application password>"``.

    The credentials are read from the ``WORDPRESS_USERNAME`` and
    ``WORDPRESS_APPLICATION_PASSWORD`` environment variables when they are
    set, and fall back to the values that were previously hard-coded here.
    """

    # WordPress (not WooCommerce) account name.
    wordpress_username = os.environ.get("WORDPRESS_USERNAME", "admin_lcdm")

    # Application password generated in WordPress > Users.
    # SECURITY: the fallback below is a real secret committed to the source;
    # rotate it and provide it through the environment instead.
    wordpress_application_password = os.environ.get(
        "WORDPRESS_APPLICATION_PASSWORD",
        "yTW8 Mc6J FUCN tPSq bnuJ 0Sdw",
    )

    # Build the Basic authentication value: "user:password" -> UTF-8 -> base64.
    auth_str = f"{wordpress_username}:{wordpress_application_password}"
    auth_bytes = auth_str.encode("utf-8")
    auth_base64 = base64.b64encode(auth_bytes).decode("utf-8")


# Module-level instance shared by the managers defined in this file.
ath = AuthentificationWpApi()
|
|
|
|
|
|
|
2025-05-08 12:15:42 +02:00
|
|
|
|
# Base URL of the WordPress site; this is the local development host.
# Production host, kept for reference: "https://les-creations-de-missbleue.com"
WEBSITE_URL = "https://les-creations-de-missbleue.local"

# Folder that the 'Chemin' values of the media sheet are appended to.
# It is relative, so it is resolved against the current working directory.
# Earlier revisions pointed at absolute folders under C:\Users\beren\...
BASE_PATH = "photos/photos_site/Photos_site/"

# ODS workbook used as the default data source by OdsReader.
# NOTE(review): this is still an absolute Windows path while BASE_PATH is
# relative - confirm it is valid on the machine that runs the import.
FILENAME_ODS = (
    "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue"
    "\\api_woocommerce\\final_api_woocommerce"
    "\\donnees_site_internet_missbleue_version_finale.ods"
)
|
2025-05-08 12:15:42 +02:00
|
|
|
|
|
|
|
|
|
|
|
2025-04-07 12:20:48 +02:00
|
|
|
|
|
|
|
|
|
|
class OdsReader:
    """Read the ODS workbook and expose its sheets as lists of row dicts.

    The first row of a sheet is used as the column header; every following
    row that is not entirely empty becomes one ``{column: value}`` dict.
    The workbook is re-opened on every call.
    """

    # Sheet positions inside the workbook, as used by the accessors below.
    _SHEET_MEDIA = 0
    _SHEET_CATEGORY = 1
    _SHEET_PRODUCT = 2
    _SHEET_ATTRIBUTE_AND_TAB = 3
    _SHEET_SEO = 6

    # Column searched by get_doc_ods_by_value().
    _SEARCH_COLUMN = 'Nom'

    def __init__(self, filename_ods=FILENAME_ODS):
        """Remember the path of the workbook to read."""
        self.filename_ods = filename_ods

    # ------------------------------------------------------------------
    # Product sheet
    # ------------------------------------------------------------------
    def get_all_product_lines(self):
        """Return every non-empty row of the product sheet."""
        return self.get_doc_ods(self._SHEET_PRODUCT)

    def fetch_all_product_rows(self, start, end=None):
        """Return the product rows in the positional range ``[start, end)``."""
        return self.extract_ods_row(self._SHEET_PRODUCT, start, end)

    def get_product_line_by_value(self, search_value):
        """Return the product rows whose 'Nom' cell contains ``search_value``."""
        return self.get_doc_ods_by_value(self._SHEET_PRODUCT, search_value)

    def get_product_by_slug_from_ods(self, slug):
        """Return the first product row whose 'Slug' equals ``slug``, else None."""
        return next(
            (product for product in self.get_all_product_lines()
             if product['Slug'] == slug),
            None,
        )

    # ------------------------------------------------------------------
    # Media sheet
    # ------------------------------------------------------------------
    def get_all_media_lines(self):
        """Return every non-empty row of the media sheet."""
        return self.get_doc_ods(self._SHEET_MEDIA)

    def fetch_all_media_rows(self, start, end=None):
        """Return the media rows in the positional range ``[start, end)``."""
        return self.extract_ods_row(self._SHEET_MEDIA, start, end)

    def get_media_line_by_value(self, search_value):
        """Return the media rows whose 'Nom' cell contains ``search_value``."""
        return self.get_doc_ods_by_value(self._SHEET_MEDIA, search_value)

    # ------------------------------------------------------------------
    # Attribute / tab, category and SEO sheets
    # ------------------------------------------------------------------
    def get_all_attribute_and_tab_lines(self):
        """Return every non-empty row of the attribute-and-tab sheet."""
        return self.get_doc_ods(self._SHEET_ATTRIBUTE_AND_TAB)

    def get_attribute_and_tab_lines(self, search_value):
        """Return the attribute-and-tab rows whose 'Nom' contains ``search_value``."""
        return self.get_doc_ods_by_value(self._SHEET_ATTRIBUTE_AND_TAB, search_value)

    def get_all_category_lines(self):
        """Return every non-empty row of the category sheet."""
        return self.get_doc_ods(self._SHEET_CATEGORY)

    def get_category_line_by_value(self, search_value):
        """Return the category rows whose 'Nom' cell contains ``search_value``."""
        return self.get_doc_ods_by_value(self._SHEET_CATEGORY, search_value)

    def get_all_seo_lines(self):
        """Return every non-empty row of the SEO sheet."""
        return self.get_doc_ods(self._SHEET_SEO)

    # ------------------------------------------------------------------
    # Generic sheet access
    # ------------------------------------------------------------------
    def _read_sheet_rows(self, number_sheet):
        """Open the workbook and return sheet ``number_sheet`` as a list of cell-value lists."""
        doc = ezodf.opendoc(self.filename_ods)
        sheet = doc.sheets[number_sheet]
        return [[cell.value for cell in row] for row in sheet.rows()]

    @staticmethod
    def _rows_to_dataframe(rows):
        """Build a DataFrame from raw rows, promoting the first row to the header.

        Empty rows are NOT dropped here, so positional slicing done by the
        caller still refers to the sheet's own row order. A sheet without
        any row yields an empty DataFrame instead of raising.
        """
        if not rows:
            return pd.DataFrame()
        df = pd.DataFrame(rows)
        df.columns = df.iloc[0]
        return df[1:].reset_index(drop=True)

    def get_doc_ods(self, number_sheet):
        """Return all non-empty rows of sheet ``number_sheet`` as a list of dicts."""
        df = self._rows_to_dataframe(self._read_sheet_rows(number_sheet))
        return df.dropna(how='all').to_dict(orient="records")

    def get_doc_ods_by_value(self, number_sheet, search_value=None):
        """Return the rows of a sheet whose 'Nom' cell contains ``search_value``.

        The match is a case-insensitive, literal substring test (the search
        text is not interpreted as a regular expression). The 'Nom' values in
        the returned rows are stripped, and missing ones replaced by ''.

        Args:
            number_sheet: position of the sheet in the workbook.
            search_value: text to look for; when falsy, every non-empty row
                is returned unfiltered.

        Returns:
            A list of row dicts. When the search cannot be satisfied (empty
            sheet, no 'Nom' column, no match, unexpected error) the problem
            is logged and an empty list is returned, so callers never process
            rows that were not asked for.
        """
        df = self._rows_to_dataframe(self._read_sheet_rows(number_sheet))
        df = df.dropna(how='all')

        if not search_value:
            logger.debug("Aucun search_value fourni")
            return df.to_dict(orient="records")

        # Bound before the try block so the handlers below can always use it.
        column_name = self._SEARCH_COLUMN
        logger.debug(f"Recherche de la valeur : {search_value}")
        search_value = str(search_value).strip()

        try:
            if df.empty:
                raise ValueError("Le DataFrame est vide")
            if column_name not in df.columns:
                raise ValueError(f"La colonne '{column_name}' n'existe pas dans le DataFrame")

            # Normalise the searched column: strip blanks, then blank out missing cells.
            df[column_name] = df[column_name].str.strip().fillna('')

            # regex=False: names may contain characters such as '(' or '+'.
            mask = df[column_name].str.contains(search_value, case=False, na=False, regex=False)
            if not mask.any():
                raise ValueError(f"Aucune correspondance trouvée pour '{search_value}' dans la colonne '{column_name}'")

            return df[mask].to_dict(orient="records")
        except ValueError as ve:
            logger.exception(f"🚫 {ve}")
        except Exception as e:
            logger.exception(f"🚫 Erreur lors de la recherche de '{search_value}' dans la colonne '{column_name}'. Exception : {e}")
        return []

    def extract_ods_row(self, number_sheet, start_row=None, end_row=None):
        """Return the rows of a sheet located in the range ``[start_row, end_row)``.

        Positions are 0-based and counted after the header row, before empty
        rows are removed; either bound may be None to leave that side open.
        Entirely empty rows inside the range are dropped from the result.
        """
        rows = self._read_sheet_rows(number_sheet)
        logger.debug(f'extract_ods_row: {len(rows)} rows found in ods sheet #{number_sheet}')

        df = self._rows_to_dataframe(rows)
        # A None bound is an open slice end, which covers all four combinations.
        df = df.iloc[start_row:end_row]
        return df.dropna(how='all').to_dict(orient="records")
|
|
|
|
|
|
|
2025-04-07 12:20:48 +02:00
|
|
|
|
class MediaManager(OdsReader):
    """Upload the images listed in the ODS media sheet and manage them on the site.

    Every request goes through ``wcapi``, an API client exposing ``get``,
    ``post`` and ``delete`` plus a ``url`` attribute (only used in log messages).
    """

    def __init__(self, ath, wcapi, filename_ods):
        """Store the collaborators and the accent-restoration table.

        Args:
            ath: authentication holder; stored but not read by the live code
                of this class.
            wcapi: API client used for all media requests.
            filename_ods: path of the ODS workbook.
        """
        super().__init__(filename_ods)
        self.ath = ath
        self.wcapi = wcapi
        # Lower-case unaccented word -> spelling to use instead; applied word
        # by word by update_accent_in_sentence().
        self.dict_equivalence = {
            'ambre': 'ambré', 'meringue': 'meringué', 'givree': 'givrée',
            'sale': 'salé', 'bresilien': 'brésilien', 'epices': 'épices',
            'noel': 'noël', 'a': 'à', 'petales': 'pétales',
            'lumiere': 'lumière', 'allumee': 'allumée', 'eteinte': 'éteinte',
            'celebration': 'célébration', 'argente': 'argenté', 'dore': 'doré',
            'accroche': 'accroché', 'pose': 'posé', 'colore': 'coloré',
            'kevin': 'Kévin', 'interieur': 'intérieur', 'cafe': 'café',
            'bresil': 'Brésil', 'dagrumes': "d'agrumes", "iles": "îles",
            'apero': 'apéro', 'quebecois': 'québecois', 'defendu': 'défendu',
            'tiare': 'tiaré', 'mure': 'mûre', 'allergenes': 'allergènes',
            'parfume': 'parfumé', 'peche': 'pêche',
        }

    # ------------------------------------------------------------------
    # Upload
    # ------------------------------------------------------------------
    @staticmethod
    def _local_media_path(media):
        """Return the local path of a media row: BASE_PATH + its 'Chemin' cell."""
        return Path(BASE_PATH + media['Chemin'].replace("\\", "/"))

    def _upload(self, media, image_name, path, use_given_path, refresh_existing):
        """Upload one file unless a media with the row's slug already exists.

        Args:
            media: row dict from the media sheet.
            image_name: file name, used in log messages.
            path: path reported in the "file not found" log, and the file
                that is opened when ``use_given_path`` is true.
            use_given_path: open ``path`` itself; otherwise the path is
                rebuilt from ``media['Chemin']``.
            refresh_existing: when the media already exists with an empty alt
                text, push the sheet metadata to it.

        Errors are logged and swallowed so a batch keeps going.
        """
        try:
            if not self.is_exists(media, image_name):
                image_path = path if use_given_path else self._local_media_path(media)
                with open(image_path, "rb") as image_file:
                    response = self.wcapi.post("media", files={"file": image_file})
                if response.status_code == 201:
                    self.update_data_media(media, response.json()['id'])
                    logger.info(f"✅ Image uploadée : {image_name}")
                else:
                    logger.error(f"❌ Échec de l'upload ({response.status_code}) pour : {image_name} - URL: {self.wcapi.url}")
                return

            # Looked up once: each lookup downloads the whole media list.
            media_id = self.is_txt_alt_exists(media, image_name) if refresh_existing else False
            if media_id:
                self.update_data_media(media, media_id)
                logger.info(f"✅ Image déjà existante et mise à jour : {image_name}")
            else:
                logger.info(f"↪️ Image déjà existante (non uploadée) : {image_name}")
        except FileNotFoundError:
            logger.exception(f"🚫 Fichier introuvable : {image_name} ({path})")
        except requests.RequestException as e:
            logger.exception(f"🔌 Problème réseau/API lors de l'upload de {image_name} : {e}")
        except Exception as e:
            logger.exception(f"🔥 Erreur inattendue lors de l'upload de {image_name} : {e}")

    def upload_media(self, search_value=None):
        """Upload the media rows matching ``search_value`` (all rows when falsy).

        Files are sent as they are (no watermark) and media that already
        exist on the site are left untouched.
        """
        if search_value:
            json_data = self.get_media_line_by_value(search_value)
        else:
            json_data = self.get_all_media_lines()

        for media in json_data:
            path = self._local_media_path(media)
            self._upload(media, path.name, path, use_given_path=True, refresh_existing=False)

    def create_and_update_media(self, media, image_name, path, watermark=False):
        """Upload one media row, or refresh its metadata if it already exists.

        Args:
            media: row dict from the media sheet.
            image_name: file name used in log messages.
            path: file to upload when ``watermark`` is true; otherwise only
                used in the "file not found" log.
            watermark: when false, the file is read from
                ``BASE_PATH + media['Chemin']`` instead of ``path``.
        """
        self._upload(media, image_name, path, use_given_path=watermark, refresh_existing=True)

    def upload_media_from_to(self, range_start, range_end=None):
        """Upload the media rows in ``[range_start, range_end)``.

        Rows whose first folder is 'Logo' are uploaded unchanged. Every other
        image is uploaded as a watermarked copy, which is deleted afterwards.
        """
        json_data = self.fetch_all_media_rows(range_start, range_end)
        logger.debug(f'{len(json_data)} to process')

        for media in json_data:
            media_chemin = media['Chemin'].replace("\\", "/")
            path = Path(BASE_PATH + media_chemin)
            first_folder = media_chemin.split("/")[0]

            if first_folder == 'Logo':
                self.create_and_update_media(media, path.name, path)
                continue

            # Only rendered for images that are actually uploaded watermarked.
            watermarked_path = Path(create_watermark_image(str(path)))
            try:
                self.create_and_update_media(media, watermarked_path.name, watermarked_path, True)
            finally:
                try:
                    os.remove(watermarked_path)
                except FileNotFoundError:
                    logger.exception(f"🚫 Fichier introuvable : {watermarked_path.name} ({watermarked_path})")

    # ------------------------------------------------------------------
    # Existence checks
    # ------------------------------------------------------------------
    def is_exists(self, media, image_name):
        """Return True when a site media has the same slug as ``media['Slug']``.

        ``image_name`` is accepted for interface compatibility and not used.
        """
        return any(media['Slug'] == image['slug'] for image in self.get_all_images())

    def is_txt_alt_exists(self, media, image_name):
        """Return the id of the site media matching the slug with an empty alt text.

        Returns False when no such media exists. ``image_name`` is accepted
        for interface compatibility and not used.
        """
        for image in self.get_all_images():
            if media['Slug'] == image['slug'] and image['alt_text'] == "":
                return image['id']
        return False

    # ------------------------------------------------------------------
    # Metadata
    # ------------------------------------------------------------------
    def get_title_and_alt_text_media(self, media):
        """Derive ``(title, alt_text)`` from the slug of a media row.

        Dashes become spaces, every 'img' occurrence is removed and the
        result is capitalised; the same text is used for both values.
        """
        sentence = media['Slug'].replace('-', ' ')
        sentence = sentence.replace('img', '').strip()
        title = sentence.capitalize()
        return title, title

    def update_accent_in_sentence(self, sentence):
        """Replace each whitespace-separated word found in ``dict_equivalence``.

        The lookup is case-insensitive; words are re-joined with single spaces.
        """
        return " ".join(
            self.dict_equivalence.get(word.lower(), word) for word in sentence.split()
        )

    def update_data_media(self, media, id_img):
        """Push title, alt text and slug of a media row to media ``id_img``.

        Title and alt text come from 'Nom' and 'Description'; if either is
        None, both are derived from the slug. Accents are then restored.

        Returns:
            The decoded JSON response on HTTP 200, otherwise None.
        """
        if media['Nom'] is None or media['Description'] is None:
            title, alt_text = self.get_title_and_alt_text_media(media)
        else:
            title = media['Nom']
            alt_text = media['Description']

        update_data = {
            "title": self.update_accent_in_sentence(title),
            "alt_text": self.update_accent_in_sentence(alt_text),
            "slug": media['Slug'],
        }
        response = self.wcapi.post(f"media/{id_img}", data=update_data)
        if response.status_code == 200:
            return response.json()
        logger.error(f"Erreur lors de la mise à jour du média {id_img} : {response.status_code}")
        return None

    # ------------------------------------------------------------------
    # Lookup
    # ------------------------------------------------------------------
    def find_id_by_name(self, name):
        """Return the id of the first site media whose rendered title is ``name``, else None."""
        for img in self.get_all_images():
            if img['title']['rendered'] == name:
                return img['id']
        return None

    def find_id_by_slug(self, slug):
        """Return the id of the first site media whose slug is ``slug``, else None."""
        for img in self.get_all_images():
            if img['slug'] == slug:
                return img['id']
        return None

    def get_all_as_slug_dict(self):
        """Return ``{media id: slug}`` for every media on the site."""
        return {img['id']: img['slug'] for img in self.get_all_images()}

    def get_all_images(self):
        """Fetch every media item, following the pagination.

        Pages of 100 items are requested until a page is empty or the API
        answers with anything other than HTTP 200.
        """
        all_images = []
        page = 1
        while True:
            response = self.wcapi.get("media", params={"per_page": 100, "page": page})
            if response.status_code != 200:
                break
            images = response.json()
            if not images:
                break
            all_images.extend(images)
            page += 1
        return all_images

    # ------------------------------------------------------------------
    # Deletion
    # ------------------------------------------------------------------
    def delete_media_by_slug(self, slug):
        """Permanently delete every site media whose slug is ``slug``."""
        for img in self.get_all_images():
            if img['slug'] == slug:
                self.wcapi.delete(f"media/{img['id']}", params={"force": True})

    def delete_images(self, images):
        """Permanently delete the given media items (dicts carrying an 'id').

        ``force`` is sent as in delete_media_by_slug(): media cannot be moved
        to the trash, so the API refuses a deletion without it.
        """
        for img in images:
            img_id = img['id']
            response = self.wcapi.delete(f"media/{img_id}", params={"force": True})
            if response.status_code in [200, 410]:  # 410 = already deleted
                logger.info(f"Image {img_id} supprimée.")
            else:
                logger.error(f"Erreur suppression {img_id} : {response.status_code} {response.text}")

    def delete_all_images(self):
        """Permanently delete every media item of the site."""
        self.delete_images(self.get_all_images())

    # ------------------------------------------------------------------
    # Site settings
    # ------------------------------------------------------------------
    def assign_image_logo(self):
        """Use the media with slug 'img-logo-lescreationsdemissbleue' as site logo and icon."""
        for img in self.get_all_images():
            if img['slug'] != "img-logo-lescreationsdemissbleue":
                continue
            data = {
                "site_logo": img['id'],
                "site_icon": img['id'],
            }
            response = self.wcapi.post("settings", data=data)
            if response.status_code == 200:
                logger.info("Logo mis à jour avec succès !")
            else:
                logger.error(f"Erreur lors de la mise à jour du logo : {response.text}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CategoryManager(OdsReader):
    """Create, link and delete WooCommerce product categories from the ODS sheet."""

    def __init__(self, wcapi, ath, filename_ods, medias=None):
        """Store the collaborators.

        Args:
            wcapi: API client used for every request.
            ath: authentication holder; its ``auth_base64`` feeds ``headers``.
            filename_ods: path of the ODS workbook.
            medias: optional ``{media id: slug}`` mapping used to resolve
                category images.
        """
        super().__init__(filename_ods)
        self.wcapi = wcapi
        self.ath = ath
        self.medias = medias
        self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media"
        self.error_log = []
        self.headers = {
            "Authorization": f"Basic {self.ath.auth_base64}",
            "Content-Type": "application/json"
        }

    # ------------------------------------------------------------------
    # Lookup
    # ------------------------------------------------------------------
    def _get_all_categories(self):
        """Return every product category, following the pagination.

        Pages of 100 are requested until one comes back short or the API
        answers with anything other than HTTP 200 (the categories collected
        so far are then returned).
        """
        per_page = 100
        categories = []
        page = 1
        while True:
            response = self.wcapi.get(
                "products/categories/", params={"per_page": per_page, "page": page}
            )
            if response.status_code != 200:
                break
            batch = response.json()
            categories.extend(batch)
            if len(batch) < per_page:
                break
            page += 1
        return categories

    def _find_category_id(self, field, value):
        """Return the id of the first category whose ``field`` equals ``value``, else None."""
        for cat in self._get_all_categories():
            if cat[field] == value:
                return cat['id']
        return None

    def find_id_by_slug(self, slug):
        """Return the id of the category with this slug, or None."""
        return self._find_category_id('slug', slug)

    def find_id_by_name(self, name):
        """Return the id of the category with this name, or None."""
        return self._find_category_id('name', name)

    def find_id_by_parent_slug(self, parent_slug):
        """Return the id of the category whose slug is ``parent_slug``, or None."""
        return self.find_id_by_slug(parent_slug)

    def find_media_id_by_slug(self, media_slug):
        """Return the media id mapped to ``media_slug`` in ``self.medias``, or None.

        Also returns None when no media mapping was given to the constructor.
        """
        for media_id, slug in (self.medias or {}).items():
            if media_slug == slug:
                return media_id
        return None

    # ------------------------------------------------------------------
    # Creation / update
    # ------------------------------------------------------------------
    def create_category(self, name, description, slug):
        """Create a category unless one with the same slug already exists.

        Failures are logged; nothing is raised and nothing is returned.
        """
        if self.find_id_by_slug(slug):
            logger.debug(f"Catégorie contenant comme slug '{slug}' existe déjà")
            return

        category_data = {
            "name": name,
            "description": description,
            "slug": slug
        }
        try:
            response = self.wcapi.post("products/categories/", category_data)
            if response.status_code == 201:
                logger.info(f"Catégorie créé avec succès. ID: {response.json()['id']}")
            else:
                logger.error(f"Erreur lors de la création de la catégorie. Code: {response.status_code}, Message: {response.text}")
        except Exception as e:
            logger.error(f"Erreur inattendue lors de l'envoi de la catégorie à WooCommerce: {e}")

    def assign_parent_category(self, parent_slug, slug):
        """Set the category ``parent_slug`` as parent of the category ``slug``.

        Nothing is sent when either category cannot be found.
        """
        if not parent_slug:
            return
        # Resolved once, not once per category of the listing.
        parent_id = self.find_id_by_parent_slug(parent_slug)
        if not parent_id:
            return
        cat_id = self.find_id_by_slug(slug)
        if cat_id is not None:
            self.wcapi.put(f"products/categories/{cat_id}", {'parent': parent_id})

    def update_media_id_for_category(self, media_id, cat_id):
        """Attach media ``media_id`` as the image of category ``cat_id``.

        Nothing is sent when either id is None, which would otherwise
        produce an invalid request.
        """
        if media_id is None or cat_id is None:
            logger.debug(f"Image de catégorie non mise à jour (media_id={media_id}, cat_id={cat_id})")
            return
        update_category_data = {
            "image": {'id': media_id},
        }
        self.wcapi.put(f"products/categories/{cat_id}", update_category_data)

    def update_data_categories(self, search_value=None):
        """Create the categories of the sheet, then set their parent and image.

        Args:
            search_value: restrict the run to rows whose 'Nom' contains this
                text; every row is processed when falsy.
        """
        if search_value:
            json_data = self.get_category_line_by_value(search_value)
        else:
            json_data = self.get_all_category_lines()

        for category in json_data:
            self.create_category(category['Nom'], category['Description'], category['Slug'])
            cat_id = self.find_id_by_slug(category['Slug'])
            media_id = self.find_media_id_by_slug(category['Media Slug'])
            self.assign_parent_category(category['Parent Slug'], category['Slug'])
            self.update_media_id_for_category(media_id, cat_id)

    # ------------------------------------------------------------------
    # Deletion
    # ------------------------------------------------------------------
    def delete_all_category(self):
        """Permanently delete every product category the API lists."""
        for cat in self._get_all_categories():
            self.wcapi.delete(f"products/categories/{cat['id']}", params={"force": True})

    def delete_media_category(self, media_slug):
        """Permanently delete the media mapped to ``media_slug``, if any.

        ``force`` is required because media cannot be moved to the trash.
        """
        media_id = self.find_media_id_by_slug(media_slug)
        if media_id is None:
            logger.debug(f"Aucun média trouvé pour le slug '{media_slug}'")
            return
        self.wcapi.delete(f"media/{media_id}", params={"force": True})

    def delete_category_by_id(self, category_id):
        """Permanently delete the category ``category_id``."""
        self.wcapi.delete(f"products/categories/{category_id}", params={"force": True})

    def delete_category_by_slug(self, slug):
        """Permanently delete the category with this slug, if it exists."""
        category_id = self.find_id_by_slug(slug)
        if category_id is None:
            logger.debug(f"Aucune catégorie trouvée pour le slug '{slug}'")
            return
        self.delete_category_by_id(category_id)

    def get_errors(self):
        """Print the collected error log; returns None."""
        print(f"self.error_log = {self.error_log}")
|
|
|
|
|
|
|
|
|
|
|
|
class ProductManager(OdsReader):
|
2025-05-08 12:15:42 +02:00
|
|
|
|
def __init__(self, wcapi, ath, filename_ods, medias=None):
    """Store the collaborators used by the product operations.

    Args:
        wcapi: API client used for every request.
        ath: authentication holder; its ``auth_base64`` feeds ``headers``.
        filename_ods: path of the ODS workbook, handed to the parent class.
        medias: optional ``{media id: slug}`` mapping.
    """
    super().__init__(filename_ods)
    self.wcapi, self.ath, self.medias = wcapi, ath, medias
    self.error_log = []

    basic_token = self.ath.auth_base64
    self.headers = dict([
        ("Authorization", f"Basic {basic_token}"),
        ("Content-Type", "application/json"),
        ("User-Agent", "Mozilla/5.0"),
    ])
    # Endpoint of the WordPress media collection.
    self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media"
|
|
|
|
|
|
|
|
|
|
|
|
def update_data_list_cat_product(self, list_category_id, list_img_id, product_id):
    """Send the category and image lists of a product in one PUT request.

    Both lists are forwarded unchanged as the 'categories' and 'images'
    fields of ``products/<product_id>``.
    """
    endpoint = f"products/{product_id}"
    payload = dict(categories=list_category_id, images=list_img_id)
    self.wcapi.put(endpoint, payload)
|
|
|
|
|
|
|
2025-05-24 10:34:04 +00:00
|
|
|
|
|
|
|
|
|
|
def update_data_list_category_product(self, product_id, list_category_id):
    """Replace the categories of a product with the given category ids.

    None entries are skipped; each remaining id is sent as ``{'id': <id>}``.
    """
    categories = []
    for category_id in list_category_id:
        if category_id is None:
            continue
        categories.append({'id': category_id})

    self.wcapi.put(f"products/{product_id}", {'categories': categories})
|
|
|
|
|
|
|
|
|
|
|
|
def update_data_list_medias_product(self, product_id, list_medias_id):
    """Replace the images of a product with the given media ids.

    None entries are skipped; each remaining id is sent as ``{'id': <id>}``.
    The payload is logged at debug level before the PUT request.
    """
    images = []
    for media_id in list_medias_id:
        if media_id is None:
            continue
        images.append({'id': media_id})

    product_data = {'images': images}
    logger.debug(f"{product_data} - {product_id}")
    self.wcapi.put(f"products/{product_id}", product_data)
|
|
|
|
|
|
|
|
|
|
|
|
|
2025-04-07 12:20:48 +02:00
|
|
|
|
def get_list_media_id_for_product(self, medias):
    """Return the ``{'id': <media id>}`` entries for the given media slugs.

    Args:
        medias: iterable of media slugs attached to a product; None is
            treated as "no media".

    Returns:
        One entry per (known media, requested slug) match. Entries are
        collected in the iteration order of ``self.medias`` and the list is
        returned reversed. An empty list is returned when ``self.medias``
        was never provided, instead of raising on None.
    """
    known_medias = self.medias or {}
    wanted_slugs = medias or []
    matches = [
        {'id': media_id}
        for media_id, media_slug in known_medias.items()
        for wanted in wanted_slugs
        if wanted == media_slug
    ]
    return matches[::-1]
|
2025-05-08 12:15:42 +02:00
|
|
|
|
|
2025-04-07 12:20:48 +02:00
|
|
|
|
def get_list_category_for_product(self, categories):
    """Translate category names into ``{'id': ...}`` references.

    One request is made for ``products/categories`` with ``per_page=100``,
    so only the categories on that single page can be matched.

    Args:
        categories: Iterable of category names to resolve.

    Returns:
        list[dict]: One ``{'id': category_id}`` per (remote category, name)
        match, in the order the remote categories are listed.

    Raises:
        requests.exceptions.JSONDecodeError: When the response body is not
            valid JSON; the raw body is logged at debug level first.
    """
    response = self.wcapi.get("products/categories", params={"per_page": 100})
    try:
        remote_categories = response.json()
    except requests.exceptions.JSONDecodeError as e:
        logger.debug(f"text = {response.text}")
        raise e
    return [
        {'id': remote['id']}
        for remote in remote_categories
        for wanted_name in categories
        if remote['name'] == wanted_name
    ]
|
2025-04-07 12:20:48 +02:00
|
|
|
|
|
|
|
|
|
|
def find_product_by_id(self, id):
    """Fetch one product by its id.

    Args:
        id: Identifier interpolated into the ``products/<id>`` endpoint.

    Returns:
        The decoded JSON body when the status code is 200, otherwise ``None``.
    """
    response = self.wcapi.get(f"products/{id}")
    if response.status_code != 200:
        return None
    return response.json()
|
2025-05-24 10:34:04 +00:00
|
|
|
|
|
2025-04-07 12:20:48 +02:00
|
|
|
|
def find_id_by_slug(self, slug):
    """Look up the id of the product whose slug equals ``slug``.

    The ``products`` endpoint is queried with a ``slug`` filter and the
    returned entries are re-checked locally for an exact slug match.

    Args:
        slug: Product slug to search for.

    Returns:
        The ``id`` of the first exact match, or ``None`` when the status code
        is not 200 or no entry matches.
    """
    response = self.wcapi.get("products", params={"slug": slug})
    if response.status_code != 200:
        return None
    matching_ids = (
        candidate['id']
        for candidate in response.json()
        if candidate['slug'] == slug
    )
    return next(matching_ids, None)
|
|
|
|
|
|
|
|
|
|
|
|
"""def find_id_by_slug(self, slug):
|
|
|
|
|
|
response = self.wcapi.get("products/",params={"per_page": 100})
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
|
|
|
products = response.json()
|
|
|
|
|
|
for pro in products:
|
|
|
|
|
|
if pro['slug'] == slug:
|
|
|
|
|
|
return pro['id']"""
|
2025-04-07 12:20:48 +02:00
|
|
|
|
|
|
|
|
|
|
def find_media_id_by_slug(self, media_slug):
    """Return the key of ``self.medias`` whose value equals ``media_slug``.

    Args:
        media_slug: Media slug to search for.

    Returns:
        The first matching key (the media id), or ``None`` when no value
        matches.
    """
    return next(
        (media_id for media_id, slug in self.medias.items() if media_slug == slug),
        None,
    )
|
2025-05-24 10:34:04 +00:00
|
|
|
|
|
|
|
|
|
|
def is_exist_attribute_in_one_product(self, product_id):
    """Tell whether a product has a non-empty ``attributes`` field.

    Args:
        product_id: Identifier interpolated into the ``products/<id>`` endpoint.

    Returns:
        bool: ``True`` when the response object is truthy and the decoded
        product's ``attributes`` value is truthy; ``False`` otherwise.
    """
    response = self.wcapi.get(f"products/{product_id}")
    if not response:
        return False
    return bool(response.json()['attributes'])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_exist_tabs_in_one_product(self, product_id):
    """Tell whether a product already carries at least one custom tab.

    Custom tabs are read from the product's ``meta_data`` entry whose key is
    ``wb_custom_tabs``. The previous implementation tested
    ``not data.get('value')`` and therefore answered ``True`` only when that
    entry was *empty*, which is the opposite of what the method name states.

    Args:
        product_id: Identifier interpolated into the ``products/<id>`` endpoint.

    Returns:
        bool: ``True`` when a ``wb_custom_tabs`` entry with a non-empty value
        exists; ``False`` when the response object is falsy, when there is no
        such entry, or when its value is empty.
    """
    response = self.wcapi.get(f"products/{product_id}")
    if not response:
        return False
    for entry in response.json().get('meta_data', []):
        if entry.get('key') == "wb_custom_tabs" and entry.get('value'):
            return True
    return False
|
2025-04-07 12:20:48 +02:00
|
|
|
|
|
2025-05-24 10:34:04 +00:00
|
|
|
|
|
|
|
|
|
|
"""def get_product_data_by_api(self, product_id, product_datas):
|
|
|
|
|
|
response = self.wcapi.get(f"products/{product_id}")
|
|
|
|
|
|
products_info = response.json()
|
|
|
|
|
|
new_datas = {}
|
|
|
|
|
|
for key, data in product_datas.items:
|
|
|
|
|
|
if products_info[key] == data:
|
|
|
|
|
|
pass
|
|
|
|
|
|
else:
|
|
|
|
|
|
products_info[key] = data
|
|
|
|
|
|
new_datas[key] = data
|
|
|
|
|
|
|
|
|
|
|
|
self.wcapi.put(f"products/{product_id}", new_datas)"""
|
|
|
|
|
|
|
|
|
|
|
|
def get_product_data_by_api(self, product_id):
    """Return the decoded JSON body of ``GET products/<product_id>``.

    The status code is not inspected, so an error payload is returned as-is.
    """
    return self.wcapi.get(f"products/{product_id}").json()
|
|
|
|
|
|
|
2025-04-07 12:20:48 +02:00
|
|
|
|
def create_tabs_from_custom_dict(self, product_id, product):
    """Build the custom tabs of a product from selected keys of ``product``.

    Only the keys "Conseils d’utilisation", "Précautions articles" and
    "Description" become tabs. Tabs are numbered from 1 in the order the keys
    appear in ``product`` and stored under the ``wb_custom_tabs`` meta key.

    The product is fetched first; the tabs are posted only when that request
    answers 200, otherwise an error line is printed.

    Args:
        product_id: Identifier interpolated into the ``products/<id>`` endpoint.
        product: Mapping of column name to content.
    """
    tab_titles = ("Conseils d’utilisation", "Précautions articles", "Description")
    selected_titles = [key for key in product.keys() if key in tab_titles]
    tabs = [
        {
            'title': title,
            'content': product[title],
            'nickname': '',
            'position': position,
            'tab_type': 'local',
        }
        for position, title in enumerate(selected_titles, start=1)
    ]

    response = self.wcapi.get(f"products/{product_id}")
    if response.status_code != 200:
        print(f"error - {product_id} - {response.status_code}")
        return
    payload = {'meta_data': [{'key': 'wb_custom_tabs', 'value': tabs}]}
    self.wcapi.post(f"products/{product_id}", payload)
|
2025-04-07 12:20:48 +02:00
|
|
|
|
|
2025-05-24 10:34:04 +00:00
|
|
|
|
def create_product(self, product_data):
    """Create a product unless one with the same slug already exists.

    Args:
        product_data: Mapping describing the product; its ``slug`` entry is
            used for the existence check and its ``name`` entry in the error
            log. ``None`` is accepted and ignored.

    Returns:
        The id of the existing product when the slug is already known, the id
        of the newly created product when the POST answers 201, and ``None``
        in every other case (no data, error status, or any exception, which
        is logged rather than propagated).
    """
    logger.debug(f"ProductManager::create_product called (product_data = {product_data})")
    if product_data is None:
        logger.info(f"ProductManager::create_product called no product_data supplied - ignored")
        return None

    try:
        existing_id = self.find_id_by_slug(product_data['slug'])
        if existing_id:
            return existing_id

        response = self.wcapi.post("products/", product_data)
        if response.status_code == 201:
            logger.debug(f"ProductManager::create_product product created {response.json()['id']} - end")
            return response.json()['id']

        # The product was not created, but the API answered with an error code.
        logger.error(f"Erreur lors de la création du produit. {product_data['name']} Code: {response.status_code}, Message: {response.text}")
    except Exception as e:
        logger.error(f"Erreur inattendue lors de l'envoi du produit à WooCommerce: {e}")
    logger.debug(f"ProductManager::create_product - end")
|
|
|
|
|
|
|
|
|
|
|
|
|
2025-05-08 12:15:42 +02:00
|
|
|
|
def update_data_product(self, product_data, categories, medias, json_data):
    """Attach categories and images to the product identified by its slug.

    The previous implementation looped over ``json_data`` without using the
    loop variable, repeating the same three lookups and the same PUT request
    once per row. The work does not depend on the row, so it now runs once.
    As before, nothing is done when ``json_data`` is empty.

    Args:
        product_data: Mapping whose ``slug`` entry identifies the product.
        categories: Category names passed to ``get_list_category_for_product``.
        medias: Media slugs passed to ``get_list_media_id_for_product``.
        json_data: Iterable of rows; only its emptiness is significant.
    """
    # any() works for lists as well as one-shot iterables.
    if not any(True for _ in json_data):
        return
    product_id = self.find_id_by_slug(product_data['slug'])
    list_category_id = self.get_list_category_for_product(categories)
    list_img_id = self.get_list_media_id_for_product(medias)
    self.update_data_list_cat_product(list_category_id, list_img_id, product_id)
|
|
|
|
|
|
|
|
|
|
|
|
def update_data_product_by_slug(self, slug):
    """Create (if needed) and update every product row whose slug matches.

    Rows come from ``self.get_all_product_lines()``. For each row whose
    ``'Slug'`` equals ``slug`` the product is created, then its categories
    (``'Catégories'``) and images (``'Media Slugs'``) are resolved and sent.

    Args:
        slug: Slug of the product row(s) to process.
    """
    for row in self.get_all_product_lines():
        if not row['Slug'] == slug:
            continue
        self.create_product(row)
        product_id = self.find_id_by_slug(row['Slug'])
        category_refs = self.get_list_category_for_product(row['Catégories'])
        image_refs = self.get_list_media_id_for_product(row['Media Slugs'])
        self.update_data_list_cat_product(category_refs, image_refs, product_id)
|
|
|
|
|
|
|
|
|
|
|
|
def get_all_products(self):
    """Fetch every product, following the paginated listing.

    Pages of 100 products are requested starting at page 1 until a page comes
    back empty. If a request does not answer 200, an error line is printed
    and the products gathered so far are returned.

    Returns:
        list: The concatenated product entries.
    """
    collected = []
    page = 0
    while True:
        page += 1  # move on to the next page
        response = self.wcapi.get("products", params={"per_page": 100, "page": page})

        if response.status_code != 200:
            print(f"⚠️ Erreur API WooCommerce: {response.status_code} - {response.json()}")
            return collected

        batch = response.json()
        if not batch:  # an empty page marks the end of the listing
            return collected
        collected.extend(batch)
|
|
|
|
|
|
|
2025-05-24 10:34:04 +00:00
|
|
|
|
def get_all_categories_as_list(self, product_id):
    """Return the ``categories`` field of one product.

    Returns:
        The product's ``categories`` value, or an empty list when the
        response object is falsy.
    """
    response = self.wcapi.get(f"products/{product_id}")
    if not response:
        return []
    return response.json()['categories']
|
|
|
|
|
|
|
|
|
|
|
|
def get_all_medias_as_list(self, product_id):
    """Return the ``images`` field of one product.

    Returns:
        The product's ``images`` value, or an empty list when the response
        object is falsy.
    """
    response = self.wcapi.get(f"products/{product_id}")
    if not response:
        return []
    return response.json()['images']
|
|
|
|
|
|
|
|
|
|
|
|
def get_all_attributes_as_list(self, product_id):
    """Return the ``attributes`` field of one product.

    The fallback branch used to be a bare ``[]`` expression without
    ``return``, so the method answered ``None`` instead of an empty list.
    It now returns ``[]``, like ``get_all_categories_as_list`` and
    ``get_all_medias_as_list`` do.

    Args:
        product_id: Identifier interpolated into the ``products/<id>`` endpoint.

    Returns:
        The product's ``attributes`` value, or an empty list when the
        response object is falsy.
    """
    response = self.wcapi.get(f"products/{product_id}")
    if not response:
        return []
    return response.json()["attributes"]
|
|
|
|
|
|
|
2025-04-07 12:20:48 +02:00
|
|
|
|
|
|
|
|
|
|
def delete_product(self):
    """Delete the remote products whose name matches a row of the ODS file.

    For every row returned by ``self.get_all_product_lines()`` the product
    listing is fetched again (one request, default paging) and every listed
    product whose ``name`` equals the row's ``'Nom'`` is deleted.
    """
    for row in self.get_all_product_lines():
        listing = self.wcapi.get(f"products/")
        wanted_name = row['Nom']
        for remote in listing.json():
            if wanted_name == remote['name']:
                self.wcapi.delete(f"products/{remote['id']}")
|
|
|
|
|
|
|
|
|
|
|
|
def delete_all_product(self):
    """Force-delete every product returned by ``self.get_all_products()``."""
    for item in self.get_all_products() or ():
        self.wcapi.delete(f"products/{item['id']}", params={"force": True})
|
|
|
|
|
|
|
|
|
|
|
|
def delete_media_product(self, media_slug):
    """Delete the media whose slug equals ``media_slug``.

    ``find_media_id_by_slug`` returns the id itself (a key of
    ``self.medias``), or ``None`` when the slug is unknown. The previous code
    subscripted that result with ``['id']``, which raised ``TypeError``
    before any request was sent. The id is now used directly and an unknown
    slug is a no-op.

    Args:
        media_slug: Slug of the media to delete.
    """
    media_id = self.find_media_id_by_slug(media_slug)
    if media_id is None:
        return
    # NOTE(review): the request goes through the WooCommerce client while the
    # class also keeps a WordPress ``media_api_url`` - confirm which endpoint
    # actually serves media deletion.
    self.wcapi.delete(f"media/{media_id}")
|
2025-04-07 12:20:48 +02:00
|
|
|
|
|
|
|
|
|
|
def delete_product_by_id(self, product_id):
    """Force-delete the product with the given id."""
    endpoint = f"products/{product_id}"
    self.wcapi.delete(endpoint, params={"force": True})
|
|
|
|
|
|
|
|
|
|
|
|
def delete_product_by_slug(self, slug):
    """Force-delete the product whose slug equals ``slug``.

    ``find_id_by_slug`` returns ``None`` when no product matches; the
    previous code then sent a DELETE to ``products/None``. An unknown slug is
    now a no-op.

    Args:
        slug: Slug of the product to delete.
    """
    product_id = self.find_id_by_slug(slug)
    if product_id is None:
        return
    self.wcapi.delete(f"products/{product_id}", params={"force": True})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def normalize_string(text):
    """Return ``text`` NFKC-normalized, stripped of outer whitespace and lowercased.

    NOTE(review): the signature has no ``self`` although the definition sits
    between instance methods - confirm whether it is meant to be a
    ``@staticmethod`` or a module-level helper.
    """
    folded = unicodedata.normalize("NFKC", text)
    trimmed = folded.strip()
    return trimmed.lower()
|
|
|
|
|
|
|
|
|
|
|
|
def tab_exists(self, product_id, name_tab):
    """Tell whether the product has a tab titled ``name_tab``.

    Every ``meta_data`` entry whose ``value`` is a list is scanned for a dict
    whose ``title`` equals ``name_tab``.

    Changes from the previous implementation:
      * the product is fetched once instead of twice;
      * list items that are not dicts, or dicts without ``title``, are
        skipped instead of raising ``TypeError`` / ``KeyError``.

    Args:
        product_id: Identifier interpolated into the ``products/<id>`` endpoint.
        name_tab: Tab title to look for.

    Returns:
        bool: ``True`` when a matching tab is found, ``False`` otherwise
        (including when the request does not answer 200).
    """
    response = self.wcapi.get(f"products/{product_id}")
    if response.status_code != 200:
        return False
    for meta_entry in response.json().get('meta_data', []):
        candidates = meta_entry.get('value')
        if not isinstance(candidates, list):
            continue
        for tab in candidates:
            if isinstance(tab, dict) and tab.get('title') == name_tab:
                return True
    return False
|
|
|
|
|
|
|
|
|
|
|
|
class AttributeManager(OdsReader):
    """Manage WooCommerce attributes, their terms and per-product attributes.

    Attribute definitions come from the ODS rows exposed by the ``OdsReader``
    base class; a row is treated as an attribute when its ``'Onglet'`` column
    equals "Informations Complémentaires".
    """

    def __init__(self, wcapi, filename_ods):
        """Store the API client and let ``OdsReader`` handle ``filename_ods``."""
        super().__init__(filename_ods)
        self.wcapi = wcapi

    @staticmethod
    def _parse_options(value):
        """Turn a raw attribute value into a list of non-empty option strings.

        A string is split on commas; any other iterable is taken item by
        item; ``None`` yields an empty list.
        """
        if value is None:
            return []
        candidates = value.split(",") if isinstance(value, str) else value
        cleaned = (str(candidate).strip() for candidate in candidates)
        return [option for option in cleaned if option]

    def get_attributes(self):
        """Return the decoded JSON body of ``GET products/attributes``.

        The previous version also requested ``products/attributes/1/terms``
        and discarded the result; that extra call has been removed.
        """
        return self.wcapi.get("products/attributes").json()

    def get_by_name(self, name):
        """Return the attribute whose ``name`` equals ``name``, or ``None``."""
        attributes = self.wcapi.get("products/attributes").json()
        for attr in attributes:
            if attr['name'] == name:
                return self.wcapi.get(
                    f"products/attributes/{attr['id']}", params={"per_page": 100}
                ).json()
        return None

    def get_list_name_data(self):
        """Return the ``'Nom'`` of every ODS row that describes an attribute."""
        return [
            item['Nom']
            for item in self.get_all_attribute_and_tab_lines()
            if item['Onglet'] == "Informations Complémentaires"
        ]

    def create(self, search_value=None):
        """Create one attribute per matching ODS row.

        Args:
            search_value: When truthy, rows are taken from
                ``get_attribute_and_tab_lines(search_value)``; otherwise from
                ``get_all_attribute_and_tab_lines()``.
        """
        if search_value:
            features_json_data = self.get_attribute_and_tab_lines(search_value)
        else:
            features_json_data = self.get_all_attribute_and_tab_lines()
        for item in features_json_data:
            if item['Onglet'] == "Informations Complémentaires":
                self.wcapi.post("products/attributes", {'name': item["Nom"]})

    def get_term(self, search_value=None):
        """Map each attribute name to its term value(s) read from the ODS rows.

        A ``'Valeurs'`` cell containing commas becomes a list of stripped,
        non-empty strings; any other cell becomes a single stripped string.
        The previous version called ``.strip()`` on the single value but
        discarded the result, and raised ``TypeError`` on an empty (``None``)
        cell; such rows are now skipped.

        Args:
            search_value: Same meaning as in ``create``.

        Returns:
            dict: ``{attribute name: list[str] | str}``.
        """
        if search_value:
            term_json_data = self.get_attribute_and_tab_lines(search_value)
        else:
            term_json_data = self.get_all_attribute_and_tab_lines()

        term_dict = {}
        for item in term_json_data:
            if item['Onglet'] != "Informations Complémentaires":
                continue
            raw_values = item['Valeurs']
            if raw_values is None:
                continue
            text = str(raw_values)
            if "," in text:
                parts = (part.strip() for part in text.split(","))
                term_dict[item['Nom']] = [part for part in parts if part]
            else:
                term_dict[item['Nom']] = text.strip()
        return term_dict

    def configure_term(self):
        """Create the terms of every remote attribute named in the ODS file."""
        term_dict = self.get_term()
        response = self.wcapi.get("products/attributes", params={"per_page": 100})
        if response.status_code != 200:
            return
        for attribute in response.json():
            if attribute['name'] not in term_dict:
                continue
            value = term_dict[attribute['name']]
            term_names = value if isinstance(value, list) else [value]
            for term_name in term_names:
                self.wcapi.post(
                    f"products/attributes/{attribute['id']}/terms",
                    {'name': term_name},
                )

    def create_for_product(self, product_id, name, value, variation=False):
        """Add an attribute, or more options of an existing one, to a product.

        The product is fetched, its ``attributes`` list is amended and sent
        back with a PUT request.

        When the product already had an attribute called ``name``, the
        previous version appended a ``{'name': ..., 'options': ...}`` dict
        into that attribute's ``options`` list, which otherwise holds plain
        strings. The new options are now merged into the list as strings,
        skipping those already present.

        Args:
            product_id: Identifier interpolated into the ``products/<id>``
                endpoint.
            name: Attribute name.
            value: Comma-separated option string, an iterable of options, or
                ``None``.
            variation: Value of the ``variation`` flag for a new attribute.
        """
        response = self.wcapi.get(f"products/{product_id}")
        if response.status_code != 200:
            print(f"error 1")
            return
        print("passe d'abord ici 1")

        existing_attributes_data = response.json().get("attributes", [])
        options = self._parse_options(value)

        for attribute in existing_attributes_data:
            if attribute["name"] == name:
                known_options = attribute.setdefault("options", [])
                for option in options:
                    if option not in known_options:
                        known_options.append(option)
                break
        else:
            # No attribute with that name on the product yet: create it.
            logger.info(f"value = {options}")
            existing_attributes_data.append({
                "name": name,
                "options": options,
                "visible": True,
                "variation": variation,
            })

        logger.debug(f"existing_attributes_data = {existing_attributes_data}")
        self.wcapi.put(f"products/{product_id}", {'attributes': existing_attributes_data})

    def delete_all_for_product(self):
        """Clear the attributes of every product on the first page of 100."""
        response_product = self.wcapi.get("products/", params={"per_page": 100})
        if response_product.status_code != 200:
            return
        for product in response_product.json():
            if product.get("attributes", []) == []:
                continue
            self.wcapi.post(f"products/{product['id']}", {'attributes': []})

    def delete_all_term(self):
        """Force-delete every term of every attribute (first 100 of each)."""
        response_attribute = self.wcapi.get("products/attributes", params={"per_page": 100})
        if response_attribute.status_code != 200:
            return
        for attribute in response_attribute.json():
            terms_endpoint = f"products/attributes/{attribute['id']}/terms"
            response_attribute_term = self.wcapi.get(terms_endpoint, params={"per_page": 100})
            if response_attribute_term.status_code != 200:
                continue
            for term in response_attribute_term.json():
                self.wcapi.delete(f"{terms_endpoint}/{term['id']}", params={"force": True})

    def delete_all(self):
        """Force-delete every attribute on the first page of 100."""
        response = self.wcapi.get("products/attributes", params={"per_page": 100})
        if response.status_code != 200:
            return
        for attribute in response.json():
            self.wcapi.delete(f"products/attributes/{attribute['id']}", params={"force": True})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TabManager(OdsReader):
    """Manage the custom product tabs stored under the ``wb_custom_tabs`` meta key.

    Tab names come from the ODS rows exposed by the ``OdsReader`` base class;
    a row is treated as a tab when its ``'Onglet'`` column is not
    "Informations Complémentaires".
    """

    def __init__(self, wcapi, filename_ods):
        """Store the API client and let ``OdsReader`` handle ``filename_ods``."""
        super().__init__(filename_ods)
        self.wcapi = wcapi

    def get_list_name_data(self, search_value=None):
        """Return the ``'Nom'`` of every ODS row that describes a tab.

        Args:
            search_value: Accepted for interface compatibility; currently
                unused (all rows are always read).
        """
        return [
            item['Nom']
            for item in self.get_all_attribute_and_tab_lines()
            if item['Onglet'].strip() != "Informations Complémentaires"
        ]

    def create_or_update_for_product(self, product_id, tabs):
        """Create each tab on the product, or update its content if it exists.

        For every ``title -> content`` pair the product is fetched, its
        ``wb_custom_tabs`` meta entry is amended, and the whole ``meta_data``
        list is sent back with a PUT request.

        Changes from the previous implementation:
          * when a tab with the same title already existed, the new content
            was prepared but never applied, so the PUT sent the meta data
            back unchanged; the existing tab's ``content`` is now replaced;
          * existing tabs are looked up only inside the ``wb_custom_tabs``
            entry, and non-dict items no longer raise;
          * a ``wb_custom_tabs`` entry whose value is not a list is reset to
            a list instead of failing on ``.append``.

        Positions start at 2 for the first tab, as before.

        Args:
            product_id: Identifier interpolated into the ``products/<id>``
                endpoint.
            tabs: Mapping of tab title to tab content.
        """
        position = 1
        for title, content in tabs.items():
            position += 1
            response = self.wcapi.get(f"products/{product_id}")
            if response.status_code != 200:
                print(f"error 2")
                continue
            print("passe d'abord ici 2")

            existing_meta_data = response.json().get("meta_data", [])
            tabs_meta = next(
                (meta for meta in existing_meta_data if meta.get("key") == "wb_custom_tabs"),
                None,
            )
            if tabs_meta is None:
                # The product has no custom-tabs entry yet: create it.
                tabs_meta = {"key": "wb_custom_tabs", "value": []}
                existing_meta_data.append(tabs_meta)
            if not isinstance(tabs_meta.get("value"), list):
                tabs_meta["value"] = []

            existing_tab = next(
                (
                    tab for tab in tabs_meta["value"]
                    if isinstance(tab, dict) and tab.get('title') == title
                ),
                None,
            )
            if existing_tab is not None:
                existing_tab['content'] = content
            else:
                tabs_meta["value"].append({
                    'title': title,
                    'content': content,
                    'nickname': '',
                    'position': position,
                    'tab_type': 'local',
                })

            self.wcapi.put(f"products/{product_id}", {'meta_data': existing_meta_data})

    def delete_by_product_id(self, product_id):
        """Empty the custom tabs of one product (no-op when it has no meta data)."""
        response = self.wcapi.get(f"products/{product_id}")
        if response.status_code != 200:
            return
        if response.json().get("meta_data", []) == []:
            return
        meta_data = {'meta_data': [{"key": "wb_custom_tabs", "value": []}]}
        self.wcapi.post(f"products/{product_id}", meta_data)

    def delete_all(self):
        """Empty the custom tabs of every product on the first page of 100."""
        response = self.wcapi.get("products/", params={"per_page": 100})
        if response.status_code != 200:
            return
        for product in response.json():
            if product.get("meta_data", []) == []:
                continue
            meta_data = {'meta_data': [{"key": "wb_custom_tabs", "value": []}]}
            self.wcapi.post(f"products/{product['id']}", meta_data)
|
|
|
|
|
|
|
|
|
|
|
|
class VariationsManager(OdsReader):
    """Create product variations and maintain the attributes they rely on.

    Product rows are read from the ODS file through the ``OdsReader`` base
    class.
    """

    def __init__(self, wcapi, filename_ods):
        """Store the API client and let ``OdsReader`` handle ``filename_ods``."""
        super().__init__(filename_ods)
        self.wcapi = wcapi

    def get_attribute_id(self, product_data):
        """Return the id of the remote attribute named like a key of ``product_data``.

        The previous version called ``.items()`` on the decoded attribute
        listing, which other methods of this file iterate as a list of dicts,
        and returned an unbound local when nothing matched or the request
        failed.

        Args:
            product_data: Mapping whose keys are compared with attribute names.

        Returns:
            The id of the last matching attribute, or ``None`` when the
            request does not answer 200 or nothing matches.
        """
        response = self.wcapi.get("products/attributes")
        if response.status_code != 200:
            return None
        attributes = response.json()
        attribute_id = None
        for key in product_data:
            for attribute in attributes:
                if attribute['name'] == key:
                    attribute_id = attribute['id']
        return attribute_id

    def update_product_attributes_merged(self, wcapi, product_id, attribute_name, new_options):
        """Add options to one attribute of a product, keeping its other attributes.

        Changes from the previous implementation:
          * ``new_options`` may be a ``'|'``-separated string (as before) or
            an iterable of strings (as the original docstring advertised);
          * merged options keep their order (existing first, then new ones)
            instead of being shuffled through a ``set``.

        :param wcapi: WooCommerce API instance used for the requests
        :param product_id: id of the product to update
        :param attribute_name: name of the attribute to enrich (e.g. "Parfums")
        :param new_options: options to add, e.g. "Lavande|Citron" or
            ["Lavande", "Citron"]
        """
        # Clean up the new options.
        if isinstance(new_options, str):
            new_options = new_options.split('|')
        new_options = [opt.strip() for opt in (new_options or []) if opt and opt.strip()]

        # 1. Fetch the existing product.
        response = wcapi.get(f"products/{product_id}")
        if response.status_code != 200:
            print(f"❌ Impossible de récupérer le produit {product_id}")
            return

        attributes = response.json().get("attributes", [])

        # 2. Look for the targeted attribute (case-insensitive name match).
        for attr in attributes:
            if attr["name"].lower() == attribute_name.lower():
                existing_options = attr.get("options", [])
                # dict.fromkeys removes duplicates while preserving order.
                attr["options"] = list(dict.fromkeys(existing_options + new_options))
                attr["variation"] = True
                attr["visible"] = True
                # NOTE(review): the two fields below are written on the
                # attribute as in the original code - confirm the API uses them.
                attr["parent_id"] = product_id
                attr["manage_stock"] = "parent"
                break
        else:
            # 3. The attribute does not exist yet: add it.
            attributes.append({
                "name": attribute_name,
                "variation": True,
                "visible": True,
                "options": new_options
            })

        # 4. Send the merged attribute list back.
        update_res = wcapi.put(f"products/{product_id}", {"attributes": attributes})
        if update_res.status_code == 200:
            print(f"✅ Attribut '{attribute_name}' mis à jour avec succès.")
        else:
            print(f"❌ Erreur lors de la mise à jour : {update_res.status_code}")
            print(update_res.json())

    def _post_variations(self, product_id, attribute_name, options, product_data, success_message):
        """Post one variation per option of ``attribute_name``; log each outcome."""
        if options is None:
            return
        for option in options:
            data = {
                'attributes': [
                    {
                        'name': attribute_name,
                        'option': option
                    }
                ],
                'manage_stock': False,
                'in_stock': True,
                'regular_price': product_data['price'],
            }
            print(f"Posting variation: {data}")
            result = self.wcapi.post(f"products/{product_id}/variations", data)
            if result.status_code == 201:
                logger.info(success_message)
            else:
                logger.error(
                    f"Erreur lors de la création de la variation {attribute_name}={option}. "
                    f"Code: {result.status_code}, Message: {result.text}"
                )

    def create_variations_products(self, product_id, product_data):
        """Create the perfume and volume variations of a variable product.

        The ODS row matching ``product_data['slug']`` decides what is created:
        when its ``'Type'`` is "Variable", the comma-separated
        ``'Choix parfums'`` and ``'Volume'`` cells each yield one variation
        per value, priced at ``product_data['price']``.

        Changes from the previous implementation:
          * success is logged only when the POST answers 201; other statuses
            are logged as errors (success used to be logged unconditionally);
          * a missing ODS row is logged instead of raising;
          * the unused full read of the product lines was removed and the two
            duplicated loops share ``_post_variations``.

        Exceptions raised while posting are logged, not propagated.

        Args:
            product_id: Id of the parent product.
            product_data: Mapping providing at least ``slug`` and ``price``.
        """
        product_line = self.get_product_by_slug_from_ods(product_data['slug'])
        if not product_line:
            logger.error(f"Aucune ligne ODS trouvée pour le slug {product_data['slug']}")
            return

        parfums = None
        volumes = None
        if product_line['Type'] == "Variable":
            if product_line['Choix parfums'] is not None:
                parfums = [p.strip() for p in product_line['Choix parfums'].split(",")]
                print(f"parfums = {parfums}")
            if product_line['Volume'] is not None:
                volumes = [v.strip() for v in product_line['Volume'].split(",")]
                print(f"volumes = {volumes}")

        response = self.wcapi.get(f"products/{product_id}")
        try:
            if response.status_code == 200:
                self._post_variations(
                    product_id, 'Choix parfums', parfums, product_data,
                    f"Variation de parfums a bien été créé",
                )
                self._post_variations(
                    product_id, 'Volume', volumes, product_data,
                    f"Variation de volumes a bien été créé",
                )
        except Exception as e:
            logger.exception(f"Erreur lors de la création du produit de variation : {e}")
|
2025-05-08 12:15:42 +02:00
|
|
|
|
|
2025-04-07 12:20:48 +02:00
|
|
|
|
class WooCommerceManager(OdsReader):
|
2025-05-08 12:15:42 +02:00
|
|
|
|
def __init__(self, wcapi, media_manager, category_manager, product_manager, tab_manager, attribute_manager, variation_manager, filename_ods):
    """Wire the WooCommerce client and the per-resource managers together.

    ``filename_ods`` is forwarded to the parent reader and also kept on the
    instance; every other argument is stored under the attribute of the same name.
    """
    super().__init__(filename_ods)
    self.filename_ods = filename_ods
    self.wcapi = wcapi
    # One collaborator per resource family handled by this facade.
    self.media_manager = media_manager
    self.category_manager = category_manager
    self.product_manager = product_manager
    self.tab_manager = tab_manager
    self.attribute_manager = attribute_manager
    self.variation_manager = variation_manager
|
2025-04-07 12:20:48 +02:00
|
|
|
|
|
2025-05-24 10:34:04 +00:00
|
|
|
|
# access to managers
|
|
|
|
|
|
@property
def mm(self):
    """Short alias for ``self.media_manager``."""
    return self.media_manager

@property
def cm(self):
    """Short alias for ``self.category_manager``."""
    return self.category_manager

@property
def pm(self):
    """Short alias for ``self.product_manager``."""
    return self.product_manager

@property
def tm(self):
    """Short alias for ``self.tab_manager``."""
    return self.tab_manager

@property
def am(self):
    """Short alias for ``self.attribute_manager``."""
    return self.attribute_manager

@property
def vm(self):
    """Short alias for ``self.variation_manager``."""
    return self.variation_manager
|
|
|
|
|
|
|
|
|
|
|
|
|
2025-04-07 12:20:48 +02:00
|
|
|
|
def tab_exists(self, product_id, name_tab):
    """Ask the product manager whether *product_id* already has a tab named *name_tab*.

    The product manager's answer is returned unchanged.
    """
    manager = self.product_manager
    return manager.tab_exists(product_id, name_tab)
|
|
|
|
|
|
|
|
|
|
|
|
def get_product_tab_details(self):
    """Map each product's "Parfum" value to its list of ``[tab_name, content]`` pairs.

    Rows come from ``get_all_attribute_and_tab_lines()`` and tab names from
    ``tab_manager.get_list_name_data()``; every row is expected to expose a
    "Parfum" entry plus one entry per tab name. Rows sharing the same "Parfum"
    overwrite each other (last one wins).
    """
    products = self.get_all_attribute_and_tab_lines()
    tab_names = self.tab_manager.get_list_name_data()
    # Built as a comprehension so no local has to shadow the ``dict`` builtin.
    return {
        product["Parfum"]: [[tab, product[tab]] for tab in tab_names]
        for product in products
    }
|
|
|
|
|
|
|
|
|
|
|
|
def get_product_attributes_details(self):
    """Return a flat list of ``[attribute_name, value]`` pairs for every product row.

    Rows come from ``get_all_product_lines()`` and attribute names from
    ``attribute_manager.get_list_name_data()``. The pairs of all rows are
    concatenated in row order; the result does not record which row a pair came from.
    """
    rows = self.get_all_product_lines()
    attribute_names = self.attribute_manager.get_list_name_data()
    return [[name, row[name]] for row in rows for name in attribute_names]
|
2025-05-08 12:15:42 +02:00
|
|
|
|
|
2025-04-07 12:20:48 +02:00
|
|
|
|
def update_product_tab_by_slug(self, slug):
    """Create the tabs of the product identified by *slug*.

    The tab definitions come from ``get_product_tab_details()`` (perfume name ->
    list of ``[title, content]``). A perfume's tabs are created on the product
    only when that perfume name occurs in the product's ``short_description``.
    Positions restart at 1 for each perfume.

    Each perfume name is paired with its own tab list. The previous nested
    ``values()`` x ``keys()`` loops attached the tabs of every perfume as soon
    as any single perfume name matched.
    """
    product_id = self.product_manager.find_id_by_slug(slug)
    product = self.product_manager.find_product_by_id(product_id)
    products_tab_details = self.get_product_tab_details()
    for perfume, tabs in products_tab_details.items():
        if not perfume:
            # Rows without a perfume name cannot be matched against the description.
            print('no key')
            continue
        if perfume not in product['short_description']:
            continue
        for position, (title, content) in enumerate(tabs, start=1):
            self.tab_manager.create_for_product(product_id=product_id, title=title, content=content, nickname="", position=position, tab_type="local")
|
|
|
|
|
|
|
|
|
|
|
|
def update_product_attribute_by_slug(self, slug):
    """Create the attributes returned by ``get_product_attributes_details()`` on the product *slug*.

    The ``variation`` flag is computed with ``is_variable(name, value)``, the
    same way the other attribute-assigning methods do. The previous
    one-argument call did not match that method's signature and raised
    ``TypeError``.
    """
    product_id = self.product_manager.find_id_by_slug(slug)
    # NOTE(review): get_product_attributes_details() yields the pairs of every
    # ODS row, not only the row matching ``slug`` - confirm this is intended.
    for name, value in self.get_product_attributes_details():
        self.attribute_manager.create_for_product(
            product_id=product_id,
            name=name,
            value=value,
            variation=self.is_variable(name, value),
        )
|
2025-04-07 12:20:48 +02:00
|
|
|
|
|
|
|
|
|
|
def update_product(self):
    """Trigger the tab update by delegating to ``update_product_tab``.

    NOTE(review): ``update_product_tab`` is declared with a required
    ``product_data`` parameter, so this argument-less call looks like it would
    raise ``TypeError`` - confirm what data should be forwarded.
    """
    refresh_tabs = self.update_product_tab
    refresh_tabs()
|
2025-05-24 10:34:04 +00:00
|
|
|
|
|
2025-04-07 12:20:48 +02:00
|
|
|
|
def update_product_by_slug(self, slug):
    """Refresh one product: first its data (via the product manager), then its tabs.

    Attribute refresh is not part of this sequence.
    """
    manager = self.product_manager
    manager.update_data_product_by_slug(slug)
    self.update_product_tab_by_slug(slug)
|
|
|
|
|
|
|
|
|
|
|
|
def create_all_informations(self):
    """Import every product row of the ODS file, then run ``update_product``.

    ``process_file`` takes an optional *search value*, not a file name, so it is
    called without argument to process all rows. Passing the ODS file name
    made it filter rows on that name instead.
    """
    # Give the product manager an up-to-date slug -> media mapping before importing.
    self.product_manager.medias = self.media_manager.get_all_as_slug_dict()
    self.process_file()
    # NOTE(review): update_product() forwards no data to update_product_tab(),
    # which declares a required parameter - confirm this final step.
    self.update_product()
|
2025-04-07 12:20:48 +02:00
|
|
|
|
|
|
|
|
|
|
def get_list_category_for_product(self, category):
    """Split a ``/``-separated category path into a list of clean category names.

    Double quotes are removed and surrounding whitespace is trimmed (after the
    quotes are gone, so ``'" Bougies "'`` yields ``'Bougies'``). Empty segments
    are dropped, and an empty or missing cell gives an empty list instead of
    raising.
    """
    if not category:
        return []
    cleaned = (part.replace('"', '').strip() for part in category.split("/"))
    return [name for name in cleaned if name]
|
|
|
|
|
|
|
2025-05-18 21:19:20 +02:00
|
|
|
|
def get_list_variable_attributes(self, attributes):
    """Split a comma-separated string into its values, each trimmed of surrounding whitespace."""
    pieces = attributes.split(",")
    return list(map(str.strip, pieces))
|
|
|
|
|
|
|
|
|
|
|
|
def get_list_media_id_for_product(self, product):
    """Collect the media cells of a product row, in a fixed column order.

    Returns the values of the "Image1", "Image2", "Image3", "Pyramide" and
    "Clp" entries of *product*, unchanged (empty cells are kept as they are).
    """
    media_columns = ('Image1', 'Image2', 'Image3', 'Pyramide', 'Clp')
    return [product[column] for column in media_columns]
|
|
|
|
|
|
|
2025-05-08 12:15:42 +02:00
|
|
|
|
|
2025-05-18 21:19:20 +02:00
|
|
|
|
def is_variable(self, name, value):
    """Tell whether an attribute must be flagged as a variation attribute.

    Only the "Volume" and "Choix parfums" attributes qualify, and only when
    they carry a value. Always returns a real ``bool``; the previous nested
    ``if``/``else`` let one path fall through and return ``None``.
    """
    return name in ("Volume", "Choix parfums") and value is not None
|
|
|
|
|
|
|
|
|
|
|
|
|
2025-04-07 12:20:48 +02:00
|
|
|
|
def update_product_attribute(self, attributes, product_data):
    """Create every attribute of *attributes* on the product whose slug is ``product_data['slug']``.

    *attributes* maps attribute names to values; the ``variation`` flag of each
    one is decided by ``is_variable(name, value)``.
    """
    print(f"product_data = {product_data}")

    target_id = self.product_manager.find_id_by_slug(product_data['slug'])
    for name, value in attributes.items():
        as_variation = self.is_variable(name, value)
        self.attribute_manager.create_for_product(product_id=target_id, name=name, value=value, variation=as_variation)
|
|
|
|
|
|
|
2025-05-08 12:15:42 +02:00
|
|
|
|
def update_product_variations(self, product_data):
    """Create the variations of the product whose slug is ``product_data['slug']``.

    The product id is resolved through the product manager, then the whole
    *product_data* is handed to ``variation_manager.create_variations_products``.
    """
    slug = product_data['slug']
    target_id = self.product_manager.find_id_by_slug(slug)
    self.variation_manager.create_variations_products(target_id, product_data)
|
2025-04-07 12:20:48 +02:00
|
|
|
|
|
|
|
|
|
|
def update_product_tab(self, product_data):
    """Call ``update_product_tab_by_id`` for the ``'id'`` of every entry of *product_data*.

    NOTE(review): ``update_product_tab_by_id`` is not defined in the visible
    part of this class - presumably inherited or defined elsewhere; confirm.
    """
    for entry in product_data:
        entry_id = entry['id']
        self.update_product_tab_by_id(entry_id)
|
2025-05-24 10:34:04 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_product(self, product_data, attributes, tabs, categories, medias, json_data):
    """Create a product, then fill in its data, attributes and tabs.

    Steps, in order: create the product, push its data (with categories and
    medias), create its attributes, look its id up by slug and create or
    update its tabs. Variations are not created here. Returns ``None``.
    """
    manager = self.product_manager
    manager.create_product(product_data=product_data)
    manager.update_data_product(product_data=product_data, categories=categories, medias=medias, json_data=json_data)
    self.update_product_attribute(attributes=attributes, product_data=product_data)
    # The id is resolved by slug once the product exists.
    new_id = manager.find_id_by_slug(product_data['slug'])
    self.tab_manager.create_or_update_for_product(product_id=new_id, tabs=tabs)
|
2025-04-07 12:20:48 +02:00
|
|
|
|
|
2025-05-08 12:15:42 +02:00
|
|
|
|
def create_or_update_product(self, product_data, attributes, tabs, categories, medias, json_data):
    """Create the product if its slug is unknown, then (re)apply data, attributes and tabs.

    Any exception raised along the way is printed and logged with its
    traceback, and the method returns normally so a batch import can go on
    with the next product. The stray ``logger.debug()`` call (no message)
    that made the handler itself raise ``TypeError`` is gone.
    """
    try:
        product_id = self.product_manager.find_id_by_slug(product_data['slug'])
        if not product_id:
            self.product_manager.create_product(product_data=product_data)
        # Same data push whether the product already existed or was just created.
        self.product_manager.update_data_product(product_data=product_data, categories=categories, medias=medias, json_data=json_data)

        self.update_product_attribute(attributes=attributes, product_data=product_data)
        # Resolve the id again: it only exists now when the product was just created.
        product_id = self.product_manager.find_id_by_slug(product_data['slug'])
        self.tab_manager.create_or_update_for_product(product_id=product_id, tabs=tabs)
    except Exception as e:
        print(f"Erreur lors de la mise à jour du produit: {e}")
        logger.exception(f"Erreur lors de la mise à jour du produit: {e}")
|
|
|
|
|
|
|
2025-04-07 12:20:48 +02:00
|
|
|
|
|
2025-05-08 12:15:42 +02:00
|
|
|
|
def get_product_lines(self, search_value=None):
    """Return the product rows to process.

    With a truthy *search_value* the rows come from
    ``get_product_line_by_value(search_value)``; otherwise every row is
    returned through ``get_all_product_lines()``.
    """
    if not search_value:
        return self.get_all_product_lines()
    return self.get_product_line_by_value(search_value)
|
2025-04-07 12:20:48 +02:00
|
|
|
|
|
2025-05-08 12:15:42 +02:00
|
|
|
|
def process_file(self, search_value=None):
    """Create or update every product row selected by *search_value*.

    For each row returned by ``get_product_lines(search_value)`` the product
    data, attributes, tabs, categories and medias are assembled and handed to
    ``create_or_update_product``.

    Two call mismatches are fixed: the media helper now receives the whole row
    (it reads the ``Image1``... columns itself, so the former string argument
    could not work), and the row is forwarded as ``json_data``, which
    ``create_or_update_product`` requires.
    """
    # Refresh the media cache used by the product manager.
    self.product_manager.medias = self.media_manager.get_all_as_slug_dict()

    for product_line in self.get_product_lines(search_value):
        # Standard product data.
        product_data = {
            'name': product_line['Nom'],
            'price': product_line['Prix'],
            'regular_price': product_line['Prix'],
            'stock_quantity': product_line['Stock'],
            'manage_stock': True,
            'weight': str(product_line['Poids']),
            'sku': str(product_line['Numéro de référence']),
            'description': product_line['Description'],
            'short_description': product_line['Courte Description'],
            'slug': product_line['Slug'],
        }
        if product_line['Promo'] is not None:
            product_data['sale_price'] = product_line['Promo']

        attributes = {
            "Temps de combustion": product_line['Temps de combustion'],
            "Type de cire": product_line['Type de cire'],
            "Mèche": product_line['Mèche'],
            "Fabrication": product_line['Fabrication'],
            "Composition": product_line['Composition'],
            "Ingrédients et engagements": product_line['Ingrédients et engagements'],
            "Parfums": product_line['Parfums'],
            "Volume": product_line["Volume"],
        }

        # The ODS column uses a typographic apostrophe; the tab title a plain one.
        tabs = {
            "Conseils d'utilisation": product_line["Conseils d’utilisation"],
            "Précautions articles": product_line["Précautions articles"],
        }

        # Associated categories and medias.
        categories = self.get_list_category_for_product(product_line['Catégories'])
        medias = self.get_list_media_id_for_product(product_line)

        self.create_or_update_product(
            product_data=product_data,
            attributes=attributes,
            tabs=tabs,
            categories=categories,
            medias=medias,
            json_data=product_line,
        )
|
2025-05-24 10:34:04 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_product_data_as_dict(self, product_line):
    """Translate one ODS product row into the product payload dict.

    Optional cells add keys: "Promo" -> ``sale_price``; "Type" -> ``type``
    set to "variable"; "Fabrication sur commande" -> the three backorder
    keys. "Status du produit" decides the outcome: ``'P'`` publishes, ``'B'``
    makes a draft, and an empty status means the row is ignored and ``None``
    is returned.
    """
    price = product_line['Prix']
    payload = {
        'name': product_line['Nom'],
        'price': price,
        'regular_price': price,
        'stock_quantity': product_line['Stock'],
        'manage_stock': True,
        'weight': str(product_line['Poids']),
        'sku': str(product_line['Numéro de référence']),
        'description': product_line['Description'],
        'short_description': product_line['Courte Description'],
        'slug': product_line['Slug'],
    }

    promo = product_line['Promo']
    if promo is not None:
        payload['sale_price'] = promo

    if product_line['Type'] is not None:
        payload['type'] = "variable"

    if product_line['Fabrication sur commande'] is not None:
        # Made-to-order products accept backorders.
        payload.update(backorders="notify", backorders_allowed=True, backordered=True)

    status = product_line['Status du produit']
    if not status:
        logger.debug(f"Article ignoré {product_line['Nom']}")
        return None

    if status == 'P':
        payload['status'] = 'publish'
        logger.info(f"Article publié {product_line['Nom']}")
    elif status == 'B':
        payload['status'] = 'draft'
        logger.info(f"Article brouillon {product_line['Nom']}")

    return payload
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_product_attributes_as_dict(self, product_line):
    """Build the attribute-name -> value mapping of one ODS product row.

    Six columns are always copied (even when empty); "Volume" and "Parfums"
    are added only when the cell is truthy.
    """
    always_present = (
        "Temps de combustion",
        "Type de cire",
        "Mèche",
        "Fabrication",
        "Composition",
        "Ingrédients et engagements",
    )
    attributes = {column: product_line[column] for column in always_present}

    volume = product_line["Volume"]
    if volume:
        # NOTE(review): stored under lowercase "volume" while is_variable()
        # tests for "Volume" - confirm which spelling is expected.
        attributes["volume"] = volume

    parfums = product_line['Parfums']
    if parfums:
        attributes["Parfums"] = parfums

    return attributes
|
|
|
|
|
|
|
|
|
|
|
|
def get_product_tabs_as_dict(self, product_line):
    """Return the tab-title -> content mapping of one ODS product row.

    Two tabs are produced: usage advice and precautions. The ODS column for
    the advice uses a typographic apostrophe while the tab title uses a plain one.
    """
    advice = product_line["Conseils d’utilisation"]
    precautions = product_line["Précautions articles"]
    return {
        "Conseils d'utilisation": advice,
        "Précautions articles": precautions,
    }
|
|
|
|
|
|
|
|
|
|
|
|
# --------------------------------- Medias -----------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
def has_product_all_medias(self, product_id, medias):
    """Tell whether the product's media names are exactly the expected ones.

    The ``'name'`` of every media returned by
    ``product_manager.get_all_medias_as_list(product_id)`` is compared, as a
    set, with *medias*; order and duplicates are ignored.
    """
    print(f"medias={medias}")
    current = self.product_manager.get_all_medias_as_list(product_id)
    current_names = {media['name'] for media in current}
    return set(medias) == current_names
|
|
|
|
|
|
|
|
|
|
|
|
def assign_medias_to_product(self, id, media_slugs):
    """Resolve each media slug to its id and attach the resulting list to product *id*.

    Ids come from ``media_manager.find_id_by_slug`` (one lookup per slug, in
    order) and are passed to ``product_manager.update_data_list_medias_product``.
    """
    resolve = self.media_manager.find_id_by_slug
    media_ids = [resolve(slug) for slug in media_slugs]
    logger.info(f"liste des medias id = {media_ids}")
    self.product_manager.update_data_list_medias_product(id, media_ids)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --------------------------------- Category ----------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
def has_product_all_categories(self, product_id, categories):
    """Tell whether the product's category names are exactly the expected ones.

    The ``'name'`` of every category returned by
    ``product_manager.get_all_categories_as_list(product_id)`` is compared, as
    a set, with *categories*; order and duplicates are ignored.
    """
    current = self.product_manager.get_all_categories_as_list(product_id)
    current_names = {category['name'] for category in current}
    return set(categories) == current_names
|
2025-04-07 12:20:48 +02:00
|
|
|
|
|
2025-05-24 10:34:04 +00:00
|
|
|
|
|
|
|
|
|
|
def assign_categories_to_product(self, id, categorie_slugs):
    """Resolve each category slug to its id and attach the resulting list to product *id*.

    Ids come from ``category_manager.find_id_by_slug`` (one lookup per slug,
    in order) and are passed to ``product_manager.update_data_list_category_product``.
    """
    resolve = self.category_manager.find_id_by_slug
    category_ids = [resolve(slug) for slug in categorie_slugs]
    self.product_manager.update_data_list_category_product(id, category_ids)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --------------------------------- Attributes ----------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_exist_attribute_in_one_product(self, product_id):
    """Tell whether the product fetched from the API carries at least one attribute.

    Returns ``False`` when the response object is falsy, otherwise the
    truthiness of the ``'attributes'`` entry of the JSON payload.
    """
    response = self.wcapi.get(f"products/{product_id}")
    if not response:
        return False
    return bool(response.json()['attributes'])
|
|
|
|
|
|
|
|
|
|
|
|
def has_product_all_attributes(self, product_id, attributes):
    """Dump the expected attributes and the product's current attributes.

    NOTE(review): no comparison is implemented - the method only pretty-prints
    both structures and returns ``None``, so callers always see a falsy
    result. The earlier comparison attempts were inert string blocks and have
    been dropped.
    """
    pprint.pprint(attributes)
    current_attributes = self.product_manager.get_all_attributes_as_list(product_id)
    pprint.pprint(current_attributes)
    return None
|
2025-05-18 21:19:20 +02:00
|
|
|
|
|
2025-05-24 10:34:04 +00:00
|
|
|
|
|
|
|
|
|
|
def assign_attributes_to_product(self, product_id, attributes):
    """Create every attribute of the *attributes* mapping on the product.

    The ``variation`` flag of each attribute is decided by
    ``is_variable(name, value)``. One pass over the mapping is enough: the
    former nested loop compared every key with every key only to act when
    they were equal.
    """
    for name, value in attributes.items():
        self.attribute_manager.create_for_product(
            product_id=product_id,
            name=name,
            value=value,
            variation=self.is_variable(name, value),
        )
|
|
|
|
|
|
|
|
|
|
|
|
# --------------------------------- Product ----------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
def has_product_all_datas(self, product_id, product_data):
    """Tell whether the product already holds the expected data.

    Only the keys present both in the API payload
    (``product_manager.get_product_data_by_api``) and in *product_data* are
    compared. Returns ``True`` when all of them are equal, like the sibling
    ``has_product_all_*`` checks. The previous version returned ``True`` as
    soon as a value differed.
    """
    current = self.product_manager.get_product_data_by_api(product_id)
    shared_keys = current.keys() & product_data.keys()
    return all(current[key] == product_data[key] for key in shared_keys)
|
|
|
|
|
|
|
|
|
|
|
|
def assign_product_changement(self, product_id, product_data):
    """Push to the API only the product fields whose value changed.

    Keys present both in the API payload and in *product_data* are compared;
    the differing ones are sent in a single ``PUT products/<id>``. When
    nothing differs, no request is made.
    """
    current = self.product_manager.get_product_data_by_api(product_id)
    differences = {
        key: product_data[key]
        for key in current.keys() & product_data.keys()
        if current[key] != product_data[key]
    }
    if differences:
        self.wcapi.put(f"products/{product_id}", differences)
|
|
|
|
|
|
|
|
|
|
|
|
# -------------------------------------------- Tabs ------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
#def missing_tabs_for_product(self, product_id, tabs):
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
def assign_tabs_to_product(self, product_id, tabs):
    """Hand *tabs* to the tab manager so it creates or updates them on the product."""
    manager = self.tab_manager
    manager.create_or_update_for_product(product_id=product_id, tabs=tabs)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_file_from_to(self, range_start, range_end=None):
    """Synchronise the ODS rows of the given range with the shop, step by step.

    For each row returned by ``fetch_all_product_rows(range_start, range_end)``:
    the product is created when its slug is unknown, then categories, tabs and
    medias are assigned when the corresponding check reports them missing.

    Rows for which ``get_product_data_as_dict`` returns ``None`` (ignored
    articles) are skipped entirely. They used to go on and reach
    ``create_product`` with ``None`` as product data.
    """
    # Refresh the media cache used by the product manager.
    self.product_manager.medias = self.media_manager.get_all_as_slug_dict()

    for product_line in self.fetch_all_product_rows(range_start, range_end):
        # Expected settings, as described by the ODS row.
        expected_product_data = self.get_product_data_as_dict(product_line)
        if expected_product_data is None:
            continue
        expected_categories = self.get_list_category_for_product(product_line['Catégories'])
        expected_medias = self.get_list_media_id_for_product(product_line)
        expected_attributes = self.get_product_attributes_as_dict(product_line)
        expected_tabs = self.get_product_tabs_as_dict(product_line)

        # Step 1: make sure the product exists.
        product_id = self.product_manager.find_id_by_slug(product_line['Slug'])
        if product_id is None:
            # NOTE(review): relies on create_product() returning the new id - confirm.
            product_id = self.product_manager.create_product(product_data=expected_product_data)
            logger.info('produit créé')

        # Step 2: make sure categories are assigned to the product.
        if not self.has_product_all_categories(product_id=product_id, categories=expected_categories):
            self.assign_categories_to_product(id=product_id, categorie_slugs=expected_categories)
            logger.info('catégorie créée')

        # Step 3: attributes. The assignment itself is currently disabled;
        # only the log line remains.
        if self.has_product_all_attributes(product_id=product_id, attributes=expected_attributes):
            logger.info('attributs créés')

        # Step 4: make sure tabs are assigned to the product.
        if not self.product_manager.is_exist_tabs_in_one_product(product_id):
            self.assign_tabs_to_product(product_id=product_id, tabs=expected_tabs)
            logger.info('onglets crées')

        # Step 5: make sure medias are assigned to the product.
        if not self.has_product_all_medias(product_id=product_id, medias=expected_medias):
            self.assign_medias_to_product(id=product_id, media_slugs=expected_medias)
            logger.info('medias crées')

        # Product data check: only logged for now, nothing is pushed.
        if not self.has_product_all_datas(product_id, expected_product_data):
            logger.info('ici')
|
|
|
|
|
|
|
2025-04-07 12:20:48 +02:00
|
|
|
|
|
|
|
|
|
|
def delete_all_informations(self):
    """Wipe the shop content handled by the managers.

    Deletion order: media images, attributes, products, then categories.
    """
    medias, attributes = self.media_manager, self.attribute_manager
    medias.delete_all_images()
    attributes.delete_all()

    products, categories = self.product_manager, self.category_manager
    products.delete_all_product()
    categories.delete_all_category()
|
|
|
|
|
|
|
|
|
|
|
|
def delete_information_by_slug(self, slug="chope-adoucissant"):
    """Delete the product identified by *slug* through the product manager.

    *slug* defaults to the value that used to be hard-coded, so existing
    argument-less calls behave as before.
    """
    self.product_manager.delete_product_by_slug(slug)
|
2025-05-08 12:15:42 +02:00
|
|
|
|
|
|
|
|
|
|
class OrderManager:
    """Order maintenance helpers on top of the WooCommerce API client."""

    def __init__(self, wcapi, ath):
        """Keep the API client and the credentials holder, and prepare the HTTP headers.

        ``ath`` must expose ``auth_base64``, which is placed in the Basic
        ``Authorization`` header.
        """
        super().__init__()
        self.wcapi = wcapi
        self.ath = ath
        self.error_log = []
        self.headers = {
            "Authorization": f"Basic {self.ath.auth_base64}",
            "Content-Type": "application/json",
            "User-Agent": "Mozilla/5.0"
        }

    def delete_all_orders(self):
        """Permanently delete every order of the shop.

        Orders are listed 100 at a time and force-deleted; the listing is
        repeated until it comes back empty, so shops with more than 100 orders
        are fully emptied (a single listing only covered the first 100). The
        loop also stops on a non-200 listing, or when a listing returns the
        same ids as the previous one, which means the deletions had no effect.
        """
        previous_ids = None
        while True:
            response = self.wcapi.get("orders/", params={"per_page": 100})
            print(f"response = {response.status_code}")
            if response.status_code != 200:
                break
            order_ids = [order['id'] for order in response.json()]
            if not order_ids or order_ids == previous_ids:
                break
            for order_id in order_ids:
                self.wcapi.delete(f"orders/{order_id}", params={"force": True})
            previous_ids = order_ids
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SeoManager(OdsReader):
    """Pushes the SEO title/description read from the ODS file to the site's ``seo`` endpoint."""

    def __init__(self, ath, filename_ods, wcapi=None):
        """Store the credentials holder and the pages endpoint URL.

        ``wcapi`` is optional and backward compatible. The methods below talk
        to ``self.wcapi``, which nothing in this class set before.
        NOTE(review): if the parent reader already provides ``wcapi``, the
        argument can simply be omitted - confirm.
        """
        super().__init__(filename_ods)
        self.ath = ath
        self.page_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/pages"
        if wcapi is not None:
            self.wcapi = wcapi

    def get_all_pages(self):
        """Return one ``{id: slug}`` dict per entry listed by the ``seo`` endpoint.

        A single request of up to 100 entries is made (no pagination). A
        non-200 answer yields an empty list instead of trying to decode the
        error body as a page list.
        """
        print("coucou")
        response = self.wcapi.get("seo", params={"per_page": 100})
        if response.status_code != 200:
            return []
        pages = response.json() or []
        return [{page['id']: page['slug']} for page in pages]

    def update_seo_page(self):
        """Send the ODS title/description of every known page to ``seo/<page id>``.

        Pages are matched to ODS SEO rows through their slug. The payload is
        now actually passed to ``post``; it used to be built and then left out
        of the call.
        """
        all_pages = self.get_all_pages()
        pprint.pprint(all_pages)
        # Materialised once: the rows are scanned again for every page.
        seo_lines = list(self.get_all_seo_lines())
        for page_id_slug in all_pages:
            for key_page, slug_page in page_id_slug.items():
                print(f"key_page = {key_page}")
                for line in seo_lines:
                    if line['Slug'] != slug_page:
                        continue
                    data = {
                        "meta": {
                            "og_title": line["Titre"],
                            "og_description": line["Description"],
                        }
                    }
                    self.wcapi.post(f"seo/{key_page}", data)
|
2025-04-07 12:20:48 +02:00
|
|
|
|
|
|
|
|
|
|
#ALL_TABS = ["Allergènes", "Conseils d’utilisation", "Description", "Précautions articles"]
|
|
|
|
|
|
#ALL_ATTRIBUTES = ["Temps de combustion", "Type de cire", "Mèche", "Fabrication", "Composition", "Ingrédients et engagement"]
|
2025-05-08 12:15:42 +02:00
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
|
media_manager = MediaManager(ath=ath, filename_ods=FILENAME_ODS)
|
2025-05-24 10:34:04 +00:00
|
|
|
|
|
2025-04-07 12:20:48 +02:00
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
utilisation
|
|
|
|
|
|
module argparse
|
|
|
|
|
|
# on va appeler ça importation d'un fichier ods, d'où l'action import-ods
|
|
|
|
|
|
# on va appeler cette commande, "la commande de base"
|
|
|
|
|
|
wcctl --wc-url=https://lescreationsdemissbleue.local --wc-key=<consumer_key> --wc-secret=<consumer_secret> import-ods --ods-path=fichier.ods
|
|
|
|
|
|
|
|
|
|
|
|
# traitement de l'intégralité d'un fichier ods
|
|
|
|
|
|
... --all
|
|
|
|
|
|
|
|
|
|
|
|
# traitement des medias seulement, on peut en option spécifier une plage de média à importer
|
|
|
|
|
|
... --medias [--media-range=1:40]
|
|
|
|
|
|
|
|
|
|
|
|
plu tard ...
|
|
|
|
|
|
# traitement des catégories seulement, on peut en option spécifier une expression régulière qui va s'appliquer au nom de la catégorie
|
|
|
|
|
|
... --categories [--categories-regex=<regex>]
|
|
|
|
|
|
ex: traiter uniquement les catégories dont le nom contient le terme "bougie"
|
|
|
|
|
|
... --categories [--categories-regex=.*bougie.*]
|
|
|
|
|
|
|
|
|
|
|
|
# traitement des articles seulement, on peut en option spécifier une expression régulière qui va s'appliquer au nom de l'article'
|
|
|
|
|
|
# ... --products [--products-regex=<regex>]
|
|
|
|
|
|
ex: traiter uniquement les articles dont le nom contient le terme "bougie"
|
|
|
|
|
|
... --categories [--products-regex=.*bougie.*]
|
|
|
|
|
|
|
|
|
|
|
|
|
2025-05-08 12:15:42 +02:00
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#parser = argparse.ArgumentParser(description="Script de traitement WooCommerce")
|
|
|
|
|
|
|
|
|
|
|
|
#wcctl --wc-url=https://lescreationsdemissbleue.local --wc-key=<consumer_key> --wc-secret=<consumer_secret> import-ods --ods-path=fichier.ods
|
|
|
|
|
|
|