Files
missbleue/api_2.py
2025-04-07 12:20:48 +02:00

1282 lines
51 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import json
import logging
import os
import pprint
import time
import unicodedata
from pathlib import Path

import ezodf
import pandas as pd
import pyexcel_ods3
import requests
from woocommerce import API as WoocommerceApi
# Module-level logger used by the manager classes below.
logger = logging.getLogger(__name__)
# Configure the root logger once for the whole script.
logging.basicConfig(
filename="woocommerce.log", # File the log records are written to
level=logging.DEBUG, # Minimum level recorded (DEBUG, INFO, WARNING, ERROR, CRITICAL)
format="%(asctime)s - %(levelname)s - %(message)s", # Layout of each log record
datefmt="%Y-%m-%d %H:%M:%S" # Format of the timestamp
)
# WooCommerce REST client, authenticated with a consumer key / consumer secret.
#
# SECURITY: API credentials do not belong in source control. Both values can
# be supplied through the WC_CONSUMER_KEY / WC_CONSUMER_SECRET environment
# variables; the literals below are kept only as backward-compatible defaults
# for the local development site and should be rotated and then removed.
wcapi = WoocommerceApi(
    url="https://lescreationsdemissbleue.local",
    consumer_key=os.environ.get(
        "WC_CONSUMER_KEY", "ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e"
    ),
    consumer_secret=os.environ.get(
        "WC_CONSUMER_SECRET", "cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768"
    ),
    wp_api=True,
    version="wc/v3",
    verify_ssl=False,  # SSL verification is disabled for the development site
)
class AuthentificationWpApi:
    """WordPress (not WooCommerce) credentials and the derived Basic token.

    All values are computed once, when the class body executes, and exposed
    as class attributes; ``auth_base64`` is what callers place after
    ``"Basic "`` in the Authorization header.

    SECURITY: the username and application password can be supplied through
    the WP_USERNAME / WP_APPLICATION_PASSWORD environment variables. The
    literals are kept only as backward-compatible defaults and should be
    rotated and then removed from the source.
    """

    # WordPress account name.
    wordpress_username = os.environ.get("WP_USERNAME", "admin_lcdm")
    # Application password generated in WordPress > Users.
    wordpress_application_password = os.environ.get(
        "WP_APPLICATION_PASSWORD", "yTW8 Mc6J FUCN tPSq bnuJ 0Sdw"
    )

    # Build the HTTP Basic credential: base64("username:password").
    auth_str = f"{wordpress_username}:{wordpress_application_password}"
    auth_bytes = auth_str.encode("utf-8")
    auth_base64 = base64.b64encode(auth_bytes).decode("utf-8")
# Credentials object passed to the media, category and product managers.
ath = AuthentificationWpApi()
# Base URL of the site; the managers append "/wp-json/wp/v2/media" to it.
WEBSITE_URL = "https://lescreationsdemissbleue.local"
# Spreadsheet opened by OdsReader.get_doc_ods.
FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\donnees_site_internet_missbleue.ods"
# Prefix concatenated with each media row's 'Chemin' value to locate the image file.
BASE_PATH = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\photos\\Photos_Claudine\\Photos_bougies_Claudine\\"
#FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\infos_site.ods"
class OdsReader:
    """Read one sheet of an ODS workbook into a list of row dictionaries."""

    def __init__(self, filename_ods=None):
        """Remember which workbook to read.

        Args:
            filename_ods: path of the ODS file; when omitted the module-level
                ``FILENAME_ODS`` is used, which is what every subclass relies
                on through ``super().__init__()``.
        """
        self.filename_ods = FILENAME_ODS if filename_ods is None else filename_ods

    def get_doc_ods(self, number_sheet):
        """Return the rows of sheet *number_sheet* as ``{header: value}`` dicts.

        The first row of the sheet supplies the keys; rows whose cells are all
        empty are dropped. An empty sheet yields ``[]``.
        """
        doc = ezodf.opendoc(self.filename_ods)
        # Debug aid: list the sheets available in the workbook.
        print(f"Feuilles disponibles : {[sheet.name for sheet in doc.sheets]}")
        sheet = doc.sheets[number_sheet]
        rows = [[cell.value for cell in row] for row in sheet.rows()]
        return self._rows_to_records(rows)

    @staticmethod
    def _rows_to_records(rows):
        """Turn raw rows (header row first) into a list of record dicts."""
        if not rows:
            # Without a header row there is nothing to promote to column names.
            return []
        df = pd.DataFrame(rows)
        df.columns = df.iloc[0]  # first row holds the column names
        df = df[1:].reset_index(drop=True)
        df = df.dropna(how='all')  # discard fully empty rows
        return df.to_dict(orient="records")
