from starlette.responses import HTMLResponse
from fastapi import FastAPI
import gradio as gr
import requests
import argparse
import aiohttp
import uvicorn
import random
import string
import json
import sys
import os
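# Configuration from the Space's environment:
#   API_KEY         - key(s) for the primary endpoint (comma-separated pool allowed)
#   OPENAI_API_KEY  - fallback key for api.openai.com
#   OPENAI_BASE_URL - optional custom base URL for the primary endpoint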
API_BASE = "env" | |
api_key = os.environ['API_KEY'] | |
oai_api_key = os.environ['OPENAI_API_KEY'] | |
base_url = os.environ.get('OPENAI_BASE_URL', "https://api.openai.com/v1") | |
def_models = '["gpt-4", "gpt-4-0125-preview", "gpt-4-0613", "gpt-4-1106-preview", "gpt-4-turbo", "gpt-4-turbo-2024-04-09", "gpt-4-turbo-preview", "chatgpt-4o-latest", "gpt-4o", "gpt-4o-2024-05-13", "gpt-4o-2024-08-06", "gpt-4o-mini", "gpt-4o-mini-2024-07-18"]' | |
def checkModels():
    global base_url, api_key
    if API_BASE == "env":
        try:
            response = requests.get(f"{base_url}/models", headers={"Authorization": f"Bearer {get_api_key()}"})
            response.raise_for_status()
            if 'data' not in response.json():
                base_url = "https://api.openai.com/v1"
                api_key = oai_api_key
        except Exception as e:
            print(f"Error testing API endpoint: {e}")
    else:
        base_url = "https://api.openai.com/v1"
        api_key = oai_api_key
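# Parse the hard-coded model list into the sorted `models` list plus an
# OpenAI-style {"object": "list", "data": [...]} payload.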
def loadModels():
    global models, modelList
    models = json.loads(def_models)
    models = sorted(models)
    modelList = {
        "object": "list",
        "data": [{"id": v, "object": "model", "created": 0, "owned_by": "system"} for v in models]
    }
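# Validate the configured key(s): for a comma-separated list, keep only the
# keys that can fetch /models; for a single key, fail hard if it is invalid.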
def handleApiKeys():
    global api_key
    if ',' in api_key:
        output = []
        for key in api_key.split(','):
            try:
                response = requests.get(f"{base_url}/models", headers={"Authorization": f"Bearer {key}"})
                response.raise_for_status()
                if 'data' in response.json():
                    output.append(key)
            except Exception as e:
                print(f"API key {key} is not valid or an actual error happened: {e}")
        if len(output) == 0:
            raise RuntimeError("No API key is working")
        api_key = ",".join(output)
    else:
        try:
            response = requests.get(f"{base_url}/models", headers={"Authorization": f"Bearer {api_key}"})
            response.raise_for_status()
            if 'data' not in response.json():
                raise RuntimeError("Current API key is not valid")
        except Exception as e:
            raise RuntimeError(f"Current API key is not valid or an actual error happened: {e}")
def encodeChat(messages):
    output = []
    for message in messages:
        role = message['role']
        name = f" [{message['name']}]" if 'name' in message else ''
        content = message['content']
        formatted_message = f"<|im_start|>{role}{name}\n{content}<|end_of_text|>"
        output.append(formatted_message)
    return "\n".join(output)
def get_api_key(call='api_key'):
    if call == 'api_key':
        key = api_key
    elif call == 'oai_api_key':
        key = oai_api_key
    else:
        key = api_key
    if ',' in key:
        return random.choice(key.split(','))
    return key
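# Run the flattened conversation through the /moderations endpoint (falling
# back to api.openai.com on request errors). Returns the moderation result
# when anything is flagged, otherwise False.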
def moderate(messages):
    try:
        response = requests.post(
            f"{base_url}/moderations",
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {get_api_key(call='api_key')}"
            },
            json={"input": encodeChat(messages)}
        )
        response.raise_for_status()
        moderation_result = response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error during moderation request to {base_url}: {e}")
        try:
            response = requests.post(
                "https://api.openai.com/v1/moderations",
                headers={
                    "Content-Type": "application/json",
                    "Authorization": f"Bearer {get_api_key(call='oai_api_key')}"
                },
                json={"input": encodeChat(messages)}
            )
            response.raise_for_status()
            moderation_result = response.json()
        except requests.exceptions.RequestException as e:
            print(f"Error during moderation request to fallback URL: {e}")
            return False
    try:
        if any(result["flagged"] for result in moderation_result["results"]):
            return moderation_result
    except KeyError:
        if moderation_result["flagged"]:
            return moderation_result
    return False
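# POST a streaming chat completion and yield each parsed SSE "data:" chunk as
# a dict; on a client error, retry once against api.openai.com before giving up.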
async def streamChat(params):
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(f"{base_url}/chat/completions", headers={"Authorization": f"Bearer {get_api_key(call='api_key')}", "Content-Type": "application/json"}, json=params) as r:
                r.raise_for_status()
                async for line in r.content:
                    if line:
                        line_str = line.decode('utf-8')
                        if line_str.startswith("data: "):
                            line_str = line_str[6:].strip()
                            if line_str == "[DONE]":
                                continue
                            try:
                                message = json.loads(line_str)
                                yield message
                            except json.JSONDecodeError:
                                continue
        except aiohttp.ClientError:
            try:
                async with session.post("https://api.openai.com/v1/chat/completions", headers={"Authorization": f"Bearer {get_api_key(call='oai_api_key')}", "Content-Type": "application/json"}, json=params) as r:
                    r.raise_for_status()
                    async for line in r.content:
                        if line:
                            line_str = line.decode('utf-8')
                            if line_str.startswith("data: "):
                                line_str = line_str[6:].strip()
                                if line_str == "[DONE]":
                                    continue
                                try:
                                    message = json.loads(line_str)
                                    yield message
                                except json.JSONDecodeError:
                                    continue
            except aiohttp.ClientError:
                return
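# Random alphanumeric string, used as a per-request `user` identifier.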
def rnd(length=8):
    letters = string.ascii_letters + string.digits
    return ''.join(random.choice(letters) for _ in range(length))
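# Gradio chat handler: rebuild the message list from `history`, refuse flagged
# input via moderate(), then stream the reply and yield it incrementally.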
async def respond(
    message,
    history: list[tuple[str, str]],
    model_name,
    max_tokens,
    temperature,
    top_p,
):
    messages = []
    for val in history:
        if val[0]:
            messages.append({"role": "user", "content": val[0]})
        if val[1]:
            messages.append({"role": "assistant", "content": val[1]})
    messages.append({"role": "user", "content": message})
    if message:
        mode = moderate(messages)
        if mode:
            reasons = []
            # moderate() returns either the full response ({"results": [...]})
            # or a bare result object ({"flagged": ..., "categories": ...}).
            if isinstance(mode, dict) and 'results' in mode:
                categories = mode['results'][0].get('categories', {})
            else:
                categories = mode.get('categories', {})
            for category, flagged in categories.items():
                if flagged:
                    reasons.append(category)
            if reasons:
                yield "[MODERATION] I'm sorry, but I can't assist with that.\n\nReasons:\n```\n" + "\n".join([f"{i+1}. {reason}" for i, reason in enumerate(reasons)]) + "\n```"
            else:
                yield "[MODERATION] I'm sorry, but I can't assist with that."
            return
    response = ""
    async for token in streamChat({
        "model": model_name,
        "messages": messages,
        "max_tokens": max_tokens,
        "temperature": temperature,
        "top_p": top_p,
        "user": rnd(),
        "stream": True
    }):
        response += token['choices'][0]['delta'].get("content", token['choices'][0]['delta'].get("refusal", ""))
        yield response
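# Start-up: validate the configured key(s), build the model list, then verify the endpoint.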
handleApiKeys()
loadModels()
checkModels()
demo = gr.ChatInterface(
    respond,
    title="gpt-4o-mini-small",
    description="This is the smaller version of the quardo/gpt-4o-small space.<br/>It mainly exists for when the main space is down.",
    additional_inputs=[
        gr.Dropdown(choices=models, value="gpt-4o-mini", label="Model"),
        gr.Slider(minimum=1, maximum=4096, value=4096, step=1, label="Max new tokens"),
        gr.Slider(minimum=0.1, maximum=2.0, value=0.7, step=0.05, label="Temperature"),
        gr.Slider(minimum=0.05, maximum=1.0, value=0.95, step=0.05, label="Top-p (nucleus sampling)"),
    ],
    css="footer{display:none !important}",
    head="""<script>if(!confirm("By using our application, which integrates with OpenAI's API, you acknowledge and agree to the following terms regarding the data you provide:\\n\\n1. Data Collection: This application may log the following data through the Gradio endpoint or the API endpoint: message requests (including messages, responses, model settings, and images sent along with the messages), images that were generated (including only the prompt and the image), search tool calls (including query, search results, summaries, and output responses), and moderation checks (including input and output).\\n2. Data Retention and Removal: Data is retained until further notice or until a specific request for removal is made.\\n3. Data Usage: The collected data may be used for various purposes, including but not limited to, administrative review of logs, AI training, and publication as a dataset.\\n4. Privacy: Please avoid sharing any personal information.\\n\\nBy continuing to use our application, you explicitly consent to the collection, use, and potential sharing of your data as described above. If you disagree with our data collection, usage, and sharing practices, we advise you not to use our application."))location.href="/declined";</script>"""
)
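# Host app: a FastAPI instance serving the /declined landing page, with the
# Gradio UI mounted at the root path.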
app = FastAPI()

@app.get("/declined")
def test():
    return HTMLResponse(content="""
    <html>
        <head>
            <title>Declined</title>
        </head>
        <body>
            <p>Ok, you can go back to Hugging Face. I just didn't have any idea how to handle the decline, so you are redirected here.</p><br/>
            <a href="/">Go back</a>
        </body>
    </html>
    """)

app = gr.mount_gradio_app(app, demo, path="/")
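# Minimal CLI: bind address (-s), port (-p), and a --dev flag for auto-reload.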
class ArgParser(argparse.ArgumentParser):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.add_argument("-s", "--server", type=str, default="0.0.0.0")
        self.add_argument("-p", "--port", type=int, default=7860)
        self.add_argument("-d", "--dev", default=False, action="store_true")
        self.args = self.parse_args(sys.argv[1:])
if __name__ == "__main__": | |
args = ArgParser().args | |
if args.dev: | |
uvicorn.run("__main__:app", host=args.server, port=args.port, reload=True) | |
else: | |
uvicorn.run("__main__:app", host=args.server, port=args.port, reload=False) | |