Quardo's picture
Update app.py
562ad37 verified
raw
history blame
11.2 kB
from starlette.responses import HTMLResponse
from fastapi import FastAPI, Request
from typing import List
import gradio as gr
import requests
import argparse
import aiohttp
import uvicorn
import random
import string
import json
import math
import sys
import os
# Endpoint selection mode read by checkModels(): "env" probes the configured
# base_url, any other value forces the official OpenAI endpoint.
API_BASE = "env"
# Key(s) for the primary endpoint. A comma-separated value is treated as a
# pool of keys (see handleApiKeys / get_api_key). Required: a missing
# variable raises KeyError at import time.
api_key = os.environ['API_KEY']
# Key(s) used for the hard-coded api.openai.com fallback requests. Required.
oai_api_key = os.environ['OPENAI_API_KEY']
# Primary API root; defaults to the official OpenAI endpoint when unset.
base_url = os.environ.get('OPENAI_BASE_URL', "https://api.openai.com/v1")
# JSON-encoded list of model ids offered in the UI; parsed by loadModels().
def_models = '["gpt-4", "gpt-4-0125-preview", "gpt-4-0613", "gpt-4-1106-preview", "gpt-4-turbo", "gpt-4-turbo-2024-04-09", "gpt-4-turbo-preview", "chatgpt-4o-latest", "gpt-4o", "gpt-4o-2024-05-13", "gpt-4o-2024-08-06", "gpt-4o-mini", "gpt-4o-mini-2024-07-18"]'
def checkModels():
    """Settle on the API endpoint, and its matching key, used by the app.

    When ``API_BASE`` is ``"env"`` the configured ``base_url`` is probed with
    ``GET {base_url}/models``. If the reply has no ``data`` field, the
    official OpenAI endpoint and ``oai_api_key`` replace the configured
    values. Any exception raised by the probe is printed and the current
    settings are left untouched. For every other ``API_BASE`` value the
    official endpoint and ``oai_api_key`` are forced.
    """
    # api_key has to be declared global as well: without it the assignments
    # below only bind a throwaway local, so the key never switched together
    # with the endpoint.
    global base_url, api_key
    if API_BASE != "env":
        base_url = "https://api.openai.com/v1"
        api_key = oai_api_key
        return
    try:
        response = requests.get(
            f"{base_url}/models",
            headers={"Authorization": f"Bearer {get_api_key()}"},
        )
        response.raise_for_status()
        if 'data' not in response.json():
            base_url = "https://api.openai.com/v1"
            api_key = oai_api_key
    except Exception as e:
        # Best effort: a failed probe keeps whatever was configured.
        print(f"Error testing API endpoint: {e}")
def loadModels():
    """Populate the global model catalogue from ``def_models``.

    Sets ``models`` to the alphabetically sorted list of model ids decoded
    from the ``def_models`` JSON string, and ``modelList`` to a
    ``{"object": "list", "data": [...]}`` mapping with one entry per id.
    """
    global models, modelList
    models = sorted(json.loads(def_models))
    entries = []
    for model_id in models:
        entries.append({
            "id": model_id,
            "object": "model",
            "created": 0,
            "owned_by": "system",
        })
    modelList = {"object": "list", "data": entries}
def handleApiKeys():
    """Validate the configured API key(s) against ``{base_url}/models``.

    ``api_key`` may hold a single key or several keys separated by commas.
    With several keys each one is probed, and only those whose response
    contains a ``data`` field are kept; the global ``api_key`` is rewritten
    to the comma-joined survivors. A single key must pass the same probe.

    Raises:
        RuntimeError: if none of the comma-separated keys works, or if the
            single configured key is rejected or its probe fails.
    """
    global api_key
    if ',' not in api_key:
        try:
            response = requests.get(
                f"{base_url}/models",
                headers={"Authorization": f"Bearer {api_key}"},
            )
            response.raise_for_status()
            accepted = 'data' in response.json()
        except Exception as e:
            raise RuntimeError(
                f"Current API key is not valid or an actual error happened: {e}"
            ) from e
        if not accepted:
            raise RuntimeError("Current API key is not valid")
        return

    working = []
    for key in api_key.split(','):
        try:
            response = requests.get(
                f"{base_url}/models",
                headers={"Authorization": f"Bearer {key}"},
            )
            response.raise_for_status()
            if 'data' in response.json():
                working.append(key)
        except Exception as e:
            # Only the tail of the key is logged: a probe can fail for
            # network reasons too, so the key may well be a live secret.
            masked = f"...{key[-4:]}"
            print(f"API key {masked} is not valid or an actuall error happend {e}")
    # The original compared against 1, which rejected a setup with exactly
    # one working key and let a setup with none through as an empty string.
    if not working:
        raise RuntimeError("No API key is working")
    api_key = ",".join(working)
def encodeChat(messages):
    """Flatten chat messages into a single tagged transcript string.

    Each message becomes ``<|im_start|>{role}{ [name]}\\n{content}<|end_of_text|>``
    (the bracketed name is present only when the message has a ``name`` key),
    and the rendered messages are joined with newlines.
    """
    def render(entry):
        speaker = entry['role']
        label = f" [{entry['name']}]" if 'name' in entry else ''
        body = entry['content']
        return f"<|im_start|>{speaker}{label}\n{body}<|end_of_text|>"

    return "\n".join(map(render, messages))