class MediaManager(OdsReader):
    """Upload, look up and delete WordPress media via ``/wp-json/wp/v2/media``.

    The media to upload are described by sheet 0 of the ODS workbook; each row
    is read for its 'Chemin', 'Nom', 'Description' and 'Slug' columns.
    """

    def __init__(self, ath):
        """Keep the credentials object *ath* (its ``auth_base64`` is sent as Basic token)."""
        super().__init__()
        self.ath = ath
        self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media"

    def _auth_headers(self):
        """Return the Authorization header shared by every media request."""
        return {"Authorization": f"Basic {self.ath.auth_base64}"}

    def upload_media(self):
        """Upload every image listed in sheet 0, then set its title, alt text and slug.

        A row without a path, or an upload not answered with 201, is logged
        and skipped so that one bad image no longer silently aborts the
        remaining uploads.
        """
        for media in self.get_doc_ods(0):
            relative_path = media['Chemin']
            if not relative_path:
                logger.warning("Media row without 'Chemin' skipped: %s", media)
                continue
            path = Path(BASE_PATH + relative_path)
            with open(path, "rb") as image_file:
                response = requests.post(
                    self.media_api_url,
                    headers={
                        **self._auth_headers(),
                        "Content-Disposition": f"attachment; filename={path.name}",
                    },
                    files={"file": image_file},
                    verify=False,
                )
            if response.status_code != 201:
                logger.error(
                    "Upload of %s failed: %s %s",
                    path, response.status_code, response.text,
                )
                continue
            self.update_data_media(media, response.json()['id'])

    def update_data_media(self, media, id_img):
        """Set title, alt text and slug of media *id_img* from the sheet row *media*.

        Returns:
            The decoded JSON answer on HTTP 200, otherwise ``None``.
        """
        update_data = {
            "title": media['Nom'],
            "alt_text": media['Description'],
            "slug": media['Slug'],
        }
        image_name = Path(BASE_PATH + media['Chemin']).name
        response = requests.post(
            f"{self.media_api_url}/{id_img}",
            headers={
                **self._auth_headers(),
                "Content-Disposition": f"attachment; filename={image_name}",
            },
            json=update_data,
            verify=False,
        )
        if response.status_code == 200:
            return response.json()
        logger.error(
            "Update of media %s failed: %s %s",
            id_img, response.status_code, response.text,
        )
        return None

    def find_id_by_slug(self, slug):
        """Return the id of the first media whose slug equals *slug*, else ``None``."""
        for img in self.get_all_images():
            if img['slug'] == slug:
                return img['id']
        return None

    def get_all_as_slug_dict(self):
        """Return ``{media id: slug}`` for every media on the site."""
        return {img['id']: img['slug'] for img in self.get_all_images()}

    def delete_media_by_slug(self, slug):
        """Permanently delete every media whose slug equals *slug*."""
        for img in self.get_all_images():
            if img['slug'] != slug:
                continue
            response = requests.delete(
                f"{self.media_api_url}/{img['id']}?force=true",
                headers=self._auth_headers(),
                verify=False,
            )
            if response.status_code not in (200, 410):  # 410 = already deleted
                logger.error(
                    "Deletion of media %s failed: %s %s",
                    img['id'], response.status_code, response.text,
                )

    def get_all_images(self):
        """Fetch all media, 100 per page, until a page is empty or not answered with 200."""
        all_images = []
        page = 1
        while True:
            response = requests.get(
                f"{self.media_api_url}?per_page=100&page={page}",
                headers=self._auth_headers(),
                verify=False,
            )
            if response.status_code != 200:
                break
            images = response.json()
            if not images:
                break
            all_images.extend(images)
            page += 1
        return all_images

    def delete_images(self, images):
        """Permanently delete each media dict in *images* (only its 'id' is read)."""
        for img in images:
            img_id = img['id']
            response = requests.delete(
                f"{self.media_api_url}/{img_id}?force=true",
                headers=self._auth_headers(),
                verify=False,
            )
            if response.status_code in [200, 410]:  # 410 = already deleted
                print(f"Image {img_id} supprimée.")
            else:
                print(f"Erreur suppression {img_id} :", response.status_code, response.text)
"""category_manager = CategoryManager(api=api, medias=media_manager.get_all_as_slug_dict())
category_manager.create(name, short_description, description, )"""
class CategoryManager(OdsReader):
    """Create, link and delete WooCommerce product categories.

    Categories are described by sheet 1 of the ODS workbook ('Nom',
    'Description', 'Slug', 'Parent Slug', 'Media Slug'). *medias* is a
    ``{media id: slug}`` mapping used to resolve category images.
    """

    def __init__(self, wcapi, ath, medias):
        super().__init__()
        self.wcapi = wcapi
        self.ath = ath
        self.medias = medias
        self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media"
        self.error_log = []
        self.headers = {
            "Authorization": f"Basic {self.ath.auth_base64}",
            "Content-Type": "application/json"
        }

    def _get_all_categories(self):
        """Return every category, following pagination (100 per page)."""
        categories = []
        page = 1
        while True:
            response = self.wcapi.get(
                "products/categories", params={"per_page": 100, "page": page}
            )
            if response.status_code != 200:
                logger.error(
                    "Listing categories failed: %s %s",
                    response.status_code, response.text,
                )
                break
            batch = response.json()
            if not batch:
                break
            categories.extend(batch)
            page += 1
        return categories

    def find_id_by_slug(self, slug):
        """Return the id of the category whose slug equals *slug*, else ``None``."""
        for cat in self._get_all_categories():
            if cat['slug'] == slug:
                return cat['id']
        return None

    def create_category(self, name, description, slug):
        """Create the category unless its slug already exists.

        An existing slug or a failed creation is recorded in ``error_log``.
        """
        if self.find_id_by_slug(slug):
            self.error_log.append(f"Catégorie contenant comme slug '{slug}' existe déjà")
            return
        category_data = {
            "name": name,
            "description": description,
            "slug": slug
        }
        response = self.wcapi.post("products/categories/", category_data)
        if response.status_code != 201:
            self.error_log.append(
                f"Category '{slug}' could not be created: {response.status_code}"
            )

    def assign_parent_category(self, parent_slug, slug):
        """Make the category *parent_slug* the parent of the category *slug*.

        Does nothing when either category cannot be found. The parent id is
        resolved once instead of once per listed category.
        """
        if not parent_slug:
            return
        parent_id = self.find_id_by_parent_slug(parent_slug)
        if not parent_id:
            return
        cat_id = self.find_id_by_slug(slug)
        if cat_id:
            self.wcapi.put(f"products/categories/{cat_id}", {'parent': parent_id})

    def find_id_by_parent_slug(self, parent_slug):
        """Return the id of the category whose slug equals *parent_slug*, else ``None``."""
        return self.find_id_by_slug(parent_slug)

    def find_media_id_by_slug(self, media_slug):
        """Return the media id registered under *media_slug* in ``self.medias``, else ``None``."""
        for media_id, slug in self.medias.items():
            if media_slug == slug:
                return media_id
        return None

    def update_media_id_for_category(self, media_id, cat_id):
        """Set media *media_id* as the image of category *cat_id*."""
        update_category_data = {
            "image": {'id': media_id},
        }
        self.wcapi.put(f"products/categories/{cat_id}", update_category_data)

    def update_data_categories(self):
        """Create every category of sheet 1, then attach its parent and image.

        Categories that cannot be found after creation are recorded in
        ``error_log`` and skipped; a missing image is skipped as well, so no
        request is sent with a ``None`` id.
        """
        for category in self.get_doc_ods(1):
            slug = category['Slug']
            self.create_category(category['Nom'], category['Description'], slug)
            cat_id = self.find_id_by_slug(slug)
            if cat_id is None:
                self.error_log.append(f"Category '{slug}' not found after creation")
                continue
            self.assign_parent_category(category['Parent Slug'], slug)
            media_id = self.find_media_id_by_slug(category['Media Slug'])
            if media_id is not None:
                self.update_media_id_for_category(media_id, cat_id)

    def delete_all_category(self):
        """Permanently delete every category returned by the API."""
        for cat in self._get_all_categories():
            self.wcapi.delete(f"products/categories/{cat['id']}", params={"force": True})

    def delete_media_category(self, media_slug):
        """Permanently delete the media registered under *media_slug*.

        ``find_media_id_by_slug`` returns the id itself (a key of
        ``self.medias``), so it is used directly; ``force=true`` matches the
        other media deletions in this file.
        """
        media_id = self.find_media_id_by_slug(media_slug)
        if media_id is None:
            logger.warning("No media found for slug %r", media_slug)
            return
        requests.delete(
            f"{self.media_api_url}/{media_id}?force=true",
            headers=self.headers,
            verify=False
        )

    def delete_category_by_id(self, category_id):
        """Permanently delete the category *category_id*."""
        self.wcapi.delete(f"products/categories/{category_id}", params={"force": True})

    def delete_category_by_slug(self, slug):
        """Permanently delete the category whose slug equals *slug*, if it exists."""
        category_id = self.find_id_by_slug(slug)
        if category_id is None:
            logger.warning("No category found for slug %r", slug)
            return
        self.delete_category_by_id(category_id)

    def get_errors(self):
        """Print the accumulated error messages (returns ``None``)."""
        print(f"self.error_log = {self.error_log}")
