import os import time import logging import requests from apscheduler.schedulers.background import BackgroundScheduler from flask import Flask, request, jsonify, Response, stream_with_context # 配置日志记录 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') API_ENDPOINT = "https://api.siliconflow.cn/v1/user/info" TEST_MODEL_ENDPOINT = "https://api.siliconflow.cn/v1/chat/completions" MODELS_ENDPOINT = "https://api.siliconflow.cn/v1/models" app = Flask(__name__) # 全局变量,用于存储模型列表和免费模型列表 all_models = [] free_models = [] # 全局的 key 列表 invalid_keys_global = [] free_keys_global = [] unverified_keys_global = [] valid_keys_global = [] def get_credit_summary(api_key): """ 使用 API 密钥获取额度信息。 """ headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } try: response = requests.get(API_ENDPOINT, headers=headers) response.raise_for_status() data = response.json().get("data", {}) total_balance = data.get("totalBalance", 0) return {"total_balance": float(total_balance)} except requests.exceptions.RequestException as e: logging.error(f"获取额度信息失败,API Key:{api_key},错误信息:{e}") return None except (KeyError, TypeError) as e: logging.error(f"解析额度信息失败,API Key:{api_key},错误信息:{e}") return None except ValueError as e: logging.error(f"total_balance 无法转换为浮点数,API Key:{api_key},错误信息:{e}") return None def test_model_availability(api_key, model_name): """ 测试指定的模型是否可用。 """ headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } try: response = requests.post(TEST_MODEL_ENDPOINT, headers=headers, json={ "model": model_name, "messages": [{"role": "user", "content": "hi"}], "max_tokens": 10, "stream": False }, timeout=10) # 检查是否是429错误 if response.status_code == 429: return True response.json() # 尝试解析 JSON 响应 return True except requests.exceptions.RequestException as e: logging.error(f"测试模型 {model_name} 可用性失败,API Key:{api_key},错误信息:{e}") return False except ValueError: logging.error(f"测试模型 {model_name} 可用性失败,API Key:{api_key},响应不是有效的 JSON 格式") return False def load_keys(): """ 从环境变量中加载 keys,并根据额度和模型可用性进行分类,然后记录到日志中。 """ keys_str = os.environ.get("KEYS") test_model = os.environ.get("TEST_MODEL", "Pro/google/gemma-2-9b-it") invalid_keys = [] free_keys = [] unverified_keys = [] valid_keys = [] if keys_str: keys = [key.strip() for key in keys_str.split(',')] logging.info(f"加载的 keys:{keys}") for key in keys: credit_summary = get_credit_summary(key) if credit_summary is None: invalid_keys.append(key) else: total_balance = credit_summary.get("total_balance", 0) if total_balance <= 0: free_keys.append(key) else: if test_model_availability(key, test_model): valid_keys.append(key) else: unverified_keys.append(key) logging.info(f"无效 KEY:{invalid_keys}") logging.info(f"免费 KEY:{free_keys}") logging.info(f"未实名 KEY:{unverified_keys}") logging.info(f"有效 KEY:{valid_keys}") # 更新全局的 key 列表 global invalid_keys_global, free_keys_global, unverified_keys_global, valid_keys_global invalid_keys_global = invalid_keys free_keys_global = free_keys unverified_keys_global = unverified_keys valid_keys_global = valid_keys else: logging.warning("环境变量 KEYS 未设置。") def get_all_models(api_key): """ 获取所有模型列表。 """ headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } try: response = requests.get(MODELS_ENDPOINT, headers=headers, params={"sub_type": "chat"}) response.raise_for_status() data = response.json() # 确保 data 是字典且包含 'data' 键,'data' 对应的值是一个列表 if isinstance(data, dict) and 'data' in data and isinstance(data['data'], list): return [model.get("id") for model in data["data"] if isinstance(model, dict) and "id" in model] else: logging.error("获取模型列表失败:响应数据格式不正确") return [] except requests.exceptions.RequestException as e: logging.error(f"获取模型列表失败,API Key:{api_key},错误信息:{e}") return [] except (KeyError, TypeError) as e: logging.error(f"解析模型列表失败,API Key:{api_key},错误信息:{e}") return [] def refresh_models(): """ 刷新模型列表和免费模型列表。 """ global all_models, free_models # 使用 valid_keys_global 中的第一个 key 获取完整模型列表 if valid_keys_global: all_models = get_all_models(valid_keys_global[0]) else: logging.warning("没有有效的key,无法获取完整模型列表。") all_models = [] # 从环境变量读取免费模型列表 free_models_str = os.environ.get("FREE_MODELS") if free_models_str: free_models = [model.strip() for model in free_models_str.split(',')] else: logging.warning("环境变量 FREE_MODELS 未设置或为空,免费模型列表将为空。") free_models = [] logging.info(f"所有模型列表:{all_models}") logging.info(f"免费模型列表:{free_models}") def determine_request_type(model_name): """ 根据用户请求的模型判断请求类型。 """ if model_name in free_models: return "free" elif model_name in all_models: return "paid" else: return "unknown" def select_key(request_type): """ 根据请求类型选择合适的 KEY。 """ if request_type == "free": # 免费请求:使用 2、3、4 类 KEY available_keys = free_keys_global + unverified_keys_global + valid_keys_global elif request_type == "paid": # 付费请求:使用 3、4 类 KEY available_keys = unverified_keys_global + valid_keys_global else: # 未知请求:使用所有 KEY available_keys = free_keys_global + unverified_keys_global + valid_keys_global if not available_keys: return None # 简单的轮询策略选择 KEY key = available_keys[int(time.time() * 1000) % len(available_keys)] return key def check_authorization(request): """ 检查请求头中的 Authorization 字段是否匹配环境变量 AUTHORIZATION_KEY。 """ authorization_key = os.environ.get("AUTHORIZATION_KEY") if not authorization_key: logging.warning("环境变量 AUTHORIZATION_KEY 未设置,请设置后重试。") return False auth_header = request.headers.get('Authorization') if not auth_header: logging.warning("请求头中缺少 Authorization 字段。") return False if auth_header != f"Bearer {authorization_key}": logging.warning(f"无效的 Authorization 密钥:{auth_header}") return False return True def get_total_balance(): """ 获取所有大于等于0的key的额度的和。 """ total_balance = 0 for key in free_keys_global + unverified_keys_global + valid_keys_global: credit_summary = get_credit_summary(key) if credit_summary is not None: total_balance += credit_summary.get("total_balance", 0) return total_balance # 创建一个后台调度器 scheduler = BackgroundScheduler() # 添加定时任务,每小时执行一次 load_keys 函数 scheduler.add_job(load_keys, 'interval', hours=1) # 添加定时任务,每10分钟执行一次 refresh_models 函数 scheduler.add_job(refresh_models, 'interval', minutes=10) @app.route('/') def index(): """ 处理根路由的访问请求。 """ return "

