import base64
import hashlib
import io
import json
import logging
import os
import secrets
import sqlite3
import threading
import time
import uuid
from datetime import datetime, timedelta
from functools import wraps

import qrcode
from flask import Flask, request, jsonify, render_template, send_file, session
from flask_socketio import SocketIO, emit, join_room
from werkzeug.utils import secure_filename
|
|
# Flask application and its static configuration.
app = Flask(__name__)
app.config.update(
    # Falls back to a fresh random key per process when SECRET_KEY is unset.
    SECRET_KEY=os.environ.get('SECRET_KEY', secrets.token_hex(32)),
    # Upper bound on a single request body: 500 MiB.
    MAX_CONTENT_LENGTH=500 * 1024 * 1024,
    UPLOAD_FOLDER='/tmp/tds_uploads',
    SESSION_COOKIE_HTTPONLY=True,
    PERMANENT_SESSION_LIFETIME=timedelta(days=30),
)

# Uploaded files are written here; create the directory up front.
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

# NOTE(review): cors_allowed_origins="*" accepts Socket.IO connections from
# any origin -- confirm this is intended for the deployment.
socketio = SocketIO(app, cors_allowed_origins="*", async_mode='threading')

# Location of the SQLite database file.
DB_PATH = '/tmp/tds.db'
|
|
| |
class _ClosingConnection(sqlite3.Connection):
    """SQLite connection whose ``with`` block also closes it on exit.

    The stock ``sqlite3.Connection`` context manager only commits or rolls
    back; it leaves the connection open. Every ``with get_db() as db:`` in
    this module would otherwise leave its connection open until garbage
    collection.
    """

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            # Commit on success / roll back on error, as the base class does.
            return super().__exit__(exc_type, exc_value, traceback)
        finally:
            self.close()


def get_db(path=None):
    """Open a new SQLite connection with dict-like row access.

    Args:
        path: Database file to open. Defaults to the module-level ``DB_PATH``.

    Returns:
        A connection whose rows are ``sqlite3.Row`` objects. When used as a
        context manager it commits (or rolls back on error) and is then
        closed when the ``with`` block ends.
    """
    target = DB_PATH if path is None else path
    conn = sqlite3.connect(target, factory=_ClosingConnection)
    conn.row_factory = sqlite3.Row
    return conn
