freeplay2api / freeplay2api.py
bibibi12345's picture
changed proxy pool
7d6cbb8
import json
import os
import time
import uuid
import threading
import logging
import asyncio
import concurrent.futures
from typing import Any, List, Optional, Dict, Generator, Union
from contextlib import asynccontextmanager
from proxy_pool import ProxyPool
from fastapi import FastAPI, HTTPException, Depends, Response, Request
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field
from faker import Faker
import requests
# --- 基本配置 ---
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
# --- 全局变量 ---
config = {}
account_manager = None
freeplay_client = None
proxy_pool = None
valid_client_keys = set()
app_lock = threading.Lock() # 用于保护全局资源的初始化
# --- Pydantic模型定义 (来自模板) ---
class ChatMessage(BaseModel):
role: str
content: Union[str, List[Dict[str, Any]]]
class ChatCompletionRequest(BaseModel):
model: str
messages: List[ChatMessage]
stream: bool = False
temperature: Optional[float] = 1.0 # 映射到Freeplay参数
max_tokens: Optional[int] = 32000 # 映射到Freeplay参数
top_p: Optional[float] = 1.0 # 映射到Freeplay参数
class ModelInfo(BaseModel):
id: str
object: str = "model"
created: int = Field(default_factory=lambda: int(time.time()))
owned_by: str = "freeplay"
class ModelList(BaseModel):
object: str = "list"
data: List[ModelInfo]
class ChatCompletionChoice(BaseModel):
message: ChatMessage
index: int = 0
finish_reason: str = "stop"
class ChatCompletionResponse(BaseModel):
id: str = Field(default_factory=lambda: f"chatcmpl-{uuid.uuid4().hex}")
object: str = "chat.completion"
created: int = Field(default_factory=lambda: int(time.time()))
model: str
choices: List[ChatCompletionChoice]
usage: Dict[str, int] = Field(
default_factory=lambda: {
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0,
}
)
class StreamChoice(BaseModel):
delta: Dict[str, Any] = Field(default_factory=dict)
index: int = 0
finish_reason: Optional[str] = None
class StreamResponse(BaseModel):
id: str = Field(default_factory=lambda: f"chatcmpl-{uuid.uuid4().hex}")
object: str = "chat.completion.chunk"
created: int = Field(default_factory=lambda: int(time.time()))
model: str
choices: List[StreamChoice]
# --- 模型映射 ---
MODEL_MAPPING = {
"claude-3-7-sonnet-20250219": {
"model_id": "be71f37b-1487-49fa-a989-a9bb99c0b129",
"max_tokens": 64000,
"provider": "Anthropic",
},
"claude-4-opus-20250514": {
"model_id": "bebc7dd5-a24d-4147-85b0-8f62902ea1a3",
"max_tokens": 32000,
"provider": "Anthropic",
},
"claude-4-sonnet": {
"model_id": "884dde7c-8def-4365-b19a-57af2787ab84",
"max_tokens": 64000,
"provider": "Anthropic",
},
}
# --- 服务类 ---
class FreeplayClient:
def __init__(self, proxy_pool_instance: Optional[ProxyPool] = None):
self.proxy_pool = proxy_pool_instance
self.faker = Faker()
def check_balance(self, session_id: str) -> float:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
"Accept": "application/json",
}
cookies = {"session": session_id}
proxy_info = self.proxy_pool.get_proxy() if self.proxy_pool else None
proxies = {"http": proxy_info['full'], "https": proxy_info['full']} if proxy_info else None
try:
response = requests.get(
"https://app.freeplay.ai/app_data/settings/billing",
headers=headers,
cookies=cookies,
proxies=proxies,
timeout=10,
)
if response.status_code == 200:
data = response.json()
for feature in data.get("feature_usage", []):
if feature.get("feature_name") == "Freeplay credits":
return feature.get("usage_limit", 0) - feature.get(
"usage_value", 0
)
return 0.0
return 0.0
except requests.exceptions.ProxyError as e:
logging.warning(f"Proxy error during balance check: {e}")
if self.proxy_pool and proxy_info:
self.proxy_pool.remove_proxy(proxy_info['ip'], proxy_info['port'])
return 0.0
except Exception as e:
session_suffix = session_id[-4:] if session_id and len(session_id) >= 4 else session_id or "None"
logging.warning(
f"Failed to check balance for session_id ending in ...{session_suffix}: {e}"
)
return 0.0
def register(self) -> Optional[Dict]:
logging.info("REGISTER FUNCTION STARTED")
# Wait for proxy pool to have at least one proxy before starting registration
if self.proxy_pool:
logging.info("Waiting for proxy pool to initialize...")
wait_time = 0
while self.proxy_pool.get_count() == 0 and wait_time < 60: # Wait up to 60 seconds
time.sleep(1)
wait_time += 1
if wait_time % 10 == 0: # Log every 10 seconds
logging.info(f"Still waiting for proxies... ({wait_time}s elapsed)")
if self.proxy_pool.get_count() > 0:
logging.info(f"Proxy pool ready with {self.proxy_pool.get_count()} proxies")
else:
logging.warning("Proxy pool initialization timed out, proceeding anyway")
url = "https://app.freeplay.ai/app_data/auth/signup"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
"Accept": "application/json",
"Content-Type": "application/json",
"origin": "https://app.freeplay.ai",
"referer": "https://app.freeplay.ai/signup",
}
attempt = 0
logging.info("STARTING RETRY LOOP")
while attempt < 50: # Try up to 50 times
attempt += 1
logging.info(f"LOOP ITERATION {attempt} STARTED")
# Wait 2 seconds between attempts (except for the first attempt)
if attempt > 1:
logging.info(f"Waiting 2 seconds before attempt {attempt}...")
time.sleep(2)
proxy_info = None
try:
payload = {
"email": self.faker.email(),
"password": f"aA1!{uuid.uuid4().hex[:8]}",
"account_name": self.faker.name(),
"first_name": self.faker.first_name(),
"last_name": self.faker.last_name(),
}
proxy_info = self.proxy_pool.get_proxy() if self.proxy_pool else None
# Use HTTP proxy for HTTPS requests (CONNECT tunneling)
proxies = {"http": proxy_info['full'], "https": proxy_info['full']} if proxy_info else None
logging.info(f"Registration attempt {attempt}/50 using proxy {proxy_info['full'] if proxy_info else 'None'}")
# If no proxy available, skip this attempt
if self.proxy_pool and not proxy_info:
logging.warning(f"No proxy available for attempt {attempt}, skipping...")
continue
logging.info(f"ABOUT TO MAKE REQUEST WITH PROXY {proxy_info['full'] if proxy_info else 'None'}")
response = requests.post(
url,
data=json.dumps(payload),
headers=headers,
proxies=proxies,
timeout=20,
)
logging.info(f"REQUEST COMPLETED WITH STATUS {response.status_code}")
if response.status_code == 200:
data = response.json()
project_id = data.get("project_id")
session = response.cookies.get("session")
if project_id and session:
logging.info(f"Successfully registered account: {payload['email']} on attempt {attempt}")
return {
"email": payload["email"],
"password": payload["password"],
"session_id": session,
"project_id": project_id,
"balance": 5.0,
}
logging.warning(f"Registration attempt {attempt}/50 failed with status {response.status_code}: {response.text}")
logging.info(f"CONTINUING TO NEXT ATTEMPT {attempt + 1}")
except requests.exceptions.ProxyError as e:
logging.warning(f"CAUGHT REQUESTS.EXCEPTIONS.PROXYERROR: {e}")
if self.proxy_pool and proxy_info:
logging.info(f"ABOUT TO REMOVE PROXY {proxy_info['ip']}:{proxy_info['port']}")
self.proxy_pool.remove_proxy(proxy_info['ip'], proxy_info['port'])
logging.info(f"PROXY REMOVAL COMPLETED")
logging.info(f"CONTINUING AFTER PROXY ERROR TO ATTEMPT {attempt + 1}")
continue
except Exception as e:
logging.error(f"CAUGHT GENERIC EXCEPTION: {type(e).__name__}: {e}")
logging.info(f"CONTINUING AFTER UNEXPECTED ERROR TO ATTEMPT {attempt + 1}")
continue
logging.error("Failed to register a new account after 50 attempts.")
logging.info("REGISTER FUNCTION ENDING")
return None
def chat(
self,
session_id: str,
project_id: str,
model_config: Dict,
messages: List[Dict],
params: Dict,
) -> requests.Response:
url = f"https://app.freeplay.ai/app_data/projects/{project_id}/llm-completions"
headers = {
"accept": "*/*",
"origin": "https://app.freeplay.ai",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
}
cookies = {"session": session_id}
# 将 system message 转换为 user message
for msg in messages:
if msg["role"] == "system":
msg["role"] = "user"
json_payload = {
"messages": messages,
"params": [
{
"name": "max_tokens",
"value": params.get("max_tokens", model_config["max_tokens"]),
"type": "integer",
},
{
"name": "temperature",
"value": params.get("temperature", 1.0),
"type": "float",
},
{"name": "top_p", "value": params.get("top_p", 1.0), "type": "float"},
],
"model_id": model_config["model_id"],
"variables": {},
"history": None,
"asset_references": {},
}
files = {"json_data": (None, json.dumps(json_payload))}
proxy_info = self.proxy_pool.get_proxy() if self.proxy_pool else None
proxies = {"http": proxy_info['full'], "https": proxy_info['full']} if proxy_info else None
try:
return requests.post(
url, headers=headers, cookies=cookies, files=files, stream=True, proxies=proxies
)
except requests.exceptions.ProxyError as e:
logging.warning(f"Proxy error during chat: {e}")
if self.proxy_pool and proxy_info:
self.proxy_pool.remove_proxy(proxy_info['ip'], proxy_info['port'])
raise # Re-raise to be caught by the retry logic
class AccountManager:
def __init__(self, filepath: str):
self.filepath = filepath
self.accounts = []
self.lock = threading.Lock()
self.load_accounts()
def load_accounts(self):
with self.lock:
if not os.path.exists(self.filepath):
self.accounts = []
return
with open(self.filepath, "r", encoding="utf-8") as f:
self.accounts = [json.loads(line) for line in f if line.strip()]
logging.info(f"Loaded {len(self.accounts)} accounts from {self.filepath}")
def save_accounts(self):
# This operation is disabled to ensure the application is stateless.
# Account data is now only managed in memory for the duration of the process.
pass
def add_account(self, account: Dict):
with self.lock:
self.accounts.append(account)
self.save_accounts()
logging.info(f"Added new account: {account.get('email')}")
def get_account(self) -> Optional[Dict]:
with self.lock:
# 优先选择余额最高的
available_accounts = [
acc for acc in self.accounts if acc.get("balance", 0) > 0
]
if not available_accounts:
return None
return max(available_accounts, key=lambda x: x.get("balance", 0))
def update_account(self, account_data: Dict):
with self.lock:
for i, acc in enumerate(self.accounts):
if acc["session_id"] == account_data["session_id"]:
self.accounts[i] = account_data
break
self.save_accounts()
def get_all_accounts(self) -> List[Dict]:
with self.lock:
return self.accounts.copy()
class KeyMaintainer(threading.Thread):
def __init__(
self, account_manager: AccountManager, client: FreeplayClient, config: Dict
):
super().__init__(daemon=True)
self.manager = account_manager
self.client = client
self.config = config
def run(self):
while True:
try:
logging.info("KeyMaintainer: Starting maintenance cycle.")
accounts = self.manager.get_all_accounts()
# Update balances
for account in accounts:
balance = self.client.check_balance(account["session_id"])
if balance != account.get("balance"):
account["balance"] = balance
self.manager.update_account(account)
logging.info(f"Account {account['email']} balance updated to ${balance:.4f}")
# Check if new accounts are needed
healthy_accounts = [
acc for acc in self.manager.get_all_accounts()
if acc.get("balance", 0) > self.config["LOW_BALANCE_THRESHOLD"]
]
needed = self.config["ACTIVE_KEY_THRESHOLD"] - len(healthy_accounts)
while needed > 0:
logging.info(f"Healthy accounts ({len(healthy_accounts)}) below threshold. Need to register {needed} new accounts. Retrying immediately.")
new_account = self.client.register()
if new_account:
self.manager.add_account(new_account)
healthy_accounts = [
acc for acc in self.manager.get_all_accounts()
if acc.get("balance", 0) > self.config["LOW_BALANCE_THRESHOLD"]
]
needed = self.config["ACTIVE_KEY_THRESHOLD"] - len(healthy_accounts)
except Exception as e:
logging.error(f"Error in KeyMaintainer cycle: {e}")
time.sleep(self.config["CHECK_INTERVAL_SECONDS"])
# --- FastAPI应用 ---
@asynccontextmanager
async def lifespan(app: FastAPI):
initialize_app()
yield
app = FastAPI(title="Freeplay.ai to OpenAI API Adapter", lifespan=lifespan)
security = HTTPBearer()
def initialize_app():
global config, account_manager, freeplay_client, valid_client_keys, proxy_pool
with app_lock:
if account_manager: # 已经初始化
return
# 1. 加载配置
default_config = {
"HOST": "0.0.0.0",
"PORT": 7860,
"ACCOUNTS_FILE": "data/accounts.json",
"LOW_BALANCE_THRESHOLD": 2.0,
"ACTIVE_KEY_THRESHOLD": 5,
"CHECK_INTERVAL_SECONDS": 5,
"REGISTRATION_CONCURRENCY": 1,
"USE_PROXY_POOL": True,
"PROXY_POOL_CONFIG": {
"target_count": 10,
"min_threshold": 3,
"check_interval": 30
}
}
try:
with open("config.json", "r") as f:
config = json.load(f)
logging.info("Loaded config from config.json")
except (FileNotFoundError, json.JSONDecodeError):
config = default_config
logging.info("Using default config as config.json was not found or invalid.")
# 2. 加载客户端密钥
api_key = os.environ.get("API_KEY", "sk-123456")
valid_client_keys = {api_key}
logging.info("Loaded API_KEY from environment variable.")
# 3. 初始化代理池
if config.get("USE_PROXY_POOL"):
logging.info("Initializing proxy pool...")
proxy_pool_config = config.get("PROXY_POOL_CONFIG", {})
logging.info(f"Proxy pool config being used: {proxy_pool_config}")
proxy_pool = ProxyPool(proxy_pool_config)
# Initialize proxy pool asynchronously to avoid blocking app startup
threading.Thread(target=proxy_pool.initialize, daemon=True).start()
else:
logging.info("Proxy pool is disabled in config.")
# 4. 初始化服务
freeplay_client = FreeplayClient(proxy_pool_instance=proxy_pool)
account_manager = AccountManager(filepath=config["ACCOUNTS_FILE"])
# 5. 启动后台维护线程
maintainer = KeyMaintainer(account_manager, freeplay_client, config)
maintainer.start()
logging.info("Key maintenance service started.")
async def authenticate_client(auth: HTTPAuthorizationCredentials = Depends(security)):
if not auth or auth.credentials not in valid_client_keys:
raise HTTPException(status_code=403, detail="Invalid client API key.")
@app.get("/v1/models", response_model=ModelList)
async def list_models(_: None = Depends(authenticate_client)):
model_infos = [
ModelInfo(id=name, owned_by=details["provider"])
for name, details in MODEL_MAPPING.items()
]
return ModelList(data=model_infos)
def stream_generator(
response: requests.Response, model_name: str, account: Dict
) -> Generator[str, None, None]:
chat_id = f"chatcmpl-{uuid.uuid4().hex}"
created = int(time.time())
# Start chunk
start_chunk = StreamResponse(
model=model_name, choices=[StreamChoice(delta={"role": "assistant"})]
).dict()
start_chunk["id"] = chat_id
start_chunk["created"] = created
yield f"data: {json.dumps(start_chunk)}\n\n"
try:
for line in response.iter_lines(decode_unicode=True):
if line and line.startswith("data: "):
try:
data = json.loads(line[6:])
if data.get("content"):
chunk = StreamResponse(
model=model_name,
choices=[StreamChoice(delta={"content": data["content"]})],
).dict()
chunk["id"] = chat_id
chunk["created"] = created
yield f"data: {json.dumps(chunk)}\n\n"
if data.get("cost") is not None:
break # 结束
except json.JSONDecodeError:
continue
finally:
# End chunk
end_chunk = StreamResponse(
model=model_name, choices=[StreamChoice(delta={}, finish_reason="stop")]
).dict()
end_chunk["id"] = chat_id
end_chunk["created"] = created
yield f"data: {json.dumps(end_chunk)}\n\n"
yield "data: [DONE]\n\n"
# 更新余额
new_balance = freeplay_client.check_balance(account["session_id"])
if new_balance != account.get("balance"):
account["balance"] = new_balance
account_manager.update_account(account)
logging.info(
f"Post-chat balance update for {account['email']}: ${new_balance:.4f}"
)
@app.post("/v1/chat/completions")
async def chat_completions(
req: ChatCompletionRequest, _: None = Depends(authenticate_client)
):
if req.model not in MODEL_MAPPING:
raise HTTPException(status_code=404, detail=f"Model '{req.model}' not found.")
model_config = MODEL_MAPPING[req.model]
messages_dict = [msg.dict() for msg in req.messages]
# Convert OpenAI vision format to Anthropic format
for message in messages_dict:
if isinstance(message.get("content"), list):
new_content = []
for part in message["content"]:
if part.get("type") == "text":
new_content.append({"type": "text", "text": part.get("text", "")})
elif part.get("type") == "image_url":
image_url = part.get("image_url", {}).get("url", "")
if image_url.startswith("data:"):
try:
# "data:image/jpeg;base64,{base64_string}"
header, encoded = image_url.split(",", 1)
media_type = header.split(":")[1].split(";")[0]
new_content.append(
{
"type": "image",
"source": {
"type": "base64",
"media_type": media_type,
"data": encoded,
},
}
)
except Exception as e:
logging.warning(f"Could not parse image data URL: {e}")
message["content"] = new_content
# 账户选择和重试逻辑
max_retries = len(account_manager.get_all_accounts())
for attempt in range(max_retries):
account = account_manager.get_account()
if not account:
raise HTTPException(
status_code=503, detail="No available accounts in the pool."
)
try:
params = {
"max_tokens": req.max_tokens,
"temperature": req.temperature,
"top_p": req.top_p,
}
response = freeplay_client.chat(
account["session_id"],
account["project_id"],
model_config,
messages_dict,
params,
)
if response.status_code == 200:
# 请求成功
if req.stream:
return StreamingResponse(
stream_generator(response, req.model, account),
media_type="text/event-stream",
)
else:
full_content = ""
for line in response.iter_lines(decode_unicode=True):
if line and line.startswith("data: "):
try:
data = json.loads(line[6:])
content = data.get("content", "")
if content is not None:
full_content += content
if data.get("cost") is not None:
break
except json.JSONDecodeError:
continue
# 更新余额
new_balance = freeplay_client.check_balance(account["session_id"])
account["balance"] = new_balance
account_manager.update_account(account)
logging.info(
f"Post-chat balance update for {account['email']}: ${new_balance:.4f}"
)
return ChatCompletionResponse(
model=req.model,
choices=[
ChatCompletionChoice(
message=ChatMessage(
role="assistant", content=full_content
)
)
],
)
elif response.status_code in [401, 403, 404]:
logging.warning(
f"Account {account['email']} failed with status {response.status_code}. Disabling it."
)
account["balance"] = 0.0 # 禁用账户
account_manager.update_account(account)
continue # 重试下一个
else:
logging.error(
f"API call failed with status {response.status_code}: {response.text}"
)
response.raise_for_status()
except requests.exceptions.ProxyError:
# Proxy error was already logged and handled in FreeplayClient
logging.warning(f"Retrying request due to proxy error.")
# Don't disable the account, just retry with a new proxy (and potentially new account)
continue
except Exception as e:
logging.error(
f"Error with account {account['email']}: {e}. Trying next account."
)
account["balance"] = 0.0 # 发生未知异常也禁用
account_manager.update_account(account)
continue
raise HTTPException(
status_code=503, detail="All available accounts failed to process the request."
)
@app.get("/admin/accounts/status")
async def accounts_status(_: None = Depends(authenticate_client)):
accounts = account_manager.get_all_accounts()
total_balance = sum(acc.get("balance", 0) for acc in accounts)
healthy_count = len(
[
acc
for acc in accounts
if acc.get("balance", 0) > config.get("LOW_BALANCE_THRESHOLD", 2.0)
]
)
return JSONResponse(
{
"total_accounts": len(accounts),
"healthy_accounts": healthy_count,
"total_balance": f"${total_balance:.4f}",
"accounts": [
{
"email": acc.get("email"),
"balance": f"${acc.get('balance', 0):.4f}",
"project_id": acc.get("project_id"),
}
for acc in accounts
],
}
)
if __name__ == "__main__":
import uvicorn
initialize_app()
logging.info("--- Freeplay.ai to OpenAI API Adapter ---")
logging.info(f"Starting server on {config['HOST']}:{config['PORT']}")
logging.info(f"Supported models: {list(MODEL_MAPPING.keys())}")
logging.info(f"Client keys loaded: {len(valid_client_keys)}")
logging.info(f"Accounts loaded: {len(account_manager.get_all_accounts())}")
logging.info("Endpoints:")
logging.info(" POST /v1/chat/completions (Client API Key Auth)")
logging.info(" GET /v1/models (Client API Key Auth)")
logging.info(" GET /admin/accounts/status (Client API Key Auth)")
uvicorn.run(app, host=config["HOST"], port=config["PORT"])