# hermes-bot / entry.py
# Author: Z User
# Commit 20f5934 — fix: eliminate gateway dual-start race condition
#!/usr/bin/env python3
"""Hermes Bot — HuggingFace Space Entry Point.
Serves as the main HTTP entry point on port 7860.
Routes traffic to:
- hermes-web-ui BFF (Node.js on :6060) for the WebUI interface
- Original dashboard on :7860/ and /deploy
- Gateway health on /api/gateway-status
"""
import json
import os
import re
import subprocess
import sys
import threading
import time
import logging
import psutil
import mimetypes
import urllib.request
import urllib.error
import urllib.parse
from datetime import datetime, timezone
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
from pathlib import Path
from urllib.parse import urlparse, parse_qs
from queue import Queue, Empty
from io import BytesIO
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
    """Threaded HTTP server with port reuse and a bounded handler pool."""

    daemon_threads = True
    allow_reuse_address = True
    allow_reuse_port = True  # Allow multiple binds during restart
    # Limit concurrent in-flight requests to prevent DoS / RAM exhaustion.
    _semaphore = threading.Semaphore(50)

    def server_bind(self):
        """Set SO_REUSEADDR / SO_REUSEPORT before binding."""
        import socket
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Use the symbolic constant instead of the hard-coded Linux option
        # number 15 — on other platforms option 15 is a *different* socket
        # option, and setting it silently would be a bug.
        reuse_port = getattr(socket, "SO_REUSEPORT", None)
        if reuse_port is not None:
            try:
                self.socket.setsockopt(socket.SOL_SOCKET, reuse_port, 1)
            except OSError:
                pass
        super().server_bind()

    def process_request(self, request, client_address):
        # Acquire before spawning the worker thread; released in
        # process_request_thread once the request has been fully handled.
        # (A plain `with` here would release as soon as the thread was
        # *spawned* — ThreadingMixIn.process_request returns immediately —
        # making the concurrency cap ineffective.)
        self._semaphore.acquire()
        try:
            return super().process_request(request, client_address)
        except Exception:
            self._semaphore.release()
            raise

    def process_request_thread(self, request, client_address):
        # Runs in the per-request worker thread created by ThreadingMixIn.
        try:
            super().process_request_thread(request, client_address)
        finally:
            self._semaphore.release()
# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
HERMES_HOME = os.path.expanduser("~/.hermes")           # runtime home (config, sessions, logs)
DATA_DIR = "/data/hermes"                               # HF Space persistent volume
LOG_DIR = os.path.join(HERMES_HOME, "logs")
LOG_FILE = os.path.join(LOG_DIR, "gateway.log")         # tailed for the SSE log stream
CONFIG_FILE = os.path.join(HERMES_HOME, "config.yaml")
ENV_FILE = os.path.join(HERMES_HOME, ".env")            # KEY=VALUE secrets file
DASHBOARD_HTML = "/app/dashboard.html"
WEBUI_CLIENT_DIR = "/app/webui-client"                  # built SPA static assets
# Backend URLs
WEBUI_BFF_URL = "http://127.0.0.1:6060"   # hermes-web-ui BFF (Node.js)
GATEWAY_URL = "http://127.0.0.1:8642"     # Hermes gateway API
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("entry")
# ---------------------------------------------------------------------------
# Global state
# ---------------------------------------------------------------------------
_gateway_start_time = time.time()      # start-of-process timestamp, feeds uptime_ms
_log_subscribers: list[Queue] = []     # one Queue per connected SSE log client
_log_tail_offset = 0                   # byte offset into LOG_FILE for the tailer
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _mask_key(key: str) -> str:
if not key or len(key) < 10:
return "····"
return key[:7] + "····" + key[-4:]
def _load_env() -> dict[str, str]:
    """Parse KEY=VALUE pairs from the .env file (missing file → empty dict)."""
    result: dict[str, str] = {}
    try:
        with open(ENV_FILE) as fh:
            for raw in fh:
                entry = raw.strip()
                # Skip blanks, comments, and lines without an '='.
                if not entry or entry.startswith("#") or "=" not in entry:
                    continue
                key, _, value = entry.partition("=")
                result[key.strip()] = value.strip().strip("\"'")
    except FileNotFoundError:
        pass
    return result
def _load_config() -> dict:
    """Load config.yaml via PyYAML when available, else a naive line parser.

    The fallback parser only extracts top-level ``key: value`` scalar pairs.
    A missing config file yields an empty dict in either path.
    """
    try:
        import yaml
    except ImportError:
        yaml = None
    if yaml is not None:
        try:
            with open(CONFIG_FILE) as fh:
                return yaml.safe_load(fh) or {}
        except FileNotFoundError:
            return {}
    # Naive fallback: top-level "key: value" lines only.
    parsed: dict = {}
    try:
        with open(CONFIG_FILE) as fh:
            for raw in fh:
                entry = raw.strip()
                if not entry or entry.startswith("#"):
                    continue
                # Top-level keys only: indented lines are nested values.
                if ":" in entry and not raw.startswith(" "):
                    key, _, value = entry.partition(":")
                    key, value = key.strip(), value.strip()
                    if value:
                        parsed[key] = value
    except FileNotFoundError:
        pass
    return parsed
