from woocommerce import API import base64 import requests import ssl from requests.adapters import HTTPAdapter import logging import os from logging.handlers import TimedRotatingFileHandler log_directory = "logs" # 🔧 Configuration du handler avec rotation quotidienne log_file = os.path.join(log_directory, "woocommerce.log") handler = TimedRotatingFileHandler( filename=log_file, when="midnight", # ⏰ Rotation tous les jours à minuit interval=1, # 📅 Chaque 1 jour backupCount=7, # ♻️ Garde les 7 derniers fichiers de log encoding='utf-8' # 🧾 Pour supporter tous les caractères ) # 📋 Format du log formatter = logging.Formatter( fmt="%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S" ) handler.setFormatter(formatter) # 🔌 Récupère le logger logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) # 👁 Niveau minimum à capturer logger.addHandler(handler) class WoocommerceApiClient(API): def __init__(self, url, wc_consumer_key, wc_consumer_secret, wp_application_user, wp_application_password, verify_ssl=False, **kwargs): self.max_retry = 20 self.url = url super().__init__(url, wc_consumer_key, wc_consumer_secret, **kwargs) auth_str = f"{wp_application_user}:{wp_application_password}" auth_bytes = auth_str.encode("utf-8") auth_base64 = base64.b64encode(auth_bytes).decode("utf-8") self.headers = {"Authorization": f"Basic {auth_base64}", "User-Agent": "Mozilla/5.0" } self.verify_ssl = verify_ssl self.init_ssl() def _is_wc_api(self, endpoint): return endpoint.startswith("media") def _is_wp_api(self, endpoint): return endpoint.startswith("settings") def _get_requests_general_kwargs(self, endpoint): return { 'url': f'{self.url}/wp-json/wp/v2/{endpoint}', 'headers': self.headers, 'verify': self.verify_ssl, } def init_ssl(self): class SSLContextAdapter(HTTPAdapter): def __init__(self, ssl_context=None, **kwargs): self.ssl_context = ssl_context super().__init__(**kwargs) def init_poolmanager(self, *args, **kwargs): kwargs['ssl_context'] = self.ssl_context super().init_poolmanager(*args, **kwargs) # Create your custom SSL context context = ssl.create_default_context() context.set_ciphers('@SECLEVEL=2:ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES:DHE+AES:AESGCM:!aNULL:!eNULL:!aDSS:!SHA1:!AESCCM:!PSK') context.minimum_version = ssl.TLSVersion.TLSv1_3 context.maximum_version = ssl.TLSVersion.TLSv1_3 # Create a session and mount adapter session = requests.Session() adapter = SSLContextAdapter(ssl_context=context) session.mount('https://', adapter) def get(self, endpoint, **kwargs): retry = 0 while retry <= self.max_retry: try: if self._is_wc_api(endpoint): kwargs.update(self._get_requests_general_kwargs(endpoint)) #print(kwargs) # ✅ Montre tout le dict #print(kwargs["headers"]) # ✅ Si tu veux un champ en particulier return requests.get(**kwargs) else: return super().get(endpoint, **kwargs) except requests.exceptions.ConnectionError: if retry < self.max_retry: print(".", end="") retry += 1 else: raise def post(self, endpoint, data=None, files=None, **kwargs): retry = 0 response = None while retry <= self.max_retry: try: if self._is_wc_api(endpoint): kwargs.update(self._get_requests_general_kwargs(endpoint)) if files: kwargs['files'] = files if data: kwargs['json'] = data #print("kwargs envoyés à requests.post:", kwargs) response = requests.post(**kwargs) elif self._is_wp_api(endpoint): kwargs.update(self._get_requests_general_kwargs(endpoint)) kwargs['json'] = data response = requests.post(**kwargs) else: response = super().post(endpoint, data, **kwargs) if response.status_code not in (500, ) or retry >= self.max_retry: return response else: retry += 1 logger.debug(f"status_code={response.status_code} - retry #{retry}") except requests.exceptions.ConnectionError: if retry < self.max_retry: logger.debug(f"got requests.exceptions.ConnectionError - retry #{retry}") print(".", end="") retry += 1 else: raise def put(self, endpoint, data, file=None, **kwargs): retry = 0 while retry <= self.max_retry: try: if self._is_wc_api(endpoint): kwargs.update(self._get_requests_general_kwargs(endpoint)) if file: kwargs['json'] = file if data: kwargs['json'] = data return requests.put(**kwargs) else: return super().put(endpoint, data, **kwargs) except requests.exceptions.ConnectionError: if retry < self.max_retry: print(".", end="") retry += 1 else: raise def delete(self, endpoint, **kwargs): retry = 0 while retry <= self.max_retry: try: if self._is_wc_api(endpoint): return requests.delete(**self._get_requests_general_kwargs(endpoint), params={"force": True}) else: return super().delete(endpoint, **kwargs) except requests.exceptions.ConnectionError: if retry < self.max_retry: print(".", end="") retry += 1 else: raise