def get_api_key(call='api_key'):
    """Return one API key for the requested pool.

    ``call='oai_api_key'`` selects ``oai_api_key``; every other value selects
    ``api_key``. When the selected value is a comma-separated list, one entry
    is picked at random; otherwise the value is returned unchanged.
    """
    selected = oai_api_key if call == 'oai_api_key' else api_key
    if ',' not in selected:
        return selected
    return random.choice(selected.split(','))
def moderate(messages):
    """Run the conversation through the moderation endpoint.

    The encoded transcript is posted to ``{base_url}/moderations`` first; if
    that request fails it is retried against the official OpenAI moderation
    URL using the ``oai_api_key`` pool.

    Returns:
        The decoded moderation response when it reports flagged content,
        otherwise ``False``. ``False`` is also returned when both requests
        fail, so a moderation outage does not block the chat.
    """
    payload = {"input": encodeChat(messages)}
    attempts = (
        (
            f"{base_url}/moderations",
            'api_key',
            f"Error during moderation request to {base_url}: ",
        ),
        (
            "https://api.openai.com/v1/moderations",
            'oai_api_key',
            "Error during moderation request to fallback URL: ",
        ),
    )
    for url, key_pool, error_prefix in attempts:
        try:
            response = requests.post(
                url,
                headers={
                    "Content-Type": "application/json",
                    "Authorization": f"Bearer {get_api_key(call=key_pool)}"
                },
                json=payload,
            )
            response.raise_for_status()
            moderation_result = response.json()
            break
        except requests.exceptions.RequestException as e:
            print(f"{error_prefix}{e}")
    else:
        # Neither endpoint answered.
        return False

    try:
        if any(result["flagged"] for result in moderation_result["results"]):
            return moderation_result
    except KeyError:
        # Responses without a "results" list may carry a top-level "flagged"
        # field. Use .get so a payload with neither shape counts as not
        # flagged instead of raising a second, unhandled KeyError.
        if moderation_result.get("flagged"):
            return moderation_result
    return False
async def streamChat(params):
    """Stream a chat completion and yield each decoded JSON chunk.

    ``params`` is posted to ``{base_url}/chat/completions`` with a key from
    the ``api_key`` pool. If that attempt raises ``aiohttp.ClientError`` the
    same request is sent to the official OpenAI endpoint with a key from the
    ``oai_api_key`` pool; a ``ClientError`` there ends the stream silently.

    Response lines have a leading ``"data: "`` stripped, ``[DONE]`` markers
    are skipped, and lines that are not valid JSON are ignored.
    """
    targets = (
        (f"{base_url}/chat/completions", 'api_key'),
        ("https://api.openai.com/v1/chat/completions", 'oai_api_key'),
    )
    async with aiohttp.ClientSession() as session:
        for endpoint, key_pool in targets:
            try:
                request_headers = {
                    "Authorization": f"Bearer {get_api_key(call=key_pool)}",
                    "Content-Type": "application/json",
                }
                async with session.post(endpoint, headers=request_headers, json=params) as r:
                    r.raise_for_status()
                    async for raw in r.content:
                        if not raw:
                            continue
                        text = raw.decode('utf-8')
                        if text.startswith("data: "):
                            text = text[6:].strip()
                        if text == "[DONE]":
                            continue
                        try:
                            chunk = json.loads(text)
                            yield chunk
                        except json.JSONDecodeError:
                            continue
                # Stream finished without a client error: no fallback needed.
                return
            except aiohttp.ClientError:
                # Try the next endpoint; after the last one the loop just ends.
                continue
def rnd(length=8):
    """Return a random string of ``length`` ASCII letters and digits."""
    alphabet = string.ascii_letters + string.digits
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return ''.join(picked)
def _flagged_categories(moderation):
    """Collect the names of all flagged categories in a moderation payload."""
    if isinstance(moderation, list):
        entries = moderation
    elif isinstance(moderation, dict) and isinstance(moderation.get("results"), list):
        # Full moderation response: the categories live on each result entry.
        entries = moderation["results"]
    else:
        entries = [moderation]
    reasons = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        for category, flagged in (entry.get("categories") or {}).items():
            if flagged and category not in reasons:
                reasons.append(category)
    return reasons


