Spaces:
Running
Running
import httpx | |
from loguru import logger | |
from core.conf import settings | |
from typing import Optional, Dict, Any | |
from core.token_manager import get_access_token | |
# Base URL prepended to every endpoint when a call does not pass its own
# ``api_base``; the value is read from the project settings object.
API_BASE_URL = settings.API_BASE_URL
# API_BASE_URL = "https://api.futabus.vn"
class API:
    """Thin async HTTP client that attaches a bearer token to every request.

    All public methods are static, so they can be called either on the class
    (``API.get(...)``) or on an instance (``api.get(...)``).

    Every public method returns the decoded JSON body on success. On failure
    nothing is raised for HTTP/transport problems; instead a dict of the form
    ``{"error": "<message>"}`` is returned.
    """

    @staticmethod
    def _resolve_url(endpoint: str, api_base: Optional[str] = None) -> str:
        """Join ``endpoint`` onto ``api_base``, or onto ``API_BASE_URL`` if unset."""
        # Falsy api_base (None or "") falls back to the module default.
        base = api_base or API_BASE_URL
        return f"{base}{endpoint}"

    @staticmethod
    def _build_headers(
        headers: Optional[Dict[str, str]], access_token: str
    ) -> Dict[str, str]:
        """Return a *new* header dict with JSON content type and bearer auth.

        The caller's dict is copied so that adding ``Authorization`` (and any
        retry-time headers) never leaks back into the caller's object.
        """
        merged = dict(headers or {})
        # Respect a caller-supplied Content-Type; only fill in the default.
        merged.setdefault("Content-Type", "application/json")
        merged["Authorization"] = f"Bearer {access_token}"
        return merged

    @staticmethod
    async def _send(
        client: "httpx.AsyncClient",
        method: str,
        url: str,
        headers: Dict[str, str],
        request_kwargs: Dict[str, Any],
    ) -> Any:
        """Issue one request, raise on non-2xx status, and decode the JSON body."""
        response = await client.request(
            method, url, headers=headers, **request_kwargs
        )
        response.raise_for_status()
        return response.json()

    @staticmethod
    async def _request(
        method: str,
        url: str,
        headers: Dict[str, str],
        access_token: str,
        *,
        retry_statuses: tuple = (),
        anonymous_statuses: tuple = (),
        **request_kwargs: Any,
    ) -> Any:
        """Send a request, optionally retrying once with alternate auth headers.

        Args:
            method: HTTP verb, e.g. ``"GET"``.
            url: Fully resolved request URL.
            headers: Private header dict (mutated in place when retrying).
            access_token: Token used for the alternate auth headers.
            retry_statuses: Status codes that trigger a single retry with the
                ``Authorization`` header removed.
            anonymous_statuses: Subset of statuses for which the retry also
                sends ``x-access-token`` and ``token_type: anonymous``.
            **request_kwargs: Extra keyword arguments for the httpx request
                (``params=``, ``json=``).

        Returns:
            The decoded JSON body, or ``{"error": "..."}`` on any failure.
        """
        async with httpx.AsyncClient() as client:
            try:
                return await API._send(client, method, url, headers, request_kwargs)
            except httpx.HTTPStatusError as http_err:
                status = http_err.response.status_code
                if status not in retry_statuses:
                    return {"error": f"HTTP {status}: {http_err.response.text}"}

                # Retry once without the bearer header; for selected statuses
                # pass the token through the anonymous-token headers instead.
                headers.pop("Authorization", None)
                if status in anonymous_statuses:
                    headers["x-access-token"] = access_token
                    headers["token_type"] = "anonymous"
                try:
                    return await API._send(
                        client, method, url, headers, request_kwargs
                    )
                except httpx.HTTPStatusError as retry_err:
                    return {
                        "error": f"[403 RETRY] HTTP {retry_err.response.status_code}: {retry_err.response.text}"
                    }
                except Exception as retry_err:
                    return {"error": f"[403 RETRY] Request failed: {retry_err}"}
            except Exception as err:
                # Deliberate catch-all boundary: transport errors and invalid
                # JSON are reported to the caller as an error dict.
                return {"error": f"Request failed: {err}"}

    @staticmethod
    async def get(
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        api_base: Optional[str] = None,
    ) -> Any:
        """Send an authenticated GET request.

        Args:
            endpoint: Path appended to the base URL.
            params: Optional query-string parameters.
            headers: Optional extra headers; the dict is not modified.
            api_base: Optional base URL overriding ``API_BASE_URL``.

        Returns:
            The decoded JSON body, or ``{"error": "..."}`` on failure.
        """
        url = API._resolve_url(endpoint, api_base)
        access_token = await get_access_token()
        # NOTE(review): on 401 the GET retry is sent with no token at all,
        # while POST sends the anonymous-token headers for both 401 and 403.
        # Preserved as found; confirm whether the difference is intentional.
        return await API._request(
            "GET",
            url,
            API._build_headers(headers, access_token),
            access_token,
            retry_statuses=(401, 403),
            anonymous_statuses=(403,),
            params=params,
        )

    @staticmethod
    async def post(
        endpoint: str,
        payload: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        api_base: Optional[str] = None,
    ) -> Any:
        """Send an authenticated POST request with a JSON body.

        Args:
            endpoint: Path appended to the base URL.
            payload: Optional JSON-serialisable request body.
            headers: Optional extra headers; the dict is not modified.
            api_base: Optional base URL overriding ``API_BASE_URL``.

        Returns:
            The decoded JSON body, or ``{"error": "..."}`` on failure.
        """
        url = API._resolve_url(endpoint, api_base)
        access_token = await get_access_token()
        return await API._request(
            "POST",
            url,
            API._build_headers(headers, access_token),
            access_token,
            retry_statuses=(401, 403),
            anonymous_statuses=(401, 403),
            json=payload,
        )

    @staticmethod
    async def put_api(
        endpoint: str,
        payload: Dict[str, Any],
        headers: Optional[Dict[str, str]] = None,
        api_base: Optional[str] = None,
    ) -> Any:
        """Send an authenticated PUT request with a JSON body (no retry).

        Args:
            endpoint: Path appended to the base URL.
            payload: JSON-serialisable request body.
            headers: Optional extra headers; the dict is not modified.
            api_base: Optional base URL overriding ``API_BASE_URL``.

        Returns:
            The decoded JSON body, or ``{"error": "..."}`` on failure.
        """
        url = API._resolve_url(endpoint, api_base)
        access_token = await get_access_token()
        return await API._request(
            "PUT",
            url,
            API._build_headers(headers, access_token),
            access_token,
            json=payload,
        )

    @staticmethod
    async def delete_api(
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        api_base: Optional[str] = None,
    ) -> Any:
        """Send an authenticated DELETE request (no retry).

        Args:
            endpoint: Path appended to the base URL.
            params: Optional query-string parameters.
            headers: Optional extra headers; the dict is not modified.
            api_base: Optional base URL overriding ``API_BASE_URL``.

        Returns:
            The decoded JSON body, or ``{"error": "..."}`` on failure.
        """
        url = API._resolve_url(endpoint, api_base)
        access_token = await get_access_token()
        return await API._request(
            "DELETE",
            url,
            API._build_headers(headers, access_token),
            access_token,
            params=params,
        )
# Module-level instance of API, created once at import time.
api: API = API()