|
|
|
import argparse |
|
from flask import Flask, render_template |
|
from flask_socketio import SocketIO |
|
import pty |
|
import os |
|
import subprocess |
|
import select |
|
import termios |
|
import struct |
|
import fcntl |
|
import shlex |
|
import logging |
|
import sys |
|
|
|
logging.getLogger("werkzeug").setLevel(logging.ERROR) |
|
|
|
__version__ = "0.5.0.2" |
|
|
|
app = Flask(__name__, template_folder=".", static_folder=".", static_url_path="") |
|
app.config["SECRET_KEY"] = "secret!" |
|
app.config["fd"] = None |
|
app.config["child_pid"] = None |
|
socketio = SocketIO(app) |
|
|
|
|
|
def set_winsize(fd, row, col, xpix=0, ypix=0): |
|
logging.debug("setting window size with termios") |
|
winsize = struct.pack("HHHH", row, col, xpix, ypix) |
|
fcntl.ioctl(fd, termios.TIOCSWINSZ, winsize) |
|
|
|
|
|
def read_and_forward_pty_output(): |
|
max_read_bytes = 1024 * 20 |
|
while True: |
|
socketio.sleep(0.01) |
|
if app.config["fd"]: |
|
timeout_sec = 0 |
|
(data_ready, _, _) = select.select([app.config["fd"]], [], [], timeout_sec) |
|
if data_ready: |
|
output = os.read(app.config["fd"], max_read_bytes).decode( |
|
errors="ignore" |
|
) |
|
socketio.emit("pty-output", {"output": output}, namespace="/pty") |
|
|
|
|
|
@app.route("/") |
|
def index(): |
|
return render_template("index.html") |
|
|
|
|
|
@socketio.on("pty-input", namespace="/pty") |
|
def pty_input(data): |
|
"""write to the child pty. The pty sees this as if you are typing in a real |
|
terminal. |
|
""" |
|
if app.config["fd"]: |
|
logging.debug("received input from browser: %s" % data["input"]) |
|
os.write(app.config["fd"], data["input"].encode()) |
|
|
|
|
|
@socketio.on("resize", namespace="/pty") |
|
def resize(data): |
|
if app.config["fd"]: |
|
logging.debug(f"Resizing window to {data['rows']}x{data['cols']}") |
|
set_winsize(app.config["fd"], data["rows"], data["cols"]) |
|
|
|
|
|
@socketio.on("connect", namespace="/pty") |
|
def connect(): |
|
"""new client connected""" |
|
logging.info("new client connected") |
|
if app.config["child_pid"]: |
|
|
|
return |
|
|
|
|
|
(child_pid, fd) = pty.fork() |
|
if child_pid == 0: |
|
|
|
|
|
|
|
subprocess.run(app.config["cmd"]) |
|
else: |
|
|
|
|
|
app.config["fd"] = fd |
|
app.config["child_pid"] = child_pid |
|
set_winsize(fd, 50, 50) |
|
cmd = " ".join(shlex.quote(c) for c in app.config["cmd"]) |
|
|
|
|
|
socketio.start_background_task(target=read_and_forward_pty_output) |
|
|
|
logging.info("child pid is " + child_pid) |
|
logging.info( |
|
f"starting background task with command `{cmd}` to continously read " |
|
"and forward pty output to client" |
|
) |
|
logging.info("task started") |
|
|
|
|
|
def main(): |
|
parser = argparse.ArgumentParser( |
|
description=( |
|
"A fully functional terminal in your browser. " |
|
"https://github.com/cs01/pyxterm.js" |
|
), |
|
formatter_class=argparse.ArgumentDefaultsHelpFormatter, |
|
) |
|
parser.add_argument( |
|
"-p", "--port", default=5000, help="port to run server on", type=int |
|
) |
|
parser.add_argument( |
|
"--host", |
|
default="127.0.0.1", |
|
help="host to run server on (use 0.0.0.0 to allow access from other hosts)", |
|
) |
|
parser.add_argument("--debug", action="store_true", help="debug the server") |
|
parser.add_argument("--version", action="store_true", help="print version and exit") |
|
parser.add_argument( |
|
"--command", default="bash", help="Command to run in the terminal" |
|
) |
|
parser.add_argument( |
|
"--cmd-args", |
|
default="", |
|
help="arguments to pass to command (i.e. --cmd-args='arg1 arg2 --flag')", |
|
) |
|
args = parser.parse_args() |
|
if args.version: |
|
print(__version__) |
|
exit(0) |
|
app.config["cmd"] = [args.command] + shlex.split(args.cmd_args) |
|
green = "\033[92m" |
|
end = "\033[0m" |
|
log_format = ( |
|
green |
|
+ "pyxtermjs > " |
|
+ end |
|
+ "%(levelname)s (%(funcName)s:%(lineno)s) %(message)s" |
|
) |
|
logging.basicConfig( |
|
format=log_format, |
|
stream=sys.stdout, |
|
level=logging.DEBUG if args.debug else logging.INFO, |
|
) |
|
logging.info(f"serving on http://{args.host}:{args.port}") |
|
socketio.run(app, debug=args.debug, port=args.port, host=args.host) |
|
|
|
|
|
if __name__ == "__main__": |
|
main() |
|
|