|
import os |
|
import time |
|
import urllib.parse |
|
import requests |
|
import random |
|
import hashlib |
|
import json |
|
from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, Response |
|
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user |
|
from werkzeug.security import check_password_hash, generate_password_hash |
|
from dotenv import load_dotenv |
|
|
|
|
|
load_dotenv() |
|
|
|
app = Flask(__name__) |
|
app.secret_key = os.environ.get('SECRET_KEY', 'starsky_secret_key') |
|
|
|
|
|
login_manager = LoginManager() |
|
login_manager.init_app(app) |
|
login_manager.login_view = 'login' |
|
|
|
|
|
class User(UserMixin): |
|
def __init__(self, id, username, password_hash): |
|
self.id = id |
|
self.username = username |
|
self.password_hash = password_hash |
|
|
|
|
|
default_username = os.environ.get('ADMIN_USERNAME', 'admin') |
|
default_password = os.environ.get('ADMIN_PASSWORD', 'admin123') |
|
|
|
|
|
users = { |
|
default_username: User( |
|
default_username, |
|
default_username, |
|
generate_password_hash(default_password) |
|
) |
|
} |
|
|
|
|
|
class ProxySystem: |
|
def __init__(self): |
|
|
|
self.apis = [ |
|
"https://api.v1.mk/sub", |
|
"https://pub-api-1.bianyuan.xyz/sub" |
|
] |
|
|
|
|
|
self._cache_file = "cache.json" |
|
self._cache = {} |
|
self._cache_time = {} |
|
self._cache_ttl = 3600 |
|
self._load_cache() |
|
|
|
|
|
self.stats = { |
|
"cache_hits": 0, |
|
"cache_misses": 0, |
|
"api_successes": 0, |
|
"api_failures": 0 |
|
} |
|
|
|
def _load_cache(self): |
|
"""从磁盘加载缓存""" |
|
try: |
|
if os.path.exists(self._cache_file): |
|
with open(self._cache_file, 'r') as f: |
|
cache_data = json.load(f) |
|
self._cache = cache_data.get('cache', {}) |
|
self._cache_time = cache_data.get('cache_time', {}) |
|
print(f"已加载 {len(self._cache)} 条缓存数据") |
|
except Exception as e: |
|
print(f"加载缓存失败: {e}") |
|
self._cache = {} |
|
self._cache_time = {} |
|
|
|
def _save_cache(self): |
|
"""保存缓存到磁盘""" |
|
try: |
|
|
|
if len(self._cache) > 100: |
|
|
|
sorted_keys = sorted(self._cache_time.items(), key=lambda x: x[1]) |
|
keys_to_remove = [k for k, v in sorted_keys[:len(self._cache) - 100]] |
|
for key in keys_to_remove: |
|
self._cache.pop(key, None) |
|
self._cache_time.pop(key, None) |
|
|
|
cache_data = { |
|
'cache': self._cache, |
|
'cache_time': self._cache_time |
|
} |
|
|
|
with open(self._cache_file, 'w') as f: |
|
json.dump(cache_data, f) |
|
except Exception as e: |
|
print(f"保存缓存失败: {e}") |
|
|
|
def get_random_api(self): |
|
"""获取随机API""" |
|
return random.choice(self.apis) |
|
|
|
def get_cache_key(self, params): |
|
"""生成缓存键""" |
|
|
|
sorted_params = sorted(params.items()) |
|
param_str = urllib.parse.urlencode(sorted_params) |
|
return hashlib.md5(param_str.encode()).hexdigest() |
|
|
|
def get_from_cache(self, params): |
|
"""从缓存获取结果""" |
|
key = self.get_cache_key(params) |
|
current_time = time.time() |
|
|
|
if key in self._cache and current_time - self._cache_time.get(key, 0) < self._cache_ttl: |
|
self.stats["cache_hits"] += 1 |
|
return self._cache.get(key) |
|
|
|
self.stats["cache_misses"] += 1 |
|
return None |
|
|
|
def save_to_cache(self, params, data): |
|
"""保存到缓存""" |
|
key = self.get_cache_key(params) |
|
self._cache[key] = data |
|
self._cache_time[key] = time.time() |
|
|
|
|
|
if self.stats["cache_misses"] % 5 == 0: |
|
self._save_cache() |
|
|
|
def convert(self, params): |
|
"""执行转换,优先使用缓存""" |
|
|
|
cache_result = self.get_from_cache(params) |
|
if cache_result: |
|
return cache_result |
|
|
|
|
|
errors = [] |
|
|
|
|
|
apis = list(self.apis) |
|
random.shuffle(apis) |
|
|
|
for api_url in apis: |
|
try: |
|
|
|
response = requests.get(api_url, params=params, timeout=30) |
|
|
|
if response.status_code == 200: |
|
|
|
content = response.content |
|
content_type = response.headers.get('Content-Type', 'text/plain') |
|
|
|
|
|
cache_data = { |
|
'content': content.decode('utf-8', errors='ignore'), |
|
'content_type': content_type |
|
} |
|
self.save_to_cache(params, cache_data) |
|
|
|
self.stats["api_successes"] += 1 |
|
return content, content_type |
|
else: |
|
errors.append(f"API {api_url} 返回状态码: {response.status_code}") |
|
except Exception as e: |
|
errors.append(f"API {api_url} 请求失败: {str(e)}") |
|
self.stats["api_failures"] += 1 |
|
|
|
|
|
error_message = "所有转换API均不可用:\n" + "\n".join(errors) |
|
raise Exception(error_message) |
|
|
|
def get_stats(self): |
|
"""获取统计信息""" |
|
return { |
|
"cache_size": len(self._cache), |
|
"cache_hits": self.stats["cache_hits"], |
|
"cache_misses": self.stats["cache_misses"], |
|
"api_successes": self.stats["api_successes"], |
|
"api_failures": self.stats["api_failures"], |
|
"cache_hit_ratio": self.stats["cache_hits"] / (self.stats["cache_hits"] + self.stats["cache_misses"]) if (self.stats["cache_hits"] + self.stats["cache_misses"]) > 0 else 0 |
|
} |
|
|
|
|
|
proxy_system = ProxySystem() |
|
|
|
@login_manager.user_loader |
|
def load_user(user_id): |
|
return users.get(user_id) |
|
|
|
@app.route('/login', methods=['GET', 'POST']) |
|
def login(): |
|
if request.method == 'POST': |
|
username = request.form.get('username') |
|
password = request.form.get('password') |
|
|
|
user = users.get(username) |
|
if user and check_password_hash(user.password_hash, password): |
|
login_user(user) |
|
return redirect(url_for('index')) |
|
else: |
|
flash('用户名或密码错误') |
|
|
|
return render_template('login.html') |
|
|
|
@app.route('/logout') |
|
@login_required |
|
def logout(): |
|
logout_user() |
|
return redirect(url_for('login')) |
|
|
|
@app.route('/') |
|
@login_required |
|
def index(): |
|
|
|
try: |
|
stats = proxy_system.get_stats() |
|
cache_hit_ratio = f"{stats['cache_hit_ratio']*100:.1f}%" if stats['cache_hit_ratio'] > 0 else "0%" |
|
except: |
|
cache_hit_ratio = "计算中" |
|
|
|
return render_template('index.html', |
|
conversion_mode="隐私代理+本地缓存", |
|
cache_hit_ratio=cache_hit_ratio) |
|
|
|
@app.route('/convert', methods=['POST']) |
|
@login_required |
|
def convert(): |
|
|
|
backend_url = request.form.get('backend_url', 'https://raw.githubusercontent.com/yuanwangokk-1/subscribe/refs/heads/main/ACL4SSR/ACL4SSR.ini') |
|
target = request.form.get('target', 'clash') |
|
original_url = request.form.get('original_url', '') |
|
|
|
if not original_url: |
|
return jsonify({"status": "error", "message": "订阅链接不能为空"}) |
|
|
|
try: |
|
|
|
params = { |
|
'target': target, |
|
'url': original_url, |
|
'config': backend_url, |
|
'emoji': 'true', |
|
'list': 'false', |
|
'udp': 'false', |
|
'tfo': 'false', |
|
'expand': 'true', |
|
'scv': 'false', |
|
'fdn': 'false', |
|
'new_name': 'true' |
|
} |
|
|
|
|
|
server_url = request.url_root.rstrip('/') |
|
proxy_url = f"{server_url}/api/sub?{urllib.parse.urlencode(params)}" |
|
|
|
return jsonify({ |
|
"status": "success", |
|
"result": proxy_url, |
|
"mode": "proxy" |
|
}) |
|
|
|
except Exception as e: |
|
return jsonify({"status": "error", "message": f"处理失败: {str(e)}"}) |
|
|
|
@app.route('/api/sub') |
|
def api_sub_proxy(): |
|
"""智能代理API - 带缓存""" |
|
try: |
|
|
|
params = request.args.to_dict() |
|
|
|
|
|
try: |
|
|
|
cache_result = proxy_system.get_from_cache(params) |
|
if cache_result: |
|
|
|
content = cache_result.get('content', '').encode('utf-8') |
|
content_type = cache_result.get('content_type', 'text/plain') |
|
|
|
headers = { |
|
'Content-Type': content_type, |
|
'X-Proxy-By': 'Your Private Space', |
|
'X-Cache': 'HIT', |
|
'X-Content-Type-Options': 'nosniff', |
|
'Cache-Control': 'max-age=3600' |
|
} |
|
|
|
return Response(content, headers=headers) |
|
|
|
|
|
content, content_type = proxy_system.convert(params) |
|
|
|
|
|
headers = { |
|
'Content-Type': content_type, |
|
'X-Proxy-By': 'Your Private Space', |
|
'X-Cache': 'MISS', |
|
'X-Content-Type-Options': 'nosniff', |
|
'Cache-Control': 'max-age=3600' |
|
} |
|
|
|
return Response(content, headers=headers) |
|
except Exception as e: |
|
return f"转换失败: {str(e)}", 500 |
|
|
|
except Exception as e: |
|
return f"请求处理失败: {str(e)}", 500 |
|
|
|
|
|
@app.route('/stats') |
|
@login_required |
|
def show_stats(): |
|
stats = proxy_system.get_stats() |
|
return jsonify(stats) |
|
|
|
|
|
@app.route('/clear-cache', methods=['POST']) |
|
@login_required |
|
def clear_cache(): |
|
try: |
|
proxy_system._cache = {} |
|
proxy_system._cache_time = {} |
|
proxy_system._save_cache() |
|
return jsonify({"status": "success", "message": "缓存已清除"}) |
|
except Exception as e: |
|
return jsonify({"status": "error", "message": f"清除缓存失败: {str(e)}"}) |
|
|
|
|
|
if __name__ == '__main__' and os.environ.get('DEVELOPMENT') == 'true': |
|
app.run(host='0.0.0.0', port=7860, debug=True) |
|
|