Spaces:
Building
Building
""" | |
Flare – API Executor (token refresh eklenmiş) | |
""" | |
from __future__ import annotations | |
import json, re, time, requests | |
from typing import Any, Dict | |
from utils import log | |
from config_provider import ConfigProvider, APIConfig, ProxyConfig | |
# Shared configuration handle, resolved once when this module is imported.
cfg = ConfigProvider.get()

# Per-API token cache keyed by API name.  Each entry is a dict holding the
# keys "token", "expiry", "refresh_ep" and "refresh_body".
_token: Dict[str, Dict[str, Any]] = dict()

# Matches ``{{ key }}`` placeholders; group 1 is the key text between the
# braces (padding whitespace is matched outside the group).
_placeholder = re.compile(r"\{\{\s*([^\}]+?)\s*\}\}")
def _render(obj, vars, api):
    """Recursively substitute ``{{ ... }}`` placeholders inside *obj*.

    Strings are scanned with the module-level ``_placeholder`` pattern; dicts
    and lists are rebuilt with every value/item rendered; any other object is
    returned unchanged.

    Supported placeholder keys:

    * ``variables.<name>``        -> ``str(vars.get(<name>, ""))``
    * ``auth_tokens.<api>.<x>``   -> the cached token for ``<api>`` (empty
      string when no token is cached); the third segment is not inspected
    * ``config.<attr>``           -> ``str(getattr(cfg.global_config, <attr>, ""))``

    Any other key, and any ``auth_tokens`` key that does not have exactly
    three dot-separated segments, is left in the text untouched.

    Args:
        obj: The template value (str, dict, list, or anything else).
        vars: Mapping used to resolve ``variables.*`` placeholders.
        api: Not used by the substitution itself; only passed along to the
            recursive calls.

    Returns:
        A rendered copy of *obj* (or *obj* itself for non-container,
        non-string values).
    """
    def _substitute(match):
        key = match.group(1)
        if key.startswith("variables."):
            return str(vars.get(key.split(".", 1)[1], ""))
        if key.startswith("auth_tokens."):
            parts = key.split(".")
            if len(parts) != 3:
                # Malformed reference: keep the placeholder as written instead
                # of failing on tuple unpacking.
                return match.group(0)
            token = _token.get(parts[1], {}).get("token", "")
            # re.sub requires the callback to return a string, so a non-string
            # token taken from a JSON response must be converted.
            return "" if token is None else str(token)
        if key.startswith("config."):
            return str(getattr(cfg.global_config, key.split(".", 1)[1], ""))
        return match.group(0)

    if isinstance(obj, str):
        return _placeholder.sub(_substitute, obj)
    if isinstance(obj, dict):
        return {k: _render(v, vars, api) for k, v in obj.items()}
    if isinstance(obj, list):
        return [_render(v, vars, api) for v in obj]
    return obj
def _fetch_token(api: APIConfig):
    """Request a brand-new token for *api* and store it in the module cache.

    Renders ``api.auth.token_request_body`` (with no variables), POSTs it as
    JSON to ``api.auth.token_endpoint`` and walks the decoded JSON response
    along the dot-separated ``api.auth.response_token_path`` to reach the
    token value.  The result is written to ``_token[api.name]`` together with
    the refresh endpoint and refresh body taken from the auth config.

    Errors from ``requests`` (transport failures, HTTP error statuses) and
    lookup errors from a token path that does not match the response are
    propagated to the caller.
    """
    auth = api.auth
    payload = _render(auth.token_request_body, {}, api.name)
    response = requests.post(
        auth.token_endpoint,
        json=payload,
        timeout=api.timeout_seconds,
    )
    response.raise_for_status()

    # Descend one key per path segment until only the token value is left.
    node = response.json()
    for segment in auth.response_token_path.split("."):
        node = node[segment]

    entry = {}
    entry["token"] = node
    # Treated as valid for 3500 seconds from now (presumably a margin under a
    # one-hour token lifetime; confirm against the auth server).
    entry["expiry"] = time.time() + 3500
    entry["refresh_ep"] = auth.token_refresh_endpoint
    entry["refresh_body"] = auth.token_refresh_body
    _token[api.name] = entry
def _ensure_token(api: APIConfig):
    """Make sure a usable token for *api* is present in the module cache.

    Does nothing when the API has no auth section or auth is disabled.
    With no cached entry a fresh token is fetched.  A cached entry whose
    expiry is still in the future is kept as is.  Otherwise the stored
    refresh endpoint, if any, is tried first; when that attempt fails for
    any reason the failure is logged and a full fetch is performed instead.
    """
    auth = api.auth
    if not (auth and auth.enabled):
        return

    cached = _token.get(api.name)
    if not cached:
        _fetch_token(api)
        return

    if cached["expiry"] > time.time():
        return

    # Token expired: try the refresh endpoint when one was recorded.
    refresh_url = cached["refresh_ep"]
    if refresh_url:
        payload = _render(cached["refresh_body"], {}, api.name)
        try:
            response = requests.post(
                refresh_url,
                json=payload,
                timeout=api.timeout_seconds,
            )
            response.raise_for_status()
            node = response.json()
            for segment in auth.response_token_path.split("."):
                node = node[segment]
            cached["token"] = node
            cached["expiry"] = time.time() + 3500
            return
        except Exception as e:
            # Best effort only: fall through to a full token fetch below.
            log(f"⚠️ token refresh fail {e}")

    _fetch_token(api)
def call_api(api: APIConfig, vars: Dict[str, Any]):
    """Execute the HTTP call described by *api* and return the response.

    Ensures an auth token is available first, then renders ``api.headers``
    and ``api.body_template`` with *vars*.  The rendered body is sent as the
    JSON payload for POST/PUT/PATCH, as query parameters for GET, and is not
    sent for any other method.

    Args:
        api: API definition (name, method, url, headers, body template,
            optional proxy, timeout, optional auth).
        vars: Values used to resolve ``{{variables.*}}`` placeholders.

    Returns:
        The ``requests`` response object for a successful call.

    Raises:
        Whatever ``requests`` raises for transport failures, plus the HTTP
        error raised by ``raise_for_status()`` for error status codes.
    """
    _ensure_token(api)
    headers = _render(api.headers, vars, api.name)
    payload = _render(api.body_template, vars, api.name)

    # A proxy may be configured as a plain URL string or as an object that
    # exposes ``enabled`` and ``url``.
    proxy = None
    if api.proxy:
        if isinstance(api.proxy, str):
            proxy = api.proxy
        elif api.proxy.enabled:
            proxy = api.proxy.url

    # requests upper-cases the verb before sending, so a configured "post"
    # still goes out as POST.  Normalize here as well, otherwise the
    # comparisons below would silently drop the body / query string.
    method = api.method.upper()

    log(f"🌐 {api.name} {api.method} {api.url}")
    r = requests.request(
        method,
        api.url,
        json=payload if method in ("POST", "PUT", "PATCH") else None,
        params=payload if method == "GET" else None,
        headers=headers,
        proxies={"http": proxy, "https": proxy} if proxy else None,
        timeout=api.timeout_seconds,
    )
    r.raise_for_status()
    return r