|
|
def init_db():
    """Create the schema if it is missing and seed the administrator account.

    Creates the ``users`` and ``auth_tokens`` tables when absent, then makes
    sure a user with cp ``00000000`` exists. The administrator password is
    taken from the ``ADMIN_PASSWORD`` environment variable. When that variable
    is unset or empty, a random password is generated and written to the log
    once, instead of falling back to a fixed, publicly known default.
    """
    admin_cp = '00000000'
    with get_db() as db:
        db.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id TEXT PRIMARY KEY,
                cp TEXT UNIQUE NOT NULL,
                name TEXT NOT NULL,
                birth TEXT NOT NULL,
                age INTEGER NOT NULL,
                password_hash TEXT NOT NULL,
                usb_key_hash TEXT,
                usb_key_id TEXT,
                is_admin INTEGER DEFAULT 0,
                created_at TEXT NOT NULL
            )
        ''')
        db.execute('''
            CREATE TABLE IF NOT EXISTS auth_tokens (
                token TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                created_at TEXT NOT NULL,
                expires_at TEXT NOT NULL
            )
        ''')

        admin = db.execute(
            "SELECT id FROM users WHERE cp = ?", (admin_cp,)
        ).fetchone()
        if not admin:
            admin_password = os.environ.get('ADMIN_PASSWORD')
            if not admin_password:
                # Never ship a known default credential: generate one and
                # surface it to the operator through the log instead.
                admin_password = secrets.token_urlsafe(16)
                logging.getLogger(__name__).warning(
                    "ADMIN_PASSWORD is not set; generated administrator "
                    "password for cp %s: %s", admin_cp, admin_password)
            db.execute('''
                INSERT INTO users (id, cp, name, birth, age, password_hash, is_admin, created_at)
                VALUES (?, ?, 'Administrator', '2000-01-01', 25, ?, 1, ?)
            ''', (str(uuid.uuid4()), admin_cp, hash_password(admin_password),
                  datetime.utcnow().isoformat()))
        db.commit()
|
|
def hash_password(pw: str) -> str:
    """Return the hex-encoded SHA-256 digest of *pw* (UTF-8 encoded).

    NOTE(review): this is an unsalted fast hash. Moving to a salted KDF needs
    a coordinated change in the login query, which compares digests by
    equality, so the behavior is intentionally left as is here.
    """
    digest = hashlib.sha256()
    digest.update(pw.encode())
    return digest.hexdigest()
|
|
def generate_cp() -> str:
    """Return a random 8-digit cp that no row in ``users`` currently uses.

    Candidates are drawn from 10000000..99999999 until the lookup finds no
    existing user with that cp.
    """
    while True:
        candidate = str(secrets.randbelow(90000000) + 10000000)
        with get_db() as conn:
            taken = conn.execute(
                "SELECT id FROM users WHERE cp = ?", (candidate,)
            ).fetchone()
        if taken is None:
            return candidate
|
|
def create_token(user_id: str) -> str:
    """Issue and persist a new 30-day auth token for *user_id*.

    Args:
        user_id: Primary key of the user the token belongs to.

    Returns:
        The new token as a 64-character hex string.
    """
    token = secrets.token_hex(32)
    # Read the clock once so created_at and expires_at are exactly 30 days apart.
    issued_at = datetime.utcnow()
    expires_at = issued_at + timedelta(days=30)
    with get_db() as db:
        # Name the columns explicitly so the insert does not depend on the
        # table's column order.
        db.execute(
            "INSERT INTO auth_tokens (token, user_id, created_at, expires_at) "
            "VALUES (?, ?, ?, ?)",
            (token, user_id, issued_at.isoformat(), expires_at.isoformat()),
        )
        db.commit()
    return token
|
|
def get_user_by_token(token: str):
    """Look up the user that owns a non-expired auth token.

    Returns the matching ``users`` row as a plain dict, or ``None`` when the
    token is unknown or its ``expires_at`` is not later than the current UTC
    time.
    """
    now_iso = datetime.utcnow().isoformat()
    with get_db() as conn:
        match = conn.execute('''
            SELECT u.* FROM users u
            JOIN auth_tokens t ON t.user_id = u.id
            WHERE t.token = ? AND t.expires_at > ?
        ''', (token, now_iso)).fetchone()
    if not match:
        return None
    return dict(match)
|
|
def require_auth(f):
    """Decorator that rejects requests without a valid auth token.

    The token is read from the ``tds_token`` cookie, falling back to the
    ``X-Auth-Token`` header. On success the user dict is attached to the
    request as ``request.current_user`` before the view runs; otherwise a
    401 JSON response is returned.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        token = request.cookies.get('tds_token') or request.headers.get('X-Auth-Token')
        user = get_user_by_token(token) if token else None
        if not user:
            return jsonify({'error': 'Unauthorized'}), 401
        request.current_user = user
        return f(*args, **kwargs)
    return wrapper
|
|
| |
# In-memory registry of live transfer sessions, keyed by session id.
transfer_sessions = dict()
# Lifetime of a transfer session, in seconds (five minutes).
EXPIRY_SECONDS = 5 * 60
|
|
def _sweep_expired_sessions(now):
    """Drop every session whose ``expires_at`` is before *now* and delete its files."""
    log = logging.getLogger(__name__)
    # Snapshot the items: request handlers mutate the dict concurrently.
    expired = [sid for sid, s in list(transfer_sessions.items())
               if now > s['expires_at']]
    for sid in expired:
        s = transfer_sessions.pop(sid, {})
        for f in s.get('files', []):
            try:
                os.remove(f['path'])
            except FileNotFoundError:
                pass  # already gone; nothing to clean up
            except OSError:
                log.warning("Could not remove expired upload %s",
                            f['path'], exc_info=True)
        socketio.emit('session_expired', {'session_id': sid}, room=sid)


def cleanup_expired():
    """Background loop: every 30 seconds, purge expired transfer sessions.

    Never returns. A failure in one sweep is logged and the loop continues,
    so a single bad session cannot stop expiry for all later ones.
    """
    while True:
        time.sleep(30)
        try:
            _sweep_expired_sessions(datetime.utcnow())
        except Exception:
            # Boundary of a daemon thread: log and keep the sweeper alive.
            logging.getLogger(__name__).exception("Session cleanup sweep failed")
|
|
# Start the expiry sweeper at import time; as a daemon thread it does not
# keep the process alive on shutdown.
threading.Thread(
    daemon=True,
    target=cleanup_expired,
).start()
|
|
def generate_qr(data: str) -> str:
    """Render *data* as a QR code and return the PNG as a base64 string.

    The code uses high error correction and is drawn with white modules on a
    black background.
    """
    code = qrcode.QRCode(
        version=1,
        error_correction=qrcode.constants.ERROR_CORRECT_H,
        box_size=8,
        border=2,
    )
    code.add_data(data)
    code.make(fit=True)

    png = io.BytesIO()
    code.make_image(fill_color="white", back_color="black").save(png, format='PNG')
    encoded = base64.b64encode(png.getvalue())
    return encoded.decode()
|
|
| |
@app.route('/')
def index():
    """Serve the single-page front end."""
    page = render_template('index.html')
    return page
|
|
@app.route('/api/auth/register', methods=['POST'])
def register():
    """Create a user account and log the new user in.

    Expects a JSON object with ``name``, ``birth`` and ``password`` (all
    required, non-empty strings), plus optional ``age`` (integer, default 0)
    and ``usb_key_id``.

    Returns:
        200 with ``success``, ``cp``, ``name`` and ``token`` and the
        ``tds_token`` cookie set; 400 when a required field is missing or
        ``age`` is not an integer; 500 when the account could not be stored.
    """
    # silent=True: a missing or malformed JSON body is treated as empty input
    # and answered with the JSON 400 below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    def text(key):
        # Non-string values (numbers, null, lists) count as missing.
        value = data.get(key, '')
        return value.strip() if isinstance(value, str) else ''

    name = text('name')
    birth = text('birth')
    password = text('password')
    usb_key_id = text('usb_key_id')

    if not all([name, birth, password]):
        return jsonify({'error': 'Missing required fields'}), 400

    try:
        age = int(data.get('age', 0))
    except (TypeError, ValueError, OverflowError):
        return jsonify({'error': 'Invalid age'}), 400

    cp = generate_cp()
    user_id = str(uuid.uuid4())
    pw_hash = hash_password(password)

    # NOTE(review): the raw usb_key_id is stored next to its hash, which
    # defeats the purpose of hashing it -- confirm whether the raw column is
    # still needed.
    usb_key_hash = None
    if usb_key_id:
        usb_key_hash = hashlib.sha256(usb_key_id.encode()).hexdigest()

    try:
        with get_db() as db:
            db.execute('''
                INSERT INTO users (id, cp, name, birth, age, password_hash, usb_key_hash, usb_key_id, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (user_id, cp, name, birth, age, pw_hash, usb_key_hash, usb_key_id,
                  datetime.utcnow().isoformat()))
            db.commit()
    except Exception:
        # Log the details server-side; do not leak database error text to clients.
        logging.getLogger(__name__).exception("User registration failed")
        return jsonify({'error': 'Registration failed'}), 500

    token = create_token(user_id)
    resp = jsonify({'success': True, 'cp': cp, 'name': name, 'token': token})
    # Mark the cookie Secure whenever the request itself arrived over HTTPS.
    resp.set_cookie('tds_token', token, httponly=True, max_age=30*24*3600,
                    samesite='Lax', secure=request.is_secure)
    return resp
