Files
missbleue/api.py

164 lines
6.3 KiB
Python

from woocommerce import API
import base64
import requests
import ssl
from requests.adapters import HTTPAdapter
import logging
import os
from logging.handlers import TimedRotatingFileHandler
# Directory (relative to the current working directory) that holds the log files.
log_directory = "logs"
# TimedRotatingFileHandler opens its target file as soon as it is constructed,
# so the directory has to exist first; without this, importing the module on a
# fresh checkout fails with FileNotFoundError.
os.makedirs(log_directory, exist_ok=True)

# Handler configuration with daily rotation.
log_file = os.path.join(log_directory, "woocommerce.log")
handler = TimedRotatingFileHandler(
    filename=log_file,
    when="midnight",   # rotate every day at midnight
    interval=1,        # every 1 day
    backupCount=7,     # keep the 7 most recent rotated files
    encoding="utf-8",  # so any character can be written to the log
)

# Log line format: timestamp - level - message.
formatter = logging.Formatter(
    fmt="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
handler.setFormatter(formatter)

# Module-level logger used by the API client defined in this file.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)  # lowest level that gets recorded
logger.addHandler(handler)
class WoocommerceApiClient(API):
    """WooCommerce API client that also reaches ``/wp-json/wp/v2/`` endpoints.

    Endpoints starting with ``"media"`` (all verbs) and ``"settings"`` (POST
    only) are sent with ``requests`` to ``{url}/wp-json/wp/v2/{endpoint}``
    using an HTTP Basic ``Authorization`` header built from the WordPress
    application user/password. Every other endpoint is delegated to the
    parent ``API`` implementation.

    All verbs retry on ``requests.exceptions.ConnectionError`` up to
    ``self.max_retry`` additional times; ``post`` also retries on HTTP 500.
    """

    def __init__(self, url, wc_consumer_key, wc_consumer_secret,
                 wp_application_user, wp_application_password,
                 verify_ssl=False, **kwargs):
        """Store credentials and build the Basic-auth headers.

        Args:
            url: Base URL of the site.
            wc_consumer_key: Consumer key forwarded to the parent ``API``.
            wc_consumer_secret: Consumer secret forwarded to the parent ``API``.
            wp_application_user: User name for the Basic ``Authorization`` header.
            wp_application_password: Password for the Basic ``Authorization`` header.
            verify_ssl: Value passed as ``verify`` to the direct ``requests``
                calls. Defaults to ``False`` (certificate checks disabled).
            **kwargs: Forwarded unchanged to the parent ``API`` initializer.
        """
        self.max_retry = 20
        self.url = url
        super().__init__(url, wc_consumer_key, wc_consumer_secret, **kwargs)
        credentials = f"{wp_application_user}:{wp_application_password}"
        token = base64.b64encode(credentials.encode("utf-8")).decode("utf-8")
        self.headers = {
            "Authorization": f"Basic {token}",
            "User-Agent": "Mozilla/5.0",
        }
        # Assigned after super().__init__() so the explicit argument is the
        # value that ends up on the instance.
        self.verify_ssl = verify_ssl
        self.init_ssl()

    def _is_wc_api(self, endpoint):
        """Return True when ``endpoint`` starts with ``"media"``.

        NOTE(review): despite the name, matching endpoints are the ones sent
        directly to ``/wp-json/wp/v2/`` rather than through the parent ``API``.
        """
        return endpoint.startswith("media")

    def _is_wp_api(self, endpoint):
        """Return True when ``endpoint`` starts with ``"settings"``."""
        return endpoint.startswith("settings")

    def _get_requests_general_kwargs(self, endpoint):
        """Return the ``url``/``headers``/``verify`` kwargs for a direct call."""
        # Strip a trailing slash so a base URL such as "https://host/" does
        # not yield "https://host//wp-json/...".
        return {
            'url': f"{self.url.rstrip('/')}/wp-json/wp/v2/{endpoint}",
            'headers': self.headers,
            'verify': self.verify_ssl,
        }

    def init_ssl(self):
        """Build a ``requests.Session`` restricted to TLS 1.3.

        The session is kept on ``self.ssl_session``. The HTTP verb methods of
        this class call the module-level ``requests`` functions and therefore
        do not go through this session.
        NOTE(review): confirm whether they were meant to use it.
        """
        class SSLContextAdapter(HTTPAdapter):
            """Transport adapter that hands a fixed SSL context to its pool."""

            def __init__(self, ssl_context=None, **kwargs):
                # Must be set before super().__init__(): init_poolmanager()
                # reads it (presumably invoked by the parent initializer).
                self.ssl_context = ssl_context
                super().__init__(**kwargs)

            def init_poolmanager(self, *args, **kwargs):
                kwargs['ssl_context'] = self.ssl_context
                return super().init_poolmanager(*args, **kwargs)

        # Custom SSL context: restricted cipher list, TLS 1.3 only.
        context = ssl.create_default_context()
        context.set_ciphers('@SECLEVEL=2:ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES:DHE+AES:AESGCM:!aNULL:!eNULL:!aDSS:!SHA1:!AESCCM:!PSK')
        context.minimum_version = ssl.TLSVersion.TLSv1_3
        context.maximum_version = ssl.TLSVersion.TLSv1_3

        # Create the session and mount the adapter for every https:// URL.
        session = requests.Session()
        session.mount('https://', SSLContextAdapter(ssl_context=context))
        # Keep a reference; previously the configured session was dropped as
        # soon as this method returned.
        self.ssl_session = session

    @staticmethod
    def _snapshot_file_positions(files):
        """Return ``(file object, offset)`` pairs for the seekable entries of ``files``.

        ``files`` may be falsy, a dict, or a list of ``(name, value)`` pairs,
        where each value is a file object or a tuple whose second item is one.
        Entries without working ``seek``/``tell`` are skipped.
        """
        if not files:
            return []
        if isinstance(files, dict):
            entries = list(files.values())
        else:
            entries = [item[1] for item in files]
        positions = []
        for entry in entries:
            if isinstance(entry, (tuple, list)):
                if len(entry) < 2:
                    continue
                entry = entry[1]
            if not (hasattr(entry, "seek") and hasattr(entry, "tell")):
                continue
            try:
                positions.append((entry, entry.tell()))
            except (OSError, ValueError):
                # Non-seekable stream: nothing to rewind on retry.
                continue
        return positions

    def _call_with_retry(self, send, retry_on_status=()):
        """Call ``send()`` and retry on connection errors and listed statuses.

        Args:
            send: Zero-argument callable performing one HTTP request.
            retry_on_status: HTTP status codes that trigger another attempt.

        Returns:
            The response of the last attempt made.

        Raises:
            requests.exceptions.ConnectionError: when the error persists after
                ``self.max_retry`` retries.
        """
        attempt = 0
        while True:
            try:
                response = send()
            except requests.exceptions.ConnectionError:
                if attempt >= self.max_retry:
                    raise
                attempt += 1
                logger.debug("got requests.exceptions.ConnectionError - retry #%d", attempt)
                print(".", end="")  # progress dot on the console, one per retry
                continue
            if (
                not retry_on_status
                or response.status_code not in retry_on_status
                or attempt >= self.max_retry
            ):
                return response
            attempt += 1
            logger.debug("status_code=%s - retry #%d", response.status_code, attempt)

    def get(self, endpoint, **kwargs):
        """Send a GET request, retrying on connection errors.

        ``media`` endpoints are fetched with ``requests.get``; ``url``,
        ``headers`` and ``verify`` override same-named entries in ``kwargs``.
        Other endpoints go to the parent ``get``.
        """
        if self._is_wc_api(endpoint):
            request_kwargs = {**kwargs, **self._get_requests_general_kwargs(endpoint)}
            return self._call_with_retry(lambda: requests.get(**request_kwargs))
        # Bind the parent method here: zero-argument super() is not usable
        # inside the lambda.
        parent_get = super().get
        return self._call_with_retry(lambda: parent_get(endpoint, **kwargs))

    def post(self, endpoint, data=None, files=None, **kwargs):
        """Send a POST request, retrying on connection errors and HTTP 500.

        Args:
            endpoint: Endpoint path; the prefix selects the route (see class doc).
            data: Payload. Sent as JSON for ``media``/``settings`` endpoints,
                passed positionally to the parent ``post`` otherwise.
            files: Multipart files, used for ``media`` endpoints only.
            **kwargs: Extra keyword arguments for the underlying call.

        Returns:
            The response of the last attempt, which may still be a 500 once
            the retries are exhausted.
        """
        retry_statuses = (500,)
        if self._is_wc_api(endpoint):
            request_kwargs = {**kwargs, **self._get_requests_general_kwargs(endpoint)}
            if files:
                request_kwargs['files'] = files
            if data:
                # NOTE(review): requests appears to drop `json` when `files`
                # is also given — confirm `data` is not needed for uploads.
                request_kwargs['json'] = data
            # A file object is read to EOF by the first attempt; rewind it
            # before every attempt so a retry does not upload empty content.
            positions = self._snapshot_file_positions(files)

            def send_media():
                for fileobj, offset in positions:
                    fileobj.seek(offset)
                return requests.post(**request_kwargs)

            return self._call_with_retry(send_media, retry_statuses)
        if self._is_wp_api(endpoint):
            request_kwargs = {**kwargs, **self._get_requests_general_kwargs(endpoint)}
            request_kwargs['json'] = data
            return self._call_with_retry(
                lambda: requests.post(**request_kwargs), retry_statuses
            )
        parent_post = super().post
        return self._call_with_retry(
            lambda: parent_post(endpoint, data, **kwargs), retry_statuses
        )

    def put(self, endpoint, data, file=None, **kwargs):
        """Send a PUT request, retrying on connection errors.

        For ``media`` endpoints the JSON body is ``data`` when truthy,
        otherwise ``file`` when truthy; with neither, no ``json`` is set.
        Other endpoints go to the parent ``put`` and ``file`` is ignored.

        NOTE(review): ``file`` is sent as JSON, not as a multipart upload as
        ``post`` does with ``files`` — confirm this is intended.
        """
        if self._is_wc_api(endpoint):
            request_kwargs = {**kwargs, **self._get_requests_general_kwargs(endpoint)}
            payload = data or file  # `data` wins when both are provided
            if payload:
                request_kwargs['json'] = payload
            return self._call_with_retry(lambda: requests.put(**request_kwargs))
        parent_put = super().put
        return self._call_with_retry(lambda: parent_put(endpoint, data, **kwargs))

    def delete(self, endpoint, **kwargs):
        """Send a DELETE request, retrying on connection errors.

        ``media`` endpoints are deleted with the query parameter
        ``force=True``; ``kwargs`` are not forwarded on that route.
        Other endpoints go to the parent ``delete`` with ``kwargs``.
        """
        if self._is_wc_api(endpoint):
            request_kwargs = self._get_requests_general_kwargs(endpoint)
            return self._call_with_retry(
                lambda: requests.delete(**request_kwargs, params={"force": True})
            )
        parent_delete = super().delete
        return self._call_with_retry(lambda: parent_delete(endpoint, **kwargs))