dialogflowAPI / common /external /external_api.py
OnlyBiggg
refactor code
df37f6e
import httpx
from loguru import logger
from core.conf import settings
from typing import Optional, Dict, Any
from core.token_manager import get_access_token
# Default base URL, read from the application settings. The API methods below
# prepend it to the endpoint; methods that accept ``api_base`` can override it.
API_BASE_URL = settings.API_BASE_URL
# Hard-coded alternative kept for reference:
# API_BASE_URL = "https://api.futabus.vn"
class API:
    """Async helper for calling the external HTTP API through ``httpx``.

    Every public method:

    * builds the URL from ``API_BASE_URL`` (or the caller's ``api_base``),
    * obtains a token with ``get_access_token()`` and sends it as a
      ``Bearer`` ``Authorization`` header,
    * returns the decoded JSON body on success, or a dict of the form
      ``{"error": "<message>"}`` when the request fails.

    HTTP and transport errors are never raised to the caller; only an
    exception raised by ``get_access_token()`` itself propagates, because the
    token is fetched before the guarded request.

    The caller's ``headers`` mapping is copied and never modified.
    """

    # Statuses on which ``get``/``post`` retry once without the bearer header.
    _AUTH_RETRY_STATUSES = frozenset({401, 403})
    # ``get`` re-sends the token (as ``x-access-token``) only after a 403.
    _FORBIDDEN_ONLY = frozenset({403})

    @staticmethod
    def _build_url(endpoint: str, api_base: Optional[str] = None) -> str:
        """Join ``endpoint`` onto ``api_base``, or ``API_BASE_URL`` if unset."""
        base = api_base if api_base else API_BASE_URL
        return f"{base}{endpoint}"

    @staticmethod
    def _auth_headers(
        headers: Optional[Dict[str, str]], access_token: str
    ) -> Dict[str, str]:
        """Return a copy of ``headers`` with JSON and bearer-auth defaults."""
        # Copy first so the caller's dict is never mutated.
        merged = dict(headers or {})
        merged.setdefault("Content-Type", "application/json")
        merged["Authorization"] = f"Bearer {access_token}"
        return merged

    @staticmethod
    def _anonymous_headers(
        headers: Dict[str, str], access_token: str, send_token: bool
    ) -> Dict[str, str]:
        """Return retry headers: a copy of ``headers`` without ``Authorization``.

        When ``send_token`` is true the token is supplied through
        ``x-access-token`` together with ``token_type: anonymous``.
        """
        retry_headers = dict(headers)
        retry_headers.pop("Authorization", None)
        if send_token:
            retry_headers["x-access-token"] = access_token
            retry_headers["token_type"] = "anonymous"
        return retry_headers

    @staticmethod
    async def _send(
        method: str,
        url: str,
        headers: Dict[str, str],
        access_token: str,
        *,
        retry_statuses: frozenset = frozenset(),
        token_statuses: frozenset = frozenset(),
        **request_kwargs: Any,
    ) -> Any:
        """Perform one request, with an optional single auth-fallback retry.

        Args:
            method: HTTP verb passed to ``httpx.AsyncClient.request``.
            url: Fully built request URL.
            headers: Headers for the first attempt (not modified).
            access_token: Token re-sent as ``x-access-token`` on retry.
            retry_statuses: Status codes that trigger the retry.
            token_statuses: Subset of statuses for which the retry carries
                the token; for other retried statuses no token is sent.
            **request_kwargs: Extra arguments for the request
                (``params=...`` or ``json=...``).

        Returns:
            The decoded JSON body, or ``{"error": "..."}`` on any failure.
        """
        async with httpx.AsyncClient() as client:
            try:
                response = await client.request(
                    method, url, headers=headers, **request_kwargs
                )
                response.raise_for_status()
                return response.json()
            except httpx.HTTPStatusError as http_err:
                status = http_err.response.status_code
                if status not in retry_statuses:
                    return {"error": f"HTTP {status}: {http_err.response.text}"}

                retry_headers = API._anonymous_headers(
                    headers, access_token, send_token=status in token_statuses
                )
                # The "[403 RETRY]" label is also used for retries caused by
                # a 401; the text is kept unchanged for existing consumers.
                try:
                    response = await client.request(
                        method, url, headers=retry_headers, **request_kwargs
                    )
                    response.raise_for_status()
                    return response.json()
                except httpx.HTTPStatusError as second_err:
                    return {
                        "error": f"[403 RETRY] HTTP {second_err.response.status_code}: {second_err.response.text}"
                    }
                except Exception as second_err:
                    return {"error": f"[403 RETRY] Request failed: {second_err}"}
            except Exception as err:
                # Transport errors and undecodable JSON bodies end up here.
                return {"error": f"Request failed: {err}"}

    @staticmethod
    async def get(
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        api_base: Optional[str] = None,
    ) -> Any:
        """Send a GET request and return the JSON body or an error dict.

        On a 401 or 403 response the request is retried once without the
        ``Authorization`` header. Only after a 403 does the retry carry the
        token (``x-access-token`` + ``token_type: anonymous``); after a 401
        the retry is sent without any token.
        NOTE(review): ``post`` sends the token on both statuses - confirm the
        difference is intentional.

        Args:
            endpoint: Path appended to the base URL.
            params: Optional query parameters.
            headers: Optional extra headers (copied, never modified).
            api_base: Optional base URL overriding ``API_BASE_URL``.
        """
        url = API._build_url(endpoint, api_base)
        access_token = await get_access_token()
        return await API._send(
            "GET",
            url,
            API._auth_headers(headers, access_token),
            access_token,
            retry_statuses=API._AUTH_RETRY_STATUSES,
            token_statuses=API._FORBIDDEN_ONLY,
            params=params,
        )

    @staticmethod
    async def post(
        endpoint: str,
        payload: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        api_base: Optional[str] = None,
    ) -> Any:
        """Send a POST request with a JSON body; return JSON or an error dict.

        On a 401 or 403 response the request is retried once with the
        ``Authorization`` header replaced by ``x-access-token`` and
        ``token_type: anonymous``.

        Args:
            endpoint: Path appended to the base URL.
            payload: Optional JSON-serialisable request body.
            headers: Optional extra headers (copied, never modified).
            api_base: Optional base URL overriding ``API_BASE_URL``.
        """
        url = API._build_url(endpoint, api_base)
        access_token = await get_access_token()
        return await API._send(
            "POST",
            url,
            API._auth_headers(headers, access_token),
            access_token,
            retry_statuses=API._AUTH_RETRY_STATUSES,
            token_statuses=API._AUTH_RETRY_STATUSES,
            json=payload,
        )

    @staticmethod
    async def put_api(
        endpoint: str,
        payload: Dict[str, Any],
        headers: Optional[Dict[str, str]] = None,
        api_base: Optional[str] = None,
    ) -> Any:
        """Send a PUT request with a JSON body; return JSON or an error dict.

        No retry is attempted on authentication failures.

        Args:
            endpoint: Path appended to the base URL.
            payload: JSON-serialisable request body.
            headers: Optional extra headers (copied, never modified).
            api_base: Optional base URL overriding ``API_BASE_URL``.
        """
        url = API._build_url(endpoint, api_base)
        access_token = await get_access_token()
        return await API._send(
            "PUT",
            url,
            API._auth_headers(headers, access_token),
            access_token,
            json=payload,
        )

    @staticmethod
    async def delete_api(
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        api_base: Optional[str] = None,
    ) -> Any:
        """Send a DELETE request; return the JSON body or an error dict.

        No retry is attempted on authentication failures.

        Args:
            endpoint: Path appended to the base URL.
            params: Optional query parameters.
            headers: Optional extra headers (copied, never modified).
            api_base: Optional base URL overriding ``API_BASE_URL``.
        """
        url = API._build_url(endpoint, api_base)
        access_token = await get_access_token()
        return await API._send(
            "DELETE",
            url,
            API._auth_headers(headers, access_token),
            access_token,
            params=params,
        )
# Module-level instance of the API helper, created at import time.
api: API = API()