|
import base64
import json
import re
from urllib.parse import quote

import requests
from flask import Flask, request
|
|
|
# WSGI application object; the route handler in this file registers itself on it.
app = Flask(__name__)
|
|
|
# Captures everything between "proxies:" and the next unindented top-level key.
# The key pattern allows hyphens so that "proxy-groups:" ends the section.
_PROXIES_SECTION_RE = re.compile(r'proxies:\s*\n([\s\S]*?)(?:\n[\w-]+:|$)')

# Start of a flow-style list entry: "- {".
_NODE_OPENER_RE = re.compile(r'^\s*-\s*\{', re.MULTILINE)


def _unquote(text):
    """Trim surrounding whitespace and one matching pair of quotes from *text*."""
    text = text.strip()
    if len(text) >= 2 and text[0] == text[-1] and text[0] in "'\"":
        return text[1:-1]
    return text


def _split_flow_items(text, start):
    """Split a flow mapping into its top-level ``key: value`` items.

    Scanning begins at *start*, the index just past the opening ``{``.
    Commas inside nested braces, brackets or quoted scalars do not split.

    Returns:
        ``(items, end)`` where *end* is the index of the matching ``}``,
        or ``None`` if the mapping is never closed.
    """
    items = []
    depth = 0
    quote_char = None
    # A quote only opens a quoted scalar at the start of a value; an
    # apostrophe in the middle of a plain scalar is ordinary text.
    at_scalar_start = True
    item_start = start
    for index in range(start, len(text)):
        char = text[index]
        if quote_char is not None:
            if char == quote_char:
                quote_char = None
            continue
        if char in "'\"" and at_scalar_start:
            quote_char = char
            at_scalar_start = False
        elif char in "{[":
            depth += 1
            at_scalar_start = True
        elif char in "}]":
            if depth:
                depth -= 1
                at_scalar_start = False
            elif char == "}":
                items.append(text[item_start:index])
                return items, index
        elif char == "," and depth == 0:
            items.append(text[item_start:index])
            item_start = index + 1
            at_scalar_start = True
        elif char in ",:":
            at_scalar_start = True
        elif not char.isspace():
            at_scalar_start = False
    return items, None


def _flatten_flow_items(items, prefix, into):
    """Store each ``key: value`` item in *into*, flattening nested mappings.

    Nested mappings are recorded under dotted keys, e.g. ``ws-opts.path``
    and ``ws-opts.headers.Host``. Any other value is stored as unquoted text.
    """
    for item in items:
        key, colon, raw = item.partition(":")
        key = _unquote(key)
        if not colon or not key:
            continue
        raw = raw.strip()
        if raw.startswith("{"):
            nested, _ = _split_flow_items(raw, 1)
            _flatten_flow_items(nested, f"{prefix}{key}.", into)
        else:
            into[prefix + key] = _unquote(raw)


def parse_node_input(node_input):
    """Extract proxy nodes from the ``proxies:`` section of a Clash-style config.

    Only flow-style entries (``- {name: ..., type: ...}``) are recognised.

    Args:
        node_input: Text of the configuration document.

    Returns:
        A list of dicts, one per node. ``port`` and ``alterId`` are converted
        to ``int`` when numeric, ``udp`` and ``tls`` to ``bool``; every other
        value is a string. Nested mappings appear under dotted keys such as
        ``ws-opts.path`` and ``ws-opts.headers.Host``. An empty list is
        returned when there is no ``proxies:`` section.
    """
    if not node_input:
        return []

    # Extract the proxies section with a regular expression.
    section = _PROXIES_SECTION_RE.search(node_input)
    if section is None:
        return []
    proxies_text = section.group(1)

    parsed_nodes = []
    resume_at = 0
    for opener in _NODE_OPENER_RE.finditer(proxies_text):
        if opener.start() < resume_at:
            # This "- {" lies inside a node that was already consumed.
            continue
        items, closing = _split_flow_items(proxies_text, opener.end())
        if closing is None:
            # Unterminated mapping: skip it and try the next entry.
            continue
        resume_at = closing + 1

        node_dict = {}
        _flatten_flow_items(items, '', node_dict)
        if not node_dict:
            continue

        for key in ('port', 'alterId'):
            if key in node_dict:
                try:
                    node_dict[key] = int(node_dict[key])
                except ValueError:
                    # Keep the raw text so one odd value does not abort
                    # parsing of the whole document.
                    pass
        for key in ('udp', 'tls'):
            if key in node_dict:
                node_dict[key] = node_dict[key].lower() == 'true'

        parsed_nodes.append(node_dict)

    return parsed_nodes