|
|
@app.route('/api/auth/login', methods=['POST'])
def login():
    """Authenticate by USB key or by identifier + password and issue a token.

    Expects a JSON object. A ``usb_key_id`` without a ``password`` selects
    USB-key login; otherwise ``identifier`` (cp or name) together with
    ``password`` is required.

    Returns:
        200 with ``success``, ``cp``, ``name``, ``is_admin`` and ``token`` and
        the ``tds_token`` cookie set; 401 when the credentials or key do not
        match; 400 when neither form of credentials was supplied.
    """
    # silent=True: a missing or malformed JSON body is treated as empty input.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    def text(key):
        # Non-string values (numbers, null, lists) count as missing.
        value = data.get(key, '')
        return value.strip() if isinstance(value, str) else ''

    identifier = text('identifier')
    password = text('password')
    usb_key_id = text('usb_key_id')

    if usb_key_id and not password:
        usb_hash = hashlib.sha256(usb_key_id.encode()).hexdigest()
        query = "SELECT * FROM users WHERE usb_key_hash = ?"
        params = (usb_hash,)
        failure = 'USB key not recognized'
    elif identifier and password:
        query = "SELECT * FROM users WHERE (cp = ? OR name = ?) AND password_hash = ?"
        params = (identifier, identifier, hash_password(password))
        failure = 'Invalid credentials'
    else:
        return jsonify({'error': 'Provide credentials or USB key'}), 400

    with get_db() as db:
        row = db.execute(query, params).fetchone()
    if not row:
        return jsonify({'error': failure}), 401

    user = dict(row)
    token = create_token(user['id'])
    resp = jsonify({
        'success': True,
        'cp': user['cp'],
        'name': user['name'],
        'is_admin': bool(user['is_admin']),
        'token': token
    })
    # Mark the cookie Secure whenever the request itself arrived over HTTPS.
    resp.set_cookie('tds_token', token, httponly=True, max_age=30*24*3600,
                    samesite='Lax', secure=request.is_secure)
    return resp
