Spaces:
Paused
Paused
| import requests | |
| from utils import log | |
| class APIConnector: | |
| def __init__(self, service_config): | |
| self.service_config = service_config | |
| def resolve_placeholders(self, template, session): | |
| resolved = template | |
| for key, value in session.variables.items(): | |
| resolved = resolved.replace(f"{{variables.{key}}}", str(value)) | |
| for api, tokens in session.auth_tokens.items(): | |
| resolved = resolved.replace(f"{{auth_tokens.{api}.token}}", tokens.get("token", "")) | |
| return resolved | |
| def get_auth_token(self, api_name, auth_config, session): | |
| auth_endpoint = auth_config.get("auth_endpoint") | |
| auth_body = { | |
| k: self.resolve_placeholders(str(v), session) | |
| for k, v in auth_config.get("auth_body", {}).items() | |
| } | |
| token_path = auth_config.get("auth_token_path") | |
| response = requests.post(auth_endpoint, json=auth_body, timeout=5) | |
| response.raise_for_status() | |
| json_resp = response.json() | |
| token = json_resp | |
| for part in token_path.split("."): | |
| token = token.get(part) | |
| if token is None: | |
| raise Exception(f"Could not resolve token path: {token_path}") | |
| refresh_token = json_resp.get("refresh_token") | |
| session.auth_tokens[api_name] = {"token": token, "refresh_token": refresh_token} | |
| log(f"π Retrieved auth token for {api_name}") | |
| return token | |
| def refresh_auth_token(self, api_name, auth_config, session): | |
| refresh_endpoint = auth_config.get("auth_refresh_endpoint") | |
| refresh_body = { | |
| k: self.resolve_placeholders(str(v), session) | |
| for k, v in auth_config.get("refresh_body", {}).items() | |
| } | |
| token_path = auth_config.get("auth_token_path") | |
| response = requests.post(refresh_endpoint, json=refresh_body, timeout=5) | |
| response.raise_for_status() | |
| json_resp = response.json() | |
| token = json_resp | |
| for part in token_path.split("."): | |
| token = token.get(part) | |
| if token is None: | |
| raise Exception(f"Could not resolve token path: {token_path}") | |
| new_refresh_token = json_resp.get("refresh_token", session.auth_tokens[api_name].get("refresh_token")) | |
| session.auth_tokens[api_name] = {"token": token, "refresh_token": new_refresh_token} | |
| log(f"π Refreshed auth token for {api_name}") | |
| return token | |
| def call_api(self, intent_def, session): | |
| api_name = intent_def.get("action") | |
| api_def = self.service_config.get_api_config(api_name) | |
| if not api_def: | |
| raise Exception(f"API config not found: {api_name}") | |
| url = api_def["url"] | |
| method = api_def.get("method", "POST") | |
| headers = {h["key"]: self.resolve_placeholders(h["value"], session) for h in api_def.get("headers", [])} | |
| body = {k: self.resolve_placeholders(str(v), session) for k, v in api_def.get("body", {}).items()} | |
| timeout = api_def.get("timeout", 5) | |
| retry_count = api_def.get("retry_count", 0) | |
| auth_config = api_def.get("auth") | |
| # Get auth token if needed | |
| if auth_config and api_name not in session.auth_tokens: | |
| self.get_auth_token(api_name, auth_config, session) | |
| for attempt in range(retry_count + 1): | |
| try: | |
| response = requests.request(method, url, headers=headers, json=body, timeout=timeout) | |
| if response.status_code == 401 and auth_config and attempt < retry_count: | |
| log(f"π Token expired for {api_name}, refreshing...") | |
| self.refresh_auth_token(api_name, auth_config, session) | |
| continue | |
| response.raise_for_status() | |
| log(f"β API call successful: {api_name}") | |
| return response.json() | |
| except requests.Timeout: | |
| fallback = intent_def.get("fallback_timeout_message", "This operation is currently unavailable.") | |
| log(f"β οΈ API timeout for {api_name} β {fallback}") | |
| return {"fallback": fallback} | |
| except Exception as e: | |
| log(f"β API call error for {api_name}: {e}") | |
| fallback = intent_def.get("fallback_error_message", "An error occurred during the operation.") | |
| return {"fallback": fallback} | |