|
|
|
def convert_to_vmess(node):
    """Build a ``vmess://`` share link from a parsed node.

    The link is ``vmess://`` followed by the Base64 encoding of a compact
    JSON object describing the node.

    Args:
        node: Node dict. ``name``, ``server``, ``port`` and ``uuid`` are
            required; ``alterId``, ``cipher``, ``network``, ``tls``,
            ``servername``, ``ws-opts.path`` and ``ws-opts.headers.Host``
            are optional.

    Returns:
        The share link, or an ``"Error in VMess conversion: ..."`` message
        naming the first required key that is missing.
    """
    try:
        vmess_node = {
            "v": "2",
            "ps": node['name'],
            "add": node['server'],
            "port": str(node['port']),
            "id": node['uuid'],
            "aid": str(node.get('alterId', 0)),
            "scy": node.get('cipher', 'auto'),
            "net": node.get('network', 'tcp'),
            "type": "none",
            # Prefer the WebSocket Host header; fall back to the TLS server name.
            "host": node.get('ws-opts.headers.Host', node.get('servername', "")),
            "path": node.get('ws-opts.path', ''),
            "tls": "tls" if node.get('tls', False) else "",
            # Carry the TLS server name separately so it is not lost when a
            # Host header is also configured.
            "sni": node.get('servername', ""),
        }
    except KeyError as e:
        return f"Error in VMess conversion: Missing {e} key"

    json_str = json.dumps(vmess_node, separators=(',', ':'))
    vmess_base64 = base64.b64encode(json_str.encode()).decode()
    return f"vmess://{vmess_base64}"
|
|
|
def convert_to_vless(node):
    """Build a ``vless://`` share link from a parsed node.

    Query values and the fragment (node name) are percent-encoded so that
    characters such as ``?``, ``&``, ``#`` or spaces cannot corrupt the URI.

    Args:
        node: Node dict. ``uuid``, ``server``, ``port`` and ``name`` are
            required; ``tls``, ``flow``, ``servername``, ``network``,
            ``ws-opts.path``, ``ws-opts.headers.Host`` and
            ``client-fingerprint`` are optional.

    Returns:
        The share link, or an ``"Error in VLess conversion: ..."`` message
        naming the first required key that is missing.
    """
    try:
        uuid, server, port, name = (
            node['uuid'], node['server'], node['port'], node['name'],
        )
    except KeyError as e:
        return f"Error in VLess conversion: Missing {e} key"

    params = []
    if node.get('tls', False):
        params.append("security=tls")

    # (query parameter, node key) pairs, emitted only when the node has a value.
    optional_params = (
        ('flow', 'flow'),
        ('sni', 'servername'),
        # Without the transport type a client cannot apply path/host.
        ('type', 'network'),
        ('path', 'ws-opts.path'),
        ('host', 'ws-opts.headers.Host'),
        ('fp', 'client-fingerprint'),
    )
    for param, key in optional_params:
        value = node.get(key, '')
        if value:
            params.append(f"{param}={quote(str(value), safe='')}")

    param_str = "&".join(params)
    return f"vless://{uuid}@{server}:{port}?{param_str}#{quote(str(name))}"
|
|
|
def convert_to_trojan(node):
    """Build a ``trojan://`` share link from a parsed node.

    The password, SNI and node name are percent-encoded so that reserved
    characters (``@``, ``#``, spaces, ...) cannot corrupt the URI.

    Args:
        node: Node dict. ``password``, ``server``, ``port`` and ``name`` are
            required; ``sni`` is optional and emitted as an empty value when
            absent.

    Returns:
        The share link, or an ``"Error in Trojan conversion: ..."`` message
        naming the first required key that is missing.
    """
    try:
        password, server, port, name = (
            node['password'], node['server'], node['port'], node['name'],
        )
    except KeyError as e:
        return f"Error in Trojan conversion: Missing {e} key"

    userinfo = quote(str(password), safe='')
    sni = quote(str(node.get('sni', '')), safe='')
    return f"trojan://{userinfo}@{server}:{port}?sni={sni}#{quote(str(name))}"
