Implement new structure
This commit is contained in:
223
src/node/wrapper.ts
Normal file
223
src/node/wrapper.ts
Normal file
@@ -0,0 +1,223 @@
|
||||
import { logger, field } from "@coder/logger"
|
||||
import * as cp from "child_process"
|
||||
import { Emitter } from "../common/emitter"
|
||||
|
||||
interface HandshakeMessage {
|
||||
type: "handshake"
|
||||
}
|
||||
|
||||
interface RelaunchMessage {
|
||||
type: "relaunch"
|
||||
version: string
|
||||
}
|
||||
|
||||
export type Message = RelaunchMessage | HandshakeMessage
|
||||
|
||||
export class ProcessError extends Error {
|
||||
public constructor(message: string, public readonly code: number | undefined) {
|
||||
super(message)
|
||||
this.name = this.constructor.name
|
||||
Error.captureStackTrace(this, this.constructor)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure we control when the process exits.
|
||||
*/
|
||||
const exit = process.exit
|
||||
process.exit = function(code?: number) {
|
||||
logger.warn(`process.exit() was prevented: ${code || "unknown code"}.`)
|
||||
} as (code?: number) => never
|
||||
|
||||
/**
|
||||
* Allows the wrapper and inner processes to communicate.
|
||||
*/
|
||||
export class IpcMain {
|
||||
private readonly _onMessage = new Emitter<Message>()
|
||||
public readonly onMessage = this._onMessage.event
|
||||
private readonly _onDispose = new Emitter<NodeJS.Signals | undefined>()
|
||||
public readonly onDispose = this._onDispose.event
|
||||
|
||||
public constructor(public readonly parentPid?: number) {
|
||||
process.on("SIGINT", () => this._onDispose.emit("SIGINT"))
|
||||
process.on("SIGTERM", () => this._onDispose.emit("SIGTERM"))
|
||||
process.on("exit", () => this._onDispose.emit(undefined))
|
||||
|
||||
this.onDispose((signal) => {
|
||||
// Remove listeners to avoid possibly triggering disposal again.
|
||||
process.removeAllListeners()
|
||||
|
||||
// Let any other handlers run first then exit.
|
||||
logger.debug(`${parentPid ? "inner process" : "wrapper"} ${process.pid} disposing`, field("code", signal))
|
||||
setTimeout(() => exit(0), 0)
|
||||
})
|
||||
|
||||
// Kill the inner process if the parent dies. This is for the case where the
|
||||
// parent process is forcefully terminated and cannot clean up.
|
||||
if (parentPid) {
|
||||
setInterval(() => {
|
||||
try {
|
||||
// process.kill throws an exception if the process doesn't exist.
|
||||
process.kill(parentPid, 0)
|
||||
} catch (_) {
|
||||
// Consider this an error since it should have been able to clean up
|
||||
// the child process unless it was forcefully killed.
|
||||
logger.error(`parent process ${parentPid} died`)
|
||||
this._onDispose.emit(undefined)
|
||||
}
|
||||
}, 5000)
|
||||
}
|
||||
}
|
||||
|
||||
public handshake(child?: cp.ChildProcess): Promise<void> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const target = child || process
|
||||
const onMessage = (message: Message): void => {
|
||||
logger.debug(
|
||||
`${child ? "wrapper" : "inner process"} ${process.pid} received message from ${
|
||||
child ? child.pid : this.parentPid
|
||||
}`,
|
||||
field("message", message)
|
||||
)
|
||||
if (message.type === "handshake") {
|
||||
target.removeListener("message", onMessage)
|
||||
target.on("message", (msg) => this._onMessage.emit(msg))
|
||||
// The wrapper responds once the inner process starts the handshake.
|
||||
if (child) {
|
||||
if (!target.send) {
|
||||
throw new Error("child not spawned with IPC")
|
||||
}
|
||||
target.send({ type: "handshake" })
|
||||
}
|
||||
resolve()
|
||||
}
|
||||
}
|
||||
target.on("message", onMessage)
|
||||
if (child) {
|
||||
child.once("error", reject)
|
||||
child.once("exit", (code) => {
|
||||
reject(new ProcessError(`Unexpected exit with code ${code}`, code !== null ? code : undefined))
|
||||
})
|
||||
} else {
|
||||
// The inner process initiates the handshake.
|
||||
this.send({ type: "handshake" })
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
public relaunch(version: string): void {
|
||||
this.send({ type: "relaunch", version })
|
||||
}
|
||||
|
||||
private send(message: Message): void {
|
||||
if (!process.send) {
|
||||
throw new Error("not spawned with IPC")
|
||||
}
|
||||
process.send(message)
|
||||
}
|
||||
}
|
||||
|
||||
export const ipcMain = new IpcMain(
|
||||
typeof process.env.CODE_SERVER_PARENT_PID !== "undefined" ? parseInt(process.env.CODE_SERVER_PARENT_PID) : undefined
|
||||
)
|
||||
|
||||
export interface WrapperOptions {
|
||||
maxMemory?: number
|
||||
nodeOptions?: string
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides a way to wrap a process for the purpose of updating the running
|
||||
* instance.
|
||||
*/
|
||||
export class WrapperProcess {
|
||||
private process?: cp.ChildProcess
|
||||
private started?: Promise<void>
|
||||
|
||||
public constructor(private currentVersion: string, private readonly options?: WrapperOptions) {
|
||||
ipcMain.onDispose(() => {
|
||||
if (this.process) {
|
||||
this.process.removeAllListeners()
|
||||
this.process.kill()
|
||||
}
|
||||
})
|
||||
|
||||
ipcMain.onMessage(async (message) => {
|
||||
switch (message.type) {
|
||||
case "relaunch":
|
||||
logger.info(`Relaunching: ${this.currentVersion} -> ${message.version}`)
|
||||
this.currentVersion = message.version
|
||||
this.started = undefined
|
||||
if (this.process) {
|
||||
this.process.removeAllListeners()
|
||||
this.process.kill()
|
||||
}
|
||||
try {
|
||||
await this.start()
|
||||
} catch (error) {
|
||||
logger.error(error.message)
|
||||
exit(typeof error.code === "number" ? error.code : 1)
|
||||
}
|
||||
break
|
||||
default:
|
||||
logger.error(`Unrecognized message ${message}`)
|
||||
break
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
public start(): Promise<void> {
|
||||
if (!this.started) {
|
||||
const child = this.spawn()
|
||||
logger.debug(`spawned inner process ${child.pid}`)
|
||||
this.started = ipcMain.handshake(child).then(() => {
|
||||
child.once("exit", (code) => {
|
||||
logger.debug(`inner process ${child.pid} exited unexpectedly`)
|
||||
exit(code || 0)
|
||||
})
|
||||
})
|
||||
this.process = child
|
||||
}
|
||||
return this.started
|
||||
}
|
||||
|
||||
private spawn(): cp.ChildProcess {
|
||||
// Flags to pass along to the Node binary.
|
||||
let nodeOptions = `${process.env.NODE_OPTIONS || ""} ${(this.options && this.options.nodeOptions) || ""}`
|
||||
if (!/max_old_space_size=(\d+)/g.exec(nodeOptions)) {
|
||||
nodeOptions += ` --max_old_space_size=${(this.options && this.options.maxMemory) || 2048}`
|
||||
}
|
||||
|
||||
return cp.fork(process.argv[1], process.argv.slice(2), {
|
||||
env: {
|
||||
...process.env,
|
||||
CODE_SERVER_PARENT_PID: process.pid.toString(),
|
||||
NODE_OPTIONS: nodeOptions,
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// // It's possible that the pipe has closed (for example if you run code-server
|
||||
// // --version | head -1). Assume that means we're done.
|
||||
if (!process.stdout.isTTY) {
|
||||
process.stdout.on("error", () => exit())
|
||||
}
|
||||
|
||||
export const wrap = (fn: () => Promise<void>): void => {
|
||||
if (ipcMain.parentPid) {
|
||||
ipcMain
|
||||
.handshake()
|
||||
.then(() => fn())
|
||||
.catch((error: ProcessError): void => {
|
||||
logger.error(error.message)
|
||||
exit(typeof error.code === "number" ? error.code : 1)
|
||||
})
|
||||
} else {
|
||||
const wrapper = new WrapperProcess(require("../../package.json").version)
|
||||
wrapper.start().catch((error) => {
|
||||
logger.error(error.message)
|
||||
exit(typeof error.code === "number" ? error.code : 1)
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user