#4 Changements + améliorations programme

This commit is contained in:
beren
2025-05-18 21:19:20 +02:00
parent c1bf873ca5
commit 9b3979ab7f
7 changed files with 526 additions and 160 deletions

134
api.py Normal file
View File

@ -0,0 +1,134 @@
from woocommerce import API
import base64
import requests
import ssl
from requests.adapters import HTTPAdapter
class WoocommerceApiClient(API):
def __init__(self, url, wc_consumer_key, wc_consumer_secret, wp_application_user, wp_application_password, verify_ssl=False, **kwargs):
self.max_retry = 20
self.url = url
super().__init__(url, wc_consumer_key, wc_consumer_secret, **kwargs)
auth_str = f"{wp_application_user}:{wp_application_password}"
auth_bytes = auth_str.encode("utf-8")
auth_base64 = base64.b64encode(auth_bytes).decode("utf-8")
self.headers = {"Authorization": f"Basic {auth_base64}",
"User-Agent": "Mozilla/5.0"
}
self.verify_ssl = verify_ssl
self.init_ssl()
def _is_wc_api(self, endpoint):
return endpoint.startswith("media")
def _is_wp_api(self, endpoint):
return endpoint.startswith("settings")
def _get_requests_general_kwargs(self, endpoint):
return {
'url': f'{self.url}/wp-json/wp/v2/{endpoint}',
'headers': self.headers,
'verify': self.verify_ssl,
}
def init_ssl(self):
class SSLContextAdapter(HTTPAdapter):
def __init__(self, ssl_context=None, **kwargs):
self.ssl_context = ssl_context
super().__init__(**kwargs)
def init_poolmanager(self, *args, **kwargs):
kwargs['ssl_context'] = self.ssl_context
super().init_poolmanager(*args, **kwargs)
# Create your custom SSL context
context = ssl.create_default_context()
context.set_ciphers('@SECLEVEL=2:ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES:DHE+AES:AESGCM:!aNULL:!eNULL:!aDSS:!SHA1:!AESCCM:!PSK')
context.minimum_version = ssl.TLSVersion.TLSv1_3
context.maximum_version = ssl.TLSVersion.TLSv1_3
# Create a session and mount adapter
session = requests.Session()
adapter = SSLContextAdapter(ssl_context=context)
session.mount('https://', adapter)
def get(self, endpoint, **kwargs):
retry = 0
while retry <= self.max_retry:
try:
if self._is_wc_api(endpoint):
kwargs.update(self._get_requests_general_kwargs(endpoint))
print(kwargs) # ✅ Montre tout le dict
print(kwargs["headers"]) # ✅ Si tu veux un champ en particulier
return requests.get(**kwargs)
else:
return super().get(endpoint, **kwargs)
except requests.exceptions.ConnectionError:
if retry < self.max_retry:
print(".", end="")
retry += 1
else: raise
def post(self, endpoint, data=None, files=None, **kwargs):
retry = 0
response = None
while retry <= self.max_retry:
try:
if self._is_wc_api(endpoint):
kwargs.update(self._get_requests_general_kwargs(endpoint))
if files:
kwargs['files'] = files
if data:
kwargs['json'] = data
print("kwargs envoyés à requests.post:", kwargs)
response = requests.post(**kwargs)
elif self._is_wp_api(endpoint):
kwargs.update(self._get_requests_general_kwargs(endpoint))
kwargs['json'] = data
response = requests.post(**kwargs)
else:
response = super().post(endpoint, data, **kwargs)
if response and response.status_code not in (500, 400):
return response
except requests.exceptions.ConnectionError:
if retry < self.max_retry:
print(".", end="")
retry += 1
else: raise
def put(self, endpoint, data, file=None, **kwargs):
retry = 0
while retry <= self.max_retry:
try:
if self._is_wc_api(endpoint):
kwargs.update(self._get_requests_general_kwargs(endpoint))
if file:
kwargs['json'] = file
if data:
kwargs['json'] = data
return requests.put(**kwargs)
else:
return super().put(endpoint, data, **kwargs)
except requests.exceptions.ConnectionError:
if retry < self.max_retry:
print(".", end="")
retry += 1
else: raise
def delete(self, endpoint, **kwargs):
retry = 0
while retry <= self.max_retry:
try:
if self._is_wc_api(endpoint):
return requests.delete(**self._get_requests_general_kwargs(endpoint), params={"force": True})
else:
return super().delete(endpoint, **kwargs)
except requests.exceptions.ConnectionError:
if retry < self.max_retry:
print(".", end="")
retry += 1
else: raise

View File

