flare / api_executor.py
ciyidogan's picture
Update api_executor.py
916c313 verified
raw
history blame
2.49 kB
"""
Flare – API Executor
~~~~~~~~~~~~~~~~~~~~
• header/body templating
• auth token caching + refresh
• retry w/ tenacity
"""
import json
import re
import time
from typing import Dict, Any
import httpx
from tenacity import retry, stop_after_attempt, wait_fixed, wait_exponential
from config_provider import APIConfig
from utils import log
# Module-level cache of fetched auth tokens, filled by _get_auth_token().
# Keyed by the `name` passed to that function; each entry is a dict with
# "token" (the value returned to callers) and "expiry" (a time.time()-based
# epoch timestamp after which the entry is ignored and a new token fetched).
_TOKEN_CACHE: Dict[str, Dict[str, Any]] = {} # name -> {token, expiry}
# Matches ``{{ name }}`` / ``{{ dotted.path }}`` placeholders; compiled once
# because every header/body value is run through it.
_PLACEHOLDER_RE = re.compile(r"\{\{\s*([a-zA-Z0-9_.]+)\s*\}\}")


def _resolve_placeholder(path: str, variables: Dict[str, Any]) -> str:
    """Return the replacement text for a single ``{{ path }}`` placeholder.

    An exact (flat) key match wins so templates that already rendered keep
    rendering the same way; otherwise ``path`` is split on ``.`` and walked
    through nested dicts. Anything unresolved renders as an empty string.
    """
    if path in variables:
        value = variables[path]
    else:
        value = variables
        for segment in path.split("."):
            if not isinstance(value, dict) or segment not in value:
                return ""
            value = value[segment]
    # re.sub requires a str replacement, so coerce ints, dicts, etc.
    return "" if value is None else str(value)


def _apply_template(template: Dict[str, str],
                    variables: Dict[str, Any]) -> Dict[str, str]:
    """Render every ``{{ placeholder }}`` in the values of ``template``.

    Args:
        template: Mapping of output key -> template string. Values that are
            not strings are copied to the result unchanged.
        variables: Values available to placeholders. May be nested dicts, in
            which case ``{{ outer.inner }}`` resolves ``variables["outer"]["inner"]``.

    Returns:
        A new dict with the same keys as ``template`` and rendered values.
        Unknown placeholders are replaced with an empty string. An empty or
        ``None`` template yields ``{}``.
    """
    if not template:
        return {}
    variables = variables or {}

    rendered = {}
    for key, raw in template.items():
        if isinstance(raw, str):
            rendered[key] = _PLACEHOLDER_RE.sub(
                lambda m: _resolve_placeholder(m.group(1), variables), raw)
        else:
            # Nothing to substitute in non-text values (numbers, bools, ...).
            rendered[key] = raw
    return rendered
def _get_auth_token(name: str, api_cfg: APIConfig) -> str:
    """Return a bearer token for ``api_cfg``, fetching and caching it if needed.

    Args:
        name: Cache key under which the token is stored in ``_TOKEN_CACHE``.
        api_cfg: API configuration; only its ``auth`` section is read here
            (``enabled``, ``body_template``, ``token_endpoint``,
            ``response_token_path``).

    Returns:
        The token string, or ``""`` when auth is disabled for this API.

    Raises:
        httpx.HTTPStatusError: The token endpoint answered with an error status.
        ValueError: The token endpoint's JSON has no non-empty string at
            ``response_token_path``.
    """
    if not api_cfg.auth.enabled:
        return ""

    cached = _TOKEN_CACHE.get(name)
    if cached and cached["expiry"] > time.time():
        return cached["token"]

    log(f"πŸ”‘ Fetching token for API [{name}] ...")
    body = _apply_template(api_cfg.auth.body_template, {})
    resp = httpx.post(api_cfg.auth.token_endpoint, json=body, timeout=10)
    resp.raise_for_status()

    # Walk the dotted path through the JSON payload. Fail loudly instead of
    # caching None, which would otherwise be sent as "Bearer None" until the
    # cache entry expires.
    token_path = api_cfg.auth.response_token_path
    token = resp.json()
    for segment in token_path.split("."):
        if not isinstance(token, dict) or segment not in token:
            raise ValueError(
                f"Token path '{token_path}' not found in auth response "
                f"for API [{name}] (missing '{segment}')")
        token = token[segment]
    if not isinstance(token, str) or not token:
        raise ValueError(
            f"Token path '{token_path}' did not yield a non-empty string "
            f"in auth response for API [{name}]")

    # Fixed cache lifetime in seconds; the auth response is not consulted
    # for an expiry of its own.
    token_ttl_seconds = 3000
    _TOKEN_CACHE[name] = {"token": token,
                          "expiry": time.time() + token_ttl_seconds}
    return token
