# Hugging Face Spaces app — status header ("Spaces: Running") from the scraped page.
import gradio as gr
import os
import json
import requests
import time
from datetime import datetime, timedelta
from collections import deque
from supabase import create_client, Client
# --- Configuration (secrets and prompt come from environment variables) ---
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
SUPABASE_URL = os.environ.get("SUPABASE_URL")
SUPABASE_KEY = os.environ.get("SUPABASE_KEY")
# Default to an empty prompt so the request payload stays valid ("content"
# must be a string, not null) when the env var is missing.
SYSTEM_MESSAGE = os.environ.get("System_Prompt") or ""

GROQ_API_URL = "https://api.groq.com/openai/v1/chat/completions"
MODEL_NAME = "meta-llama/llama-4-scout-17b-16e-instruct"
MAX_TOKENS = 2048
TEMPERATURE = 0.7
TOP_P = 0.95

# Local (client-side) throttle: at most 15 requests per rolling 60 s window.
MAX_REQUESTS_PER_MINUTE = 15
REQUEST_WINDOW = 60  # seconds


class RateLimiter:
    """Sliding-window rate limiter based on a deque of request timestamps.

    Timestamps older than ``window`` seconds are purged lazily on each
    ``can_make_request`` call; a request is allowed while fewer than
    ``max_requests`` timestamps remain in the window.
    """

    def __init__(self, max_requests=MAX_REQUESTS_PER_MINUTE, window=REQUEST_WINDOW):
        self.max_requests = max_requests
        self.window = window  # window length in seconds
        self.requests = deque()  # monotonically increasing time.time() values

    def can_make_request(self):
        """Purge expired timestamps; return True if below the request cap."""
        now = time.time()
        while self.requests and self.requests[0] <= now - self.window:
            self.requests.popleft()
        return len(self.requests) < self.max_requests

    def add_request(self):
        """Record the current time as a consumed request slot."""
        self.requests.append(time.time())

    def time_until_next_request(self):
        """Seconds until the oldest recorded request ages out (0 if none)."""
        if not self.requests:
            return 0
        time_passed = time.time() - self.requests[0]
        if time_passed >= self.window:
            return 0
        return self.window - time_passed


# Module-level singleton shared by every chat session in this process.
rate_limiter = RateLimiter()
def persist_data(session_data, user_identifier=None):
    """Best-effort persistence of a chat session to Supabase.

    Flattens ``(user_msg, assistant_msg)`` pairs into OpenAI-style role
    dicts and inserts one row into the ``conversations`` table. Skips
    persistence entirely when Supabase credentials are not configured.
    Insert failures are logged but never propagated, so a logging outage
    cannot break the chat flow.

    Args:
        session_data: iterable of (user_msg, assistant_msg) tuples.
        user_identifier: optional opaque user id stored with the row.
    """
    if not SUPABASE_URL or not SUPABASE_KEY:
        return
    try:
        supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
        formatted_messages = []
        for user_msg, assistant_msg in session_data:
            if user_msg:
                formatted_messages.append({"role": "user", "content": user_msg})
            if assistant_msg:
                formatted_messages.append({"role": "assistant", "content": assistant_msg})
        data_to_insert = {
            "timestamp": datetime.now().isoformat(),
            "user_id": user_identifier,
            "messages": formatted_messages,
        }
        supabase.table("conversations").insert(data_to_insert).execute()
    except Exception as e:
        # Best-effort: never crash the chat because persistence failed, but
        # leave a trace in the logs instead of swallowing the error silently.
        print(f"Error persisting conversation to Supabase: {e}")