@ -16,6 +16,10 @@ import argparse
from logging.handlers import TimedRotatingFileHandler
from watermark import create_watermark_image
import ssl
import urllib3
from base64 import b64encode
# Créer un dossier 'logs' s'il n'existe pas
log_directory = "logs"
os.makedirs(log_directory, exist_ok=True)
@ -53,21 +57,16 @@ logger.critical("Erreur critique (CRITICAL)")"""
# via consumer key and consumer secret :
# https://lescreationsdemissbleue.local/wp-json/wc/v3/products?consumer_key=ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e&consumer_secret=cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768
wcapi = WoocommerceApi(
#url="https://lescreationsdemissbleue.local",
url="https://les-creations-de-missbleue.local",
consumer_key="ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e",
consumer_secret="cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768",
wp_api=True,
version="wc/v3",
verify_ssl=False, # Désactive la vérification SSL pour le développement
timeout=30
)
#url="https://les-creations-de-missbleue.local",
#consumer_key="ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e",
#consumer_secret="cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768",
class AuthentificationWpApi:
class AuthentificationWpApi():
# Identifiants WordPress (et non WooCommerce)
wordpress_username = "admin_lcdm" # Remplace par ton username WordPress
wordpress_application_password = "yTW8 Mc6J FUCN tPSq bnuJ 0Sdw" #"#8io_mb!55@Bis" # Généré dans WordPress > Utilisateurs
wordpress_application_password = "yTW8 Mc6J FUCN tPSq bnuJ 0Sdw" #'W6Zt N5CU 2Gj6 TlKm clGn LvIz' #"#8io_mb!55@Bis" # Généré dans WordPress > Utilisateurs
# Générer l'authentification Basic en base64
auth_str = f"{wordpress_username}:{wordpress_application_password}"
@ -76,12 +75,13 @@ class AuthentificationWpApi:
ath = AuthentificationWpApi()
#WEBSITE_URL = "https://lescreationsdemissbleue.local"
WEBSITE_URL = "https://les-creations-de-missbleue.local"
#WEBSITE_URL = "https://les-creations-de-missbleue.com"
#FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\donnees_site_internet_missbleue_corrige.ods"
BASE_PATH = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\photos\\photos_site\\Photos_site\\"
#BASE_PATH = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\photos\\photos_site\\Photos_site\\"
BASE_PATH = "C:\\Users\\beren\\Cloud\\beren\\site_missbleue\\photos\\photos_site\\Photos_site\\"
#FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\infos_site.ods"
FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\api_woocommerce\\final_api_woocommerce\\donnees_site_internet_missbleue_corrige.ods"
FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\api_woocommerce\\final_api_woocommerce\\donnees_site_internet_missbleue_version_finale.ods"
@ -226,11 +226,18 @@ class OdsReader:
class MediaManager(OdsReader):
def __init__(self, ath, filename_ods):# filename_ods
def __init__(self, ath, wcapi, filename_ods):# filename_ods
super().__init__(filename_ods) # filename_ods
self.ath = ath
self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media"
self.media_api_settings = f"{WEBSITE_URL}/wp-json/wp/v2/settings"
self.wcapi = wcapi
#self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media"
#self.media_api_settings = f"{WEBSITE_URL}/wp-json/wp/v2/settings"
self.dict_equivalence = {'ambre':'ambré', 'meringue':'meringué', 'givree':'givrée', 'sale':'salé', 'bresilien':'brésilien', 'epices':'épices', 'noel':'noël', 'a':'à', 'petales':'pétales',
'lumiere':'lumière', 'allumee':'allumée', 'eteinte':'éteinte', 'celebration':'célébration', 'argente':'argenté', 'dore':'doré', 'accroche':'accroché', 'pose':'posé', 'colore':'coloré',
'kevin': 'Kévin', 'interieur':'intérieur', 'cafe':'café', 'bresil':'Brésil', 'dagrumes': "d'agrumes", "iles":"îles", 'apero': 'apéro', 'quebecois':'québecois', 'defendu':'défendu',
'tiare':'tiaré', 'mure':'mûre', 'allergenes':'allergènes', 'parfume':'parfumé'
}
def upload_media(self, search_value=None):
if search_value:
@ -247,22 +254,13 @@ class MediaManager(OdsReader):
# 👇 Tentative d'ouverture et d'envoi
with open(image_path, "rb") as image_file:
response = requests.post(
self.media_api_url,
headers={
"Authorization": f"Basic {self.ath.auth_base64}",
"Content-Disposition": f"attachment; filename={image_name}"
},
files={"file": image_file},
verify=False
)
response = self.wcapi.post("media",files={"file": image_file})
if response.status_code == 201:
media_data = response.json()
self.update_data_media(media, media_data['id'])
logger.info(f"✅ Image uploadée : {image_name}")
else:
logger.error(f"❌ Échec de l'upload ({response.status_code}) pour : {image_name} - URL: {self.media_api_url}")
logger.error(f"❌ Échec de l'upload ({response.status_code}) pour : {image_name} - URL: {self.wcapi.url}")
else:
logger.info(f"↪️ Image déjà existante (non uploadée) : {image_name}")
@ -283,24 +281,32 @@ class MediaManager(OdsReader):
image_path = path
else:
image_path = BASE_PATH + media['Chemin']
print(f"image_path = {image_path}")
#print(f"image_path = {image_path}")
# 👇 Tentative d'ouverture et d'envoi
with open(image_path, "rb") as image_file:
response = requests.post(
self.media_api_url,
response = self.wcapi.post("media",files={"file": image_file})
print("kkkkkkkkkkkkkkkkkkkkkkkkkk")
""" response = self.wcapi.post(
"media",
headers={
"Authorization": f"Basic {self.ath.auth_base64}",
"Content-Disposition": f"attachment; filename={image_name}"
"Content-Disposition": f"attachment; filename={image_name}",
"User-Agent": "Mozilla/5.0"
},
files={"file": image_file},
verify=False
)
)"""
print(f"response = {response.status_code}")
if response.status_code == 201:
print('____')
pprint.pprint(media)
media_data = response.json()
self.update_data_media(media, media_data['id'])
logger.info(f"✅ Image uploadée : {image_name}")
else:
logger.error(f"❌ Échec de l'upload ({response.status_code}) pour : {image_name} - URL: {self.media_api_url}")
logger.error(f"❌ Échec de l'upload ({response.status_code}) pour : {image_name} - URL: {self.wcapi.url}")
else:
logger.info(f"↪️ Image déjà existante (non uploadée) : {image_name}")
except FileNotFoundError:
logger.exception(f"🚫 Fichier introuvable : {image_name} ({path})")
@ -315,16 +321,25 @@ class MediaManager(OdsReader):
json_data = self.fetch_all_media_rows(range_start, range_end)
for media in json_data:
pprint.pprint(media)
path = Path(BASE_PATH + media['Chemin'])
image_name = path.name
first_folder = media['Chemin'].split("\\")[0]
print(f"first_folder = {first_folder}")
watermarked_path = Path(create_watermark_image(str(path)))
watermarked_name = watermarked_path.name
print('logo')
if first_folder == 'Logo':
print("first_file")
self.create_and_update_media(media,image_name,path)
else:
print("pas logo")
self.create_and_update_media(media, watermarked_name, watermarked_path, True)
try:
os.remove(watermarked_path)
except FileNotFoundError:
logger.exception(f"🚫 Fichier introuvable : {image_name} ({path})")
def is_exists(self, media, image_name):
@ -337,27 +352,61 @@ class MediaManager(OdsReader):
pass
return False
def get_title_and_alt_text_media(self, media):
sentence = media['Slug'].replace('-', ' ')
sentence = sentence.replace('img', '').strip()
print(f"type = {type(sentence)}")
title = sentence.capitalize()
alt_text = title
return title, alt_text
def update_accent_in_sentence(self, sentence):
print(type(sentence))
print(sentence)
words = sentence.split()
new_words = [self.dict_equivalence[word.lower()] if word.lower() in self.dict_equivalence else word for word in words]
pprint.pprint(new_words)
new_sentence = " ".join(new_words)
print(f"new_sentence = {new_sentence}")
return new_sentence
def update_data_media(self, media, id_img):
print(f"nom = {media['Nom']}")
print(type(media['Nom']))
if media['Nom'] is None or media['Description'] is None:
title, alt_text = self.get_title_and_alt_text_media(media)
else:
title = media['Nom']
alt_text = media['Description']
title = self.update_accent_in_sentence(title)
alt_text = self.update_accent_in_sentence(alt_text)
print(f"title = {title}")
print(f"alt_txt = {alt_text}")
update_data = {
"title" : media['Nom'],
"alt_text": media['Description'],
"title" : title,
"alt_text": alt_text,
"slug": media['Slug'],
}
path = Path(BASE_PATH + media['Chemin'])
image_name = path.name
response = requests.post(
f"{self.media_api_url}/{id_img}",
response = self.wcapi.post(f"media/{id_img}", data=update_data)
"""response = self.wcapi.post(
f"media/{id_img}",
headers={
"Authorization": f"Basic {self.ath.auth_base64}",
#"Authorization": f"Basic {self.ath['auth_base64']}",
"Content-Disposition": f"attachment; filename={image_name}"
"Content-Disposition": f"attachment; filename={image_name}",
"User-Agent": "Mozilla/5.0"
},
json=update_data,
verify=False
)
)"""
if response.status_code == 200:
return response.json()
@ -381,60 +430,55 @@ class MediaManager(OdsReader):
images = self.get_all_images()
for img in images:
if img['slug'] == slug:
delete_url = f"{self.media_api_url}/{img['id']}?force=true"
response = requests.delete(delete_url,
#headers={"Authorization": f"Basic {self.ath['auth_base64']}"},
headers={"Authorization": f"Basic {self.ath.auth_base64}"},
verify=False)
self.wcapi.delete(f"media/{img['id']}", params = {"force": True})
def get_all_images(self):
"""Récupère toutes les images en gérant la pagination"""
all_images = []
page = 1
print(f"self.ath.auth_base64 = {self.ath.auth_base64}")
while True:
response = requests.get(f"{self.media_api_url}?per_page=100&page={page}",
headers={"Authorization": f"Basic {self.ath.auth_base64}"},
#headers={"Authorization": f"Basic {self.ath['auth_base64']}"},
"""response = self.wcapi.get(f"wp-json/wp/v2/media?per_page=100&page={page}",
headers={"Authorization": f"Basic {self.ath.auth_base64}",
"User-Agent": "Mozilla/5.0"},
verify=False
)
)"""
response = self.wcapi.get("media", params={"per_page": 100, "page": page})
if response.status_code != 200:
break
images = response.json()
if not images:
break
all_images.extend(images)
page += 1
return all_images
def delete_images(self, images):
"""Supprime toutes les images récupérées"""
for img in images:
img_id = img['id']
delete_url = f"{self.media_api_url}/{img_id}?force=true"
response = self.wcapi.delete(f"media/{img_id}")
"""delete_url = f"media/{img_id}?force=true"
response = requests.delete(delete_url,
headers={"Authorization": f"Basic {self.ath.auth_base64}"},
headers={"Authorization": f"Basic {self.ath.auth_base64}",
"User-Agent": "Mozilla/5.0"},
#{"Authorization": f"Basic {self.ath['auth_base64']}"},
verify=False)
verify=False)"""
if response.status_code in [200, 410]: # 410 = déjà supprimé
print(f"Image {img_id} supprimée.")
else:
print(f"Erreur suppression {img_id} :", response.status_code, response.text)
def delete_all_images(self):
print('iciiiii')
images = self.get_all_images()
for img in images:
img_id = img['id']
delete_url = f"{self.media_api_url}/{img_id}?force=true"
print(f"img_id = {img['id']}")
response = self.wcapi.delete(f"media/{img_id}")
response = requests.delete(delete_url,
headers={"Authorization": f"Basic {self.ath.auth_base64}"},
#"Authorization": f"Basic {self.ath['auth_base64']}"},
verify=False)
if response.status_code in [200, 410]: # 410 = déjà supprimé
print(f"Image {img_id} supprimée.")
else:
@ -443,17 +487,12 @@ class MediaManager(OdsReader):
def assign_image_logo(self):
images = self.get_all_images()
for img in images:
if img['slug'] == "logo-lescreationsdemissbleue":
if img['slug'] == "img-logo-lescreationsdemissbleue":
data = {
"site_logo":img['id'],
"site_icon" : img['id']
}
response = requests.post(
self.media_api_settings,
json=data,
headers={"Authorization": f"Basic {self.ath.auth_base64}"},
verify=False
)
response = self.wcapi.post("settings", data=data)
if response.status_code == 200:
print("Logo mis à jour avec succès !")
@ -525,10 +564,7 @@ class CategoryManager(OdsReader):
return id
def update_media_id_for_category(self, media_id, cat_id):
response = requests.get(f"{self.media_api_url}/{media_id}",
headers={"Authorization": f"Basic {self.ath.auth_base64}"},
verify=False
)
response = self.wcapi.get(f"media/{media_id}", params={"per_page": 1, "page": 1})
update_category_data = {
"image" : {'id':media_id},
}
@ -553,11 +589,7 @@ class CategoryManager(OdsReader):
def delete_media_category(self, media_slug):
media_id = self.find_media_id_by_slug(media_slug)
requests.delete(
f"{self.media_api_url}/{media_id['id']}",
headers=self.headers,
verify=False
)
self.wcapi.delete(f"media/{media_id}")
def delete_category_by_id(self, category_id):
self.wcapi.delete(f"products/categories/{category_id}", params={"force": True})
@ -579,7 +611,8 @@ class ProductManager(OdsReader):
self.error_log = []
self.headers = {
"Authorization": f"Basic {self.ath.auth_base64}",
"Content-Type": "application/json"
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0"
}
self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media"
@ -633,7 +666,7 @@ class ProductManager(OdsReader):
list_product_tabs_data = []
x = 1
for key in product.keys():
if key == "Conseils dutilisation" or key == "Précautions articles" or key == "Description" or key == "Allergènes":
if key == "Conseils dutilisation" or key == "Précautions articles" or key == "Description": #or key == "Allergènes":
product_tabs_data['title'] = key
product_tabs_data['content'] = product[key]
product_tabs_data['nickname'] = ''
@ -724,15 +757,12 @@ class ProductManager(OdsReader):
products = self.get_all_products()
if products:
for pro in products:
print(f"pro['id'] = {pro['id']}")
self.wcapi.delete(f"products/{pro['id']}", params={"force": True})
def delete_media_product(self, media_slug):
media_id = self.find_media_id_by_slug(media_slug)
requests.delete(
f"{self.media_api_url}/{media_id['id']}",
headers=self.headers,
verify=False
)
self.wcapi.delete(f"media/{media_id['id']}")
def delete_product_by_id(self, product_id):
self.wcapi.delete(f"products/{product_id}", params={"force": True})
@ -780,7 +810,8 @@ class AttributeManager(OdsReader):
list_name_data = []
json_data = self.get_all_attribute_and_tab_lines()
for item in json_data:
if item['Onglet'].strip() == "Informations Complémentaires":
#if item['Onglet'].strip() == "Informations Complémentaires":
if item['Onglet'] == "Informations Complémentaires":
list_name_data.append(item['Nom'])
return list_name_data
@ -790,7 +821,8 @@ class AttributeManager(OdsReader):
else:
features_json_data = self.get_all_attribute_and_tab_lines()
for item in features_json_data:
if item['Onglet'].strip() == "Informations Complémentaires":
#if item['Onglet'].strip() == "Informations Complémentaires":
if item['Onglet'] == "Informations Complémentaires":
attribute_data = {
'name' : item["Nom"]
}
@ -804,7 +836,8 @@ class AttributeManager(OdsReader):
term_json_data = self.get_all_attribute_and_tab_lines()
for item in term_json_data:
list_item = []
if item['Onglet'].strip() == "Informations Complémentaires":
#if item['Onglet'].strip() == "Informations Complémentaires":
if item['Onglet'] == "Informations Complémentaires":
if "," in item["Valeurs"]:
list_item = [value_term.strip() for value_term in item['Valeurs'].split(",")]
else:
@ -838,6 +871,8 @@ class AttributeManager(OdsReader):
def create_for_product(self, product_id, name, value, variation=False):
print(f"variation = {variation}")
print(f"value = {value}")
data_attribute = {
'name': name,
'options':value
@ -858,6 +893,8 @@ class AttributeManager(OdsReader):
if already_exist == False:
found = False
print(f"attributes_data = {existing_attributes_data}")
print(f"data_attribute = {data_attribute}")
for attribute in existing_attributes_data:
if attribute["name"] == name:
attribute["options"].append(data_attribute)
@ -866,9 +903,14 @@ class AttributeManager(OdsReader):
# Si l'onglet `wb_custom_tabs` n'existe pas, on le crée
if not found:
print(f"value = {value}")
if value is not None:
value = [v.strip() for v in value.split(",")]
print(f"value = {value}")
existing_attributes_data.append({
"name": name,
"options": [value],
"options": value,
"visible":True,
"variation": variation,
#"parent_id":product_id
@ -1086,14 +1128,95 @@ class VariationsManager(OdsReader):
print(update_res.json())
def create_variations_products(self, product_id, product_data):
#products_lines = self.get_all_product_lines()
products_lines = self.get_all_product_lines()
product_line = self.get_product_by_slug_from_ods(product_data['slug'])
parfums = None
volumes = None
price_per_product_variable = None
if product_line['Type'] == "Variable":
if product_line['Choix parfums'] is not None:
parfums = [p.strip() for p in product_line['Choix parfums'].split(",")]
print(f"parfums = {parfums}")
if product_line['Volume'] is not None:
volumes = [v.strip() for v in product_line['Volume'].split(",")]
print(f"volumes = {volumes}")
"""if product_line['Prix pour'] is not None:
#products = [v.strip() for v in product_line['Prix pour'].split(",")]
products = product_line['Prix pour'].split(",")
price_per_product_variable = {}
for p in products:
name, price = p.split("=")
price_per_product_variable[name.strip()] = price.strip()
pprint.pprint(price_per_product_variable)"""
response = self.wcapi.get(f"products/{product_id}")
print(f"product_id = {product_id}")
print(f"response = {response.status_code}")
try:
if response.status_code == 200:
#existing_product = response.json()
#self.update_product_attributes_merged(self.wcapi, product_id=product_id, attribute_name="Parfums", new_options=parfums)
if parfums is not None:
for parfum in parfums:
data = {
'attributes': [
{
'name': 'Choix parfums',
'option': parfum
}
],
'manage_stock': False,
'in_stock':True,
'regular_price': product_data['price'],
}
print(f"Posting variation: {data}")
response = self.wcapi.post(f"products/{product_id}/variations", data)
print(response.status_code)
print(response.json())
logger.info(f"Variation de parfums a bien été créé")
if volumes is not None:
for volume in volumes:
data = {
'attributes': [
{
'name': 'Volume',
'option': volume
}
],
'manage_stock': False,
'in_stock':True,
'regular_price': product_data['price'],
}
print(f"Posting variation: {data}")
result = self.wcapi.post(f"products/{product_id}/variations", data)
logger.info(f"Variation de volumes a bien été créé")
"""if price_per_product_variable is not None:
for name, price in price_per_product_variable.items():
data = {
'attributes': [
{
'name': 'Volume',
'option': name
}
],
'manage_stock': False,
'in_stock':True,
'regular_price': price,
}
result = self.wcapi.post(f"products/{product_id}/variations", data)
logger.info(f"Variation de prix selon objet bien créé")"""
except Exception as e:
logger.exception(f"Erreur lors de la création du produit de variation : {e}")
#logger.error(f"Erreur lors de la création de la catégorie. Code: {response.status_code}, Message: {response.text}")
"""
for product_line_key, products_line_value in product_line.items():
if product_line_key == "Parfums":
if product_line_key == "Choix parfums":
name_attribute = product_line_key
parfums = products_line_value
if product_line_key == "Type":
if product_data['type'] == "variable":
if product_data['type'] == "Variable":
response = self.wcapi.get(f"products/{product_id}")
if response.status_code == 200:
existing_product = response.json()
@ -1121,7 +1244,7 @@ class VariationsManager(OdsReader):
print(result.status_code)
pprint.pprint(result.json())
else:
return False
return False"""
class WooCommerceManager(OdsReader):
def __init__(self, wcapi, media_manager, category_manager, product_manager, tab_manager, attribute_manager, variation_manager, filename_ods):
@ -1195,8 +1318,8 @@ class WooCommerceManager(OdsReader):
self.update_product_tab()
self.update_product_attribute()"""
def update_product_variation(self, product_id, product_data):
pass
"""def update_product_variation(self, product_id, product_data):
pass"""
def update_product_by_slug(self, slug):
self.product_manager.update_data_product_by_slug(slug)
@ -1222,17 +1345,52 @@ class WooCommerceManager(OdsReader):
category_list_by_doc = [cat.strip().replace('"', '') for cat in category.split("/")]
return category_list_by_doc
def get_list_media_id_for_product(self, media):
list_media_by_doc = [img.strip().replace(' ', '') for img in media.split(",")]
def get_list_variable_attributes(self,attributes):
list_variable_attributes_by_doc = [attr.strip() for attr in attributes.split(",")]
return list_variable_attributes_by_doc
def get_list_media_id_for_product(self, product):
#list_media_by_doc = [img.strip().replace(' ', '') for img in media.split(",")]
list_media_by_doc = []
list_media_by_doc.append(product['Image1'])
list_media_by_doc.append(product['Image2'])
list_media_by_doc.append(product['Image3'])
list_media_by_doc.append(product['Pyramide'])
return list_media_by_doc
def is_variable(self, type):
"""def is_variable(self, type):
#print(f"type.lower = { type.lower}")
#print(f"type = {type}")
return type.lower() == "parfums"
#if type.lower() == 'Choix parfums' or type.lower() == 'Volume' or type.lower() == 'Prix pour':
# return True"""
"""def is_variable(self, type):
return type.lower() == "parfums"""
"""def is_variable(self, product_data):
if product_data['Choix parfums'] is not None or product_data['Volume'] is not None or product_data['Prix pour'] is not None:
return True"""
def is_variable(self, name, value):
"""print(f"typeee = {type}")
if type == 'variable':
return True"""
if name == "Volume" or name =="Choix parfums":
if value is not None:
return True
else:
return False
def update_product_attribute(self, attributes, product_data):
print(f"product_data = {product_data}")
product_id = self.product_manager.find_id_by_slug(product_data['slug'])
for name, value in attributes.items():
self.attribute_manager.create_for_product(product_id=product_id, name=name, value=value, variation=self.is_variable(product_data['type']))
print(f"self.is_variable(product_data['type'])) = {self.is_variable(name, value)}")
self.attribute_manager.create_for_product(product_id=product_id, name=name, value=value, variation=self.is_variable(name, value))
#self.attribute_manager.create_for_product(product_id=product_id, name=name, value=value, variation=self.is_variable(product_data))
def update_product_variations(self, product_data):
product_id = self.product_manager.find_id_by_slug(product_data['slug'])
@ -1293,8 +1451,14 @@ class WooCommerceManager(OdsReader):
'short_description': product_line['Courte Description'],
'slug':product_line['Slug']
}
if product_line['Promo'] is not None:
product_data['sale_price'] = product_line['Promo']
if product_line['Type'] == "parfums":
product_data['type'] = "variable"
if product_line['Volume'] is not None:
values_attributes = self.get_list_variable_attributes(product_line['Volume'])
attributes['Volume'] = values_attributes
else:
product_data['type'] = "simple"
@ -1305,7 +1469,8 @@ class WooCommerceManager(OdsReader):
"Fabrication" : product_line['Fabrication'],
"Composition" : product_line['Composition'],
"Ingrédients et engagements" : product_line['Ingrédients et engagements'],
"Parfums" : product_line['Parfums']
"Parfums" : product_line['Parfums'],
"Volume" : product_line["Volume"]
}
tabs ={
@ -1336,6 +1501,7 @@ class WooCommerceManager(OdsReader):
if self.product_manager.find_id_by_slug(product_line['Slug']):
logger.debug(f"Produit contenant comme slug '{product_line['Slug']}' existe déjà")
else:
print(f"process_file_from_to {product_line['Nom']}")
# standard product data
product_data = {
'name' : product_line['Nom'],
@ -1349,10 +1515,6 @@ class WooCommerceManager(OdsReader):
'short_description': product_line['Courte Description'],
'slug':product_line['Slug']
}
if product_line['Type'] == "parfums":
product_data['type'] = "variable"
else:
product_data['type'] = "simple"
attributes = {
"Temps de combustion" : product_line['Temps de combustion'],
@ -1361,9 +1523,33 @@ class WooCommerceManager(OdsReader):
"Fabrication" : product_line['Fabrication'],
"Composition" : product_line['Composition'],
"Ingrédients et engagements" : product_line['Ingrédients et engagements'],
"Parfums" : product_line['Parfums']
#"Parfums" : product_line['Choix parfums']
#"Volume" : product_line["Volume"]
}
if product_line['Promo'] is not None:
product_data['sale_price'] = product_line['Promo']
if product_line['Type'] == "Variable":
product_data['type'] = "variable"
if product_line['Choix parfums'] is not None:
values_attributes = self.get_list_variable_attributes(product_line['Choix parfums'])
print(f"values_attributes = {values_attributes}")
attributes['Choix parfums'] = values_attributes
if product_line['Volume'] is not None:
values_attributes = self.get_list_variable_attributes(product_line['Volume'])
print(f"values_attributes = {values_attributes}")
attributes['Volume'] = product_line['Volume']
"""if product_line['Prix pour'] is not None:
values_attributes = self.get_list_variable_attributes(product_line['Prix pour'])
print(f"values_attributes = {values_attributes}")
attributes['Prix pour'] = values_attributes"""
else:
product_data['type'] = "simple"
print('attributes')
pprint.pprint(attributes)
tabs ={
#"Description" : product_line["Description"],
"Conseils d'utilisation" : product_line["Conseils dutilisation"],
@ -1374,23 +1560,12 @@ class WooCommerceManager(OdsReader):
categories = self.get_list_category_for_product(product_line['Catégories'])
# ... associated medias
print(f"product_line['Media Slugs'] = {product_line['Media Slugs']}")
medias = self.get_list_media_id_for_product(product_line['Media Slugs'])
medias = self.get_list_media_id_for_product(product_line)
# create or update product
self.create_or_update_product(product_data=product_data, attributes=attributes, tabs=tabs, categories=categories, medias=medias, json_data=product_line)
"""def put_social_data(self):
response = requests.post(url,
auth=HTTPBasicAuth("consumer_key", "consumer_secret"),
json={
"acf": {
"instagram_url": "https://instagram.com/ton_compte"
}
}
)"""
def delete_all_informations(self):
self.media_manager.delete_all_images()
self.attribute_manager.delete_all()
@ -1409,7 +1584,8 @@ class OrderManager:
self.error_log = []
self.headers = {
"Authorization": f"Basic {self.ath.auth_base64}",
"Content-Type": "application/json"
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0"
}
def delete_all_orders(self):
@ -1443,24 +1619,14 @@ class SeoManager(OdsReader):
"""Récupère toutes les images en gérant la pagination"""
all_pages = []
dict_id_slug = {}
#while True:
response = requests.get(f"{self.page_api_url}?per_page=100",
headers={"Authorization": f"Basic {self.ath.auth_base64}"},
#headers={"Authorization": f"Basic {self.ath['auth_base64']}"},
verify=False
)
response = self.wcapi.get(f"seo", params={"per_page": 100})
if response.status_code != 200:
pass
list_pages = response.json()
#pprint.pprint(page)
#print(page[0]['_links'])
#print(page[0]['slug'])
print(f"count = {len(list_pages)}")
if not list_pages:
pass
#print('_______')
#pprint.pprint(page)
for index, page in enumerate(list_pages):
dict_id_slug[list_pages[index]['id']] = list_pages[index]['slug']
all_pages.append(dict_id_slug)
@ -1486,15 +1652,7 @@ class SeoManager(OdsReader):
#"_yoast_wpseo_opengraph-description": line["Description"]
}
}
response = requests.post(
f"{self.page_api_url}/{key_page}",
headers={
"Authorization": f"Basic {self.ath.auth_base64}",
"Content-Type": "application/json"
},
json=data,
verify=False
)
response = self.wcapi.post(f"seo/{key_page}")
""""meta": {
"_yoast_wpseo_title": line["Titre"],

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 131 KiB

BIN
requirements.txt Normal file

Binary file not shown.

View File

@ -2,6 +2,7 @@ from PIL import Image, ImageOps
import os
from pathlib import Path# main.py
import logging
import tempfile
logger = logging.getLogger(__name__)
logger.info("Logger from watermark")
@ -10,6 +11,7 @@ def create_watermark_image(image_path, filigrane_path="logo-lescreationsdemissbl
#image = Image.open(image_path).convert("RGBA")
image = ImageOps.exif_transpose(Image.open(image_path)).convert("RGBA")
filigrane = Image.open(filigrane_path).convert("RGBA")
temp_dir = tempfile.gettempdir()
# Resize the watermak (ex: 25% of widht from principal image)
"""ratio = 0.25
@ -30,16 +32,19 @@ def create_watermark_image(image_path, filigrane_path="logo-lescreationsdemissbl
# Paste watermark (with alpha mask)
image.paste(filigrane, (x, y), filigrane)
temp_image_path = os.path.join(temp_dir, image_path)
# Save the result
output_path = image_path.rsplit('.', 1)
output_path = f"{output_path[0]}-filigrane.jpg"
#output_path = image_path.rsplit('.', 1)
#output_path = f"{output_path[0]}.jpg"
output_path = os.path.basename(temp_image_path)
try:
if not os.path.exists(output_path):
image.convert("RGB").save(output_path, "JPEG")
except Exception as e:
logger.exception(f"🔥 Image avec filigrane existe déjà : {e} - {Path(output_path).name}")
print(f"outpath = {output_path}")
#print(f"outpath = {output_path}")
return output_path
#print(f"✅ Image enregistrée : {output_path}")

View File

@ -1,10 +1,10 @@
print(f"📦 Script lancé : __name__ = {__name__}")
import argparse
from woocommerce import API as WoocommerceApi
#from api_woocommerce import AuthentificationWpApi, MediaManager, CategoryManager, ProductManager, AttributeManager, VariationsManager, TabManager, WooCommerceManager
from api import WoocommerceApiClient
from api_woocommerce import AuthentificationWpApi, MediaManager, CategoryManager, ProductManager, AttributeManager, VariationsManager, TabManager, WooCommerceManager
import pprint
import base64
import sys
from base64 import b64encode
def import_medias_ods(args, media_manager):
@ -48,7 +48,14 @@ def import_products_ods(args, woocommerce_manager):
print(" --product activé, mais aucune plage spécifiée.")
def main():
#ath = AuthentificationWpApi()
ath = AuthentificationWpApi()
#ctx = ssl.create_default_context()
#ctx.set_ciphers('@SECLEVEL=2:ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES:DHE+AES:AESGCM:!aNULL:!eNULL:!aDSS:!SHA1:!AESCCM:!PSK')
#http = urllib3.PoolManager(ssl_context=ctx)
#credentials = b64encode(f"{args.wc_key}:{args.wc_secret}").decode("utf-8")
parser = argparse.ArgumentParser(prog='wcctl', description='WooCommerce CLI controller')
@ -65,7 +72,6 @@ def main():
# 📥 Commande : import-ods
import_parser = subparsers.add_parser('import-ods', help='Import ODS file data')
import_parser.add_argument('--ods-path', required=True, help='Path to the ODS file')
# media
@ -95,22 +101,81 @@ def main():
# delete all informations
import_parser.add_argument('--delete-all', action='store_true', help='Delete media, categories, products, attributes, tabs')
# 📥 Commande : import-ods
checkssl_parser = subparsers.add_parser('checkssl', help='Check ssl connectivity')
check_parser = subparsers.add_parser('check', help='Check connectivity')
# Analyse des arguments
args = parser.parse_args()
wcapi = WoocommerceApi(
"""wcapi = WoocommerceApi(
url=args.wc_url,
consumer_key=args.wc_key,
consumer_secret=args.wc_secret,
wp_api=True,
version="wc/v3",
verify_ssl=False, # Désactive la vérification SSL pour le développement
verify_ssl=False,
timeout=30
)
)"""
wcapi = WoocommerceApiClient(
url=args.wc_url,
wc_consumer_key=args.wc_key,
wc_consumer_secret=args.wc_secret,
wp_application_user="admin_lcdm",
wp_application_password= "Do4p tLYF 5uje PF2M 21Zo x3OR", #"eAB4 49W6 ZRBj zIZc fH62 tv5c", #"yTW8 Mc6J FUCN tPSq bnuJ 0Sdw", #
wp_api=True,
version="wc/v3",
verify_ssl=True,
timeout=30
)
if args.command == "check":
#wcctl-api-rw
#class WoocommerceApiClient(API) def __init__(self, url, wc_consumer_key, wc_consumer_secret, wp_application_user, wp_application_password, verify_ssl=False, **kwargs):
wcapi = WoocommerceApiClient(
url=args.wc_url,
wc_consumer_key=args.wc_key,
wc_consumer_secret=args.wc_secret,
wp_application_user="admin_lcdm",
wp_application_password= "Do4p tLYF 5uje PF2M 21Zo x3OR",#"eAB4 49W6 ZRBj zIZc fH62 tv5c", #"yTW8 Mc6J FUCN tPSq bnuJ 0Sdw", #
wp_api=True,
version="wc/v3",
verify_ssl=True,
timeout=30
)
print("recupération d'un produit", end="")
response = wcapi.get("products", params={"per_page": 1, "page": 1})
print(response)
print("OK")
"""wcapi = WoocommerceApi(
url=args.wc_url,
consumer_key="wcctl-api-rw",
consumer_secret="NUrp R7tI oDKr FSqT hhf8 KxOT",
wp_api=True,
version="wp/v2",
verify_ssl=False,
timeout=30
)"""
print("recupération d'un media", end="")
response = wcapi.get("media", params={"per_page": 100, "page": 1})
print(response)
print("OK")
sys.exit(0)
if args.command == "checkssl":
for i in range(10):
response = wcapi.get("products", params={"per_page": 1, "page": 1})
print(i)
print(f'connection OK')
sys.exit(0)
ath = AuthentificationWpApi()
media_manager = MediaManager(ath, filename_ods=args.ods_path)
media_manager = MediaManager(ath, wcapi, filename_ods=args.ods_path)
category_manager = CategoryManager(wcapi, ath, filename_ods=args.ods_path)
product_manager = ProductManager(wcapi, ath, filename_ods=args.ods_path)
attribute_manager = AttributeManager(wcapi, filename_ods=args.ods_path)
@ -131,12 +196,12 @@ def main():
#print(f"🔍 args.media = {args.media}")
#print(f"🔍 args.media_range = {args.media_range}")
print(f"args = {args}")
if args.media:
import_medias_ods(args, media_manager)
if args.delete_all:
#woocommerce_manager.delete_all_informations()
media_manager.delete_all_images()
woocommerce_manager.delete_all_informations()
if args.category:
medias = media_manager.get_all_as_slug_dict()
@ -162,6 +227,10 @@ if __name__ == "__main__":
# ods_file = donnees_site_internet_missbleue_corrige.ods
#python wcctl.py --wc-url="https://lescreationsdemissbleue.local" --wc-key="ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e" --wc-secret="cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768" import-ods --ods-path="donnees_site_internet_missbleue_corrige.ods"
#python wcctl.py --wc-url="https://lescreationsdemissbleue.local" --wc-key="ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e" --wc-secret="cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768" import-ods --ods-path="donnees_site_internet_missbleue_version_finale.ods"
#python wcctl.py --wc-url="https://les-creations-de-missbleue.local" --wc-key="ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e" --wc-secret="cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768" import-ods --ods-path="donnees_site_internet_missbleue_corrige.ods"
#python wcctl.py --wc-url="https://les-creations-de-missbleue.local" --wc-key="ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e" --wc-secret="cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768" import-ods --ods-path="donnees_site_internet_missbleue_version_finale.ods"
# python wcctl.py --wc-url="https://les-creations-de-missbleue.local" --wc-key="ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e" --wc-secret="cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768" import-ods --ods-path="donnees_site_internet_missbleue_version_finale.ods" --product --product-range="111:112"
#
# # python wcctl.py --wc-url="https://les-creations-de-missbleue.com" --wc-key="ck_4125db027f75040ce9c7ca1bbc95b7031d1a6263" --wc-secret="cs_50d0a1de003fa8f8d0deb5f427b7769071dd4bfd" import-ods --ods-path="donnees_site_internet_missbleue_version_finale.ods"