Welcome to SiliconFlow

" @app.route('/check_tokens', methods=['POST']) def check_tokens(): """ 处理前端发送的 Token 检测请求。 """ tokens = request.json.get('tokens', []) test_model = os.environ.get("TEST_MODEL", "Pro/google/gemma-2-9b-it") results = [] for token in tokens: credit_summary = get_credit_summary(token) if credit_summary is None: results.append({"token": token, "type": "无效 KEY", "balance": 0, "message": "无法获取额度信息"}) else: total_balance = credit_summary.get("total_balance", 0) if total_balance <= 0: results.append({"token": token, "type": "免费 KEY", "balance": total_balance, "message": "额度不足"}) else: if test_model_availability(token, test_model): results.append({"token": token, "type": "有效 KEY", "balance": total_balance, "message": "可以使用指定模型"}) else: results.append({"token": token, "type": "未实名 KEY", "balance": total_balance, "message": "无法使用指定模型"}) return jsonify(results) @app.route('/handsome/v1/chat/completions', methods=['POST']) def handsome_chat_completions(): """ 处理 /handsome/v1/chat/completions 路由的请求,添加鉴权。 """ if not check_authorization(request): return jsonify({"error": "Unauthorized"}), 401 data = request.get_json() if not data or 'model' not in data: return jsonify({"error": "Invalid request data"}), 400 model_name = data['model'] request_type = determine_request_type(model_name) api_key = select_key(request_type) if not api_key: return jsonify({"error": "No available API key for this request type"}), 400 headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } # 转发请求到真正的 API try: response = requests.post( TEST_MODEL_ENDPOINT, headers=headers, json=data, stream=data.get("stream", False), timeout=60 ) # 检查是否是429错误 if response.status_code == 429: return jsonify(response.json()), 429 if data.get("stream", False): return Response(stream_with_context(response.iter_content(chunk_size=1024)), content_type=response.headers['Content-Type']) else: response.raise_for_status() return jsonify(response.json()) except requests.exceptions.RequestException as e: return jsonify({"error": str(e)}), 500 @app.route('/handsome/v1/models', methods=['GET']) def handsome_models(): """ 处理 /handsome/v1/models 路由的请求,添加鉴权,返回所有模型列表。 """ if not check_authorization(request): return jsonify({"error": "Unauthorized"}), 401 return jsonify({"data": [{"id": model} for model in all_models]}) @app.route('/handsome/v1/balance', methods=['GET']) def handsome_balance(): """ 处理 /handsome/v1/balance 路由的请求,添加鉴权,返回所有可用KEY的额度总和。 """ if not check_authorization(request): return jsonify({"error": "Unauthorized"}), 401 total_balance = get_total_balance() return jsonify({"total_balance": total_balance}) if __name__ == '__main__': # 打印所有环境变量,方便调试 logging.info(f"环境变量:{os.environ}") # 初始化全局的 key 列表 invalid_keys_global = [] free_keys_global = [] unverified_keys_global = [] valid_keys_global = [] # 启动调度器 scheduler.start() # 手动触发一次 load_keys 任务 load_keys() logging.info("首次加载 keys 已手动触发执行") # 手动触发一次 refresh_models 任务 refresh_models() logging.info("首次刷新模型列表已手动触发执行") # 启动 Flask 应用,监听所有 IP 的 7860 端口(Hugging Face Space 默认端口) app.run(debug=False, host='0.0.0.0', port=int(os.environ.get('PORT', 7860)))