Convert fully to protobuf (was partially JSON) (#402)

* Convert fully to protobuf (was partially JSON)

* Handle all floating promises

* Remove stringified proto from trace logging

It wasn't proving to be very useful.
This commit is contained in:
Asher
2019-04-02 17:44:28 -05:00
committed by Kyle Carberry
parent f484781693
commit 3a672d725a
31 changed files with 5788 additions and 3277 deletions

View File

@@ -3,6 +3,8 @@ import { ServerProxy } from "../../common/proxy";
import { preserveEnv } from "../../common/util";
import { WritableProxy, ReadableProxy } from "./stream";
// tslint:disable completed-docs
export type ForkProvider = (modulePath: string, args?: string[], options?: cp.ForkOptions) => cp.ChildProcess;
export class ChildProcessProxy implements ServerProxy {
@@ -26,7 +28,7 @@ export class ChildProcessProxy implements ServerProxy {
// tslint:disable-next-line no-any
public async send(message: any): Promise<void> {
return new Promise((resolve, reject) => {
return new Promise((resolve, reject): void => {
this.process.send(message, (error) => {
if (error) {
reject(error);
@@ -46,8 +48,8 @@ export class ChildProcessProxy implements ServerProxy {
}
public async dispose(): Promise<void> {
this.kill();
setTimeout(() => this.kill("SIGKILL"), 5000); // Double tap.
this.process.kill();
setTimeout(() => this.process.kill("SIGKILL"), 5000); // Double tap.
}
// tslint:disable-next-line no-any
@@ -62,9 +64,9 @@ export class ChildProcessProxy implements ServerProxy {
export interface ChildProcessProxies {
childProcess: ChildProcessProxy;
stdin?: WritableProxy;
stdout?: ReadableProxy;
stderr?: ReadableProxy;
stdin?: WritableProxy | null;
stdout?: ReadableProxy | null;
stderr?: ReadableProxy | null;
}
export class ChildProcessModuleProxy {

View File

@@ -4,6 +4,8 @@ import { ServerProxy } from "../../common/proxy";
import { IEncodingOptions } from "../../common/util";
import { WritableProxy } from "./stream";
// tslint:disable completed-docs
/**
* A serializable version of fs.Stats.
*/
@@ -41,13 +43,13 @@ export class WriteStreamProxy extends WritableProxy<fs.WriteStream> {
}
public async dispose(): Promise<void> {
super.dispose();
await super.dispose();
this.stream.close();
}
// tslint:disable-next-line no-any
public async onEvent(cb: (event: string, ...args: any[]) => void): Promise<void> {
super.onEvent(cb);
await super.onEvent(cb);
this.stream.on("open", (fd) => cb("open", fd));
}
}
@@ -109,7 +111,7 @@ export class FsModuleProxy {
}
public exists(path: fs.PathLike): Promise<boolean> {
return promisify(fs.exists)(path);
return promisify(fs.exists)(path); // tslint:disable-line deprecation
}
public fchmod(fd: number, mode: string | number): Promise<void> {
@@ -173,7 +175,7 @@ export class FsModuleProxy {
}
public read(fd: number, length: number, position: number | null): Promise<{ bytesRead: number, buffer: Buffer }> {
const buffer = new Buffer(length);
const buffer = Buffer.alloc(length);
return promisify(fs.read)(fd, buffer, 0, length, position);
}

View File

@@ -2,6 +2,8 @@ import * as net from "net";
import { ServerProxy } from "../../common/proxy";
import { DuplexProxy } from "./stream";
// tslint:disable completed-docs
export class NetSocketProxy extends DuplexProxy<net.Socket> {
public async connect(options: number | string | net.SocketConnectOpts, host?: string): Promise<void> {
this.stream.connect(options as any, host as any); // tslint:disable-line no-any this works fine
@@ -28,7 +30,7 @@ export class NetSocketProxy extends DuplexProxy<net.Socket> {
// tslint:disable-next-line no-any
public async onEvent(cb: (event: string, ...args: any[]) => void): Promise<void> {
super.onEvent(cb);
await super.onEvent(cb);
this.stream.on("connect", () => cb("connect"));
this.stream.on("lookup", (error, address, family, host) => cb("lookup", error, address, family, host));
this.stream.on("timeout", () => cb("timeout"));

View File

@@ -4,6 +4,8 @@ import * as pty from "node-pty";
import { ServerProxy } from "../../common/proxy";
import { preserveEnv } from "../../common/util";
// tslint:disable completed-docs
/**
* Server-side IPty proxy.
*/
@@ -22,7 +24,7 @@ export class NodePtyProcessProxy implements ServerProxy {
}
}, 200);
this.onDone(() => clearInterval(timer));
this.process.on("exit", () => clearInterval(timer));
}
public async getPid(): Promise<number> {
@@ -50,8 +52,8 @@ export class NodePtyProcessProxy implements ServerProxy {
}
public async dispose(): Promise<void> {
this.kill();
setTimeout(() => this.kill("SIGKILL"), 5000); // Double tap.
this.process.kill();
setTimeout(() => this.process.kill("SIGKILL"), 5000); // Double tap.
this.emitter.removeAllListeners();
}

View File

@@ -3,6 +3,8 @@ import { EventEmitter } from "events";
import * as spdlog from "spdlog";
import { ServerProxy } from "../../common/proxy";
// tslint:disable completed-docs
export class RotatingLoggerProxy implements ServerProxy {
private readonly emitter = new EventEmitter();
@@ -24,7 +26,7 @@ export class RotatingLoggerProxy implements ServerProxy {
}
public async dispose(): Promise<void> {
this.flush();
await this.flush();
this.emitter.emit("dispose");
this.emitter.removeAllListeners();
}

View File

@@ -1,6 +1,8 @@
import * as stream from "stream";
import { ServerProxy } from "../../common/proxy";
// tslint:disable completed-docs
export class WritableProxy<T extends stream.Writable = stream.Writable> implements ServerProxy {
public constructor(protected readonly stream: T) {}
@@ -100,7 +102,7 @@ export class DuplexProxy<T extends stream.Duplex = stream.Duplex> extends Writab
// tslint:disable-next-line no-any
public async onEvent(cb: (event: string, ...args: any[]) => void): Promise<void> {
super.onEvent(cb);
await super.onEvent(cb);
this.stream.on("data", (chunk) => cb("data", chunk));
this.stream.on("end", () => cb("end"));
}

View File

@@ -1,5 +1,7 @@
import * as trash from "trash";
// tslint:disable completed-docs
export class TrashModuleProxy {
public async trash(path: string, options?: trash.Options): Promise<void> {
return trash(path, options);

View File

@@ -3,8 +3,8 @@ import * as os from "os";
import { field, logger} from "@coder/logger";
import { ReadWriteConnection } from "../common/connection";
import { Module, ServerProxy } from "../common/proxy";
import { isPromise, isProxy, moduleToProto, parse, platformToProto, protoToModule, stringify } from "../common/util";
import { CallbackMessage, ClientMessage, EventMessage, FailMessage, MethodMessage, NamedCallbackMessage, NamedEventMessage, NumberedCallbackMessage, NumberedEventMessage, Pong, ServerMessage, SuccessMessage, WorkingInitMessage } from "../proto";
import { isPromise, isProxy, moduleToProto, protoToArgument, platformToProto, protoToModule, argumentToProto } from "../common/util";
import { Argument, Callback, ClientMessage, Event, Method, Pong, ServerMessage, WorkingInit } from "../proto";
import { ChildProcessModuleProxy, ForkProvider, FsModuleProxy, NetModuleProxy, NodePtyModuleProxy, SpdlogModuleProxy, TrashModuleProxy } from "./modules";
// tslint:disable no-any
@@ -22,11 +22,14 @@ interface ProxyData {
instance: any;
}
/**
* Handle messages from the client.
*/
export class Server {
private proxyId = 0;
private readonly proxies = new Map<number | Module, ProxyData>();
private disconnected: boolean = false;
private responseTimeout = 10000;
private readonly responseTimeout = 10000;
public constructor(
private readonly connection: ReadWriteConnection,
@@ -57,7 +60,9 @@ export class Server {
this.proxies.forEach((proxy, proxyId) => {
if (isProxy(proxy.instance)) {
proxy.instance.dispose();
proxy.instance.dispose().catch((error) => {
logger.error(error.message);
});
}
this.removeProxy(proxyId);
});
@@ -84,14 +89,14 @@ export class Server {
logger.error(error.message, field("error", error));
});
const initMsg = new WorkingInitMessage();
const initMsg = new WorkingInit();
initMsg.setDataDirectory(this.options.dataDirectory);
initMsg.setWorkingDirectory(this.options.workingDirectory);
initMsg.setBuiltinExtensionsDir(this.options.builtInExtensionsDirectory);
initMsg.setHomeDirectory(os.homedir());
initMsg.setTmpDirectory(os.tmpdir());
initMsg.setOperatingSystem(platformToProto(os.platform()));
initMsg.setShell(os.userInfo().shell || global.process.env.SHELL);
initMsg.setShell(os.userInfo().shell || global.process.env.SHELL || "");
const srvMsg = new ServerMessage();
srvMsg.setInit(initMsg);
connection.send(srvMsg.serializeBinary());
@@ -101,29 +106,32 @@ export class Server {
* Handle all messages from the client.
*/
private async handleMessage(message: ClientMessage): Promise<void> {
if (message.hasMethod()) {
await this.runMethod(message.getMethod()!);
} else if (message.hasPing()) {
logger.trace("ping");
const srvMsg = new ServerMessage();
srvMsg.setPong(new Pong());
this.connection.send(srvMsg.serializeBinary());
} else {
throw new Error("unknown message type");
switch (message.getMsgCase()) {
case ClientMessage.MsgCase.METHOD:
await this.runMethod(message.getMethod()!);
break;
case ClientMessage.MsgCase.PING:
logger.trace("ping");
const srvMsg = new ServerMessage();
srvMsg.setPong(new Pong());
this.connection.send(srvMsg.serializeBinary());
break;
default:
throw new Error("unknown message type");
}
}
/**
* Run a method on a proxy.
*/
private async runMethod(message: MethodMessage): Promise<void> {
private async runMethod(message: Method): Promise<void> {
const proxyMessage = message.getNamedProxy()! || message.getNumberedProxy()!;
const id = proxyMessage.getId();
const proxyId = message.hasNamedProxy()
? protoToModule(message.getNamedProxy()!.getModule())
: message.getNumberedProxy()!.getProxyId();
const method = proxyMessage.getMethod();
const args = proxyMessage.getArgsList().map((a) => parse(
const args = proxyMessage.getArgsList().map((a) => protoToArgument(
a,
(id, args) => this.sendCallback(proxyId, id, args),
));
@@ -133,7 +141,6 @@ export class Server {
field("id", id),
field("proxyId", proxyId),
field("method", method),
field("args", proxyMessage.getArgsList()),
]);
let response: any;
@@ -153,7 +160,7 @@ export class Server {
// Proxies must always return promises.
if (!isPromise(response)) {
throw new Error('"${method}" must return a promise');
throw new Error(`"${method}" must return a promise`);
}
} catch (error) {
logger.error(
@@ -175,27 +182,25 @@ export class Server {
* Send a callback to the client.
*/
private sendCallback(proxyId: number | Module, callbackId: number, args: any[]): void {
const stringifiedArgs = args.map((a) => this.stringify(a));
logger.trace(() => [
"sending callback",
field("proxyId", proxyId),
field("callbackId", callbackId),
field("args", stringifiedArgs),
]);
const message = new CallbackMessage();
let callbackMessage: NamedCallbackMessage | NumberedCallbackMessage;
const message = new Callback();
let callbackMessage: Callback.Named | Callback.Numbered;
if (typeof proxyId === "string") {
callbackMessage = new NamedCallbackMessage();
callbackMessage = new Callback.Named();
callbackMessage.setModule(moduleToProto(proxyId));
message.setNamedCallback(callbackMessage);
} else {
callbackMessage = new NumberedCallbackMessage();
callbackMessage = new Callback.Numbered();
callbackMessage.setProxyId(proxyId);
message.setNumberedCallback(callbackMessage);
}
callbackMessage.setCallbackId(callbackId);
callbackMessage.setArgsList(stringifiedArgs);
callbackMessage.setArgsList(args.map((a) => this.argumentToProto(a)));
const serverMessage = new ServerMessage();
serverMessage.setCallback(message);
@@ -203,15 +208,23 @@ export class Server {
}
/**
* Store a proxy and bind events to send them back to the client.
* Store a numbered proxy and bind events to send them back to the client.
*/
private storeProxy(instance: ServerProxy): number;
/**
* Store a unique proxy and bind events to send them back to the client.
*/
private storeProxy(instance: any, moduleProxyId: Module): Module;
/**
* Store a proxy and bind events to send them back to the client.
*/
private storeProxy(instance: ServerProxy | any, moduleProxyId?: Module): number | Module {
// In case we disposed while waiting for a function to return.
if (this.disconnected) {
if (isProxy(instance)) {
instance.dispose();
instance.dispose().catch((error) => {
logger.error(error.message);
});
}
throw new Error("disposed");
@@ -226,16 +239,22 @@ export class Server {
this.proxies.set(proxyId, { instance });
if (isProxy(instance)) {
instance.onEvent((event, ...args) => this.sendEvent(proxyId, event, ...args));
instance.onEvent((event, ...args) => this.sendEvent(proxyId, event, ...args)).catch((error) => {
logger.error(error.message);
});
instance.onDone(() => {
// It might have finished because we disposed it due to a disconnect.
if (!this.disconnected) {
this.sendEvent(proxyId, "done");
this.getProxy(proxyId).disposeTimeout = setTimeout(() => {
instance.dispose();
instance.dispose().catch((error) => {
logger.error(error.message);
});
this.removeProxy(proxyId);
}, this.responseTimeout);
}
}).catch((error) => {
logger.error(error.message);
});
}
@@ -246,27 +265,25 @@ export class Server {
* Send an event to the client.
*/
private sendEvent(proxyId: number | Module, event: string, ...args: any[]): void {
const stringifiedArgs = args.map((a) => this.stringify(a));
logger.trace(() => [
"sending event",
field("proxyId", proxyId),
field("event", event),
field("args", stringifiedArgs),
]);
const message = new EventMessage();
let eventMessage: NamedEventMessage | NumberedEventMessage;
const message = new Event();
let eventMessage: Event.Named | Event.Numbered;
if (typeof proxyId === "string") {
eventMessage = new NamedEventMessage();
eventMessage = new Event.Named();
eventMessage.setModule(moduleToProto(proxyId));
message.setNamedEvent(eventMessage);
} else {
eventMessage = new NumberedEventMessage();
eventMessage = new Event.Numbered();
eventMessage.setProxyId(proxyId);
message.setNumberedEvent(eventMessage);
}
eventMessage.setEvent(event);
eventMessage.setArgsList(stringifiedArgs);
eventMessage.setArgsList(args.map((a) => this.argumentToProto(a)));
const serverMessage = new ServerMessage();
serverMessage.setEvent(message);
@@ -277,16 +294,14 @@ export class Server {
* Send a response back to the client.
*/
private sendResponse(id: number, response: any): void {
const stringifiedResponse = this.stringify(response);
logger.trace(() => [
"sending resolve",
field("id", id),
field("response", stringifiedResponse),
]);
const successMessage = new SuccessMessage();
const successMessage = new Method.Success();
successMessage.setId(id);
successMessage.setResponse(stringifiedResponse);
successMessage.setResponse(this.argumentToProto(response));
const serverMessage = new ServerMessage();
serverMessage.setSuccess(successMessage);
@@ -297,16 +312,14 @@ export class Server {
* Send an exception back to the client.
*/
private sendException(id: number, error: Error): void {
const stringifiedError = stringify(error);
logger.trace(() => [
"sending reject",
field("id", id) ,
field("response", stringifiedError),
]);
const failedMessage = new FailMessage();
const failedMessage = new Method.Fail();
failedMessage.setId(id);
failedMessage.setResponse(stringifiedError);
failedMessage.setResponse(argumentToProto(error));
const serverMessage = new ServerMessage();
serverMessage.setFail(failedMessage);
@@ -327,10 +340,16 @@ export class Server {
]);
}
private stringify(value: any): string {
return stringify(value, undefined, (p) => this.storeProxy(p));
/**
* Same as argumentToProto but provides storeProxy.
*/
private argumentToProto(value: any): Argument {
return argumentToProto(value, undefined, (p) => this.storeProxy(p));
}
/**
* Get a proxy. Error if it doesn't exist.
*/
private getProxy(proxyId: number | Module): ProxyData {
if (!this.proxies.has(proxyId)) {
throw new Error(`proxy ${proxyId} disposed too early`);