| import { |
| abortEmbeddedPiRun, |
| getActiveEmbeddedRunCount, |
| waitForActiveEmbeddedRuns, |
| } from "../../agents/pi-embedded-runner/runs.js"; |
| import type { startGatewayServer } from "../../gateway/server.js"; |
| import { acquireGatewayLock } from "../../infra/gateway-lock.js"; |
| import { restartGatewayProcessWithFreshPid } from "../../infra/process-respawn.js"; |
| import { |
| consumeGatewaySigusr1RestartAuthorization, |
| isGatewaySigusr1RestartExternallyAllowed, |
| markGatewaySigusr1RestartHandled, |
| } from "../../infra/restart.js"; |
| import { createSubsystemLogger } from "../../logging/subsystem.js"; |
| import { |
| getActiveTaskCount, |
| markGatewayDraining, |
| resetAllLanes, |
| waitForActiveTasks, |
| } from "../../process/command-queue.js"; |
| import { createRestartIterationHook } from "../../process/restart-recovery.js"; |
| import type { defaultRuntime } from "../../runtime.js"; |
|
|
| const gatewayLog = createSubsystemLogger("gateway"); |
|
|
| type GatewayRunSignalAction = "stop" | "restart"; |
|
|
| export async function runGatewayLoop(params: { |
| start: () => Promise<Awaited<ReturnType<typeof startGatewayServer>>>; |
| runtime: typeof defaultRuntime; |
| lockPort?: number; |
| }) { |
| let lock = await acquireGatewayLock({ port: params.lockPort }); |
| let server: Awaited<ReturnType<typeof startGatewayServer>> | null = null; |
| let shuttingDown = false; |
| let restartResolver: (() => void) | null = null; |
|
|
| const cleanupSignals = () => { |
| process.removeListener("SIGTERM", onSigterm); |
| process.removeListener("SIGINT", onSigint); |
| process.removeListener("SIGUSR1", onSigusr1); |
| }; |
| const exitProcess = (code: number) => { |
| cleanupSignals(); |
| params.runtime.exit(code); |
| }; |
| const releaseLockIfHeld = async (): Promise<boolean> => { |
| if (!lock) { |
| return false; |
| } |
| await lock.release(); |
| lock = null; |
| return true; |
| }; |
| const reacquireLockForInProcessRestart = async (): Promise<boolean> => { |
| try { |
| lock = await acquireGatewayLock({ port: params.lockPort }); |
| return true; |
| } catch (err) { |
| gatewayLog.error(`failed to reacquire gateway lock for in-process restart: ${String(err)}`); |
| exitProcess(1); |
| return false; |
| } |
| }; |
| const handleRestartAfterServerClose = async () => { |
| const hadLock = await releaseLockIfHeld(); |
| |
| const respawn = restartGatewayProcessWithFreshPid(); |
| if (respawn.mode === "spawned" || respawn.mode === "supervised") { |
| const modeLabel = |
| respawn.mode === "spawned" |
| ? `spawned pid ${respawn.pid ?? "unknown"}` |
| : "supervisor restart"; |
| gatewayLog.info(`restart mode: full process restart (${modeLabel})`); |
| exitProcess(0); |
| return; |
| } |
| if (respawn.mode === "failed") { |
| gatewayLog.warn( |
| `full process restart failed (${respawn.detail ?? "unknown error"}); falling back to in-process restart`, |
| ); |
| } else { |
| gatewayLog.info( |
| `restart mode: in-process restart (${respawn.detail ?? "OPENCLAW_NO_RESPAWN"})`, |
| ); |
| } |
| if (hadLock && !(await reacquireLockForInProcessRestart())) { |
| return; |
| } |
| shuttingDown = false; |
| restartResolver?.(); |
| }; |
| const handleStopAfterServerClose = async () => { |
| await releaseLockIfHeld(); |
| exitProcess(0); |
| }; |
|
|
| const DRAIN_TIMEOUT_MS = 90_000; |
| const SHUTDOWN_TIMEOUT_MS = 5_000; |
|
|
| const request = (action: GatewayRunSignalAction, signal: string) => { |
| if (shuttingDown) { |
| gatewayLog.info(`received ${signal} during shutdown; ignoring`); |
| return; |
| } |
| shuttingDown = true; |
| const isRestart = action === "restart"; |
| gatewayLog.info(`received ${signal}; ${isRestart ? "restarting" : "shutting down"}`); |
|
|
| |
| const forceExitMs = isRestart ? DRAIN_TIMEOUT_MS + SHUTDOWN_TIMEOUT_MS : SHUTDOWN_TIMEOUT_MS; |
| const forceExitTimer = setTimeout(() => { |
| gatewayLog.error("shutdown timed out; exiting without full cleanup"); |
| |
| |
| |
| exitProcess(isRestart ? 1 : 0); |
| }, forceExitMs); |
|
|
| void (async () => { |
| try { |
| |
| |
| if (isRestart) { |
| |
| |
| markGatewayDraining(); |
| const activeTasks = getActiveTaskCount(); |
| const activeRuns = getActiveEmbeddedRunCount(); |
|
|
| |
| |
| if (activeRuns > 0) { |
| abortEmbeddedPiRun(undefined, { mode: "compacting" }); |
| } |
|
|
| if (activeTasks > 0 || activeRuns > 0) { |
| gatewayLog.info( |
| `draining ${activeTasks} active task(s) and ${activeRuns} active embedded run(s) before restart (timeout ${DRAIN_TIMEOUT_MS}ms)`, |
| ); |
| const [tasksDrain, runsDrain] = await Promise.all([ |
| activeTasks > 0 |
| ? waitForActiveTasks(DRAIN_TIMEOUT_MS) |
| : Promise.resolve({ drained: true }), |
| activeRuns > 0 |
| ? waitForActiveEmbeddedRuns(DRAIN_TIMEOUT_MS) |
| : Promise.resolve({ drained: true }), |
| ]); |
| if (tasksDrain.drained && runsDrain.drained) { |
| gatewayLog.info("all active work drained"); |
| } else { |
| gatewayLog.warn("drain timeout reached; proceeding with restart"); |
| |
| |
| abortEmbeddedPiRun(undefined, { mode: "all" }); |
| } |
| } |
| } |
|
|
| await server?.close({ |
| reason: isRestart ? "gateway restarting" : "gateway stopping", |
| restartExpectedMs: isRestart ? 1500 : null, |
| }); |
| } catch (err) { |
| gatewayLog.error(`shutdown error: ${String(err)}`); |
| } finally { |
| clearTimeout(forceExitTimer); |
| server = null; |
| if (isRestart) { |
| await handleRestartAfterServerClose(); |
| } else { |
| await handleStopAfterServerClose(); |
| } |
| } |
| })(); |
| }; |
|
|
| const onSigterm = () => { |
| gatewayLog.info("signal SIGTERM received"); |
| request("stop", "SIGTERM"); |
| }; |
| const onSigint = () => { |
| gatewayLog.info("signal SIGINT received"); |
| request("stop", "SIGINT"); |
| }; |
| const onSigusr1 = () => { |
| gatewayLog.info("signal SIGUSR1 received"); |
| const authorized = consumeGatewaySigusr1RestartAuthorization(); |
| if (!authorized && !isGatewaySigusr1RestartExternallyAllowed()) { |
| gatewayLog.warn( |
| "SIGUSR1 restart ignored (not authorized; commands.restart=false or use gateway tool).", |
| ); |
| return; |
| } |
| markGatewaySigusr1RestartHandled(); |
| request("restart", "SIGUSR1"); |
| }; |
|
|
| process.on("SIGTERM", onSigterm); |
| process.on("SIGINT", onSigint); |
| process.on("SIGUSR1", onSigusr1); |
|
|
| try { |
| const onIteration = createRestartIterationHook(() => { |
| |
| |
| |
| |
| |
| |
| resetAllLanes(); |
| }); |
|
|
| |
| |
| let isFirstStart = true; |
| |
| while (true) { |
| onIteration(); |
| try { |
| server = await params.start(); |
| isFirstStart = false; |
| } catch (err) { |
| |
| |
| |
| |
| if (isFirstStart) { |
| throw err; |
| } |
| server = null; |
| |
| |
| |
| |
| await releaseLockIfHeld(); |
| const errMsg = err instanceof Error ? err.message : String(err); |
| const errStack = err instanceof Error && err.stack ? `\n${err.stack}` : ""; |
| gatewayLog.error( |
| `gateway startup failed: ${errMsg}. ` + |
| `Process will stay alive; fix the issue and restart.${errStack}`, |
| ); |
| } |
| await new Promise<void>((resolve) => { |
| restartResolver = resolve; |
| }); |
| } |
| } finally { |
| await releaseLockIfHeld(); |
| cleanupSignals(); |
| } |
| } |
|
|