from fastapi import FastAPI, Request
from typing import List
import gradio as gr
import requests
import argparse
import aiohttp
import uvicorn
import random
import string
import json
import math
import sys
import os
# "env" means the endpoint comes from the environment variables read below.
API_BASE = "env"

# The key is mandatory: a missing OPENAI_API_KEY raises KeyError at import time.
# The value may hold several comma-separated keys (see get_api_key).
api_key = os.environ['OPENAI_API_KEY']

# Optional override for OpenAI-compatible endpoints.
base_url = os.getenv('OPENAI_BASE_URL', "https://api.openai.com/v1")

# JSON-encoded list of model ids; decoded by loadModels().
def_models = (
    '["gpt-4", "gpt-4-0125-preview", "gpt-4-0613", "gpt-4-1106-preview", '
    '"gpt-4-turbo", "gpt-4-turbo-2024-04-09", "gpt-4-turbo-preview", '
    '"chatgpt-4o-latest", "gpt-4o", "gpt-4o-2024-05-13", "gpt-4o-2024-08-06", '
    '"gpt-4o-mini", "gpt-4o-mini-2024-07-18"]'
)
def checkModels():
    """Probe the configured endpoint and fall back to the OpenAI API if needed.

    When API_BASE is "env", `{base_url}/models` is requested; if the JSON
    answer has no "data" key, both `base_url` and `api_key` are switched to
    the OpenAI defaults. Request errors are printed and otherwise ignored.
    For any other API_BASE value the OpenAI defaults are applied directly.

    NOTE(review): `oai_api_key` is not defined in the visible part of this
    file -- confirm it is declared elsewhere.
    """
    # api_key must be declared global too; otherwise the assignments below
    # only create a local and the key swap silently has no effect.
    global base_url, api_key
    if API_BASE == "env":
        try:
            response = requests.get(
                f"{base_url}/models",
                headers={"Authorization": f"Bearer {get_api_key()}"},
            )
            response.raise_for_status()
            if 'data' not in response.json():
                base_url = "https://api.openai.com/v1"
                api_key = oai_api_key
        except Exception as e:
            # Best-effort probe: keep the current configuration on failure.
            print(f"Error testing API endpoint: {e}")
    else:
        base_url = "https://api.openai.com/v1"
        api_key = oai_api_key
def loadModels():
    """Populate the `models` and `modelList` globals from `def_models`.

    `models` becomes the sorted list of ids decoded from the JSON string;
    `modelList` wraps the same ids in a {"object": "list", "data": [...]}
    structure with one entry per model.
    """
    global models, modelList
    models = sorted(json.loads(def_models))

    entries = []
    for model_id in models:
        entries.append({
            "id": model_id,
            "object": "model",
            "created": 0,
            "owned_by": "system",
        })
    modelList = {"object": "list", "data": entries}
def handleApiKeys():
    """Validate the configured API key(s) against `{base_url}/models`.

    If `api_key` holds several comma-separated keys, each one is tested and
    the global is rewritten to contain only the keys whose response includes
    a "data" field. A single key is tested once and left unchanged.

    Raises:
        RuntimeError: if none of several keys works, or if the single key
            fails the check (or the request itself errors).
    """
    global api_key
    if ',' in api_key:
        output = []
        for key in api_key.split(','):
            try:
                response = requests.get(
                    f"{base_url}/models",
                    headers={"Authorization": f"Bearer {key}"},
                )
                response.raise_for_status()
                if 'data' in response.json():
                    output.append(key)
            except Exception as e:
                # NOTE(review): this prints the full key; consider masking it.
                print((f"API key {key} is not valid or an actuall error happend {e}"))
        # Fail only when no key survived. The previous `len(output)==1` test
        # rejected a setup with exactly one working key and accepted an
        # empty list, leaving api_key as "".
        if not output:
            raise RuntimeError("No API key is working")
        api_key = ",".join(output)
    else:
        try:
            response = requests.get(
                f"{base_url}/models",
                headers={"Authorization": f"Bearer {api_key}"},
            )
            response.raise_for_status()
            if 'data' not in response.json():
                raise RuntimeError("Current API key is not valid")
        except Exception as e:
            raise RuntimeError(f"Current API key is not valid or an actual error happened: {e}") from e
