Spaces:
Building
Building
"""
Flare – API Executor
~~~~~~~~~~~~~~~~~~~~
• header/body templating
• auth token caching + refresh
• retry w/ tenacity
"""
import json | |
import re | |
import time | |
from typing import Dict, Any | |
import httpx | |
from tenacity import retry, stop_after_attempt, wait_fixed, wait_exponential | |
from config_provider import APIConfig | |
from utils import log | |
# Module-level, in-memory cache of fetched auth tokens.
# Maps name -> {"token": <token value>, "expiry": <epoch seconds>}.
_TOKEN_CACHE: Dict[str, Dict[str, Any]] = {} # name -> {token, expiry}
def _apply_template(template: Dict[str, str], | |
variables: Dict[str, str]) -> Dict[str, str]: | |
out = {} | |
for k, v in template.items(): | |
out[k] = re.sub(r"{{\s*([a-zA-Z0-9_.]+)\s*}}", | |
lambda m: variables.get(m.group(1), ""), v) | |
return out | |
def _get_auth_token(name: str, api_cfg: APIConfig) -> str: | |
if not api_cfg.auth.enabled: | |
return "" | |
cached = _TOKEN_CACHE.get(name) | |
if cached and cached["expiry"] > time.time(): | |
return cached["token"] | |
log(f"π Fetching token for API [{name}] ...") | |
body = _apply_template(api_cfg.auth.body_template, {}) | |
resp = httpx.post(api_cfg.auth.token_endpoint, json=body, timeout=10) | |
resp.raise_for_status() | |
token = resp.json() | |
for segment in api_cfg.auth.response_token_path.split("."): | |
token = token.get(segment) | |
_TOKEN_CACHE[name] = {"token": token, "expiry": time.time() + 3000} | |
return token | |
def _tenacity_wait(cfg: APIConfig):
    """Pick the tenacity wait policy described by ``cfg.retry``.

    Returns ``wait_exponential`` with ``cfg.retry.backoff_seconds`` as its
    multiplier when the strategy is ``"exponential"``; any other strategy
    value yields ``wait_fixed`` with the same number of seconds.
    """
    if cfg.retry.strategy == "exponential":
        return wait_exponential(multiplier=cfg.retry.backoff_seconds)
    return wait_fixed(cfg.retry.backoff_seconds)
def call_api(cfg: APIConfig,
             variables: Dict[str, str]) -> httpx.Response:
    """Render the request described by *cfg* and send it once.

    Args:
        cfg: API configuration (url, method, headers, body_template, auth,
            proxy, timeout_seconds are read here).
        variables: Values substituted into the header and body templates.
            Header templates see them as top-level names (``{{ key }}``);
            body templates see them under ``variables`` (``{{ variables.key }}``).

    Returns:
        The ``httpx.Response`` of a successful call.

    Raises:
        httpx.HTTPStatusError: The API answered with an error status
            (via ``raise_for_status``).
    """
    headers = _apply_template(cfg.headers, {"auth_tokens": _TOKEN_CACHE,
                                            **variables})

    # Expose each variable under a flat "variables.<key>" alias as well as the
    # nested dict, so ``{{ variables.key }}`` resolves even when the template
    # helper only does a literal key lookup.
    body_vars = {"variables": variables}
    body_vars.update(
        {f"variables.{key}": value for key, value in variables.items()}
    )
    body = _apply_template(cfg.body_template, body_vars)

    if cfg.auth.enabled:
        # The token cache is keyed by the API url.
        headers["Authorization"] = f"Bearer {_get_auth_token(cfg.url, cfg)}"

    log(f"π Calling API {cfg.method} {cfg.url}")

    # NOTE(review): no retry is applied here even though _tenacity_wait builds
    # a wait policy from cfg.retry — confirm whether retries are wired elsewhere.
    client_args = {"timeout": cfg.timeout_seconds}
    if cfg.proxy.enabled:
        # NOTE(review): newer httpx releases appear to have dropped the
        # ``proxies`` keyword in favour of ``proxy``/``mounts`` — confirm
        # against the pinned httpx version.
        client_args["proxies"] = {"all://": cfg.proxy.url}

    with httpx.Client(**client_args) as client:
        resp = client.request(cfg.method, cfg.url, json=body, headers=headers)
        resp.raise_for_status()
        return resp