class ProductManager(OdsReader):
    """Create, update, look up and delete WooCommerce products.

    Products are described by sheet 2 of the ODS workbook. *medias* is a
    ``{media id: slug}`` mapping used to resolve product images.
    """

    # Spreadsheet columns turned into custom product tabs.
    # NOTE(review): "Conseils dutilisation" is reproduced exactly as found in
    # the source; an apostrophe may have been lost - confirm against the
    # spreadsheet header.
    _TAB_KEYS = ("Conseils dutilisation", "Précautions articles", "Description", "Allergènes")

    def __init__(self, wcapi, ath, medias):
        super().__init__()
        self.wcapi = wcapi
        self.ath = ath
        self.medias = medias
        self.error_log = []
        self.headers = {
            "Authorization": f"Basic {self.ath.auth_base64}",
            "Content-Type": "application/json"
        }
        self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media"

    def _get_all(self, endpoint):
        """Return every item of a listing *endpoint*, following pagination."""
        items = []
        page = 1
        while True:
            response = self.wcapi.get(endpoint, params={"per_page": 100, "page": page})
            if response.status_code != 200:
                print(f"⚠️ Erreur API WooCommerce: {response.status_code} - {response.text}")
                break
            batch = response.json()
            if not batch:  # an empty page means the listing is exhausted
                break
            items.extend(batch)
            page += 1
        return items

    def update_data_list_cat_product(self, list_category_id, list_img_id, product_id):
        """Replace the categories and images of product *product_id*."""
        product_data = {
            'categories': list_category_id,
            'images': list_img_id,
        }
        print(f"product_id = {product_id}")
        self.wcapi.put(f"products/{product_id}", product_data)

    def get_list_media_id_for_product(self, product):
        """Return ``[{'id': media_id}, ...]`` for a comma-separated list of media slugs.

        *product* is the raw 'Media Slugs' cell; spaces inside slugs are
        removed. The result follows the reversed iteration order of
        ``self.medias``. An empty cell yields ``[]``.
        """
        if not product:
            return []
        wanted = {img.strip().replace(' ', '') for img in product.split(",")}
        matches = [
            {'id': media_id}
            for media_id, media_slug in self.medias.items()
            if media_slug in wanted
        ]
        return matches[::-1]

    def get_list_category_for_product(self, product):
        """Return ``[{'id': category_id}, ...]`` for a '/'-separated list of category names.

        *product* is the raw 'Catégories' cell; double quotes are removed from
        each name. An empty cell yields ``[]``.
        """
        if not product:
            return []
        wanted = {cat.strip().replace('"', '') for cat in product.split("/")}
        return [
            {'id': category['id']}
            for category in self._get_all("products/categories")
            if category['name'] in wanted
        ]

    def find_id_by_slug(self, slug):
        """Return the id of the product whose slug equals *slug*, else ``None``."""
        for pro in self.get_all_products():
            if pro['slug'] == slug:
                return pro['id']
        return None

    def find_media_id_by_slug(self, media_slug):
        """Return the media id registered under *media_slug* in ``self.medias``, else ``None``."""
        for media_id, slug in self.medias.items():
            if media_slug == slug:
                return media_id
        return None

    def create_tabs_from_custom_dict(self, product_id, product):
        """Store the tab columns of the sheet row *product* as 'wb_custom_tabs' meta.

        Tabs are numbered from 1 in the order their columns appear in *product*.
        """
        tabs = []
        for key in product:
            if key in self._TAB_KEYS:
                tabs.append({
                    'title': key,
                    'content': product[key],
                    'nickname': '',
                    'position': len(tabs) + 1,
                    'tab_type': 'local',
                })
        # The GET only verifies that the product exists before writing.
        response = self.wcapi.get(f"products/{product_id}")
        if response.status_code != 200:
            logger.error("Product %s not reachable: %s", product_id, response.status_code)
            print("error")
            return
        meta_data_data = {
            'meta_data': [{'key': 'wb_custom_tabs', 'value': tabs}]
        }
        self.wcapi.post(f"products/{product_id}", meta_data_data)

    def create_product(self, product):
        """Create a product from the sheet row *product* unless its slug exists.

        The spreadsheet slug is sent with the creation request so that the
        later ``find_id_by_slug(product['Slug'])`` lookup matches the product
        (same convention as category creation). Failures are recorded in
        ``error_log``.
        """
        if self.find_id_by_slug(product['Slug']):
            self.error_log.append(f"Produit contenant comme slug '{product['Slug']}' existe déjà")
            return
        product_data = {
            'name': product['Nom'],
            'slug': product['Slug'],
            'price': product['Prix'],
            'regular_price': product['Prix'],
            'stock_quantity': 1,
            'manage_stock': True,
            'short_description': product['Courte Description'],
        }
        response = self.wcapi.post("products/", product_data)
        if response.status_code != 201:
            self.error_log.append(
                f"Product '{product['Slug']}' could not be created: {response.status_code}"
            )

    def update_data_product(self):
        """Create every product of sheet 2 and attach its categories and images."""
        for product in self.get_doc_ods(2):
            self.create_product(product)
            product_id = self.find_id_by_slug(product['Slug'])
            if product_id is None:
                # Avoid sending a request to "products/None".
                self.error_log.append(f"Product '{product['Slug']}' not found after creation")
                continue
            list_category_id = self.get_list_category_for_product(product['Catégories'])
            list_img_id = self.get_list_media_id_for_product(product['Media Slugs'])
            self.update_data_list_cat_product(list_category_id, list_img_id, product_id)

    def get_all_products(self):
        """Return every product, following pagination."""
        return self._get_all("products")

    def delete_product(self):
        """Delete (without ``force``) every product whose name appears in sheet 2.

        The product list is fetched once, with pagination, instead of
        refetching only the default first page for each spreadsheet row.
        """
        names = {product['Nom'] for product in self.get_doc_ods(2)}
        for pro in self.get_all_products():
            if pro['name'] in names:
                self.wcapi.delete(f"products/{pro['id']}")

    def delete_all_product(self):
        """Permanently delete every product."""
        for pro in self.get_all_products():
            self.wcapi.delete(f"products/{pro['id']}", params={"force": True})

    def delete_media_product(self, media_slug):
        """Permanently delete the media registered under *media_slug*.

        ``find_media_id_by_slug`` returns the id itself, so it is used
        directly; ``force=true`` matches the other media deletions in this file.
        """
        media_id = self.find_media_id_by_slug(media_slug)
        if media_id is None:
            logger.warning("No media found for slug %r", media_slug)
            return
        requests.delete(
            f"{self.media_api_url}/{media_id}?force=true",
            headers=self.headers,
            verify=False
        )

    def delete_product_by_id(self, product_id):
        """Permanently delete the product *product_id*."""
        self.wcapi.delete(f"products/{product_id}", params={"force": True})

    def delete_product_by_slug(self, slug):
        """Permanently delete the product whose slug equals *slug*, if it exists."""
        product_id = self.find_id_by_slug(slug)
        if product_id is None:
            logger.warning("No product found for slug %r", slug)
            return
        self.delete_product_by_id(product_id)

    @staticmethod
    def normalize_string(text):
        """Return *text* NFKC-normalised, stripped and lower-cased."""
        return unicodedata.normalize("NFKC", text).strip().lower()

    def tab_exists(self, product_id, name_tab):
        """Return True when a list-valued meta entry of the product holds a tab titled *name_tab*.

        The product is fetched once; entries that are not tab dictionaries
        are ignored instead of raising.
        """
        response = self.wcapi.get(f"products/{product_id}")
        if response.status_code != 200:
            return False
        for meta in response.json().get('meta_data', []):
            tabs = meta.get('value')
            if not isinstance(tabs, list):
                continue
            if any(isinstance(tab, dict) and tab.get('title') == name_tab for tab in tabs):
                return True
        return False