async def respond(
    message,
    history: list[tuple[str, str]],
    model_name,
    max_tokens,
    temperature,
    top_p,
):
    """Gradio chat handler: moderate the conversation, then stream the reply.

    Args:
        message: The new user message.
        history: Previous ``(user, assistant)`` turns; empty sides are skipped.
        model_name: Model id forwarded to the completion request.
        max_tokens: Forwarded as ``max_tokens``.
        temperature: Forwarded as ``temperature``.
        top_p: Forwarded as ``top_p``.

    Yields:
        A single ``[MODERATION]`` refusal (listing the flagged categories when
        any are reported) if moderation flags the conversation; otherwise the
        reply accumulated so far, once per streamed chunk.
    """
    messages = []
    for val in history:
        if val[0]:
            messages.append({"role": "user", "content": val[0]})
        if val[1]:
            messages.append({"role": "assistant", "content": val[1]})
    messages.append({"role": "user", "content": message})

    if message:
        mode = moderate(messages)
        if mode:
            # The original read "categories" from the top level of the
            # response, where it does not exist when the response nests them
            # under "results", so the reasons list always came out empty.
            reasons = _flagged_categories(mode)
            if reasons:
                yield "[MODERATION] I'm sorry, but I can't assist with that.\n\nReasons:\n```\n" + "\n".join([f"{i+1}. {reason}" for i, reason in enumerate(reasons)]) + "\n```"
            else:
                yield "[MODERATION] I'm sorry, but I can't assist with that."
            return

    response = ""
    async for token in streamChat({
        "model": model_name,
        "messages": messages,
        "max_tokens": max_tokens,
        "temperature": temperature,
        "top_p": top_p,
        "user": rnd(),
        "stream": True
    }):
        choices = token.get('choices') if isinstance(token, dict) else None
        if not choices:
            # Chunks without choices carry no text to append.
            continue
        delta = choices[0].get('delta') or {}
        # "content" can be present but null (e.g. on refusal chunks), so a
        # dict default is not enough; fall through on any empty value.
        response += delta.get("content") or delta.get("refusal") or ""
        yield response
# Startup sequence: validate the configured key(s), build the model catalogue,
# then settle on the endpoint to use.
handleApiKeys()
loadModels()
checkModels()

# Chat UI. The extra inputs are passed to respond() after (message, history),
# in this order: model, max tokens, temperature, top-p.
demo = gr.ChatInterface(
    respond,
    title="gpt-4o-mini-small",
    description="This is the smaller version of quardo/gpt-4o-small space.<br/>Mainly exists when the main space is down.",
    additional_inputs=[
        gr.Dropdown(choices=models, value="gpt-4o-mini", label="Model"),
        gr.Slider(minimum=1, maximum=4096, value=4096, step=1, label="Max new tokens"),
        gr.Slider(minimum=0.1, maximum=2.0, value=0.7, step=0.05, label="Temperature"),
        gr.Slider(minimum=0.05, maximum=1.0, value=0.95, step=0.05, label="Top-p (nucleus sampling)"),
    ],
    css="footer{display:none !important}",
    # Consent prompt injected into the page head; declining sends the visitor
    # to the /declined route.
    head="""<script>if(!confirm("By using our application, which integrates with OpenAI's API, you acknowledge and agree to the following terms regarding the data you provide:\\n\\n1. Data Collection: This application may log the following data through the Gradio endpoint or the API endpoint: message requests (including messages, responses, model settings, and images sent along with the messages), images that were generated (including only the prompt and the image), search tool calls (including query, search results, summaries, and output responses), and moderation checks (including input and output).\\n2. Data Retention and Removal: Data is retained until further notice or until a specific request for removal is made.\\n3. Data Usage: The collected data may be used for various purposes, including but not limited to, administrative review of logs, AI training, and publication as a dataset.\\n4. Privacy: Please avoid sharing any personal information.\\n\\nBy continuing to use our application, you explicitly consent to the collection, use, and potential sharing of your data as described above. If you disagree with our data collection, usage, and sharing practices, we advise you not to use our application."))location.href="/declined";</script>"""
)

# ASGI application that hosts the extra routes alongside the Gradio UI.
app = FastAPI()
@app.get("/declined")
def test():
    """Serve the static page for visitors who declined the consent prompt.

    Returns:
        An ``HTMLResponse`` with a short notice and a link back to ``/``.
    """
    # The anchor was previously closed with </button>, which is invalid HTML.
    return HTMLResponse(content="""
<html>
<head>
<title>Declined</title>
</head>
<body>
<p>Ok, you can go back to Hugging Face. I just didn't have any idea how to handle decline so you are redirected here.</p><br/>
<a href="/">Go back</a>
</body>
</html>
""")
# Attach the Gradio chat UI to the FastAPI app at the root path and rebind
# `app` to the result, which is the object handed to uvicorn below.
app = gr.mount_gradio_app(app, demo, path="/")
class ArgParser(argparse.ArgumentParser):
    """Command-line parser that parses ``sys.argv`` as soon as it is built.

    Options:
        -s / --server: bind address (default ``"0.0.0.0"``).
        -p / --port: TCP port (default ``7860``).
        -d / --dev: development flag (default ``False``).

    The parsed namespace is stored on the instance as ``args``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        option_table = (
            (("-s", "--server"), {"type": str, "default": "0.0.0.0"}),
            (("-p", "--port"), {"type": int, "default": 7860}),
            (("-d", "--dev"), {"default": False, "action": "store_true"}),
        )
        for flags, settings in option_table:
            self.add_argument(*flags, **settings)
        self.args = self.parse_args(sys.argv[1:])
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn. Auto-reload is on
    # exactly when the --dev flag was given.
    args = ArgParser().args
    uvicorn.run("__main__:app", host=args.server, port=args.port, reload=args.dev)