import os

import requests
import openai
import anthropic

BASE_URL = 'https://api.openai.com/v1'
GPT_TYPES = ["gpt-3.5-turbo", "gpt-4", "gpt-4-32k"]
RATE_LIMIT_PER_MODEL = {
    "gpt-3.5-turbo": 2000,  # newly paid turbo keys get 2000 RPM for the first 48 hours, then 3500
    "gpt-4": 200,
    "gpt-4-32k": 1000
}
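# Heuristic used by check_key_type() below: a key whose reported RPM is lower than
# the published per-model default is assumed to still be on the free trial tier.
# The limits above reflect OpenAI's published defaults at the time of writing and
# may drift over time.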

def get_headers(key):
    headers = {'Authorization': f'Bearer {key}'}
    return headers

def get_subscription(key, available_models):
    headers = get_headers(key)
    rpm = "0"
    tpm = "0"
    tpm_left = "0"
    org = ""
    quota = ""
    key_highest_model = ""
    has_gpt4_32k = False
    has_gpt4 = False
    if check_gpt4_32k_availability(available_models):
        key_highest_model = GPT_TYPES[2]
        has_gpt4_32k = True
        has_gpt4 = True
    elif check_gpt4_availability(available_models):
        key_highest_model = GPT_TYPES[1]
        has_gpt4 = True
    else:
        key_highest_model = GPT_TYPES[0]
    # Probe with a minimal one-token request so the rate-limit headers come back cheaply.
    req_body = {"model": key_highest_model, "messages": [{'role': 'user', 'content': ''}], "max_tokens": 1}
    r = requests.post(f"{BASE_URL}/chat/completions", headers=headers, json=req_body)
    result = r.json()
    if "id" in result:
        # A successful completion carries the per-key limits in the response headers.
        rpm = r.headers.get("x-ratelimit-limit-requests", "0")
        tpm = r.headers.get("x-ratelimit-limit-tokens", "0")
        tpm_left = r.headers.get("x-ratelimit-remaining-tokens", "0")
        org = r.headers.get('openai-organization', "")
        quota = check_key_type(key_highest_model, int(rpm))
    else:
        e = result.get("error", {}).get("code", "")
        quota = f"Error: {e}"
        org = get_org_name(key)
    return {"has_gpt4_32k": has_gpt4_32k,
            "has_gpt4": has_gpt4,
            "organization": org,
            "rpm": f"{rpm} ({key_highest_model})",
            "tpm": f"{tpm} ({tpm_left} left)",
            "quota": quota}

def get_org_name(key):
    # Even a rejected request to this endpoint still echoes the organization header;
    # use .get() so a missing header doesn't raise KeyError.
    headers = get_headers(key)
    r = requests.post(f"{BASE_URL}/images/generations", headers=headers)
    return r.headers.get('openai-organization', "")

def check_key_type(model, rpm):
    if rpm < RATE_LIMIT_PER_MODEL[model]:
        return "yes | trial"
    return "yes | pay"

def check_gpt4_availability(available_models):
    return 'gpt-4' in available_models


def check_gpt4_32k_availability(available_models):
    return 'gpt-4-32k' in available_models

def check_key_availability():
    # openai.Model.list() is the pre-1.0 SDK interface; it reads OPENAI_API_KEY
    # from the environment.
    try:
        avai_models = openai.Model.list()
        return [model["root"] for model in avai_models["data"] if model["root"] in GPT_TYPES]
    except Exception:
        # Return an empty list rather than False so callers can safely use `in`.
        return []

def check_key_ant_availability(ant):
    try:
        ant.completions.create(
            prompt=f"{anthropic.HUMAN_PROMPT}Hi{anthropic.AI_PROMPT}",
            max_tokens_to_sample=1,
            model="claude-instant-v1",
        )
        return True, "Working"
    except anthropic.APIConnectionError as e:
        print(e.__cause__)  # the underlying exception, likely raised within httpx
        return False, "Error: the server could not be reached"
    except anthropic.RateLimitError:
        # A 429 still proves the key is valid; the caller should back off and retry.
        return True, "Error: 429, rate limited; we should back off a bit."
    except anthropic.APIStatusError as e:
        # e.body can be None for some error responses, so guard before .get().
        err_msg = (e.body or {}).get('error', {}).get('message', '')
        return False, f"Error: {e.status_code}, {err_msg}"

if __name__ == "__main__":
    key = os.getenv("OPENAI_API_KEY")
    key_ant = os.getenv("ANTHROPIC_API_KEY")
    # get_subscription() takes the available-model list as its second argument,
    # so fetch it first via check_key_availability().
    available_models = check_key_availability()
    results = get_subscription(key, available_models)
    print(results)
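    # Sketch only: the original fetches ANTHROPIC_API_KEY but never uses it.
    # Assuming the anthropic SDK's standard client constructor, the Anthropic
    # check above could be wired up like this:
    if key_ant:
        ant_client = anthropic.Anthropic(api_key=key_ant)
        ok, msg = check_key_ant_availability(ant_client)
        print(f"Anthropic key working: {ok} ({msg})")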