browser / app.py
abc1181's picture
Update app.py
8a8c3ce verified
import os, json, time, base64, uuid, threading, queue, io
from flask import Flask, request, jsonify, Response
from flask_cors import CORS
import jupyter_client
app = Flask(__name__)
CORS(app, resources={r"/*": {"origins": "*"}})
VALID_API_KEY = os.getenv("SANDBOX_API_KEY", "")
def check_auth():
if not VALID_API_KEY:
return True
key = (
request.headers.get("X-API-KEY", "") or
request.headers.get("Authorization", "").replace("Bearer ", "")
)
return key == VALID_API_KEY
sessions = {}
sessions_lock = threading.Lock()
def get_or_create_kernel(session_id="default"):
with sessions_lock:
if session_id not in sessions:
km = jupyter_client.KernelManager(kernel_name='python3')
km.start_kernel()
kc = km.client()
kc.start_channels()
kc.wait_for_ready(timeout=30)
sessions[session_id] = {
"km": km, "kc": kc,
"created_at": time.time(),
"last_used": time.time()
}
else:
sessions[session_id]["last_used"] = time.time()
return sessions[session_id]["kc"]
def kill_kernel(session_id):
with sessions_lock:
if session_id in sessions:
try:
sessions[session_id]["kc"].stop_channels()
sessions[session_id]["km"].shutdown_kernel(now=True)
except:
pass
del sessions[session_id]
def execute_code(kc, code, timeout=60):
msg_id = kc.execute(code)
stdout = []
stderr = []
result = []
artifacts = []
error = None
deadline = time.time() + timeout
while time.time() < deadline:
try:
msg = kc.get_iopub_msg(timeout=1)
msg_type = msg['msg_type']
content = msg['content']
if msg_type == 'stream':
if content['name'] == 'stdout': stdout.append(content['text'])
else: stderr.append(content['text'])
elif msg_type == 'execute_result':
result.append(content['data'].get('text/plain', ''))
if 'text/html' in content['data']:
result.append(content['data']['text/html'])
elif msg_type == 'display_data':
data = content.get('data', {})
if 'image/png' in data:
artifacts.append({
"type": "image/png",
"data": data['image/png'],
"filename": f"artifact_{len(artifacts)+1}.png"
})
if 'image/jpeg' in data:
artifacts.append({
"type": "image/jpeg",
"data": data['image/jpeg'],
"filename": f"artifact_{len(artifacts)+1}.jpg"
})
if 'text/plain' in data:
result.append(data['text/plain'])
elif msg_type == 'error':
error = {
"ename": content.get('ename', 'Error'),
"evalue": content.get('evalue', ''),
"traceback": "\n".join(
line.encode('ascii', errors='ignore').decode()
for line in content.get('traceback', [])
)
}
elif msg_type == 'status' and content.get('execution_state') == 'idle':
break
except queue.Empty:
continue
except Exception:
break
return {
"stdout": "".join(stdout),
"stderr": "".join(stderr),
"result": "\n".join(result),
"artifacts": artifacts,
"error": error
}
# ── Cleanup idle kernels ──────────────────────────────────────────────────────
def cleanup_loop():
while True:
time.sleep(1800)
now = time.time()
to_kill = []
with sessions_lock:
for sid, data in sessions.items():
if now - data["last_used"] > 3600:
to_kill.append(sid)
for sid in to_kill:
kill_kernel(sid)
threading.Thread(target=cleanup_loop, daemon=True).start()
# ── Endpoints ─────────────────────────────────────────────────────────────────
@app.route('/')
def status():
return jsonify({
"status": "online",
"active_sessions": len(sessions),
"message": "Obsidian Jupyter Kernel Sandbox"
})
@app.route('/execute', methods=['POST'])
def execute():
if not check_auth():
return jsonify({"output": "Unauthorized"}), 401
data = request.json or {}
code = data.get("code", "")
session_id = data.get("session_id", "default")
timeout = data.get("timeout", 60)
if not code:
return jsonify({"output": "No code provided"}), 400
try:
kc = get_or_create_kernel(session_id)
result = execute_code(kc, code, timeout=timeout)
parts = []
if result["stdout"]: parts.append(result["stdout"])
if result["result"]: parts.append(result["result"])
if result["error"]:
parts.append(
f"Error: {result['error']['ename']}: {result['error']['evalue']}\n"
f"{result['error']['traceback']}"
)
elif result["stderr"] and not result["error"]:
parts.append(result["stderr"])
return jsonify({
"output": "\n".join(parts) or "Executed successfully (no output)",
"artifacts": result["artifacts"],
"error": result["error"]
})
except Exception as e:
return jsonify({"output": f"Kernel Error: {str(e)}"}), 500
@app.route('/install', methods=['POST'])
def install():
if not check_auth():
return jsonify({"error": "Unauthorized"}), 401
data = request.json or {}
package = data.get("package", "")
session_id = data.get("session_id", "default")
if not package:
return jsonify({"error": "No package specified"}), 400
kc = get_or_create_kernel(session_id)
result = execute_code(
kc,
f"import subprocess; subprocess.run(['pip', 'install', '{package}', '-q'])",
timeout=120
)
return jsonify({"message": f"{package} installed", "output": result["stdout"]})
@app.route('/sessions', methods=['GET'])
def list_sessions():
if not check_auth():
return jsonify({"error": "Unauthorized"}), 401
with sessions_lock:
return jsonify({
"sessions": [
{"id": sid, "created_at": d["created_at"], "last_used": d["last_used"]}
for sid, d in sessions.items()
]
})
@app.route('/sessions/<session_id>', methods=['DELETE'])
def delete_session(session_id):
if not check_auth():
return jsonify({"error": "Unauthorized"}), 401
kill_kernel(session_id)
return jsonify({"message": f"Session {session_id} terminated"})
@app.route('/sessions', methods=['POST'])
def create_session():
if not check_auth():
return jsonify({"error": "Unauthorized"}), 401
session_id = str(uuid.uuid4())
get_or_create_kernel(session_id)
return jsonify({"session_id": session_id})
@app.route('/upload', methods=['POST'])
def upload_file():
if not check_auth():
return jsonify({"error": "Unauthorized"}), 401
data = request.json or {}
filename = data.get("filename", "uploaded_file")
file_b64 = data.get("base64", "")
session_id = data.get("session_id", "default")
file_bytes = base64.b64decode(file_b64)
kc = get_or_create_kernel(session_id)
encoded = base64.b64encode(file_bytes).decode()
code = f"""
import base64
with open('/tmp/{filename}', 'wb') as f:
f.write(base64.b64decode('{encoded}'))
print('File written to /tmp/{filename}')
"""
result = execute_code(kc, code)
return jsonify({"output": result["stdout"], "path": f"/tmp/{filename}"})
# 1. Terminal command execution
@app.route('/terminal', methods=['POST'])
def terminal():
if not check_auth(): return jsonify({"error": "Unauthorized"}), 401
data = request.json or {}
cmd = data.get("command", "")
session_id = data.get("session_id", "default")
if not cmd:
return jsonify({"error": "No command"}), 400
safe_code = f"""
import subprocess, shlex
result = subprocess.run(
shlex.split({repr(cmd)}),
capture_output=True, text=True, timeout=30, cwd='/tmp'
)
print(result.stdout)
if result.stderr: print("STDERR:", result.stderr)
print("Exit code:", result.returncode)
"""
kc = get_or_create_kernel(session_id)
return jsonify(execute_code(kc, safe_code))
# 2. File download — return file from /tmp/ as base64
@app.route('/download', methods=['POST'])
def download_file():
if not check_auth(): return jsonify({"error": "Unauthorized"}), 401
data = request.json or {}
filename = data.get("filename", "")
session_id = data.get("session_id", "default")
code = f"""
import base64, os
path = '/tmp/{filename}'
if os.path.exists(path):
with open(path, 'rb') as f:
b64 = base64.b64encode(f.read()).decode()
print(f"FILE_B64_START:{{b64}}:FILE_B64_END")
print(f"Size: {{os.path.getsize(path)}} bytes")
else:
print("File not found:", path)
"""
kc = get_or_create_kernel(session_id)
result = execute_code(kc, code)
# Extract b64 from output
import re
m = re.search(r'FILE_B64_START:(.+?):FILE_B64_END', result.get("stdout",""))
if m:
return jsonify({"filename": filename, "base64": m.group(1), "found": True})
return jsonify({"found": False, "output": result.get("stdout","")})
# 3. List /tmp files
@app.route('/files', methods=['GET', 'POST'])
def list_files():
if not check_auth(): return jsonify({"error": "Unauthorized"}), 401
data = (request.json or {}) if request.method == 'POST' else {}
session_id = data.get("session_id", "default")
code = """
import os, json
files = []
for f in os.listdir('/tmp'):
path = f'/tmp/{f}'
if os.path.isfile(path):
files.append({"name": f, "size": os.path.getsize(path),
"modified": os.path.getmtime(path)})
print(json.dumps(sorted(files, key=lambda x: x['modified'], reverse=True)))
"""
kc = get_or_create_kernel(session_id)
result = execute_code(kc, code)
try:
return jsonify({"files": __import__('json').loads(result.get("stdout","[]"))})
except Exception:
return jsonify({"files": [], "raw": result.get("stdout","")})
# 4. Audio output endpoint
@app.route('/audio', methods=['POST'])
def generate_audio():
"""Generate audio via TTS or synthesis in the kernel."""
if not check_auth(): return jsonify({"error": "Unauthorized"}), 401
data = request.json or {}
session_id = data.get("session_id", "default")
code = data.get("code", "")
kc = get_or_create_kernel(session_id)
result = execute_code(kc, code, timeout=120)
# Collect audio artifacts specifically
audio_artifacts = [a for a in result.get("artifacts", [])
if a.get("type","").startswith("audio/")]
return jsonify({**result, "audio_artifacts": audio_artifacts})
# 5. Browser actions endpoint (structured)
@app.route('/browser', methods=['POST'])
def browser_action():
"""
Execute structured browser actions without raw code.
action: screenshot | navigate | click | type | extract | pdf
"""
if not check_auth(): return jsonify({"error": "Unauthorized"}), 401
data = request.json or {}
action = data.get("action", "screenshot")
url = data.get("url", "")
session_id = data.get("session_id", "browser")
if action == "screenshot":
code = f"""
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
browser = p.chromium.launch(args=['--no-sandbox'])
page = browser.new_page(viewport={{'width':1280,'height':720}})
page.goto('{url}', wait_until='networkidle', timeout=30000)
screenshot = page.screenshot(full_page={data.get('full_page', False)})
title = page.title()
browser.close()
import IPython.display as d
d.display(d.Image(data=screenshot))
print(f"URL: {url}")
print(f"Title: {{title}}")
"""
elif action == "extract":
selector = data.get("selector", "body")
code = f"""
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
browser = p.chromium.launch(args=['--no-sandbox'])
page = browser.new_page()
page.goto('{url}', wait_until='networkidle', timeout=30000)
text = page.inner_text('{selector}')
browser.close()
print(text[:5000])
"""
elif action == "pdf":
code = f"""
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
browser = p.chromium.launch(args=['--no-sandbox'])
page = browser.new_page()
page.goto('{url}', wait_until='networkidle', timeout=30000)
pdf_bytes = page.pdf(format='A4')
browser.close()
import base64
print("PDF_B64:" + base64.b64encode(pdf_bytes).decode())
print(f"PDF size: {{len(pdf_bytes)}} bytes")
"""
else:
return jsonify({"error": f"Unknown action: {action}"}), 400
kc = get_or_create_kernel(session_id)
return jsonify(execute_code(kc, code, timeout=60))
# 6. Health endpoint with capabilities list
@app.route('/capabilities')
def capabilities():
return jsonify({
"python": True,
"playwright": True,
"packages": [
"pandas","numpy","matplotlib","seaborn","scipy","plotly",
"requests","beautifulsoup4","pillow","openpyxl",
"PyPDF2","python-docx","fpdf2","reportlab","kaleido"
],
"endpoints": [
"/execute","/install","/upload","/download",
"/terminal","/files","/browser","/audio",
"/sessions","/capabilities"
]
})
# 7. Interrupt running kernel (for long loops)
@app.route('/interrupt/<session_id>', methods=['POST'])
def interrupt_kernel(session_id):
if not check_auth(): return jsonify({"error": "Unauthorized"}), 401
with sessions_lock:
if session_id not in sessions:
return jsonify({"error": "Session not found"}), 404
try:
sessions[session_id]["km"].interrupt_kernel()
return jsonify({"message": f"Kernel {session_id} interrupted"})
except Exception as e:
return jsonify({"error": str(e)}), 500
if __name__ == "__main__":
print("[SANDBOX] Pre-warming default kernel...")
get_or_create_kernel("default")
print("[SANDBOX] Ready on port 7860")
app.run(host="0.0.0.0", port=7860)