From c9f91e77cde12f6c83615a169871334951f92821 Mon Sep 17 00:00:00 2001 From: Asher Date: Wed, 24 Apr 2019 10:38:21 -0500 Subject: [PATCH] Fix coping and moving files around using the file tree (#568) * Implement write/read buffers in electron fill This makes cutting and copy files from the file tree work. * Implement fs.createReadStream This is used by the file tree to copy files. * Allow passing proxies back from client to server This makes things like piping streams possible. * Synchronously bind to proxy events This eliminates any chance whatsoever of missing events due to binding too late. * Make it possible to bind some events on demand * Add some protocol documentation --- packages/ide/src/fill/electron.ts | 14 +- packages/protocol/README.md | 47 ++++++ packages/protocol/src/browser/client.ts | 12 +- .../src/browser/modules/child_process.ts | 27 ++- packages/protocol/src/browser/modules/fs.ts | 45 ++++- packages/protocol/src/browser/modules/net.ts | 24 ++- .../protocol/src/browser/modules/node-pty.ts | 20 ++- .../protocol/src/browser/modules/spdlog.ts | 14 +- .../protocol/src/browser/modules/stream.ts | 37 ++-- .../protocol/src/browser/modules/trash.ts | 5 +- packages/protocol/src/common/proxy.ts | 159 ++++++++++++++---- packages/protocol/src/common/util.ts | 30 ++-- .../src/node/modules/child_process.ts | 46 +++-- packages/protocol/src/node/modules/fs.ts | 62 ++++--- packages/protocol/src/node/modules/net.ts | 69 +++----- .../protocol/src/node/modules/node-pty.ts | 30 ++-- packages/protocol/src/node/modules/spdlog.ts | 25 ++- packages/protocol/src/node/modules/stream.ts | 110 ++++++------ packages/protocol/src/node/server.ts | 7 +- packages/protocol/test/child_process.test.ts | 2 +- packages/protocol/test/fs.test.ts | 39 ++++- 21 files changed, 546 insertions(+), 278 deletions(-) create mode 100644 packages/protocol/README.md diff --git a/packages/ide/src/fill/electron.ts b/packages/ide/src/fill/electron.ts index 624b0319..1ef1677b 100644 --- a/packages/ide/src/fill/electron.ts +++ b/packages/ide/src/fill/electron.ts @@ -171,8 +171,10 @@ const newCreateElement = (tagName: K): HT document.createElement = newCreateElement; class Clipboard { - public has(): boolean { - return false; + private readonly buffers = new Map(); + + public has(format: string): boolean { + return this.buffers.has(format); } public readFindText(): string { @@ -190,6 +192,14 @@ class Clipboard { public readText(): Promise { return clipboard.readText(); } + + public writeBuffer(format: string, buffer: Buffer): void { + this.buffers.set(format, buffer); + } + + public readBuffer(format: string): Buffer | undefined { + return this.buffers.get(format); + } } class Shell { diff --git a/packages/protocol/README.md b/packages/protocol/README.md new file mode 100644 index 00000000..a837f0ac --- /dev/null +++ b/packages/protocol/README.md @@ -0,0 +1,47 @@ +# Protocol + +This module provides a way for the browser to run Node modules like `fs`, `net`, +etc. + +## Internals + +### Server-side proxies +The server-side proxies are regular classes that call native Node functions. The +only thing special about them is that they must return promises and they must +return serializable values. + +The only exception to the promise rule are event-related methods such as +`onEvent` and `onDone` (these are synchronous). The server will simply +immediately bind and push all events it can to the client. It doesn't wait for +the client to start listening. This prevents issues with the server not +receiving the client's request to start listening in time. + +However, there is a way to specify events that should not bind immediately and +should wait for the client to request it, because some events (like `data` on a +stream) cannot be bound immediately (because doing so changes how the stream +behaves). + +### Client-side proxies +Client-side proxies are `Proxy` instances. They simply make remote calls for any +method you call on it. The only exception is for events. Each client proxy has a +local emitter which it uses in place of a remote call (this allows the call to +be completed synchronously on the client). Then when an event is received from +the server, it gets emitted on that local emitter. + +When an event is listened to, the proxy also notifies the server so it can start +listening in case it isn't already (see the `data` example above). This only +works for events that only fire after they are bound. + +### Client-side fills +The client-side fills implement the Node API and make calls to the server-side +proxies using the client-side proxies. + +When a proxy returns a proxy (for example `fs.createWriteStream`), that proxy is +a promise (since communicating with the server is asynchronous). We have to +return the fill from `fs.createWriteStream` synchronously, so that means the +fill has to contain a proxy promise. To eliminate the need for calling `then` +and to keep the code looking clean every time you use the proxy, the proxy is +itself wrapped in another proxy which just calls the method after a `then`. This +works since all the methods return promises (aside from the event methods, but +those are not used by the fills directly—they are only used internally to +forward events to the fill if it is an event emitter). diff --git a/packages/protocol/src/browser/client.ts b/packages/protocol/src/browser/client.ts index 31270861..77bba1b6 100644 --- a/packages/protocol/src/browser/client.ts +++ b/packages/protocol/src/browser/client.ts @@ -4,7 +4,7 @@ import { promisify } from "util"; import { Emitter } from "@coder/events"; import { logger, field } from "@coder/logger"; import { ReadWriteConnection, InitData, SharedProcessData } from "../common/connection"; -import { Module, ServerProxy } from "../common/proxy"; +import { ClientServerProxy, Module, ServerProxy } from "../common/proxy"; import { argumentToProto, protoToArgument, moduleToProto, protoToModule, protoToOperatingSystem } from "../common/util"; import { Argument, Ping, ServerMessage, ClientMessage, Method, Event, Callback } from "../proto"; import { FsModule, ChildProcessModule, NetModule, NodePtyModule, SpdlogModule, TrashModule } from "./modules"; @@ -224,7 +224,11 @@ export class Client { field("method", method), ]); - proxyMessage.setArgsList(args.map((a) => argumentToProto(a, storeCallback))); + proxyMessage.setArgsList(args.map((a) => argumentToProto( + a, + storeCallback, + (p) => p.proxyId, + ))); const clientMessage = new ClientMessage(); clientMessage.setMethod(message); @@ -429,7 +433,7 @@ export class Client { /** * Return a proxy that makes remote calls. */ - private createProxy(proxyId: number | Module, promise: Promise = Promise.resolve()): T { + private createProxy(proxyId: number | Module, promise: Promise = Promise.resolve()): T { logger.trace(() => [ "creating proxy", field("proxyId", proxyId), @@ -449,7 +453,7 @@ export class Client { cb(event.event, ...event.args); }); }, - }, { + } as ClientServerProxy, { get: (target: any, name: string): any => { // When resolving a promise with a proxy, it will check for "then". if (name === "then") { diff --git a/packages/protocol/src/browser/modules/child_process.ts b/packages/protocol/src/browser/modules/child_process.ts index b3e53f61..720b9656 100644 --- a/packages/protocol/src/browser/modules/child_process.ts +++ b/packages/protocol/src/browser/modules/child_process.ts @@ -2,13 +2,22 @@ import * as cp from "child_process"; import * as net from "net"; import * as stream from "stream"; import { callbackify } from "util"; -import { ClientProxy } from "../../common/proxy"; -import { ChildProcessModuleProxy, ChildProcessProxy, ChildProcessProxies } from "../../node/modules/child_process"; -import { Readable, Writable } from "./stream"; +import { ClientProxy, ClientServerProxy } from "../../common/proxy"; +import { ChildProcessModuleProxy, ChildProcessProxy } from "../../node/modules/child_process"; +import { ClientWritableProxy, ClientReadableProxy, Readable, Writable } from "./stream"; // tslint:disable completed-docs -export class ChildProcess extends ClientProxy implements cp.ChildProcess { +export interface ClientChildProcessProxy extends ChildProcessProxy, ClientServerProxy {} + +export interface ClientChildProcessProxies { + childProcess: ClientChildProcessProxy; + stdin?: ClientWritableProxy | null; + stdout?: ClientReadableProxy | null; + stderr?: ClientReadableProxy | null; +} + +export class ChildProcess extends ClientProxy implements cp.ChildProcess { public readonly stdin: stream.Writable; public readonly stdout: stream.Readable; public readonly stderr: stream.Readable; @@ -18,7 +27,7 @@ export class ChildProcess extends ClientProxy implements cp.C private _killed: boolean = false; private _pid = -1; - public constructor(proxyPromises: Promise) { + public constructor(proxyPromises: Promise) { super(proxyPromises.then((p) => p.childProcess)); this.stdin = new Writable(proxyPromises.then((p) => p.stdin!)); this.stdout = new Readable(proxyPromises.then((p) => p.stdout!)); @@ -99,8 +108,14 @@ export class ChildProcess extends ClientProxy implements cp.C } } +interface ClientChildProcessModuleProxy extends ChildProcessModuleProxy, ClientServerProxy { + exec(command: string, options?: { encoding?: string | null } & cp.ExecOptions | null, callback?: ((error: cp.ExecException | null, stdin: string | Buffer, stdout: string | Buffer) => void)): Promise; + fork(modulePath: string, args?: string[], options?: cp.ForkOptions): Promise; + spawn(command: string, args?: string[], options?: cp.SpawnOptions): Promise; +} + export class ChildProcessModule { - public constructor(private readonly proxy: ChildProcessModuleProxy) {} + public constructor(private readonly proxy: ClientChildProcessModuleProxy) {} public exec = ( command: string, diff --git a/packages/protocol/src/browser/modules/fs.ts b/packages/protocol/src/browser/modules/fs.ts index 8984120c..b8f95461 100644 --- a/packages/protocol/src/browser/modules/fs.ts +++ b/packages/protocol/src/browser/modules/fs.ts @@ -1,12 +1,11 @@ import * as fs from "fs"; import { callbackify } from "util"; -import { ClientProxy, Batch } from "../../common/proxy"; +import { Batch, ClientProxy, ClientServerProxy } from "../../common/proxy"; import { IEncodingOptions, IEncodingOptionsCallback } from "../../common/util"; -import { FsModuleProxy, Stats as IStats, WatcherProxy, WriteStreamProxy } from "../../node/modules/fs"; -import { Writable } from "./stream"; +import { FsModuleProxy, ReadStreamProxy, Stats as IStats, WatcherProxy, WriteStreamProxy } from "../../node/modules/fs"; +import { Readable, Writable } from "./stream"; -// tslint:disable no-any -// tslint:disable completed-docs +// tslint:disable completed-docs no-any class StatBatch extends Batch { public constructor(private readonly proxy: FsModuleProxy) { @@ -38,7 +37,9 @@ class ReaddirBatch extends Batch implements fs.FSWatcher { +interface ClientWatcherProxy extends WatcherProxy, ClientServerProxy {} + +class Watcher extends ClientProxy implements fs.FSWatcher { public close(): void { this.catch(this.proxy.close()); } @@ -48,7 +49,25 @@ class Watcher extends ClientProxy implements fs.FSWatcher { } } -class WriteStream extends Writable implements fs.WriteStream { +interface ClientReadStreamProxy extends ReadStreamProxy, ClientServerProxy {} + +class ReadStream extends Readable implements fs.ReadStream { + public get bytesRead(): number { + throw new Error("not implemented"); + } + + public get path(): string | Buffer { + throw new Error("not implemented"); + } + + public close(): void { + this.catch(this.proxy.close()); + } +} + +interface ClientWriteStreamProxy extends WriteStreamProxy, ClientServerProxy {} + +class WriteStream extends Writable implements fs.WriteStream { public get bytesWritten(): number { throw new Error("not implemented"); } @@ -62,12 +81,18 @@ class WriteStream extends Writable implements fs.WriteStream { } } +interface ClientFsModuleProxy extends FsModuleProxy, ClientServerProxy { + createReadStream(path: fs.PathLike, options?: any): Promise; + createWriteStream(path: fs.PathLike, options?: any): Promise; + watch(filename: fs.PathLike, options?: IEncodingOptions): Promise; +} + export class FsModule { private readonly statBatch: StatBatch; private readonly lstatBatch: LstatBatch; private readonly readdirBatch: ReaddirBatch; - public constructor(private readonly proxy: FsModuleProxy) { + public constructor(private readonly proxy: ClientFsModuleProxy) { this.statBatch = new StatBatch(this.proxy); this.lstatBatch = new LstatBatch(this.proxy); this.readdirBatch = new ReaddirBatch(this.proxy); @@ -110,6 +135,10 @@ export class FsModule { ); } + public createReadStream = (path: fs.PathLike, options?: any): fs.ReadStream => { + return new ReadStream(this.proxy.createReadStream(path, options)); + } + public createWriteStream = (path: fs.PathLike, options?: any): fs.WriteStream => { return new WriteStream(this.proxy.createWriteStream(path, options)); } diff --git a/packages/protocol/src/browser/modules/net.ts b/packages/protocol/src/browser/modules/net.ts index 571d6d9f..732711b6 100644 --- a/packages/protocol/src/browser/modules/net.ts +++ b/packages/protocol/src/browser/modules/net.ts @@ -1,16 +1,18 @@ import * as net from "net"; import { callbackify } from "util"; -import { ClientProxy } from "../../common/proxy"; +import { ClientProxy, ClientServerProxy } from "../../common/proxy"; import { NetModuleProxy, NetServerProxy, NetSocketProxy } from "../../node/modules/net"; import { Duplex } from "./stream"; // tslint:disable completed-docs -export class Socket extends Duplex implements net.Socket { +interface ClientNetSocketProxy extends NetSocketProxy, ClientServerProxy {} + +export class Socket extends Duplex implements net.Socket { private _connecting: boolean = false; private _destroyed: boolean = false; - public constructor(proxyPromise: Promise | NetSocketProxy, connecting?: boolean) { + public constructor(proxyPromise: Promise | ClientNetSocketProxy, connecting?: boolean) { super(proxyPromise); if (connecting) { this._connecting = connecting; @@ -126,12 +128,16 @@ export class Socket extends Duplex implements net.Socket { } } -export class Server extends ClientProxy implements net.Server { +interface ClientNetServerProxy extends NetServerProxy, ClientServerProxy { + onConnection(cb: (proxy: ClientNetSocketProxy) => void): Promise; +} + +export class Server extends ClientProxy implements net.Server { private socketId = 0; private readonly sockets = new Map(); private _listening: boolean = false; - public constructor(proxyPromise: Promise | NetServerProxy) { + public constructor(proxyPromise: Promise | ClientNetServerProxy) { super(proxyPromise); this.catch(this.proxy.onConnection((socketProxy) => { @@ -208,11 +214,17 @@ export class Server extends ClientProxy implements net.Server { type NodeNet = typeof net; +interface ClientNetModuleProxy extends NetModuleProxy, ClientServerProxy { + createSocket(options?: net.SocketConstructorOpts): Promise; + createConnection(target: string | number | net.NetConnectOpts, host?: string): Promise; + createServer(options?: { allowHalfOpen?: boolean, pauseOnConnect?: boolean }): Promise; +} + export class NetModule implements NodeNet { public readonly Socket: typeof net.Socket; public readonly Server: typeof net.Server; - public constructor(private readonly proxy: NetModuleProxy) { + public constructor(private readonly proxy: ClientNetModuleProxy) { // @ts-ignore this is because Socket is missing things from the Stream // namespace but I'm unsure how best to provide them (finished, // finished.__promisify__, pipeline, and some others) or if it even matters. diff --git a/packages/protocol/src/browser/modules/node-pty.ts b/packages/protocol/src/browser/modules/node-pty.ts index 6dd98e32..060de267 100644 --- a/packages/protocol/src/browser/modules/node-pty.ts +++ b/packages/protocol/src/browser/modules/node-pty.ts @@ -1,15 +1,17 @@ import * as pty from "node-pty"; -import { ClientProxy } from "../../common/proxy"; +import { ClientProxy, ClientServerProxy } from "../../common/proxy"; import { NodePtyModuleProxy, NodePtyProcessProxy } from "../../node/modules/node-pty"; // tslint:disable completed-docs -export class NodePtyProcess extends ClientProxy implements pty.IPty { +interface ClientNodePtyProcessProxy extends NodePtyProcessProxy, ClientServerProxy {} + +export class NodePtyProcess extends ClientProxy implements pty.IPty { private _pid = -1; private _process = ""; public constructor( - private readonly moduleProxy: NodePtyModuleProxy, + private readonly moduleProxy: ClientNodePtyModuleProxy, private readonly file: string, private readonly args: string[] | string, private readonly options: pty.IPtyForkOptions, @@ -18,10 +20,12 @@ export class NodePtyProcess extends ClientProxy implements this.on("process", (process) => this._process = process); } - protected initialize(proxyPromise: Promise): void { - super.initialize(proxyPromise); + protected initialize(proxyPromise: Promise): ClientNodePtyProcessProxy { + const proxy = super.initialize(proxyPromise); this.catch(this.proxy.getPid().then((p) => this._pid = p)); this.catch(this.proxy.getProcess().then((p) => this._process = p)); + + return proxy; } public get pid(): number { @@ -53,8 +57,12 @@ export class NodePtyProcess extends ClientProxy implements type NodePty = typeof pty; +interface ClientNodePtyModuleProxy extends NodePtyModuleProxy, ClientServerProxy { + spawn(file: string, args: string[] | string, options: pty.IPtyForkOptions): Promise; +} + export class NodePtyModule implements NodePty { - public constructor(private readonly proxy: NodePtyModuleProxy) {} + public constructor(private readonly proxy: ClientNodePtyModuleProxy) {} public spawn = (file: string, args: string[] | string, options: pty.IPtyForkOptions): pty.IPty => { return new NodePtyProcess(this.proxy, file, args, options); diff --git a/packages/protocol/src/browser/modules/spdlog.ts b/packages/protocol/src/browser/modules/spdlog.ts index 01e1da3d..058630e2 100644 --- a/packages/protocol/src/browser/modules/spdlog.ts +++ b/packages/protocol/src/browser/modules/spdlog.ts @@ -1,12 +1,14 @@ import * as spdlog from "spdlog"; -import { ClientProxy } from "../../common/proxy"; +import { ClientProxy, ClientServerProxy } from "../../common/proxy"; import { RotatingLoggerProxy, SpdlogModuleProxy } from "../../node/modules/spdlog"; // tslint:disable completed-docs -class RotatingLogger extends ClientProxy implements spdlog.RotatingLogger { +interface ClientRotatingLoggerProxy extends RotatingLoggerProxy, ClientServerProxy {} + +class RotatingLogger extends ClientProxy implements spdlog.RotatingLogger { public constructor( - private readonly moduleProxy: SpdlogModuleProxy, + private readonly moduleProxy: ClientSpdlogModuleProxy, private readonly name: string, private readonly filename: string, private readonly filesize: number, @@ -31,10 +33,14 @@ class RotatingLogger extends ClientProxy implements spdlog. } } +interface ClientSpdlogModuleProxy extends SpdlogModuleProxy, ClientServerProxy { + createLogger(name: string, filePath: string, fileSize: number, fileCount: number): Promise; +} + export class SpdlogModule { public readonly RotatingLogger: typeof spdlog.RotatingLogger; - public constructor(private readonly proxy: SpdlogModuleProxy) { + public constructor(private readonly proxy: ClientSpdlogModuleProxy) { this.RotatingLogger = class extends RotatingLogger { public constructor(name: string, filename: string, filesize: number, filecount: number) { super(proxy, name, filename, filesize, filecount); diff --git a/packages/protocol/src/browser/modules/stream.ts b/packages/protocol/src/browser/modules/stream.ts index 22c20cab..856e2789 100644 --- a/packages/protocol/src/browser/modules/stream.ts +++ b/packages/protocol/src/browser/modules/stream.ts @@ -1,11 +1,14 @@ import * as stream from "stream"; import { callbackify } from "util"; -import { ClientProxy } from "../../common/proxy"; -import { DuplexProxy, IReadableProxy, WritableProxy } from "../../node/modules/stream"; +import { ClientProxy, ClientServerProxy } from "../../common/proxy"; +import { isPromise } from "../../common/util"; +import { DuplexProxy, ReadableProxy, WritableProxy } from "../../node/modules/stream"; -// tslint:disable completed-docs +// tslint:disable completed-docs no-any -export class Writable extends ClientProxy implements stream.Writable { +export interface ClientWritableProxy extends WritableProxy, ClientServerProxy {} + +export class Writable extends ClientProxy implements stream.Writable { public get writable(): boolean { throw new Error("not implemented"); } @@ -50,7 +53,6 @@ export class Writable extends ClientPro return this.catch(this.proxy.setDefaultEncoding(encoding)); } - // tslint:disable-next-line no-any public write(chunk: any, encoding?: string | ((error?: Error | null) => void), callback?: (error?: Error | null) => void): boolean { if (typeof encoding === "function") { callback = encoding; @@ -65,7 +67,6 @@ export class Writable extends ClientPro return true; // Always true since we can't get this synchronously. } - // tslint:disable-next-line no-any public end(data?: any | (() => void), encoding?: string | (() => void), callback?: (() => void)): void { if (typeof data === "function") { callback = data; @@ -88,7 +89,9 @@ export class Writable extends ClientPro } } -export class Readable extends ClientProxy implements stream.Readable { +export interface ClientReadableProxy extends ReadableProxy, ClientServerProxy {} + +export class Readable extends ClientProxy implements stream.Readable { public get readable(): boolean { throw new Error("not implemented"); } @@ -141,11 +144,20 @@ export class Readable extends ClientP throw new Error("not implemented"); } - public pipe(): T { - throw new Error("not implemented"); + public pipe

(destination: P, options?: { end?: boolean }): P { + const writableProxy = (destination as any as Writable).proxyPromise; + if (!writableProxy) { + throw new Error("can only pipe stream proxies"); + } + this.catch( + isPromise(writableProxy) + ? writableProxy.then((p) => this.proxy.pipe(p, options)) + : this.proxy.pipe(writableProxy, options), + ); + + return destination; } - // tslint:disable-next-line no-any public [Symbol.asyncIterator](): AsyncIterableIterator { throw new Error("not implemented"); } @@ -164,7 +176,9 @@ export class Readable extends ClientP } } -export class Duplex extends Writable implements stream.Duplex, stream.Readable { +export interface ClientDuplexProxy extends DuplexProxy, ClientServerProxy {} + +export class Duplex extends Writable implements stream.Duplex, stream.Readable { private readonly _readable: Readable; public constructor(proxyPromise: Promise | T) { @@ -228,7 +242,6 @@ export class Duplex extends Writable imp this._readable.unshift(); } - // tslint:disable-next-line no-any public [Symbol.asyncIterator](): AsyncIterableIterator { return this._readable[Symbol.asyncIterator](); } diff --git a/packages/protocol/src/browser/modules/trash.ts b/packages/protocol/src/browser/modules/trash.ts index 06546180..3a11f4d5 100644 --- a/packages/protocol/src/browser/modules/trash.ts +++ b/packages/protocol/src/browser/modules/trash.ts @@ -1,10 +1,13 @@ import * as trash from "trash"; +import { ClientServerProxy } from "../../common/proxy"; import { TrashModuleProxy } from "../../node/modules/trash"; // tslint:disable completed-docs +interface ClientTrashModuleProxy extends TrashModuleProxy, ClientServerProxy {} + export class TrashModule { - public constructor(private readonly proxy: TrashModuleProxy) {} + public constructor(private readonly proxy: ClientTrashModuleProxy) {} public trash = (path: string, options?: trash.Options): Promise => { return this.proxy.trash(path, options); diff --git a/packages/protocol/src/common/proxy.ts b/packages/protocol/src/common/proxy.ts index 8da8131a..6ee895da 100644 --- a/packages/protocol/src/common/proxy.ts +++ b/packages/protocol/src/common/proxy.ts @@ -1,13 +1,13 @@ import { EventEmitter } from "events"; -import { isPromise } from "./util"; +import { isPromise, EventCallback } from "./util"; // tslint:disable no-any /** * Allow using a proxy like it's returned synchronously. This only works because - * all proxy methods return promises. + * all proxy methods must return promises. */ -const unpromisify = (proxyPromise: Promise): T => { +const unpromisify = (proxyPromise: Promise): T => { return new Proxy({}, { get: (target: any, name: string): any => { if (typeof target[name] === "undefined") { @@ -24,23 +24,23 @@ const unpromisify = (proxyPromise: Promise): T => { }; /** - * Client-side emitter that just forwards proxy events to its own emitter. - * It also turns a promisified proxy into a non-promisified proxy so we don't - * need a bunch of `then` calls everywhere. + * Client-side emitter that just forwards server proxy events to its own + * emitter. It also turns a promisified server proxy into a non-promisified + * proxy so we don't need a bunch of `then` calls everywhere. */ -export abstract class ClientProxy extends EventEmitter { - private _proxy: T | undefined; +export abstract class ClientProxy extends EventEmitter { + private _proxy: T; /** * You can specify not to bind events in order to avoid emitting twice for * duplex streams. */ public constructor( - proxyPromise: Promise | T, + private _proxyPromise: Promise | T, private readonly bindEvents: boolean = true, ) { super(); - this.initialize(proxyPromise); + this._proxy = this.initialize(this._proxyPromise); if (this.bindEvents) { this.on("disconnected", (error) => { try { @@ -64,11 +64,34 @@ export abstract class ClientProxy extends EventEmitter { return this; } - protected get proxy(): T { - if (!this._proxy) { - throw new Error("not initialized"); - } + /** + * Bind the event locally and ensure the event is bound on the server. + */ + public addListener(event: string, listener: (...args: any[]) => void): this { + this.catch(this.proxy.bindDelayedEvent(event)); + return super.on(event, listener); + } + + /** + * Alias for `addListener`. + */ + public on(event: string, listener: (...args: any[]) => void): this { + return this.addListener(event, listener); + } + + /** + * Original promise for the server proxy. Can be used to be passed as an + * argument. + */ + public get proxyPromise(): Promise | T { + return this._proxyPromise; + } + + /** + * Server proxy. + */ + protected get proxy(): T { return this._proxy; } @@ -76,13 +99,18 @@ export abstract class ClientProxy extends EventEmitter { * Initialize the proxy by unpromisifying if necessary and binding to its * events. */ - protected initialize(proxyPromise: Promise | T): void { - this._proxy = isPromise(proxyPromise) ? unpromisify(proxyPromise) : proxyPromise; + protected initialize(proxyPromise: Promise | T): T { + this._proxyPromise = proxyPromise; + this._proxy = isPromise(this._proxyPromise) + ? unpromisify(this._proxyPromise) + : this._proxyPromise; if (this.bindEvents) { - this.catch(this.proxy.onEvent((event, ...args): void => { + this.proxy.onEvent((event, ...args): void => { this.emit(event, ...args); - })); + }); } + + return this._proxy; } /** @@ -102,34 +130,107 @@ export abstract class ClientProxy extends EventEmitter { } } +export interface ServerProxyOptions { + /** + * The events to bind immediately. + */ + bindEvents: string[]; + /** + * Events that signal the proxy is done. + */ + doneEvents: string[]; + /** + * Events that should only be bound when asked + */ + delayedEvents?: string[]; + /** + * Whatever is emitting events (stream, child process, etc). + */ + instance: T; +} + /** - * Proxy to the actual instance on the server. Every method must only accept - * serializable arguments and must return promises with serializable values. If - * a proxy itself has proxies on creation (like how ChildProcess has stdin), + * The actual proxy instance on the server. Every method must only accept + * serializable arguments and must return promises with serializable values. + * + * If a proxy itself has proxies on creation (like how ChildProcess has stdin), * then it should return all of those at once, otherwise you will miss events * from those child proxies and fail to dispose them properly. + * + * Events listeners are added client-side (since all events automatically + * forward to the client), so onDone and onEvent do not need to be asynchronous. */ -export interface ServerProxy { +export abstract class ServerProxy { + public readonly instance: T; + + private readonly callbacks = []; + + public constructor(private readonly options: ServerProxyOptions) { + this.instance = options.instance; + } + /** * Dispose the proxy. */ - dispose(): Promise; + public async dispose(): Promise { + this.instance.removeAllListeners(); + } /** * This is used instead of an event to force it to be implemented since there * would be no guarantee the implementation would remember to emit the event. */ - onDone(cb: () => void): Promise; + public onDone(cb: () => void): void { + this.options.doneEvents.forEach((event) => { + this.instance.on(event, cb); + }); + } + + /** + * Bind an event that will not fire without first binding it and shouldn't be + * bound immediately. + + * For example, binding to `data` switches a stream to flowing mode, so we + * don't want to do it until we're asked. Otherwise something like `pipe` + * won't work because potentially some or all of the data will already have + * been flushed out. + */ + public async bindDelayedEvent(event: string): Promise { + if (this.options.delayedEvents + && this.options.delayedEvents.includes(event) + && !this.options.bindEvents.includes(event)) { + this.options.bindEvents.push(event); + this.callbacks.forEach((cb) => { + this.instance.on(event, (...args: any[]) => cb(event, ...args)); + }); + } + } /** * Listen to all possible events. On the client, this is to reduce boilerplate * that would just be a bunch of error-prone forwarding of each individual - * event from the proxy to its own emitter. It also fixes a timing issue - * because we just always send all events from the server, so we never miss - * any due to listening too late. + * event from the proxy to its own emitter. + * + * It also fixes a timing issue because we just always send all events from + * the server, so we never miss any due to listening too late. + * + * This cannot be async because then we can bind to the events too late. */ - // tslint:disable-next-line no-any - onEvent(cb: (event: string, ...args: any[]) => void): Promise; + public onEvent(cb: EventCallback): void { + this.callbacks.push(cb); + this.options.bindEvents.forEach((event) => { + this.instance.on(event, (...args: any[]) => cb(event, ...args)); + }); + } +} + +/** + * A server-side proxy stored on the client. The proxy ID only exists on the + * client-side version of the server proxy. The event listeners are handled by + * the client and the remaining methods are proxied to the server. + */ +export interface ClientServerProxy extends ServerProxy { + proxyId: number | Module; } /** diff --git a/packages/protocol/src/common/util.ts b/packages/protocol/src/common/util.ts index 35d3221a..789bc3cd 100644 --- a/packages/protocol/src/common/util.ts +++ b/packages/protocol/src/common/util.ts @@ -1,6 +1,6 @@ import { Argument, Module as ProtoModule, WorkingInit } from "../proto"; import { OperatingSystem } from "../common/connection"; -import { Module, ServerProxy } from "./proxy"; +import { ClientServerProxy, Module, ServerProxy } from "./proxy"; // tslint:disable no-any @@ -19,6 +19,8 @@ export const escapePath = (path: string): string => { return `'${path.replace(/'/g, "'\\''")}'`; }; +export type EventCallback = (event: string, ...args: any[]) => void; + export type IEncodingOptions = { encoding?: BufferEncoding | null; flag?: string; @@ -34,15 +36,26 @@ export type IEncodingOptionsCallback = IEncodingOptions | ((err: NodeJS.ErrnoExc * If sending a function is possible, provide `storeFunction`. * If sending a proxy is possible, provide `storeProxy`. */ -export const argumentToProto = ( +export const argumentToProto =

( value: any, storeFunction?: (fn: () => void) => number, - storeProxy?: (proxy: ServerProxy) => number, + storeProxy?: (proxy: P) => number | Module, ): Argument => { const convert = (currentValue: any): Argument => { const message = new Argument(); - if (currentValue instanceof Error + if (isProxy

(currentValue)) { + if (!storeProxy) { + throw new Error("no way to serialize proxy"); + } + const arg = new Argument.ProxyValue(); + const id = storeProxy(currentValue); + if (typeof id === "string") { + throw new Error("unable to serialize module proxy"); + } + arg.setId(id); + message.setProxy(arg); + } else if (currentValue instanceof Error || (currentValue && typeof currentValue.message !== "undefined" && typeof currentValue.stack !== "undefined")) { const arg = new Argument.ErrorValue(); @@ -58,13 +71,6 @@ export const argumentToProto = ( const arg = new Argument.ArrayValue(); arg.setDataList(currentValue.map(convert)); message.setArray(arg); - } else if (isProxy(currentValue)) { - if (!storeProxy) { - throw new Error("no way to serialize proxy"); - } - const arg = new Argument.ProxyValue(); - arg.setId(storeProxy(currentValue)); - message.setProxy(arg); } else if (currentValue instanceof Date || (currentValue && typeof currentValue.getTime === "function")) { const arg = new Argument.DateValue(); @@ -218,7 +224,7 @@ export const platformToProto = (platform: NodeJS.Platform): WorkingInit.Operatin } }; -export const isProxy = (value: any): value is ServerProxy => { +export const isProxy =

(value: any): value is P => { return value && typeof value === "object" && typeof value.onEvent === "function"; }; diff --git a/packages/protocol/src/node/modules/child_process.ts b/packages/protocol/src/node/modules/child_process.ts index b6a31a36..0b0c9eb0 100644 --- a/packages/protocol/src/node/modules/child_process.ts +++ b/packages/protocol/src/node/modules/child_process.ts @@ -7,29 +7,35 @@ import { WritableProxy, ReadableProxy } from "./stream"; export type ForkProvider = (modulePath: string, args?: string[], options?: cp.ForkOptions) => cp.ChildProcess; -export class ChildProcessProxy implements ServerProxy { - public constructor(private readonly process: cp.ChildProcess) {} +export class ChildProcessProxy extends ServerProxy { + public constructor(instance: cp.ChildProcess) { + super({ + bindEvents: ["close", "disconnect", "error", "exit", "message"], + doneEvents: ["close"], + instance, + }); + } public async kill(signal?: string): Promise { - this.process.kill(signal); + this.instance.kill(signal); } public async disconnect(): Promise { - this.process.disconnect(); + this.instance.disconnect(); } public async ref(): Promise { - this.process.ref(); + this.instance.ref(); } public async unref(): Promise { - this.process.unref(); + this.instance.unref(); } // tslint:disable-next-line no-any public async send(message: any): Promise { return new Promise((resolve, reject): void => { - this.process.send(message, (error) => { + this.instance.send(message, (error) => { if (error) { reject(error); } else { @@ -40,25 +46,13 @@ export class ChildProcessProxy implements ServerProxy { } public async getPid(): Promise { - return this.process.pid; - } - - public async onDone(cb: () => void): Promise { - this.process.on("close", cb); + return this.instance.pid; } public async dispose(): Promise { - this.process.kill(); - setTimeout(() => this.process.kill("SIGKILL"), 5000); // Double tap. - } - - // tslint:disable-next-line no-any - public async onEvent(cb: (event: string, ...args: any[]) => void): Promise { - this.process.on("close", (code, signal) => cb("close", code, signal)); - this.process.on("disconnect", () => cb("disconnect")); - this.process.on("error", (error) => cb("error", error)); - this.process.on("exit", (exitCode, signal) => cb("exit", exitCode, signal)); - this.process.on("message", (message) => cb("message", message)); + this.instance.kill(); + setTimeout(() => this.instance.kill("SIGKILL"), 5000); // Double tap. + await super.dispose(); } } @@ -98,8 +92,10 @@ export class ChildProcessModuleProxy { return { childProcess: new ChildProcessProxy(process), stdin: process.stdin && new WritableProxy(process.stdin), - stdout: process.stdout && new ReadableProxy(process.stdout), - stderr: process.stderr && new ReadableProxy(process.stderr), + // Child processes streams appear to immediately flow so we need to bind + // to the data event right away. + stdout: process.stdout && new ReadableProxy(process.stdout, ["data"]), + stderr: process.stderr && new ReadableProxy(process.stderr, ["data"]), }; } } diff --git a/packages/protocol/src/node/modules/fs.ts b/packages/protocol/src/node/modules/fs.ts index b59a6728..f93452be 100644 --- a/packages/protocol/src/node/modules/fs.ts +++ b/packages/protocol/src/node/modules/fs.ts @@ -2,9 +2,9 @@ import * as fs from "fs"; import { promisify } from "util"; import { ServerProxy } from "../../common/proxy"; import { IEncodingOptions } from "../../common/util"; -import { WritableProxy } from "./stream"; +import { ReadableProxy, WritableProxy } from "./stream"; -// tslint:disable completed-docs +// tslint:disable completed-docs no-any /** * A serializable version of fs.Stats. @@ -37,45 +37,52 @@ export interface Stats { _isSocket: boolean; } -export class WriteStreamProxy extends WritableProxy { +export class ReadStreamProxy extends ReadableProxy { + public constructor(stream: fs.ReadStream) { + super(stream, ["open"]); + } + public async close(): Promise { - this.stream.close(); + this.instance.close(); } public async dispose(): Promise { + this.instance.close(); await super.dispose(); - this.stream.close(); - } - - // tslint:disable-next-line no-any - public async onEvent(cb: (event: string, ...args: any[]) => void): Promise { - await super.onEvent(cb); - this.stream.on("open", (fd) => cb("open", fd)); } } -export class WatcherProxy implements ServerProxy { - public constructor(private readonly watcher: fs.FSWatcher) {} +export class WriteStreamProxy extends WritableProxy { + public constructor(stream: fs.WriteStream) { + super(stream, ["open"]); + } public async close(): Promise { - this.watcher.close(); + this.instance.close(); } public async dispose(): Promise { - this.watcher.close(); - this.watcher.removeAllListeners(); + this.instance.close(); + await super.dispose(); + } +} + +export class WatcherProxy extends ServerProxy { + public constructor(watcher: fs.FSWatcher) { + super({ + bindEvents: ["change", "close", "error"], + doneEvents: ["close", "error"], + instance: watcher, + }); } - public async onDone(cb: () => void): Promise { - this.watcher.on("close", cb); - this.watcher.on("error", cb); + public async close(): Promise { + this.instance.close(); } - // tslint:disable-next-line no-any - public async onEvent(cb: (event: string, ...args: any[]) => void): Promise { - this.watcher.on("change", (event, filename) => cb("change", event, filename)); - this.watcher.on("close", () => cb("close")); - this.watcher.on("error", (error) => cb("error", error)); + public async dispose(): Promise { + this.instance.close(); + await super.dispose(); } } @@ -84,7 +91,6 @@ export class FsModuleProxy { return promisify(fs.access)(path, mode); } - // tslint:disable-next-line no-any public appendFile(file: fs.PathLike | number, data: any, options?: fs.WriteFileOptions): Promise { return promisify(fs.appendFile)(file, data, options); } @@ -105,7 +111,10 @@ export class FsModuleProxy { return promisify(fs.copyFile)(src, dest, flags); } - // tslint:disable-next-line no-any + public async createReadStream(path: fs.PathLike, options?: any): Promise { + return new ReadStreamProxy(fs.createReadStream(path, options)); + } + public async createWriteStream(path: fs.PathLike, options?: any): Promise { return new WriteStreamProxy(fs.createWriteStream(path, options)); } @@ -236,7 +245,6 @@ export class FsModuleProxy { return promisify(fs.write)(fd, buffer, offset, length, position); } - // tslint:disable-next-line no-any public writeFile (path: fs.PathLike | number, data: any, options: IEncodingOptions): Promise { return promisify(fs.writeFile)(path, data, options); } diff --git a/packages/protocol/src/node/modules/net.ts b/packages/protocol/src/node/modules/net.ts index 3c2ef26f..28ffa52e 100644 --- a/packages/protocol/src/node/modules/net.ts +++ b/packages/protocol/src/node/modules/net.ts @@ -2,78 +2,65 @@ import * as net from "net"; import { ServerProxy } from "../../common/proxy"; import { DuplexProxy } from "./stream"; -// tslint:disable completed-docs +// tslint:disable completed-docs no-any export class NetSocketProxy extends DuplexProxy { + public constructor(socket: net.Socket) { + super(socket, ["connect", "lookup", "timeout"]); + } + public async connect(options: number | string | net.SocketConnectOpts, host?: string): Promise { - this.stream.connect(options as any, host as any); // tslint:disable-line no-any this works fine + this.instance.connect(options as any, host as any); } public async unref(): Promise { - this.stream.unref(); + this.instance.unref(); } public async ref(): Promise { - this.stream.ref(); + this.instance.ref(); } public async dispose(): Promise { - this.stream.removeAllListeners(); - this.stream.end(); - this.stream.destroy(); - this.stream.unref(); - } - - public async onDone(cb: () => void): Promise { - this.stream.on("close", cb); - } - - // tslint:disable-next-line no-any - public async onEvent(cb: (event: string, ...args: any[]) => void): Promise { - await super.onEvent(cb); - this.stream.on("connect", () => cb("connect")); - this.stream.on("lookup", (error, address, family, host) => cb("lookup", error, address, family, host)); - this.stream.on("timeout", () => cb("timeout")); + this.instance.end(); + this.instance.destroy(); + this.instance.unref(); + await super.dispose(); } } -export class NetServerProxy implements ServerProxy { - public constructor(private readonly server: net.Server) {} +export class NetServerProxy extends ServerProxy { + public constructor(instance: net.Server) { + super({ + bindEvents: ["close", "error", "listening"], + doneEvents: ["close"], + instance, + }); + } public async listen(handle?: net.ListenOptions | number | string, hostname?: string | number, backlog?: number): Promise { - this.server.listen(handle, hostname as any, backlog as any); // tslint:disable-line no-any this is fine + this.instance.listen(handle, hostname as any, backlog as any); } public async ref(): Promise { - this.server.ref(); + this.instance.ref(); } public async unref(): Promise { - this.server.unref(); + this.instance.unref(); } public async close(): Promise { - this.server.close(); + this.instance.close(); } public async onConnection(cb: (proxy: NetSocketProxy) => void): Promise { - this.server.on("connection", (socket) => cb(new NetSocketProxy(socket))); + this.instance.on("connection", (socket) => cb(new NetSocketProxy(socket))); } public async dispose(): Promise { - this.server.close(); - this.server.removeAllListeners(); - } - - public async onDone(cb: () => void): Promise { - this.server.on("close", cb); - } - - // tslint:disable-next-line no-any - public async onEvent(cb: (event: string, ...args: any[]) => void): Promise { - this.server.on("close", () => cb("close")); - this.server.on("error", (error) => cb("error", error)); - this.server.on("listening", () => cb("listening")); + this.instance.close(); + this.instance.removeAllListeners(); } } @@ -83,7 +70,7 @@ export class NetModuleProxy { } public async createConnection(target: string | number | net.NetConnectOpts, host?: string): Promise { - return new NetSocketProxy(net.createConnection(target as any, host)); // tslint:disable-line no-any defeat stubborness + return new NetSocketProxy(net.createConnection(target as any, host)); } public async createServer(options?: { allowHalfOpen?: boolean, pauseOnConnect?: boolean }): Promise { diff --git a/packages/protocol/src/node/modules/node-pty.ts b/packages/protocol/src/node/modules/node-pty.ts index 6a455b24..c5f2581c 100644 --- a/packages/protocol/src/node/modules/node-pty.ts +++ b/packages/protocol/src/node/modules/node-pty.ts @@ -9,18 +9,25 @@ import { preserveEnv } from "../../common/util"; /** * Server-side IPty proxy. */ -export class NodePtyProcessProxy implements ServerProxy { - private readonly emitter = new EventEmitter(); - +export class NodePtyProcessProxy extends ServerProxy { public constructor(private readonly process: pty.IPty) { + super({ + bindEvents: ["process", "data", "exit"], + doneEvents: ["exit"], + instance: new EventEmitter(), + }); + + this.process.on("data", (data) => this.instance.emit("data", data)); + this.process.on("exit", (exitCode, signal) => this.instance.emit("exit", exitCode, signal)); + let name = process.process; setTimeout(() => { // Need to wait for the caller to listen to the event. - this.emitter.emit("process", name); + this.instance.emit("process", name); }, 1); const timer = setInterval(() => { if (process.process !== name) { name = process.process; - this.emitter.emit("process", name); + this.instance.emit("process", name); } }, 200); @@ -47,21 +54,10 @@ export class NodePtyProcessProxy implements ServerProxy { this.process.write(data); } - public async onDone(cb: () => void): Promise { - this.process.on("exit", cb); - } - public async dispose(): Promise { this.process.kill(); setTimeout(() => this.process.kill("SIGKILL"), 5000); // Double tap. - this.emitter.removeAllListeners(); - } - - // tslint:disable-next-line no-any - public async onEvent(cb: (event: string, ...args: any[]) => void): Promise { - this.emitter.on("process", (process) => cb("process", process)); - this.process.on("data", (data) => cb("data", data)); - this.process.on("exit", (exitCode, signal) => cb("exit", exitCode, signal)); + await super.dispose(); } } diff --git a/packages/protocol/src/node/modules/spdlog.ts b/packages/protocol/src/node/modules/spdlog.ts index a18ec6a3..9023f0ae 100644 --- a/packages/protocol/src/node/modules/spdlog.ts +++ b/packages/protocol/src/node/modules/spdlog.ts @@ -5,10 +5,14 @@ import { ServerProxy } from "../../common/proxy"; // tslint:disable completed-docs -export class RotatingLoggerProxy implements ServerProxy { - private readonly emitter = new EventEmitter(); - - public constructor(private readonly logger: spdlog.RotatingLogger) {} +export class RotatingLoggerProxy extends ServerProxy { + public constructor(private readonly logger: spdlog.RotatingLogger) { + super({ + bindEvents: [], + doneEvents: ["dispose"], + instance: new EventEmitter(), + }); + } public async trace (message: string): Promise { this.logger.trace(message); } public async debug (message: string): Promise { this.logger.debug(message); } @@ -21,19 +25,10 @@ export class RotatingLoggerProxy implements ServerProxy { public async flush (): Promise { this.logger.flush(); } public async drop (): Promise { this.logger.drop(); } - public async onDone(cb: () => void): Promise { - this.emitter.on("dispose", cb); - } - public async dispose(): Promise { await this.flush(); - this.emitter.emit("dispose"); - this.emitter.removeAllListeners(); - } - - // tslint:disable-next-line no-any - public async onEvent(_cb: (event: string, ...args: any[]) => void): Promise { - // No events. + this.instance.emit("dispose"); + await super.dispose(); } } diff --git a/packages/protocol/src/node/modules/stream.ts b/packages/protocol/src/node/modules/stream.ts index d3840928..e64ec38d 100644 --- a/packages/protocol/src/node/modules/stream.ts +++ b/packages/protocol/src/node/modules/stream.ts @@ -1,32 +1,38 @@ +import { EventEmitter } from "events"; import * as stream from "stream"; import { ServerProxy } from "../../common/proxy"; -// tslint:disable completed-docs +// tslint:disable completed-docs no-any -export class WritableProxy implements ServerProxy { - public constructor(protected readonly stream: T) {} - - public async destroy(): Promise { - this.stream.destroy(); +export class WritableProxy extends ServerProxy { + public constructor(instance: T, bindEvents: string[] = [], delayedEvents?: string[]) { + super({ + bindEvents: ["close", "drain", "error", "finish"].concat(bindEvents), + doneEvents: ["close"], + delayedEvents, + instance, + }); + } + + public async destroy(): Promise { + this.instance.destroy(); } - // tslint:disable-next-line no-any public async end(data?: any, encoding?: string): Promise { return new Promise((resolve): void => { - this.stream.end(data, encoding, () => { + this.instance.end(data, encoding, () => { resolve(); }); }); } public async setDefaultEncoding(encoding: string): Promise { - this.stream.setDefaultEncoding(encoding); + this.instance.setDefaultEncoding(encoding); } - // tslint:disable-next-line no-any public async write(data: any, encoding?: string): Promise { return new Promise((resolve, reject): void => { - this.stream.write(data, encoding, (error) => { + this.instance.write(data, encoding, (error) => { if (error) { reject(error); } else { @@ -37,22 +43,8 @@ export class WritableProxy implemen } public async dispose(): Promise { - this.stream.end(); - this.stream.removeAllListeners(); - } - - public async onDone(cb: () => void): Promise { - this.stream.on("close", cb); - } - - // tslint:disable-next-line no-any - public async onEvent(cb: (event: string, ...args: any[]) => void): Promise { - // Sockets have an extra argument on "close". - // tslint:disable-next-line no-any - this.stream.on("close", (...args: any[]) => cb("close", ...args)); - this.stream.on("drain", () => cb("drain")); - this.stream.on("error", (error) => cb("error", error)); - this.stream.on("finish", () => cb("finish")); + this.instance.end(); + await super.dispose(); } } @@ -60,50 +52,58 @@ export class WritableProxy implemen * This noise is because we can't do multiple extends and we also can't seem to * do `extends WritableProxy implement ReadableProxy` (for `DuplexProxy`). */ -export interface IReadableProxy extends ServerProxy { - destroy(): Promise; +export interface IReadableProxy extends ServerProxy { + pipe

(destination: P, options?: { end?: boolean; }): Promise; setEncoding(encoding: string): Promise; - dispose(): Promise; - onDone(cb: () => void): Promise; } -export class ReadableProxy implements IReadableProxy { - public constructor(protected readonly stream: T) {} +export class ReadableProxy extends ServerProxy implements IReadableProxy { + public constructor(instance: T, bindEvents: string[] = []) { + super({ + bindEvents: ["close", "end", "error"].concat(bindEvents), + doneEvents: ["close"], + delayedEvents: ["data"], + instance, + }); + } + + public async pipe

(destination: P, options?: { end?: boolean; }): Promise { + this.instance.pipe(destination.instance, options); + // `pipe` switches the stream to flowing mode and makes data start emitting. + await this.bindDelayedEvent("data"); + } public async destroy(): Promise { - this.stream.destroy(); + this.instance.destroy(); } public async setEncoding(encoding: string): Promise { - this.stream.setEncoding(encoding); + this.instance.setEncoding(encoding); } public async dispose(): Promise { - this.stream.destroy(); - } - - public async onDone(cb: () => void): Promise { - this.stream.on("close", cb); - } - - // tslint:disable-next-line no-any - public async onEvent(cb: (event: string, ...args: any[]) => void): Promise { - this.stream.on("close", () => cb("close")); - this.stream.on("data", (chunk) => cb("data", chunk)); - this.stream.on("end", () => cb("end")); - this.stream.on("error", (error) => cb("error", error)); + this.instance.destroy(); + await super.dispose(); } } -export class DuplexProxy extends WritableProxy implements IReadableProxy { +export class DuplexProxy extends WritableProxy implements IReadableProxy { + public constructor(stream: T, bindEvents: string[] = []) { + super(stream, ["end"].concat(bindEvents), ["data"]); + } + + public async pipe

(destination: P, options?: { end?: boolean; }): Promise { + this.instance.pipe(destination.instance, options); + // `pipe` switches the stream to flowing mode and makes data start emitting. + await this.bindDelayedEvent("data"); + } + public async setEncoding(encoding: string): Promise { - this.stream.setEncoding(encoding); + this.instance.setEncoding(encoding); } - // tslint:disable-next-line no-any - public async onEvent(cb: (event: string, ...args: any[]) => void): Promise { - await super.onEvent(cb); - this.stream.on("data", (chunk) => cb("data", chunk)); - this.stream.on("end", () => cb("end")); + public async dispose(): Promise { + this.instance.destroy(); + await super.dispose(); } } diff --git a/packages/protocol/src/node/server.ts b/packages/protocol/src/node/server.ts index b0952ad2..be4abbf8 100644 --- a/packages/protocol/src/node/server.ts +++ b/packages/protocol/src/node/server.ts @@ -136,6 +136,7 @@ export class Server { const args = proxyMessage.getArgsList().map((a) => protoToArgument( a, (id, args) => this.sendCallback(proxyId, id, args), + (id) => this.getProxy(id).instance, )); logger.trace(() => [ @@ -241,9 +242,7 @@ export class Server { this.proxies.set(proxyId, { instance }); if (isProxy(instance)) { - instance.onEvent((event, ...args) => this.sendEvent(proxyId, event, ...args)).catch((error) => { - logger.error(error.message); - }); + instance.onEvent((event, ...args) => this.sendEvent(proxyId, event, ...args)); instance.onDone(() => { // It might have finished because we disposed it due to a disconnect. if (!this.disconnected) { @@ -255,8 +254,6 @@ export class Server { this.removeProxy(proxyId); }, this.responseTimeout); } - }).catch((error) => { - logger.error(error.message); }); } diff --git a/packages/protocol/test/child_process.test.ts b/packages/protocol/test/child_process.test.ts index d49d3877..782e92d3 100644 --- a/packages/protocol/test/child_process.test.ts +++ b/packages/protocol/test/child_process.test.ts @@ -10,7 +10,7 @@ describe("child_process", () => { const cp = client.modules[Module.ChildProcess]; const getStdout = async (proc: ChildProcess): Promise => { - return new Promise((r): Readable => proc.stdout!.on("data", r)) + return new Promise((r): Readable => proc.stdout!.once("data", r)) .then((s) => s.toString()); }; diff --git a/packages/protocol/test/fs.test.ts b/packages/protocol/test/fs.test.ts index d3c29e31..483f144b 100644 --- a/packages/protocol/test/fs.test.ts +++ b/packages/protocol/test/fs.test.ts @@ -131,6 +131,42 @@ describe("fs", () => { }); }); + describe("createReadStream", () => { + it("should read a file", async () => { + const file = helper.tmpFile(); + const content = "foobar"; + await util.promisify(nativeFs.writeFile)(file, content); + + const reader = fs.createReadStream(file); + + await expect(new Promise((resolve, reject): void => { + let data = ""; + reader.once("error", reject); + reader.once("end", () => resolve(data)); + reader.on("data", (d) => data += d.toString()); + })).resolves.toBe(content); + }); + + it("should pipe to a writable stream", async () => { + const source = helper.tmpFile(); + const content = "foo"; + await util.promisify(nativeFs.writeFile)(source, content); + + const destination = helper.tmpFile(); + const reader = fs.createReadStream(source); + const writer = fs.createWriteStream(destination); + + await new Promise((resolve, reject): void => { + reader.once("error", reject); + writer.once("error", reject); + writer.once("close", resolve); + reader.pipe(writer); + }); + + await expect(util.promisify(nativeFs.readFile)(destination, "utf8")).resolves.toBe(content); + }); + }); + describe("exists", () => { it("should output file exists", async () => { await expect(util.promisify(fs.exists)(__filename)) @@ -279,10 +315,9 @@ describe("fs", () => { .resolves.toBeUndefined(); }); - // TODO: Doesn't fail on my system? it("should fail to lchown nonexistent file", async () => { await expect(util.promisify(fs.lchown)(helper.tmpFile(), 1, 1)) - .resolves.toBeUndefined(); + .rejects.toThrow("ENOENT"); }); });