|
|
@app.route('/api/auth/logout', methods=['POST'])
def logout():
    """Revoke the caller's auth token and clear the ``tds_token`` cookie.

    The token is read from the cookie or, failing that, the ``X-Auth-Token``
    header, so clients that authenticate by header can also revoke their
    token. Always answers ``{'success': True}``, even without a token.
    """
    token = request.cookies.get('tds_token') or request.headers.get('X-Auth-Token')
    if token:
        with get_db() as db:
            db.execute("DELETE FROM auth_tokens WHERE token = ?", (token,))
            db.commit()
    resp = jsonify({'success': True})
    resp.delete_cookie('tds_token')
    return resp
|
|
@app.route('/api/auth/me')
def me():
    """Report whether the caller is logged in and, if so, who they are.

    The token is read from the ``tds_token`` cookie or, failing that, the
    ``X-Auth-Token`` header. Returns ``{'logged_in': False}`` when no valid
    token is presented; otherwise ``logged_in``, ``cp``, ``name`` and
    ``is_admin``.
    """
    token = request.cookies.get('tds_token') or request.headers.get('X-Auth-Token')
    user = get_user_by_token(token) if token else None
    if not user:
        return jsonify({'logged_in': False})
    return jsonify({
        'logged_in': True,
        'cp': user['cp'],
        'name': user['name'],
        'is_admin': bool(user['is_admin'])
    })
