diff --git a/api.py b/api.py new file mode 100644 index 0000000..062ce3f --- /dev/null +++ b/api.py @@ -0,0 +1,134 @@ +from woocommerce import API +import base64 +import requests +import ssl +from requests.adapters import HTTPAdapter + + +class WoocommerceApiClient(API): + def __init__(self, url, wc_consumer_key, wc_consumer_secret, wp_application_user, wp_application_password, verify_ssl=False, **kwargs): + self.max_retry = 20 + self.url = url + super().__init__(url, wc_consumer_key, wc_consumer_secret, **kwargs) + auth_str = f"{wp_application_user}:{wp_application_password}" + auth_bytes = auth_str.encode("utf-8") + auth_base64 = base64.b64encode(auth_bytes).decode("utf-8") + self.headers = {"Authorization": f"Basic {auth_base64}", + "User-Agent": "Mozilla/5.0" + } + self.verify_ssl = verify_ssl + self.init_ssl() + + def _is_wc_api(self, endpoint): + return endpoint.startswith("media") + + def _is_wp_api(self, endpoint): + return endpoint.startswith("settings") + + def _get_requests_general_kwargs(self, endpoint): + return { + 'url': f'{self.url}/wp-json/wp/v2/{endpoint}', + 'headers': self.headers, + 'verify': self.verify_ssl, + } + + def init_ssl(self): + class SSLContextAdapter(HTTPAdapter): + def __init__(self, ssl_context=None, **kwargs): + self.ssl_context = ssl_context + super().__init__(**kwargs) + + def init_poolmanager(self, *args, **kwargs): + kwargs['ssl_context'] = self.ssl_context + super().init_poolmanager(*args, **kwargs) + + # Create your custom SSL context + context = ssl.create_default_context() + context.set_ciphers('@SECLEVEL=2:ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES:DHE+AES:AESGCM:!aNULL:!eNULL:!aDSS:!SHA1:!AESCCM:!PSK') + context.minimum_version = ssl.TLSVersion.TLSv1_3 + context.maximum_version = ssl.TLSVersion.TLSv1_3 + + # Create a session and mount adapter + session = requests.Session() + adapter = SSLContextAdapter(ssl_context=context) + session.mount('https://', adapter) + + def get(self, endpoint, **kwargs): + retry = 0 + while retry <= self.max_retry: + try: + if self._is_wc_api(endpoint): + kwargs.update(self._get_requests_general_kwargs(endpoint)) + print(kwargs) # ✅ Montre tout le dict + print(kwargs["headers"]) # ✅ Si tu veux un champ en particulier + return requests.get(**kwargs) + else: + return super().get(endpoint, **kwargs) + except requests.exceptions.ConnectionError: + if retry < self.max_retry: + print(".", end="") + retry += 1 + else: raise + + + + def post(self, endpoint, data=None, files=None, **kwargs): + retry = 0 + response = None + while retry <= self.max_retry: + try: + if self._is_wc_api(endpoint): + kwargs.update(self._get_requests_general_kwargs(endpoint)) + if files: + kwargs['files'] = files + if data: + kwargs['json'] = data + print("kwargs envoyés à requests.post:", kwargs) + response = requests.post(**kwargs) + elif self._is_wp_api(endpoint): + kwargs.update(self._get_requests_general_kwargs(endpoint)) + kwargs['json'] = data + response = requests.post(**kwargs) + else: + response = super().post(endpoint, data, **kwargs) + if response and response.status_code not in (500, 400): + return response + except requests.exceptions.ConnectionError: + if retry < self.max_retry: + print(".", end="") + retry += 1 + else: raise + + def put(self, endpoint, data, file=None, **kwargs): + retry = 0 + while retry <= self.max_retry: + try: + if self._is_wc_api(endpoint): + kwargs.update(self._get_requests_general_kwargs(endpoint)) + if file: + kwargs['json'] = file + if data: + kwargs['json'] = data + return requests.put(**kwargs) + else: + return super().put(endpoint, data, **kwargs) + except requests.exceptions.ConnectionError: + if retry < self.max_retry: + print(".", end="") + retry += 1 + else: raise + + def delete(self, endpoint, **kwargs): + retry = 0 + while retry <= self.max_retry: + try: + if self._is_wc_api(endpoint): + return requests.delete(**self._get_requests_general_kwargs(endpoint), params={"force": True}) + else: + return super().delete(endpoint, **kwargs) + except requests.exceptions.ConnectionError: + if retry < self.max_retry: + print(".", end="") + retry += 1 + else: raise + \ No newline at end of file diff --git a/api_woocommerce.py b/api_woocommerce.py index 846f676..7b0f1ed 100644 --- a/api_woocommerce.py +++ b/api_woocommerce.py @@ -16,6 +16,10 @@ import argparse from logging.handlers import TimedRotatingFileHandler from watermark import create_watermark_image +import ssl +import urllib3 +from base64 import b64encode + # Créer un dossier 'logs' s'il n'existe pas log_directory = "logs" os.makedirs(log_directory, exist_ok=True) @@ -53,21 +57,16 @@ logger.critical("Erreur critique (CRITICAL)")""" # via consumer key and consumer secret : # https://lescreationsdemissbleue.local/wp-json/wc/v3/products?consumer_key=ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e&consumer_secret=cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768 -wcapi = WoocommerceApi( #url="https://lescreationsdemissbleue.local", - url="https://les-creations-de-missbleue.local", - consumer_key="ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e", - consumer_secret="cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768", - wp_api=True, - version="wc/v3", - verify_ssl=False, # Désactive la vérification SSL pour le développement - timeout=30 -) + #url="https://les-creations-de-missbleue.local", + #consumer_key="ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e", + #consumer_secret="cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768", -class AuthentificationWpApi: + +class AuthentificationWpApi(): # Identifiants WordPress (et non WooCommerce) wordpress_username = "admin_lcdm" # Remplace par ton username WordPress - wordpress_application_password = "yTW8 Mc6J FUCN tPSq bnuJ 0Sdw" #"#8io_mb!55@Bis" # Généré dans WordPress > Utilisateurs + wordpress_application_password = "yTW8 Mc6J FUCN tPSq bnuJ 0Sdw" #'W6Zt N5CU 2Gj6 TlKm clGn LvIz' #"#8io_mb!55@Bis" # Généré dans WordPress > Utilisateurs # Générer l'authentification Basic en base64 auth_str = f"{wordpress_username}:{wordpress_application_password}" @@ -76,12 +75,13 @@ class AuthentificationWpApi: ath = AuthentificationWpApi() -#WEBSITE_URL = "https://lescreationsdemissbleue.local" WEBSITE_URL = "https://les-creations-de-missbleue.local" +#WEBSITE_URL = "https://les-creations-de-missbleue.com" #FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\donnees_site_internet_missbleue_corrige.ods" -BASE_PATH = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\photos\\photos_site\\Photos_site\\" +#BASE_PATH = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\photos\\photos_site\\Photos_site\\" +BASE_PATH = "C:\\Users\\beren\\Cloud\\beren\\site_missbleue\\photos\\photos_site\\Photos_site\\" #FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\infos_site.ods" -FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\api_woocommerce\\final_api_woocommerce\\donnees_site_internet_missbleue_corrige.ods" +FILENAME_ODS = "C:\\Users\\beren\\OneDrive\\Documents\\nextcloud\\beren\\site_missbleue\\api_woocommerce\\final_api_woocommerce\\donnees_site_internet_missbleue_version_finale.ods" @@ -226,11 +226,18 @@ class OdsReader: class MediaManager(OdsReader): - def __init__(self, ath, filename_ods):# filename_ods + def __init__(self, ath, wcapi, filename_ods):# filename_ods super().__init__(filename_ods) # filename_ods self.ath = ath - self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media" - self.media_api_settings = f"{WEBSITE_URL}/wp-json/wp/v2/settings" + self.wcapi = wcapi + #self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media" + #self.media_api_settings = f"{WEBSITE_URL}/wp-json/wp/v2/settings" + self.dict_equivalence = {'ambre':'ambré', 'meringue':'meringué', 'givree':'givrée', 'sale':'salé', 'bresilien':'brésilien', 'epices':'épices', 'noel':'noël', 'a':'à', 'petales':'pétales', + 'lumiere':'lumière', 'allumee':'allumée', 'eteinte':'éteinte', 'celebration':'célébration', 'argente':'argenté', 'dore':'doré', 'accroche':'accroché', 'pose':'posé', 'colore':'coloré', + 'kevin': 'Kévin', 'interieur':'intérieur', 'cafe':'café', 'bresil':'Brésil', 'dagrumes': "d'agrumes", "iles":"îles", 'apero': 'apéro', 'quebecois':'québecois', 'defendu':'défendu', + 'tiare':'tiaré', 'mure':'mûre', 'allergenes':'allergènes', 'parfume':'parfumé' + } + def upload_media(self, search_value=None): if search_value: @@ -247,22 +254,13 @@ class MediaManager(OdsReader): # 👇 Tentative d'ouverture et d'envoi with open(image_path, "rb") as image_file: - response = requests.post( - self.media_api_url, - headers={ - "Authorization": f"Basic {self.ath.auth_base64}", - "Content-Disposition": f"attachment; filename={image_name}" - }, - files={"file": image_file}, - verify=False - ) - + response = self.wcapi.post("media",files={"file": image_file}) if response.status_code == 201: media_data = response.json() self.update_data_media(media, media_data['id']) logger.info(f"✅ Image uploadée : {image_name}") else: - logger.error(f"❌ Échec de l'upload ({response.status_code}) pour : {image_name} - URL: {self.media_api_url}") + logger.error(f"❌ Échec de l'upload ({response.status_code}) pour : {image_name} - URL: {self.wcapi.url}") else: logger.info(f"↪️ Image déjà existante (non uploadée) : {image_name}") @@ -283,24 +281,32 @@ class MediaManager(OdsReader): image_path = path else: image_path = BASE_PATH + media['Chemin'] - print(f"image_path = {image_path}") + #print(f"image_path = {image_path}") # 👇 Tentative d'ouverture et d'envoi with open(image_path, "rb") as image_file: - response = requests.post( - self.media_api_url, + response = self.wcapi.post("media",files={"file": image_file}) + print("kkkkkkkkkkkkkkkkkkkkkkkkkk") + """ response = self.wcapi.post( + "media", headers={ "Authorization": f"Basic {self.ath.auth_base64}", - "Content-Disposition": f"attachment; filename={image_name}" + "Content-Disposition": f"attachment; filename={image_name}", + "User-Agent": "Mozilla/5.0" }, files={"file": image_file}, verify=False - ) + )""" + print(f"response = {response.status_code}") if response.status_code == 201: + print('____') + pprint.pprint(media) media_data = response.json() self.update_data_media(media, media_data['id']) logger.info(f"✅ Image uploadée : {image_name}") else: - logger.error(f"❌ Échec de l'upload ({response.status_code}) pour : {image_name} - URL: {self.media_api_url}") + logger.error(f"❌ Échec de l'upload ({response.status_code}) pour : {image_name} - URL: {self.wcapi.url}") + else: + logger.info(f"↪️ Image déjà existante (non uploadée) : {image_name}") except FileNotFoundError: logger.exception(f"🚫 Fichier introuvable : {image_name} ({path})") @@ -315,17 +321,26 @@ class MediaManager(OdsReader): json_data = self.fetch_all_media_rows(range_start, range_end) for media in json_data: + pprint.pprint(media) path = Path(BASE_PATH + media['Chemin']) image_name = path.name first_folder = media['Chemin'].split("\\")[0] print(f"first_folder = {first_folder}") watermarked_path = Path(create_watermark_image(str(path))) watermarked_name = watermarked_path.name + print('logo') if first_folder == 'Logo': + print("first_file") self.create_and_update_media(media,image_name,path) else: + print("pas logo") self.create_and_update_media(media, watermarked_name, watermarked_path, True) - + try: + os.remove(watermarked_path) + except FileNotFoundError: + logger.exception(f"🚫 Fichier introuvable : {image_name} ({path})") + + def is_exists(self, media, image_name): all_images = self.get_all_images() @@ -336,28 +351,62 @@ class MediaManager(OdsReader): else: pass return False + + def get_title_and_alt_text_media(self, media): + sentence = media['Slug'].replace('-', ' ') + sentence = sentence.replace('img', '').strip() + print(f"type = {type(sentence)}") + title = sentence.capitalize() + alt_text = title + return title, alt_text + + + def update_accent_in_sentence(self, sentence): + print(type(sentence)) + print(sentence) + words = sentence.split() + new_words = [self.dict_equivalence[word.lower()] if word.lower() in self.dict_equivalence else word for word in words] + pprint.pprint(new_words) + new_sentence = " ".join(new_words) + print(f"new_sentence = {new_sentence}") + return new_sentence + def update_data_media(self, media, id_img): + print(f"nom = {media['Nom']}") + print(type(media['Nom'])) + if media['Nom'] is None or media['Description'] is None: + title, alt_text = self.get_title_and_alt_text_media(media) + else: + title = media['Nom'] + alt_text = media['Description'] + + title = self.update_accent_in_sentence(title) + alt_text = self.update_accent_in_sentence(alt_text) + print(f"title = {title}") + print(f"alt_txt = {alt_text}") + update_data = { - "title" : media['Nom'], - "alt_text": media['Description'], + "title" : title, + "alt_text": alt_text, "slug": media['Slug'], } path = Path(BASE_PATH + media['Chemin']) image_name = path.name - - response = requests.post( - f"{self.media_api_url}/{id_img}", + response = self.wcapi.post(f"media/{id_img}", data=update_data) + """response = self.wcapi.post( + f"media/{id_img}", headers={ "Authorization": f"Basic {self.ath.auth_base64}", #"Authorization": f"Basic {self.ath['auth_base64']}", - "Content-Disposition": f"attachment; filename={image_name}" + "Content-Disposition": f"attachment; filename={image_name}", + "User-Agent": "Mozilla/5.0" }, json=update_data, verify=False - ) + )""" if response.status_code == 200: return response.json() @@ -381,60 +430,55 @@ class MediaManager(OdsReader): images = self.get_all_images() for img in images: if img['slug'] == slug: - delete_url = f"{self.media_api_url}/{img['id']}?force=true" - response = requests.delete(delete_url, - #headers={"Authorization": f"Basic {self.ath['auth_base64']}"}, - headers={"Authorization": f"Basic {self.ath.auth_base64}"}, - verify=False) + self.wcapi.delete(f"media/{img['id']}", params = {"force": True}) def get_all_images(self): """Récupère toutes les images en gérant la pagination""" all_images = [] page = 1 + print(f"self.ath.auth_base64 = {self.ath.auth_base64}") while True: - response = requests.get(f"{self.media_api_url}?per_page=100&page={page}", - headers={"Authorization": f"Basic {self.ath.auth_base64}"}, - #headers={"Authorization": f"Basic {self.ath['auth_base64']}"}, + """response = self.wcapi.get(f"wp-json/wp/v2/media?per_page=100&page={page}", + headers={"Authorization": f"Basic {self.ath.auth_base64}", + "User-Agent": "Mozilla/5.0"}, verify=False - ) + )""" + response = self.wcapi.get("media", params={"per_page": 100, "page": page}) if response.status_code != 200: break images = response.json() if not images: break - all_images.extend(images) - page += 1 - + page += 1 return all_images def delete_images(self, images): """Supprime toutes les images récupérées""" for img in images: img_id = img['id'] - delete_url = f"{self.media_api_url}/{img_id}?force=true" + + response = self.wcapi.delete(f"media/{img_id}") + """delete_url = f"media/{img_id}?force=true" response = requests.delete(delete_url, - headers={"Authorization": f"Basic {self.ath.auth_base64}"}, + headers={"Authorization": f"Basic {self.ath.auth_base64}", + "User-Agent": "Mozilla/5.0"}, #{"Authorization": f"Basic {self.ath['auth_base64']}"}, - verify=False) + verify=False)""" if response.status_code in [200, 410]: # 410 = déjà supprimé print(f"Image {img_id} supprimée.") else: print(f"Erreur suppression {img_id} :", response.status_code, response.text) def delete_all_images(self): - print('iciiiii') images = self.get_all_images() for img in images: img_id = img['id'] - delete_url = f"{self.media_api_url}/{img_id}?force=true" + print(f"img_id = {img['id']}") + response = self.wcapi.delete(f"media/{img_id}") - response = requests.delete(delete_url, - headers={"Authorization": f"Basic {self.ath.auth_base64}"}, - #"Authorization": f"Basic {self.ath['auth_base64']}"}, - verify=False) if response.status_code in [200, 410]: # 410 = déjà supprimé print(f"Image {img_id} supprimée.") else: @@ -443,17 +487,12 @@ class MediaManager(OdsReader): def assign_image_logo(self): images = self.get_all_images() for img in images: - if img['slug'] == "logo-lescreationsdemissbleue": + if img['slug'] == "img-logo-lescreationsdemissbleue": data = { "site_logo":img['id'], "site_icon" : img['id'] } - response = requests.post( - self.media_api_settings, - json=data, - headers={"Authorization": f"Basic {self.ath.auth_base64}"}, - verify=False - ) + response = self.wcapi.post("settings", data=data) if response.status_code == 200: print("Logo mis à jour avec succès !") @@ -525,10 +564,7 @@ class CategoryManager(OdsReader): return id def update_media_id_for_category(self, media_id, cat_id): - response = requests.get(f"{self.media_api_url}/{media_id}", - headers={"Authorization": f"Basic {self.ath.auth_base64}"}, - verify=False - ) + response = self.wcapi.get(f"media/{media_id}", params={"per_page": 1, "page": 1}) update_category_data = { "image" : {'id':media_id}, } @@ -553,11 +589,7 @@ class CategoryManager(OdsReader): def delete_media_category(self, media_slug): media_id = self.find_media_id_by_slug(media_slug) - requests.delete( - f"{self.media_api_url}/{media_id['id']}", - headers=self.headers, - verify=False - ) + self.wcapi.delete(f"media/{media_id}") def delete_category_by_id(self, category_id): self.wcapi.delete(f"products/categories/{category_id}", params={"force": True}) @@ -579,7 +611,8 @@ class ProductManager(OdsReader): self.error_log = [] self.headers = { "Authorization": f"Basic {self.ath.auth_base64}", - "Content-Type": "application/json" + "Content-Type": "application/json", + "User-Agent": "Mozilla/5.0" } self.media_api_url = f"{WEBSITE_URL}/wp-json/wp/v2/media" @@ -633,7 +666,7 @@ class ProductManager(OdsReader): list_product_tabs_data = [] x = 1 for key in product.keys(): - if key == "Conseils d’utilisation" or key == "Précautions articles" or key == "Description" or key == "Allergènes": + if key == "Conseils d’utilisation" or key == "Précautions articles" or key == "Description": #or key == "Allergènes": product_tabs_data['title'] = key product_tabs_data['content'] = product[key] product_tabs_data['nickname'] = '' @@ -724,15 +757,12 @@ class ProductManager(OdsReader): products = self.get_all_products() if products: for pro in products: + print(f"pro['id'] = {pro['id']}") self.wcapi.delete(f"products/{pro['id']}", params={"force": True}) def delete_media_product(self, media_slug): media_id = self.find_media_id_by_slug(media_slug) - requests.delete( - f"{self.media_api_url}/{media_id['id']}", - headers=self.headers, - verify=False - ) + self.wcapi.delete(f"media/{media_id['id']}") def delete_product_by_id(self, product_id): self.wcapi.delete(f"products/{product_id}", params={"force": True}) @@ -780,7 +810,8 @@ class AttributeManager(OdsReader): list_name_data = [] json_data = self.get_all_attribute_and_tab_lines() for item in json_data: - if item['Onglet'].strip() == "Informations Complémentaires": + #if item['Onglet'].strip() == "Informations Complémentaires": + if item['Onglet'] == "Informations Complémentaires": list_name_data.append(item['Nom']) return list_name_data @@ -790,7 +821,8 @@ class AttributeManager(OdsReader): else: features_json_data = self.get_all_attribute_and_tab_lines() for item in features_json_data: - if item['Onglet'].strip() == "Informations Complémentaires": + #if item['Onglet'].strip() == "Informations Complémentaires": + if item['Onglet'] == "Informations Complémentaires": attribute_data = { 'name' : item["Nom"] } @@ -804,7 +836,8 @@ class AttributeManager(OdsReader): term_json_data = self.get_all_attribute_and_tab_lines() for item in term_json_data: list_item = [] - if item['Onglet'].strip() == "Informations Complémentaires": + #if item['Onglet'].strip() == "Informations Complémentaires": + if item['Onglet'] == "Informations Complémentaires": if "," in item["Valeurs"]: list_item = [value_term.strip() for value_term in item['Valeurs'].split(",")] else: @@ -838,6 +871,8 @@ class AttributeManager(OdsReader): def create_for_product(self, product_id, name, value, variation=False): + print(f"variation = {variation}") + print(f"value = {value}") data_attribute = { 'name': name, 'options':value @@ -858,6 +893,8 @@ class AttributeManager(OdsReader): if already_exist == False: found = False + print(f"attributes_data = {existing_attributes_data}") + print(f"data_attribute = {data_attribute}") for attribute in existing_attributes_data: if attribute["name"] == name: attribute["options"].append(data_attribute) @@ -866,9 +903,14 @@ class AttributeManager(OdsReader): # Si l'onglet `wb_custom_tabs` n'existe pas, on le crée if not found: + print(f"value = {value}") + if value is not None: + value = [v.strip() for v in value.split(",")] + print(f"value = {value}") + existing_attributes_data.append({ "name": name, - "options": [value], + "options": value, "visible":True, "variation": variation, #"parent_id":product_id @@ -1086,14 +1128,95 @@ class VariationsManager(OdsReader): print(update_res.json()) def create_variations_products(self, product_id, product_data): - #products_lines = self.get_all_product_lines() + products_lines = self.get_all_product_lines() product_line = self.get_product_by_slug_from_ods(product_data['slug']) - for product_line_key, products_line_value in product_line.items(): - if product_line_key == "Parfums": + parfums = None + volumes = None + price_per_product_variable = None + if product_line['Type'] == "Variable": + if product_line['Choix parfums'] is not None: + parfums = [p.strip() for p in product_line['Choix parfums'].split(",")] + print(f"parfums = {parfums}") + if product_line['Volume'] is not None: + volumes = [v.strip() for v in product_line['Volume'].split(",")] + print(f"volumes = {volumes}") + + """if product_line['Prix pour'] is not None: + #products = [v.strip() for v in product_line['Prix pour'].split(",")] + products = product_line['Prix pour'].split(",") + price_per_product_variable = {} + for p in products: + name, price = p.split("=") + price_per_product_variable[name.strip()] = price.strip() + + pprint.pprint(price_per_product_variable)""" + + response = self.wcapi.get(f"products/{product_id}") + print(f"product_id = {product_id}") + print(f"response = {response.status_code}") + try: + if response.status_code == 200: + #existing_product = response.json() + #self.update_product_attributes_merged(self.wcapi, product_id=product_id, attribute_name="Parfums", new_options=parfums) + if parfums is not None: + for parfum in parfums: + data = { + 'attributes': [ + { + 'name': 'Choix parfums', + 'option': parfum + } + ], + 'manage_stock': False, + 'in_stock':True, + 'regular_price': product_data['price'], + } + print(f"Posting variation: {data}") + response = self.wcapi.post(f"products/{product_id}/variations", data) + print(response.status_code) + print(response.json()) + logger.info(f"Variation de parfums a bien été créé") + if volumes is not None: + for volume in volumes: + data = { + 'attributes': [ + { + 'name': 'Volume', + 'option': volume + } + ], + 'manage_stock': False, + 'in_stock':True, + 'regular_price': product_data['price'], + } + print(f"Posting variation: {data}") + result = self.wcapi.post(f"products/{product_id}/variations", data) + logger.info(f"Variation de volumes a bien été créé") + """if price_per_product_variable is not None: + for name, price in price_per_product_variable.items(): + data = { + 'attributes': [ + { + 'name': 'Volume', + 'option': name + } + ], + 'manage_stock': False, + 'in_stock':True, + 'regular_price': price, + } + result = self.wcapi.post(f"products/{product_id}/variations", data) + logger.info(f"Variation de prix selon objet bien créé")""" + except Exception as e: + logger.exception(f"Erreur lors de la création du produit de variation : {e}") + #logger.error(f"Erreur lors de la création de la catégorie. Code: {response.status_code}, Message: {response.text}") + """ + for product_line_key, products_line_value in product_line.items(): + if product_line_key == "Choix parfums": name_attribute = product_line_key parfums = products_line_value if product_line_key == "Type": - if product_data['type'] == "variable": + if product_data['type'] == "Variable": response = self.wcapi.get(f"products/{product_id}") if response.status_code == 200: existing_product = response.json() @@ -1121,7 +1244,7 @@ class VariationsManager(OdsReader): print(result.status_code) pprint.pprint(result.json()) else: - return False + return False""" class WooCommerceManager(OdsReader): def __init__(self, wcapi, media_manager, category_manager, product_manager, tab_manager, attribute_manager, variation_manager, filename_ods): @@ -1195,8 +1318,8 @@ class WooCommerceManager(OdsReader): self.update_product_tab() self.update_product_attribute()""" - def update_product_variation(self, product_id, product_data): - pass + """def update_product_variation(self, product_id, product_data): + pass""" def update_product_by_slug(self, slug): self.product_manager.update_data_product_by_slug(slug) @@ -1222,18 +1345,53 @@ class WooCommerceManager(OdsReader): category_list_by_doc = [cat.strip().replace('"', '') for cat in category.split("/")] return category_list_by_doc - def get_list_media_id_for_product(self, media): - list_media_by_doc = [img.strip().replace(' ', '') for img in media.split(",")] + def get_list_variable_attributes(self,attributes): + list_variable_attributes_by_doc = [attr.strip() for attr in attributes.split(",")] + return list_variable_attributes_by_doc + + def get_list_media_id_for_product(self, product): + #list_media_by_doc = [img.strip().replace(' ', '') for img in media.split(",")] + list_media_by_doc = [] + list_media_by_doc.append(product['Image1']) + list_media_by_doc.append(product['Image2']) + list_media_by_doc.append(product['Image3']) + list_media_by_doc.append(product['Pyramide']) return list_media_by_doc - def is_variable(self, type): + """def is_variable(self, type): + #print(f"type.lower = { type.lower}") + #print(f"type = {type}") return type.lower() == "parfums" + #if type.lower() == 'Choix parfums' or type.lower() == 'Volume' or type.lower() == 'Prix pour': + # return True""" + """def is_variable(self, type): + return type.lower() == "parfums""" + + """def is_variable(self, product_data): + if product_data['Choix parfums'] is not None or product_data['Volume'] is not None or product_data['Prix pour'] is not None: + return True""" + + def is_variable(self, name, value): + """print(f"typeee = {type}") + if type == 'variable': + return True""" + if name == "Volume" or name =="Choix parfums": + if value is not None: + return True + else: + return False + + def update_product_attribute(self, attributes, product_data): + print(f"product_data = {product_data}") + product_id = self.product_manager.find_id_by_slug(product_data['slug']) for name, value in attributes.items(): - self.attribute_manager.create_for_product(product_id=product_id, name=name, value=value, variation=self.is_variable(product_data['type'])) - + print(f"self.is_variable(product_data['type'])) = {self.is_variable(name, value)}") + self.attribute_manager.create_for_product(product_id=product_id, name=name, value=value, variation=self.is_variable(name, value)) + #self.attribute_manager.create_for_product(product_id=product_id, name=name, value=value, variation=self.is_variable(product_data)) + def update_product_variations(self, product_data): product_id = self.product_manager.find_id_by_slug(product_data['slug']) self.variation_manager.create_variations_products(product_id, product_data) @@ -1293,8 +1451,14 @@ class WooCommerceManager(OdsReader): 'short_description': product_line['Courte Description'], 'slug':product_line['Slug'] } + if product_line['Promo'] is not None: + product_data['sale_price'] = product_line['Promo'] + if product_line['Type'] == "parfums": product_data['type'] = "variable" + if product_line['Volume'] is not None: + values_attributes = self.get_list_variable_attributes(product_line['Volume']) + attributes['Volume'] = values_attributes else: product_data['type'] = "simple" @@ -1305,7 +1469,8 @@ class WooCommerceManager(OdsReader): "Fabrication" : product_line['Fabrication'], "Composition" : product_line['Composition'], "Ingrédients et engagements" : product_line['Ingrédients et engagements'], - "Parfums" : product_line['Parfums'] + "Parfums" : product_line['Parfums'], + "Volume" : product_line["Volume"] } tabs ={ @@ -1336,6 +1501,7 @@ class WooCommerceManager(OdsReader): if self.product_manager.find_id_by_slug(product_line['Slug']): logger.debug(f"Produit contenant comme slug '{product_line['Slug']}' existe déjà") else: + print(f"process_file_from_to {product_line['Nom']}") # standard product data product_data = { 'name' : product_line['Nom'], @@ -1349,10 +1515,6 @@ class WooCommerceManager(OdsReader): 'short_description': product_line['Courte Description'], 'slug':product_line['Slug'] } - if product_line['Type'] == "parfums": - product_data['type'] = "variable" - else: - product_data['type'] = "simple" attributes = { "Temps de combustion" : product_line['Temps de combustion'], @@ -1361,9 +1523,33 @@ class WooCommerceManager(OdsReader): "Fabrication" : product_line['Fabrication'], "Composition" : product_line['Composition'], "Ingrédients et engagements" : product_line['Ingrédients et engagements'], - "Parfums" : product_line['Parfums'] + #"Parfums" : product_line['Choix parfums'] + #"Volume" : product_line["Volume"] } - + + if product_line['Promo'] is not None: + product_data['sale_price'] = product_line['Promo'] + + if product_line['Type'] == "Variable": + product_data['type'] = "variable" + if product_line['Choix parfums'] is not None: + values_attributes = self.get_list_variable_attributes(product_line['Choix parfums']) + print(f"values_attributes = {values_attributes}") + attributes['Choix parfums'] = values_attributes + if product_line['Volume'] is not None: + values_attributes = self.get_list_variable_attributes(product_line['Volume']) + print(f"values_attributes = {values_attributes}") + attributes['Volume'] = product_line['Volume'] + """if product_line['Prix pour'] is not None: + values_attributes = self.get_list_variable_attributes(product_line['Prix pour']) + print(f"values_attributes = {values_attributes}") + attributes['Prix pour'] = values_attributes""" + else: + product_data['type'] = "simple" + + print('attributes') + pprint.pprint(attributes) + tabs ={ #"Description" : product_line["Description"], "Conseils d'utilisation" : product_line["Conseils d’utilisation"], @@ -1374,22 +1560,11 @@ class WooCommerceManager(OdsReader): categories = self.get_list_category_for_product(product_line['Catégories']) # ... associated medias - print(f"product_line['Media Slugs'] = {product_line['Media Slugs']}") - medias = self.get_list_media_id_for_product(product_line['Media Slugs']) + medias = self.get_list_media_id_for_product(product_line) # create or update product self.create_or_update_product(product_data=product_data, attributes=attributes, tabs=tabs, categories=categories, medias=medias, json_data=product_line) - - """def put_social_data(self): - response = requests.post(url, - auth=HTTPBasicAuth("consumer_key", "consumer_secret"), - json={ - "acf": { - "instagram_url": "https://instagram.com/ton_compte" - } - } - )""" def delete_all_informations(self): self.media_manager.delete_all_images() @@ -1409,7 +1584,8 @@ class OrderManager: self.error_log = [] self.headers = { "Authorization": f"Basic {self.ath.auth_base64}", - "Content-Type": "application/json" + "Content-Type": "application/json", + "User-Agent": "Mozilla/5.0" } def delete_all_orders(self): @@ -1443,24 +1619,14 @@ class SeoManager(OdsReader): """Récupère toutes les images en gérant la pagination""" all_pages = [] dict_id_slug = {} - #while True: - response = requests.get(f"{self.page_api_url}?per_page=100", - headers={"Authorization": f"Basic {self.ath.auth_base64}"}, - #headers={"Authorization": f"Basic {self.ath['auth_base64']}"}, - verify=False - ) + response = self.wcapi.get(f"seo", params={"per_page": 100}) if response.status_code != 200: pass list_pages = response.json() - #pprint.pprint(page) - #print(page[0]['_links']) - #print(page[0]['slug']) - print(f"count = {len(list_pages)}") if not list_pages: pass - #print('_______') - #pprint.pprint(page) + for index, page in enumerate(list_pages): dict_id_slug[list_pages[index]['id']] = list_pages[index]['slug'] all_pages.append(dict_id_slug) @@ -1486,16 +1652,8 @@ class SeoManager(OdsReader): #"_yoast_wpseo_opengraph-description": line["Description"] } } - response = requests.post( - f"{self.page_api_url}/{key_page}", - headers={ - "Authorization": f"Basic {self.ath.auth_base64}", - "Content-Type": "application/json" - }, - json=data, - verify=False - ) - + response = self.wcapi.post(f"seo/{key_page}") + """"meta": { "_yoast_wpseo_title": line["Titre"], "_yoast_wpseo_metadesc": line["Description"], diff --git a/donnees_site_internet_missbleue_version_finale.ods b/donnees_site_internet_missbleue_version_finale.ods new file mode 100644 index 0000000..98a8509 Binary files /dev/null and b/donnees_site_internet_missbleue_version_finale.ods differ diff --git a/img-logo-lescreationsdemissbleue-fond-transparent.png b/img-logo-lescreationsdemissbleue-fond-transparent.png new file mode 100644 index 0000000..079c195 Binary files /dev/null and b/img-logo-lescreationsdemissbleue-fond-transparent.png differ diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..573fd8e Binary files /dev/null and b/requirements.txt differ diff --git a/watermark.py b/watermark.py index 41fa2d6..da774ae 100644 --- a/watermark.py +++ b/watermark.py @@ -2,6 +2,7 @@ from PIL import Image, ImageOps import os from pathlib import Path# main.py import logging +import tempfile logger = logging.getLogger(__name__) logger.info("Logger from watermark") @@ -10,6 +11,7 @@ def create_watermark_image(image_path, filigrane_path="logo-lescreationsdemissbl #image = Image.open(image_path).convert("RGBA") image = ImageOps.exif_transpose(Image.open(image_path)).convert("RGBA") filigrane = Image.open(filigrane_path).convert("RGBA") + temp_dir = tempfile.gettempdir() # Resize the watermak (ex: 25% of widht from principal image) """ratio = 0.25 @@ -30,16 +32,19 @@ def create_watermark_image(image_path, filigrane_path="logo-lescreationsdemissbl # Paste watermark (with alpha mask) image.paste(filigrane, (x, y), filigrane) + temp_image_path = os.path.join(temp_dir, image_path) + # Save the result - output_path = image_path.rsplit('.', 1) - output_path = f"{output_path[0]}-filigrane.jpg" + #output_path = image_path.rsplit('.', 1) + #output_path = f"{output_path[0]}.jpg" + output_path = os.path.basename(temp_image_path) try: if not os.path.exists(output_path): image.convert("RGB").save(output_path, "JPEG") except Exception as e: logger.exception(f"🔥 Image avec filigrane existe déjà : {e} - {Path(output_path).name}") - print(f"outpath = {output_path}") + #print(f"outpath = {output_path}") return output_path #print(f"✅ Image enregistrée : {output_path}") diff --git a/wcctl.py b/wcctl.py index 5e35b66..4df38a4 100644 --- a/wcctl.py +++ b/wcctl.py @@ -1,10 +1,10 @@ print(f"📦 Script lancé : __name__ = {__name__}") import argparse from woocommerce import API as WoocommerceApi -#from api_woocommerce import AuthentificationWpApi, MediaManager, CategoryManager, ProductManager, AttributeManager, VariationsManager, TabManager, WooCommerceManager +from api import WoocommerceApiClient from api_woocommerce import AuthentificationWpApi, MediaManager, CategoryManager, ProductManager, AttributeManager, VariationsManager, TabManager, WooCommerceManager -import pprint -import base64 +import sys +from base64 import b64encode def import_medias_ods(args, media_manager): @@ -48,7 +48,14 @@ def import_products_ods(args, woocommerce_manager): print("ℹ️ --product activé, mais aucune plage spécifiée.") def main(): - #ath = AuthentificationWpApi() + ath = AuthentificationWpApi() + + + #ctx = ssl.create_default_context() + #ctx.set_ciphers('@SECLEVEL=2:ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES:DHE+AES:AESGCM:!aNULL:!eNULL:!aDSS:!SHA1:!AESCCM:!PSK') + #http = urllib3.PoolManager(ssl_context=ctx) + #credentials = b64encode(f"{args.wc_key}:{args.wc_secret}").decode("utf-8") + parser = argparse.ArgumentParser(prog='wcctl', description='WooCommerce CLI controller') @@ -65,7 +72,6 @@ def main(): # 📥 Commande : import-ods import_parser = subparsers.add_parser('import-ods', help='Import ODS file data') - import_parser.add_argument('--ods-path', required=True, help='Path to the ODS file') # media @@ -95,22 +101,81 @@ def main(): # delete all informations import_parser.add_argument('--delete-all', action='store_true', help='Delete media, categories, products, attributes, tabs') + # 📥 Commande : import-ods + checkssl_parser = subparsers.add_parser('checkssl', help='Check ssl connectivity') + check_parser = subparsers.add_parser('check', help='Check connectivity') + # Analyse des arguments args = parser.parse_args() - wcapi = WoocommerceApi( + """wcapi = WoocommerceApi( url=args.wc_url, consumer_key=args.wc_key, consumer_secret=args.wc_secret, wp_api=True, version="wc/v3", - verify_ssl=False, # Désactive la vérification SSL pour le développement + verify_ssl=False, timeout=30 -) +)""" + + wcapi = WoocommerceApiClient( + url=args.wc_url, + wc_consumer_key=args.wc_key, + wc_consumer_secret=args.wc_secret, + wp_application_user="admin_lcdm", + wp_application_password= "Do4p tLYF 5uje PF2M 21Zo x3OR", #"eAB4 49W6 ZRBj zIZc fH62 tv5c", #"yTW8 Mc6J FUCN tPSq bnuJ 0Sdw", # + wp_api=True, + version="wc/v3", + verify_ssl=True, + timeout=30 + ) + + if args.command == "check": + #wcctl-api-rw + #class WoocommerceApiClient(API) def __init__(self, url, wc_consumer_key, wc_consumer_secret, wp_application_user, wp_application_password, verify_ssl=False, **kwargs): + wcapi = WoocommerceApiClient( + url=args.wc_url, + wc_consumer_key=args.wc_key, + wc_consumer_secret=args.wc_secret, + wp_application_user="admin_lcdm", + wp_application_password= "Do4p tLYF 5uje PF2M 21Zo x3OR",#"eAB4 49W6 ZRBj zIZc fH62 tv5c", #"yTW8 Mc6J FUCN tPSq bnuJ 0Sdw", # + wp_api=True, + version="wc/v3", + verify_ssl=True, + timeout=30 + ) + print("recupération d'un produit", end="") + response = wcapi.get("products", params={"per_page": 1, "page": 1}) + print(response) + print("OK") + + """wcapi = WoocommerceApi( + url=args.wc_url, + consumer_key="wcctl-api-rw", + consumer_secret="NUrp R7tI oDKr FSqT hhf8 KxOT", + wp_api=True, + version="wp/v2", + verify_ssl=False, + timeout=30 + )""" + print("recupération d'un media", end="") + response = wcapi.get("media", params={"per_page": 100, "page": 1}) + print(response) + print("OK") + sys.exit(0) + + if args.command == "checkssl": + for i in range(10): + response = wcapi.get("products", params={"per_page": 1, "page": 1}) + print(i) + print(f'connection OK') + sys.exit(0) + + ath = AuthentificationWpApi() - media_manager = MediaManager(ath, filename_ods=args.ods_path) + media_manager = MediaManager(ath, wcapi, filename_ods=args.ods_path) category_manager = CategoryManager(wcapi, ath, filename_ods=args.ods_path) product_manager = ProductManager(wcapi, ath, filename_ods=args.ods_path) attribute_manager = AttributeManager(wcapi, filename_ods=args.ods_path) @@ -131,12 +196,12 @@ def main(): #print(f"🔍 args.media = {args.media}") #print(f"🔍 args.media_range = {args.media_range}") print(f"args = {args}") + if args.media: import_medias_ods(args, media_manager) if args.delete_all: - #woocommerce_manager.delete_all_informations() - media_manager.delete_all_images() + woocommerce_manager.delete_all_informations() if args.category: medias = media_manager.get_all_as_slug_dict() @@ -162,6 +227,10 @@ if __name__ == "__main__": # ods_file = donnees_site_internet_missbleue_corrige.ods -#python wcctl.py --wc-url="https://lescreationsdemissbleue.local" --wc-key="ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e" --wc-secret="cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768" import-ods --ods-path="donnees_site_internet_missbleue_corrige.ods" +#python wcctl.py --wc-url="https://lescreationsdemissbleue.local" --wc-key="ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e" --wc-secret="cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768" import-ods --ods-path="donnees_site_internet_missbleue_version_finale.ods" -#python wcctl.py --wc-url="https://les-creations-de-missbleue.local" --wc-key="ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e" --wc-secret="cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768" import-ods --ods-path="donnees_site_internet_missbleue_corrige.ods" \ No newline at end of file +#python wcctl.py --wc-url="https://les-creations-de-missbleue.local" --wc-key="ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e" --wc-secret="cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768" import-ods --ods-path="donnees_site_internet_missbleue_version_finale.ods" + +# python wcctl.py --wc-url="https://les-creations-de-missbleue.local" --wc-key="ck_604e9b7b5d290cce72346efade6b31cb9a1ff28e" --wc-secret="cs_563974c7e59532c1ae1d0f8bbf61f0500d6bc768" import-ods --ods-path="donnees_site_internet_missbleue_version_finale.ods" --product --product-range="111:112" +# +# # python wcctl.py --wc-url="https://les-creations-de-missbleue.com" --wc-key="ck_4125db027f75040ce9c7ca1bbc95b7031d1a6263" --wc-secret="cs_50d0a1de003fa8f8d0deb5f427b7769071dd4bfd" import-ods --ods-path="donnees_site_internet_missbleue_version_finale.ods" \ No newline at end of file