import os import time import json import random import requests import g4f from fastapi import FastAPI,Response, status from typing import Dict, NewType, Union, Optional, List, get_type_hints from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse,JSONResponse from fastapi.openapi.utils import get_openapi from starlette.exceptions import HTTPException as StarletteHTTPException from pydantic import BaseModel #app = FastAPI(docs_url=None, redoc_url=None) app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) class chat_completions_Item(BaseModel): stream : bool = False model : str = 'gpt-3.5-turbo' messages : list = [{'role': 'user', 'content':"Say 'Hello World'."}] provider: Union[str, None] = None temperature : float = 0.8 presence_penalty: float = 0 frequency_penalty: float = 0 top_p: float = 1 class completions_Item(BaseModel): stream : bool = False model : str = 'gpt-3.5-turbo' prompt : str = "Say 'Hello World'." provider : Union[str, None] = None temperature : float = 0.8 presence_penalty: float = 0 frequency_penalty: float = 0 top_p: float = 1 def auto_select(model:str='gpt-3.5-turbo',stream:bool=False): r = requests.get('https://gpt.lemonsoftware.eu.org/v1/status') data = r.json()['data'] model_providers = set() random.shuffle(data) for provider_info in data: for model_info in provider_info['model']: if model in model_info: model_providers.add(provider_info['provider']) if model_info[model]['status'] == 'Active': if stream == True and getattr(g4f.Provider,provider_info['provider']).supports_stream == False: continue return [getattr(g4f.Provider,provider_info['provider']),provider_info['provider']] else: continue break if not model_providers: return None active_providers = set() for provider_info in data: for model_info in provider_info['model']: for model in model_info.values(): if model['status'] == 'Active': if stream == True and getattr(g4f.Provider,provider_info['provider']).supports_stream == False: continue active_providers.add(provider_info['provider']) chooseable_providers = model_providers & active_providers if not chooseable_providers: return None chooseable_provider = random.choice(list(chooseable_providers)) return [getattr(g4f.Provider,chooseable_provider),chooseable_provider] @app.post("/v1/chat/completions") def chat_completions(item: chat_completions_Item,responses: Response): stream = item.stream model = item.model.lower() messages = item.messages provider_name = item.provider temperature = item.temperature presence_penalty = item.presence_penalty frequency_penalty = item.frequency_penalty top_p = item.top_p if provider_name: try: response = g4f.ChatCompletion.create(model=model, provider=getattr(g4f.Provider,provider_name),stream=stream,messages=messages,temperature=temperature,presence_penalty=presence_penalty,frequency_penalty=frequency_penalty,top_p=top_p) except: return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"error": {"message": "There was an error","type": "invalid_request_error","param": None,"code": 500}}) else: provider = auto_select(model=model,stream=stream) if provider == None: return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"error": {"message": "The model is invalid or not working.","type": "invalid_request_error","param": None,"code": 500}}) if stream and provider[0].supports_stream == False: return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"error": {"message": "Stream is not supported.","type": "invalid_request_error","param": None,"code": 500}}) provider_name = provider[1] try: response = g4f.ChatCompletion.create(model=model, provider=provider[0],stream=stream,messages=messages,temperature=temperature,presence_penalty=presence_penalty,frequency_penalty=frequency_penalty,top_p=top_p) except: provider = auto_select(model=model,stream=stream) if provider == None: return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"error": {"message": "The model is invalid or not working.","type": "invalid_request_error","param": None,"code": 500}}) if stream and provider[0].supports_stream == False: return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"error": {"message": "Stream is not supported.","type": "invalid_request_error","param": None,"code": 500}}) provider_name = provider[1] try: response = g4f.ChatCompletion.create(model=model, provider=provider[0],stream=stream,messages=messages,temperature=temperature,presence_penalty=presence_penalty,frequency_penalty=frequency_penalty,top_p=top_p) except: return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"error": {"message": "There was an error.Please try again.","type": "server_error","param": None,"code": 500}}) if not stream: completion_timestamp = int(time.time()) completion_id = ''.join(random.choices( 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789', k=28)) return { 'id': 'chatcmpl-%s' % completion_id, 'object': 'chat.completion', 'created': completion_timestamp, 'model': model, 'provider':provider_name, 'usage': { 'prompt_tokens': len(messages), 'completion_tokens': len(response), 'total_tokens': len(messages)+len(response) }, 'choices': [{ 'message': { 'role': 'assistant', 'content': response }, 'finish_reason': 'stop', 'index': 0 }] } def stream(): nonlocal response for token in response: completion_timestamp = int(time.time()) completion_id = ''.join(random.choices( 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789', k=28)) completion_data = { 'id': f'chatcmpl-{completion_id}', 'object': 'chat.completion.chunk', 'created': completion_timestamp, 'model': model, 'provider':provider_name, 'choices': [ { 'delta': { 'content': token }, 'index': 0, 'finish_reason': None } ] } yield 'data: %s\n\n' % json.dumps(completion_data, separators=(',' ':')) time.sleep(0.1) return StreamingResponse(stream(), media_type='text/event-stream') @app.post("/v1/completions") def completions(item: completions_Item,responses: Response): stream = item.stream model = item.model.lower() messages = [{'role': 'user', 'content':item.prompt}] provider_name = item.provider temperature = item.temperature presence_penalty = item.presence_penalty frequency_penalty = item.frequency_penalty top_p = item.top_p if provider_name: try: response = g4f.ChatCompletion.create(model=model, provider=getattr(g4f.Provider,provider_name),stream=stream,messages=messages,temperature=temperature,presence_penalty=presence_penalty,frequency_penalty=frequency_penalty,top_p=top_p) except: return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"error": {"message": "There was an error.","type": "invalid_request_error","param": None,"code": 500}}) else: provider = auto_select(model=model,stream=stream) if provider == None: return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"error": {"message": "The model is invalid or not working.","type": "invalid_request_error","param": None,"code": 500}}) if stream and provider[0].supports_stream == False: return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"error": {"message": "Stream is not supported.","type": "invalid_request_error","param": None,"code": 500}}) provider_name = provider[1] try: response = g4f.ChatCompletion.create(model=model, provider=provider[0],stream=stream,messages=messages,temperature=temperature,presence_penalty=presence_penalty,frequency_penalty=frequency_penalty,top_p=top_p) except: provider = auto_select(model=model,stream=stream) if provider == None: return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"error": {"message": "The model is invalid or not working.","type": "invalid_request_error","param": None,"code": 500}}) if stream and provider[0].supports_stream == False: return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"error": {"message": "Stream is not supported.","type": "invalid_request_error","param": None,"code": 500}}) provider_name = provider[1] try: response = g4f.ChatCompletion.create(model=model, provider=provider[0],stream=stream,messages=messages,temperature=temperature,presence_penalty=presence_penalty,frequency_penalty=frequency_penalty,top_p=top_p) except: return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"error": {"message": "There was an error.Please try again.","type": "server_error","param": None,"code": 500}}) if not stream: completion_timestamp = int(time.time()) completion_id = ''.join(random.choices( 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789', k=28)) return { 'id': 'cmpl-%s' % completion_id, 'object': 'text.completion', 'created': completion_timestamp, 'model': model, 'provider':provider_name, 'usage': { 'prompt_tokens': len(messages), 'completion_tokens': len(response), 'total_tokens': len(messages)+len(response) }, 'choices': [{ 'text': response, 'finish_reason': 'length', "logprobs": None, 'index': 0 }] } def stream(): nonlocal response for token in response: completion_timestamp = int(time.time()) completion_id = ''.join(random.choices( 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789', k=28)) completion_data = { 'id': f'cmpl-{completion_id}', 'object': 'text.completion.chunk', 'created': completion_timestamp, 'model': model, 'provider':provider_name, 'choices': [ { 'delta': { 'text': token }, 'index': 0, 'finish_reason': None } ] } yield 'data: %s\n\n' % json.dumps(completion_data, separators=(',' ':')) time.sleep(0.1) return StreamingResponse(stream(), media_type='text/event-stream') @app.get("/v1/dashboard/billing/subscription") @app.get("/dashboard/billing/subscription") async def billing_subscription(): return { "object": "billing_subscription", "has_payment_method": True, "canceled": False, "canceled_at": None, "delinquent": None, "access_until": 2556028800, "soft_limit": 6944500, "hard_limit": 166666666, "system_hard_limit": 166666666, "soft_limit_usd": 416.67, "hard_limit_usd": 9999.99996, "system_hard_limit_usd": 9999.99996, "plan": { "title": "Pay-as-you-go", "id": "payg" }, "primary": True, "account_name": "OpenAI", "po_number": None, "billing_email": None, "tax_ids": None, "billing_address": { "city": "New York", "line1": "OpenAI", "country": "US", "postal_code": "NY10031" }, "business_address": None } @app.get("/v1/dashboard/billing/usage") @app.get("/dashboard/billing/usage") async def billing_usage(start_date:str='2023-01-01',end_date:str='2023-01-31'): return { "object": "list", "daily_costs": [ { "timestamp": time.time(), "line_items": [ { "name": "GPT-4", "cost": 0.0 }, { "name": "Chat models", "cost": 1.01 }, { "name": "InstructGPT", "cost": 0.0 }, { "name": "Fine-tuning models", "cost": 0.0 }, { "name": "Embedding models", "cost": 0.0 }, { "name": "Image models", "cost": 16.0 }, { "name": "Audio models", "cost": 0.0 } ] } ], "total_usage": 1.01 } @app.get("/v1/models") def models(): import g4f.models model = {"data":[]} for i in g4f.models.ModelUtils.convert: model['data'].append({ "id": i, "object": "model", "owned_by": g4f.models.ModelUtils.convert[i].base_provider, "tokens": 99999, "fallbacks": None, "endpoints": [ "/v1/chat/completions" ], "limits": None, "permission": [] }) return model @app.get("/v1/providers") async def providers(): files = os.listdir("g4f/Provider/Providers") files = [f for f in files if os.path.isfile(os.path.join("g4f/Provider/Providers", f))] files.sort(key=str.lower) providers_data = {"data":[]} for file in files: if file.endswith(".py"): name = file[:-3] try: p = getattr(g4f.Provider,name) providers_data["data"].append({ "provider": str(name), "model": list(p.model), "url": str(p.url), "working": bool(p.working), "supports_stream": bool(p.supports_stream) }) except: pass return providers_data def custom_openapi(): if app.openapi_schema: return app.openapi_schema openapi_schema = get_openapi( title="GPT API", version="1.0.0", summary="GPT API", description="[Try Online](https://chatgpt-next-web.lemonsoftware.eu.org/)", routes=app.routes, ) openapi_schema["info"]["x-logo"] = { "url": "https://gpt-status.lemonsoftware.eu.org/icon.svg" } app.openapi_schema = openapi_schema return app.openapi_schema app.openapi = custom_openapi @app.exception_handler(StarletteHTTPException) async def custom_http_exception_handler(request, exc): return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content={"error": {"message": "Invalid URL","type": "invalid_request_error","param": None,"code": 404}}) if __name__ == '__main__': import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)