Spaces:
Sleeping
Sleeping
| import os | |
| import requests | |
| import gradio as gr | |
| from google import genai | |
| from typing import List, Dict, Any, Optional | |
| from supabase import create_client, Client | |
| import jwt | |
| from datetime import datetime, timedelta | |
| # --- Configuration --- | |
| GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") | |
| SUPABASE_URL = os.getenv("SUPABASE_URL") | |
| SUPABASE_KEY = os.getenv("SUPABASE_KEY") | |
| SUPABASE_SERVICE_KEY = os.getenv("SUPABASE_SERVICE_KEY") | |
| JWT_SECRET = os.getenv("JWT_SECRET", "a-strong-secret-key-for-local-testing") | |
| # --- External API Config --- | |
| QUANTUM_API_BASE_URL = os.getenv("QUANTUM_API_BASE_URL", "https://virtserver.swaggerhub.com/JOCALL3_1/jamesburvel/1.0") | |
| # CHANGED: The API key is now optional. It will be None if the secret is not set. | |
| QUANTUM_API_KEY = os.getenv("QUANTUM_API_KEY") | |
| # --- Initialize Supabase Admin Client --- | |
| try: | |
| supabase_admin: Client = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY) | |
| print("Successfully created Supabase admin client.") | |
| except Exception as e: | |
| supabase_admin = None | |
| print(f"Warning: Supabase service key invalid. User authentication will fail. Error: {e}") | |
| # --- Real API Client for the "Quantum Core 3.0" Service --- | |
| class QuantumAPIClient: | |
| # CHANGED: The __init__ method is updated to handle an optional API key. | |
| def __init__(self, base_url: str, api_key: Optional[str] = None): | |
| self.base_url = base_url | |
| self.headers = { | |
| "Content-Type": "application/json", | |
| "Accept": "application/json" | |
| } | |
| if api_key: | |
| print("Quantum API Key found, adding Authorization header.") | |
| self.headers["Authorization"] = f"Bearer {api_key}" | |
| else: | |
| print("No Quantum API Key found, making public API calls.") | |
| def _get(self, endpoint: str, params: Dict[str, Any] = None): | |
| try: | |
| response = requests.get(f"{self.base_url}{endpoint}", headers=self.headers, params=params, timeout=20) | |
| response.raise_for_status() | |
| return response.json() | |
| except Exception as e: | |
| return {"error": f"API call failed: {e}"} | |
| def _post(self, endpoint: str, data: Dict[str, Any]): | |
| try: | |
| response = requests.post(f"{self.base_url}{endpoint}", headers=self.headers, json=data, timeout=20) | |
| response.raise_for_status() | |
| return response.json() | |
| except Exception as e: | |
| return {"error": f"API call failed: {e}"} | |
| quantum_client = QuantumAPIClient(base_url=QUANTUM_API_BASE_URL, api_key=QUANTUM_API_KEY) | |
| # --- Tool Definitions (Internal Memory + External API) --- | |
| def get_ai_advisor_chat_history(supabase_user_client: Client, user_id: str): | |
| print(f"TOOL CALL: get_ai_advisor_chat_history for user '{user_id}' from Supabase") | |
| try: | |
| response = supabase_user_client.table('chat_history').select("*").eq('user_id', user_id).order('timestamp').limit(50).execute() | |
| return response.data | |
| except Exception as e: | |
| return {"error": f"Database query failed: {str(e)}"} | |
| def save_chat_turn(supabase_user_client: Client, user_id: str, session_id: str, user_message: str, assistant_message: str): | |
| try: | |
| supabase_user_client.table('chat_history').insert([ | |
| {"user_id": user_id, "session_id": session_id, "role": "user", "content": user_message}, | |
| {"user_id": user_id, "session_id": session_id, "role": "assistant", "content": assistant_message} | |
| ]).execute() | |
| except Exception as e: | |
| print(f"Error saving chat turn: {e}") | |
| def run_standard_financial_simulation(prompt: str, duration_years: int = 5): | |
| """Submits a 'What-If' scenario to the Quantum Oracle AI for standard financial impact analysis.""" | |
| print(f"TOOL CALL: run_standard_financial_simulation with prompt: '{prompt}'") | |
| payload = {"prompt": prompt, "parameters": {"durationYears": duration_years}} | |
| return quantum_client._post("/ai/oracle/simulate", data=payload) | |
| def generate_video_ad(prompt: str, style: str = "Cinematic", length_seconds: int = 15): | |
| """Submits a request to generate a high-quality video ad using the Veo 2.0 generative AI model.""" | |
| print(f"TOOL CALL: generate_video_ad with prompt: '{prompt}'") | |
| payload = {"prompt": prompt, "style": style, "lengthSeconds": length_seconds} | |
| return quantum_client._post("/ai/ads/generate", data=payload) | |
| def list_available_ai_tools(): | |
| """Retrieves a list of all integrated AI tools that Quantum can invoke.""" | |
| print("TOOL CALL: list_available_ai_tools -> GET /ai/advisor/tools") | |
| return quantum_client._get("/ai/advisor/tools") | |
| # --- Core Agent & Auth Logic --- | |
| def perform_login(user_id: str): | |
| if not supabase_admin: raise Exception("Auth service not configured.") | |
| payload = { "sub": user_id, "aud": "authenticated", "exp": datetime.utcnow() + timedelta(hours=1), "role": "authenticated" } | |
| return jwt.encode(payload, JWT_SECRET, algorithm="HS256") | |
| def run_agent_logic(prompt: str, user_id: str, session_id: str, token: str): | |
| try: | |
| decoded_token = jwt.decode(token, JWT_SECRET, algorithms=["HS256"], audience="authenticated") | |
| if not decoded_token.get("sub") or decoded_token.get("sub") != user_id: | |
| raise Exception("Invalid token or user mismatch.") | |
| supabase_user_client = create_client(SUPABASE_URL, SUPABASE_KEY) | |
| supabase_user_client.auth.set_session(access_token=token, refresh_token=token) | |
| except jwt.PyJWTError as e: | |
| raise Exception(f"Invalid token: {e}") | |
| def get_my_chat_history(): | |
| """Fetches my most recent conversation history to provide context.""" | |
| return get_ai_advisor_chat_history(supabase_user_client, user_id) | |
| all_tools = [ | |
| get_my_chat_history, | |
| run_standard_financial_simulation, | |
| generate_video_ad, | |
| list_available_ai_tools, | |
| ] | |
| try: | |
| genai.configure(api_key=GOOGLE_API_KEY) | |
| history_data = get_my_chat_history() | |
| conversation_history = [] | |
| if isinstance(history_data, list): | |
| for turn in history_data: | |
| role = "user" if turn['role'] == 'user' else "model" | |
| conversation_history.append({'role': role, 'parts': [{'text': turn['content']}]}) | |
| conversation_history.append({'role': 'user', 'parts': [{'text': prompt}]}) | |
| model = genai.GenerativeModel('gemini-pro') | |
| response = model.generate_content( | |
| conversation_history, tools=all_tools, | |
| safety_settings={'HARM_CATEGORY_DANGEROUS_CONTENT': 'BLOCK_NONE', 'HARM_CATEGORY_HARASSMENT': 'BLOCK_NONE', 'HARM_CATEGORY_HATE_SPEECH': 'BLOCK_NONE', 'HARM_CATEGORY_SEXUALLY_EXPLICIT': 'BLOCK_NONE'} | |
| ) | |
| final_response_text = response.text | |
| save_chat_turn(supabase_user_client, user_id, session_id, prompt, final_response_text) | |
| return final_response_text | |
| except Exception as e: | |
| print(f"Error in agent invocation: {e}") | |
| return f"An error occurred: {e}" | |
| # --- Gradio UI (Pure Gradio, No FastAPI) --- | |
| with gr.Blocks() as demo: | |
| gr.Markdown("# 💎 Secure, Multi-User AI Agent (Full API)") | |
| gr.Markdown("Log in with a User ID. The agent can fetch chat history (from Supabase) and call external APIs for simulations or ads.") | |
| user_id_state = gr.State("") | |
| token_state = gr.State("") | |
| session_id_state = gr.State("session-" + os.urandom(8).hex()) | |
| with gr.Row(): | |
| user_id_input = gr.Textbox(label="Enter User ID to Log In", placeholder="e.g., user-a") | |
| login_button = gr.Button("Login") | |
| login_status = gr.Markdown("") | |
| chatbot = gr.Chatbot(label="Agent Chat", visible=False, height=500) | |
| msg_input = gr.Textbox(label="Your Message", visible=False, show_label=False, placeholder="Type your message here...") | |
| def login_fn(user_id): | |
| if not user_id: | |
| return {login_status: gr.update(value="<p style='color:red;'>Please enter a User ID.</p>")} | |
| try: | |
| token = perform_login(user_id) | |
| status_html = f"<p style='color:green;'>Logged in as: {user_id}. You can start chatting.</p>" | |
| return { | |
| login_status: gr.update(value=status_html), user_id_state: user_id, token_state: token, | |
| chatbot: gr.update(visible=True), msg_input: gr.update(visible=True), | |
| } | |
| except Exception as e: | |
| return {login_status: gr.update(value=f"<p style='color:red;'>Login failed: {e}</p>")} | |
| def chat_fn(message, chat_history, user_id, token, session_id): | |
| if not token: | |
| chat_history.append((message, "Error: You are not logged in.")) | |
| return "", chat_history | |
| bot_message = run_agent_logic(message, user_id, session_id, token) | |
| chat_history.append((message, bot_message)) | |
| return "", chat_history | |
| login_button.click(login_fn, inputs=[user_id_input], outputs=[login_status, user_id_state, token_state, chatbot, msg_input]) | |
| msg_input.submit(chat_fn, [msg_input, chatbot, user_id_state, token_state, session_id_state], [msg_input, chatbot]) | |
| # The standard way to launch a Gradio app on Hugging Face | |
| demo.launch() |