|
|
@app.route('/api/auth/link-usb', methods=['POST'])
@require_auth
def link_usb():
    """Attach a USB key to the authenticated user's account.

    Expects a JSON object with a non-empty string ``usb_key_id``.

    Returns:
        200 ``{'success': True}`` on success; 400 when no key id is given;
        409 when the key is already linked to a different account. USB login
        selects the user by key hash alone, so a key must map to one account.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    raw = data.get('usb_key_id', '')
    usb_key_id = raw.strip() if isinstance(raw, str) else ''
    if not usb_key_id:
        return jsonify({'error': 'No USB key ID provided'}), 400

    usb_hash = hashlib.sha256(usb_key_id.encode()).hexdigest()
    user_id = request.current_user['id']
    with get_db() as db:
        other = db.execute(
            "SELECT id FROM users WHERE usb_key_hash = ? AND id != ?",
            (usb_hash, user_id)
        ).fetchone()
        if other:
            return jsonify({'error': 'USB key already linked to another account'}), 409
        db.execute("UPDATE users SET usb_key_hash = ?, usb_key_id = ? WHERE id = ?",
                   (usb_hash, usb_key_id, user_id))
        db.commit()
    return jsonify({'success': True})
|
|
| |
@app.route('/api/session/create', methods=['POST'])
@require_auth
def create_session():
    """Open a new transfer session and return its id, URL and QR code.

    The JSON body is optional; ``mode`` defaults to ``'send'``. The session
    lives in memory and expires ``EXPIRY_SECONDS`` after creation.

    Returns:
        JSON with ``session_id``, ``qr_image`` (base64 PNG), ``qr_url`` and
        ``expires_in``.
    """
    # Every field is optional, so a missing or non-JSON body must not be an error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    session_id = str(uuid.uuid4())
    user = request.current_user
    # Read the clock once so expires_at is exactly EXPIRY_SECONDS after created_at.
    created_at = datetime.utcnow()

    transfer_sessions[session_id] = {
        'id': session_id,
        'mode': data.get('mode', 'send'),
        'user': {'name': user['name'], 'cp': user['cp']},
        'files': [],
        'created_at': created_at,
        'expires_at': created_at + timedelta(seconds=EXPIRY_SECONDS),
        'status': 'waiting',
        'connected_peers': []
    }

    # APP_URL overrides the host seen on the request when building the link.
    app_url = os.environ.get('APP_URL', request.host_url.rstrip('/'))
    qr_url = f"{app_url}/session/{session_id}"
    qr_img = generate_qr(qr_url)

    return jsonify({'session_id': session_id, 'qr_image': qr_img,
                    'qr_url': qr_url, 'expires_in': EXPIRY_SECONDS})
|
|
@app.route('/api/session/<session_id>', methods=['GET'])
@require_auth
def get_session(session_id):
    """Return the public state of a transfer session.

    Responds 404 when the session id is unknown; otherwise returns the mode,
    owner, status, file metadata (without server paths) and the remaining
    lifetime in whole seconds, never below zero.
    """
    s = transfer_sessions.get(session_id)
    if not s:
        return jsonify({'error': 'Session not found or expired'}), 404

    seconds_left = int((s['expires_at'] - datetime.utcnow()).total_seconds())
    public_keys = ('id', 'name', 'size', 'type')
    file_list = [{key: entry[key] for key in public_keys} for entry in s['files']]

    payload = {
        'session_id': session_id,
        'mode': s['mode'],
        'user': s['user'],
        'status': s['status'],
        'files': file_list,
        'expires_in': max(0, seconds_left),
    }
    return jsonify(payload)
|
|
@app.route('/api/session/<session_id>/upload', methods=['POST'])
@require_auth
def upload_file(session_id):
    """Store the uploaded ``files`` parts in a session and notify its room.

    Returns:
        200 with the count and metadata of the stored files; 404 when the
        session is unknown; 400 when the request carried no usable file, in
        which case the session is not marked ready.
    """
    s = transfer_sessions.get(session_id)
    if not s:
        return jsonify({'error': 'Session expired'}), 404

    uploaded = []
    for file in request.files.getlist('files'):
        if not file.filename:
            # Empty form part (no file chosen): nothing to store.
            continue
        # secure_filename() can strip a name down to '' (e.g. only unsafe
        # characters); fall back to a generic name in that case.
        filename = secure_filename(file.filename) or 'file'
        file_id = str(uuid.uuid4())
        path = os.path.join(app.config['UPLOAD_FOLDER'], f"{file_id}_{filename}")
        file.save(path)
        size = os.path.getsize(path)
        meta = {'id': file_id, 'name': filename, 'size': size, 'type': file.content_type}
        # The server-side path stays in the session and is never sent to clients.
        s['files'].append({**meta, 'path': path})
        uploaded.append(meta)

    if not uploaded:
        return jsonify({'error': 'No files provided'}), 400

    s['status'] = 'ready'
    socketio.emit('files_ready', {'session_id': session_id, 'files': uploaded, 'user': s['user']}, room=session_id)
    return jsonify({'uploaded': len(uploaded), 'files': uploaded})
|
|
@app.route('/api/session/<session_id>/download/<file_id>')
@require_auth
def download_file(session_id, file_id):
    """Send one file of a session as an attachment.

    Responds 404 when the session is unknown or has no file with *file_id*.
    """
    s = transfer_sessions.get(session_id)
    if not s:
        return jsonify({'error': 'Session expired'}), 404

    wanted = next((entry for entry in s['files'] if entry['id'] == file_id), None)
    if wanted is None:
        return jsonify({'error': 'File not found'}), 404
    return send_file(wanted['path'], as_attachment=True, download_name=wanted['name'])
|
|
@app.route('/session/<session_id>')
def session_page(session_id):
    """Serve the front end with the session id from the URL pre-filled."""
    page = render_template('index.html', prefill_session=session_id)
    return page
|
|
@socketio.on('join_session')
def on_join(data):
    """Add the calling socket to a session's room and announce the join.

    Malformed payloads and unknown session ids are ignored. ``peer_joined``
    is emitted to the whole room on every join.
    """
    session_id = data.get('session_id') if isinstance(data, dict) else None
    if not isinstance(session_id, str):
        return
    # A single .get() avoids a KeyError if the cleanup thread removes the
    # session between a membership test and the lookup.
    s = transfer_sessions.get(session_id)
    if s is None:
        return
    join_room(session_id)
    # A client that re-joins must not be recorded twice.
    if request.sid not in s['connected_peers']:
        s['connected_peers'].append(request.sid)
    emit('peer_joined', {'session_id': session_id, 'status': s['status']}, room=session_id)
|
|
@socketio.on('ping_session')
def on_ping(data):
    """Reply to the caller with a session's remaining lifetime and status.

    Malformed payloads and unknown session ids are ignored. ``expires_in`` is
    in whole seconds and never below zero.
    """
    session_id = data.get('session_id') if isinstance(data, dict) else None
    if not isinstance(session_id, str):
        return
    # A single .get() avoids a KeyError if the cleanup thread removes the
    # session between a membership test and the lookup.
    s = transfer_sessions.get(session_id)
    if s is None:
        return
    remaining = max(0, int((s['expires_at'] - datetime.utcnow()).total_seconds()))
    emit('session_tick', {'expires_in': remaining, 'status': s['status']})
|
|
if __name__ == '__main__':
    # Script entry point: prepare the database, then serve on all interfaces
    # at the port given by PORT (default 7860).
    init_db()
    socketio.run(
        app,
        host='0.0.0.0',
        port=int(os.environ.get('PORT', 7860)),
        debug=False,
        allow_unsafe_werkzeug=True,
    )
|
|