def encodeChat(messages):
    """Flatten chat messages into one tagged text blob.

    Each message is rendered as
    ``<|im_start|>{role}[ [name]]\\n{content}<|end_of_text|>`` and the
    rendered messages are joined with newlines. The optional "name" key, when
    present, is appended to the role in square brackets.
    """
    def render(entry):
        speaker = entry['role']
        tag = f" [{entry['name']}]" if 'name' in entry else ''
        body = entry['content']
        return f"<|im_start|>{speaker}{tag}\n{body}<|end_of_text|>"

    return "\n".join(render(entry) for entry in messages)
def get_api_key(call='api_key'):
    """Return one API key for the requested credential set.

    `call == 'oai_api_key'` selects the global `oai_api_key`; every other
    value (including the default) selects the global `api_key`. When the
    selected value is a comma-separated list, one entry is picked at random.
    """
    selected = oai_api_key if call == 'oai_api_key' else api_key
    if ',' not in selected:
        return selected
    return random.choice(selected.split(','))
def moderate(messages):
    """Check a conversation with the moderation endpoint.

    The messages are flattened with encodeChat() and posted to
    `{base_url}/moderations`; if that request fails, the OpenAI endpoint is
    tried with the `oai_api_key` credential.

    Returns:
        The decoded moderation payload when anything in it is flagged,
        otherwise False (also False when both requests fail).
    """
    payload = {"input": encodeChat(messages)}

    def _post(url, key_kind):
        # One moderation round-trip; raises RequestException on failure.
        response = requests.post(
            url,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {get_api_key(call=key_kind)}"
            },
            json=payload
        )
        response.raise_for_status()
        return response.json()

    try:
        moderation_result = _post(f"{base_url}/moderations", 'api_key')
    except requests.exceptions.RequestException as e:
        print(f"Error during moderation request to {base_url}: {e}")
        try:
            moderation_result = _post("https://api.openai.com/v1/moderations", 'oai_api_key')
        except requests.exceptions.RequestException as e:
            print(f"Error during moderation request to fallback URL: {e}")
            return False

    # Accept the shapes the caller copes with: a dict carrying "results", a
    # bare list of results, or a single result dict with a top-level
    # "flagged". Anything else counts as not flagged instead of raising.
    if isinstance(moderation_result, list):
        entries = moderation_result
    elif isinstance(moderation_result, dict):
        results = moderation_result.get("results")
        entries = results if isinstance(results, list) else [moderation_result]
    else:
        entries = []

    if any(isinstance(entry, dict) and entry.get("flagged") for entry in entries):
        return moderation_result
    return False
async def streamChat(params):
    """Stream chat-completion chunks as decoded JSON objects.

    Posts `params` to `{base_url}/chat/completions`; if that raises an
    aiohttp.ClientError, the same request is sent to the OpenAI endpoint
    with the `oai_api_key` credential. A ClientError on the second attempt
    ends the stream silently.

    Yields:
        One decoded JSON object per "data: " line; "[DONE]" markers and
        lines that are not valid JSON are skipped.
    """
    # (url, credential selector) pairs, in the order they are attempted.
    endpoints = (
        (f"{base_url}/chat/completions", 'api_key'),
        ("https://api.openai.com/v1/chat/completions", 'oai_api_key'),
    )
    async with aiohttp.ClientSession() as session:
        for url, key_kind in endpoints:
            try:
                async with session.post(
                    url,
                    headers={
                        "Authorization": f"Bearer {get_api_key(call=key_kind)}",
                        "Content-Type": "application/json",
                    },
                    json=params,
                ) as r:
                    r.raise_for_status()
                    async for raw in r.content:
                        if not raw:
                            continue
                        text = raw.decode('utf-8')
                        if not text.startswith("data: "):
                            continue
                        text = text[6:].strip()
                        if text == "[DONE]":
                            continue
                        try:
                            chunk = json.loads(text)
                            yield chunk
                        except json.JSONDecodeError:
                            continue
            except aiohttp.ClientError:
                # Move on to the next endpoint (or finish after the last).
                continue
            else:
                return
