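"""Gradio app: chat about an uploaded image using google/gemini-flash-1.5
via OpenRouter (optionally logged through Helicone), with a LangChain
streaming path and a direct HTTP fallback when LangChain fails."""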
import base64
import os
import socket
from io import BytesIO
from os import getenv

import gradio as gr
import requests
from dotenv import load_dotenv
from langchain_core.callbacks import StreamingStdOutCallbackHandler
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI

# Load environment variables from a .env file next to this script
dotenv_path = os.path.join(os.path.dirname(__file__), ".env")
load_dotenv(dotenv_path=dotenv_path)
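
# Expected .env layout (keys inferred from the getenv() calls below;
# values shown are placeholders, not real credentials):
#   OPENROUTER_API_KEY=sk-or-...
#   OPENROUTER_BASE_URL=https://openrouter.ai/api/v1
#   HELICONE_API_KEY=sk-helicone-...   # optional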

# Connectivity test
def test_connectivity(url="https://openrouter.helicone.ai/api/v1"):
    """Return True if the API endpoint answers an HTTP GET with 200."""
    try:
        return requests.get(url, timeout=5).status_code == 200
    except (requests.RequestException, socket.error):
        return False

# Helper to make direct API calls to OpenRouter when LangChain fails
def direct_api_call(messages, api_key, base_url):
    """POST a chat-completions request directly and return the reply text,
    or an error string if the request fails."""
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
        "HTTP-Referer": "https://your-app-domain.com",  # Replace with your domain
        "X-Title": "Image Analysis App",
    }
    # Route through Helicone only when a key is configured
    if getenv("HELICONE_API_KEY"):
        headers["Helicone-Auth"] = f"Bearer {getenv('HELICONE_API_KEY')}"
    payload = {
        "model": "google/gemini-flash-1.5",
        "messages": messages,
        "stream": False,
    }
    try:
        response = requests.post(
            f"{base_url}/chat/completions",
            headers=headers,
            json=payload,
            timeout=30,
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]
    except Exception as e:
        return f"Error: {e}"

# Initialize LLM with streaming enabled
def init_llm():
    if not test_connectivity():
        raise RuntimeError("No connection to OpenRouter. Check your network and keys.")
    # Attach the Helicone auth header only when a key is configured
    extra_headers = {}
    if getenv("HELICONE_API_KEY"):
        extra_headers["Helicone-Auth"] = f"Bearer {getenv('HELICONE_API_KEY')}"
    return ChatOpenAI(
        openai_api_key=getenv("OPENROUTER_API_KEY"),
        openai_api_base=getenv("OPENROUTER_BASE_URL"),
        model_name="google/gemini-flash-1.5",
        streaming=True,
        callbacks=[StreamingStdOutCallbackHandler()],
        default_headers=extra_headers,
    )

# Try to initialize the LLM but handle failures gracefully
try:
    llm = init_llm()
except Exception as e:
    print(f"LLM initialization failed: {e}")
    llm = None

# Helpers
def encode_image_to_base64(pil_image):
    """Serialize a PIL image to PNG and return it base64-encoded."""
    buffer = BytesIO()
    pil_image.save(buffer, format="PNG")
    return base64.b64encode(buffer.getvalue()).decode()
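
# The encoded string is embedded below as an OpenAI-style image data URL:
#   {"type": "image_url", "image_url": {"url": "data:image/png;base64,<...>"}}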

# Core logic
def generate_response(message, chat_history, image):
    """Yield progressively longer response strings for the latest user turn.

    `chat_history` must contain only *previous* turns; the current message
    is passed separately."""
    # Convert chat history to OpenAI-style role/content dicts
    formatted_history = []
    for msg in chat_history:
        role = msg.get("role")
        content = msg.get("content")
        if role == "user":
            formatted_history.append({"role": "user", "content": content})
        else:
            formatted_history.append({"role": "assistant", "content": content})
    # Prepare system message
    system_msg = {"role": "system", "content": "You are an expert image analysis assistant. Answer succinctly."}
    # Prepare the latest message, attaching the image if provided
    if image is not None:
        base64_image = encode_image_to_base64(image)
        # Format for the direct API call (OpenRouter/OpenAI format)
        api_messages = [system_msg] + formatted_history + [{
            "role": "user",
            "content": [
                {"type": "text", "text": message},
                {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}},
            ],
        }]
        # Same multimodal content, reused for the LangChain message below
        content_for_langchain = [
            {"type": "text", "text": message},
            {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}},
        ]
    else:
        api_messages = [system_msg] + formatted_history + [{"role": "user", "content": message}]
        content_for_langchain = message
    # Build LangChain messages
    lc_messages = [SystemMessage(content="You are an expert image analysis assistant. Answer succinctly.")]
    for msg in chat_history:
        if msg.get("role") == "user":
            lc_messages.append(HumanMessage(content=msg.get("content")))
        else:
            lc_messages.append(AIMessage(content=msg.get("content")))
    lc_messages.append(HumanMessage(content=content_for_langchain))
    # Try LangChain first (streaming, then non-streaming); if both fail,
    # fall through to the direct API call instead of raising.
    try:
        if llm:
            try:
                partial = ""
                for chunk in llm.stream(lc_messages):
                    content = getattr(chunk, "content", None)
                    if not content:
                        continue
                    partial += content
                    yield partial
                # If we got this far, streaming worked
                return
            except Exception as e:
                print(f"Streaming failed: {e}. Falling back to non-streaming mode")
                # Try non-streaming
                try:
                    response = llm.invoke(lc_messages)
                    yield response.content
                    return
                except Exception as e:
                    print(f"Non-streaming failed: {e}. Falling back to direct API call")
        response_text = direct_api_call(
            api_messages,
            getenv("OPENROUTER_API_KEY"),
            getenv("OPENROUTER_BASE_URL"),
        )
        yield response_text
    except Exception as e:
        import traceback
        print(traceback.format_exc())
        yield f"⚠️ Error generating response: {e}. Please try again later."

# Gradio interface
def process_message(message, chat_history, image):
    if chat_history is None:
        chat_history = []
    if image is None:
        chat_history.append({"role": "assistant", "content": "Please upload an image."})
        yield "", chat_history
        return
    chat_history.append({"role": "user", "content": message})
    chat_history.append({"role": "assistant", "content": "⏳ Processing..."})
    yield "", chat_history
    # Exclude the just-appended user turn and placeholder so the model
    # does not see the current message twice
    for chunk in generate_response(message, chat_history[:-2], image):
        chat_history[-1]["content"] = chunk
        yield "", chat_history

with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(type="messages", height=600)
            msg = gr.Textbox(label="Message", placeholder="Type your question...")
            clear = gr.ClearButton([msg, chatbot])
        with gr.Column(scale=1):
            image_input = gr.Image(type="pil", label="Upload Image")
            info = gr.Textbox(label="Image Info", interactive=False)
    msg.submit(process_message, [msg, chatbot, image_input], [msg, chatbot])
    image_input.change(lambda img: f"Size: {img.size}" if img is not None else "No image.", [image_input], [info])

if __name__ == "__main__":
    demo.launch()
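
# Run locally with `python app.py`; on Hugging Face Spaces this file is
# executed automatically. Gradio serves the UI on port 7860 by default.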