|
from flask import Flask, request, jsonify, send_file |
|
from flask_cors import CORS |
|
from werkzeug.utils import secure_filename |
|
import tempfile |
|
import uuid |
|
import os |
|
import io |
|
import base64 |
|
import time |
|
import json |
|
import hashlib |
|
import qrcode |
|
from PIL import Image |
|
import requests |
|
import random |
|
import string |
|
from flask_socketio import SocketIO, emit, join_room, leave_room |
|
import threading |
|
|
|
app = Flask(__name__) |
|
CORS(app) |
|
|
|
|
|
socketio = SocketIO(app, cors_allowed_origins="*", async_mode='threading', logger=False, engineio_logger=False) |
|
|
|
|
|
CHAT_ROOMS = {} |
|
CHAT_MESSAGES = {} |
|
|
|
|
|
SECRETS = {} |
|
SHORT_LINKS = {} |
|
ANALYTICS = {} |
|
|
|
|
|
MAX_FILE_SIZE = 5 * 1024 * 1024 |
|
ALLOWED_EXTENSIONS = { |
|
'image': ['png', 'jpg', 'jpeg', 'gif', 'webp', 'svg'], |
|
'video': ['mp4', 'avi', 'mov', 'wmv', 'flv', 'webm'], |
|
'audio': ['mp3', 'wav', 'ogg', 'aac', 'm4a'], |
|
'document': ['pdf', 'txt', 'doc', 'docx', 'rtf', 'odt'] |
|
} |
|
|
|
def get_file_type(filename): |
|
"""Determine file type based on extension""" |
|
if not filename: |
|
return 'unknown' |
|
|
|
ext = filename.rsplit('.', 1)[1].lower() if '.' in filename else '' |
|
|
|
for file_type, extensions in ALLOWED_EXTENSIONS.items(): |
|
if ext in extensions: |
|
return file_type |
|
|
|
return 'unknown' |
|
|
|
def generate_short_id(): |
|
"""Generate a short, unique ID""" |
|
return ''.join(random.choices(string.ascii_lowercase + string.digits, k=6)) |
|
|
|
def get_client_ip(request): |
|
"""Get client IP address""" |
|
if request.headers.get('X-Forwarded-For'): |
|
return request.headers.get('X-Forwarded-For').split(',')[0].strip() |
|
elif request.headers.get('X-Real-IP'): |
|
return request.headers.get('X-Real-IP') |
|
else: |
|
return request.remote_addr |
|
|
|
def get_location_info(ip): |
|
"""Get location information from IP (mock implementation)""" |
|
|
|
try: |
|
|
|
if ip == '127.0.0.1' or ip.startswith('192.168.'): |
|
return { |
|
'country': 'Local', |
|
'city': 'Local', |
|
'region': 'Local', |
|
'timezone': 'Local' |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return { |
|
'country': 'Unknown', |
|
'city': 'Unknown', |
|
'region': 'Unknown', |
|
'timezone': 'Unknown' |
|
} |
|
except: |
|
return { |
|
'country': 'Unknown', |
|
'city': 'Unknown', |
|
'region': 'Unknown', |
|
'timezone': 'Unknown' |
|
} |
|
|
|
def generate_qr_code(data): |
|
"""Generate QR code for the given data""" |
|
qr = qrcode.QRCode( |
|
version=1, |
|
error_correction=qrcode.constants.ERROR_CORRECT_L, |
|
box_size=10, |
|
border=4, |
|
) |
|
qr.add_data(data) |
|
qr.make(fit=True) |
|
|
|
img = qr.make_image(fill_color="black", back_color="white") |
|
|
|
|
|
buffer = io.BytesIO() |
|
img.save(buffer, format='PNG') |
|
img_str = base64.b64encode(buffer.getvalue()).decode() |
|
|
|
return f"data:image/png;base64,{img_str}" |
|
|
|
def record_access(secret_id, request): |
|
"""Record access analytics""" |
|
ip = get_client_ip(request) |
|
location = get_location_info(ip) |
|
user_agent = request.headers.get('User-Agent', '') |
|
|
|
|
|
device_type = 'desktop' |
|
if any(mobile in user_agent.lower() for mobile in ['mobile', 'android', 'iphone', 'ipad']): |
|
device_type = 'mobile' |
|
|
|
analytics_entry = { |
|
'timestamp': time.time(), |
|
'ip': ip, |
|
'location': location, |
|
'user_agent': user_agent, |
|
'device_type': device_type, |
|
'referer': request.headers.get('Referer', ''), |
|
'accept_language': request.headers.get('Accept-Language', '') |
|
} |
|
|
|
if secret_id not in ANALYTICS: |
|
ANALYTICS[secret_id] = [] |
|
|
|
ANALYTICS[secret_id].append(analytics_entry) |
|
|
|
return analytics_entry |
|
|
|
@app.route("/") |
|
def index(): |
|
"""Health check endpoint""" |
|
return jsonify({ |
|
"status": "running", |
|
"service": "Sharelock Backend", |
|
"version": "2.0.0", |
|
"features": [ |
|
"End-to-end encryption", |
|
"File uploads (5MB max)", |
|
"QR code generation", |
|
"Analytics tracking", |
|
"Short URLs", |
|
"Self-destruct messages", |
|
"Real-time chat rooms" |
|
] |
|
}) |
|
|
|
@app.route("/api/store", methods=["POST"]) |
|
def store(): |
|
"""Store encrypted secret with enhanced features""" |
|
try: |
|
form = request.form |
|
data = form.get("data") |
|
|
|
if not data: |
|
return jsonify({"error": "Data is required"}), 400 |
|
|
|
|
|
ttl = int(form.get("ttl", 300)) |
|
view_once = form.get("view_once", "false").lower() == "true" |
|
delay_seconds = int(form.get("delay_seconds", 0)) |
|
theme = form.get("theme", "default") |
|
password_hint = form.get("password_hint", "") |
|
|
|
|
|
file_data = None |
|
file_type = None |
|
file_name = None |
|
|
|
if 'file' in request.files: |
|
file = request.files['file'] |
|
if file and file.filename: |
|
|
|
file.seek(0, os.SEEK_END) |
|
file_size = file.tell() |
|
file.seek(0) |
|
|
|
if file_size > MAX_FILE_SIZE: |
|
return jsonify({"error": f"File too large. Max size: {MAX_FILE_SIZE/1024/1024:.1f}MB"}), 400 |
|
|
|
|
|
file_name = secure_filename(file.filename) |
|
file_type = get_file_type(file_name) |
|
|
|
if file_type == 'unknown': |
|
return jsonify({"error": "File type not supported"}), 400 |
|
|
|
|
|
file_content = file.read() |
|
file_data = base64.b64encode(file_content).decode('utf-8') |
|
|
|
|
|
secret_id = str(uuid.uuid4()) |
|
short_id = generate_short_id() |
|
|
|
|
|
while short_id in SHORT_LINKS: |
|
short_id = generate_short_id() |
|
|
|
|
|
SECRETS[secret_id] = { |
|
"data": data, |
|
"file_data": file_data, |
|
"file_type": file_type, |
|
"file_name": file_name, |
|
"expire_at": time.time() + ttl, |
|
"view_once": view_once, |
|
"delay_seconds": delay_seconds, |
|
"theme": theme, |
|
"password_hint": password_hint, |
|
"created_at": time.time(), |
|
"creator_ip": get_client_ip(request), |
|
"access_count": 0 |
|
} |
|
|
|
|
|
SHORT_LINKS[short_id] = secret_id |
|
|
|
|
|
base_url = request.host_url.rstrip('/') |
|
secret_url = f"{base_url}/tools/sharelock?id={secret_id}" |
|
qr_code = generate_qr_code(secret_url) |
|
|
|
return jsonify({ |
|
"id": secret_id, |
|
"short_id": short_id, |
|
"short_url": f"{base_url}/s/{short_id}", |
|
"qr_code": qr_code, |
|
"expires_at": SECRETS[secret_id]["expire_at"], |
|
"has_file": file_data is not None |
|
}) |
|
|
|
except Exception as e: |
|
return jsonify({"error": str(e)}), 500 |
|
|
|
@app.route("/api/fetch/<secret_id>") |
|
def fetch(secret_id): |
|
"""Fetch and decrypt secret with analytics - MODIFIED TO HANDLE verify_only""" |
|
try: |
|
|
|
if secret_id in SHORT_LINKS: |
|
secret_id = SHORT_LINKS[secret_id] |
|
|
|
secret = SECRETS.get(secret_id) |
|
if not secret: |
|
return jsonify({"error": "Secret not found"}), 404 |
|
|
|
|
|
if time.time() > secret["expire_at"]: |
|
|
|
if secret_id in SECRETS: |
|
del SECRETS[secret_id] |
|
|
|
for short_id, full_id in list(SHORT_LINKS.items()): |
|
if full_id == secret_id: |
|
del SHORT_LINKS[short_id] |
|
return jsonify({"error": "Secret has expired"}), 410 |
|
|
|
|
|
verify_only = request.args.get('verify_only', 'false').lower() == 'true' |
|
|
|
|
|
if not verify_only: |
|
|
|
analytics_entry = record_access(secret_id, request) |
|
|
|
|
|
secret["access_count"] += 1 |
|
|
|
|
|
response = { |
|
"data": secret["data"], |
|
"theme": secret.get("theme", "default"), |
|
"delay_seconds": secret.get("delay_seconds", 0), |
|
"password_hint": secret.get("password_hint", ""), |
|
"access_count": secret["access_count"] |
|
} |
|
|
|
|
|
if secret.get("file_data"): |
|
response["file_data"] = secret["file_data"] |
|
response["file_type"] = secret.get("file_type", "unknown") |
|
response["file_name"] = secret.get("file_name", "unknown") |
|
|
|
|
|
if secret["view_once"] and not verify_only: |
|
|
|
del SECRETS[secret_id] |
|
|
|
|
|
for short_id, full_id in list(SHORT_LINKS.items()): |
|
if full_id == secret_id: |
|
del SHORT_LINKS[short_id] |
|
break |
|
|
|
return jsonify(response) |
|
|
|
except Exception as e: |
|
return jsonify({"error": str(e)}), 500 |
|
|
|
@app.route("/api/analytics/<secret_id>") |
|
def get_analytics(secret_id): |
|
"""Get analytics for a specific secret - MODIFIED TO HANDLE verify_only""" |
|
try: |
|
|
|
verify_only = request.args.get('verify_only', 'false').lower() == 'true' |
|
|
|
|
|
if secret_id not in SECRETS and secret_id not in ANALYTICS: |
|
return jsonify({"error": "Secret not found"}), 404 |
|
|
|
|
|
if not verify_only: |
|
|
|
record_access(secret_id, request) |
|
|
|
analytics_data = ANALYTICS.get(secret_id, []) |
|
|
|
|
|
formatted_analytics = [] |
|
for entry in analytics_data: |
|
formatted_analytics.append({ |
|
"timestamp": entry["timestamp"], |
|
"datetime": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(entry["timestamp"])), |
|
"ip": entry["ip"], |
|
"location": entry["location"], |
|
"device_type": entry["device_type"], |
|
"user_agent": entry["user_agent"][:100] + "..." if len(entry["user_agent"]) > 100 else entry["user_agent"] |
|
}) |
|
|
|
return jsonify({ |
|
"secret_id": secret_id, |
|
"total_accesses": len(formatted_analytics), |
|
"analytics": formatted_analytics |
|
}) |
|
|
|
except Exception as e: |
|
return jsonify({"error": str(e)}), 500 |
|
|
|
@app.route("/api/secrets") |
|
def list_secrets(): |
|
"""List all active secrets (for dashboard)""" |
|
try: |
|
current_time = time.time() |
|
active_secrets = [] |
|
|
|
for secret_id, secret in SECRETS.items(): |
|
if current_time <= secret["expire_at"]: |
|
|
|
short_id = None |
|
for s_id, full_id in SHORT_LINKS.items(): |
|
if full_id == secret_id: |
|
short_id = s_id |
|
break |
|
|
|
active_secrets.append({ |
|
"id": secret_id, |
|
"short_id": short_id, |
|
"created_at": secret["created_at"], |
|
"expires_at": secret["expire_at"], |
|
"view_once": secret["view_once"], |
|
"has_file": secret.get("file_data") is not None, |
|
"file_type": secret.get("file_type"), |
|
"theme": secret.get("theme", "default"), |
|
"access_count": secret.get("access_count", 0), |
|
"preview": secret["data"][:100] + "..." if len(secret["data"]) > 100 else secret["data"] |
|
}) |
|
|
|
return jsonify({ |
|
"secrets": active_secrets, |
|
"total": len(active_secrets) |
|
}) |
|
|
|
except Exception as e: |
|
return jsonify({"error": str(e)}), 500 |
|
|
|
@app.route("/api/delete/<secret_id>", methods=["DELETE"]) |
|
def delete_secret(secret_id): |
|
"""Manually delete a secret - MODIFIED TO HANDLE verify_only""" |
|
try: |
|
|
|
verify_only = request.args.get('verify_only', 'false').lower() == 'true' |
|
|
|
if secret_id not in SECRETS: |
|
return jsonify({"error": "Secret not found"}), 404 |
|
|
|
|
|
if not verify_only: |
|
|
|
record_access(secret_id, request) |
|
|
|
|
|
del SECRETS[secret_id] |
|
|
|
|
|
for short_id, full_id in list(SHORT_LINKS.items()): |
|
if full_id == secret_id: |
|
del SHORT_LINKS[short_id] |
|
break |
|
|
|
return jsonify({"message": "Secret deleted successfully"}) |
|
|
|
except Exception as e: |
|
return jsonify({"error": str(e)}), 500 |
|
|
|
@app.route("/s/<short_id>") |
|
def redirect_short_link(short_id): |
|
"""Redirect short link to full URL""" |
|
if short_id not in SHORT_LINKS: |
|
return jsonify({"error": "Short link not found"}), 404 |
|
|
|
secret_id = SHORT_LINKS[short_id] |
|
base_url = request.host_url.rstrip('/') |
|
return f""" |
|
<!DOCTYPE html> |
|
<html> |
|
<head> |
|
<title>Sharelock - Redirecting...</title> |
|
<meta http-equiv="refresh" content="0;url={base_url}/tools/sharelock?id={secret_id}"> |
|
</head> |
|
<body> |
|
<p>Redirecting to secure message...</p> |
|
<p>If you are not redirected automatically, <a href="{base_url}/tools/sharelock?id={secret_id}">click here</a>.</p> |
|
</body> |
|
</html> |
|
""" |
|
|
|
@app.route("/api/qr/<secret_id>") |
|
def get_qr_code(secret_id): |
|
"""Generate QR code for a secret""" |
|
try: |
|
if secret_id not in SECRETS: |
|
return jsonify({"error": "Secret not found"}), 404 |
|
|
|
base_url = request.host_url.rstrip('/') |
|
secret_url = f"{base_url}/tools/sharelock?id={secret_id}" |
|
qr_code = generate_qr_code(secret_url) |
|
|
|
return jsonify({"qr_code": qr_code}) |
|
|
|
except Exception as e: |
|
return jsonify({"error": str(e)}), 500 |
|
|
|
@app.route("/api/stats") |
|
def get_stats(): |
|
"""Get overall statistics""" |
|
try: |
|
total_secrets = len(SECRETS) |
|
total_accesses = sum(len(analytics) for analytics in ANALYTICS.values()) |
|
|
|
|
|
file_types = {} |
|
for secret in SECRETS.values(): |
|
file_type = secret.get("file_type", "text") |
|
file_types[file_type] = file_types.get(file_type, 0) + 1 |
|
|
|
|
|
themes = {} |
|
for secret in SECRETS.values(): |
|
theme = secret.get("theme", "default") |
|
themes[theme] = themes.get(theme, 0) + 1 |
|
|
|
return jsonify({ |
|
"total_secrets": total_secrets, |
|
"total_accesses": total_accesses, |
|
"file_types": file_types, |
|
"themes": themes, |
|
"active_short_links": len(SHORT_LINKS) |
|
}) |
|
|
|
except Exception as e: |
|
return jsonify({"error": str(e)}), 500 |
|
|
|
@app.route("/api/cleanup", methods=["POST"]) |
|
def cleanup_expired(): |
|
"""Clean up expired secrets""" |
|
try: |
|
current_time = time.time() |
|
expired_count = 0 |
|
|
|
|
|
expired_secrets = [] |
|
for secret_id, secret in SECRETS.items(): |
|
if current_time > secret["expire_at"]: |
|
expired_secrets.append(secret_id) |
|
|
|
|
|
for secret_id in expired_secrets: |
|
del SECRETS[secret_id] |
|
expired_count += 1 |
|
|
|
|
|
for short_id, full_id in list(SHORT_LINKS.items()): |
|
if full_id == secret_id: |
|
del SHORT_LINKS[short_id] |
|
break |
|
|
|
return jsonify({ |
|
"message": f"Cleaned up {expired_count} expired secrets", |
|
"expired_count": expired_count |
|
}) |
|
|
|
except Exception as e: |
|
return jsonify({"error": str(e)}), 500 |
|
|
|
|
|
|
|
@app.route("/api/chat/create", methods=["POST"]) |
|
def create_chat_room(): |
|
try: |
|
form = request.form |
|
ttl = int(form.get("ttl", 3600)) |
|
max_receivers = int(form.get("max_receivers", 5)) |
|
password = form.get("password", "") |
|
allow_files = form.get("allow_files", "true").lower() == "true" |
|
|
|
room_id = str(uuid.uuid4()) |
|
admin_session = str(uuid.uuid4()) |
|
|
|
CHAT_ROOMS[room_id] = { |
|
"admin_session": admin_session, |
|
"created_at": time.time(), |
|
"expires_at": time.time() + ttl, |
|
"settings": { |
|
"max_receivers": max_receivers, |
|
"password": password, |
|
"allow_files": allow_files, |
|
"burn_on_admin_exit": True |
|
}, |
|
"active_sessions": {}, |
|
"receiver_counter": 0 |
|
} |
|
|
|
CHAT_MESSAGES[room_id] = [] |
|
|
|
|
|
return jsonify({ |
|
"room_id": room_id, |
|
"admin_session": admin_session, |
|
"expires_at": CHAT_ROOMS[room_id]["expires_at"] |
|
}) |
|
|
|
except Exception as e: |
|
return jsonify({"error": str(e)}), 500 |
|
|
|
@app.route("/api/chat/join/<room_id>") |
|
def join_chat_room(room_id): |
|
try: |
|
password = request.args.get("password", "") |
|
admin_session = request.args.get("admin", "") |
|
|
|
if room_id not in CHAT_ROOMS: |
|
return jsonify({"error": "Chat room not found"}), 404 |
|
|
|
room = CHAT_ROOMS[room_id] |
|
|
|
|
|
if time.time() > room["expires_at"]: |
|
return jsonify({"error": "Chat room has expired"}), 410 |
|
|
|
|
|
if room["settings"]["password"] and password != room["settings"]["password"]: |
|
return jsonify({"error": "Wrong password"}), 403 |
|
|
|
|
|
if admin_session == room["admin_session"]: |
|
role = "admin" |
|
session_id = admin_session |
|
else: |
|
|
|
active_receivers = sum(1 for s in room["active_sessions"].values() if s["role"] == "receiver") |
|
if active_receivers >= room["settings"]["max_receivers"]: |
|
return jsonify({"error": "Chat room is full"}), 403 |
|
|
|
role = "receiver" |
|
session_id = str(uuid.uuid4()) |
|
room["receiver_counter"] += 1 |
|
|
|
return jsonify({ |
|
"session_id": session_id, |
|
"role": role, |
|
"receiver_number": room["receiver_counter"] if role == "receiver" else None, |
|
"room_settings": room["settings"], |
|
"expires_at": room["expires_at"] |
|
}) |
|
|
|
except Exception as e: |
|
return jsonify({"error": str(e)}), 500 |
|
|
|
|
|
@socketio.on('join_chat') |
|
def handle_join_chat(data): |
|
room_id = data['room_id'] |
|
session_id = data['session_id'] |
|
role = data['role'] |
|
|
|
if room_id not in CHAT_ROOMS: |
|
return |
|
|
|
join_room(room_id) |
|
|
|
|
|
CHAT_ROOMS[room_id]["active_sessions"][session_id] = { |
|
"role": role, |
|
"receiver_number": data.get('receiver_number'), |
|
"joined_at": time.time(), |
|
"last_seen": time.time() |
|
} |
|
|
|
|
|
emit('user_joined', { |
|
'role': role, |
|
'receiver_number': data.get('receiver_number'), |
|
'active_count': len(CHAT_ROOMS[room_id]["active_sessions"]) |
|
}, room=room_id, include_self=False) |
|
|
|
@socketio.on('send_chat_message') |
|
def handle_chat_message(data): |
|
room_id = data['room_id'] |
|
session_id = data['session_id'] |
|
|
|
if room_id not in CHAT_ROOMS or session_id not in CHAT_ROOMS[room_id]["active_sessions"]: |
|
return |
|
|
|
session = CHAT_ROOMS[room_id]["active_sessions"][session_id] |
|
|
|
message = { |
|
"id": str(uuid.uuid4()), |
|
"sender_role": session["role"], |
|
"sender_number": session.get("receiver_number"), |
|
"content": data['content'], |
|
"timestamp": time.time(), |
|
"type": "text" |
|
} |
|
|
|
CHAT_MESSAGES[room_id].append(message) |
|
|
|
|
|
emit('new_message', message, room=room_id) |
|
|
|
@socketio.on('leave_chat') |
|
def handle_leave_chat(data): |
|
room_id = data['room_id'] |
|
session_id = data['session_id'] |
|
|
|
if room_id in CHAT_ROOMS and session_id in CHAT_ROOMS[room_id]["active_sessions"]: |
|
session = CHAT_ROOMS[room_id]["active_sessions"][session_id] |
|
del CHAT_ROOMS[room_id]["active_sessions"][session_id] |
|
|
|
|
|
if session["role"] == "admin" and CHAT_ROOMS[room_id]["settings"]["burn_on_admin_exit"]: |
|
emit('room_closing', {'reason': 'Admin left the room'}, room=room_id) |
|
del CHAT_ROOMS[room_id] |
|
if room_id in CHAT_MESSAGES: |
|
del CHAT_MESSAGES[room_id] |
|
else: |
|
emit('user_left', { |
|
'role': session["role"], |
|
'receiver_number': session.get("receiver_number"), |
|
'active_count': len(CHAT_ROOMS[room_id]["active_sessions"]) |
|
}, room=room_id) |
|
|
|
leave_room(room_id) |
|
|
|
|
|
@app.errorhandler(404) |
|
def not_found(error): |
|
return jsonify({"error": "Endpoint not found"}), 404 |
|
|
|
@app.errorhandler(500) |
|
def internal_error(error): |
|
return jsonify({"error": "Internal server error"}), 500 |
|
|
|
|
|
if __name__ == "__main__": |
|
print("π Sharelock Backend Starting...") |
|
print("π Features enabled:") |
|
print(" β
End-to-end encryption") |
|
print(" β
File uploads (5MB max)") |
|
print(" β
QR code generation") |
|
print(" β
Analytics tracking") |
|
print(" β
Short URLs") |
|
print(" β
Self-destruct messages") |
|
print(" β
Multiple themes") |
|
print(" β
Password hints") |
|
print(" β
verify_only parameter support") |
|
print(" β
Real-time chat rooms") |
|
print("π Server running on http://0.0.0.0:7860") |
|
|
|
|
|
socketio.run(app, host="0.0.0.0", port=7860, debug=False, allow_unsafe_werkzeug=True) |