def _get_sessions_count() -> int:
    """Count *.json session files under ~/.hermes/sessions (0 if absent)."""
    sessions_dir = Path(HERMES_HOME) / "sessions"
    if not sessions_dir.is_dir():
        return 0
    return sum(1 for _ in sessions_dir.rglob("*.json"))
def _get_session_list() -> list[dict]:
    """Summarize the 20 most recently modified session files.

    Each entry carries id/name/messages/last/platform/active; files that
    fail to parse still produce a placeholder entry so the UI list is stable.
    """
    sessions_dir = os.path.join(HERMES_HOME, "sessions")
    if not os.path.isdir(sessions_dir):
        return []
    newest_first = sorted(Path(sessions_dir).rglob("*.json"),
                          key=os.path.getmtime, reverse=True)
    summaries: list[dict] = []
    for path in newest_first[:20]:
        try:
            payload = json.loads(path.read_text())
            message_count = len(payload.get("trajectory", payload.get("messages", [])))
            modified = datetime.fromtimestamp(path.stat().st_mtime).strftime("%m-%d %H:%M")
            summaries.append({
                "id": path.stem,
                "name": path.stem[:40],
                "messages": message_count,
                "last": modified,
                "platform": payload.get("platform", payload.get("channel", "")),
                # "active" = touched within the last hour.
                "active": (time.time() - path.stat().st_mtime) < 3600,
            })
        except Exception:
            summaries.append({"id": path.stem, "name": path.stem[:40], "messages": 0,
                              "last": "--", "platform": "", "active": False})
    return summaries
def _proxy_request(url: str, method: str = "GET", headers: dict | None = None,
                   body: bytes | None = None, timeout: int = 120) -> tuple[int, dict, bytes]:
    """Proxy a request to a backend URL and return (status, response_headers, body).

    On connection failure the status stays 502 and the body is a JSON error
    payload. Hop-by-hop headers (plus Server/Date) are stripped from the
    returned header dict so they are not relayed to the client.
    """
    # Hop-by-hop / server-identity headers we never relay. Defined once here
    # instead of being rebuilt inside the HTTPError header loop.
    skip = {"transfer-encoding", "connection", "keep-alive", "server", "date"}
    req = urllib.request.Request(url, data=body, method=method)
    if headers:
        for k, v in headers.items():
            try:
                req.add_header(k, v)
            except Exception:
                pass  # skip malformed header values rather than failing the whole proxy
    resp_body = b""
    resp_headers: dict = {}
    resp_status = 502  # default when the backend is unreachable
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            resp_status = resp.status
            resp_body = resp.read()
            for key, val in resp.getheaders():
                if key.lower() not in skip:
                    resp_headers[key] = val
    except urllib.error.HTTPError as e:
        # Backend answered with an HTTP error — forward status/headers/body as-is.
        resp_status = e.code
        try:
            resp_body = e.read()
        except Exception:
            resp_body = b""
        for key, val in list(e.headers.items()):
            if key.lower() not in skip:
                resp_headers[key] = val
    except Exception as e:
        logger.warning("Proxy error: %s -> %s", url, e)
        resp_body = json.dumps({"error": str(e)}).encode()
    return resp_status, resp_headers, resp_body
# ---------------------------------------------------------------------------
# Log tailer
# ---------------------------------------------------------------------------
def _parse_log_line(line: str) -> dict | None:
m = re.match(r"^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s*\[?(INFO|WARN|WARNING|ERROR|DEBUG)\]?\s*(.*)", line)
if not m:
m = re.match(r"\[?(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\]?\s*\[?(INFO|WARN|WARNING|ERROR|DEBUG)\]?.*?:\s*(.*)", line)
if not m:
return None
return {
"time": m.group(1),
"level": m.group(2).upper(),
"msg": m.group(3).strip(),
"hl": "feishu" in m.group(3).lower() or "connected" in m.group(3).lower(),
}
def _log_tailer():
    """Daemon loop: follow LOG_FILE and fan parsed lines out to SSE subscribers.

    Keeps a byte offset (_log_tail_offset) into the log file, reads anything
    appended since the last poll, parses each line, and pushes the batch to
    every Queue in _log_subscribers. Queues that reject the put (e.g. full
    after a client stalled/disconnected) are dropped from the subscriber list.
    """
    global _log_tail_offset
    while True:
        try:
            if not os.path.isfile(LOG_FILE):
                time.sleep(2)  # log file not created yet — poll slowly
                continue
            with open(LOG_FILE, "r", errors="replace") as f:
                f.seek(_log_tail_offset)
                new_lines = f.readlines()
                _log_tail_offset = f.tell()  # remember position for the next poll
            if new_lines:
                parsed = []
                for line in new_lines:
                    p = _parse_log_line(line.strip())
                    if p:
                        parsed.append(p)
                if parsed:
                    # Fan out to subscribers; collect dead queues first so we
                    # don't mutate the list while iterating it.
                    dead = []
                    for q in _log_subscribers:
                        try:
                            q.put_nowait(parsed)
                        except Exception:
                            dead.append(q)
                    for q in dead:
                        _log_subscribers.remove(q)
            time.sleep(0.5)
        except Exception as e:
            # Never let the tailer thread die; back off briefly and retry.
            logger.debug("Log tailer error: %s", e)
            time.sleep(1)
