from fastapi import FastAPI, Request from typing import List import gradio as gr import requests import argparse import aiohttp import uvicorn import random import string import json import math import sys import os API_BASE = "env" api_key = os.environ['OPENAI_API_KEY'] base_url = os.environ.get('OPENAI_BASE_URL', "https://api.openai.com/v1") def_models = '["gpt-4", "gpt-4-0125-preview", "gpt-4-0613", "gpt-4-1106-preview", "gpt-4-turbo", "gpt-4-turbo-2024-04-09", "gpt-4-turbo-preview", "chatgpt-4o-latest", "gpt-4o", "gpt-4o-2024-05-13", "gpt-4o-2024-08-06", "gpt-4o-mini", "gpt-4o-mini-2024-07-18"]' def checkModels(): global base_url if API_BASE == "env": try: response = requests.get(f"{base_url}/models", headers={"Authorization": f"Bearer {get_api_key()}"}) response.raise_for_status() if not ('data' in response.json()): base_url = "https://api.openai.com/v1" api_key = oai_api_key except Exception as e: print(f"Error testing API endpoint: {e}") else: base_url = "https://api.openai.com/v1" api_key = oai_api_key def loadModels(): global models, modelList models = json.loads(def_models) models = sorted(models) modelList = { "object": "list", "data": [{"id": v, "object": "model", "created": 0, "owned_by": "system"} for v in models] } def handleApiKeys(): global api_key if ',' in api_key: output = [] for key in api_key.split(','): try: response = requests.get(f"{base_url}/models", headers={"Authorization": f"Bearer {key}"}) response.raise_for_status() if ('data' in response.json()): output.append(key) except Exception as e: print((F"API key {key} is not valid or an actuall error happend {e}")) if len(output)==1: raise RuntimeError("No API key is working") api_key = ",".join(output) else: try: response = requests.get(f"{base_url}/models", headers={"Authorization": f"Bearer {api_key}"}) response.raise_for_status() if not ('data' in response.json()): raise RuntimeError("Current API key is not valid") except Exception as e: raise RuntimeError(f"Current API key is not valid or an actual error happened: {e}") def encodeChat(messages): output = [] for message in messages: role = message['role'] name = f" [{message['name']}]" if 'name' in message else '' content = message['content'] formatted_message = f"<|im_start|>{role}{name}\n{content}<|end_of_text|>" output.append(formatted_message) return "\n".join(output) def get_api_key(call='api_key'): if call == 'api_key': key = api_key elif call == 'oai_api_key': key = oai_api_key else: key = api_key if ',' in key: return random.choice(key.split(',')) return key def moderate(messages): try: response = requests.post( f"{base_url}/moderations", headers={ "Content-Type": "application/json", "Authorization": f"Bearer {get_api_key(call='api_key')}" }, json={"input": encodeChat(messages)} ) response.raise_for_status() moderation_result = response.json() except requests.exceptions.RequestException as e: print(f"Error during moderation request to {base_url}: {e}") try: response = requests.post( "https://api.openai.com/v1/moderations", headers={ "Content-Type": "application/json", "Authorization": f"Bearer {get_api_key(call='oai_api_key')}" }, json={"input": encodeChat(messages)} ) response.raise_for_status() moderation_result = response.json() except requests.exceptions.RequestException as e: print(f"Error during moderation request to fallback URL: {e}") return False try: if any(result["flagged"] for result in moderation_result["results"]): return moderation_result except KeyError: if moderation_result["flagged"]: return moderation_result return False async def streamChat(params): async with aiohttp.ClientSession() as session: try: async with session.post(f"{base_url}/chat/completions", headers={"Authorization": f"Bearer {get_api_key(call='api_key')}", "Content-Type": "application/json"}, json=params) as r: r.raise_for_status() async for line in r.content: if line: line_str = line.decode('utf-8') if line_str.startswith("data: "): line_str = line_str[6:].strip() if line_str == "[DONE]": continue try: message = json.loads(line_str) yield message except json.JSONDecodeError: continue except aiohttp.ClientError: try: async with session.post("https://api.openai.com/v1/chat/completions", headers={"Authorization": f"Bearer {get_api_key(call='oai_api_key')}", "Content-Type": "application/json"}, json=params) as r: r.raise_for_status() async for line in r.content: if line: line_str = line.decode('utf-8') if line_str.startswith("data: "): line_str = line_str[6:].strip() if line_str == "[DONE]": continue try: message = json.loads(line_str) yield message except json.JSONDecodeError: continue except aiohttp.ClientError: return def rnd(length=8): letters = string.ascii_letters + string.digits return ''.join(random.choice(letters) for i in range(length)) async def respond( message, history: list[tuple[str, str]], model_name, max_tokens, temperature, top_p, ): messages = []; for val in history: if val[0]: messages.append({"role": "user", "content": val[0]}) if val[1]: messages.append({"role": "assistant", "content": val[1]}) messages.append({"role": "user", "content": message}) if message: mode = moderate(messages) if mode: reasons = [] categories = mode[0].get('categories', {}) if isinstance(mode, list) else mode.get('categories', {}) for category, flagged in categories.items(): if flagged: reasons.append(category) if reasons: yield "[MODERATION] I'm sorry, but I can't assist with that.\n\nReasons:\n```\n" + "\n".join([f"{i+1}. {reason}" for i, reason in enumerate(reasons)]) + "\n```" else: yield "[MODERATION] I'm sorry, but I can't assist with that." return async def handleResponse(completion, prefix="", image_count=0, didSearchedAlready=False): response = "" isRequeryNeeded = False async for token in completion: response += token['choices'][0]['delta'].get("content", token['choices'][0]['delta'].get("refusal", "")) yield f"{prefix}{response}" mode = moderate([handleMultimodalData(model_name, "user", message),{"role": "assistant", "content": response}]) if mode: reasons = [] categories = mode[0].get('categories', {}) if isinstance(mode, list) else mode.get('categories', {}) for category, flagged in categories.items(): if flagged: reasons.append(category) if reasons: yield "[MODERATION] I'm sorry, but I can't assist with that.\n\nReasons:\n```\n" + "\n".join([f"{i+1}. {reason}" for i, reason in enumerate(reasons)]) + "\n```" else: yield "[MODERATION] I'm sorry, but I can't assist with that." return for line in response.split('\n'): try: data = json.loads(line) if isinstance(data, dict) and data.get("tool") == "imagine" and data.get("isCall") and "prompt" in data: if image_count < 4: image_count += 1 def fetch_image_url(prompt, line): image_url = imagine(prompt) return line, f'{prompt}' def replace_line_in_response(line, replacement): nonlocal response response = response.replace(line, replacement) thread = threading.Thread(target=lambda: replace_line_in_response(*fetch_image_url(data["prompt"], line))) thread.start() thread.join() else: response = response.replace(line, f'[System: 4 image per message limit; prompt asked: `{data["prompt"]}]`') yield f"{prefix}{response}" elif isinstance(data, dict) and data.get("tool") == "calc" and data.get("isCall") and "prompt" in data: isRequeryNeeded = True try: result = safe_eval(data["prompt"]) response = response.replace(line, f'[System: `{data["prompt"]}` === `{result}`]') except Exception as e: response = response.replace(line, f'[System: Error in calculation; `{e}`]') yield f"{prefix}{response}" elif isinstance(data, dict) and data.get("tool") == "search" and data.get("isCall") and "prompt" in data: isRequeryNeeded = True if didSearchedAlready: response = response.replace(line, f'[System: One search per response is allowed; due to how long and resource it takes; query: `{data["prompt"]}]`]') else: try: result = searchEngine(data["prompt"]) result_escaped = result.replace('`', '\\`') response = response.replace(line, f'[System: `{data["prompt"]}` ===\n```\n{result_escaped}\n```\n]') didSearchedAlready = True except Exception as e: response = response.replace(line, f'[System: Error in search function; `{e}`]') yield f"{prefix}{response}" yield f"{prefix}{response}" except (json.JSONDecodeError, AttributeError, Exception): continue if isRequeryNeeded: messages.append({"role": "assistant", "content": response}) async for res in handleResponse(streamChat({ "model": model_name, "messages": messages, "max_tokens": max_tokens, "temperature": temperature, "top_p": top_p, "user": rnd(), "stream": True }), f"{prefix}{response}\n\n", image_count, didSearchedAlready): yield res async for res in handleResponse(streamChat({ "model": model_name, "messages": messages, "max_tokens": max_tokens, "temperature": temperature, "top_p": top_p, "user": rnd(), "stream": True })): yield res handleApiKeys();loadModels();checkModels(); demo = gr.ChatInterface( respond, title="gpt-4o-mini-small", description=f"This is the smaller version of quardo/gpt-4o-small space.
Mainly exists when the main space is down.", additional_inputs=[ gr.Dropdown(choices=models, value="gpt-4o-mini", label="Model"), gr.Slider(minimum=1, maximum=4096, value=4096, step=1, label="Max new tokens"), gr.Slider(minimum=0.1, maximum=2.0, value=0.7, step=0.05, label="Temperature"), gr.Slider(minimum=0.05, maximum=1.0, value=0.95, step=0.05, label="Top-p (nucleus sampling)"), ], css="footer{display:none !important}", head="""""" ) app = FastAPI() @app.get("/declined") def test(): return HTMLResponse(content=""" Declined

Ok, you can go back to Hugging Face. I just didn't have any idea how to handle decline so you are redirected here.


Go back """) app = gr.mount_gradio_app(app, demo, path="/") class ArgParser(argparse.ArgumentParser): def __init__(self, *args, **kwargs): super(ArgParser, self).__init__(*args, **kwargs) self.add_argument("-s", "--server", type=str, default="0.0.0.0") self.add_argument("-p", "--port", type=int, default=7860) self.add_argument("-d", "--dev", default=False, action="store_true") self.args = self.parse_args(sys.argv[1:]) if __name__ == "__main__": args = ArgParser().args if args.dev: uvicorn.run("__main__:app", host=args.server, port=args.port, reload=True) else: uvicorn.run("__main__:app", host=args.server, port=args.port, reload=False)