Spaces:
Sleeping
Sleeping
File size: 3,862 Bytes
5564ecb dd9b2b9 5564ecb dd9b2b9 5564ecb dd9b2b9 5564ecb dd9b2b9 5564ecb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
import httpx
from core.conf import settings
from typing import Optional, Dict, Any
from core.token_manager import get_access_token
# Base URL read from project settings; the request helpers in this module
# prepend it to the endpoint when building the request URL.
API_BASE_URL = settings.API_BASE_URL
class API:
    """Async JSON-over-HTTP helpers that authenticate with a bearer token.

    Every public method builds the URL as ``<base><endpoint>``, obtains a
    token from ``get_access_token()``, sends the request through a
    short-lived ``httpx.AsyncClient`` and returns the decoded JSON body.

    Failures of the HTTP exchange are not raised; they are reported as a
    dict of the form ``{"error": "<message>"}``. Exceptions raised by
    ``get_access_token()`` itself are not caught and propagate to the caller.
    """

    @staticmethod
    def _build_url(endpoint: str, api_base: Optional[str] = None) -> str:
        """Prefix ``endpoint`` with ``api_base``, or ``API_BASE_URL`` if falsy."""
        return f"{api_base or API_BASE_URL}{endpoint}"

    @staticmethod
    async def _build_headers(
        headers: Optional[Dict[str, str]] = None,
    ) -> Dict[str, str]:
        """Return a new header dict with a JSON content type and bearer token."""
        # Work on a copy: writing the Authorization header into the caller's
        # own dict would leak the token into an object they may reuse.
        merged = dict(headers or {})
        merged.setdefault("Content-Type", "application/json")
        access_token = await get_access_token()
        merged["Authorization"] = f"Bearer {access_token}"
        return merged

    @staticmethod
    async def _request(
        method: str,
        endpoint: str,
        *,
        params: Optional[Dict[str, Any]] = None,
        payload: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        api_base: Optional[str] = None,
    ) -> Any:
        """Send one authenticated request and decode the JSON response.

        Args:
            method: HTTP verb, e.g. ``"GET"``.
            endpoint: Path appended verbatim to the base URL.
            params: Optional query-string parameters.
            payload: Optional object sent as the JSON request body.
            headers: Optional extra headers; never mutated.
            api_base: Optional base URL overriding ``API_BASE_URL``.

        Returns:
            The decoded JSON body, ``{}`` when a successful response has no
            body, or ``{"error": "..."}`` when the request fails.
        """
        url = API._build_url(endpoint, api_base)
        request_headers = await API._build_headers(headers)
        async with httpx.AsyncClient() as client:
            try:
                response = await client.request(
                    method,
                    url,
                    params=params,
                    json=payload,
                    headers=request_headers,
                )
                response.raise_for_status()
                # A successful reply may carry no body (e.g. 204 No Content);
                # decoding it would raise and be misreported as a failure.
                if not response.content:
                    return {}
                return response.json()
            except httpx.HTTPStatusError as http_err:
                return {
                    "error": f"HTTP {http_err.response.status_code}: {http_err.response.text}"
                }
            except Exception as err:
                # Deliberate catch-all: callers expect an error dict rather
                # than an exception for transport or decoding failures.
                return {"error": f"Request failed: {err}"}

    @staticmethod
    async def get(
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        api_base: Optional[str] = None,
    ) -> Any:
        """Issue a GET request; see ``_request`` for the return contract."""
        return await API._request(
            "GET", endpoint, params=params, headers=headers, api_base=api_base
        )

    @staticmethod
    async def post(
        endpoint: str,
        payload: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        api_base: Optional[str] = None,
    ) -> Any:
        """Issue a POST request with ``payload`` as the JSON body."""
        return await API._request(
            "POST", endpoint, payload=payload, headers=headers, api_base=api_base
        )

    @staticmethod
    async def put_api(
        endpoint: str,
        payload: Dict[str, Any],
        headers: Optional[Dict[str, str]] = None,
        api_base: Optional[str] = None,
    ) -> Any:
        """Issue a PUT request with ``payload`` as the JSON body."""
        return await API._request(
            "PUT", endpoint, payload=payload, headers=headers, api_base=api_base
        )

    @staticmethod
    async def delete_api(
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        api_base: Optional[str] = None,
    ) -> Any:
        """Issue a DELETE request with optional query parameters."""
        return await API._request(
            "DELETE", endpoint, params=params, headers=headers, api_base=api_base
        )
# Module-level instance; API defines only static methods, so it holds no state.
api: API = API()