def _tenacity_wait(cfg: APIConfig):
    """Build the tenacity wait strategy selected by ``cfg.retry``.

    Returns ``wait_exponential`` (with ``backoff_seconds`` as multiplier) when
    ``cfg.retry.strategy`` is ``"exponential"``; any other strategy value
    yields ``wait_fixed(backoff_seconds)``.
    """
    retry_cfg = cfg.retry
    if retry_cfg.strategy == "exponential":
        return wait_exponential(multiplier=retry_cfg.backoff_seconds)
    return wait_fixed(retry_cfg.backoff_seconds)
def _cfg_from_retry_state(retry_state) -> APIConfig:
    """Return the ``cfg`` argument of the ``call_api`` call being retried."""
    if retry_state.args:
        return retry_state.args[0]
    # cfg was passed by keyword.
    return retry_state.kwargs["cfg"]


def _stop_from_cfg(retry_state) -> bool:
    """Tenacity stop callable: give up after ``cfg.retry.max_attempts`` tries."""
    cfg = _cfg_from_retry_state(retry_state)
    return retry_state.attempt_number >= cfg.retry.max_attempts


def _wait_from_cfg(retry_state) -> float:
    """Tenacity wait callable: seconds to sleep per the strategy in ``cfg.retry``."""
    cfg = _cfg_from_retry_state(retry_state)
    # _tenacity_wait returns a wait *strategy*; it must itself be called with
    # the retry state to obtain the delay in seconds.
    return _tenacity_wait(cfg)(retry_state)


# stop/wait are resolved per call from the cfg argument: the decorator is
# evaluated once at import time, when no APIConfig is available yet.
@retry(stop=_stop_from_cfg, wait=_wait_from_cfg)
def call_api(cfg: APIConfig,
             variables: Dict[str, str]) -> httpx.Response:
    """Execute the HTTP call described by ``cfg`` and return the response.

    Headers and body are rendered from ``cfg.headers`` / ``cfg.body_template``.
    When ``cfg.auth.enabled`` is set, an ``Authorization: Bearer ...`` header is
    added using a token cached under ``cfg.url``. The call is retried according
    to ``cfg.retry`` (``max_attempts``, ``strategy``, ``backoff_seconds``).

    Args:
        cfg: API configuration (method, url, headers, body_template, auth,
            proxy, timeout_seconds, retry).
        variables: Values made available to the header and body templates.

    Returns:
        The successful ``httpx.Response``.

    Raises:
        tenacity.RetryError: Every attempt failed (any exception, including a
            non-2xx status raised by ``raise_for_status``, triggers a retry).
    """
    headers = _apply_template(cfg.headers, {"auth_tokens": _TOKEN_CACHE,
                                            **variables})
    body = _apply_template(cfg.body_template, {"variables": variables})

    if cfg.auth.enabled:
        headers["Authorization"] = f"Bearer {_get_auth_token(cfg.url, cfg)}"

    log(f"🌐 Calling API {cfg.method} {cfg.url}")

    client_args = {"timeout": cfg.timeout_seconds}
    if cfg.proxy.enabled:
        # NOTE(review): newer httpx releases deprecate/remove `proxies=` in
        # favour of `proxy=` / `mounts=` — confirm against the pinned version.
        client_args["proxies"] = {"all://": cfg.proxy.url}

    with httpx.Client(**client_args) as client:
        resp = client.request(cfg.method, cfg.url, json=body, headers=headers)
        resp.raise_for_status()
        return resp