Make everything use active evals (#30)

* Add trace log level

* Use active eval to implement spdlog

* Split server/client active eval interfaces

Since all properties are *not* valid on both sides

* +200% fire resistance

* Implement exec using active evaluations

* Fully implement child process streams

* Watch impl, move child_process back to explicitly adding events

Automatically forwarding all events might be the right move, but wanna
think/discuss it a bit more because it didn't come out very cleanly.

* Would you like some args with that callback?

* Implement the rest of child_process using active evals

* Rampant memory leaks

Emit "kill" to active evaluations when client disconnects in order to
kill processes. Most likely won't be the final solution.

* Resolve some minor issues with output panel

* Implement node-pty with active evals

* Provide clearTimeout to vm sandbox

* Implement socket with active evals

* Extract some callback logic

Also remove some eval interfaces, need to re-think those.

* Implement net.Server and remainder of net.Socket using active evals

* Implement dispose for active evaluations

* Use trace for express requests

* Handle sending buffers through evaluation events

* Make event logging a bit more clear

* Fix some errors due to us not actually instantiating until connect/listen

* is this a commit message?

* We can just create the evaluator in the ctor

Not sure what I was thinking.

* memory leak for you, memory leak for everyone

* it's a ternary now

* Don't dispose automatically on close or error

The code may or may not be disposable at that point.

* Handle parsing buffers on the client side as well

* Remove unused protobuf

* Remove TypedValue

* Remove unused forkProvider and test

* Improve dispose pattern for active evals

* Socket calls close after error; no need to bind both

* Improve comment

* Comment is no longer wishy washy due to explicit boolean

* Simplify check for sendHandle and options

* Replace _require with __non_webpack_require__

Webpack will then replace this with `require` which we then provide to
the vm sandbox.

* Provide path.parse

* Prevent original-fs from loading

* Start with a pid of -1

vscode immediately checks the PID to see if the debug process launch
correctly, but of course we don't get the pid synchronously.

* Pass arguments to bootstrap-fork

* Fully implement streams

Was causing errors because internally the stream would set this.writing
to true and it would never become false, so subsequent messages would
never send.

* Fix serializing errors and streams emitting errors multiple times

* Was emitting close to data

* Fix missing path for spawned processes

* Move evaluation onDispose call

Now it's accurate and runs when the active evaluation has actually
disposed.

* Fix promisifying fs.exists

* Fix some active eval callback issues

* Patch existsSync in debug adapter
This commit is contained in:
Asher
2019-02-19 10:17:03 -06:00
committed by GitHub
parent 73762017c8
commit 4a80bcb42c
39 changed files with 1694 additions and 8731 deletions

View File

@@ -1,349 +0,0 @@
import * as cp from "child_process";
import * as net from "net";
import * as stream from "stream";
import { TextEncoder } from "text-encoding";
import { Logger, logger, field } from "@coder/logger";
import { NewSessionMessage, ServerMessage, SessionDoneMessage, SessionOutputMessage, IdentifySessionMessage, NewConnectionMessage, ConnectionEstablishedMessage, NewConnectionFailureMessage, ConnectionCloseMessage, ConnectionOutputMessage, NewServerMessage, ServerEstablishedMessage, NewServerFailureMessage, ServerCloseMessage, ServerConnectionEstablishedMessage } from "../proto";
import { SendableConnection } from "../common/connection";
import { ServerOptions } from "./server";
export interface Process {
stdio?: Array<stream.Readable | stream.Writable>;
stdin?: stream.Writable;
stdout?: stream.Readable;
stderr?: stream.Readable;
send?: (message: string) => void;
pid: number;
killed?: boolean;
on(event: "data" | "message", cb: (data: string) => void): void;
on(event: "exit", listener: (exitCode: number, signal?: number) => void): void;
write(data: string | Uint8Array): void;
resize?(cols: number, rows: number): void;
kill(signal?: string): void;
title?: number;
}
export const handleNewSession = (connection: SendableConnection, newSession: NewSessionMessage, serverOptions: ServerOptions | undefined, onExit: () => void): Process => {
const childLogger = getChildLogger(newSession.getCommand());
childLogger.debug(() => [
newSession.getIsFork() ? "Forking" : "Spawning",
field("command", newSession.getCommand()),
field("args", newSession.getArgsList()),
field("env", newSession.getEnvMap().toObject()),
]);
let process: Process;
let processTitle: string | undefined;
const env: { [key: string]: string } = {};
newSession.getEnvMap().forEach((value, key) => {
env[key] = value;
});
if (newSession.getTtyDimensions()) {
// Spawn with node-pty
const nodePty = require("node-pty") as typeof import("node-pty");
const ptyProc = nodePty.spawn(newSession.getCommand(), newSession.getArgsList(), {
cols: newSession.getTtyDimensions()!.getWidth(),
rows: newSession.getTtyDimensions()!.getHeight(),
cwd: newSession.getCwd(),
env,
});
const timer = setInterval(() => {
if (ptyProc.process !== processTitle) {
processTitle = ptyProc.process;
const id = new IdentifySessionMessage();
id.setId(newSession.getId());
id.setTitle(processTitle!);
const sm = new ServerMessage();
sm.setIdentifySession(id);
connection.send(sm.serializeBinary());
}
}, 200);
ptyProc.on("exit", () => {
clearTimeout(timer);
});
process = ptyProc;
processTitle = ptyProc.process;
} else {
const options = {
cwd: newSession.getCwd(),
env,
};
let proc: cp.ChildProcess;
if (newSession.getIsFork()) {
if (!serverOptions) {
throw new Error("No forkProvider set for bootstrap-fork request");
}
if (!serverOptions.forkProvider) {
throw new Error("No forkProvider set for server options");
}
proc = serverOptions.forkProvider(newSession);
} else {
proc = cp.spawn(newSession.getCommand(), newSession.getArgsList(), options);
}
process = {
stdin: proc.stdin,
stderr: proc.stderr,
stdout: proc.stdout,
stdio: proc.stdio,
send: (message): void => {
proc.send(message);
},
on: (...args: any[]): void => ((proc as any).on)(...args), // tslint:disable-line no-any
write: (d): boolean => proc.stdin.write(d),
kill: (s): void => proc.kill(s || "SIGTERM"),
pid: proc.pid,
};
}
const sendOutput = (_source: SessionOutputMessage.Source, msg: string | Uint8Array): void => {
childLogger.debug(() => {
let data = msg.toString();
if (_source === SessionOutputMessage.Source.IPC) {
// data = Buffer.from(msg.toString(), "base64").toString();
}
return [
_source === SessionOutputMessage.Source.STDOUT
? "stdout"
: (_source === SessionOutputMessage.Source.STDERR ? "stderr" : "ipc"),
field("id", newSession.getId()),
field("data", data),
];
});
const serverMsg = new ServerMessage();
const d = new SessionOutputMessage();
d.setId(newSession.getId());
d.setData(typeof msg === "string" ? new TextEncoder().encode(msg) : msg);
d.setSource(_source);
serverMsg.setSessionOutput(d);
connection.send(serverMsg.serializeBinary());
};
if (process.stdout && process.stderr) {
process.stdout.on("data", (data) => {
sendOutput(SessionOutputMessage.Source.STDOUT, data);
});
process.stderr.on("data", (data) => {
sendOutput(SessionOutputMessage.Source.STDERR, data);
});
} else {
process.on("data", (data) => {
sendOutput(SessionOutputMessage.Source.STDOUT, Buffer.from(data));
});
}
// IPC.
if (process.send) {
process.on("message", (data) => {
sendOutput(SessionOutputMessage.Source.IPC, JSON.stringify(data));
});
}
const id = new IdentifySessionMessage();
id.setId(newSession.getId());
id.setPid(process.pid);
if (processTitle) {
id.setTitle(processTitle);
}
const sm = new ServerMessage();
sm.setIdentifySession(id);
connection.send(sm.serializeBinary());
process.on("exit", (code) => {
childLogger.debug(() => [
"Exited",
field("id", newSession.getId()),
field("command", newSession.getCommand()),
field("args", newSession.getArgsList()),
]);
const serverMsg = new ServerMessage();
const exit = new SessionDoneMessage();
exit.setId(newSession.getId());
exit.setExitStatus(code);
serverMsg.setSessionDone(exit);
connection.send(serverMsg.serializeBinary());
onExit();
});
return process;
};
export const handleNewConnection = (connection: SendableConnection, newConnection: NewConnectionMessage, onExit: () => void): net.Socket => {
const target = newConnection.getPath() || `${newConnection.getPort()}`;
const childLogger = getChildLogger(target, ">");
const id = newConnection.getId();
let socket: net.Socket;
let didConnect = false;
const connectCallback = (): void => {
childLogger.debug("Connected", field("id", newConnection.getId()), field("target", target));
didConnect = true;
const estab = new ConnectionEstablishedMessage();
estab.setId(id);
const servMsg = new ServerMessage();
servMsg.setConnectionEstablished(estab);
connection.send(servMsg.serializeBinary());
};
if (newConnection.getPath()) {
socket = net.createConnection(newConnection.getPath(), connectCallback);
} else if (newConnection.getPort()) {
socket = net.createConnection(newConnection.getPort(), undefined, connectCallback);
} else {
throw new Error("No path or port provided for new connection");
}
socket.addListener("error", (err) => {
childLogger.debug("Error", field("id", newConnection.getId()), field("error", err));
if (!didConnect) {
const errMsg = new NewConnectionFailureMessage();
errMsg.setId(id);
errMsg.setMessage(err.message);
const servMsg = new ServerMessage();
servMsg.setConnectionFailure(errMsg);
connection.send(servMsg.serializeBinary());
onExit();
}
});
socket.addListener("close", () => {
childLogger.debug("Closed", field("id", newConnection.getId()));
if (didConnect) {
const closed = new ConnectionCloseMessage();
closed.setId(id);
const servMsg = new ServerMessage();
servMsg.setConnectionClose(closed);
connection.send(servMsg.serializeBinary());
onExit();
}
});
socket.addListener("data", (data) => {
childLogger.debug(() => [
"ipc",
field("id", newConnection.getId()),
field("data", data),
]);
const dataMsg = new ConnectionOutputMessage();
dataMsg.setId(id);
dataMsg.setData(data);
const servMsg = new ServerMessage();
servMsg.setConnectionOutput(dataMsg);
connection.send(servMsg.serializeBinary());
});
return socket;
};
export const handleNewServer = (connection: SendableConnection, newServer: NewServerMessage, addSocket: (socket: net.Socket) => number, onExit: () => void, onSocketExit: (id: number) => void): net.Server => {
const target = newServer.getPath() || `${newServer.getPort()}`;
const childLogger = getChildLogger(target, "|");
const s = net.createServer();
try {
s.listen(newServer.getPath() ? newServer.getPath() : newServer.getPort(), () => {
childLogger.debug("Listening", field("id", newServer.getId()), field("target", target));
const se = new ServerEstablishedMessage();
se.setId(newServer.getId());
const sm = new ServerMessage();
sm.setServerEstablished(se);
connection.send(sm.serializeBinary());
});
} catch (ex) {
childLogger.debug("Failed to listen", field("id", newServer.getId()), field("target", target));
const sf = new NewServerFailureMessage();
sf.setId(newServer.getId());
const sm = new ServerMessage();
sm.setServerFailure(sf);
connection.send(sm.serializeBinary());
onExit();
}
s.on("close", () => {
childLogger.debug("Stopped listening", field("id", newServer.getId()), field("target", target));
const sc = new ServerCloseMessage();
sc.setId(newServer.getId());
const sm = new ServerMessage();
sm.setServerClose(sc);
connection.send(sm.serializeBinary());
onExit();
});
s.on("connection", (socket) => {
const socketId = addSocket(socket);
childLogger.debug("Got connection", field("id", newServer.getId()), field("socketId", socketId));
const sock = new ServerConnectionEstablishedMessage();
sock.setServerId(newServer.getId());
sock.setConnectionId(socketId);
const sm = new ServerMessage();
sm.setServerConnectionEstablished(sock);
connection.send(sm.serializeBinary());
socket.addListener("data", (data) => {
childLogger.debug(() => [
"ipc",
field("id", newServer.getId()),
field("socketId", socketId),
field("data", data),
]);
const dataMsg = new ConnectionOutputMessage();
dataMsg.setId(socketId);
dataMsg.setData(data);
const servMsg = new ServerMessage();
servMsg.setConnectionOutput(dataMsg);
connection.send(servMsg.serializeBinary());
});
socket.on("error", (error) => {
childLogger.debug("Error", field("id", newServer.getId()), field("socketId", socketId), field("error", error));
onSocketExit(socketId);
});
socket.on("close", () => {
childLogger.debug("Closed", field("id", newServer.getId()), field("socketId", socketId));
onSocketExit(socketId);
});
});
return s;
};
const getChildLogger = (command: string, prefix: string = ""): Logger => {
// TODO: Temporary, for debugging. Should probably ask for a name?
let name: string;
if (command.includes("vscode-ipc") || command.includes("extensionHost")) {
name = "exthost";
} else if (command.includes("vscode-remote")) {
name = "shared";
} else {
const basename = command.split("/").pop()!;
let i = 0;
for (; i < basename.length; i++) {
const character = basename.charAt(i);
if (isNaN(+character) && character === character.toUpperCase()) {
break;
}
}
name = basename.substring(0, i);
}
return logger.named(prefix + name);
};

View File

@@ -1,10 +1,13 @@
import * as vm from "vm";
import { NewEvalMessage, TypedValue, EvalFailedMessage, EvalDoneMessage, ServerMessage, EvalEventMessage } from "../proto";
import { SendableConnection } from "../common/connection";
import { EventEmitter } from "events";
import * as vm from "vm";
import { logger, field } from "@coder/logger";
import { NewEvalMessage, EvalFailedMessage, EvalDoneMessage, ServerMessage, EvalEventMessage } from "../proto";
import { SendableConnection } from "../common/connection";
import { stringify, parse } from "../common/util";
export interface ActiveEvaluation {
onEvent(msg: EvalEventMessage): void;
dispose(): void;
}
declare var __non_webpack_require__: typeof require;
@@ -13,96 +16,117 @@ export const evaluate = (connection: SendableConnection, message: NewEvalMessage
message.getArgsList().forEach((value) => {
argStr.push(value);
});
/**
* Send the response and call onDispose.
*/
// tslint:disable-next-line no-any
const sendResp = (resp: any): void => {
const evalDone = new EvalDoneMessage();
evalDone.setId(message.getId());
const tof = typeof resp;
if (tof !== "undefined") {
const tv = new TypedValue();
let t: TypedValue.Type;
switch (tof) {
case "string":
t = TypedValue.Type.STRING;
break;
case "boolean":
t = TypedValue.Type.BOOLEAN;
break;
case "object":
t = TypedValue.Type.OBJECT;
break;
case "number":
t = TypedValue.Type.NUMBER;
break;
default:
return sendErr(EvalFailedMessage.Reason.EXCEPTION, `unsupported response type ${tof}`);
}
tv.setValue(tof === "string" ? resp : JSON.stringify(resp));
tv.setType(t);
evalDone.setResponse(tv);
}
evalDone.setResponse(stringify(resp));
const serverMsg = new ServerMessage();
serverMsg.setEvalDone(evalDone);
connection.send(serverMsg.serializeBinary());
onDispose();
};
const sendErr = (reason: EvalFailedMessage.Reason, msg: string): void => {
/**
* Send an exception and call onDispose.
*/
const sendException = (error: Error): void => {
const evalFailed = new EvalFailedMessage();
evalFailed.setId(message.getId());
evalFailed.setReason(reason);
evalFailed.setMessage(msg);
evalFailed.setReason(EvalFailedMessage.Reason.EXCEPTION);
evalFailed.setMessage(error.toString() + " " + error.stack);
const serverMsg = new ServerMessage();
serverMsg.setEvalFailed(evalFailed);
connection.send(serverMsg.serializeBinary());
};
let eventEmitter: EventEmitter | undefined;
try {
if (message.getActive()) {
eventEmitter = new EventEmitter();
}
const value = vm.runInNewContext(`(${message.getFunction()})(${eventEmitter ? `eventEmitter, ` : ""}${argStr.join(",")})`, {
eventEmitter: eventEmitter ? {
on: (event: string, cb: (...args: any[]) => void): void => {
eventEmitter!.on(event, cb);
},
emit: (event: string, ...args: any[]): void => {
const eventMsg = new EvalEventMessage();
eventMsg.setEvent(event);
eventMsg.setArgsList(args.filter(a => a).map(a => JSON.stringify(a)));
eventMsg.setId(message.getId());
const serverMsg = new ServerMessage();
serverMsg.setEvalEvent(eventMsg);
connection.send(serverMsg.serializeBinary());
},
} : undefined,
_Buffer: Buffer,
require: typeof __non_webpack_require__ !== "undefined" ? __non_webpack_require__ : require,
_require: typeof __non_webpack_require__ !== "undefined" ? __non_webpack_require__ : require,
setTimeout,
}, {
onDispose();
};
let eventEmitter = message.getActive() ? new EventEmitter(): undefined;
const sandbox = {
eventEmitter: eventEmitter ? {
// tslint:disable no-any
on: (event: string, cb: (...args: any[]) => void): void => {
eventEmitter!.on(event, (...args: any[]) => {
logger.trace(() => [
`${event}`,
field("id", message.getId()),
field("args", args.map(stringify)),
]);
cb(...args);
});
},
emit: (event: string, ...args: any[]): void => {
logger.trace(() => [
`emit ${event}`,
field("id", message.getId()),
field("args", args.map(stringify)),
]);
const eventMsg = new EvalEventMessage();
eventMsg.setEvent(event);
eventMsg.setArgsList(args.map(stringify));
eventMsg.setId(message.getId());
const serverMsg = new ServerMessage();
serverMsg.setEvalEvent(eventMsg);
connection.send(serverMsg.serializeBinary());
},
// tslint:enable no-any
} : undefined,
_Buffer: Buffer,
require: typeof __non_webpack_require__ !== "undefined" ? __non_webpack_require__ : require,
setTimeout,
setInterval,
clearTimeout,
process: {
env: process.env,
},
};
let value: any; // tslint:disable-line no-any
try {
const code = `(${message.getFunction()})(${eventEmitter ? "eventEmitter, " : ""}${argStr.join(",")});`;
value = vm.runInNewContext(code, sandbox, {
// If the code takes longer than this to return, it is killed and throws.
timeout: message.getTimeout() || 15000,
});
if (eventEmitter) {
// Is an active evaluation and should NOT be ended
eventEmitter.on("close", () => onDispose());
eventEmitter.on("error", () => onDispose());
} else {
if ((value as Promise<void>).then) {
// Is promise
(value as Promise<void>).then(r => sendResp(r)).catch(ex => sendErr(EvalFailedMessage.Reason.EXCEPTION, ex.toString()));
} else {
sendResp(value);
}
onDispose();
}
} catch (ex) {
sendErr(EvalFailedMessage.Reason.EXCEPTION, ex.toString() + " " + ex.stack);
sendException(ex);
}
// An evaluation completes when the value it returns resolves. An active
// evaluation completes when it is disposed. Active evaluations are required
// to return disposers so we can know both when it has ended (so we can clean
// up on our end) and how to force end it (for example when the client
// disconnects).
// tslint:disable-next-line no-any
const promise = !eventEmitter ? value as Promise<any> : new Promise((resolve): void => {
value.onDidDispose(resolve);
});
if (promise && promise.then) {
promise.then(sendResp).catch(sendException);
} else {
sendResp(value);
}
return eventEmitter ? {
onEvent: (eventMsg: EvalEventMessage): void => {
eventEmitter!.emit(eventMsg.getEvent(), ...eventMsg.getArgsList().map(a => JSON.parse(a)));
eventEmitter!.emit(eventMsg.getEvent(), ...eventMsg.getArgsList().map(parse));
},
dispose: (): void => {
if (eventEmitter) {
if (value && value.dispose) {
value.dispose();
}
eventEmitter.removeAllListeners();
eventEmitter = undefined;
}
},
} : undefined;
};

View File

@@ -1,32 +1,21 @@
import * as os from "os";
import * as cp from "child_process";
import * as path from "path";
import { mkdir } from "fs";
import { promisify } from "util";
import { TextDecoder } from "text-encoding";
import { logger, field } from "@coder/logger";
import { ClientMessage, WorkingInitMessage, ServerMessage, NewSessionMessage, WriteToSessionMessage } from "../proto";
import { ClientMessage, WorkingInitMessage, ServerMessage } from "../proto";
import { evaluate, ActiveEvaluation } from "./evaluate";
import { ReadWriteConnection } from "../common/connection";
import { Process, handleNewSession, handleNewConnection, handleNewServer } from "./command";
import * as net from "net";
export interface ServerOptions {
readonly workingDirectory: string;
readonly dataDirectory: string;
readonly builtInExtensionsDirectory: string;
forkProvider?(message: NewSessionMessage): cp.ChildProcess;
}
export class Server {
private readonly sessions = new Map<number, Process>();
private readonly connections = new Map<number, net.Socket>();
private readonly servers = new Map<number, net.Server>();
private readonly evals = new Map<number, ActiveEvaluation>();
private connectionId = Number.MAX_SAFE_INTEGER;
public constructor(
private readonly connection: ReadWriteConnection,
private readonly options?: ServerOptions,
@@ -42,18 +31,10 @@ export class Server {
}
});
connection.onClose(() => {
this.sessions.forEach((s) => {
s.kill();
});
this.connections.forEach((c) => {
c.destroy();
});
this.servers.forEach((s) => {
s.close();
});
this.evals.forEach((e) => e.dispose());
});
if (!options) {
if (!this.options) {
logger.warn("No server options provided. InitMessage will not be sent.");
return;
@@ -74,16 +55,16 @@ export class Server {
}
}
};
Promise.all([ mkdirP(path.join(options.dataDirectory, "User", "workspaceStorage")) ]).then(() => {
Promise.all([ mkdirP(path.join(this.options.dataDirectory, "User", "workspaceStorage")) ]).then(() => {
logger.info("Created data directory");
}).catch((error) => {
logger.error(error.message, field("error", error));
});
const initMsg = new WorkingInitMessage();
initMsg.setDataDirectory(options.dataDirectory);
initMsg.setWorkingDirectory(options.workingDirectory);
initMsg.setBuiltinExtensionsDir(options.builtInExtensionsDirectory);
initMsg.setDataDirectory(this.options.dataDirectory);
initMsg.setWorkingDirectory(this.options.workingDirectory);
initMsg.setBuiltinExtensionsDir(this.options.builtInExtensionsDirectory);
initMsg.setHomeDirectory(os.homedir());
initMsg.setTmpDirectory(os.tmpdir());
const platform = os.platform();
@@ -113,7 +94,7 @@ export class Server {
private handleMessage(message: ClientMessage): void {
if (message.hasNewEval()) {
const evalMessage = message.getNewEval()!;
logger.debug(() => [
logger.trace(() => [
"EvalMessage",
field("id", evalMessage.getId()),
field("args", evalMessage.getArgsList()),
@@ -121,132 +102,22 @@ export class Server {
]);
const resp = evaluate(this.connection, evalMessage, () => {
this.evals.delete(evalMessage.getId());
logger.trace(() => [
`dispose ${evalMessage.getId()}, ${this.evals.size} left`,
]);
});
if (resp) {
this.evals.set(evalMessage.getId(), resp);
}
} else if (message.hasEvalEvent()) {
const evalEventMessage = message.getEvalEvent()!;
logger.debug("EvalEventMessage", field("id", evalEventMessage.getId()));
const e = this.evals.get(evalEventMessage.getId());
if (!e) {
return;
}
e.onEvent(evalEventMessage);
} else if (message.hasNewSession()) {
const sessionMessage = message.getNewSession()!;
logger.debug("NewSession", field("id", sessionMessage.getId()));
const session = handleNewSession(this.connection, sessionMessage, this.options, () => {
this.sessions.delete(sessionMessage.getId());
});
this.sessions.set(sessionMessage.getId(), session);
} else if (message.hasCloseSessionInput()) {
const closeSessionMessage = message.getCloseSessionInput()!;
logger.debug("CloseSessionInput", field("id", closeSessionMessage.getId()));
const s = this.getSession(closeSessionMessage.getId());
if (!s || !s.stdin) {
return;
}
s.stdin.end();
} else if (message.hasResizeSessionTty()) {
const resizeSessionTtyMessage = message.getResizeSessionTty()!;
logger.debug("ResizeSessionTty", field("id", resizeSessionTtyMessage.getId()));
const s = this.getSession(resizeSessionTtyMessage.getId());
if (!s || !s.resize) {
return;
}
const tty = resizeSessionTtyMessage.getTtyDimensions()!;
s.resize(tty.getWidth(), tty.getHeight());
} else if (message.hasShutdownSession()) {
const shutdownSessionMessage = message.getShutdownSession()!;
logger.debug("ShutdownSession", field("id", shutdownSessionMessage.getId()));
const s = this.getSession(shutdownSessionMessage.getId());
if (!s) {
return;
}
s.kill(shutdownSessionMessage.getSignal());
} else if (message.hasWriteToSession()) {
const writeToSessionMessage = message.getWriteToSession()!;
logger.debug("WriteToSession", field("id", writeToSessionMessage.getId()));
const s = this.getSession(writeToSessionMessage.getId());
if (!s) {
return;
}
const data = new TextDecoder().decode(writeToSessionMessage.getData_asU8());
const source = writeToSessionMessage.getSource();
if (source === WriteToSessionMessage.Source.IPC) {
if (!s.send) {
throw new Error("Cannot send message via IPC to process without IPC");
}
s.send(JSON.parse(data));
} else {
s.write(data);
}
} else if (message.hasNewConnection()) {
const connectionMessage = message.getNewConnection()!;
logger.debug("NewConnection", field("id", connectionMessage.getId()));
if (this.connections.has(connectionMessage.getId())) {
throw new Error(`connect EISCONN ${connectionMessage.getPath() || connectionMessage.getPort()}`);
}
const socket = handleNewConnection(this.connection, connectionMessage, () => {
this.connections.delete(connectionMessage.getId());
});
this.connections.set(connectionMessage.getId(), socket);
} else if (message.hasConnectionOutput()) {
const connectionOutputMessage = message.getConnectionOutput()!;
logger.debug("ConnectionOuput", field("id", connectionOutputMessage.getId()));
const c = this.getConnection(connectionOutputMessage.getId());
if (!c) {
return;
}
c.write(Buffer.from(connectionOutputMessage.getData_asU8()));
} else if (message.hasConnectionClose()) {
const connectionCloseMessage = message.getConnectionClose()!;
logger.debug("ConnectionClose", field("id", connectionCloseMessage.getId()));
const c = this.getConnection(connectionCloseMessage.getId());
if (!c) {
return;
}
c.end();
} else if (message.hasNewServer()) {
const serverMessage = message.getNewServer()!;
logger.debug("NewServer", field("id", serverMessage.getId()));
if (this.servers.has(serverMessage.getId())) {
throw new Error("multiple listeners not supported");
}
const s = handleNewServer(this.connection, serverMessage, (socket) => {
const id = this.connectionId--;
this.connections.set(id, socket);
return id;
}, () => {
this.connections.delete(serverMessage.getId());
}, (id) => {
this.connections.delete(id);
});
this.servers.set(serverMessage.getId(), s);
} else if (message.hasServerClose()) {
const serverCloseMessage = message.getServerClose()!;
logger.debug("ServerClose", field("id", serverCloseMessage.getId()));
const s = this.getServer(serverCloseMessage.getId());
if (!s) {
return;
}
s.close();
} else {
logger.debug("Received unknown message type");
throw new Error("unknown message type");
}
}
private getServer(id: number): net.Server | undefined {
return this.servers.get(id);
}
private getConnection(id: number): net.Socket | undefined {
return this.connections.get(id);
}
private getSession(id: number): Process | undefined {
return this.sessions.get(id);
}
}