File size: 3,862 Bytes
5564ecb
 
 
 
 
 
 
 
 
 
 
 
 
 
dd9b2b9
5564ecb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dd9b2b9
5564ecb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dd9b2b9
5564ecb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dd9b2b9
5564ecb
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import httpx
from core.conf import settings
from typing import Optional, Dict, Any
from core.token_manager import get_access_token

# Default base URL for outgoing requests, read once from project settings at
# import time; the API methods prepend it to the endpoint path.
API_BASE_URL = settings.API_BASE_URL

class API():
    """Thin async HTTP client for the backend API.

    Every public method builds its URL by concatenating a base URL with
    ``endpoint``, attaches a bearer token obtained from
    ``get_access_token()``, sends the request with a short-lived
    ``httpx.AsyncClient`` and returns the decoded JSON body.

    HTTP-status and transport failures are not raised to the caller; they
    are reported as a ``{"error": "<message>"}`` dict. Exceptions raised by
    ``get_access_token()`` itself are not caught here and propagate.
    """

    @staticmethod
    async def _request(
        method: str,
        endpoint: str,
        *,
        params: Optional[Dict[str, Any]] = None,
        payload: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        api_base: Optional[str] = None,
    ):
        """Send one authenticated request and normalise the outcome.

        Args:
            method: HTTP verb, e.g. ``"GET"``.
            endpoint: Path appended verbatim to the base URL.
            params: Optional query-string parameters.
            payload: Optional JSON body; ``None`` sends no body.
            headers: Optional extra headers. The mapping is copied, never
                mutated.
            api_base: Base URL override; falls back to ``API_BASE_URL``
                when falsy.

        Returns:
            The decoded JSON body on success, ``{}`` when a successful
            response carries no body, or ``{"error": "..."}`` on failure.
        """
        url = f"{api_base or API_BASE_URL}{endpoint}"
        access_token = await get_access_token()

        # Work on a copy so the bearer token is never written into a dict
        # owned by the caller.
        request_headers: Dict[str, str] = dict(headers or {})
        request_headers.setdefault("Content-Type", "application/json")
        request_headers["Authorization"] = f"Bearer {access_token}"

        async with httpx.AsyncClient() as client:
            try:
                response = await client.request(
                    method,
                    url,
                    headers=request_headers,
                    params=params,
                    json=payload,
                )
                response.raise_for_status()
                # A successful response may legitimately have no body
                # (e.g. 204 No Content); decoding it as JSON would raise and
                # be misreported as a failed request.
                if response.status_code == 204 or not response.content:
                    return {}
                return response.json()
            except httpx.HTTPStatusError as http_err:
                return {"error": f"HTTP {http_err.response.status_code}: {http_err.response.text}"}
            except Exception as err:
                # Deliberate catch-all: callers expect an error dict rather
                # than an exception for any request/decoding failure.
                return {"error": f"Request failed: {str(err)}"}

    @staticmethod
    async def get(endpoint: str, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, api_base: Optional[str] = None):
        """Issue an authenticated GET.

        Args:
            endpoint: Path appended to the base URL.
            params: Optional query-string parameters.
            headers: Optional extra headers (not mutated).
            api_base: Optional base URL override.

        Returns:
            Decoded JSON body, ``{}`` for an empty successful response, or
            ``{"error": "..."}`` on failure.
        """
        return await API._request(
            "GET", endpoint, params=params, headers=headers, api_base=api_base
        )

    @staticmethod
    async def post(endpoint: str, payload: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, api_base: Optional[str] = None):
        """Issue an authenticated POST with an optional JSON body.

        Args:
            endpoint: Path appended to the base URL.
            payload: JSON-serialisable body; ``None`` sends no body.
            headers: Optional extra headers (not mutated).
            api_base: Optional base URL override.

        Returns:
            Decoded JSON body, ``{}`` for an empty successful response, or
            ``{"error": "..."}`` on failure.
        """
        return await API._request(
            "POST", endpoint, payload=payload, headers=headers, api_base=api_base
        )

    @staticmethod
    async def put_api(endpoint: str, payload: Dict[str, Any], headers: Optional[Dict[str, str]] = None, api_base: Optional[str] = None):
        """Issue an authenticated PUT with a JSON body.

        Args:
            endpoint: Path appended to the base URL.
            payload: JSON-serialisable body.
            headers: Optional extra headers (not mutated).
            api_base: Optional base URL override, matching ``get``/``post``.

        Returns:
            Decoded JSON body, ``{}`` for an empty successful response, or
            ``{"error": "..."}`` on failure.
        """
        return await API._request(
            "PUT", endpoint, payload=payload, headers=headers, api_base=api_base
        )

    @staticmethod
    async def delete_api(endpoint: str, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, api_base: Optional[str] = None):
        """Issue an authenticated DELETE.

        Args:
            endpoint: Path appended to the base URL.
            params: Optional query-string parameters.
            headers: Optional extra headers (not mutated).
            api_base: Optional base URL override, matching ``get``/``post``.

        Returns:
            Decoded JSON body, ``{}`` for an empty successful response, or
            ``{"error": "..."}`` on failure.
        """
        return await API._request(
            "DELETE", endpoint, params=params, headers=headers, api_base=api_base
        )

# Shared module-level instance for convenient importing; API defines only
# static methods, so this object carries no per-instance state.
api: API = API()