def rnd(length=8):
    """Return a random string of `length` ASCII letters and digits."""
    alphabet = string.ascii_letters + string.digits
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return ''.join(picked)
async def respond(
    message,
    history: list[tuple[str, str]],
    model_name,
    max_tokens,
    temperature,
    top_p,
):
    """Chat handler: moderate the prompt, stream the reply, resolve tool calls.

    Args:
        message: The new user message.
        history: Earlier turns as (user_text, assistant_text) pairs; empty
            halves are skipped.
        model_name: Model id sent to the chat-completions endpoint.
        max_tokens: Sent as "max_tokens".
        temperature: Sent as "temperature".
        top_p: Sent as "top_p".

    Yields:
        The accumulated reply text after each streamed chunk (every yield is
        the full text so far, not a delta), or a single "[MODERATION]" notice
        when the prompt or the reply is flagged.

    NOTE(review): handleMultimodalData, imagine, safe_eval, searchEngine and
    the threading module are used below but are not defined or imported in
    the visible part of this file -- confirm they exist elsewhere.
    """
    def _moderation_notice(mode):
        # Build the refusal text for a flagged moderation payload.
        # Categories live on each result entry, so look inside "results"
        # (or a bare list) before falling back to the payload itself.
        if isinstance(mode, list):
            entries = mode
        elif isinstance(mode.get("results"), list):
            entries = mode["results"]
        else:
            entries = [mode]
        reasons = []
        for entry in entries:
            if not isinstance(entry, dict):
                continue
            for category, flagged in (entry.get('categories') or {}).items():
                if flagged and category not in reasons:
                    reasons.append(category)
        if reasons:
            return "[MODERATION] I'm sorry, but I can't assist with that.\n\nReasons:\n```\n" + "\n".join([f"{i+1}. {reason}" for i, reason in enumerate(reasons)]) + "\n```"
        return "[MODERATION] I'm sorry, but I can't assist with that."

    def _completion_params():
        # Request body for one streaming completion over the current messages.
        return {
            "model": model_name,
            "messages": messages,
            "max_tokens": max_tokens,
            "temperature": temperature,
            "top_p": top_p,
            "user": rnd(),
            "stream": True
        }

    messages = []
    for val in history:
        if val[0]:
            messages.append({"role": "user", "content": val[0]})
        if val[1]:
            messages.append({"role": "assistant", "content": val[1]})
    messages.append({"role": "user", "content": message})

    if message:
        mode = moderate(messages)
        if mode:
            yield _moderation_notice(mode)
            return

    async def handleResponse(completion, prefix="", image_count=0, didSearchedAlready=False):
        response = ""
        isRequeryNeeded = False
        async for token in completion:
            # Some chunks carry no choices, and "content" may be present but
            # null; neither may crash the stream.
            choices = token.get('choices') if isinstance(token, dict) else None
            if not choices:
                continue
            delta = choices[0].get('delta') or {}
            response += delta.get("content") or delta.get("refusal") or ""
            yield f"{prefix}{response}"

        # Moderate the finished reply together with the prompt.
        mode = moderate([handleMultimodalData(model_name, "user", message),{"role": "assistant", "content": response}])
        if mode:
            yield _moderation_notice(mode)
            return

        # A reply line that is a JSON object with "tool"/"isCall"/"prompt"
        # is treated as a tool call and replaced in place by its result.
        for line in response.split('\n'):
            try:
                data = json.loads(line)
                if isinstance(data, dict) and data.get("tool") == "imagine" and data.get("isCall") and "prompt" in data:
                    if image_count < 4:
                        image_count += 1
                        def fetch_image_url(prompt, line):
                            image_url = imagine(prompt)
                            # NOTE(review): image_url is unused and the
                            # replacement is empty -- looks like lost markup.
                            return line, f''
                        def replace_line_in_response(line, replacement):
                            nonlocal response
                            response = response.replace(line, replacement)
                        thread = threading.Thread(target=lambda: replace_line_in_response(*fetch_image_url(data["prompt"], line)))
                        thread.start()
                        thread.join()
                    else:
                        response = response.replace(line, f'[System: 4 image per message limit; prompt asked: `{data["prompt"]}]`')
                    yield f"{prefix}{response}"
                elif isinstance(data, dict) and data.get("tool") == "calc" and data.get("isCall") and "prompt" in data:
                    isRequeryNeeded = True
                    try:
                        result = safe_eval(data["prompt"])
                        response = response.replace(line, f'[System: `{data["prompt"]}` === `{result}`]')
                    except Exception as e:
                        response = response.replace(line, f'[System: Error in calculation; `{e}`]')
                    yield f"{prefix}{response}"
                elif isinstance(data, dict) and data.get("tool") == "search" and data.get("isCall") and "prompt" in data:
                    isRequeryNeeded = True
                    if didSearchedAlready:
                        response = response.replace(line, f'[System: One search per response is allowed; due to how long and resource it takes; query: `{data["prompt"]}]`]')
                    else:
                        try:
                            result = searchEngine(data["prompt"])
                            result_escaped = result.replace('`', '\\`')
                            response = response.replace(line, f'[System: `{data["prompt"]}` ===\n```\n{result_escaped}\n```\n]')
                            didSearchedAlready = True
                        except Exception as e:
                            response = response.replace(line, f'[System: Error in search function; `{e}`]')
                    yield f"{prefix}{response}"
                yield f"{prefix}{response}"
            except Exception:
                # Ordinary prose lines are not JSON; skip them (best effort).
                continue

        if isRequeryNeeded:
            # Feed the tool results back and let the model continue; the
            # text produced so far becomes the prefix of the follow-up.
            messages.append({"role": "assistant", "content": response})
            async for res in handleResponse(streamChat(_completion_params()), f"{prefix}{response}\n\n", image_count, didSearchedAlready):
                yield res

    async for res in handleResponse(streamChat(_completion_params())):
        yield res
# Start-up sequence: validate the key(s), build the model list, then probe
# the endpoint. `models` must exist before the dropdown below is created.
handleApiKeys()
loadModels()
checkModels()

demo = gr.ChatInterface(
    respond,
    title="gpt-4o-mini-small",
    # The literal previously spanned two physical lines inside single quotes,
    # which is a syntax error; the line break is now an explicit "\n".
    description=(
        "This is the smaller version of quardo/gpt-4o-small space.\n"
        "Mainly exists when the main space is down."
    ),
    additional_inputs=[
        gr.Dropdown(choices=models, value="gpt-4o-mini", label="Model"),
        gr.Slider(minimum=1, maximum=4096, value=4096, step=1, label="Max new tokens"),
        gr.Slider(minimum=0.1, maximum=2.0, value=0.7, step=0.05, label="Temperature"),
        gr.Slider(minimum=0.05, maximum=1.0, value=0.95, step=0.05, label="Top-p (nucleus sampling)"),
    ],
    css="footer{display:none !important}",
    head=""""""
)
app = FastAPI()
@app.get("/declined")
def test():
return HTMLResponse(content="""
Ok, you can go back to Hugging Face. I just didn't have any idea how to handle decline so you are redirected here.