Correction and create command terminal api woocommerce

This commit is contained in:
Bene
2025-04-24 17:27:31 +02:00
parent 8760966e4c
commit 832205450f
19 changed files with 9854 additions and 93 deletions

View File

@ -12,22 +12,50 @@ import unicodedata
import logging
import os
import time
import argparse
from logging.handlers import TimedRotatingFileHandler
from watermark import create_watermark_image
logger = logging.getLogger(__name__)
# Créer un dossier 'logs' s'il n'existe pas
log_directory = "logs"
os.makedirs(log_directory, exist_ok=True)
# 1 Configurer le logger
logging.basicConfig(
filename="woocommerce.log", # 📌 Fichier où les logs seront sauvegardés
level=logging.DEBUG, # 📌 Niveau de log (DEBUG, INFO, WARNING, ERROR, CRITICAL)
format="%(asctime)s - %(levelname)s - %(message)s", # 📌 Format du log
datefmt="%Y-%m-%d %H:%M:%S" # 📌 Format de la date
# 🔧 Configuration du handler avec rotation quotidienne
log_file = os.path.join(log_directory, "woocommerce.log")
handler = TimedRotatingFileHandler(
filename=log_file,
when="midnight", # ⏰ Rotation tous les jours à minuit
interval=1, # 📅 Chaque 1 jour
backupCount=7, # ♻️ Garde les 7 derniers fichiers de log
encoding='utf-8' # 🧾 Pour supporter tous les caractères
)
# 📋 Format du log
formatter = logging.Formatter(
fmt="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
handler.setFormatter(formatter)
# 🔌 Récupère le logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) # 👁 Niveau minimum à capturer
logger.addHandler(handler)
# 🧪 Test
"""logger.debug("Démarrage du programme (DEBUG)")
logger.info("Traitement en cours (INFO)")
logger.warning("Avertissement (WARNING)")
logger.error("Erreur (ERROR)")
logger.critical("Erreur critique (CRITICAL)")"""
# via consumer key and consumer secret :
# https://lescreationsdemissbleue.local/wp-json/wc/v3/products?consumer_key=ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e&consumer_secret=cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768
wcapi = WoocommerceApi(
url="https://lescreationsdemissbleue.local",
#url="https://lescreationsdemissbleue.local",
url="https://les-creations-de-missbleue.local",
consumer_key="ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e",
consumer_secret="cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768",
wp_api=True,
@ -48,10 +76,14 @@ class AuthentificationWpApi:
ath = AuthentificationWpApi()
WEBSITE_URL = "https://lescreationsdemissbleue.local"
FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\donnees_site_internet_missbleue_corrige.ods"
#WEBSITE_URL = "https://lescreationsdemissbleue.local"
WEBSITE_URL = "https://les-creations-de-missbleue.local"
#FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\donnees_site_internet_missbleue_corrige.ods"
BASE_PATH = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\photos\\photos_site\\Photos_site\\"
#FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\infos_site.ods"
FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\api_woocommerce\\final_api_woocommerce\\donnees_site_internet_missbleue_corrige.ods"
class OdsReader:
def __init__(self, filename_ods=FILENAME_ODS):
@ -60,6 +92,12 @@ class OdsReader:
def get_all_product_lines(self):
return self.get_doc_ods(2)
def fetch_all_product_rows(self, start, end=None):
return self.extract_ods_row(2, start, end)
def get_product_line_by_value(self, search_value):
return self.get_doc_ods_by_value(2, search_value)
def get_product_by_slug_from_ods(self, slug):
for product in self.get_all_product_lines():
if product['Slug'] == slug: return product
@ -68,12 +106,27 @@ class OdsReader:
def get_all_media_lines(self):
return self.get_doc_ods(0)
def fetch_all_media_rows(self, start, end=None):
return self.extract_ods_row(0, start, end)
def get_media_line_by_value(self, search_value):
return self.get_doc_ods_by_value(0, search_value)
def get_all_attribute_and_tab_lines(self):
return self.get_doc_ods(3)
def get_attribute_and_tab_lines(self, search_value):
return self.get_doc_ods_by_value(3, search_value)
def get_all_category_lines(self):
return self.get_doc_ods(1)
def get_category_line_by_value(self, search_value):
return self.get_doc_ods_by_value(1, search_value)
def get_all_seo_lines(self):
return self.get_doc_ods(6)
def get_doc_ods(self, number_sheet):
doc = ezodf.opendoc(self.filename_ods)
sheet = doc.sheets[number_sheet]
@ -88,21 +141,151 @@ class OdsReader:
json_data = df.to_dict(orient="records")
return json_data
def get_doc_ods_by_value(self, number_sheet, search_value=None):
doc = ezodf.opendoc(self.filename_ods)
sheet = doc.sheets[number_sheet]
data = []
for row in sheet.rows():
data.append([cell.value for cell in row])
df = pd.DataFrame(data)
df.columns = df.iloc[0]
df = df[1:].reset_index(drop=True)
df = df.dropna(how='all')
if search_value:
try:
print(f"Recherche de la valeur : {search_value}")
# Vérifier que le DataFrame n'est pas vide
if df.empty:
raise ValueError("Le DataFrame est vide")
# Nettoyer le search_value pour enlever les espaces superflus
search_value = str(search_value).strip()
# Dynamique sur la colonne à rechercher
column_name = 'Nom' # à modifier selon la situation
if column_name not in df.columns:
raise ValueError(f"La colonne '{column_name}' n'existe pas dans le DataFrame")
# Supprimer les espaces avant et après dans la colonne cible
df[column_name] = df[column_name].str.strip()
# Remplir les NaN par des chaînes vides
df[column_name] = df[column_name].fillna('')
# Recherche avec contains sur la colonne
mask = df[column_name].str.contains(str(search_value), case=False, na=False)
#print(f"Masque généré :\n{mask}")
if mask.sum() == 0: # Si aucune ligne ne correspond
raise ValueError(f"Aucune correspondance trouvée pour '{search_value}' dans la colonne '{column_name}'")
# Filtrage du DataFrame
df = df[mask]
#print(f"df après filtrage :\n{df}")
except ValueError as ve:
print(f"Erreur : {ve}")
logger.exception(f"🚫 Aucune correspondance trouvée pour '{search_value}' dans la colonne '{column_name}'")
except Exception as e:
print(f"Erreur lors de la recherche : {e}")
logger.exception(f"🚫 Erreur lors de la recherche de '{search_value}' dans la colonne '{column_name}'. Exception : {e}")
else:
print("Aucun search_value fourni")
# Convertir en json_data pour le retour
json_data = df.to_dict(orient="records")
return json_data
def extract_ods_row(self, number_sheet, start_row=None, end_row=None):
doc = ezodf.opendoc(self.filename_ods)
sheet = doc.sheets[number_sheet]
data = []
for row in sheet.rows():
data.append([cell.value for cell in row])
df = pd.DataFrame(data)
df.columns = df.iloc[0]
df = df[1:].reset_index(drop=True)
if start_row is not None and end_row is not None:
df = df.iloc[start_row:end_row]
elif start_row is not None:
df = df.iloc[start_row:]
elif end_row is not None:
df = df.iloc[:end_row]
df = df.dropna(how='all')
return df.to_dict(orient="records")
class MediaManager(OdsReader):
def __init__(self, ath):
super().__init__()
def __init__(self, ath, filename_ods):# filename_ods
super().__init__(filename_ods) # filename_ods
self.ath = ath
self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media"
self.media_api_settings = f"{WEBSITE_URL}/wp-json/wp/v2/settings"
def upload_media(self):
json_data = self.get_all_media_lines()
def upload_media(self, search_value=None):
if search_value:
json_data = self.get_media_line_by_value(search_value)
else:
json_data = self.get_all_media_lines()
for media in json_data:
path = Path(BASE_PATH + media['Chemin'])
image_name = path.name
try:
if not self.is_exists(media, image_name):
image_path = BASE_PATH + media['Chemin']
# 👇 Tentative d'ouverture et d'envoi
with open(image_path, "rb") as image_file:
response = requests.post(
self.media_api_url,
headers={
"Authorization": f"Basic {self.ath.auth_base64}",
"Content-Disposition": f"attachment; filename={image_name}"
},
files={"file": image_file},
verify=False
)
if response.status_code == 201:
media_data = response.json()
self.update_data_media(media, media_data['id'])
logger.info(f"✅ Image uploadée : {image_name}")
else:
logger.error(f"❌ Échec de l'upload ({response.status_code}) pour : {image_name} - URL: {self.media_api_url}")
else:
logger.info(f"↪️ Image déjà existante (non uploadée) : {image_name}")
except FileNotFoundError:
logger.exception(f"🚫 Fichier introuvable : {image_name} ({path})")
except requests.RequestException as e:
logger.exception(f"🔌 Problème réseau/API lors de l'upload de {image_name} : {e}")
except Exception as e:
logger.exception(f"🔥 Erreur inattendue lors de l'upload de {image_name} : {e}")
def create_and_update_media(self, media, image_name, path, watermark=False):
try:
if not self.is_exists(media, image_name):
with open(BASE_PATH + media['Chemin'], "rb") as image_file:
if watermark:
image_path = path
else:
image_path = BASE_PATH + media['Chemin']
print(f"image_path = {image_path}")
# 👇 Tentative d'ouverture et d'envoi
with open(image_path, "rb") as image_file:
response = requests.post(
self.media_api_url,
headers={
@ -115,10 +298,34 @@ class MediaManager(OdsReader):
if response.status_code == 201:
media_data = response.json()
self.update_data_media(media, media_data['id'])
logger.info(f"✅ Image uploadée : {image_name}")
else:
return None
logger.error(f"❌ Échec de l'upload ({response.status_code}) pour : {image_name} - URL: {self.media_api_url}")
except FileNotFoundError:
logger.exception(f"🚫 Fichier introuvable : {image_name} ({path})")
except requests.RequestException as e:
logger.exception(f"🔌 Problème réseau/API lors de l'upload de {image_name} : {e}")
except Exception as e:
logger.exception(f"🔥 Erreur inattendue lors de l'upload de {image_name} : {e}")
def upload_media_from_to(self, range_start, range_end=None):
json_data = self.fetch_all_media_rows(range_start, range_end)
for media in json_data:
path = Path(BASE_PATH + media['Chemin'])
image_name = path.name
first_folder = media['Chemin'].split("\\")[0]
print(f"first_folder = {first_folder}")
watermarked_path = Path(create_watermark_image(str(path)))
watermarked_name = watermarked_path.name
if first_folder == 'Logo':
self.create_and_update_media(media,image_name,path)
else:
pass
self.create_and_update_media(media, watermarked_name, watermarked_path, True)
def is_exists(self, media, image_name):
all_images = self.get_all_images()
@ -139,10 +346,13 @@ class MediaManager(OdsReader):
}
path = Path(BASE_PATH + media['Chemin'])
image_name = path.name
response = requests.post(
f"{self.media_api_url}/{id_img}",
headers={
"Authorization": f"Basic {self.ath.auth_base64}",
#"Authorization": f"Basic {self.ath['auth_base64']}",
"Content-Disposition": f"attachment; filename={image_name}"
},
json=update_data,
@ -173,6 +383,7 @@ class MediaManager(OdsReader):
if img['slug'] == slug:
delete_url = f"{self.media_api_url}/{img['id']}?force=true"
response = requests.delete(delete_url,
#headers={"Authorization": f"Basic {self.ath['auth_base64']}"},
headers={"Authorization": f"Basic {self.ath.auth_base64}"},
verify=False)
@ -183,6 +394,7 @@ class MediaManager(OdsReader):
while True:
response = requests.get(f"{self.media_api_url}?per_page=100&page={page}",
headers={"Authorization": f"Basic {self.ath.auth_base64}"},
#headers={"Authorization": f"Basic {self.ath['auth_base64']}"},
verify=False
)
if response.status_code != 200:
@ -205,6 +417,7 @@ class MediaManager(OdsReader):
response = requests.delete(delete_url,
headers={"Authorization": f"Basic {self.ath.auth_base64}"},
#{"Authorization": f"Basic {self.ath['auth_base64']}"},
verify=False)
if response.status_code in [200, 410]: # 410 = déjà supprimé
print(f"Image {img_id} supprimée.")
@ -219,6 +432,7 @@ class MediaManager(OdsReader):
response = requests.delete(delete_url,
headers={"Authorization": f"Basic {self.ath.auth_base64}"},
#"Authorization": f"Basic {self.ath['auth_base64']}"},
verify=False)
if response.status_code in [200, 410]: # 410 = déjà supprimé
print(f"Image {img_id} supprimée.")
@ -248,8 +462,8 @@ class MediaManager(OdsReader):
class CategoryManager(OdsReader):
def __init__(self, wcapi, ath, medias=None):
super().__init__()
def __init__(self, wcapi, ath, filename_ods, medias=None):
super().__init__(filename_ods)
self.wcapi = wcapi
self.ath = ath
self.medias = medias
@ -275,9 +489,17 @@ class CategoryManager(OdsReader):
"slug":slug
}
if self.find_id_by_slug(slug):
self.error_log.append(f"Catégorie contenant comme slug '{slug}' existe déjà")
#self.error_log.append(f"Catégorie contenant comme slug '{slug}' existe déjà")
logger.debug(f"Catégorie contenant comme slug '{slug}' existe déjà")
else:
self.wcapi.post("products/categories/", category_data)
try:
response = self.wcapi.post("products/categories/", category_data)
if response.status_code == 201:
logger.info(f"Catégorie créé avec succès. ID: {response.json()['id']}")
else:
logger.error(f"Erreur lors de la création de la catégorie. Code: {response.status_code}, Message: {response.text}")
except Exception as e:
logger.error(f"Erreur inattendue lors de l'envoi de la catégorie à WooCommerce: {e}")
def assign_parent_category(self, parent_slug, slug):
response = self.wcapi.get("products/categories/",params={"per_page": 100})
@ -298,6 +520,8 @@ class CategoryManager(OdsReader):
return cat['id']
def find_media_id_by_slug(self, media_slug):
#print(f"media_slug = {media_slug}")
#pprint.pprint(self.medias.items())
for id, slug in self.medias.items():
if media_slug == slug:
return id
@ -312,8 +536,13 @@ class CategoryManager(OdsReader):
}
self.wcapi.put(f"products/categories/{cat_id}", update_category_data)
def update_data_categories(self):
json_data = self.get_all_category_lines()
def update_data_categories(self, search_value=None):
if search_value:
print("la")
json_data = self.get_category_line_by_value(search_value)
else:
print("oula")
json_data = self.get_all_category_lines()
for category in json_data:
self.create_category(category['Nom'], category['Description'], category['Slug'])
cat_id = self.find_id_by_slug(category['Slug'])
@ -346,8 +575,8 @@ class CategoryManager(OdsReader):
return print(f"self.error_log = {self.error_log}")
class ProductManager(OdsReader):
def __init__(self, wcapi, ath, medias=None):
super().__init__()
def __init__(self, wcapi, ath, filename_ods, medias=None):
super().__init__(filename_ods)
self.wcapi = wcapi
self.ath = ath
self.medias = medias
@ -431,12 +660,22 @@ class ProductManager(OdsReader):
else:
print(f"error")
def create_product(self, product_data):
def create_product(self, product_data):
if self.find_id_by_slug(product_data['slug']):
self.error_log.append(f"Produit contenant comme slug '{product_data['slug']}' existe déjà")
#self.error_log.append(f"Produit contenant comme slug '{product_data['slug']}' existe déjà")
logger.debug(f"Produit contenant comme slug '{product_data['slug']}' existe déjà")
else:
response = self.wcapi.post("products/", product_data)
try:
response = self.wcapi.post("products/", product_data)
if response.status_code == 201:
# Le produit a été créé avec succès
logger.info(f"Produit créé avec succès. ID: {response.json()['id']}")
else:
# Le produit n'a pas été créé, mais il y a une réponse avec un code d'erreur
logger.error(f"Erreur lors de la création du produit. Code: {response.status_code}, Message: {response.text}")
except Exception as e:
logger.error(f"Erreur inattendue lors de l'envoi du produit à WooCommerce: {e}")
def update_data_product(self, product_data, categories, medias):
json_data = self.get_all_product_lines()
for product in json_data:
@ -526,8 +765,8 @@ class ProductManager(OdsReader):
class AttributeManager(OdsReader):
def __init__(self, wcapi):
super().__init__()
def __init__(self, wcapi, filename_ods):
super().__init__(filename_ods)
self.wcapi = wcapi
def get_attributes(self):
@ -550,8 +789,11 @@ class AttributeManager(OdsReader):
list_name_data.append(item['Nom'])
return list_name_data
def create(self):
features_json_data = self.get_all_attribute_and_tab_lines()
def create(self, search_value=None):
if search_value:
features_json_data = self.get_attribute_and_tab_lines(search_value)
else:
features_json_data = self.get_all_attribute_and_tab_lines()
for item in features_json_data:
if item['Onglet'].strip() == "Informations Complémentaires":
attribute_data = {
@ -559,11 +801,14 @@ class AttributeManager(OdsReader):
}
self.wcapi.post(f"products/attributes", attribute_data)
def get_term(self):
def get_term(self, search_value=None):
term_dict = {}
list_item = []
term_json_data = self.get_all_attribute_and_tab_lines()
if search_value:
term_json_data = self.get_attribute_and_tab_lines(search_value)
else:
term_json_data = self.get_all_attribute_and_tab_lines()
for item in term_json_data:
list_item = []
if item['Onglet'].strip() == "Informations Complémentaires":
if "," in item["Valeurs"]:
list_item = [value_term.strip() for value_term in item['Valeurs'].split(",")]
@ -573,6 +818,7 @@ class AttributeManager(OdsReader):
term_dict[item['Nom']] = list_item
else:
term_dict[item['Nom']] = item['Valeurs']
return term_dict
def configure_term(self):
@ -677,12 +923,15 @@ class AttributeManager(OdsReader):
class TabManager(OdsReader):
def __init__(self, wcapi):
super().__init__()
def __init__(self, wcapi,filename_ods):
super().__init__(filename_ods)
self.wcapi = wcapi
def get_list_name_data(self):
def get_list_name_data(self, search_value=None):
list_name_data = []
"""if search_value:
json_data = self.get_attribute_and_tab_lines(search_value)
else:"""
json_data = self.get_all_attribute_and_tab_lines()
for item in json_data:
if item['Onglet'].strip() != "Informations Complémentaires":
@ -731,7 +980,7 @@ class TabManager(OdsReader):
}
res = self.wcapi.put(f"products/{product_id}", meta_data_data)
else:
print('else')
#print('else')
data_tab = {
'content':content,
}
@ -771,8 +1020,8 @@ class TabManager(OdsReader):
class VariationsManager(OdsReader):
def __init__(self, wcapi):
super().__init__()
def __init__(self, wcapi, filename_ods):
super().__init__(filename_ods)
self.wcapi = wcapi
def get_attribute_id(self, product_data):
@ -880,8 +1129,8 @@ class VariationsManager(OdsReader):
return False
class WooCommerceManager(OdsReader):
def __init__(self, wcapi, media_manager, category_manager, product_manager, tab_manager, attribute_manager, variation_manager):
super().__init__()
def __init__(self, wcapi, media_manager, category_manager, product_manager, tab_manager, attribute_manager, variation_manager, filename_ods):
super().__init__(filename_ods)
self.wcapi = wcapi
self.media_manager = media_manager
self.category_manager = category_manager
@ -889,6 +1138,7 @@ class WooCommerceManager(OdsReader):
self.tab_manager = tab_manager
self.attribute_manager = attribute_manager
self.variation_manager = variation_manager
self.filename_ods = filename_ods
def tab_exists(self, product_id, name_tab):
return self.product_manager.tab_exists(product_id, name_tab)
@ -923,7 +1173,7 @@ class WooCommerceManager(OdsReader):
for title, content in value:
if key:
if key in product['short_description']:
tab_manager.create_for_product(product_id=product_id, title=title, content=content, nickname="", position=x, tab_type="local")
self.tab_manager.create_for_product(product_id=product_id, title=title, content=content, nickname="", position=x, tab_type="local")
x=x+1
else:
pass
@ -999,20 +1249,83 @@ class WooCommerceManager(OdsReader):
def create_or_update_product(self, product_data, attributes, tabs, categories, medias):
self.product_manager.update_data_product(product_data=product_data, categories=categories, medias=medias)
self.update_product_attribute(attributes=attributes, product_data=product_data)
product_id = self.product_manager.find_id_by_slug(product_data['slug'])
self.update_product_variations(product_data)
self.tab_manager.create_or_update_for_product(product_id=product_id, tabs=tabs)
try:
self.product_manager.update_data_product(product_data=product_data, categories=categories, medias=medias)
self.update_product_attribute(attributes=attributes, product_data=product_data)
product_id = self.product_manager.find_id_by_slug(product_data['slug'])
self.update_product_variations(product_data)
self.tab_manager.create_or_update_for_product(product_id=product_id, tabs=tabs)
except Exception as e:
print(f"Erreur lors de la mise à jour du produit: {e}")
logger.exception(f"Erreur lors de la mise à jour du produit: {e}")
def get_product_lines(self, search_value=None):
if search_value:
print('')
return self.get_product_line_by_value(search_value)
else:
return self.get_all_product_lines()
def process_file(self, filename):
def process_file(self, search_value=None):
# refresh media cache
medias = media_manager.get_all_as_slug_dict()
medias = self.media_manager.get_all_as_slug_dict()
self.product_manager.medias = medias
# read provided file
products_lines = self.get_product_lines(search_value)
print('yoooo')
#pprint.pprint(products_lines)
for product_line in products_lines:
# standard product data
product_data = {
'name' : product_line['Nom'],
'price': product_line['Prix'],
'regular_price': product_line['Prix'],
'stock_quantity': product_line['Stock'],
'manage_stock':True,
'weight':str(product_line['Poids']),
'sku':str(product_line['Numéro de référence']),
'description': product_line['Description'],
'short_description': product_line['Courte Description'],
'slug':product_line['Slug']
}
if product_line['Type'] == "parfums":
product_data['type'] = "variable"
else:
product_data['type'] = "simple"
attributes = {
"Temps de combustion" : product_line['Temps de combustion'],
"Type de cire" : product_line['Type de cire'],
"Mèche" : product_line['Mèche'],
"Fabrication" : product_line['Fabrication'],
"Composition" : product_line['Composition'],
"Ingrédients et engagements" : product_line['Ingrédients et engagements'],
"Parfums" : product_line['Parfums']
}
tabs ={
#"Description" : product_line["Description"],
"Conseils d'utilisation" : product_line["Conseils dutilisation"],
"Précautions articles" : product_line["Précautions articles"],
#"Allergènes" : product_line["Allergènes"]
}
# ... associated categories
categories = self.get_list_category_for_product(product_line['Catégories'])
# ... associated medias
medias = self.get_list_media_id_for_product(product_line['Media Slugs'])
# create or update product
self.create_or_update_product(product_data=product_data, attributes=attributes, tabs=tabs, categories=categories, medias=medias)
def process_file_from_to(self, range_start, range_end=None):
# refresh media cache
medias = self.media_manager.get_all_as_slug_dict()
self.product_manager.medias = medias
# read provided file
reader = OdsReader(filename)
for product_line in reader.get_all_product_lines():
#reader = OdsReader(filename)
for product_line in self.fetch_all_product_rows(range_start, range_end):
# standard product data
product_data = {
'name' : product_line['Nom'],
@ -1074,48 +1387,174 @@ class WooCommerceManager(OdsReader):
self.category_manager.delete_all_category()
def delete_information_by_slug(self):
product_manager.delete_product_by_slug("chope-adoucissant")
self.product_manager.delete_product_by_slug("chope-adoucissant")
#category_manager.delete_all_category()
class OrderManager:
def __init__(self, wcapi, ath):
super().__init__()
self.wcapi = wcapi
self.ath = ath
self.error_log = []
self.headers = {
"Authorization": f"Basic {self.ath.auth_base64}",
"Content-Type": "application/json"
}
def delete_all_orders(self):
response = self.wcapi.get("orders/",params={"per_page": 100})
print(f"response = {response.status_code}")
if response.status_code == 200:
orders = response.json()
for index, order in enumerate(orders):
#print(f"index = {index}")
#print(f"order = {order}")
self.wcapi.delete(f"orders/{order['id']}", params={"force": True}).json()
"""def find_order_id_by_slug(self, slug):
response = self.wcapi.get("orders/",params={"per_page": 100})
if response.status_code == 200:
orders = response.json()
for cat in categories:
if cat['slug'] == slug:
return cat['id']"""
class SeoManager(OdsReader):
def __init__(self, ath, filename_ods):# filename_ods
super().__init__(filename_ods) # filename_ods
self.ath = ath
self.page_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/pages"
def get_all_pages(self):
print("coucou")
"""Récupère toutes les images en gérant la pagination"""
all_pages = []
dict_id_slug = {}
#while True:
response = requests.get(f"{self.page_api_url}?per_page=100",
headers={"Authorization": f"Basic {self.ath.auth_base64}"},
#headers={"Authorization": f"Basic {self.ath['auth_base64']}"},
verify=False
)
if response.status_code != 200:
pass
list_pages = response.json()
#pprint.pprint(page)
#print(page[0]['_links'])
#print(page[0]['slug'])
print(f"count = {len(list_pages)}")
if not list_pages:
pass
#print('_______')
#pprint.pprint(page)
for index, page in enumerate(list_pages):
dict_id_slug[list_pages[index]['id']] = list_pages[index]['slug']
all_pages.append(dict_id_slug)
dict_id_slug = {}
return all_pages
def update_seo_page(self):
all_pages = self.get_all_pages()
pprint.pprint(all_pages)
seo_lines = self.get_all_seo_lines()
#pprint.pprint(seo_lines)
for page_id_slug in all_pages:
for key_page, slug_page in page_id_slug.items():
print(f"key_page = {key_page}")
for line in seo_lines:
#dict_seo = {}
if line['Slug'] == slug_page:
data = {
"meta": {
"og_title": line["Titre"],
"og_description": line["Description"],
#"_yoast_wpseo_opengraph-title": line["Titre"],
#"_yoast_wpseo_opengraph-description": line["Description"]
}
}
response = requests.post(
f"{self.page_api_url}/{key_page}",
headers={
"Authorization": f"Basic {self.ath.auth_base64}",
"Content-Type": "application/json"
},
json=data,
verify=False
)
""""meta": {
"_yoast_wpseo_title": line["Titre"],
"_yoast_wpseo_metadesc": line["Description"],
"_yoast_wpseo_opengraph-title": line["Titre"],
"_yoast_wpseo_opengraph-description": line["Description"]
}"""
"""dict_seo['yoast_head_json']['description'] = line['Description']
dict_seo['yoast_head_json']['og_description'] = line['Description']
dict_seo['yoast_head_json']['og_title'] = line['Titre']
response = requests.post(
f"{self.page_api_url}/{page['id']}",
headers={
"Authorization": f"Basic {self.ath.auth_base64}",
#"Authorization": f"Basic {self.ath['auth_base64']}",
#"Content-Disposition": f"attachment; filename={image_name}"
},
json=dict_seo,
verify=False
)"""
#page['yoast_head_json']['description']
#page['yoast_head_json']['og_description']
#page['yoast_head_json']['og_title']
#ALL_TABS = ["Allergènes", "Conseils dutilisation", "Description", "Précautions articles"]
#ALL_ATTRIBUTES = ["Temps de combustion", "Type de cire", "Mèche", "Fabrication", "Composition", "Ingrédients et engagement"]
media_manager = MediaManager(ath=ath)
#media_manager.upload_media()
#media_manager.delete_all_images()
#media_manager.assign_image_logo()
category_manager = CategoryManager(wcapi=wcapi,ath=ath)
#category_manager.delete_all_category()
product_manager = ProductManager(wcapi=wcapi,ath=ath)
#product_manager.delete_all_product()
#medias=media_manager.get_all_as_slug_dict()
#media_manager.delete_media_by_slug('pyramide-olfactive-frangipanier')
#product_manager.delete_product_by_slug("citron-meringue")
#product_manager.update_data_product()
tab_manager = TabManager(wcapi=wcapi)
attribute_manager = AttributeManager(wcapi=wcapi)
variation_manager = VariationsManager(wcapi=wcapi)
#attribute_manager.create(ALL_ATTRIBUTES)
#attribute_manager.create()
#attribute_manager.configure_term()
#attribute_manager.delete_all_term()
#product_id = product_manager.find_id_by_slug("citron-meringue")"""
woocommerce_manager = WooCommerceManager(wcapi=wcapi, media_manager=media_manager,category_manager=category_manager,product_manager=product_manager, tab_manager=tab_manager, attribute_manager=attribute_manager, variation_manager=variation_manager)
##woocommerce_manager.delete_all_informations() #
woocommerce_manager.create_all_informations()
##woocommerce_manager.process_file(FILENAME_ODS)
#category_manager.update_data_categories()
#woocommerce_manager.delete_all_informations()
#woocommerce_manager.delete_information_by_slug()
#woocommerce_manager.create_all_informations()
#woocommerce_manager.create_all_categories_and_products()
#woocommerce_manager.update_product_tab()
#woocommerce_manager.tab_manager.delete_by_product_id(1890)
#woocommerce_manager.tab_manager.delete_all()
#woocommerce_manager.update_product()
#woocommerce_manager.attribute_manager.delete_all_for_product()
#woocommerce_manager.update_product_attribute_by_slug('citron-meringue')
#woocommerce_manager.attribute_manager.delete_all_for_product()
if __name__ == "__main__":
#seo_manager = SeoManager(ath=ath, filename_ods=FILENAME_ODS)
#pages = seo_manager.get_all_pages()
#seo_manager.update_seo_page()
#media_manager = MediaManager(ath=ath)
#media_manager.upload_media()
#media_manager.delete_all_images()
#media_manager.assign_image_logo()
#category_manager = CategoryManager(wcapi=wcapi,ath=ath)
#category_manager.delete_all_category()
order_manager = OrderManager(wcapi=wcapi,ath=ath)
order_manager.delete_all_orders()
#product_manager = ProductManager(wcapi=wcapi,ath=ath)
#product_manager.delete_all_product()
#medias=media_manager.get_all_as_slug_dict()
#media_manager.delete_media_by_slug('pyramide-olfactive-frangipanier')
#product_manager.delete_product_by_slug("citron-meringue")
#product_manager.update_data_product()
#tab_manager = TabManager(wcapi=wcapi)
#attribute_manager = AttributeManager(wcapi=wcapi)
#variation_manager = VariationsManager(wcapi=wcapi)
#attribute_manager.create(ALL_ATTRIBUTES)
#attribute_manager.create()
#attribute_manager.configure_term()
#attribute_manager.delete_all_term()
#product_id = product_manager.find_id_by_slug("citron-meringue")"""
#woocommerce_manager = WooCommerceManager(wcapi=wcapi, media_manager=media_manager,category_manager=category_manager,product_manager=product_manager, tab_manager=tab_manager, attribute_manager=attribute_manager, variation_manager=variation_manager)
##woocommerce_manager.delete_all_informations() #
#woocommerce_manager.create_all_informations()
##woocommerce_manager.process_file(FILENAME_ODS)
#category_manager.update_data_categories()
#woocommerce_manager.delete_all_informations()
#woocommerce_manager.delete_information_by_slug()
#woocommerce_manager.create_all_informations()
#woocommerce_manager.create_all_categories_and_products()
#woocommerce_manager.update_product_tab()
#woocommerce_manager.tab_manager.delete_by_product_id(1890)
#woocommerce_manager.tab_manager.delete_all()
#woocommerce_manager.update_product()
#woocommerce_manager.attribute_manager.delete_all_for_product()
#woocommerce_manager.update_product_attribute_by_slug('citron-meringue')
#woocommerce_manager.attribute_manager.delete_all_for_product()
"""tabs_in_product = []
for tab in ALL_TABS:
@ -1147,4 +1586,10 @@ ex: traiter uniquement les articles dont le nom contient le terme "bougie"
... --categories [--products-regex=.*bougie.*]
"""
"""
#parser = argparse.ArgumentParser(description="Script de traitement WooCommerce")
#wcctl --wc-url=https://lescreationsdemissbleue.local --wc-key=<consumer_key> --wc-secret=<consumer_secret> import-ods --ods-path=fichier.ods