|
from atexit import register as exit_register |
|
from os import close, read, name as os_name |
|
from pathlib import Path |
|
from pty import openpty |
|
from re import compile, search |
|
from shutil import move |
|
from subprocess import PIPE, Popen, STDOUT, check_output, CalledProcessError |
|
from sys import stdout |
|
from time import sleep, time |
|
from urllib.parse import unquote, urlparse |
|
|
|
from requests import get as get_url, head as get_head |
|
from requests.structures import CaseInsensitiveDict |
|
|
|
# Base working directory (Google Colab's default content folder).
WORK_FOLDER = Path('/content')

# Location of the zrok tunnel binary.
zrok_bin = WORK_FOLDER / 'zrok'
# Fallback zrok account tokens, tried in order until one enables successfully.
# These are placeholders - put real tokens from https://myzrok.io here.
zrok_fallback_tokens = [
    'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io',
    'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io',
    'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io',
    'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io',
    'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io',
    'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io',
    'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io',
    'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io',
    'твои токены на https://myzrok.io', 'твои токены на https://myzrok.io',
]
# Cloudflare quick-tunnel binary ("cloudflared" is misspelled; kept for compatibility).
claudflare_bin = WORK_FOLDER / 'claudflared'
# Tunnelmole binary.
tmole_bin = WORK_FOLDER / 'tmole'
# tunwg binary.
tunwg_bin = WORK_FOLDER / 'tunwg'
# Go localtunnel client binary.
go_localt_bin = WORK_FOLDER / 'go_localt'
# frpc binary used by gradio's share-tunnel service.
gradio_bin = WORK_FOLDER / 'frpc_linux_amd64'
# File for the native Colab URL (appears unused in this chunk - verify against callers).
colab_native_url = WORK_FOLDER / 'colab_url.txt'

# File where the first obtained public link is persisted (see get_revproxy_url).
links_file = WORK_FOLDER / 'links.txt'
|
|
|
|
|
def is_list(variable) -> bool:
    """Tell whether the given value is a list instance."""
    value_is_list = isinstance(variable, list)
    return value_is_list
|
|
|
|
|
def is_str(variable) -> bool:
    """Tell whether the given value is a string instance."""
    value_is_str = isinstance(variable, str)
    return value_is_str
