Split child and parent wrappers

I think having them combined and relying on if statements was getting
confusing especially if we want to add additional messages with
different payloads (which will soon be the case).
This commit is contained in:
Asher 2020-11-18 11:43:25 -06:00
parent 2a3608df53
commit d55e06936b
No known key found for this signature in database
GPG Key ID: D63C1EF81242354A
3 changed files with 146 additions and 103 deletions

View File

@ -19,7 +19,7 @@ import { coderCloudBind } from "./coder-cloud"
import { commit, version } from "./constants" import { commit, version } from "./constants"
import { register } from "./routes" import { register } from "./routes"
import { humanPath, isFile, open } from "./util" import { humanPath, isFile, open } from "./util"
import { ipcMain, WrapperProcess } from "./wrapper" import { isChild, wrapper } from "./wrapper"
export const runVsCodeCli = (args: DefaultedArgs): void => { export const runVsCodeCli = (args: DefaultedArgs): void => {
logger.debug("forking vs code cli...") logger.debug("forking vs code cli...")
@ -137,7 +137,7 @@ const main = async (args: DefaultedArgs): Promise<void> => {
logger.info(" - Connected to cloud agent") logger.info(" - Connected to cloud agent")
} catch (err) { } catch (err) {
logger.error(err.message) logger.error(err.message)
ipcMain.exit(1) wrapper.exit(1)
} }
} }
@ -161,9 +161,9 @@ async function entry(): Promise<void> {
// There's no need to check flags like --help or to spawn in an existing // There's no need to check flags like --help or to spawn in an existing
// instance for the child process because these would have already happened in // instance for the child process because these would have already happened in
// the parent and the child wouldn't have been spawned. // the parent and the child wouldn't have been spawned.
if (ipcMain.isChild) { if (isChild(wrapper)) {
await ipcMain.handshake() await wrapper.handshake()
ipcMain.preventExit() wrapper.preventExit()
return main(args) return main(args)
} }
@ -201,11 +201,10 @@ async function entry(): Promise<void> {
return openInExistingInstance(args, socketPath) return openInExistingInstance(args, socketPath)
} }
const wrapper = new WrapperProcess(require("../../package.json").version)
return wrapper.start() return wrapper.start()
} }
entry().catch((error) => { entry().catch((error) => {
logger.error(error.message) logger.error(error.message)
ipcMain.exit(error) wrapper.exit(error)
}) })

View File

