"""OpenAI-compatible REST API served by FastAPI on top of g4f providers."""

import os
import time
import json
import random

import requests
import g4f
from typing import Union
from fastapi import FastAPI, Response, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.openapi.utils import get_openapi
from starlette.exceptions import HTTPException as StarletteHTTPException
from pydantic import BaseModel

app = FastAPI()
|
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
class chat_completions_Item(BaseModel):
    stream: bool = False
    model: str = 'gpt-3.5-turbo'
    messages: list = [{'role': 'user', 'content': "Say 'Hello World'."}]
    provider: Union[str, None] = None
    temperature: float = 0.8
    presence_penalty: float = 0
    frequency_penalty: float = 0
    top_p: float = 1
|
|
|
class completions_Item(BaseModel):
    stream: bool = False
    model: str = 'gpt-3.5-turbo'
    prompt: str = "Say 'Hello World'."
    provider: Union[str, None] = None
    temperature: float = 0.8
    presence_penalty: float = 0
    frequency_penalty: float = 0
    top_p: float = 1
|
|
|
def auto_select(model: str = 'gpt-3.5-turbo', stream: bool = False):
    """Pick a working g4f provider for `model` from the public status endpoint.

    Returns [provider_class, provider_name], or None if no provider is usable.
    """
    r = requests.get('https://gpt.lemonsoftware.eu.org/v1/status')
    data = r.json()['data']
    model_providers = set()
    random.shuffle(data)
    # First pass: return the first (shuffled) provider that lists the model as
    # Active and, when streaming is requested, actually supports streaming.
    for provider_info in data:
        for model_info in provider_info['model']:
            if model in model_info:
                model_providers.add(provider_info['provider'])
                if model_info[model]['status'] == 'Active':
                    provider = getattr(g4f.Provider, provider_info['provider'])
                    if stream and not provider.supports_stream:
                        continue
                    return [provider, provider_info['provider']]
    if not model_providers:
        return None
    # Second pass: fall back to providers that both carry the model and have at
    # least one Active model, then pick one at random.
    active_providers = set()
    for provider_info in data:
        for model_info in provider_info['model']:
            # Iterate under a name that does not shadow the `model` argument.
            for model_status in model_info.values():
                if model_status['status'] == 'Active':
                    if stream and not getattr(g4f.Provider, provider_info['provider']).supports_stream:
                        continue
                    active_providers.add(provider_info['provider'])
    chooseable_providers = model_providers & active_providers
    if not chooseable_providers:
        return None
    chooseable_provider = random.choice(list(chooseable_providers))
    return [getattr(g4f.Provider, chooseable_provider), chooseable_provider]
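
# Illustrative only: the status payload shape that auto_select() expects, inferred
# from the parsing above (the provider and model names here are made-up placeholders,
# not part of any documented contract).
_EXAMPLE_STATUS_PAYLOAD = {
    'data': [
        {
            'provider': 'SomeProvider',
            'model': [
                {'gpt-3.5-turbo': {'status': 'Active'}},
                {'gpt-4': {'status': 'Inactive'}},
            ],
        },
    ],
}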
|
|
|
@app.post("/v1/chat/completions") |
|
def chat_completions(item: chat_completions_Item,responses: Response): |
|
stream = item.stream |
|
model = item.model.lower() |
|
messages = item.messages |
|
provider_name = item.provider |
|
temperature = item.temperature |
|
presence_penalty = item.presence_penalty |
|
frequency_penalty = item.frequency_penalty |
|
top_p = item.top_p |
|
|
|
if provider_name: |
|
try: |
|
response = g4f.ChatCompletion.create(model=model, provider=getattr(g4f.Provider,provider_name),stream=stream,messages=messages,temperature=temperature,presence_penalty=presence_penalty,frequency_penalty=frequency_penalty,top_p=top_p) |
|
except: |
|
return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"error": {"message": "There was an error","type": "invalid_request_error","param": None,"code": 500}}) |
|
else: |
|
provider = auto_select(model=model,stream=stream) |
|
if provider == None: |
|
return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"error": {"message": "The model is invalid or not working.","type": "invalid_request_error","param": None,"code": 500}}) |
|
if stream and provider[0].supports_stream == False: |
|
return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"error": {"message": "Stream is not supported.","type": "invalid_request_error","param": None,"code": 500}}) |
|
provider_name = provider[1] |
|
try: |
|
response = g4f.ChatCompletion.create(model=model, provider=provider[0],stream=stream,messages=messages,temperature=temperature,presence_penalty=presence_penalty,frequency_penalty=frequency_penalty,top_p=top_p) |
|
except: |
|
provider = auto_select(model=model,stream=stream) |
|
if provider == None: |
|
return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"error": {"message": "The model is invalid or not working.","type": "invalid_request_error","param": None,"code": 500}}) |
|
if stream and provider[0].supports_stream == False: |
|
return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"error": {"message": "Stream is not supported.","type": "invalid_request_error","param": None,"code": 500}}) |
|
provider_name = provider[1] |
|
try: |
|
response = g4f.ChatCompletion.create(model=model, provider=provider[0],stream=stream,messages=messages,temperature=temperature,presence_penalty=presence_penalty,frequency_penalty=frequency_penalty,top_p=top_p) |
|
except: |
|
return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"error": {"message": "There was an error.Please try again.","type": "server_error","param": None,"code": 500}}) |
|
|
|
|
|
    if not stream:
        completion_timestamp = int(time.time())
        completion_id = ''.join(random.choices(
            'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789', k=28))
        return {
            'id': 'chatcmpl-%s' % completion_id,
            'object': 'chat.completion',
            'created': completion_timestamp,
            'model': model,
            'provider': provider_name,
            'usage': {
                # Rough placeholders: message/character counts, not real token counts.
                'prompt_tokens': len(messages),
                'completion_tokens': len(response),
                'total_tokens': len(messages) + len(response)
            },
            'choices': [{
                'message': {
                    'role': 'assistant',
                    'content': response
                },
                'finish_reason': 'stop',
                'index': 0
            }]
        }
    def stream():
        nonlocal response
        for token in response:
            completion_timestamp = int(time.time())
            completion_id = ''.join(random.choices(
                'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789', k=28))
            completion_data = {
                'id': f'chatcmpl-{completion_id}',
                'object': 'chat.completion.chunk',
                'created': completion_timestamp,
                'model': model,
                'provider': provider_name,
                'choices': [
                    {
                        'delta': {
                            'content': token
                        },
                        'index': 0,
                        'finish_reason': None
                    }
                ]
            }
            # separators must be an explicit 2-tuple; (',' ':') silently
            # concatenated into a single string.
            yield 'data: %s\n\n' % json.dumps(completion_data, separators=(',', ':'))
            time.sleep(0.1)
        # OpenAI-style streams end with a [DONE] sentinel, which many clients wait for.
        yield 'data: [DONE]\n\n'

    return StreamingResponse(stream(), media_type='text/event-stream')
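
# Minimal client sketch (assumes this server is running locally on port 7860; the
# JSON body mirrors chat_completions_Item above):
#
#   import requests
#   r = requests.post('http://localhost:7860/v1/chat/completions',
#                     json={'model': 'gpt-3.5-turbo',
#                           'messages': [{'role': 'user', 'content': "Say 'Hello World'."}]})
#   print(r.json()['choices'][0]['message']['content'])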
|
|
|
@app.post("/v1/completions") |
|
def completions(item: completions_Item,responses: Response): |
|
stream = item.stream |
|
model = item.model.lower() |
|
messages = [{'role': 'user', 'content':item.prompt}] |
|
provider_name = item.provider |
|
temperature = item.temperature |
|
presence_penalty = item.presence_penalty |
|
frequency_penalty = item.frequency_penalty |
|
top_p = item.top_p |
|
if provider_name: |
|
try: |
|
response = g4f.ChatCompletion.create(model=model, provider=getattr(g4f.Provider,provider_name),stream=stream,messages=messages,temperature=temperature,presence_penalty=presence_penalty,frequency_penalty=frequency_penalty,top_p=top_p) |
|
except: |
|
return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"error": {"message": "There was an error.","type": "invalid_request_error","param": None,"code": 500}}) |
|
|
|
else: |
|
provider = auto_select(model=model,stream=stream) |
|
if provider == None: |
|
return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"error": {"message": "The model is invalid or not working.","type": "invalid_request_error","param": None,"code": 500}}) |
|
if stream and provider[0].supports_stream == False: |
|
return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"error": {"message": "Stream is not supported.","type": "invalid_request_error","param": None,"code": 500}}) |
|
provider_name = provider[1] |
|
try: |
|
response = g4f.ChatCompletion.create(model=model, provider=provider[0],stream=stream,messages=messages,temperature=temperature,presence_penalty=presence_penalty,frequency_penalty=frequency_penalty,top_p=top_p) |
|
except: |
|
provider = auto_select(model=model,stream=stream) |
|
if provider == None: |
|
return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"error": {"message": "The model is invalid or not working.","type": "invalid_request_error","param": None,"code": 500}}) |
|
if stream and provider[0].supports_stream == False: |
|
return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"error": {"message": "Stream is not supported.","type": "invalid_request_error","param": None,"code": 500}}) |
|
provider_name = provider[1] |
|
try: |
|
response = g4f.ChatCompletion.create(model=model, provider=provider[0],stream=stream,messages=messages,temperature=temperature,presence_penalty=presence_penalty,frequency_penalty=frequency_penalty,top_p=top_p) |
|
except: |
|
return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"error": {"message": "There was an error.Please try again.","type": "server_error","param": None,"code": 500}}) |
|
|
|
    if not stream:
        completion_timestamp = int(time.time())
        completion_id = ''.join(random.choices(
            'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789', k=28))
        return {
            'id': 'cmpl-%s' % completion_id,
            # OpenAI's completions API reports 'text_completion' here.
            'object': 'text_completion',
            'created': completion_timestamp,
            'model': model,
            'provider': provider_name,
            'usage': {
                # Rough placeholders: message/character counts, not real token counts.
                'prompt_tokens': len(messages),
                'completion_tokens': len(response),
                'total_tokens': len(messages) + len(response)
            },
            'choices': [{
                'text': response,
                'finish_reason': 'length',
                'logprobs': None,
                'index': 0
            }]
        }
    def stream():
        nonlocal response
        for token in response:
            completion_timestamp = int(time.time())
            completion_id = ''.join(random.choices(
                'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789', k=28))
            completion_data = {
                'id': f'cmpl-{completion_id}',
                'object': 'text.completion.chunk',
                'created': completion_timestamp,
                'model': model,
                'provider': provider_name,
                'choices': [
                    {
                        'delta': {
                            'text': token
                        },
                        'index': 0,
                        'finish_reason': None
                    }
                ]
            }
            # separators must be an explicit 2-tuple; (',' ':') silently
            # concatenated into a single string.
            yield 'data: %s\n\n' % json.dumps(completion_data, separators=(',', ':'))
            time.sleep(0.1)
        # OpenAI-style streams end with a [DONE] sentinel, which many clients wait for.
        yield 'data: [DONE]\n\n'

    return StreamingResponse(stream(), media_type='text/event-stream')
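
# Minimal client sketch for the legacy-style endpoint (same assumptions as above):
#
#   import requests
#   r = requests.post('http://localhost:7860/v1/completions',
#                     json={'model': 'gpt-3.5-turbo', 'prompt': "Say 'Hello World'."})
#   print(r.json()['choices'][0]['text'])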
|
|
|
@app.get("/v1/dashboard/billing/subscription") |
|
@app.get("/dashboard/billing/subscription") |
|
async def billing_subscription(): |
|
return { |
|
"object": "billing_subscription", |
|
"has_payment_method": True, |
|
"canceled": False, |
|
"canceled_at": None, |
|
"delinquent": None, |
|
"access_until": 2556028800, |
|
"soft_limit": 6944500, |
|
"hard_limit": 166666666, |
|
"system_hard_limit": 166666666, |
|
"soft_limit_usd": 416.67, |
|
"hard_limit_usd": 9999.99996, |
|
"system_hard_limit_usd": 9999.99996, |
|
"plan": { |
|
"title": "Pay-as-you-go", |
|
"id": "payg" |
|
}, |
|
"primary": True, |
|
"account_name": "OpenAI", |
|
"po_number": None, |
|
"billing_email": None, |
|
"tax_ids": None, |
|
"billing_address": { |
|
"city": "New York", |
|
"line1": "OpenAI", |
|
"country": "US", |
|
"postal_code": "NY10031" |
|
}, |
|
"business_address": None |
|
} |
|
|
|
|
|
@app.get("/v1/dashboard/billing/usage") |
|
@app.get("/dashboard/billing/usage") |
|
async def billing_usage(start_date:str='2023-01-01',end_date:str='2023-01-31'): |
|
return { |
|
"object": "list", |
|
"daily_costs": [ |
|
{ |
|
"timestamp": time.time(), |
|
"line_items": [ |
|
{ |
|
"name": "GPT-4", |
|
"cost": 0.0 |
|
}, |
|
{ |
|
"name": "Chat models", |
|
"cost": 1.01 |
|
}, |
|
{ |
|
"name": "InstructGPT", |
|
"cost": 0.0 |
|
}, |
|
{ |
|
"name": "Fine-tuning models", |
|
"cost": 0.0 |
|
}, |
|
{ |
|
"name": "Embedding models", |
|
"cost": 0.0 |
|
}, |
|
{ |
|
"name": "Image models", |
|
"cost": 16.0 |
|
}, |
|
{ |
|
"name": "Audio models", |
|
"cost": 0.0 |
|
} |
|
] |
|
} |
|
], |
|
"total_usage": 1.01 |
|
} |
|
|
|
@app.get("/v1/models") |
|
def models(): |
|
import g4f.models |
|
model = {"data":[]} |
|
for i in g4f.models.ModelUtils.convert: |
|
model['data'].append({ |
|
"id": i, |
|
"object": "model", |
|
"owned_by": g4f.models.ModelUtils.convert[i].base_provider, |
|
"tokens": 99999, |
|
"fallbacks": None, |
|
"endpoints": [ |
|
"/v1/chat/completions" |
|
], |
|
"limits": None, |
|
"permission": [] |
|
}) |
|
return model |
|
|
|
@app.get("/v1/providers") |
|
async def providers(): |
|
files = os.listdir("g4f/Provider/Providers") |
|
files = [f for f in files if os.path.isfile(os.path.join("g4f/Provider/Providers", f))] |
|
files.sort(key=str.lower) |
|
providers_data = {"data":[]} |
|
for file in files: |
|
if file.endswith(".py"): |
|
name = file[:-3] |
|
try: |
|
p = getattr(g4f.Provider,name) |
|
providers_data["data"].append({ |
|
"provider": str(name), |
|
"model": list(p.model), |
|
"url": str(p.url), |
|
"working": bool(p.working), |
|
"supports_stream": bool(p.supports_stream) |
|
}) |
|
except: |
|
pass |
|
return providers_data |
|
|
|
def custom_openapi():
    if app.openapi_schema:
        return app.openapi_schema
    openapi_schema = get_openapi(
        title="GPT API",
        version="1.0.0",
        summary="GPT API",
        description="[Try Online](https://chatgpt-next-web.lemonsoftware.eu.org/)",
        routes=app.routes,
    )
    openapi_schema["info"]["x-logo"] = {
        "url": "https://gpt-status.lemonsoftware.eu.org/icon.svg"
    }
    app.openapi_schema = openapi_schema
    return app.openapi_schema


app.openapi = custom_openapi
|
|
|
@app.exception_handler(StarletteHTTPException)
async def custom_http_exception_handler(request, exc):
    # Normalize unknown routes to an OpenAI-style 404 error body.
    return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content={"error": {"message": "Invalid URL", "type": "invalid_request_error", "param": None, "code": 404}})
|
|
|
if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)
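
# Or launch with the uvicorn CLI (assuming this file is saved as main.py):
#   uvicorn main:app --host 0.0.0.0 --port 7860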