|
|
|
|
|
def determine_archive_format(filepath: str | Path) -> str | None: |
|
filepath = Path(filepath) |
|
zip_signature = bytes([0x50, 0x4B, 0x03, 0x04]) |
|
seven_z_signature = bytes([0x37, 0x7A, 0xBC, 0xAF, 0x27, 0x1C]) |
|
lzma_xz_signature = bytes([0xFD, 0x37, 0x7A, 0x58, 0x5A]) |
|
tgz_signature = bytes([0x1F, 0x8B]) |
|
tbz_signature = bytes([0x42, 0x5A, 0x68]) |
|
ustar_signature = bytes([0x75, 0x73, 0x74, 0x61, 0x72]) |
|
with filepath.open('rb') as file: |
|
header = file.read(262) |
|
if header.startswith(zip_signature): |
|
return 'zip' |
|
elif header.startswith(seven_z_signature) or header.startswith(lzma_xz_signature): |
|
return '7z' |
|
elif header.startswith(tgz_signature): |
|
return 'tar.gz' |
|
elif header.startswith(tbz_signature): |
|
return 'tar.bz2' |
|
elif header[0x101:0x101 + len(ustar_signature)] == ustar_signature: |
|
return 'tar' |
|
return None |
|
|
|
|
|
def unpack_archive(archive_path: str | Path, dest_path: str | Path, rm_archive: bool = True):
    """Unpack *archive_path* into *dest_path*, choosing a tool by detected format.

    archive_path may also be an http(s) URL, in which case it is downloaded
    into WORK_FOLDER first.  On success the archive file is deleted when
    rm_archive is True.  Falls back to 7z for unknown formats and raises
    RuntimeError when every extraction attempt fails.
    """
    if str(archive_path).startswith(('https://', 'http://')):
        archive_path = download(archive_path, save_path=WORK_FOLDER, progress=False)
        if not Path(archive_path).exists():
            raise RuntimeError(f'архив {archive_path} не найден.')
    archive_path = Path(archive_path)
    dest_path = Path(dest_path)

    if not archive_path.exists():
        raise RuntimeError(f'архив {archive_path} не найден.')

    determine_format = determine_archive_format(archive_path)
    # Bug fix: Path('x.tar.gz').suffix is '.gz', so the old comparisons with
    # '.tar.gz'/'.tar.bz2'/'.tar.xz' could never match - use endswith instead.
    name = str(archive_path)

    try:
        if determine_format == '7z' or archive_path.suffix == '.7z':
            run(f'7z -bso0 -bd -slp -y x {str(archive_path)} -o{str(dest_path)}')
        elif determine_format == 'tar' or archive_path.suffix == '.tar':
            run(f'tar -xvpf {str(archive_path)} -C {str(dest_path)}')
        elif determine_format in ('tar.gz', 'tar.bz2') or name.endswith(('.tar.gz', '.tar.bz2', '.tar.xz', '.tgz')):
            # Plain -x lets GNU tar auto-detect gzip/bzip2/xz, so compressed
            # tarballs no longer fail under a hard-coded -z flag.
            result = run(f'tar -xvpf {str(archive_path)} -C {str(dest_path)}')
            if result['status_code'] != 0:
                # Last resort: maybe it is a bare gzip file, not a tarball.
                run(f'gzip -d {str(archive_path)}', dest_path)
        elif determine_format == 'zip' or archive_path.suffix == '.zip':
            run(f'unzip {str(archive_path)} -d {str(dest_path)}')
        else:
            # Unknown signature: 7z understands most formats.
            run(f'7z -bso0 -bd -slp -y x {str(archive_path)} -o{str(dest_path)}')
        if rm_archive:
            archive_path.unlink(missing_ok=True)
    except Exception:
        # One more 7z attempt (multithreaded) before giving up.
        try:
            run(f'7z -bso0 -bd -mmt4 -slp -y x {str(archive_path)} -o{str(dest_path)}')
            if rm_archive:
                archive_path.unlink(missing_ok=True)
        except Exception:
            raise RuntimeError(
                f'формат архива {archive_path.suffix} не определен, нужно задать формат в "determine_format".')
|
|
|
|
|
def run(command: str, cwd: str | Path | None = None, live_output: bool = False) -> dict: |
|
process = Popen(command, shell=True, cwd=cwd, stdout=PIPE, stderr=STDOUT) |
|
encodings = ['iso-8859-5', 'windows-1251', 'iso-8859-1', 'cp866', 'koi8-r', 'mac_cyrillic'] |
|
is_progress_bar_pattern = r'\d+%|\d+/\d+' |
|
|
|
def decode_output(output_data): |
|
for encoding in encodings: |
|
try: |
|
return output_data.decode(encoding) |
|
except UnicodeDecodeError: |
|
continue |
|
return output_data.decode('utf-8', errors='replace') |
|
|
|
final_output = [] |
|
last_progress_bar = '' |
|
|
|
def process_output(): |
|
nonlocal last_progress_bar |
|
for line in iter(process.stdout.readline, b''): |
|
try: |
|
line_decoded = line.decode('utf-8').strip() |
|
except UnicodeDecodeError: |
|
line_decoded = decode_output(line.strip()) |
|
|
|
if search(is_progress_bar_pattern, line_decoded): |
|
last_progress_bar = line_decoded |
|
if live_output: |
|
stdout.write('\r' + line_decoded) |
|
else: |
|
final_output.append(line_decoded) |
|
if live_output: |
|
stdout.write('\n' + line_decoded) |
|
|
|
if live_output: stdout.flush() |
|
|
|
process_output() |
|
process.wait() |
|
if last_progress_bar: |
|
final_output.append(last_progress_bar) |
|
return { |
|
'status_code': process.returncode, |
|
'output': '\n'.join(final_output) |
|
} |
|
|
|
|
|
def is_valid_url(url: str) -> bool:
    """Check that *url* answers with a 2xx status code.

    Tries a streaming GET (up to two attempts); when the GET raises, falls
    back to a HEAD request.  If a GET completes without raising but is not
    2xx, stops retrying immediately.  Returns False when nothing succeeds.
    """
    headers = None
    for attempt in range(2):
        try:
            with get_url(url, headers=headers, allow_redirects=True, stream=True) as response:
                if 200 <= response.status_code < 300:
                    return True
        # Bug fix: bare `except:` also swallowed SystemExit/KeyboardInterrupt;
        # catch Exception so real interrupts still propagate.
        except Exception:
            try:
                response = get_head(url, headers=headers, allow_redirects=True)
                if 200 <= response.status_code < 300:
                    return True
            except Exception:
                pass
        else:
            # GET completed but was not 2xx - retrying will not help.
            break
    return False
