import gradio as gr

import base64
import os
from pathlib import Path
import openai
import json
from PIL import Image
import io
import asyncio

from settings_mgr import generate_download_settings_js, generate_upload_settings_js
from chat_export import import_history, get_export_js
from mcp_registry import load_registry, get_tools_for_server, call_local_mcp_tool, function_to_mcp_map, shutdown_local_mcp_clients
from gradio.components.base import Component
from types import SimpleNamespace
from dotenv import load_dotenv
from transcription import stream_transcriptions

from doc2json import process_docx
from code_exec import eval_script

load_dotenv()

dump_controls = False
log_to_console = False

mcp_servers = load_registry()
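# Each registry entry drives a per-server MCP checkbox in the UI below and is used
# for tool routing in bot(); entries carry a "name" and optionally a "server_label".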


def encode_image(image_data):
    """Build a data URI for raw image bytes in one of the four supported
    image formats: png, jpeg, gif, and webp.

    Args:
        image_data: The raw image bytes (not yet base64-encoded).

    Returns:
        A string of the form "data:image/<type>;base64,<payload>".
    """
    magic_number = image_data[:4]

    if magic_number.startswith(b'\x89PNG'):
        image_type = 'png'
    elif magic_number.startswith(b'\xFF\xD8'):
        image_type = 'jpeg'
    elif magic_number.startswith(b'GIF8'):
        image_type = 'gif'
    elif magic_number.startswith(b'RIFF'):
        if image_data[8:12] == b'WEBP':
            image_type = 'webp'
        else:
            raise Exception("Unknown image type")
    else:
        raise Exception("Unknown image type")

    return f"data:image/{image_type};base64,{base64.b64encode(image_data).decode('utf-8')}"
def process_pdf(pdf_fn: str):
    with open(pdf_fn, "rb") as pdf_file:
        base64_string = base64.b64encode(pdf_file.read()).decode("utf-8")

    return [{"type": "input_file", "filename": os.path.basename(pdf_fn),
             "file_data": f"data:application/pdf;base64,{base64_string}"}]


def encode_file(fn: str) -> list:
    user_msg_parts = []

    if fn.endswith(".docx"):
        user_msg_parts.append({"type": "input_text", "text": process_docx(fn)})
    elif fn.endswith(".pdf"):
        user_msg_parts.extend(process_pdf(fn))
    else:
        with open(fn, mode="rb") as f:
            content = f.read()

        is_image = False
        if isinstance(content, bytes):
            try:
                # Try to interpret the bytes as an image; fall back to text on failure.
                content = encode_image(content)
                is_image = True
            except Exception:
                content = content.decode('utf-8', 'replace')
        else:
            content = str(content)

        if is_image:
            user_msg_parts.append({"type": "input_image",
                                   "image_url": content})
        else:
            fn = os.path.basename(fn)
            user_msg_parts.append({"type": "input_text", "text": f"```{fn}\n{content}\n```"})

    return user_msg_parts


def normalize_user_content(content) -> list:
    """Convert chat history entries to OpenAI-style message parts."""
    parts = []

    if hasattr(content, "model_dump"):
        content = content.model_dump()

    if isinstance(content, Component):
        val = getattr(content, "value", None)
        if val is None and hasattr(content, "constructor_args"):
            ca = content.constructor_args
            if isinstance(ca, dict):
                val = ca.get("value")
            elif isinstance(ca, list):
                for entry in ca:
                    if isinstance(entry, dict) and "value" in entry:
                        val = entry["value"]
                        break
        if val is not None:
            content = val

    if isinstance(content, dict):
        if "file" in content and isinstance(content["file"], dict) and content["file"].get("path"):
            parts.extend(encode_file(content["file"]["path"]))
        elif content.get("path"):
            parts.extend(encode_file(content["path"]))
        elif content.get("component"):
            val = content.get("value") or content.get("constructor_args", {}).get("value")
            if isinstance(val, dict) and val.get("path"):
                parts.extend(encode_file(val["path"]))
            else:
                parts.append({"type": "input_text", "text": str(content)})
        else:
            parts.append({"type": "input_text", "text": str(content)})
    elif isinstance(content, Image.Image):
        buf = io.BytesIO()
        fmt = content.format if content.format else "PNG"
        content.save(buf, format=fmt)
        parts.append({"type": "input_image", "image_url": encode_image(buf.getvalue())})
    elif isinstance(content, tuple):
        parts.extend(encode_file(content[0]))
    else:
        parts.append({"type": "input_text", "text": str(content)})

    return parts
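

# Handlers for the Chatbot's built-in clear/undo/retry controls; they keep the
# visible chat history and the OpenAI-format history in sync.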
def undo(history):
    if history:
        history.pop()
    return history


def clear_both_histories():
    """Clear both the chatbot display history and the OpenAI-format history."""
    return [], []


def undo_both_histories(chatbot_history, openai_history):
    """Remove the last user/assistant exchange from both histories."""
    # Drop trailing assistant messages, then the user message that prompted them.
    while chatbot_history and chatbot_history[-1]["role"] != "user":
        chatbot_history.pop()
    if chatbot_history and chatbot_history[-1]["role"] == "user":
        chatbot_history.pop()

    # Mirror the same truncation in the OpenAI-format history.
    while openai_history and not (isinstance(openai_history[-1], dict) and openai_history[-1].get("role") == "user"):
        openai_history.pop()
    if openai_history and isinstance(openai_history[-1], dict) and openai_history[-1].get("role") == "user":
        openai_history.pop()

    return chatbot_history, openai_history


def retry_last_message(chatbot_history, openai_history):
    """Remove the last assistant message from both histories so the turn can be retried."""
    if chatbot_history:
        last_msg = chatbot_history[-1]
        role = last_msg.get("role") if isinstance(last_msg, dict) else getattr(last_msg, "role", None)
        if role == "assistant":
            new_chatbot = chatbot_history[:-1]
            new_openai = openai_history[:-1] if openai_history else []
            return new_chatbot, new_openai

    return chatbot_history, openai_history


def dump(history):
    return str(history)


def load_settings():
    # Settings are loaded client-side from localStorage via the JS snippet attached
    # to the Load button below; this server-side handler is intentionally a no-op.
    pass


def _event_to_dict(obj):
    """Best-effort conversion of an SDK event/output object into a plain dict."""
    if hasattr(obj, "model_dump"):
        return obj.model_dump()
    if hasattr(obj, "to_dict"):
        return obj.to_dict()
    if hasattr(obj, "__dict__"):
        return {k: v for k, v in obj.__dict__.items() if not k.startswith("_")}
    return {"type": getattr(obj, "type", "unknown")}


def save_settings(acc, sec, prompt, temp, tokens, model):
    # Settings are persisted client-side in localStorage via the JS snippet attached
    # to the Save button below; this server-side handler is intentionally a no-op.
    pass


def process_values_js():
    return """
    () => {
        return ["oai_key", "system_prompt"];
    }
    """
async def bot(message, history, history_openai_format, oai_key, system_prompt, temperature, max_tokens, model, reasoning_effort, verbosity, python_use, web_search, *mcp_selected):
    try:
        client = openai.OpenAI(
            api_key=oai_key
        )

        if model in ("whisper", "gpt-4o-transcribe-diarize"):
            assistant_msg = gr.ChatMessage(role="assistant", content="")
            streamed = False
            for content in stream_transcriptions(client, model, message, history, system_prompt):
                streamed = True
                assistant_msg.content = content
                yield assistant_msg, history_openai_format
            if not streamed:
                yield assistant_msg, history_openai_format
            return

        elif model == "gpt-image-1":
            if message.get("files"):
                image_files = []
                for file in message["files"]:
                    image_files.append(open(file, "rb"))

                response = client.images.edit(
                    model=model,
                    image=image_files,
                    prompt=message["text"],
                    quality="high"
                )
                for f in image_files:
                    f.close()
            else:
                response = client.images.generate(
                    model=model,
                    prompt=message["text"],
                    quality="high",
                    moderation="low"
                )
            b64data = response.data[0].b64_json
            img_bytes = base64.b64decode(b64data)
            pil_img = Image.open(io.BytesIO(img_bytes)).convert("RGB")
            yield gr.ChatMessage(
                role="assistant",
                content=gr.Image(type="pil", value=pil_img)
            ), history_openai_format
        else:
            approval_items = []
            if history_openai_format:
                last_msg = history_openai_format[-1]
                # If the previous turn ended with an MCP approval request, interpret this
                # message as the user's yes/no answer to it.
                if getattr(last_msg, "type", None) == "mcp_approval_request":
                    flag = message["text"].strip().lower()[:1]
                    if flag == 'y':
                        approve = True
                    elif flag == 'n':
                        approve = False
                    else:
                        raise gr.Error("MCP tool call awaiting confirmation. Start your reply with 'y' or 'n'.")
                    history_openai_format.append({
                        "type": "mcp_approval_response",
                        "approval_request_id": last_msg.id,
                        "approve": approve,
                    })
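
            # Assemble the tool definitions for this request: the optional restricted
            # Python interpreter, web search, and tools from any selected MCP servers.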
            tools = []
            if python_use:
                tools.append({
                    "type": "function",
                    "name": "eval_python",
                    "description": "Evaluate a simple script written in a conservative, restricted subset of Python. "
                                   "Note: Augmented assignments, in-place operations (e.g., +=, -=), lambdas (e.g. list comprehensions) are not supported. "
                                   "Use regular assignments and operations instead. Only 'import math' is allowed. "
                                   "Returns: unquoted results without HTML encoding.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "python_source_code": {
                                "type": "string",
                                "description": "The Python script that will run in a RestrictedPython context. "
                                               "Avoid using augmented assignments or in-place operations (+=, -=, etc.), as well as lambdas (e.g. list comprehensions). "
                                               "Use regular assignments and operations instead. Only 'import math' is allowed. Results need to be reported through print()."
                            }
                        },
                        "required": ["python_source_code"]
                    }
                })
            if web_search:
                tools.append({
                    "type": "web_search",
                    "search_context_size": "high"
                })

            for sel, entry in zip(mcp_selected, mcp_servers):
                if sel:
                    tools.extend(await get_tools_for_server(entry))
            if not tools:
                tools = None

            if log_to_console:
                print(f"bot history: {str(history)}")

            instructions = None
            user_msg_parts = []

            if system_prompt:
                if not system_prompt.startswith("Formatting re-enabled"):
                    instructions = "Formatting re-enabled\n" + system_prompt
                else:
                    instructions = system_prompt

            if message["text"]:
                user_msg_parts.append({"type": "input_text", "text": message["text"]})
            if message["files"]:
                for file in message["files"]:
                    user_msg_parts.extend(encode_file(file))
            history_openai_format.append({"role": "user", "content": user_msg_parts})
            user_msg_parts = []

            if log_to_console:
                print(f"br_prompt: {str(history_openai_format)}")

            reasoner_models = {"o4-mini", "o3", "o3-pro", "gpt-5", "gpt-5-mini", "gpt-5-pro"}
            reasoner = model in reasoner_models

            assistant_msgs = []
            whole_response = ""
            final_msg = None
            mcp_event_msg = None
            loop_tool_calling = True
            while loop_tool_calling:
                request_params = {
                    "model": model,
                    "input": history_openai_format,
                    "store": False,
                    "instructions": instructions
                }
                if reasoner:
                    reasoning_dict = {"summary": "auto"}
                    if reasoning_effort in ("low", "medium", "high"):
                        reasoning_dict["effort"] = reasoning_effort
                    else:
                        reasoning_dict["effort"] = "medium"
                    request_params["reasoning"] = reasoning_dict
                    request_params["include"] = ["reasoning.encrypted_content"]
                else:
                    request_params["temperature"] = temperature
                if model.startswith("gpt-5"):
                    request_params["text"] = {"verbosity": verbosity}
                if tools:
                    request_params["tools"] = tools
                    request_params["tool_choice"] = "auto"
                if max_tokens > 0:
                    request_params["max_output_tokens"] = max_tokens

                try:
                    stream = client.responses.create(stream=True, **request_params)
                    have_stream = True
                except Exception:
                    # If streaming is rejected, fall back to a single non-streaming request
                    # and wrap it so the event loop below can process it the same way.
                    response = client.responses.create(stream=False, **request_params)
                    stream = iter([SimpleNamespace(type="response.completed", response=response)])
                    have_stream = False
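
                # Consume the (possibly simulated) event stream. Locally executed tool
                # calls set loop_tool_calling again so that another request is issued
                # with their outputs appended to the history.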
                loop_tool_calling = False
                for event in stream:
                    if event.type == "response.output_text.delta":
                        if final_msg is None:
                            final_msg = gr.ChatMessage(role="assistant", content="")
                            assistant_msgs.append(final_msg)
                        whole_response += event.delta
                        final_msg.content = whole_response
                        yield assistant_msgs, history_openai_format
                    elif event.type == "response.output_item.added" and event.item.type == "reasoning":
                        summary = ""
                        for summary_part in event.item.summary:
                            if summary_part.type == "summary_text":
                                summary += summary_part.text
                        if summary:
                            rs_msg = gr.ChatMessage(
                                role="assistant",
                                content=summary,
                                metadata={"title": "Reasoning", "id": event.item.id, "status": "done"},
                            )
                            assistant_msgs.append(rs_msg)
                            yield assistant_msgs, history_openai_format
                    elif event.type in (
                        "response.mcp_list_tools.in_progress",
                        "response.mcp_call.in_progress",
                    ):
                        mcp_event_msg = gr.ChatMessage(
                            role="assistant",
                            content="",
                            metadata={
                                "title": event.type,
                                "id": f"mcp-{getattr(event, 'sequence_number', '')}",
                                "status": "pending",
                            },
                        )
                        assistant_msgs.append(mcp_event_msg)
                        yield assistant_msgs, history_openai_format
                    elif event.type in (
                        "response.mcp_list_tools.completed",
                        "response.mcp_list_tools.failed",
                        "response.mcp_call.completed",
                        "response.mcp_call.failed",
                    ):
                        if mcp_event_msg is not None:
                            mcp_event_msg.metadata["status"] = "done"
                            yield assistant_msgs, history_openai_format
                    elif event.type == "response.completed":
                        response = event.response
                        outputs = response.output

                        history_openai_format.extend(outputs)

                        for output in outputs:
                            if output.type == "message":
                                for part in output.content:
                                    if part.type == "output_text":
                                        if not have_stream:
                                            if final_msg is None:
                                                final_msg = gr.ChatMessage(role="assistant", content="")
                                                assistant_msgs.append(final_msg)
                                            whole_response += part.text
                                            final_msg.content = whole_response
                                            yield assistant_msgs, history_openai_format

                                        anns = part.annotations
                                        if anns:
                                            link_lines = []
                                            for ann in anns:
                                                if ann.type == "url_citation":
                                                    url = ann.url
                                                    title = ann.title
                                                    link_lines.append(f"- [{title}]({url})")
                                            if link_lines:
                                                link_lines = list(dict.fromkeys(link_lines))
                                                whole_response += "\n\n**Citations:**\n" + "\n".join(link_lines)
                                                if final_msg is None:
                                                    final_msg = gr.ChatMessage(role="assistant", content="")
                                                    assistant_msgs.append(final_msg)
                                                final_msg.content = whole_response
                                                yield assistant_msgs, history_openai_format
                            elif output.type == "function_call":
                                function_name = output.name
                                if function_name in function_to_mcp_map:
                                    try:
                                        mcp_info = function_to_mcp_map[function_name]
                                        server_name = mcp_info["server_name"]
                                        tool_name = mcp_info["tool_name"]

                                        server_entry = None
                                        for entry in mcp_servers:
                                            if entry["name"] == server_name:
                                                server_entry = entry
                                                break

                                        if server_entry:
                                            history_openai_format.append({
                                                "type": "function_call",
                                                "name": function_name,
                                                "arguments": output.arguments,
                                                "call_id": output.call_id
                                            })

                                            arguments = json.loads(output.arguments)
                                            call_id = output.call_id

                                            parent_msg = gr.ChatMessage(
                                                role="assistant",
                                                content="",
                                                metadata={"title": f"MCP: {server_name} - {tool_name}", "id": call_id, "status": "pending"},
                                            )
                                            assistant_msgs.append(parent_msg)
                                            assistant_msgs.append(
                                                gr.ChatMessage(
                                                    role="assistant",
                                                    content=f"``` arguments\n{output.arguments}\n```",
                                                    metadata={"title": "request", "parent_id": call_id},
                                                )
                                            )
                                            yield assistant_msgs, history_openai_format
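
                                            # Run the tool via the local MCP client and report the
                                            # result (or error) both in the chat and back to the model
                                            # as a function_call_output item.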
                                            try:
                                                tool_result = await call_local_mcp_tool(server_entry, tool_name, arguments)

                                                if isinstance(tool_result, list) and tool_result and hasattr(tool_result[0], 'text'):
                                                    result_text = "\n".join([item.text for item in tool_result])
                                                elif hasattr(tool_result, 'text'):
                                                    result_text = tool_result.text
                                                else:
                                                    result_text = str(tool_result)

                                                assistant_msgs.append(
                                                    gr.ChatMessage(
                                                        role="assistant",
                                                        content=f"``` result\n{result_text}\n```",
                                                        metadata={"title": "response", "parent_id": call_id, "status": "done"},
                                                    )
                                                )
                                                parent_msg.metadata["status"] = "done"

                                                history_openai_format.append(
                                                    {
                                                        "type": "function_call_output",
                                                        "call_id": output.call_id,
                                                        "output": result_text,
                                                    }
                                                )
                                                yield assistant_msgs, history_openai_format
                                            except Exception as e:
                                                error_message = str(e)
                                                history_openai_format.append({
                                                    "type": "function_call_output",
                                                    "call_id": output.call_id,
                                                    "output": json.dumps({"error": error_message})
                                                })
                                                assistant_msgs.append(
                                                    gr.ChatMessage(
                                                        role="assistant",
                                                        content=f"``` error\n{error_message}\n```",
                                                        metadata={"title": "response", "parent_id": call_id, "status": "done"},
                                                    )
                                                )
                                                parent_msg.metadata["status"] = "done"
                                                yield assistant_msgs, history_openai_format

                                            loop_tool_calling = True
                                        else:
                                            error_message = f"Server {server_name} not found"
                                            history_openai_format.append({
                                                "type": "function_call_output",
                                                "call_id": output.call_id,
                                                "output": json.dumps({"error": error_message})
                                            })
                                    except Exception as e:
                                        error_message = f"Error processing local MCP tool call: {str(e)}"
                                        history_openai_format.append({
                                            "type": "function_call_output",
                                            "call_id": output.call_id,
                                            "output": json.dumps({"error": error_message})
                                        })
                                elif output.name == "eval_python":
                                    try:
                                        history_openai_format.append({
                                            "type": "function_call",
                                            "name": output.name,
                                            "arguments": output.arguments,
                                            "call_id": output.call_id
                                        })

                                        parsed_args = json.loads(output.arguments)
                                        tool_script = parsed_args.get("python_source_code", "")
                                        call_id = output.call_id

                                        parent_msg = gr.ChatMessage(
                                            role="assistant",
                                            content="",
                                            metadata={"title": output.name, "id": call_id, "status": "pending"},
                                        )
                                        assistant_msgs.append(parent_msg)
                                        assistant_msgs.append(
                                            gr.ChatMessage(
                                                role="assistant",
                                                content=f"``` script\n{tool_script}\n```",
                                                metadata={"title": "request", "parent_id": call_id},
                                            )
                                        )
                                        yield assistant_msgs, history_openai_format

                                        tool_result = eval_script(tool_script)
                                        result_text = (
                                            tool_result["prints"]
                                            if tool_result["success"]
                                            else tool_result.get("error", "")
                                        )

                                        assistant_msgs.append(
                                            gr.ChatMessage(
                                                role="assistant",
                                                content=f"``` result\n{result_text}\n```",
                                                metadata={"title": "response", "parent_id": call_id, "status": "done"},
                                            )
                                        )
                                        parent_msg.metadata["status"] = "done"
                                        yield assistant_msgs, history_openai_format

                                        history_openai_format.append(
                                            {
                                                "type": "function_call_output",
                                                "call_id": output.call_id,
                                                "output": json.dumps(tool_result),
                                            }
                                        )
                                    except Exception as e:
                                        error_message = str(e)
                                        history_openai_format.append({
                                            "type": "function_call_output",
                                            "call_id": output.call_id,
                                            "output": json.dumps({"error": error_message})
                                        })

                                        assistant_msgs.append(
                                            gr.ChatMessage(
                                                role="assistant",
                                                content=f"``` error\n{error_message}\n```",
                                                metadata={"title": "response", "parent_id": call_id, "status": "done"},
                                            )
                                        )
                                        parent_msg.metadata["status"] = "done"
                                        yield assistant_msgs, history_openai_format
                                else:
                                    history_openai_format.append(output)

                                loop_tool_calling = True
                            elif output.type == "mcp_approval_request":
                                history_openai_format.append(output)
                                assistant_msgs.append(
                                    gr.ChatMessage(
                                        role="assistant",
                                        content=(
                                            f"MCP approval needed for {output.name} on {output.server_label} with arguments {output.arguments}."
                                        ),
                                        options=[{"value": "y", "label": "Yes"}, {"value": "n", "label": "No"}],
                                    )
                                )
                                yield assistant_msgs, history_openai_format
                                return
                            elif output.type == "mcp_call":
                                history_openai_format.append(_event_to_dict(output))
                                if getattr(output, "output", None) is not None:
                                    assistant_msgs.append(
                                        gr.ChatMessage(
                                            role="assistant",
                                            content=f"``` mcp_result\n{output.output}\n```",
                                            metadata={"title": "response"},
                                        )
                                    )
                                    yield assistant_msgs, history_openai_format

                        if log_to_console:
                            print(f"usage: {event.response.usage}")
                    elif event.type == "response.incomplete":
                        gr.Warning(f"Incomplete response, reason: {event.response.incomplete_details.reason}")
                        if final_msg is None:
                            final_msg = gr.ChatMessage(role="assistant", content="")
                            assistant_msgs.append(final_msg)
                        final_msg.content = whole_response
                        yield assistant_msgs, history_openai_format

        if log_to_console:
            print(f"br_result: {str(history)}")

    except Exception as e:
        raise gr.Error(f"Error: {str(e)}")
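

# Guarded history import: verify that the supplied API key works before replacing
# the current chat and OpenAI-format histories with the imported ones.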
def import_history_guarded(oai_key, history, file):
    # Check the key first so a bad key fails here rather than on the next send.
    try:
        client = openai.OpenAI(api_key=oai_key)
        client.models.retrieve("gpt-4o")
    except Exception as e:
        raise gr.Error(f"OpenAI login error: {str(e)}")

    chat_history, system_prompt_value, history_openai_format = import_history(history, file)

    return chat_history, system_prompt_value, chat_history, history_openai_format
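

# UI definition: settings, chat interface, and import/export controls.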
with gr.Blocks(delete_cache=(86400, 86400)) as demo:
    gr.Markdown("# OpenAI™️ Chat (Nils' Version™️)")
    with gr.Accordion("Startup"):
        gr.Markdown("""Use of this interface is permitted under the terms and conditions of the
            [MIT license](https://github.com/ndurner/oai_chat/blob/main/LICENSE).
            Third party terms and conditions apply, particularly
            those of the LLM vendor (OpenAI) and hosting provider (Hugging Face). This app and the AI models may make mistakes, so verify any outputs.""")

        oai_key = gr.Textbox(label="OpenAI API Key", elem_id="oai_key", value=os.environ.get("OPENAI_API_KEY"))
        model = gr.Dropdown(label="Model", value="gpt-5-mini", allow_custom_value=True, elem_id="model",
                            choices=["gpt-5", "gpt-5-mini", "gpt-5-chat-latest", "gpt-5-pro", "gpt-4o", "gpt-4.1", "o3", "o3-pro", "o4-mini", "chatgpt-4o-latest", "gpt-4o-mini", "gpt-4-turbo", "whisper", "gpt-4o-transcribe-diarize", "gpt-image-1"])
        reasoning_effort = gr.Dropdown(label="Reasoning Effort", value="medium", choices=["low", "medium", "high"], elem_id="reasoning_effort")
        verbosity = gr.Dropdown(label="Verbosity (GPT-5)", value="medium", choices=["low", "medium", "high"], elem_id="verbosity")
        system_prompt = gr.TextArea("You are a helpful yet diligent AI assistant. Answer faithfully and factually correct. Respond with 'I do not know' if uncertain.", label="System/Developer Prompt", lines=3, max_lines=250, elem_id="system_prompt")
        temp = gr.Slider(0, 2, label="Temperature", elem_id="temp", value=1)
        max_tokens = gr.Slider(0, 16384, label="Max. Tokens", elem_id="max_tokens", value=0)
        python_use = gr.Checkbox(label="Python Use", value=False)
        web_search = gr.Checkbox(label="Web Search", value=False)
        mcp_boxes = []
        for entry in mcp_servers:
            label = f"MCP: {entry.get('server_label', entry.get('name'))}"
            mcp_boxes.append(gr.Checkbox(label=label, value=False))
        save_button = gr.Button("Save Settings")
        load_button = gr.Button("Load Settings")
        dl_settings_button = gr.Button("Download Settings")
        ul_settings_button = gr.Button("Upload Settings")

    load_button.click(load_settings, js="""
        () => {
            let elems = ['#oai_key textarea', '#system_prompt textarea', '#temp input', '#max_tokens input', '#model'];
            elems.forEach(elem => {
                let item = document.querySelector(elem);
                let event = new InputEvent('input', { bubbles: true });
                item.value = localStorage.getItem(elem.split(" ")[0].slice(1)) || '';
                item.dispatchEvent(event);
            });
            const re = document.querySelector('#reasoning_effort');
            if (re) { const ev = new InputEvent('input', { bubbles: true }); re.value = localStorage.getItem('reasoning_effort') || 'medium'; re.dispatchEvent(ev); }
            const vb = document.querySelector('#verbosity');
            if (vb) { const ev2 = new InputEvent('input', { bubbles: true }); vb.value = localStorage.getItem('verbosity') || 'medium'; vb.dispatchEvent(ev2); }
            // Backwards compatibility: map pseudo model names to base + reasoning effort
            (function() {
                const modelEl = document.querySelector('#model');
                const effortEl = document.querySelector('#reasoning_effort');
                if (!modelEl || !effortEl) return;
                const m = modelEl.value;
                const mapping = {
                    'o1-high': {base: 'o1', effort: 'high'},
                    'o3-mini-high': {base: 'o3-mini', effort: 'high'},
                    'o4-mini-high': {base: 'o4-mini', effort: 'high'},
                    'o3-high': {base: 'o3', effort: 'high'},
                    'o3-low': {base: 'o3', effort: 'low'}
                };
                if (mapping[m]) {
                    const evM = new InputEvent('input', { bubbles: true });
                    modelEl.value = mapping[m].base;
                    modelEl.dispatchEvent(evM);
                    const evE = new InputEvent('input', { bubbles: true });
                    effortEl.value = mapping[m].effort;
                    effortEl.dispatchEvent(evE);
                    try { localStorage.setItem('model', mapping[m].base); } catch (e) {}
                    try { localStorage.setItem('reasoning_effort', mapping[m].effort); } catch (e) {}
                }
            })();
        }
    """)

    save_button.click(save_settings, [oai_key, system_prompt, temp, max_tokens, model], js="""
        (oai, sys, temp, ntok, model) => {
            localStorage.setItem('oai_key', oai);
            localStorage.setItem('system_prompt', sys);
            localStorage.setItem('temp', document.querySelector('#temp input').value);
            localStorage.setItem('max_tokens', document.querySelector('#max_tokens input').value);
            localStorage.setItem('model', model);
            const re = document.querySelector('#reasoning_effort');
            if (re) localStorage.setItem('reasoning_effort', re.value || 'medium');
            const vb = document.querySelector('#verbosity');
            if (vb) localStorage.setItem('verbosity', vb.value || 'medium');
        }
    """)

    control_ids = [('oai_key', '#oai_key textarea'),
                   ('system_prompt', '#system_prompt textarea'),
                   ('temp', '#temp input'),
                   ('max_tokens', '#max_tokens input'),
                   ('model', '#model'),
                   ('reasoning_effort', '#reasoning_effort'),
                   ('verbosity', '#verbosity')]
    controls = [oai_key, system_prompt, temp, max_tokens, model, reasoning_effort, verbosity, python_use, web_search] + mcp_boxes

    dl_settings_button.click(None, controls, js=generate_download_settings_js("oai_chat_settings.bin", control_ids))
    ul_settings_button.click(None, None, None, js=generate_upload_settings_js(control_ids))
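
    # Chat area: the OpenAI-format conversation is kept in a gr.State alongside the
    # visible Chatbot history and is passed to/from bot() on every turn.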
    history_openai_format = gr.State([])
    chat = gr.ChatInterface(
        fn=bot,
        multimodal=True,
        additional_inputs=[history_openai_format] + controls,
        additional_outputs=[history_openai_format],
        autofocus=False,
        type="messages",
        chatbot=gr.Chatbot(elem_id="chatbot", type="messages"),
        textbox=gr.MultimodalTextbox(elem_id="chat_input"),
    )
    chat.textbox.file_count = "multiple"
    chat.textbox.max_plain_text_length = 2**31
    chat.textbox.max_lines = 10
    chatbot = chat.chatbot
    chatbot.show_copy_button = True
    chatbot.height = 450

    chatbot.clear(
        fn=clear_both_histories,
        outputs=[chatbot, history_openai_format]
    )

    chatbot.undo(
        fn=undo_both_histories,
        inputs=[chatbot, history_openai_format],
        outputs=[chatbot, history_openai_format]
    )

    chatbot.retry(
        fn=retry_last_message,
        inputs=[chatbot, history_openai_format],
        outputs=[chatbot, history_openai_format]
    )
if dump_controls: |
|
|
with gr.Row(): |
|
|
dmp_btn = gr.Button("Dump") |
|
|
txt_dmp = gr.Textbox("Dump") |
|
|
dmp_btn.click(dump, inputs=[chatbot], outputs=[txt_dmp]) |

    with gr.Accordion("Import/Export", open=False):
        import_button = gr.UploadButton("History Import")
        export_button = gr.Button("History Export")
        export_button.click(lambda: None, [chatbot, system_prompt], js=get_export_js())
        dl_button = gr.Button("File download")
        dl_button.click(lambda: None, [chatbot], js="""
            (chat_history) => {
                const languageToExt = {
                    'python': 'py',
                    'javascript': 'js',
                    'typescript': 'ts',
                    'csharp': 'cs',
                    'ruby': 'rb',
                    'shell': 'sh',
                    'bash': 'sh',
                    'markdown': 'md',
                    'yaml': 'yml',
                    'rust': 'rs',
                    'golang': 'go',
                    'kotlin': 'kt'
                };

                // Grab the last fenced code block from the most recent chat message
                const contentRegex = /```(?:([^\\n]+)?\\n)?([\\s\\S]*?)```/;
                const lastMessage = chat_history[chat_history.length - 1];
                const lastContent = typeof lastMessage === 'string' ? lastMessage : (lastMessage["content"] ?? lastMessage[1]);
                const match = contentRegex.exec(lastContent);

                if (match && match[2]) {
                    const specifier = match[1] ? match[1].trim() : '';
                    const content = match[2];

                    let filename = 'download';
                    let fileExtension = 'txt'; // default

                    if (specifier) {
                        if (specifier.includes('.')) {
                            // If specifier contains a dot, treat it as a filename
                            const parts = specifier.split('.');
                            filename = parts[0];
                            fileExtension = parts[1];
                        } else {
                            // Use mapping if exists, otherwise use specifier itself
                            const langLower = specifier.toLowerCase();
                            fileExtension = languageToExt[langLower] || langLower;
                            filename = 'code';
                        }
                    }

                    const blob = new Blob([content], {type: 'text/plain'});
                    const url = URL.createObjectURL(blob);
                    const a = document.createElement('a');
                    a.href = url;
                    a.download = `${filename}.${fileExtension}`;
                    document.body.appendChild(a);
                    a.click();
                    document.body.removeChild(a);
                    URL.revokeObjectURL(url);
                }
            }
        """)
        import_button.upload(import_history_guarded,
                             inputs=[oai_key, chatbot, import_button],
                             outputs=[chatbot, system_prompt, chat.chatbot_state, history_openai_format])

demo.queue(default_concurrency_limit=None).launch()