class AttributeManager(OdsReader):
    """Manage WooCommerce product attributes and their terms.

    Attributes are the rows of sheet 3 whose 'Onglet' column equals
    "Informations Complémentaires"; 'Nom' is the attribute name and 'Valeurs'
    its comma-separated terms.
    """

    _ATTRIBUTE_TAB = "Informations Complémentaires"

    def __init__(self, wcapi):
        super().__init__()
        self.wcapi = wcapi

    def _attribute_rows(self):
        """Return the rows of sheet 3 that describe attributes."""
        return [
            item for item in self.get_doc_ods(3)
            if item['Onglet'].strip() == self._ATTRIBUTE_TAB
        ]

    def get_attributes(self):
        """Return the decoded list of global attributes."""
        return self.wcapi.get("products/attributes").json()

    def get_by_name(self, name):
        """Return the attribute named *name*, else ``None``."""
        for attr in self.wcapi.get("products/attributes").json():
            if attr['name'] == name:
                return self.wcapi.get(
                    f"products/attributes/{attr['id']}", params={"per_page": 100}
                ).json()
        return None

    def get_list_name_data(self):
        """Return the names ('Nom') of the attribute rows of sheet 3."""
        return [item['Nom'] for item in self._attribute_rows()]

    def create(self):
        """Create one global attribute per attribute row of sheet 3."""
        for item in self._attribute_rows():
            print(f"nom = {item['Nom']}")
            self.wcapi.post("products/attributes", {'name': item["Nom"]})

    def get_term(self):
        """Return ``{attribute name: terms}`` from sheet 3.

        *terms* is a list of stripped strings when the 'Valeurs' cell contains
        a comma, otherwise the stripped cell itself. Each row is evaluated on
        its own (a previous row's list is never reused) and rows with an empty
        'Valeurs' cell are skipped.
        """
        term_dict = {}
        for item in self._attribute_rows():
            raw_values = item['Valeurs']
            logger.debug("item['Valeurs'] = %s", raw_values)
            if raw_values is None:
                continue
            raw_values = str(raw_values)
            if "," in raw_values:
                term_dict[item['Nom']] = [term.strip() for term in raw_values.split(",")]
            else:
                term_dict[item['Nom']] = raw_values.strip()
        return term_dict

    def configure_term(self):
        """Create, for every existing attribute, the terms listed for it in sheet 3."""
        term_dict = self.get_term()
        response = self.wcapi.get("products/attributes", params={"per_page": 100})
        if response.status_code != 200:
            return
        for attribute in response.json():
            if attribute['name'] not in term_dict:
                continue
            values = term_dict[attribute['name']]
            if not isinstance(values, list):
                values = [values]
            for term_name in values:
                self.wcapi.post(
                    f"products/attributes/{attribute['id']}/terms", {'name': term_name}
                )

    def create_for_product(self, product_id, name, value):
        """Add option *value* to attribute *name* of product *product_id*.

        The attribute is created (visible) when the product does not have it
        yet. Nothing is sent when the option is already present or when
        *value* is empty.
        """
        if value is None or value == "":
            logger.debug("Empty value for attribute %r on product %s", name, product_id)
            return
        response = self.wcapi.get(f"products/{product_id}")
        if response.status_code != 200:
            logger.error("Product %s not reachable: %s", product_id, response.status_code)
            print("error")
            return
        attributes = response.json().get("attributes", [])
        for attribute in attributes:
            if attribute.get("name") != name:
                continue
            options = attribute.setdefault("options", [])
            if value in options:
                print('already_exist')
                return
            # Options are plain values, not attribute dictionaries.
            options.append(value)
            break
        else:
            attributes.append({
                "name": name,
                "options": [value],
                "visible": True,
            })
        self.wcapi.put(f"products/{product_id}", {'attributes': attributes})

    def delete_all_for_product(self):
        """Remove all attributes from the listed products that have any.

        NOTE: only the first 100 products returned by the API are processed.
        """
        response = self.wcapi.get("products/", params={"per_page": 100})
        if response.status_code != 200:
            return
        for product in response.json():
            if product.get("attributes", []):
                self.wcapi.post(f"products/{product['id']}", {'attributes': []})

    def delete_all_term(self):
        """Permanently delete every term of every global attribute (first 100 of each)."""
        response = self.wcapi.get("products/attributes", params={"per_page": 100})
        if response.status_code != 200:
            return
        for attribute in response.json():
            terms_endpoint = f"products/attributes/{attribute['id']}/terms"
            terms_response = self.wcapi.get(terms_endpoint, params={"per_page": 100})
            if terms_response.status_code != 200:
                continue
            for term in terms_response.json():
                self.wcapi.delete(f"{terms_endpoint}/{term['id']}", params={"force": True})

    def delete_all(self):
        """Permanently delete every global attribute (first 100)."""
        response = self.wcapi.get("products/attributes", params={"per_page": 100})
        if response.status_code != 200:
            return
        for attribute in response.json():
            self.wcapi.delete(f"products/attributes/{attribute['id']}", params={"force": True})