|
|
|
|
|
def get_filename_from_headers(requests_headers: CaseInsensitiveDict) -> str | None:
    """Extract a download file name from HTTP response headers.

    Prefers the RFC 5987 ``filename*=`` form of Content-Disposition (decoding
    the percent-encoded name with its declared charset), then the plain
    ``filename=`` form, then a bare ``filename`` header.  Returns None when
    nothing usable is present.
    """
    content_disposition = requests_headers.get('content-disposition')
    if not content_disposition:
        return requests_headers.get('filename')
    for fragment in content_disposition.split(';'):
        fragment = fragment.strip()
        if fragment.startswith('filename*='):
            charset, _, quoted_name = fragment[len('filename*='):].partition("''")
            return unquote(quoted_name, encoding=charset)
        if fragment.startswith('filename='):
            return fragment[len('filename='):].strip('"')
    return None
|
|
|
|
|
def download(url: str, filename: str | Path | None = None, save_path: str | Path | None = None,
             progress: bool = True) -> Path | None:
    """Download *url* into *save_path* (default: cwd) and return the file path.

    The url may carry a wget-style ``--header=Name: Value`` suffix, which is
    split off and sent as a request header.  The file name is taken from the
    *filename* argument, the response headers, or the URL path, in that order.
    Raises RuntimeError when the URL is invalid or the transfer fails.
    """
    headers = None
    url_with_header = url.replace('"', '').replace("'", '').split('--header=')
    if len(url_with_header) > 1:
        url = (url_with_header[0]).strip()
        header = url_with_header[1]
        # Bug fix: split(':') broke header values containing ':' - partition
        # splits only on the first colon.
        header_name, _, header_value = header.partition(':')
        headers = {header_name.strip(): header_value.strip()}
    if is_valid_url(url):
        save_path = Path(save_path) if save_path else Path.cwd()
        save_path.mkdir(parents=True, exist_ok=True)

        with get_url(url, stream=True, allow_redirects=True, headers=headers) as request:
            file_size = int(request.headers.get('content-length', 0))
            file_name = filename or get_filename_from_headers(request.headers) or Path(urlparse(request.url).path).name
            file_path = save_path / file_name

            chunk_size = max(4096, file_size // 2000)
            downloaded_size = 0
            try:
                with open(file_path, 'wb') as fp:
                    start = time()
                    for chunk in request.iter_content(chunk_size=chunk_size):
                        if chunk:
                            fp.write(chunk)
                            if progress:
                                downloaded_size += len(chunk)
                                # Bug fix: servers that omit content-length left
                                # file_size == 0 and crashed with ZeroDivisionError.
                                percent_completed = downloaded_size / file_size * 100 if file_size else 0.0
                                elapsed_time = time() - start
                                print(f'\rзагрузка {file_name}: {percent_completed:.2f}% | {elapsed_time:.2f} сек.',
                                      end='')
            except Exception as e:
                raise RuntimeError(f'не удалось загрузить файл по ссылке {url}:\n{e}')
            return file_path
    else:
        raise RuntimeError(f'недействительная ссылка на файл: {url}')
|
|
|
|
|
def is_process_running(process_name: str | Path) -> bool: |
|
try: |
|
output = check_output(['pgrep', '-f', str(process_name)], text=True) |
|
return len(output.strip().split('\n')) > 0 |
|
except CalledProcessError: |
|
return False |
|
|
|
|
|
def move_path(old_path: Path | str, new_path: Path | str): |
|
old, new = Path(old_path), Path(new_path) |
|
if old_path.exists(): |
|
try: |
|
old_path.replace(new) |
|
except: |
|
try: |
|
move(old, new) |
|
except: |
|
if os_name == 'posix': |
|
run(f'mv "{old}" "{new}"') |
|
else: |
|
run(f'move "{old}" "{new}"') |
|
else: |
|
raise RuntimeError(f'не найден исходный путь для перемещения: {old}') |
|
|
|
|
|
def terminate_process(process_name: str, process_obj: Popen):
    """Shut down a tunnel binary: close its pipes, terminate it, pkill stragglers.

    Registered via atexit in get_revproxy_url, so it must not raise.
    """
    # Bug fix: stdout/stderr are None when the process was started without
    # PIPE - guard before closing to avoid AttributeError at interpreter exit.
    if process_obj.stdout is not None:
        process_obj.stdout.close()
    if process_obj.stderr is not None:
        process_obj.stderr.close()
    process_obj.terminate()
    # Catch any re-spawned/forked copies that terminate() missed.
    run(f'pkill -f {process_name}')
|
|
|
|
|
def is_ipv4(address: str) -> bool:
    """Return True when *address* is a dotted-quad IPv4 address with 0-255 octets."""
    matcher = compile(r'^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$')
    return matcher.match(address) is not None
|
|
|
|
|
def get_revproxy_url(bin_url: str, need_unpack: bool, bin_path: Path, start_commands: list, read_from_stderr: bool,
                     lines_to_read: int, url_pattern: str = r'https://\S+', write_link: bool = False) -> str:
    """Download (if needed) a tunnel binary, start it, and scrape its public URL.

    bin_url: where to fetch the binary (or an archive containing it).
    need_unpack: True when bin_url points at an archive to unpack first.
    bin_path: final location of the executable.
    start_commands: argv list used to launch the binary.
    read_from_stderr: scrape stderr instead of stdout for the URL.
    lines_to_read: how many initial output lines to scan.
    url_pattern: regex that extracts the public URL from an output line.
    write_link: when True, also persist the first URL to links_file.

    Returns the first matching URL (with an IP "password" hint appended if an
    IPv4 line was printed).  Raises RuntimeError when no URL shows up.
    """
    if not bin_path.exists():
        # Snapshot the work folder so the freshly added file can be found by set diff.
        files_before = {item.name for item in WORK_FOLDER.iterdir() if item.is_file()}
        if need_unpack:
            unpack_archive(download(bin_url, progress=False), WORK_FOLDER, rm_archive=True)
        else:
            download(bin_url, save_path=bin_path.parent, progress=False)
        files_after = {item.name for item in WORK_FOLDER.iterdir() if item.is_file()}
        new_files = files_after - files_before
        if len(new_files) == 1:
            # Exactly one new file appeared - rename it to the expected binary path.
            unpacked_file = WORK_FOLDER / new_files.pop()
            move_path(unpacked_file, bin_path)
        else:
            unpacked_files = ', '.join(new_files)
            raise RuntimeError(f'ошибка при определении нового файла после распаковки!\n'
                               f'ожидалось что за время работы добавится один файл (распакованный бинарник),\n'
                               f'а добавилсь эти: {unpacked_files}')
        bin_path.chmod(0o777)  # mark the downloaded binary as executable

    # Kill any previous instance of the same binary before starting a fresh tunnel.
    if is_process_running(bin_path.name):
        run(f'pkill -f {bin_path.name}')

    process = Popen(start_commands, stdout=PIPE, stderr=PIPE, text=True)
    lines = []
    stream = process.stderr if read_from_stderr else process.stdout
    for _ in range(lines_to_read):
        line = stream.readline()

        # Stop on EOF (empty read); the length comparison is a redundant safety cap.
        if len(lines) == lines_to_read or not line:
            break
        lines.append(line.strip())

    # Some tunnels print the machine's IP, which doubles as an access password.
    ipv4 = next((line for line in lines if is_ipv4(line)), None)

    urls = [search(url_pattern, url).group() for url in lines if search(url_pattern, url)]

    if urls:
        # Ensure the tunnel process is cleaned up when the interpreter exits.
        exit_register(terminate_process, bin_path.name, process)
        if write_link:
            links_file.write_text(urls[0])
        return f'{urls[0]}\n пароль(IP): {ipv4}' if ipv4 else urls[0]
    else:
        # No URL found in the scanned lines: kill the binary and report its output.
        run(f'pkill -f {bin_path.name}')
        output = '\n'.join(lines)
        raise RuntimeError(f'ссылку получить не удалось, вывод работы бинарника:\n{output}')
|
|
|
|
|
def get_tmole_url(port: int) -> str:
    """Expose the local *port* through tunnelmole and return the public URL."""
    binary = Path(tmole_bin)
    return get_revproxy_url(
        bin_url='https://tunnelmole.com/downloads/tmole-linux.gz',
        need_unpack=True,
        bin_path=binary,
        start_commands=[str(binary), str(port)],
        read_from_stderr=False,
        lines_to_read=5
    )
|
|
|
|
|
def get_tunwg_url(port: int) -> str:
    """Expose the local *port* through tunwg and return the public URL."""
    binary = Path(tunwg_bin)
    return get_revproxy_url(
        bin_url='https://github.com/ntnj/tunwg/releases/latest/download/tunwg',
        need_unpack=False,
        bin_path=binary,
        start_commands=[str(binary), f'--forward=http://127.0.0.1:{port}'],
        read_from_stderr=True,
        lines_to_read=2
    )
|
|
|
|
|
def get_cloudflared_url(port: int) -> str:
    """Open a Cloudflare quick tunnel to the local *port*; return the trycloudflare URL."""
    binary = Path(claudflare_bin)
    return get_revproxy_url(
        bin_url='https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64',
        need_unpack=False,
        bin_path=binary,
        start_commands=[str(binary), 'tunnel', '--url', f'http://127.0.0.1:{port}'],
        read_from_stderr=True,
        lines_to_read=6,
        url_pattern=r'(?P<url>https?://\S+\.trycloudflare\.com)'
    )
|
|
|
|
|
def get_localt_url(port: int) -> str:
    """Expose the local *port* through the Go localtunnel client; return the URL."""
    binary = Path(go_localt_bin)
    return get_revproxy_url(
        bin_url='https://huggingface.co/prolapse/go_localt/resolve/main/go_localt',
        need_unpack=False,
        bin_path=binary,
        start_commands=[str(binary), f'{port}'],
        read_from_stderr=False,
        lines_to_read=2
    )
|
|
|
|
|
def zrok_token_is_valid(zrok_token: str) -> bool:
    """Check a zrok account token by running ``zrok enable`` under a pseudo-terminal.

    The zrok CLI is run with its stdio attached to a pty (it behaves
    differently without a terminal); all output is read back from the master
    end.  Returns True when the CLI reports 'successfully enabled'.
    """
    try:
        # Drop any previously enabled environment so `enable` starts clean.
        run(f'./{zrok_bin.name} disable', cwd=zrok_bin.parent)
    except:
        pass
    # pty pair: the child writes to the slave end, we read from the master.
    master, slave = openpty()
    process = Popen(f'./{zrok_bin.name} enable {zrok_token}', shell=True, cwd=zrok_bin.parent, stdin=slave,
                    stdout=slave, stderr=slave, text=True)
    # Close our copy of the slave fd so EOF can be observed once the child exits.
    close(slave)
    output = []
    try:
        while True:
            try:
                data = read(master, 1024)
            except OSError:
                # Reading a pty master raises (EIO) after the child side closes.
                break
            if not data:
                break
            output.append(data.decode('utf-8'))
    except:
        pass
    close(master)
    process.wait()
    full_output = ''.join(output)
    if 'successfully enabled' in full_output:
        return True

    return False
|
|
|
|
|
def get_zrok_token() -> str:
    """Return the first fallback zrok token that successfully enables the client.

    Raises RuntimeError when none of the configured tokens is accepted.
    """
    for candidate in zrok_fallback_tokens:
        if zrok_token_is_valid(candidate):
            return candidate
    raise RuntimeError('не удалось найти валидный zrok-токен.')
|
|
|
|
|
def get_zrok_url(port: int) -> str:
    """Expose http://localhost:<port>/ through a public zrok share; return the URL."""
    zrok_bin_url = 'https://huggingface.co/prolapse/zrok/resolve/main/zrok.tar'
    # Install the binary up front so the token check below can run it.
    if not zrok_bin.exists():
        unpack_archive(download(zrok_bin_url, progress=False), WORK_FOLDER, rm_archive=True)
    # Enables the zrok environment with a working token (side effect only).
    get_zrok_token()
    share_command = [str(zrok_bin), 'share', 'public', f'http://localhost:{port}/', '--headless']
    return get_revproxy_url(
        bin_url=zrok_bin_url,
        need_unpack=True,
        bin_path=Path(zrok_bin),
        start_commands=share_command,
        read_from_stderr=True,
        lines_to_read=2,
        url_pattern=r'(?P<url>https?://\S+\.share\.zrok\.io)'
    )
|
|
|
|
|
def get_gradio_url(port: int) -> str | None:
    """Open a gradio share tunnel (frpc) to the local *port*; return the public URL.

    Requests tunnel endpoint coordinates from api.gradio.app, then launches
    the frpc binary via get_revproxy_url.  Retries up to three times with a
    5-second pause; raises RuntimeError after the final failure.  Returns
    None only if every attempt yields a non-200 tunnel-request response.
    """
    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            response = get_url('https://api.gradio.app/v2/tunnel-request')
            if response and response.status_code == 200:
                # Parse the body once instead of calling .json() twice.
                tunnel_info = response.json()[0]
                remote_host, remote_port = tunnel_info['host'], int(tunnel_info['port'])
                com = [f'{gradio_bin}', 'http', '-n', 'random', '-l', f'{port}', '-i', '127.0.0.1', '--uc', '--sd',
                       'random', '--ue', '--server_addr', f'{remote_host}:{remote_port}', '--disable_log_color']
                return get_revproxy_url(
                    bin_url='https://cdn-media.huggingface.co/frpc-gradio-0.1/frpc_linux_amd64',
                    need_unpack=False,
                    bin_path=Path(gradio_bin),
                    start_commands=com,
                    read_from_stderr=False,
                    lines_to_read=3,
                )
        except Exception as e:
            if attempt < max_attempts - 1:
                print(f'попытка {attempt + 1} получить ссылку градио провалилась: {e}. пробуем еще...')
                sleep(5)
                # Bug fix: the original `return None` here aborted after the
                # first failure, so the announced retries never happened.
                continue
            raise RuntimeError(f'после трех попыток поднять туннель градио не удалось: {e}')
    return None
|
|
|
|
|
# Dispatch table: tunnel-provider name -> function that opens a tunnel to a
# local port and returns its public URL.  Keys are the values accepted by
# get_share_link's `host` argument.
proxies_functions = {
    'zrok': get_zrok_url,
    'tmole': get_tmole_url,
    'tunwg': get_tunwg_url,
    'cloudflared': get_cloudflared_url,
    'localt': get_localt_url,
    'gradio': get_gradio_url
}
|
|
|
|
|
def try_all(port: int) -> str:
    """Ask every known tunnel provider to expose *port*; join the successful URLs."""
    working_links = []
    for proxy_name, proxy_func in proxies_functions.items():
        try:
            working_links.append(proxy_func(port))
        except Exception as e:
            # A provider that fails is only reported, never fatal.
            print(f'{proxy_name}: {e}')
    return '\n'.join(working_links)
|
|
|
|
|
def get_share_link(host: str, port: int) -> str:
    """Return a public URL for the local *port* via the tunnel provider *host*.

    host: a key of proxies_functions ('zrok', 'tmole', 'tunwg', 'cloudflared',
    'localt', 'gradio'), or 'all' to try every provider in turn.
    """
    if host in proxies_functions:
        return proxies_functions[host](port)
    elif host == 'all':
        return try_all(port)
    else:
        # Bug fix: the old message listed internal function __name__s, but
        # callers must pass the dictionary KEYS - list those instead.
        available_proxies_functions = ', '.join(proxies_functions)
        return f'у меня нет функции ассоциированной с {host}, доступные функции:\n{available_proxies_functions}'
|
|
|
|
|
|
|
|
|
""" |
|
|
|
# как использовать: |
|
|
|
port = 7860 # порт, на котором запущен веб-интерфейс |
|
|
|
# чтобы использовать определенный прокси-сервер, нужно передать его название как первый аргумент: |
|
|
|
link = get_share_link('gradio', port) |
|
print(link) |
|
|
|
link = get_share_link('zrok', port) |
|
print(link) |
|
|
|
link = get_share_link('cloudflared', port) |
|
print(link) |
|
|
|
link = get_share_link('tmole', port) |
|
print(link) |
|
|
|
link = get_share_link('tunwg', port) |
|
print(link) |
|
|
|
link = get_share_link('localt', port) |
|
print(link) |
|
|
|
# или все сразу (не рекомендуется): |
|
all_possible_links = get_share_link('all', port) |
|
print(all_possible_links) |
|
|
|
""" |
|
|