def respond(message, history: list[tuple[str, str]]):
    """Stream a chat completion from the Groq API as a Gradio generator.

    Yields progressively longer partial answers (Gradio replaces the shown
    message with each yielded value). Applies the local rate limiter first,
    truncates history to the last 5 turns (assistant turns clipped to 1000
    chars) to bound prompt size, retries transient failures with exponential
    backoff, and persists the finished conversation via ``persist_data``.

    Args:
        message: the new user message.
        history: prior (user, assistant) turn pairs from gr.ChatInterface.
    """
    # Local throttle before spending a real API call.
    if not rate_limiter.can_make_request():
        wait_time = rate_limiter.time_until_next_request()
        yield f"Límite local alcanzado. Espera {int(wait_time)} segundos."
        return
    rate_limiter.add_request()

    # Build the OpenAI-style message list: system prompt, recent history,
    # then the new user message.
    messages = [{"role": "system", "content": SYSTEM_MESSAGE}]
    truncated_history = history[-5:] if len(history) > 5 else history
    for user_msg, assistant_msg in truncated_history:
        if user_msg:
            messages.append({"role": "user", "content": user_msg})
        if assistant_msg:
            messages.append({"role": "assistant", "content": assistant_msg[:1000]})
    messages.append({"role": "user", "content": message})

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {GROQ_API_KEY}"
    }
    payload = {
        "model": MODEL_NAME,
        "messages": messages,
        "max_tokens": MAX_TOKENS,
        "temperature": TEMPERATURE,
        "top_p": TOP_P,
        "stream": True
    }

    max_retries = 3
    base_delay = 2
    for attempt in range(max_retries):
        try:
            if attempt > 0:
                # Exponential backoff between attempts: 2 s, then 4 s.
                delay = base_delay * (2 ** (attempt - 1))
                yield f"Reintentando en {delay} segundos... (intento {attempt + 1}/{max_retries})"
                time.sleep(delay)
            # Context manager guarantees the streamed connection is released
            # even when we break out of the SSE loop early (fixes a leak in
            # the previous version, which never closed the response).
            with requests.post(
                GROQ_API_URL,
                headers=headers,
                json=payload,
                stream=True,
                timeout=30
            ) as response:
                response.raise_for_status()
                accumulated_response = ""
                # Server-sent events: lines look like "data: {json}" and the
                # stream terminates with "data: [DONE]".
                for line in response.iter_lines():
                    if not line:
                        continue
                    line_text = line.decode('utf-8')
                    if not line_text.startswith("data: "):
                        continue
                    data_str = line_text[6:]
                    if data_str == "[DONE]":
                        break
                    try:
                        data = json.loads(data_str)
                        if 'choices' in data and len(data['choices']) > 0:
                            delta = data['choices'][0].get('delta', {})
                            if 'content' in delta and delta['content']:
                                accumulated_response += delta['content']
                                yield accumulated_response
                    except json.JSONDecodeError:
                        print(f"Error decodificando JSON del stream: {data_str}")
                        continue
            if not accumulated_response:
                # Empty answer: retry while attempts remain, else apologize.
                if attempt < max_retries - 1:
                    continue
                yield "Lo siento, no recibí una respuesta. Inténtalo de nuevo."
            else:
                # Success: log the full session (best-effort) and stop retrying.
                current_session = history + [(message, accumulated_response)]
                persist_data(current_session)
                break
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 429:
                error_text = e.response.text
                if "TPM" in error_text or "tokens per minute" in error_text:
                    # Token-per-minute limit: a fixed long pause is required.
                    if attempt < max_retries - 1:
                        yield "Límite de tokens por minuto alcanzado. Reintentando en 30 segundos..."
                        time.sleep(30)
                        continue
                    yield "Límite de tokens por minuto excedido. Espera 1 minuto antes de continuar."
                    break
                else:
                    # Generic 429: honor Retry-After when numeric, capped at
                    # 30 s (the header may also be an HTTP-date — fall back
                    # to 10 s instead of crashing the generator on int()).
                    if attempt < max_retries - 1:
                        retry_after = e.response.headers.get('retry-after', '10')
                        try:
                            wait_time = min(int(retry_after), 30)
                        except ValueError:
                            wait_time = 10
                        yield f"Servidor ocupado. Reintentando en {wait_time} segundos..."
                        time.sleep(wait_time)
                        continue
                    yield "El servidor está muy ocupado. Inténtalo en unos minutos."
                    break
            else:
                print(f"HTTP Error: {e}")
                yield f"Error del servidor: {e.response.status_code}. Inténtalo de nuevo."
                break
        except requests.exceptions.Timeout:
            if attempt < max_retries - 1:
                continue
            print("Timeout en la solicitud a la API de Groq")
            yield "La solicitud tardó demasiado tiempo. Por favor inténtalo de nuevo."
            break
        except requests.exceptions.RequestException as e:
            print(f"Error en la solicitud a la API de Groq: {e}")
            yield "Lo siento, ocurrió un error de conexión al procesar tu solicitud."
            break
# Example prompts shown as clickable starters in the chat UI.
_EXAMPLE_PROMPTS = [
    ["¡Bienvenido a Tu Aliado Momentum!"],
    ["¿En qué consiste el programa y para quién es?"],
    ["¿Qué beneficios obtengo y con qué empresas me conecto?"],
    ["¿Cómo puedo participar o registrarme?"],
]

# Wire the streaming responder into Gradio's stock chat interface.
demo = gr.ChatInterface(respond, examples=_EXAMPLE_PROMPTS)

if __name__ == "__main__":
    demo.launch()