"""json_data = self.get_doc_ods(3)
for item in json_data:
if item['Onglet'].strip() == "Informations Complémentaires":
print('ici')
self.wcapi.delete(f"products/attributes",params={"force": True})"""
#def get_names_attributes(self attributes):
"""
def get_list_name_data(self):
list_name_data = []
json_data = self.get_doc_ods(3)
for item in json_data:
if item['Onglet'].strip() != "Informations Complémentaires":
list_name_data.append(item['Nom'])
return list_name_data
def create_new_name_attributes(self, attributes, list_name_attributes):
for key, value in attributes.items():
for name_attribute in list_name_attributes:
if key == name_attribute:
attribute_data = {
'name' : value
}
self.wcapi.post(f"products/attributes", attribute_data)
def delete_all(self):
attributes = self.wcapi.get(f"products/attributes", params={"per_page": 100})
for attr in attributes:
self.wcapi.delete(f"products/attributes/{attr['id']}", params={"force": True})
def delete_by_name(self, name):
attributes = self.wcapi.get(f"products/attributes", params={"per_page": 100}).json()
for attr in attributes:
if attr['name'] == name:
self.wcapi.delete(f"products/attributes/{attr['id']}", params={"force": True})
"""
class TabManager(OdsReader):
    """Manage the custom product tabs stored in the 'wb_custom_tabs' meta entry."""

    _TABS_KEY = "wb_custom_tabs"

    def __init__(self, wcapi):
        super().__init__()
        self.wcapi = wcapi

    def get_list_name_data(self):
        """Return 'Nom' of every sheet-3 row whose 'Onglet' is not "Informations Complémentaires"."""
        return [
            item['Nom'] for item in self.get_doc_ods(3)
            if item['Onglet'].strip() != "Informations Complémentaires"
        ]

    def create_for_product(self, product_id, title, content, nickname, position, tab_type):
        """Append a tab to product *product_id* unless one with the same title exists.

        Only the 'wb_custom_tabs' entry is inspected for duplicates, so other
        list-valued meta can no longer raise; the entry is created when
        missing and reset when its value is not a list.
        """
        logger.info(f"create_for_product(product_id={product_id}, title={title}, content={str(content)[:20]}, nickname={nickname}, position={position}, tab_type={tab_type}) called")
        data_tab = {
            'title': title,
            'content': content,
            'nickname': nickname,
            'position': position,
            'tab_type': tab_type
        }
        response = self.wcapi.get(f"products/{product_id}")
        if response.status_code != 200:
            logger.error("Product %s not reachable: %s", product_id, response.status_code)
            print("error")
            return
        existing_meta_data = response.json().get("meta_data", [])
        tabs_entry = next(
            (meta for meta in existing_meta_data if meta.get("key") == self._TABS_KEY),
            None,
        )
        if tabs_entry is None:
            tabs_entry = {"key": self._TABS_KEY, "value": []}
            existing_meta_data.append(tabs_entry)
        elif not isinstance(tabs_entry.get("value"), list):
            tabs_entry["value"] = []
        tabs = tabs_entry["value"]
        if any(isinstance(tab, dict) and tab.get('title') == title for tab in tabs):
            print('already_exist')
            return
        tabs.append(data_tab)
        self.wcapi.put(f"products/{product_id}", {'meta_data': existing_meta_data})

    def _clear_tabs(self, product):
        """Empty 'wb_custom_tabs' of the decoded *product* when it has any meta data."""
        if product.get("meta_data", []):
            meta_data = {
                'meta_data': [{"key": self._TABS_KEY, "value": []}]
            }
            self.wcapi.post(f"products/{product['id']}", meta_data)

    def delete_by_product_id(self, product_id):
        """Empty the custom tabs of product *product_id*."""
        response = self.wcapi.get(f"products/{product_id}")
        if response.status_code == 200:
            self._clear_tabs(response.json())

    def delete_all(self):
        """Empty the custom tabs of the listed products.

        NOTE: only the first 100 products returned by the API are processed.
        """
        response = self.wcapi.get("products/", params={"per_page": 100})
        if response.status_code == 200:
            for product in response.json():
                self._clear_tabs(product)