@ -8,7 +8,7 @@ import { rootPath } from "./constants"
import { settings } from "./settings" import { settings } from "./settings"
import { SocketProxyProvider } from "./socket" import { SocketProxyProvider } from "./socket"
import { isFile } from "./util" import { isFile } from "./util"
import { ipcMain } from "./wrapper" import { wrapper } from "./wrapper"
export class VscodeProvider { export class VscodeProvider {
public readonly serverRootPath: string public readonly serverRootPath: string
@ -20,7 +20,7 @@ export class VscodeProvider {
public constructor() { public constructor() {
this.vsRootPath = path.resolve(rootPath, "lib/vscode") this.vsRootPath = path.resolve(rootPath, "lib/vscode")
this.serverRootPath = path.join(this.vsRootPath, "out/vs/server") this.serverRootPath = path.join(this.vsRootPath, "out/vs/server")
ipcMain.onDispose(() => this.dispose()) wrapper.onDispose(() => this.dispose())
} }
public async dispose(): Promise<void> { public async dispose(): Promise<void> {

View File

@ -1,4 +1,4 @@
import { field, logger } from "@coder/logger" import { Logger, field, logger } from "@coder/logger"
import * as cp from "child_process" import * as cp from "child_process"
import * as path from "path" import * as path from "path"
import * as rfs from "rotating-file-stream" import * as rfs from "rotating-file-stream"
@ -14,9 +14,9 @@ interface RelaunchMessage {
version: string version: string
} }
export type Message = RelaunchMessage | HandshakeMessage type Message = RelaunchMessage | HandshakeMessage
export class ProcessError extends Error { class ProcessError extends Error {
public constructor(message: string, public readonly code: number | undefined) { public constructor(message: string, public readonly code: number | undefined) {
super(message) super(message)
this.name = this.constructor.name this.name = this.constructor.name
@ -25,16 +25,26 @@ export class ProcessError extends Error {
} }
/** /**
* Allows the wrapper and inner processes to communicate. * Wrapper around a process that tries to gracefully exit when a process exits
* and provides a way to prevent `process.exit`.
*/ */
export class IpcMain { abstract class Process {
private readonly _onMessage = new Emitter<Message>() /**
public readonly onMessage = this._onMessage.event * Emit this to trigger a graceful exit.
private readonly _onDispose = new Emitter<NodeJS.Signals | undefined>() */
public readonly onDispose = this._onDispose.event protected readonly _onDispose = new Emitter<NodeJS.Signals | undefined>()
public readonly processExit: (code?: number) => never = process.exit
public constructor(private readonly parentPid?: number) { /**
* Emitted when the process is about to be disposed.
*/
public readonly onDispose = this._onDispose.event
/**
* Uniquely named logger for the process.
*/
public abstract logger: Logger
public constructor() {
process.on("SIGINT", () => this._onDispose.emit("SIGINT")) process.on("SIGINT", () => this._onDispose.emit("SIGINT"))
process.on("SIGTERM", () => this._onDispose.emit("SIGTERM")) process.on("SIGTERM", () => this._onDispose.emit("SIGTERM"))
process.on("exit", () => this._onDispose.emit(undefined)) process.on("exit", () => this._onDispose.emit(undefined))
@ -43,42 +53,27 @@ export class IpcMain {
// Remove listeners to avoid possibly triggering disposal again. // Remove listeners to avoid possibly triggering disposal again.
process.removeAllListeners() process.removeAllListeners()
// Try waiting for other handlers run first then exit. // Try waiting for other handlers to run first then exit.
logger.debug(`${parentPid ? "inner process" : "wrapper"} ${process.pid} disposing`, field("code", signal)) this.logger.debug("disposing", field("code", signal))
wait.then(() => this.exit(0)) wait.then(() => this.exit(0))
setTimeout(() => this.exit(0), 5000) setTimeout(() => this.exit(0), 5000)
}) })
// Kill the inner process if the parent dies. This is for the case where the
// parent process is forcefully terminated and cannot clean up.
if (parentPid) {
setInterval(() => {
try {
// process.kill throws an exception if the process doesn't exist.
process.kill(parentPid, 0)
} catch (_) {
// Consider this an error since it should have been able to clean up
// the child process unless it was forcefully killed.
logger.error(`parent process ${parentPid} died`)
this._onDispose.emit(undefined)
}
}, 5000)
}
} }
/** /**
* Ensure we control when the process exits. * Ensure control over when the process exits.
*/ */
public preventExit(): void { public preventExit(): void {
process.exit = function (code?: number) { ;(process.exit as any) = (code?: number) => {
logger.warn(`process.exit() was prevented: ${code || "unknown code"}.`) this.logger.warn(`process.exit() was prevented: ${code || "unknown code"}.`)
} as (code?: number) => never }
} }
public get isChild(): boolean { private readonly processExit: (code?: number) => never = process.exit
return typeof this.parentPid !== "undefined"
}
/**
* Will always exit even if normal exit is being prevented.
*/
public exit(error?: number | ProcessError): never { public exit(error?: number | ProcessError): never {
if (error && typeof error !== "number") { if (error && typeof error !== "number") {
this.processExit(typeof error.code === "number" ? error.code : 1) this.processExit(typeof error.code === "number" ? error.code : 1)
@ -86,47 +81,61 @@ export class IpcMain {
this.processExit(error) this.processExit(error)
} }
} }
}
public handshake(child?: cp.ChildProcess): Promise<void> { /**
return new Promise((resolve, reject) => { * Child process that will clean up after itself if the parent goes away and can
const target = child || process * perform a handshake with the parent and ask it to relaunch.
*/
class ChildProcess extends Process {
public logger = logger.named(`child:${process.pid}`)
public constructor(private readonly parentPid: number) {
super()
// Kill the inner process if the parent dies. This is for the case where the
// parent process is forcefully terminated and cannot clean up.
setInterval(() => {
try {
// process.kill throws an exception if the process doesn't exist.
process.kill(this.parentPid, 0)
} catch (_) {
// Consider this an error since it should have been able to clean up
// the child process unless it was forcefully killed.
this.logger.error(`parent process ${parentPid} died`)
this._onDispose.emit(undefined)
}
}, 5000)
}
/**
* Initiate the handshake and wait for a response from the parent.
*/
public handshake(): Promise<void> {
return new Promise((resolve) => {
const onMessage = (message: Message): void => { const onMessage = (message: Message): void => {
logger.debug( logger.debug(`received message from ${this.parentPid}`, field("message", message))
`${child ? "wrapper" : "inner process"} ${process.pid} received message from ${
child ? child.pid : this.parentPid
}`,
field("message", message),
)
if (message.type === "handshake") { if (message.type === "handshake") {
target.removeListener("message", onMessage) process.removeListener("message", onMessage)
target.on("message", (msg) => this._onMessage.emit(msg))
// The wrapper responds once the inner process starts the handshake.
if (child) {
if (!target.send) {
throw new Error("child not spawned with IPC")
}
target.send({ type: "handshake" })
}
resolve() resolve()
} }
} }
target.on("message", onMessage) // Initiate the handshake and wait for the reply.
if (child) { process.on("message", onMessage)
child.once("error", reject) this.send({ type: "handshake" })
child.once("exit", (code) => {
reject(new ProcessError(`Unexpected exit with code ${code}`, code !== null ? code : undefined))
})
} else {
// The inner process initiates the handshake.
this.send({ type: "handshake" })
}
}) })
} }
/**
* Notify the parent process that it should relaunch the child.
*/
public relaunch(version: string): void { public relaunch(version: string): void {
this.send({ type: "relaunch", version }) this.send({ type: "relaunch", version })
} }
/**
* Send a message to the parent.
*/
private send(message: Message): void { private send(message: Message): void {
if (!process.send) { if (!process.send) {
throw new Error("not spawned with IPC") throw new Error("not spawned with IPC")
@ -135,29 +144,30 @@ export class IpcMain {
} }
} }
/**
* Channel for communication between the child and parent processes.
*/
export const ipcMain = new IpcMain(
typeof process.env.CODE_SERVER_PARENT_PID !== "undefined" ? parseInt(process.env.CODE_SERVER_PARENT_PID) : undefined,
)
export interface WrapperOptions { export interface WrapperOptions {
maxMemory?: number maxMemory?: number
nodeOptions?: string nodeOptions?: string
} }
/** /**
* Provides a way to wrap a process for the purpose of updating the running * Parent process wrapper that spawns the child process and performs a handshake
* instance. * with it. Will relaunch the child if it receives a SIGUSR1 or is asked to by
* the child. If the child otherwise exits the parent will also exit.
*/ */
export class WrapperProcess { export class ParentProcess extends Process {
private process?: cp.ChildProcess public logger = logger.named(`parent:${process.pid}`)
private child?: cp.ChildProcess
private started?: Promise<void> private started?: Promise<void>
private readonly logStdoutStream: rfs.RotatingFileStream private readonly logStdoutStream: rfs.RotatingFileStream
private readonly logStderrStream: rfs.RotatingFileStream private readonly logStderrStream: rfs.RotatingFileStream
protected readonly _onChildMessage = new Emitter<Message>()
protected readonly onChildMessage = this._onChildMessage.event
public constructor(private currentVersion: string, private readonly options?: WrapperOptions) { public constructor(private currentVersion: string, private readonly options?: WrapperOptions) {
super()
const opts = { const opts = {
size: "10M", size: "10M",
maxFiles: 10, maxFiles: 10,
@ -165,19 +175,19 @@ export class WrapperProcess {
this.logStdoutStream = rfs.createStream(path.join(paths.data, "coder-logs", "code-server-stdout.log"), opts) this.logStdoutStream = rfs.createStream(path.join(paths.data, "coder-logs", "code-server-stdout.log"), opts)
this.logStderrStream = rfs.createStream(path.join(paths.data, "coder-logs", "code-server-stderr.log"), opts) this.logStderrStream = rfs.createStream(path.join(paths.data, "coder-logs", "code-server-stderr.log"), opts)
ipcMain.onDispose(() => { this.onDispose(() => {
this.disposeChild() this.disposeChild()
}) })
ipcMain.onMessage((message) => { this.onChildMessage((message) => {
switch (message.type) { switch (message.type) {
case "relaunch": case "relaunch":
logger.info(`Relaunching: ${this.currentVersion} -> ${message.version}`) this.logger.info(`Relaunching: ${this.currentVersion} -> ${message.version}`)
this.currentVersion = message.version this.currentVersion = message.version
this.relaunch() this.relaunch()
break break
default: default:
logger.error(`Unrecognized message ${message}`) this.logger.error(`Unrecognized message ${message}`)
break break
} }
}) })
@ -185,9 +195,9 @@ export class WrapperProcess {
private disposeChild(): void { private disposeChild(): void {
this.started = undefined this.started = undefined
if (this.process) { if (this.child) {
this.process.removeAllListeners() this.child.removeAllListeners()
this.process.kill() this.child.kill()
} }
} }
@ -196,16 +206,16 @@ export class WrapperProcess {
try { try {
await this.start() await this.start()
} catch (error) { } catch (error) {
logger.error(error.message) this.logger.error(error.message)
ipcMain.exit(typeof error.code === "number" ? error.code : 1) this.exit(typeof error.code === "number" ? error.code : 1)
} }
} }
public start(): Promise<void> { public start(): Promise<void> {
// If we have a process then we've already bound this. // If we have a process then we've already bound this.
if (!this.process) { if (!this.child) {
process.on("SIGUSR1", async () => { process.on("SIGUSR1", async () => {
logger.info("Received SIGUSR1; hotswapping") this.logger.info("Received SIGUSR1; hotswapping")
this.relaunch() this.relaunch()
}) })
} }
@ -217,7 +227,7 @@ export class WrapperProcess {
private async _start(): Promise<void> { private async _start(): Promise<void> {
const child = this.spawn() const child = this.spawn()
this.process = child this.child = child
// Log both to stdout and to the log directory. // Log both to stdout and to the log directory.
if (child.stdout) { if (child.stdout) {
@ -229,13 +239,13 @@ export class WrapperProcess {
child.stderr.pipe(process.stderr) child.stderr.pipe(process.stderr)
} }
logger.debug(`spawned inner process ${child.pid}`) this.logger.debug(`spawned inner process ${child.pid}`)
await ipcMain.handshake(child) await this.handshake(child)
child.once("exit", (code) => { child.once("exit", (code) => {
logger.debug(`inner process ${child.pid} exited unexpectedly`) this.logger.debug(`inner process ${child.pid} exited unexpectedly`)
ipcMain.exit(code || 0) this.exit(code || 0)
}) })
} }
@ -256,18 +266,52 @@ export class WrapperProcess {
stdio: ["ipc"], stdio: ["ipc"],
}) })
} }
/**
* Wait for a handshake from the child then reply.
*/
private handshake(child: cp.ChildProcess): Promise<void> {
return new Promise((resolve, reject) => {
const onMessage = (message: Message): void => {
logger.debug(`received message from ${child.pid}`, field("message", message))
if (message.type === "handshake") {
child.removeListener("message", onMessage)
child.on("message", (msg) => this._onChildMessage.emit(msg))
child.send({ type: "handshake" })
resolve()
}
}
child.on("message", onMessage)
child.once("error", reject)
child.once("exit", (code) => {
reject(new ProcessError(`Unexpected exit with code ${code}`, code !== null ? code : undefined))
})
})
}
}
/**
* Process wrapper.
*/
export const wrapper =
typeof process.env.CODE_SERVER_PARENT_PID !== "undefined"
? new ChildProcess(parseInt(process.env.CODE_SERVER_PARENT_PID))
: new ParentProcess(require("../../package.json").version)
export function isChild(proc: ChildProcess | ParentProcess): proc is ChildProcess {
return proc instanceof ChildProcess
} }
// It's possible that the pipe has closed (for example if you run code-server // It's possible that the pipe has closed (for example if you run code-server
// --version | head -1). Assume that means we're done. // --version | head -1). Assume that means we're done.
if (!process.stdout.isTTY) { if (!process.stdout.isTTY) {
process.stdout.on("error", () => ipcMain.exit()) process.stdout.on("error", () => wrapper.exit())
} }
// Don't let uncaught exceptions crash the process. // Don't let uncaught exceptions crash the process.
process.on("uncaughtException", (error) => { process.on("uncaughtException", (error) => {
logger.error(`Uncaught exception: ${error.message}`) wrapper.logger.error(`Uncaught exception: ${error.message}`)
if (typeof error.stack !== "undefined") { if (typeof error.stack !== "undefined") {
logger.error(error.stack) wrapper.logger.error(error.stack)
} }
}) })