# ---------------------------------------------------------------------------
# Persistent storage setup
# ---------------------------------------------------------------------------
def _ensure_persistent_storage():
    """Create the /data/hermes subdirectories and symlink them under ~/.hermes.

    If a symlink cannot be created (e.g. no persistent volume), fall back to
    a plain local directory so the app still runs.
    """
    subdirs = ("sessions", "memories", "uploads", "logs", "palace", "skills", "weixin")
    for name in subdirs:
        os.makedirs(os.path.join(DATA_DIR, name), exist_ok=True)
    hermes = Path(HERMES_HOME)
    hermes.mkdir(parents=True, exist_ok=True)
    for name in subdirs:
        link = hermes / name
        if link.exists():
            continue
        target = os.path.join(DATA_DIR, name)
        try:
            link.symlink_to(target)
            logger.info("Symlink: %s -> %s", name, target)
        except OSError:
            link.mkdir(exist_ok=True)
            logger.warning("Could not symlink %s, using local dir", name)
# ---------------------------------------------------------------------------
# HTTP Handler — Reverse Proxy + Dashboard
# ---------------------------------------------------------------------------
class ProxyHandler(BaseHTTPRequestHandler):
    """Routes traffic between WebUI BFF, Gateway, and Dashboard.

    Routing summary:
      /                     → 302 redirect to the WebUI chat page
      /deploy               → local deploy.html
      /webui, /webui/*      → SPA static files (SPA index fallback, then BFF)
      /assets/*, /favicon*  → SPA static assets (absolute paths used by the SPA)
      /api/*, /v1/*, /upload, /webhook, /socket.io/* → proxy to WebUI BFF (:6060)
      /api/diagnostics/*, /api/logs/stream           → handled locally (auth required)
    """

    def log_message(self, fmt, *args):
        pass  # silence per-request access logs

    def _check_auth(self) -> bool:
        """Check Bearer token auth for sensitive endpoints.

        NOTE(review): falls back to a hard-coded default token and uses a
        plain (non-constant-time) string compare — set AUTH_TOKEN in the
        Space secrets for anything beyond a demo.
        """
        auth = self.headers.get("Authorization", "")
        expected = os.environ.get("AUTH_TOKEN", "hermes-bot-2026")
        return auth == f"Bearer {expected}"

    @staticmethod
    def _query_suffix(parsed) -> str:
        """Return '?query' when the parsed URL had a query string, else ''.

        urlparse().query excludes the '?', so it must be re-attached before
        the query is appended to a proxied path.
        """
        return f"?{parsed.query}" if parsed.query else ""

    def _send_json(self, data: dict, status=200):
        """Serialize *data* as UTF-8 JSON and send it with CORS enabled."""
        body = json.dumps(data, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(body)

    def _send_html(self, path: str):
        """Send a local HTML file, or 404 if it does not exist."""
        try:
            with open(path, "rb") as f:
                body = f.read()
            self.send_response(200)
            self.send_header("Content-Type", "text/html; charset=utf-8")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        except FileNotFoundError:
            self.send_error(404)

    def _read_body(self) -> bytes:
        """Read the request body per Content-Length (empty bytes if absent)."""
        length = int(self.headers.get("Content-Length", 0))
        return self.rfile.read(length) if length > 0 else b""

    def _send_file(self, filepath: str, content_type: str = None):
        """Send a local static file with a guessed MIME type and 1-day cache."""
        try:
            with open(filepath, "rb") as f:
                body = f.read()
            if content_type is None:
                content_type = mimetypes.guess_type(filepath)[0] or "application/octet-stream"
            self.send_response(200)
            self.send_header("Content-Type", content_type)
            self.send_header("Content-Length", str(len(body)))
            self.send_header("Cache-Control", "public, max-age=86400")
            self.end_headers()
            self.wfile.write(body)
        except FileNotFoundError:
            self.send_error(404)

    def _send_proxy_response(self, status: int, headers: dict, body: bytes):
        """Relay a backend response to the client, adding CORS headers."""
        self.send_response(status)
        # Always add CORS
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, PATCH, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Authorization, Content-Type, X-Hermes-Session-Id, X-Hermes-Profile, X-Hermes-Session-Token")
        # Pop both header-name casings so they are not re-sent by the loop below.
        content_type = headers.pop("Content-Type", headers.pop("content-type", "application/json"))
        self.send_header("Content-Type", content_type)
        content_length = headers.pop("Content-Length", headers.pop("content-length", None))
        if content_length is None:
            content_length = str(len(body))
        self.send_header("Content-Length", content_length)
        for k, v in headers.items():
            self.send_header(k, v)
        self.end_headers()
        self.wfile.write(body)

    def _forward_to_webui(self, path: str, method: str):
        """Forward request to WebUI BFF on :6060."""
        url = f"{WEBUI_BFF_URL}{path}"
        headers = {}
        # Forward only the headers the BFF cares about.
        for h in ("Authorization", "Content-Type", "X-Hermes-Session-Id",
                  "X-Hermes-Profile", "X-Hermes-Session-Token"):
            val = self.headers.get(h)
            if val:
                headers[h] = val
        body = self._read_body() if method in ("POST", "PUT", "PATCH", "DELETE") else None
        status, resp_headers, resp_body = _proxy_request(url, method, headers, body)
        self._send_proxy_response(status, resp_headers, resp_body)

    def _forward_to_gateway(self, path: str, method: str):
        """Forward request to Gateway API on :8642."""
        url = f"{GATEWAY_URL}{path}"
        headers = {}
        for h in ("Authorization", "Content-Type", "X-Hermes-Session-Id"):
            val = self.headers.get(h)
            if val:
                headers[h] = val
        body = self._read_body() if method in ("POST", "PUT", "PATCH", "DELETE") else None
        status, resp_headers, resp_body = _proxy_request(url, method, headers, body)
        self._send_proxy_response(status, resp_headers, resp_body)

    # ── OPTIONS (CORS preflight) ──
    def do_OPTIONS(self):
        """Answer CORS preflight for every route."""
        self.send_response(200)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, PATCH, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Authorization, Content-Type, X-Hermes-Session-Id, X-Hermes-Profile, X-Hermes-Session-Token")
        self.send_header("Access-Control-Max-Age", "86400")
        self.send_header("Content-Length", "0")
        self.end_headers()

    # ── GET routes ──
    def do_GET(self):
        parsed = urlparse(self.path)
        path = parsed.path
        # ── Root → redirect to WebUI Chat ──
        if path in ("/", "/index.html"):
            self.send_response(302)
            self.send_header("Location", "/webui#/hermes/chat")
            self.send_header("Content-Length", "0")
            self.end_headers()
            return
        # ── Deploy overview ──
        if path == "/deploy":
            return self._send_html("/app/deploy.html")
        # ── Diagnostic: gateway log dump (auth required) ──
        if path == "/api/diagnostics/gateway-log":
            if not self._check_auth():
                return self._send_json({"error": "unauthorized"}, 401)
            return self._handle_gateway_log()
        # ── Diagnostic: gateway restart (auth required) ──
        # NOTE(review): a GET with side effects (restart) — kept for
        # curl-friendliness, but a POST would be the proper verb.
        if path == "/api/diagnostics/restart-gateway":
            if not self._check_auth():
                return self._send_json({"error": "unauthorized"}, 401)
            return self._handle_restart_gateway()
        # ── SSE log stream (auth required) ──
        if path == "/api/logs/stream":
            if not self._check_auth():
                return self._send_json({"error": "unauthorized"}, 401)
            return self._handle_sse()
        # ── WebUI SPA: /webui, /webui/* → serve from webui-client static dir ──
        if path.startswith("/webui"):
            # Map /webui → /, /webui/xxx → /xxx for the static dir
            if path in ("/webui", "/webui/"):
                index_path = os.path.join(WEBUI_CLIENT_DIR, "index.html")
                if os.path.isfile(index_path):
                    return self._send_html(index_path)
            # Try static file (strip the full "/webui" prefix — 6 chars, not 5)
            static_path = os.path.join(WEBUI_CLIENT_DIR, path[len("/webui"):].lstrip("/"))
            if os.path.isfile(static_path):
                return self._send_file(static_path)
            # SPA fallback
            index_path = os.path.join(WEBUI_CLIENT_DIR, "index.html")
            if os.path.isfile(index_path):
                return self._send_html(index_path)
            # Final fallback: proxy to BFF. The original sliced path[5:],
            # which left a stray "i" ("/webui/chat" → "i/chat").
            return self._forward_to_webui(self._rewrite_webui_path(path), "GET")
        # ── WebUI static assets (referenced by SPA as /assets/*, /favicon*) ──
        # These must be served from webui-client dir because the SPA uses absolute paths
        if path.startswith("/assets/") or path.startswith("/favicon") or path == "/logo.png":
            static_path = os.path.join(WEBUI_CLIENT_DIR, path.lstrip("/"))
            if os.path.isfile(static_path):
                return self._send_file(static_path)
            # If not found locally, try proxying to BFF
            return self._forward_to_webui(self.path, "GET")
        # ── WebUI API routes: /api/*, /v1/* → proxy to WebUI BFF ──
        if path.startswith("/api/") or path.startswith("/v1/"):
            return self._forward_to_webui(self.path, "GET")
        # ── /health → proxy to BFF ──
        if path == "/health":
            return self._forward_to_webui("/health", "GET")
        # ── /upload, /webhook → proxy to BFF ──
        if path in ("/upload", "/webhook"):
            return self._forward_to_webui(self.path, "GET")
        # ── /socket.io/* → proxy to BFF (WebSocket upgrade handled separately) ──
        if path.startswith("/socket.io/"):
            return self._forward_to_webui(self.path, "GET")
        self.send_error(404)

    # ── POST routes ──
    def _rewrite_webui_path(self, path: str) -> str:
        """Rewrite /webui/* to /* for BFF proxy.

        Fix: the original sliced path[5:] although "/webui" is six characters,
        so "/webui/chat" became "i/chat" and the bare "/webui" case returned
        "i" (its dedicated branch was unreachable behind `len(path) > 5`).
        """
        if path.startswith("/webui"):
            return path[len("/webui"):] or "/"
        return path

    def do_POST(self):
        parsed = urlparse(self.path)
        path = parsed.path
        bff_path = self._rewrite_webui_path(path)
        # All /api/*, /v1/*, /upload, /webhook → proxy to WebUI BFF
        if path.startswith("/api/") or path.startswith("/v1/") or path in ("/upload", "/webhook") or path.startswith("/webui"):
            # Re-attach the query string with its '?' separator (the original
            # concatenated parsed.query bare, corrupting the proxied path).
            return self._forward_to_webui(bff_path + self._query_suffix(parsed), "POST")
        self.send_error(404)

    def do_PUT(self):
        parsed = urlparse(self.path)
        path = parsed.path
        bff_path = self._rewrite_webui_path(path)
        if path.startswith("/api/") or path.startswith("/v1/") or path.startswith("/webui"):
            return self._forward_to_webui(bff_path + self._query_suffix(parsed), "PUT")
        self.send_error(404)

    def do_DELETE(self):
        parsed = urlparse(self.path)
        path = parsed.path
        bff_path = self._rewrite_webui_path(path)
        if path.startswith("/api/") or path.startswith("/v1/") or path.startswith("/webui"):
            return self._forward_to_webui(bff_path + self._query_suffix(parsed), "DELETE")
        self.send_error(404)

    def do_PATCH(self):
        parsed = urlparse(self.path)
        path = parsed.path
        bff_path = self._rewrite_webui_path(path)
        if path.startswith("/api/") or path.startswith("/v1/") or path.startswith("/webui"):
            return self._forward_to_webui(bff_path + self._query_suffix(parsed), "PATCH")
        self.send_error(404)

    # ── Handlers ──
    def _get_status(self) -> dict:
        """Build the dashboard status payload (health, config summary, RAM)."""
        env = _load_env()
        cfg = _load_config()
        ram = psutil.virtual_memory()
        is_running = False
        pid = "N/A"  # PID lookup not done here; the health probe decides "running"
        # Check gateway health via HTTP
        try:
            status, _, body = _proxy_request(f"{GATEWAY_URL}/health", "GET", timeout=3)
            is_running = (status == 200)
        except Exception:
            pass
        model = cfg.get("model", env.get("LLM_MODEL", "unknown"))
        provider = cfg.get("provider", "openrouter")
        fallback = cfg.get("fallback_model", {})
        fallback_model = fallback.get("model", "") if isinstance(fallback, dict) else ""
        return {
            "running": is_running,
            "pid": pid,
            "model": model,
            "provider": provider,
            "fallback_model": fallback_model,
            "platform": "飞书 Feishu",
            "platform_mode": "WebSocket",
            "sessions": _get_sessions_count(),
            "messages": 0,
            "ram": f"{ram.percent:.0f}%",
            "uptime_ms": int((time.time() - _gateway_start_time) * 1000),
            # Secrets are masked before leaving the process.
            "config": {
                "OPENROUTER_API_KEY": _mask_key(env.get("OPENROUTER_API_KEY", "")),
                "FEISHU_APP_ID": env.get("FEISHU_APP_ID", ""),
                "FEISHU_APP_SECRET": _mask_key(env.get("FEISHU_APP_SECRET", "")),
                "terminal": cfg.get("terminal", {}).get("backend", "local") if isinstance(cfg.get("terminal"), dict) else "local",
                "timezone": cfg.get("timezone", "北京时间 (UTC+8)"),
                "max_turns": cfg.get("max_turns", "90"),
                "memory": cfg.get("memory", {}).get("provider", "none") if isinstance(cfg.get("memory"), dict) else "none",
                "mcp_servers": list(cfg.get("mcp_servers", {}).keys()) if isinstance(cfg.get("mcp_servers"), dict) else [],
                "compress": cfg.get("compress", {}).get("enabled", False) if isinstance(cfg.get("compress"), dict) else False,
            },
        }

    def _handle_sse(self):
        """Stream parsed log lines to the client as Server-Sent Events.

        Sends recent history first, then blocks on a per-client Queue fed by
        _log_tailer; an SSE comment line (":heartbeat") keeps idle
        connections alive. The queue is unregistered when the client drops.
        """
        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.send_header("Cache-Control", "no-cache")
        self.send_header("Connection", "keep-alive")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        q: Queue = Queue(maxsize=100)
        _log_subscribers.append(q)
        try:
            history = self._get_log_history_inner(limit=100)
            if history:
                self.wfile.write(f"data: {json.dumps(history, ensure_ascii=False)}\n\n".encode())
                self.wfile.flush()
            while True:
                try:
                    lines = q.get(timeout=30)
                    payload = f"data: {json.dumps(lines, ensure_ascii=False)}\n\n"
                    self.wfile.write(payload.encode())
                    self.wfile.flush()
                except Empty:
                    self.wfile.write(":heartbeat\n\n".encode())
                    self.wfile.flush()
        except (BrokenPipeError, ConnectionResetError, OSError):
            pass  # client disconnected
        finally:
            if q in _log_subscribers:
                _log_subscribers.remove(q)

    def _handle_gateway_log(self):
        """Return raw gateway.log contents for diagnostics."""
        log_path = LOG_FILE
        if not os.path.isfile(log_path):
            # Also check the data dir directly
            alt = "/data/hermes/logs/gateway.log"
            if os.path.isfile(alt):
                log_path = alt
        if not os.path.isfile(log_path):
            self._send_json({"error": "gateway.log not found", "checked": [LOG_FILE, "/data/hermes/logs/gateway.log"]})
            return
        try:
            with open(log_path, "r", errors="replace") as f:
                content = f.read()
            body = content.encode("utf-8") if content else b"(empty)"
            self.send_response(200)
            self.send_header("Content-Type", "text/plain; charset=utf-8")
            self.send_header("Content-Length", str(len(body)))
            self.send_header("Access-Control-Allow-Origin", "*")
            self.end_headers()
            self.wfile.write(body)
        except Exception as e:
            self._send_json({"error": str(e)})

    def _handle_restart_gateway(self):
        """Trigger gateway restart via watchdog signal file."""
        try:
            signal_file = "/tmp/hermes-gateway-restart"
            with open(signal_file, "w") as f:
                f.write(str(time.time()))
            self._send_json({"ok": True, "message": "Restart signal sent"})
        except Exception as e:
            self._send_json({"ok": False, "error": str(e)})

    def _get_log_history(self, query: str = "") -> list:
        """Parse ?limit=N from a query string and return recent parsed log lines."""
        params = parse_qs(query)
        limit = int(params.get("limit", ["100"])[0])
        return self._get_log_history_inner(limit=limit)

    def _get_log_history_inner(self, limit: int = 100) -> list:
        """Return up to *limit* parsed entries from the tail of LOG_FILE."""
        if not os.path.isfile(LOG_FILE):
            return []
        try:
            with open(LOG_FILE, "r", errors="replace") as f:
                lines = f.readlines()[-limit:]
            result = []
            for line in lines:
                p = _parse_log_line(line.strip())
                if p:
                    result.append(p)
            return result
        except Exception:
            return []
# ---------------------------------------------------------------------------
# Python-based Gateway Watchdog (robust, lives inside entry.py)
# ---------------------------------------------------------------------------
def _gateway_watchdog(interval: int = 30):
    """Monitor gateway process liveness and auto-restart if it dies.

    This runs as a daemon thread inside entry.py so it survives
    SIGTERM to the shell-level watchdog in start.sh.

    Cleanups vs. original: removed the unused ``import signal``, the unused
    ``now_ts`` local, and a duplicated "DEAD — restarting" log line; hoisted
    the ``glob`` import out of the per-pattern loop; the parent now closes
    its copy of the gateway log file handle after Popen.
    """
    import glob as _glob  # hoisted: used by _kill_all_gateways on every restart

    pid_file = os.path.join(HERMES_HOME, "gateway.pid")
    alt_pid_file = "/tmp/hermes-gateway.pid"
    lock_file = os.path.join(HERMES_HOME, ".gateway_runtime_lock")
    takeover_file = os.path.join(HERMES_HOME, ".gateway_takeover")
    gw_log = "/data/hermes/logs/gateway.log"

    def _find_gateway_pid() -> int | None:
        """Find the gateway PID from pid files or by process name."""
        for pf in (pid_file, alt_pid_file):
            try:
                with open(pf) as f:
                    pid = int(f.read().strip())
                if pid > 0 and psutil.pid_exists(pid):
                    cmdline = " ".join(psutil.Process(pid).cmdline()).lower()
                    if "hermes" in cmdline or "gateway" in cmdline:
                        return pid
            except Exception:
                pass  # stale/garbled pid file — fall through to the scan
        # Fallback: search by process name
        try:
            for proc in psutil.process_iter(["pid", "cmdline"]):
                try:
                    cmdline = " ".join(proc.info["cmdline"] or [])
                    if "hermes_cli.main" in cmdline and "gateway" in cmdline:
                        return proc.info["pid"]
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    pass
        except Exception:
            pass
        return None

    def _kill_all_gateways():
        """Force-kill ALL existing gateway processes and wait for ports to free.

        This prevents the 'another gateway is already using this Feishu app_id'
        error and port conflicts when restarting after a crash.
        """
        killed = []
        # Find ALL hermes gateway processes (not just the one we track)
        try:
            for proc in psutil.process_iter(["pid", "cmdline", "create_time"]):
                try:
                    cmdline = " ".join(proc.info["cmdline"] or [])
                    if "hermes_cli.main" in cmdline and "gateway" in cmdline:
                        pid = proc.info["pid"]
                        logger.info("[Watchdog] Force-killing residual gateway PID=%d", pid)
                        proc.kill()  # SIGKILL — no graceful shutdown, just die
                        killed.append(pid)
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    pass
        except Exception as e:
            logger.warning("[Watchdog] Error scanning processes: %s", e)
        # Also kill any lingering aiohttp servers on port 8642/8643
        for port in (8642, 8643):
            try:
                for conn in psutil.net_connections(kind='inet'):
                    if conn.laddr.port == port and conn.status == 'LISTEN':
                        try:
                            owner = psutil.Process(conn.pid)
                            if conn.pid not in killed:
                                logger.info("[Watchdog] Force-killing process %d on port %d", conn.pid, port)
                                owner.kill()
                        except (psutil.NoSuchProcess, psutil.AccessDenied):
                            pass
            except Exception:
                pass  # net_connections may need elevated privileges
        # Wait for processes to actually die and ports to be released
        if killed:
            time.sleep(3)
            for pid in killed:
                try:
                    p = psutil.Process(pid)
                    if p.is_running():
                        logger.warning("[Watchdog] PID %d still running after kill, waiting...", pid)
                        time.sleep(5)
                except psutil.NoSuchProcess:
                    pass
        # Clean ALL stale state files
        for f in (pid_file, alt_pid_file, lock_file, takeover_file):
            try:
                os.remove(f)
            except Exception:
                pass
        # Also clean any .feishu_lock or similar files
        for pattern in ("feishu*.lock", "*.feishu_lock"):
            for lock_f in _glob.glob(os.path.join(HERMES_HOME, pattern)):
                try:
                    os.remove(lock_f)
                    logger.info("[Watchdog] Removed stale lock: %s", lock_f)
                except Exception:
                    pass
        return len(killed)

    def _start_gateway():
        """Start the gateway process with full cleanup of any residual state."""
        # Force-kill ALL residual gateway processes first
        killed = _kill_all_gateways()
        if killed:
            logger.info("[Watchdog] Killed %d residual gateway process(es) before restart", killed)
        log_fh = None
        try:
            log_fh = open(gw_log, "a")
            log_fh.write(f"\n--- [Python Watchdog] Starting gateway at {datetime.now().isoformat()} ---\n")
            log_fh.flush()
        except Exception:
            pass
        env = os.environ.copy()
        env["PYTHONUNBUFFERED"] = "1"
        env["HERMES_ACCEPT_HOOKS"] = "1"
        proc = subprocess.Popen(
            [sys.executable, "-u", "-m", "hermes_cli.main", "gateway", "run", "-v", "--replace"],
            stdout=log_fh if log_fh else subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
            env=env,
            start_new_session=True,  # decouple from entry.py signals
        )
        if log_fh:
            log_fh.close()  # child holds its own dup of the fd; avoid leaking ours
        try:
            with open(alt_pid_file, "w") as f:
                f.write(str(proc.pid))
        except Exception:
            pass
        return proc

    logger.info("[Watchdog] Python gateway watchdog started (interval=%ds)", interval)
    # On startup: check if gateway is already running; if not, start it immediately
    initial_pid = _find_gateway_pid()
    if initial_pid is None:
        logger.info("[Watchdog] No gateway running on startup — launching gateway now")
        _start_gateway()
        time.sleep(15)  # Give it time to fully initialize
    else:
        logger.info("[Watchdog] Gateway already running (PID=%d) on startup", initial_pid)
        time.sleep(5)  # Brief settle time
    last_restart = 0
    restart_backoff = 30
    while True:
        try:
            time.sleep(interval)
            gw_pid = _find_gateway_pid()
            if gw_pid is not None:
                # Process is alive — but check if gateway is FUNCTIONAL (not zombie).
                # A zombie gateway has a live process but dead asyncio executor,
                # causing "Executor shutdown has been called" on every send.
                try:
                    _zombie = False
                    if os.path.isfile(gw_log):
                        with open(gw_log, "r", errors="replace") as _lf:
                            for _line in _lf.readlines()[-50:]:
                                if "Executor shutdown" in _line and "RuntimeError" in _line:
                                    _zombie = True
                                    break
                    if _zombie:
                        logger.error("[Watchdog] Gateway is ZOMBIE (executor shutdown) — force-restarting...")
                        killed = _kill_all_gateways()
                        if not killed:
                            try:
                                os.kill(gw_pid, 9)
                            except Exception:
                                pass
                        time.sleep(3)
                        # Fall through to the restart logic below.
                    else:
                        restart_backoff = 30
                        continue
                except Exception:
                    # If the log check fails, assume the gateway is fine.
                    restart_backoff = 30
                    continue
            else:
                logger.warning("[Watchdog] Gateway process is DEAD — restarting...")
            # Gateway is dead or zombie — restart with backoff.
            now = time.time()
            if now - last_restart < restart_backoff:
                continue
            proc = _start_gateway()
            last_restart = now
            restart_backoff = min(restart_backoff * 2, 300)  # exponential backoff, max 5min
            # Wait and verify the new process survived startup
            time.sleep(15)
            try:
                if proc.poll() is None:
                    logger.info("[Watchdog] Gateway restarted successfully (PID=%d)", proc.pid)
                    restart_backoff = 30  # reset on success
                else:
                    logger.error("[Watchdog] Gateway exited immediately with code %d", proc.returncode)
            except Exception as e:
                logger.error("[Watchdog] Failed to verify restart: %s", e)
        except Exception as e:
            logger.error("[Watchdog] Error: %s", e)
            time.sleep(10)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """Entry point: bind :7860 first, then start background workers and serve."""
    logger.info("=== Hermes Bot — HuggingFace Space Entry ===")
    logger.info("WebUI at /webui (hermes-web-ui)")
    logger.info("Dashboard at /")
    # Setup persistent storage
    _ensure_persistent_storage()
    logger.info("Persistent storage ready at %s", DATA_DIR)
    # Bind the public port before anything else: HF Space requires the
    # EXPOSE'd port to be up quickly, otherwise it shows a 404 page even
    # though the container is RUNNING.
    try:
        httpd = ThreadingHTTPServer(("0.0.0.0", 7860), ProxyHandler)
        logger.info("Proxy listening on :7860")
    except OSError as e:
        logger.error("FATAL: Cannot bind port 7860: %s", e)
        sys.exit(1)
    # Log tailer feeds the SSE endpoint.
    threading.Thread(target=_log_tailer, daemon=True).start()
    logger.info("Log tailer started")
    # Python-based gateway watchdog (survives shell death).
    threading.Thread(target=_gateway_watchdog, args=(30,), daemon=True,
                     name="gateway-watchdog").start()
    logger.info("Python gateway watchdog started")

    def _wait_for_backends():
        # Best-effort reachability probes; off-thread so serve_forever()
        # is never delayed.
        probes = (
            ("Gateway reachable at %s", GATEWAY_URL),
            ("WebUI BFF reachable at %s", WEBUI_BFF_URL),
        )
        for fmt, base_url in probes:
            for _ in range(10):
                try:
                    urllib.request.urlopen(f"{base_url}/health", timeout=2)
                    logger.info(fmt, base_url)
                    break
                except Exception:
                    time.sleep(2)

    threading.Thread(target=_wait_for_backends, daemon=True).start()
    # Start serving (this blocks forever)
    try:
        logger.info(" / → Dashboard")
        logger.info(" /webui → hermes-web-ui")
        logger.info(" /api/* → proxy to WebUI BFF (:6060)")
        logger.info(" /v1/* → proxy to WebUI BFF (:6060)")
        httpd.serve_forever()
    except Exception as e:
        logger.error("FATAL: Proxy server error: %s", e)
        import traceback
        traceback.print_exc()
        sys.exit(1)
if __name__ == "__main__":
    # Script entry point — run the proxy/watchdog process.
    main()