class WooCommerceManager(OdsReader):
    """Coordinate the media, category, product, tab and attribute managers.

    Product rows come from sheet 2 of the ODS workbook and are matched to
    WooCommerce products through their 'Slug' column.
    """

    def __init__(self, wcapi, media_manager, category_manager, product_manager, tab_manager, attribute_manager):
        super().__init__()
        self.wcapi = wcapi
        self.media_manager = media_manager
        self.category_manager = category_manager
        self.product_manager = product_manager
        self.tab_manager = tab_manager
        self.attribute_manager = attribute_manager

    def tab_exists(self, product_id, name_tab):
        """Delegate to ``product_manager.tab_exists``."""
        return self.product_manager.tab_exists(product_id, name_tab)

    def _collect_details(self, column_names, slug=None):
        """Return ``[column, cell]`` pairs for the sheet-2 rows (optionally one slug only)."""
        return [
            [column, row[column]]
            for row in self.get_doc_ods(2)
            if slug is None or row['Slug'] == slug
            for column in column_names
        ]

    def _rows_by_slug(self):
        """Index the rows of sheet 2 by their 'Slug' column."""
        return {row['Slug']: row for row in self.get_doc_ods(2)}

    def get_product_tab_details(self, slug=None):
        """Return ``[tab title, content]`` pairs from sheet 2.

        Args:
            slug: when given, only the row with that 'Slug' is used; by
                default every row is included.
        """
        return self._collect_details(self.tab_manager.get_list_name_data(), slug)

    def get_product_attributes_details(self, slug=None):
        """Return ``[attribute name, value]`` pairs from sheet 2.

        Args:
            slug: when given, only the row with that 'Slug' is used; by
                default every row is included.
        """
        return self._collect_details(self.attribute_manager.get_list_name_data(), slug)

    def _apply_tabs(self, product_id, tab_details):
        """Create the given tabs on one product, positions starting at 1."""
        for position, (title, content) in enumerate(tab_details, start=1):
            self.tab_manager.create_for_product(
                product_id=product_id, title=title, content=content,
                nickname="", position=position, tab_type="local",
            )

    def _apply_attributes(self, product_id, attribute_details):
        """Create the given attributes on one product."""
        for name, value in attribute_details:
            self.attribute_manager.create_for_product(
                product_id=product_id, name=name, value=value
            )

    def update_product_tab(self):
        """Give every product the tabs of its own spreadsheet row.

        Products without a row of the same slug are skipped, so a product no
        longer receives the tabs of every other product.
        """
        tab_names = self.tab_manager.get_list_name_data()
        rows = self._rows_by_slug()
        for product in self.product_manager.get_all_products():
            row = rows.get(product['slug'])
            if row is None:
                logger.warning("No spreadsheet row for product slug %r", product['slug'])
                continue
            self._apply_tabs(product['id'], [[name, row[name]] for name in tab_names])

    def update_product_attribute(self):
        """Give every product the attributes of its own spreadsheet row."""
        attribute_names = self.attribute_manager.get_list_name_data()
        rows = self._rows_by_slug()
        for product in self.product_manager.get_all_products():
            row = rows.get(product['slug'])
            if row is None:
                logger.warning("No spreadsheet row for product slug %r", product['slug'])
                continue
            self._apply_attributes(
                product['id'], [[name, row[name]] for name in attribute_names]
            )

    def update_product_tab_by_slug(self, slug):
        """Create the tabs of the spreadsheet row *slug* on the product with that slug."""
        product_id = self.product_manager.find_id_by_slug(slug)
        if product_id is None:
            logger.warning("No product found for slug %r", slug)
            return
        self._apply_tabs(product_id, self.get_product_tab_details(slug))

    def update_product_attribute_by_slug(self, slug):
        """Create the attributes of the spreadsheet row *slug* on the product with that slug."""
        product_id = self.product_manager.find_id_by_slug(slug)
        if product_id is None:
            logger.warning("No product found for slug %r", slug)
            return
        self._apply_attributes(product_id, self.get_product_attributes_details(slug))
