Files
missbleue/api_2.py

1282 lines
51 KiB
Python
Raw Normal View History

2025-04-07 12:20:48 +02:00
from woocommerce import API as WoocommerceApi
from pathlib import Path
import pandas as pd
import ezodf
import requests
import pprint
import base64
import time
import json
import pyexcel_ods3
import unicodedata
import logging
logger = logging.getLogger(__name__)
# 1⃣ Configurer le logger
logging.basicConfig(
filename="woocommerce.log", # 📌 Fichier où les logs seront sauvegardés
level=logging.DEBUG, # 📌 Niveau de log (DEBUG, INFO, WARNING, ERROR, CRITICAL)
format="%(asctime)s - %(levelname)s - %(message)s", # 📌 Format du log
datefmt="%Y-%m-%d %H:%M:%S" # 📌 Format de la date
)
# via consumer key and consumer secret :
# https://lescreationsdemissbleue.local/wp-json/wc/v3/products?consumer_key=ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e&consumer_secret=cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768
wcapi = WoocommerceApi(
url="https://lescreationsdemissbleue.local",
consumer_key="ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e",
consumer_secret="cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768",
wp_api=True,
version="wc/v3",
verify_ssl=False # Désactive la vérification SSL pour le développement
)
class AuthentificationWpApi:
# Identifiants WordPress (et non WooCommerce)
wordpress_username = "admin_lcdm" # Remplace par ton username WordPress
wordpress_application_password = "yTW8 Mc6J FUCN tPSq bnuJ 0Sdw" #"#8io_mb!55@Bis" # Généré dans WordPress > Utilisateurs
# Générer l'authentification Basic en base64
auth_str = f"{wordpress_username}:{wordpress_application_password}"
auth_bytes = auth_str.encode("utf-8")
auth_base64 = base64.b64encode(auth_bytes).decode("utf-8")
ath = AuthentificationWpApi()
WEBSITE_URL = "https://lescreationsdemissbleue.local"
FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\donnees_site_internet_missbleue.ods"
BASE_PATH = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\photos\\Photos_Claudine\\Photos_bougies_Claudine\\"
#FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\infos_site.ods"
class OdsReader:
def __init__(self):
self.filename_ods = FILENAME_ODS
def get_doc_ods(self, number_sheet):
# ezodf.config.set_table_expand_strategy('all')
doc = ezodf.opendoc(self.filename_ods)
#pprint.pprint(doc)
#print(f"type = {type(doc.sheets)}")
#print(f"doc.sheets = {doc.sheets}")
sheets_list = list(doc.sheets)
# 🔹 Vérification : Afficher toutes les feuilles disponibles
print(f"Feuilles disponibles : {[sheet.name for sheet in sheets_list]}")
sheet = doc.sheets[number_sheet]
print(f"Feuilles disponibles : {[sheet.name for sheet in doc.sheets]}") # Debug
data = []
for row in sheet.rows():
data.append([cell.value for cell in row])
df = pd.DataFrame(data)
df.columns = df.iloc[0]
df = df[1:].reset_index(drop=True)
df = df.dropna(how='all')
json_data = df.to_dict(orient="records")
return json_data
class MediaManager(OdsReader):
def __init__(self, ath):
super().__init__()
self.ath = ath
self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media"
#[cat.strip().replace('"', '') for cat in product['Categorie'].split("/")]
def upload_media(self):
json_data = self.get_doc_ods(0)
for media in json_data:
path = Path(BASE_PATH + media['Chemin'])
image_name = path.name
#print(f'path = {path}')
with open(BASE_PATH + media['Chemin'], "rb") as image_file:
response = requests.post(
self.media_api_url,
headers={
"Authorization": f"Basic {self.ath.auth_base64}",
"Content-Disposition": f"attachment; filename={image_name}"
},
files={"file": image_file},
verify=False
)
#print(f'response.status_code = {response.status_code}, response_text = {response.text}')
if response.status_code == 201:
media_data = response.json()
#print(f"media_data['slug'] = {media_data['slug']}")
self.update_data_media(media, media_data['id'])
else:
return None
def update_data_media(self, media, id_img):
update_data = {
"title" : media['Nom'],
"alt_text": media['Description'],
"slug": media['Slug'],
}
path = Path(BASE_PATH + media['Chemin'])
image_name = path.name
response = requests.post(
f"{self.media_api_url}/{id_img}",
headers={
"Authorization": f"Basic {self.ath.auth_base64}",
"Content-Disposition": f"attachment; filename={image_name}"
},
json=update_data,
verify=False
)
if response.status_code == 200:
return response.json()
else:
return None
def find_id_by_slug(self, slug):
images = self.get_all_images()
for img in images:
if img['slug'] == slug:
return img['id']
def get_all_as_slug_dict(self):
all_slug_dict = {}
images = self.get_all_images()
for img in images:
all_slug_dict[img['id']] = img['slug']
return all_slug_dict
def delete_media_by_slug(self, slug):
images = self.get_all_images()
for img in images:
if img['slug'] == slug:
delete_url = f"{self.media_api_url}/{img['id']}?force=true"
response = requests.delete(delete_url,
headers={"Authorization": f"Basic {self.ath.auth_base64}"},
verify=False)
def get_all_images(self):
"""Récupère toutes les images en gérant la pagination"""
all_images = []
page = 1
while True:
response = requests.get(f"{self.media_api_url}?per_page=100&page={page}",
headers={"Authorization": f"Basic {self.ath.auth_base64}"},
verify=False
)
if response.status_code != 200:
break
images = response.json()
if not images:
break
all_images.extend(images)
page += 1
return all_images
def delete_images(self, images):
"""Supprime toutes les images récupérées"""
for img in images:
img_id = img['id']
delete_url = f"{self.media_api_url}/{img_id}?force=true"
response = requests.delete(delete_url,
headers={"Authorization": f"Basic {self.ath.auth_base64}"},
verify=False)
if response.status_code in [200, 410]: # 410 = déjà supprimé
print(f"Image {img_id} supprimée.")
else:
print(f"Erreur suppression {img_id} :", response.status_code, response.text)
"""category_manager = CategoryManager(api=api, medias=media_manager.get_all_as_slug_dict())
category_manager.create(name, short_description, description, )"""
class CategoryManager(OdsReader):
def __init__(self, wcapi, ath, medias):
super().__init__()
self.wcapi = wcapi
self.ath = ath
self.medias = medias
self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media"
self.error_log = []
self.headers = {
"Authorization": f"Basic {self.ath.auth_base64}",
"Content-Type": "application/json"
}
def find_id_by_slug(self, slug):
response = self.wcapi.get("products/categories/",params={"per_page": 100})
#print(f"response.status_code = {response.status_code}")
if response.status_code == 200:
categories = response.json()
for cat in categories:
if cat['slug'] == slug:
#pprint.pprint(cat)
return cat['id']
def create_category(self, name, description, slug):
category_data = {
"name": name,
"description": description,
"slug":slug
}
if self.find_id_by_slug(slug):
self.error_log.append(f"Catégorie contenant comme slug '{slug}' existe déjà")
else:
self.wcapi.post("products/categories/", category_data)
def assign_parent_category(self, parent_slug, slug):
response = self.wcapi.get("products/categories/",params={"per_page": 100})
if response.status_code == 200:
categories = response.json()
for cat in categories:
parent_id = self.find_id_by_parent_slug(parent_slug)
if parent_id:
if cat['slug'] == slug:
self.wcapi.put(f"products/categories/{cat['id']}",{'parent': parent_id})
def find_id_by_parent_slug(self, parent_slug):
response = self.wcapi.get("products/categories/",params={"per_page": 100})
if response.status_code == 200:
categories = response.json()
for cat in categories:
if cat['slug'] == parent_slug:
return cat['id']
def find_media_id_by_slug(self, media_slug):
for id, slug in self.medias.items():
if media_slug == slug:
return id
def update_media_id_for_category(self, media_id, cat_id):
update_category_data = {
"image" : {'id':media_id},
}
self.wcapi.put(f"products/categories/{cat_id}", update_category_data)
def update_data_categories(self):
json_data = self.get_doc_ods(1)
for category in json_data:
self.create_category(category['Nom'], category['Description'], category['Slug'])
cat_id = self.find_id_by_slug(category['Slug'])
media_id = self.find_media_id_by_slug(category['Media Slug'])
self.assign_parent_category(category['Parent Slug'], category['Slug'])
self.update_media_id_for_category(media_id,cat_id)
def delete_all_category(self):
response = self.wcapi.get(f"products/categories",params={"per_page": 100})
for cat in response.json():
self.wcapi.delete(f"products/categories/{cat['id']}", params={"force": True})
def delete_media_category(self, media_slug):
media_id = self.find_media_id_by_slug(media_slug)
requests.delete(
f"{self.media_api_url}/{media_id['id']}",
headers=self.headers,
verify=False
)
def delete_category_by_id(self, category_id):
self.wcapi.delete(f"products/categories/{category_id}", params={"force": True})
def delete_category_by_slug(self, slug):
category_id = self.find_id_by_slug(slug)
#print(f"category_id = {category_id}")
self.wcapi.delete(f"products/categories/{category_id}", params={"force": True})
def get_errors(self):
return print(f"self.error_log = {self.error_log}")
class ProductManager(OdsReader):
def __init__(self, wcapi, ath, medias):
super().__init__()
self.wcapi = wcapi
self.ath = ath
self.medias = medias
self.error_log = []
self.headers = {
"Authorization": f"Basic {self.ath.auth_base64}",
"Content-Type": "application/json"
}
self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media"
def update_data_list_cat_product(self, list_category_id, list_img_id, product_id):
product_data = {
'categories':list_category_id,
'images':list_img_id,
}
print(f"product_id = {product_id}")
#print(f"product_data = {product_data}")
self.wcapi.put(f"products/{product_id}", product_data)
#self.wcapi.get(f"products/{product_id}")
def get_list_media_id_for_product(self, product):
media_id = {}
list_media_id_for_product = []
list_media_by_doc = [img.strip().replace(' ', '') for img in product.split(",")]
#pprint.pprint(self.uploaded_images_pro)
#print(f'list_media_by_doc = {list_media_by_doc}')
for id, media_slug in self.medias.items():
for media in list_media_by_doc:
if media == media_slug:
image_id = {'id':id}
list_media_id_for_product.append(image_id)
#print(f"list_image_id_for_product = {list_media_id_for_product}")
return list_media_id_for_product[::-1]
def get_list_category_for_product(self, product):
response = self.wcapi.get("products/categories",params={"per_page": 100})
category_list_by_doc = [cat.strip().replace('"', '') for cat in product.split("/")]
list_category_for_product = []
for category in response.json():
for cat in category_list_by_doc:
#print(f"category['name'] = {category['name']}")
#print(f"cat = {cat}")
if category['name'] == cat:
id_category = {'id':category['id']}
list_category_for_product.append(id_category)
#print(f'list_category_for_product = {list_category_for_product}')
return list_category_for_product
def find_id_by_slug(self, slug):
response = self.wcapi.get("products/",params={"per_page": 100})
if response.status_code == 200:
products = response.json()
for pro in products:
if pro['slug'] == slug:
return pro['id']
def find_media_id_by_slug(self, media_slug):
for id, slug in self.medias.items():
if media_slug == slug:
return id
def create_tabs_from_custom_dict(self, product_id, product):
product_tabs_data = {}
list_product_tabs_data = []
x = 1
for key in product.keys():
#key = key.replace("", "'")
if key == "Conseils dutilisation" or key == "Précautions articles" or key == "Description" or key == "Allergènes":
product_tabs_data['title'] = key
product_tabs_data['content'] = product[key]
product_tabs_data['nickname'] = ''
product_tabs_data['position'] = x
product_tabs_data['tab_type'] = 'local'
list_product_tabs_data.append(product_tabs_data)
product_tabs_data = {}
x += 1
response = self.wcapi.get(f"products/{product_id}")
if response.status_code == 200:
#product_response = response.json()
meta_data = []
meta_data.append(
{'key': 'wb_custom_tabs', 'value': list_product_tabs_data}
)
meta_data_data = {
'meta_data': meta_data
}
#pprint.pprint(meta_data_data)
res = self.wcapi.post(f"products/{product_id}", meta_data_data)
else:
print(f"error")
def create_product(self, product):
product_data = {
'name' : product['Nom'],
'price': product['Prix'],
'regular_price': product['Prix'],
'stock_quantity': 1,
'manage_stock':True,
#'description': product['Description'],
'short_description': product['Courte Description'],
}
#print(f"product['Prix'] = {product['Prix']}")
if self.find_id_by_slug(product['Slug']):
self.error_log.append(f"Produit contenant comme slug '{product['Slug']}' existe déjà")
else:
self.wcapi.post("products/", product_data)
"""if response.status_code == 201:
product = response.json()
return product["id"]
else:
return None"""
def update_data_product(self):
json_data = self.get_doc_ods(2)
for product in json_data:
self.create_product(product)
product_id = self.find_id_by_slug(product['Slug'])
list_category_id = self.get_list_category_for_product(product['Catégories'])
list_img_id = self.get_list_media_id_for_product(product['Media Slugs'])
self.update_data_list_cat_product(list_category_id, list_img_id, product_id)
#self.create_tabs_from_custom_dict(product_id, product)
def get_all_products(self):
"""Récupère tous les produits en gérant la pagination"""
all_products = []
page = 1
while True:
response = self.wcapi.get("products", params={"per_page": 100, "page": page})
if response.status_code != 200:
print(f"⚠️ Erreur API WooCommerce: {response.status_code} - {response.json()}")
break
products = response.json()
if not products: # Si la page est vide, on arrête la boucle
break
all_products.extend(products)
page += 1 # On passe à la page suivante
#pprint.pprint(all_products)
return all_products
def delete_product(self):
json_data = self.get_doc_ods(2)
for product in json_data:
list_products = self.wcapi.get(f"products/")
for pro in list_products.json():
#print(f"product['Nom'] = {product['Nom']}")
#print(f"pro['name'] = {pro['name']}")
if product['Nom'] == pro['name']:
self.wcapi.delete(f"products/{pro['id']}")
"""def delete_all_product(self):
response = self.wcapi.get(f"products/",params={"per_page": 100})
for pro in response.json():
self.wcapi.delete(f"products/{pro['id']}", params={"force": True})"""
def delete_all_product(self):
products = self.get_all_products()
for pro in products:
self.wcapi.delete(f"products/{pro['id']}", params={"force": True})
def delete_media_product(self, media_slug):
media_id = self.find_media_id_by_slug(media_slug)
requests.delete(
f"{self.media_api_url}/{media_id['id']}",
headers=self.headers,
verify=False
)
def delete_product_by_id(self, product_id):
self.wcapi.delete(f"products/{product_id}", params={"force": True})
def delete_product_by_slug(self, slug):
product_id = self.find_id_by_slug(slug)
#print(f"product_id = {product_id}")
self.wcapi.delete(f"products/{product_id}", params={"force": True})
def normalize_string(text):
return unicodedata.normalize("NFKC", text).strip().lower()
def tab_exists(self, product_id, name_tab):
response = self.wcapi.get(f"products/{product_id}")
if response.status_code == 200:
response_json = self.wcapi.get(f"products/{product_id}").json()
#pprint.pprint(response_json)
#print(f"response_json['meta_data']= {response_json['meta_data']}")
for meta_data in response_json['meta_data']:
#pprint.pprint(meta_data)
for key_meta_data, value_meta_data in meta_data.items():
#pprint.pprint(response_json['meta_data'])
#print(f"tùype meta_data = {type(response_json['meta_data'][0])}")
#print(f"value = {value}")
#print('ici')
#print(type(value_meta_data))
#print(f"key_meta_data = {key_meta_data}, value_meta_data = {value_meta_data}")
if key_meta_data == "value":
#print('loool')
if isinstance(value_meta_data, list):
for tab in value_meta_data:
# Normalisation des chaînes pour éviter les erreurs subtiles
"""clean_name_tab = name_tab.strip().lower()
clean_tab_title = tab['title'].strip().lower()
print(f"Comparing: '{clean_name_tab}' == '{clean_tab_title}'") # Débugging
if clean_name_tab == clean_tab_title:
print("✅ Correspondance trouvée !")
else:
print("❌ Pas de correspondance")"""
#print(f"name_tab = {name_tab} - tab = {tab}")
if name_tab == tab['title']:
return True
return False
class AttributeManager(OdsReader):
def __init__(self, wcapi):
super().__init__()
self.wcapi = wcapi
def get_attributes(self):
attributes = self.wcapi.get(f"products/attributes").json()
#pprint.pprint(attributes)
one_attribute = self.wcapi.get(f"products/attributes/1/terms").json()
#print('_____')
#pprint.pprint(one_attribute)
return attributes
def get_by_name(self, name):
attributes = self.wcapi.get(f"products/attributes").json()
for attr in attributes:
if attr['name'] == name:
attribute = self.wcapi.get(f"products/attributes/{attr['id']}", params={"per_page": 100}).json()
#print(f"attribute = {attribute}")
return attribute
def get_list_name_data(self):
list_name_data = []
json_data = self.get_doc_ods(3)
for item in json_data:
if item['Onglet'].strip() == "Informations Complémentaires":
list_name_data.append(item['Nom'])
return list_name_data
"""def create(self, list_name_attributes):
for name_attribute in list_name_attributes:
attribute_data = {
'name' : name_attribute
}
self.wcapi.post(f"products/attributes", attribute_data)
"""
def create(self):
features_json_data = self.get_doc_ods(3)
for item in features_json_data:
if item['Onglet'].strip() == "Informations Complémentaires":
print(f"nom = {item['Nom']}")
attribute_data = {
'name' : item["Nom"]
}
self.wcapi.post(f"products/attributes", attribute_data)
def get_term(self):
term_dict = {}
list_item = []
term_json_data = self.get_doc_ods(3)
for item in term_json_data:
if item['Onglet'].strip() == "Informations Complémentaires":
print(f"item['Valeurs'] = {item['Valeurs']}")
if "," in item["Valeurs"]:
list_item = [value_term.strip() for value_term in item['Valeurs'].split(",")]
else:
item['Valeurs'].strip()
if list_item:
term_dict[item['Nom']] = list_item
else:
term_dict[item['Nom']] = item['Valeurs']
return term_dict
def configure_term(self):
term_dict = self.get_term()
response = self.wcapi.get(f"products/attributes", params={"per_page": 100})
if response.status_code == 200:
attributes = response.json()
for attribute in attributes:
for name, value in term_dict.items():
print(f"term_dict = {term_dict}")
if attribute['name'] == name:
if isinstance(value, list):
for v in value:
term = {
'name' : v
}
self.wcapi.post(f"products/attributes/{attribute['id']}/terms", term)
else:
term = {
'name' : value
}
self.wcapi.post(f"products/attributes/{attribute['id']}/terms", term)
def create_for_product(self, product_id, name, value):
data_attribute = {
'name': name,
'options':value
}
print(f"ici = {name} et {value}")
#list_product_tabs_data.append(data_tab)
response = self.wcapi.get(f"products/{product_id}")
if response.status_code == 200:
product_meta_data = response.json()
existing_attributes_data = product_meta_data.get("attributes", [])
pprint.pprint(existing_attributes_data)
already_exist = False
for data in existing_attributes_data:
for key_data, value_data in data.items():
if key_data == "value":
if isinstance(value_data, list):
for value in value_data:
if value['name'] == name:
already_exist = True
if already_exist == False:
found = False
for attribute in existing_attributes_data:
if attribute["name"] == name:
attribute["options"].append(data_attribute)
found = True
break
# Si l'onglet `wb_custom_tabs` n'existe pas, on le crée
if not found:
existing_attributes_data.append({
"name": name,
"options": [value],
"visible":True,
})
attributes_data = {
'attributes': existing_attributes_data
}
pprint.pprint(attributes_data)
res = self.wcapi.put(f"products/{product_id}", attributes_data)
else:
print('already_exist')
else:
print(f"error")
def delete_all_for_product(self):
response_product = self.wcapi.get(f"products/", params={"per_page": 100})
if response_product.status_code == 200:
products = response_product.json()
for product in products:
existing_attributes_data = product.get("attributes", [])
if existing_attributes_data == []:
pass
else:
attribute_data = {
'attributes': []
}
res = self.wcapi.post(f"products/{product['id']}", attribute_data)
def delete_all_term(self):
response_attribute = self.wcapi.get(f"products/attributes", params={"per_page": 100})
if response_attribute.status_code == 200:
attributes = response_attribute.json()
for attribute in attributes:
response_attribute_term = self.wcapi.get(f"products/attributes/{attribute['id']}/terms", params={"per_page": 100})
if response_attribute_term.status_code == 200:
attributes_term = response_attribute_term.json()
for term in attributes_term:
self.wcapi.delete(f"products/attributes/{attribute['id']}/terms/{term['id']}",params={"force": True})
def delete_all(self):
response = self.wcapi.get(f"products/attributes", params={"per_page": 100})
if response.status_code == 200:
attributes = response.json()
for attribute in attributes:
self.wcapi.delete(f"products/attributes/{attribute['id']}",params={"force": True})
"""json_data = self.get_doc_ods(3)
for item in json_data:
if item['Onglet'].strip() == "Informations Complémentaires":
print('ici')
self.wcapi.delete(f"products/attributes",params={"force": True})"""
#def get_names_attributes(self attributes):
"""
def get_list_name_data(self):
list_name_data = []
json_data = self.get_doc_ods(3)
for item in json_data:
if item['Onglet'].strip() != "Informations Complémentaires":
list_name_data.append(item['Nom'])
return list_name_data
def create_new_name_attributes(self, attributes, list_name_attributes):
for key, value in attributes.items():
for name_attribute in list_name_attributes:
if key == name_attribute:
attribute_data = {
'name' : value
}
self.wcapi.post(f"products/attributes", attribute_data)
def delete_all(self):
attributes = self.wcapi.get(f"products/attributes", params={"per_page": 100})
for attr in attributes:
self.wcapi.delete(f"products/attributes/{attr['id']}", params={"force": True})
def delete_by_name(self, name):
attributes = self.wcapi.get(f"products/attributes", params={"per_page": 100}).json()
for attr in attributes:
if attr['name'] == name:
self.wcapi.delete(f"products/attributes/{attr['id']}", params={"force": True})
"""
class TabManager(OdsReader):
def __init__(self, wcapi):
super().__init__()
self.wcapi = wcapi
def get_list_name_data(self):
list_name_data = []
json_data = self.get_doc_ods(3)
for item in json_data:
if item['Onglet'].strip() != "Informations Complémentaires":
list_name_data.append(item['Nom'])
return list_name_data
def create_for_product(self, product_id, title, content, nickname, position, tab_type):
logger.info(f"create_for_product(product_id={product_id}, title={title}, content={str(content)[:20]}, nickname={nickname}, position={position}, tab_type={tab_type}) called")
data_tab = {
'title': title,
'content':content,
'nickname':nickname,
'position':position,
'tab_type':tab_type
}
#list_product_tabs_data.append(data_tab)
response = self.wcapi.get(f"products/{product_id}")
if response.status_code == 200:
product_meta_data = response.json()
existing_meta_data = product_meta_data.get("meta_data", [])
already_exist = False
for data in existing_meta_data:
for key_data, value_data in data.items():
if key_data == "value":
if isinstance(value_data, list):
for value in value_data:
if value['title'] == title:
already_exist = True
if already_exist == False:
found = False
for meta in existing_meta_data:
if meta["key"] == "wb_custom_tabs":
meta["value"].append(data_tab)
found = True
break
# Si l'onglet `wb_custom_tabs` n'existe pas, on le crée
if not found:
existing_meta_data.append({
"key": "wb_custom_tabs",
"value": [data_tab]
})
meta_data_data = {
'meta_data': existing_meta_data
}
res = self.wcapi.put(f"products/{product_id}", meta_data_data)
else:
print('already_exist')
else:
print(f"error")
def delete_by_product_id(self, product_id):
response = self.wcapi.get(f"products/{product_id}")
if response.status_code == 200:
product_meta_data = response.json()
existing_meta_data = product_meta_data.get("meta_data", [])
if existing_meta_data == []:
pass
else:
meta_data = {
'meta_data': [{"key": "wb_custom_tabs","value":[]}]
}
res = self.wcapi.post(f"products/{product_id}", meta_data)
def delete_all(self):
response = self.wcapi.get(f"products/", params={"per_page": 100})
if response.status_code == 200:
product_meta_data = response.json()
for product in product_meta_data:
existing_meta_data = product.get("meta_data", [])
if existing_meta_data == []:
pass
else:
meta_data = {
'meta_data': [{"key": "wb_custom_tabs","value":[]}]
}
res = self.wcapi.post(f"products/{product['id']}", meta_data)
#products = self.product_manager.get_all_products()
#for product in products:
class WooCommerceManager(OdsReader):
def __init__(self, wcapi, media_manager, category_manager, product_manager, tab_manager, attribute_manager):
super().__init__()
self.wcapi = wcapi
self.media_manager = media_manager
self.category_manager = category_manager
self.product_manager = product_manager
self.tab_manager = tab_manager
self.attribute_manager = attribute_manager
def tab_exists(self, product_id, name_tab):
return self.product_manager.tab_exists(product_id, name_tab)
"""def get_product_tab_details_by_id(self,product_id):
ret = []
#product = self.wcapi.get(f"products/{product_id}", params={"per_page": 100})
all_products_json = self.get_doc_ods(2)
all_tabs = self.tab_manager.get_list_name_data()
pprint.pprint(all_tabs)
for product in all_products_json:
for tab in all_tabs:
ret.append([tab, product[tab]])
return ret
"""
def get_product_tab_details(self):
ret = []
all_products_json = self.get_doc_ods(2)
all_tabs = self.tab_manager.get_list_name_data()
for product in all_products_json:
for tab in all_tabs:
ret.append([tab, product[tab]])
return ret
def get_product_attributes_details(self):
ret = []
all_products_json = self.get_doc_ods(2)
all_attributes = self.attribute_manager.get_list_name_data()
for product in all_products_json:
for attributes in all_attributes:
ret.append([attributes, product[attributes]])
return ret
def update_product_tab(self):
#self.product_manager.update_data_product()
#product_id = self.product_manager.find_id_by_slug(slug)
products_tab_details = self.get_product_tab_details()
products = self.product_manager.get_all_products()
x=1
for product in products:
for title, content in products_tab_details:
tab_manager.create_for_product(product_id=product['id'], title=title, content=content, nickname="", position=x, tab_type="local")
x=x+1
def update_product_attribute(self):
products_attributes_details = self.get_product_attributes_details()
products = self.product_manager.get_all_products()
#x=1
for product in products:
for name, value in products_attributes_details:
self.attribute_manager.create_for_product(product_id=product['id'], name=name, value=value)
#x=x+1
def update_product_tab_by_slug(self, slug):
#self.product_manager.update_data_product()
product_id = self.product_manager.find_id_by_slug(slug)
products_tab_details = self.get_product_tab_details()
x=1
for title, content in products_tab_details:
tab_manager.create_for_product(product_id=product_id, title=title, content=content, nickname="", position=x, tab_type="local")
x=x+1
def update_product_attribute_by_slug(self, slug):
#self.product_manager.update_data_product()
product_id = self.product_manager.find_id_by_slug(slug)
products_attribute_details = self.get_product_attributes_details()
for name, value in products_attribute_details:
print(f"name = {name}, valuee = {value}")
self.attribute_manager.create_for_product(product_id=product_id, name=name, value=value)
"""def create_all_product(self):
self.product_manager.update_data_product()
x=1
products = self.product_manager.get_all_products()
json_data = self.get_doc_ods(2)
for product in products:
print('aaaaaaaaaaaaa')
#pprint.pprint(product)
#print(f"product['title'] = {product['title']}")
print(f"product['title']['rendered'] = {product['slug']}")
print('aaaaaaaaaaaaa')
products_tabs_data = self.get_content_product(product['id'])
for p in json_data:
print(f"p['Nom']= {p['Nom']}")
print(f"product.id = {product['id']}")
print(f"product.slug = {p['Slug']} - { product['slug']}")
print(f"product.name = {product['name']}")
if p['Slug'] == product['slug']:
#pprint.pprint(products_tabs_data)
print('______________________')
for title, content in products_tabs_data:
tab_manager.create_for_product(product_id=product['id'], title=title, content=content, nickname="", position=x, tab_type="local")
x=x+1
"""
#attributes = AttributeManager(wcapi)
ALL_TABS = ["Allergènes", "Conseils dutilisation", "Description", "Précautions articles"]
ALL_ATTRIBUTES = ["Temps de combustion", "Type de cire", "Mèche", "Fabrication", "Composition", "Ingrédients et engagement"]
#attributes.create_new_attributes(ALL_ATTRIBUTES)
#attributes.get_by_name('Composition')
"""woocommerce_manager.create_product(product_full_dict) # une ligne du tableau "Produits"
# prepare data
product_data={}
product_attributes_data=[]
product_tabs_data=[]
for k,v in product_dict.items():
if k in ALL_ATTRIBUTES:
products_attributes_data.append((k,v))
else if k in ALL_TABS:
products_tabs_data.append((k,v))
else:
products_data[k]=v
# create product and assign attributes
id = product_manager.create(product_data)
# assign tabs values
x=1
for title, content in products_tabs_data:
tab_manager.create_for_product(product_id=id, title=title, content=content, nickname="", position=x, tab_type="local")
x=x+1
# assign attributes values
for name, value in products_attributes_data:
attribute_manager.create_for_product(product_id=id, name=name, value=value)"""
media_manager = MediaManager(ath=ath)
#media_manager.upload_media()
category_manager = CategoryManager(wcapi=wcapi,ath=ath, medias=media_manager.get_all_as_slug_dict())
product_manager = ProductManager(wcapi=wcapi,ath=ath, medias=media_manager.get_all_as_slug_dict())
#product_manager.delete_product_by_slug("citron-meringue")
tab_manager = TabManager(wcapi=wcapi)
attribute_manager = AttributeManager(wcapi=wcapi)
#attribute_manager.create(ALL_ATTRIBUTES)
#attribute_manager.create()
#attribute_manager.configure_term()
#attribute_manager.delete_all_term()
#product_id = product_manager.find_id_by_slug("citron-meringue")"""
woocommerce_manager = WooCommerceManager(wcapi=wcapi, media_manager=media_manager,category_manager=category_manager,product_manager=product_manager, tab_manager=tab_manager, attribute_manager=attribute_manager)
#woocommerce_manager.update_product_tab()
#woocommerce_manager.tab_manager.delete_by_product_id(1890)
#woocommerce_manager.tab_manager.delete_all()
# woocommerce_manager.update_product_by_slug('bougie-personnalisee')
woocommerce_manager.attribute_manager.delete_all_for_product()
woocommerce_manager.update_product_attribute_by_slug('citron-meringue')
#woocommerce_manager.attribute_manager.delete_all_for_product()
tabs_in_product = []
for tab in ALL_TABS:
tab_in_product = woocommerce_manager.tab_exists(1890, tab)
tabs_in_product.append(tab_in_product)
#print(f"all_tabs = {tabs_in_product}")
"""print("co !!!")
if all(tabs_in_product):
print('true')
#return True
else:
print("false")
#return False"""
#products_data = woocommerce_manager.get_product_content(519)
"""x=1
for title, content in products_tabs_data:
tab_manager.create_for_product(product_id=id, title=title, content=content, nickname="", position=x, tab_type="local")
x=x+1
# create product with api
# assign tabs values with api
# assign attributes values with api"""
#media_manager = MediaManager(ath=ath)
#media_manager.upload_media()
#media_manager.delete_media_by_slug(slug="chope-citron-meringue-face")
#id = media_manager.find_id_by_slug(slug="chope-citron-meringue-face")
#print(f'id = {id}')
#slug_dict = media_manager.get_all_as_slug_dict()
#pprint.pprint(slug_dict)
#category_manager = CategoryManager(wcapi=wcapi,ath=ath, medias=media_manager.get_all_as_slug_dict())
#category_manager.find_id_by_slug(slug="chopes")
#category_manager.delete_category_by_slug("bougies-gourmandes")
"""category_manager.delete_category_by_slug("chopes")
category_manager.delete_category_by_slug("gamme-prestige")
category_manager.delete_category_by_slug("nouveau-test")
category_manager.delete_category_by_slug("second-test")
category_manager.delete_category_by_slug("yoyo")
id = category_manager.update_data_categories()
category_manager.get_errors()"""
#product_manager = ProductManager(wcapi=wcapi,ath=ath, medias=media_manager.get_all_as_slug_dict())
#product_manager.update_data_product()
#product_manager.find_id_by_slug('citron-meringue')
#product_manager.find_id_by_slug('bougie-chope-de-biere-lavande')
#product_manager.find_id_by_slug('bougie-personnalisee')
#product_manager.delete_product_by_slug("citron-meringue")
#product_manager.update_data_product()
#attributes.get_value_attributes()
"""product = product_manager.get_all_products()
for p in product:
print(p['id'])"""
"""product_manager.create(product_list=...)
manager = WooCommerceManager(media_manager, category_manager, product_manager)
manager.delete_product(slug=...)
"""
"""
#print(f'id = {id}')
class ProductManager(OdsReader):
def __init__(self, wcapi, ath, medias):
super().__init__()
self.wcapi = wcapi
self.ath = ath
self.medias = medias
self.media_api_url = f"{url_website}/wp-json/wp/v2/media"
self.headers = {
"Authorization": f"Basic {self.ath.auth_base64}",
"Content-Type": "application/json"
}
def update_data_list_cat_product(self, list_category_id, list_img_id, product_id):
product_data = {
'categories':list_category_id,
'images':list_img_id,
}
print(f"product_id = {product_id}")
self.wcapi.put(f"products/{product_id}", product_data)
#self.wcapi.get(f"products/{product_id}")
def get_list_media_id_for_product(self, product):
media_id = {}
list_media_id_for_product = []
list_media_by_doc = [img.strip().replace(' ', '') for img in product['Media Slugs'].split(",")]
#pprint.pprint(self.uploaded_images_pro)
for id, media_slug in self.medias.items():
for media in list_media_by_doc:
if media == media_slug:
image_id = id
list_media_id_for_product.append(image_id)
#print(f"list_image_id_for_product = {list_image_id_for_product}")
return list_media_id_for_product
def get_list_category_for_product(self, product):
response = self.wcapi.get("products/categories")
category_list_by_doc = [cat.strip().replace('"', '') for cat in product['Categorie'].split("/")]
list_category_for_product = []
for category in response.json():
for cat in category_list_by_doc:
#print(f"category['name'] = {category['name']}")
#print(f"cat = {cat}")
if category['name'] == cat:
id_category = {'id':category['id']}
list_category_for_product.append(id_category)
return list_category_for_product
def find_id_by_slug(self, media_slug):
for id, slug in self.medias.items():
if media_slug == slug:
return id
def create_product(self, product):
product_data = {
'name' : product['Nom'],
#'price': product['Prix'],
#'stock_quantity': product['Stock'],
#'description': product['Description'],
'short_description': product['Courte Description'],
}
response = self.wcapi.post("products/", product_data)
if response.status_code == 201:
product = response.json()
return product["id"]
else:
return None
def update(self):
json_data = self.get_doc_ods()
for product in json_data:
product_id = self.create_product(product)
list_category_id = self.get_list_category_for_product(product)
list_img_id = self.get_list_media_id_for_product(product)
self.update_data_list_cat_product(list_category_id, list_img_id, product_id)
def delete_product(self):
json_data = self.get_doc_ods()
for product in json_data:
list_products = self.wcapi.get(f"products/")
for pro in list_products.json():
print(f"product['Nom'] = {product['Nom']}")
print(f"pro['name'] = {pro['name']}")
if product['Nom'] == pro['name']:
self.wcapi.delete(f"products/{pro['id']}")
def delete_img_product(self):
list_products = self.wcapi.get(f"products/")
for pro in list_products.json():
#print(f"pro['name'] = {pro['name']}")
for img_pro in self.uploaded_images_pro:
#print(f"img_pro['title'] = {img_pro['title']}")
if pro['name'] == img_pro['title']:
#print(f"img_pro['id'] = {img_pro['id']}")
#print(f"pro['id'] = {pro['id']}")
response = requests.post(
f"{self.media_api_url}/{img_pro['id']}",
headers=self.headers,
json={"force": True}, # 🔥 Forcer la suppressio
verify=False
)
def delete_all_img_product_by_name(self):
response = requests.get(
f"{self.media_api_url}?per_page=100",
headers=self.headers,
verify=False
)
images = response.json()
for img in images:
for path_image, name_image, title_image, text_alt, category_name in self.uploaded_images_pro:
if img['title']['rendered'] == title_image:
print(f"img_id = {img['id']}")
print('iiii')
response = requests.delete(
f"{self.media_api_url}/{img['id']}?force=true",
headers=self.headers,
verify=False
)
print(f"Status Code: {response.status_code}")
pprint.pprint(response.json())
def delete_all_img_product(self):
response = requests.get(
f"{self.media_api_url}?per_page=100",
headers=self.headers,
verify=False
)
images = response.json()
for img in images:
for img_pro in self.uploaded_images_pro:
if img['title']['rendered'] == img_pro['title']:
print('coucou')
print(f"img_pro['id'] = {img_pro['id']}")
print(f"img['id'] = {img['id']}")
response = requests.delete(
#f"{self.media_api_url}/{img['id']}?force=true"
f"{self.media_api_url}/{img_pro['id']}?force=true",
headers=self.headers,
verify=False
)
print(f"Status Code: {response.status_code}")
pprint.pprint(response.json())
def delete(self):
#self.delete_product()
#self.delete_img_product()
#self.delete_all_img_product()
self.delete_all_img_product_by_name()
url_website = "https://lescreationsdemissbleue.local"
filename_ods = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\infos_site.ods"
image_manager = ImageManager(images_to_upload,ath,url_website)
dict_image = image_manager.update()
#category_manager = CategoryManager(wcapi,ath,categories_to_create,url_website,dict_image['category'])
#category_manager.update()
#category_manager = CategoryManager(wcapi,ath,categories_to_create,url_website,images_to_upload[0]["categories"])
#category_manager.delete()
#product_manager = ProductManager(wcapi,ath,filename_ods,dict_image['product'])
#product_manager.update()
#product_manager = ProductManager(wcapi,ath,filename_ods, images_to_upload[1]["products"])
#product_manager.delete()
ALL_MEDIAS=[
{ 'slug': None, 'title': None, 'alt_text': None }
]
ALL_CATEGORIES=[
{ 'name': None, 'parent_name': 'None', 'description': None, 'media_slug': None }
]
ALL_PRODUCTS=[
{ 'name': None, 'parent_name': 'None', 'description': None, 'media_slug_list': [] }
]
"""
"""api = WoocommerceApi(...)
media_manager = MediaManager(api=api)
media_manager.upload(media_list=...)
media_manager.find_id_by_slug(slug=...)
media_manager.delete(slug=...)
category_manager = CategoryManager(api=api, medias=media_manager.get_all_as_slug_dict())
category_manager.create(name, short_description, description, )
product_manager = ProductManager(api=api, medias=media_manager.get_all())
product_manager.create(product_list=...)
manager = WooCommerceManager(media_manager, category_manager, product_manager)
manager.delete_product(slug=...)
attribute_manager = AttributeManager(api=api)
ALL_ATTRIBUTES = ("Mèche", "Fabrication")
attribute_manager.register(ALL_ATTRIBUTES)
product_manager.create(product_dict)
woocommerce_manager = WooCommerceManager(api)
woocommerce_manager.create_product(product_full_dict) # une ligne du tableau "Produits"
# prepare data
product_data={}
product_attributes_data=[]
product_tabs_data=[]
for k,v in product_dict.items():
if k in ALL_ATTRIBUTES:
products_attributes_data.append((k,v))
else if k in ALL_TABS:
products_tabs_data.append((k,v))
else:
products_data[k]=v
# create product and assign attributes
id = product_manager.create(product_data)
# assign tabs values
x=1
for title, content in products_tabs_data:
tab_manager.create_for_product(product_id=id, title=title, content=content, nickname="", position=x, tab_type="local")
x=x+1
# assign attributes values
for name, value in products_attributes_data:
attribute_manager.create_for_product(product_id=id, name=name, value=value)
"""