|
|
|
def convert_to_ss(node):
    """Build an ``ss://`` (Shadowsocks) share link from a parsed node.

    The user-info part is the Base64 encoding of ``cipher:password``; the
    node name is percent-encoded into the URI fragment.

    Args:
        node: Node dict with ``cipher``, ``password``, ``server``, ``port``
            and ``name``.

    Returns:
        The share link, or an ``"Error in Shadowsocks conversion: ..."``
        message naming the first required key that is missing.
    """
    try:
        # Built as a separate statement: nesting an f-string that reuses the
        # same quote character only parses on Python 3.12 and later.
        credentials = f"{node['cipher']}:{node['password']}"
        server, port, name = node['server'], node['port'], node['name']
    except KeyError as e:
        return f"Error in Shadowsocks conversion: Missing {e} key"

    userinfo = base64.b64encode(credentials.encode()).decode()
    return f"ss://{userinfo}@{server}:{port}#{quote(str(name))}"
|
|
|
def convert_to_hysteria2(node):
    """Build a ``hysteria2://`` share link from a parsed node.

    The ``auth``, ``skip-cert-verify`` and ``udp`` query parameters are
    always emitted. ``insecure=1`` is appended when certificate verification
    is disabled, and ``sni`` when the node defines one. The password, query
    values and node name are percent-encoded.

    Args:
        node: Node dict. ``password``, ``server``, ``port`` and ``name`` are
            required; ``auth``, ``skip-cert-verify``, ``udp`` and ``sni`` are
            optional. Flags may be booleans or ``"true"``/``"false"`` text.

    Returns:
        The share link, or an ``"Error in Hysteria2 conversion: ..."``
        message naming the first required key that is missing.
    """
    try:
        password, server, port, name = (
            node['password'], node['server'], node['port'], node['name'],
        )
    except KeyError as e:
        return f"Error in Hysteria2 conversion: Missing {e} key"

    # str(...).lower() normalises both bool values and "true"/"false" text.
    skip_verify = str(node.get('skip-cert-verify', False)).lower()
    udp = str(node.get('udp', False)).lower()

    params = [
        f"auth={quote(str(node.get('auth', '')), safe='')}",
        f"skip-cert-verify={skip_verify}",
        f"udp={udp}",
    ]
    if skip_verify == 'true':
        params.append("insecure=1")
    sni = node.get('sni', '')
    if sni:
        params.append(f"sni={quote(str(sni), safe='')}")

    userinfo = quote(str(password), safe='')
    query = "&".join(params)
    return f"hysteria2://{userinfo}@{server}:{port}?{query}#{quote(str(name))}"
|
|
|
@app.route('/', methods=['GET'])
def convert_nodes():
    """Fetch a proxy configuration from ``?url=`` and return share links.

    The document at the given URL is downloaded, its nodes are parsed with
    ``parse_node_input``, and each node is converted by the function matching
    its ``type``.

    Returns:
        A Flask response tuple: ``400`` when ``url`` is missing, ``500`` when
        the download fails, otherwise ``200`` with one line per node as
        ``text/plain``. Each line is a share link, a conversion error
        message, or ``"Unknown node type: ..."``.
    """
    url = request.args.get('url')
    if not url:
        return "Please provide a URL", 400

    # NOTE(security): the URL is caller-controlled, so this endpoint makes
    # the server fetch arbitrary hosts (SSRF). Restrict the allowed
    # destinations before exposing the service to untrusted clients.
    try:
        # Without a timeout an unresponsive host would block this worker
        # indefinitely.
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        node_input = response.text
    except requests.RequestException as e:
        return f"Error fetching URL: {str(e)}", 500

    converters = {
        'ss': convert_to_ss,
        'vmess': convert_to_vmess,
        'vless': convert_to_vless,
        'trojan': convert_to_trojan,
        'hysteria2': convert_to_hysteria2,
    }

    converted_nodes = []
    for node in parse_node_input(node_input):
        # .get(): a node without a "type" is reported, not a 500 for the
        # whole request.
        node_type = node.get('type')
        converter = converters.get(node_type)
        if converter is None:
            converted_nodes.append(f"Unknown node type: {node_type}")
        else:
            converted_nodes.append(converter(node))

    result = "\n".join(converted_nodes)
    return result, 200, {'Content-Type': 'text/plain; charset=utf-8'}
|
|
|
if __name__ == '__main__':
    # When run as a script, serve on every network interface, port 8080.
    app.run(host='0.0.0.0', port=8080)
|
|