"""def create_all_product(self):
self.product_manager.update_data_product()
x=1
products = self.product_manager.get_all_products()
json_data = self.get_doc_ods(2)
for product in products:
print('aaaaaaaaaaaaa')
#pprint.pprint(product)
#print(f"product['title'] = {product['title']}")
print(f"product['title']['rendered'] = {product['slug']}")
print('aaaaaaaaaaaaa')
products_tabs_data = self.get_content_product(product['id'])
for p in json_data:
print(f"p['Nom']= {p['Nom']}")
print(f"product.id = {product['id']}")
print(f"product.slug = {p['Slug']} - { product['slug']}")
print(f"product.name = {product['name']}")
if p['Slug'] == product['slug']:
#pprint.pprint(products_tabs_data)
print('______________________')
for title, content in products_tabs_data:
tab_manager.create_for_product(product_id=product['id'], title=title, content=content, nickname="", position=x, tab_type="local")
x=x+1
"""
#attributes = AttributeManager(wcapi)
# Tab titles checked against product 1890 by the script further down.
ALL_TABS = ["Allergènes", "Conseils dutilisation", "Description", "Précautions articles"]
# Attribute names; only referenced from commented-out code in this file.
ALL_ATTRIBUTES = ["Temps de combustion", "Type de cire", "Mèche", "Fabrication", "Composition", "Ingrédients et engagement"]
#attributes.create_new_attributes(ALL_ATTRIBUTES)
#attributes.get_by_name('Composition')
"""woocommerce_manager.create_product(product_full_dict) # une ligne du tableau "Produits"
# prepare data
product_data={}
product_attributes_data=[]
product_tabs_data=[]
for k,v in product_dict.items():
if k in ALL_ATTRIBUTES:
products_attributes_data.append((k,v))
else if k in ALL_TABS:
products_tabs_data.append((k,v))
else:
products_data[k]=v
# create product and assign attributes
id = product_manager.create(product_data)
# assign tabs values
x=1
for title, content in products_tabs_data:
tab_manager.create_for_product(product_id=id, title=title, content=content, nickname="", position=x, tab_type="local")
x=x+1
# assign attributes values
for name, value in products_attributes_data:
attribute_manager.create_for_product(product_id=id, name=name, value=value)"""
# NOTE(review): the statements below run at import time and call the live API;
# there is no `if __name__ == "__main__":` guard.
media_manager = MediaManager(ath=ath)
#media_manager.upload_media()
# Each manager receives its own {media id: slug} snapshot fetched from the site.
category_manager = CategoryManager(wcapi=wcapi,ath=ath, medias=media_manager.get_all_as_slug_dict())
product_manager = ProductManager(wcapi=wcapi,ath=ath, medias=media_manager.get_all_as_slug_dict())
#product_manager.delete_product_by_slug("citron-meringue")
tab_manager = TabManager(wcapi=wcapi)
attribute_manager = AttributeManager(wcapi=wcapi)
#attribute_manager.create(ALL_ATTRIBUTES)
#attribute_manager.create()
#attribute_manager.configure_term()
#attribute_manager.delete_all_term()
#product_id = product_manager.find_id_by_slug("citron-meringue")"""
woocommerce_manager = WooCommerceManager(wcapi=wcapi, media_manager=media_manager,category_manager=category_manager,product_manager=product_manager, tab_manager=tab_manager, attribute_manager=attribute_manager)
#woocommerce_manager.update_product_tab()
#woocommerce_manager.tab_manager.delete_by_product_id(1890)
#woocommerce_manager.tab_manager.delete_all()
# woocommerce_manager.update_product_by_slug('bougie-personnalisee')
# Clear the attributes of the listed products, then rebuild those of 'citron-meringue'.
woocommerce_manager.attribute_manager.delete_all_for_product()
woocommerce_manager.update_product_attribute_by_slug('citron-meringue')
#woocommerce_manager.attribute_manager.delete_all_for_product()
# Record, for each title in ALL_TABS, whether product 1890 (hard-coded id) has that tab.
tabs_in_product = []
for tab in ALL_TABS:
    tab_in_product = woocommerce_manager.tab_exists(1890, tab)
    tabs_in_product.append(tab_in_product)
