|
import re |
|
import yaml |
|
import aiohttp |
|
import asyncio |
|
import datetime |
|
import sys |
|
import traceback |
|
from aiohttp import web |
|
from urllib.parse import parse_qs |
|
from cachetools import TTLCache |
|
from functools import partial |
|
|
|
|
|
# Shared in-memory cache of converted results, keyed by source URL:
# at most 1000 entries, each kept for 30 minutes (1800 s).
cache = TTLCache(maxsize=1000, ttl=1800)
|
|
|
async def fetch_url(url, session):
    """Download *url* with the given aiohttp session and return the body text."""
    async with session.get(url) as resp:
        body = await resp.text()
    return body
|
|
|
async def extract_and_transform_proxies(input_text):
    """Extract Shadowsocks-style proxy URIs from *input_text* and return
    a Clash-compatible YAML document.

    Matches substrings of the form ``<base64>@<server>:<port>`` or
    ``<base64>://<server>:<port>``, where the base64 payload decodes to
    ``method:password``.  Entries that fail to decode are skipped.

    Args:
        input_text: Raw text fetched from a subscription URL.

    Returns:
        str: YAML text of the form ``proxies: [...]`` (may be an empty list).
    """
    pattern = r'([a-zA-Z0-9+/=]+)(?:@|:\/\/)([^:]+):(\d+)'
    matches = re.findall(pattern, input_text)

    proxies = []
    for encoded_info, server, port in matches:
        try:
            decoded_info = base64.b64decode(encoded_info).decode('utf-8')
            # Split only on the FIRST ':' — passwords may themselves
            # contain colons.
            method, password = decoded_info.split(':', 1)
            proxies.append({
                'name': f'{server}:{port}',
                'type': 'ss',
                'server': server,
                'port': int(port),
                'cipher': method,
                'password': password,
            })
        except (ValueError, UnicodeDecodeError):
            # binascii.Error (bad base64) is a ValueError subclass; a
            # malformed entry is skipped rather than aborting the whole
            # conversion.  Deliberately narrow — anything else should surface.
            continue

    return yaml.dump({'proxies': proxies}, allow_unicode=True)
|
|
|
async def log_request(request, response):
    """Print a one-line access-log entry for a completed request."""
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(
        f"{timestamp} - "
        f"Request: {request.method} {request.path} - "
        f"Response: {response.status}",
        flush=True,
    )
|
|
|
@web.middleware
async def logging_middleware(request, handler):
    """aiohttp middleware: run the handler, then emit an access-log line."""
    resp = await handler(request)
    await log_request(request, resp)
    return resp
|
|
|
async def handle_request(request):
    """Single endpoint handler.

    GET /?url=<subscription-url>[&nocache] — fetch the URL, convert its
    proxy entries to Clash YAML, cache and return the result as text/plain.
    GET / with no ``url`` parameter — return an HTML usage page.
    Any other path — 404.
    """
    if request.path == '/':
        query_params = parse_qs(request.query_string)
        if 'url' in query_params:
            url = query_params['url'][0]
            # Mere presence of 'nocache' (any value) bypasses the TTL cache.
            force_refresh = 'nocache' in query_params

            if not force_refresh and url in cache:
                print(f"Cache hit for URL: {url}", flush=True)
                return web.Response(text=cache[url], content_type='text/plain')

            try:
                print(f"Fetching URL: {url}", flush=True)
                # Fresh session per request; closed by the context manager
                # even if the fetch raises.
                async with aiohttp.ClientSession() as session:
                    input_text = await fetch_url(url, session)
                    print(f"URL content length: {len(input_text)}", flush=True)
                    result = await extract_and_transform_proxies(input_text)
                    print(f"Transformed result length: {len(result)}", flush=True)

                    # Cache the converted YAML (TTL handled by TTLCache).
                    cache[url] = result

                    return web.Response(text=result, content_type='text/plain')
            except Exception as e:
                # Boundary handler: log full traceback, return 500 with the
                # error message so the client sees why conversion failed.
                print(f"Error processing request: {str(e)}", flush=True)
                traceback.print_exc()
                return web.Response(text=f"Error: {str(e)}", status=500)
        else:
            usage_guide = """
<html>
<body>
<h1>代理配置转换工具</h1>
<p>使用方法:在URL参数中提供包含代理配置的网址。</p>
<p>示例:<code>http://localhost:8080/?url=https://example.com/path-to-proxy-config</code></p>
<p>强制刷新缓存:<code>http://localhost:8080/?url=https://example.com/path-to-proxy-config&nocache</code></p>
</body>
</html>
"""
            return web.Response(text=usage_guide, content_type='text/html')
    else:
        return web.Response(text="Not Found", status=404)
|
|
|
async def init_app():
    """Build the aiohttp application: logging middleware plus the '/' route."""
    application = web.Application(middlewares=[logging_middleware])
    application.router.add_get('/', handle_request)
    return application
|
|
|
if __name__ == "__main__":
    started_at = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"===== Application Startup at {started_at} =====")
    print("Server running on port 8080")
    # Suppress aiohttp's own startup banner — we already printed ours.
    web.run_app(init_app(), port=8080, print=lambda _: None)
|
|