#print(f"all_tabs = {tabs_in_product}")
"""print("co !!!")
if all(tabs_in_product):
print('true')
#return True
else:
print("false")
#return False"""
#products_data = woocommerce_manager.get_product_content(519)
"""x=1
for title, content in products_tabs_data:
tab_manager.create_for_product(product_id=id, title=title, content=content, nickname="", position=x, tab_type="local")
x=x+1
# create product with api
# assign tabs values with api
# assign attributes values with api"""
#media_manager = MediaManager(ath=ath)
#media_manager.upload_media()
#media_manager.delete_media_by_slug(slug="chope-citron-meringue-face")
#id = media_manager.find_id_by_slug(slug="chope-citron-meringue-face")
#print(f'id = {id}')
#slug_dict = media_manager.get_all_as_slug_dict()
#pprint.pprint(slug_dict)
#category_manager = CategoryManager(wcapi=wcapi,ath=ath, medias=media_manager.get_all_as_slug_dict())
#category_manager.find_id_by_slug(slug="chopes")
#category_manager.delete_category_by_slug("bougies-gourmandes")
"""category_manager.delete_category_by_slug("chopes")
category_manager.delete_category_by_slug("gamme-prestige")
category_manager.delete_category_by_slug("nouveau-test")
category_manager.delete_category_by_slug("second-test")
category_manager.delete_category_by_slug("yoyo")
id = category_manager.update_data_categories()
category_manager.get_errors()"""
#product_manager = ProductManager(wcapi=wcapi,ath=ath, medias=media_manager.get_all_as_slug_dict())
#product_manager.update_data_product()
#product_manager.find_id_by_slug('citron-meringue')
#product_manager.find_id_by_slug('bougie-chope-de-biere-lavande')
#product_manager.find_id_by_slug('bougie-personnalisee')
#product_manager.delete_product_by_slug("citron-meringue")
#product_manager.update_data_product()
#attributes.get_value_attributes()
"""product = product_manager.get_all_products()
for p in product:
print(p['id'])"""
"""product_manager.create(product_list=...)
manager = WooCommerceManager(media_manager, category_manager, product_manager)
manager.delete_product(slug=...)
"""
"""
#print(f'id = {id}')
class ProductManager(OdsReader):
def __init__(self, wcapi, ath, medias):
super().__init__()
self.wcapi = wcapi
self.ath = ath
self.medias = medias
self.media_api_url = f"{url_website}/wp-json/wp/v2/media"
self.headers = {
"Authorization": f"Basic {self.ath.auth_base64}",
"Content-Type": "application/json"
}
def update_data_list_cat_product(self, list_category_id, list_img_id, product_id):
product_data = {
'categories':list_category_id,
'images':list_img_id,
}
print(f"product_id = {product_id}")
self.wcapi.put(f"products/{product_id}", product_data)
#self.wcapi.get(f"products/{product_id}")
def get_list_media_id_for_product(self, product):
media_id = {}
list_media_id_for_product = []
list_media_by_doc = [img.strip().replace(' ', '') for img in product['Media Slugs'].split(",")]
#pprint.pprint(self.uploaded_images_pro)
for id, media_slug in self.medias.items():
for media in list_media_by_doc:
if media == media_slug:
image_id = id
list_media_id_for_product.append(image_id)
#print(f"list_image_id_for_product = {list_image_id_for_product}")
return list_media_id_for_product
def get_list_category_for_product(self, product):
response = self.wcapi.get("products/categories")
category_list_by_doc = [cat.strip().replace('"', '') for cat in product['Categorie'].split("/")]
list_category_for_product = []
for category in response.json():
for cat in category_list_by_doc:
#print(f"category['name'] = {category['name']}")
#print(f"cat = {cat}")
if category['name'] == cat:
id_category = {'id':category['id']}
list_category_for_product.append(id_category)
return list_category_for_product
def find_id_by_slug(self, media_slug):
for id, slug in self.medias.items():
if media_slug == slug:
return id
def create_product(self, product):
product_data = {
'name' : product['Nom'],
#'price': product['Prix'],
#'stock_quantity': product['Stock'],
#'description': product['Description'],
'short_description': product['Courte Description'],
}
response = self.wcapi.post("products/", product_data)
if response.status_code == 201:
product = response.json()
return product["id"]
else:
return None
def update(self):
json_data = self.get_doc_ods()
for product in json_data:
product_id = self.create_product(product)
list_category_id = self.get_list_category_for_product(product)
list_img_id = self.get_list_media_id_for_product(product)
self.update_data_list_cat_product(list_category_id, list_img_id, product_id)
def delete_product(self):
json_data = self.get_doc_ods()
for product in json_data:
list_products = self.wcapi.get(f"products/")
for pro in list_products.json():
print(f"product['Nom'] = {product['Nom']}")
print(f"pro['name'] = {pro['name']}")
if product['Nom'] == pro['name']:
self.wcapi.delete(f"products/{pro['id']}")
def delete_img_product(self):
list_products = self.wcapi.get(f"products/")
for pro in list_products.json():
#print(f"pro['name'] = {pro['name']}")
for img_pro in self.uploaded_images_pro:
#print(f"img_pro['title'] = {img_pro['title']}")
if pro['name'] == img_pro['title']:
#print(f"img_pro['id'] = {img_pro['id']}")
#print(f"pro['id'] = {pro['id']}")
response = requests.post(
f"{self.media_api_url}/{img_pro['id']}",
headers=self.headers,
json={"force": True}, # 🔥 Forcer la suppressio
verify=False
)
def delete_all_img_product_by_name(self):
response = requests.get(
f"{self.media_api_url}?per_page=100",
headers=self.headers,
verify=False
)
images = response.json()
for img in images:
for path_image, name_image, title_image, text_alt, category_name in self.uploaded_images_pro:
if img['title']['rendered'] == title_image:
print(f"img_id = {img['id']}")
print('iiii')
response = requests.delete(
f"{self.media_api_url}/{img['id']}?force=true",
headers=self.headers,
verify=False
)
print(f"Status Code: {response.status_code}")
pprint.pprint(response.json())
def delete_all_img_product(self):
response = requests.get(
f"{self.media_api_url}?per_page=100",
headers=self.headers,
verify=False
)
images = response.json()
for img in images:
for img_pro in self.uploaded_images_pro:
if img['title']['rendered'] == img_pro['title']:
print('coucou')
print(f"img_pro['id'] = {img_pro['id']}")
print(f"img['id'] = {img['id']}")
response = requests.delete(
#f"{self.media_api_url}/{img['id']}?force=true"
f"{self.media_api_url}/{img_pro['id']}?force=true",
headers=self.headers,
verify=False
)
print(f"Status Code: {response.status_code}")
pprint.pprint(response.json())
def delete(self):
#self.delete_product()
#self.delete_img_product()
#self.delete_all_img_product()
self.delete_all_img_product_by_name()
url_website = "https://lescreationsdemissbleue.local"
filename_ods = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\infos_site.ods"
image_manager = ImageManager(images_to_upload,ath,url_website)
dict_image = image_manager.update()
#category_manager = CategoryManager(wcapi,ath,categories_to_create,url_website,dict_image['category'])
#category_manager.update()
#category_manager = CategoryManager(wcapi,ath,categories_to_create,url_website,images_to_upload[0]["categories"])
#category_manager.delete()
#product_manager = ProductManager(wcapi,ath,filename_ods,dict_image['product'])
#product_manager.update()
#product_manager = ProductManager(wcapi,ath,filename_ods, images_to_upload[1]["products"])
#product_manager.delete()
ALL_MEDIAS=[
{ 'slug': None, 'title': None, 'alt_text': None }
]
ALL_CATEGORIES=[
{ 'name': None, 'parent_name': 'None', 'description': None, 'media_slug': None }
]
ALL_PRODUCTS=[
{ 'name': None, 'parent_name': 'None', 'description': None, 'media_slug_list': [] }
]
"""
"""api = WoocommerceApi(...)
media_manager = MediaManager(api=api)
media_manager.upload(media_list=...)
media_manager.find_id_by_slug(slug=...)
media_manager.delete(slug=...)
category_manager = CategoryManager(api=api, medias=media_manager.get_all_as_slug_dict())
category_manager.create(name, short_description, description, )
product_manager = ProductManager(api=api, medias=media_manager.get_all())
product_manager.create(product_list=...)
manager = WooCommerceManager(media_manager, category_manager, product_manager)
manager.delete_product(slug=...)
attribute_manager = AttributeManager(api=api)
ALL_ATTRIBUTES = ("Mèche", "Fabrication")
attribute_manager.register(ALL_ATTRIBUTES)
product_manager.create(product_dict)
woocommerce_manager = WooCommerceManager(api)
woocommerce_manager.create_product(product_full_dict) # une ligne du tableau "Produits"
# prepare data
product_data={}
product_attributes_data=[]
product_tabs_data=[]
for k,v in product_dict.items():
if k in ALL_ATTRIBUTES:
products_attributes_data.append((k,v))
else if k in ALL_TABS:
products_tabs_data.append((k,v))
else:
products_data[k]=v
# create product and assign attributes
id = product_manager.create(product_data)
# assign tabs values
x=1
for title, content in products_tabs_data:
tab_manager.create_for_product(product_id=id, title=title, content=content, nickname="", position=x, tab_type="local")
x=x+1
# assign attributes values
for name, value in products_attributes_data:
attribute_manager.create_for_product(product_id=id, name=name, value=value)
"""