From 20f5d8eeed01d2b7e066e2589f3e9f814d347436 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Tue, 29 Jan 2019 18:48:02 -0600 Subject: [PATCH] Add active evals (#25) * Add active evals * Convert type of stats to date or string * Fix generic overloads for run * Lower evaluate timeout * Add comment for createWriteStream --- packages/protocol/src/browser/client.ts | 62 ++++- packages/protocol/src/browser/command.ts | 8 + packages/protocol/src/browser/modules/fs.ts | 58 ++++- packages/protocol/src/node/evaluate.ts | 54 +++- packages/protocol/src/node/server.ts | 16 +- packages/protocol/src/proto/client.proto | 6 +- packages/protocol/src/proto/client_pb.d.ts | 18 +- packages/protocol/src/proto/client_pb.js | 124 ++++++++-- packages/protocol/src/proto/node.proto | 9 + packages/protocol/src/proto/node_pb.d.ts | 34 +++ packages/protocol/src/proto/node_pb.js | 260 +++++++++++++++++++- packages/protocol/test/evaluate.test.ts | 14 ++ packages/protocol/test/modules/fs.test.ts | 17 ++ 13 files changed, 640 insertions(+), 40 deletions(-) diff --git a/packages/protocol/src/browser/client.ts b/packages/protocol/src/browser/client.ts index e4f5e6f6..6fea49f0 100644 --- a/packages/protocol/src/browser/client.ts +++ b/packages/protocol/src/browser/client.ts @@ -1,8 +1,9 @@ import { ReadWriteConnection, InitData, OperatingSystem, ISharedProcessData } from "../common/connection"; -import { NewEvalMessage, ServerMessage, EvalDoneMessage, EvalFailedMessage, TypedValue, ClientMessage, NewSessionMessage, TTYDimensions, SessionOutputMessage, CloseSessionInputMessage, WorkingInitMessage } from "../proto"; +import { NewEvalMessage, ServerMessage, EvalDoneMessage, EvalFailedMessage, TypedValue, ClientMessage, NewSessionMessage, TTYDimensions, SessionOutputMessage, CloseSessionInputMessage, WorkingInitMessage, EvalEventMessage } from "../proto"; import { Emitter, Event } from "@coder/events"; import { logger, field } from "@coder/logger"; -import { ChildProcess, SpawnOptions, ForkOptions, ServerProcess, ServerSocket, Socket, ServerListener, Server } from "./command"; +import { ChildProcess, SpawnOptions, ForkOptions, ServerProcess, ServerSocket, Socket, ServerListener, Server, ActiveEval } from "./command"; +import { EventEmitter } from "events"; /** * Client accepts an arbitrary connection intended to communicate with the Server. @@ -14,6 +15,7 @@ export class Client { private evalId: number = 0; private evalDoneEmitter: Emitter = new Emitter(); private evalFailedEmitter: Emitter = new Emitter(); + private evalEventEmitter: Emitter = new Emitter(); private sessionId: number = 0; private readonly sessions: Map = new Map(); @@ -66,6 +68,47 @@ export class Client { return this.sharedProcessActiveEmitter.event; } + public run(func: (ae: ActiveEval) => void | Promise): ActiveEval; + public run(func: (ae: ActiveEval, a1: T1) => void | Promise, a1: T1): ActiveEval; + public run(func: (ae: ActiveEval, a1: T1, a2: T2) => void | Promise, a1: T1, a2: T2): ActiveEval; + public run(func: (ae: ActiveEval, a1: T1, a2: T2, a3: T3) => void | Promise, a1: T1, a2: T2, a3: T3): ActiveEval; + public run(func: (ae: ActiveEval, a1: T1, a2: T2, a3: T3, a4: T4) => void | Promise, a1: T1, a2: T2, a3: T3, a4: T4): ActiveEval; + public run(func: (ae: ActiveEval, a1: T1, a2: T2, a3: T3, a4: T4, a5: T5) => void | Promise, a1: T1, a2: T2, a3: T3, a4: T4, a5: T5): ActiveEval; + public run(func: (ae: ActiveEval, a1: T1, a2: T2, a3: T3, a4: T4, a5: T5, a6: T6) => void | Promise, a1: T1, a2: T2, a3: T3, a4: T4, a5: T5, a6: T6): ActiveEval; + + public run(func: (ae: ActiveEval, a1?: T1, a2?: T2, a3?: T3, a4?: T4, a5?: T5, a6?: T6) => void | Promise, a1?: T1, a2?: T2, a3?: T3, a4?: T4, a5?: T5, a6?: T6): ActiveEval { + const doEval = this.doEvaluate(func, a1, a2, a3, a4, a5, a6, true); + const eventEmitter = new EventEmitter(); + const d1 = this.evalEventEmitter.event((msg) => { + if (msg.getId() !== doEval.id) { + return; + } + + eventEmitter.emit(msg.getEvent(), ...msg.getArgsList().filter(a => a).map(s => JSON.parse(s))); + }); + + doEval.completed.then(() => { + d1.dispose(); + eventEmitter.emit("close"); + }).catch((ex) => { + d1.dispose(); + eventEmitter.emit("error", ex); + }); + + return { + on: (event: string, cb: (...args: any[]) => void) => eventEmitter.on(event, cb), + emit: (event: string, ...args: any[]) => { + const eventsMsg = new EvalEventMessage(); + eventsMsg.setId(doEval.id); + eventsMsg.setEvent(event); + eventsMsg.setArgsList(args.filter(a => a).map(a => JSON.stringify(a))); + const clientMsg = new ClientMessage(); + clientMsg.setEvalEvent(eventsMsg); + this.connection.send(clientMsg.serializeBinary()); + }, + }; + } + public evaluate(func: () => R | Promise): Promise; public evaluate(func: (a1: T1) => R | Promise, a1: T1): Promise; public evaluate(func: (a1: T1, a2: T2) => R | Promise, a1: T1, a2: T2): Promise; @@ -86,9 +129,17 @@ export class Client { * @returns Promise rejected or resolved from the evaluated function */ public evaluate(func: (a1?: T1, a2?: T2, a3?: T3, a4?: T4, a5?: T5, a6?: T6) => R | Promise, a1?: T1, a2?: T2, a3?: T3, a4?: T4, a5?: T5, a6?: T6): Promise { + return this.doEvaluate(func, a1, a2, a3, a4, a5, a6, false).completed; + } + + private doEvaluate(func: (...args: any[]) => void | Promise | R | Promise, a1?: T1, a2?: T2, a3?: T3, a4?: T4, a5?: T5, a6?: T6, active: boolean = false): { + readonly completed: Promise; + readonly id: number; + } { const newEval = new NewEvalMessage(); const id = this.evalId++; newEval.setId(id); + newEval.setActive(active); newEval.setArgsList([a1, a2, a3, a4, a5, a6].filter(a => typeof a !== "undefined").map(a => JSON.stringify(a))); newEval.setFunction(func.toString()); @@ -148,7 +199,10 @@ export class Client { } }); - return prom; + return { + completed: prom, + id, + }; } /** @@ -279,6 +333,8 @@ export class Client { this.evalDoneEmitter.emit(message.getEvalDone()!); } else if (message.hasEvalFailed()) { this.evalFailedEmitter.emit(message.getEvalFailed()!); + } else if (message.hasEvalEvent()) { + this.evalEventEmitter.emit(message.getEvalEvent()!); } else if (message.hasNewSessionFailure()) { const s = this.sessions.get(message.getNewSessionFailure()!.getId()); if (!s) { diff --git a/packages/protocol/src/browser/command.ts b/packages/protocol/src/browser/command.ts index c053c166..ce19e289 100644 --- a/packages/protocol/src/browser/command.ts +++ b/packages/protocol/src/browser/command.ts @@ -360,3 +360,11 @@ export class ServerListener extends events.EventEmitter implements Server { } } + +export interface ActiveEval { + emit(event: string, ...args: any[]): void; + + on(event: "close", cb: () => void): void; + on(event: "error", cb: (err: Error) => void): void; + on(event: string, cb: (...args: any[]) => void): void; +} \ No newline at end of file diff --git a/packages/protocol/src/browser/modules/fs.ts b/packages/protocol/src/browser/modules/fs.ts index 95dd1046..16cb3555 100644 --- a/packages/protocol/src/browser/modules/fs.ts +++ b/packages/protocol/src/browser/modules/fs.ts @@ -1,6 +1,7 @@ import { exec, ChildProcess } from "child_process"; import { EventEmitter } from "events"; import * as fs from "fs"; +import * as stream from "stream"; import { IEncodingOptions, IEncodingOptionsCallback, escapePath, useBuffer } from "../../common/util"; import { Client } from "../client"; @@ -112,8 +113,51 @@ export class FS { }); } - public createWriteStream = (): void => { - throw new Error("createWriteStream not implemented"); + /** + * This should NOT be used for long-term writes. + * The runnable will be killed after the timeout specified in evaluate.ts + */ + public createWriteStream = (path: fs.PathLike, options?: any): fs.WriteStream => { + const ae = this.client.run((ae, path, options) => { + const fs = _require("fs") as typeof import("fs"); + const str = fs.createWriteStream(path, options); + ae.on("write", (d, e) => str.write(Buffer.from(d, e))); + ae.on("close", () => str.close()); + str.on("close", () => ae.emit("close")); + str.on("open", (fd) => ae.emit("open", fd)); + str.on("error", (err) => ae.emit(err)); + }, path, options); + + return new (class WriteStream extends stream.Writable implements fs.WriteStream { + + private _bytesWritten: number = 0; + + public constructor() { + super({ + write: (data, encoding, cb) => { + this._bytesWritten += data.length; + ae.emit("write", Buffer.from(data, encoding), encoding); + cb(); + }, + }); + + ae.on("open", (a) => this.emit("open", a)); + ae.on("close", () => this.emit("close")); + } + + public get bytesWritten(): number { + return this._bytesWritten; + } + + public get path(): string | Buffer { + return ""; + } + + public close(): void { + ae.emit("close"); + } + + }) as fs.WriteStream; } public exists = (path: fs.PathLike, callback: (exists: boolean) => void): void => { @@ -667,10 +711,10 @@ interface IStats { mtimeMs: number; ctimeMs: number; birthtimeMs: number; - atime: string; - mtime: string; - ctime: string; - birthtime: string; + atime: Date | string; + mtime: Date | string; + ctime: Date | string; + birthtime: Date | string; _isFile: boolean; _isDirectory: boolean; _isBlockDevice: boolean; @@ -687,7 +731,7 @@ class Stats implements fs.Stats { public readonly ctime: Date; public readonly birthtime: Date; - private constructor(private readonly stats: IStats) { + public constructor(private readonly stats: IStats) { this.atime = new Date(stats.atime); this.mtime = new Date(stats.mtime); this.ctime = new Date(stats.ctime); diff --git a/packages/protocol/src/node/evaluate.ts b/packages/protocol/src/node/evaluate.ts index e715ab13..f9c7c31b 100644 --- a/packages/protocol/src/node/evaluate.ts +++ b/packages/protocol/src/node/evaluate.ts @@ -1,9 +1,14 @@ import * as vm from "vm"; -import { NewEvalMessage, TypedValue, EvalFailedMessage, EvalDoneMessage, ServerMessage } from "../proto"; +import { NewEvalMessage, TypedValue, EvalFailedMessage, EvalDoneMessage, ServerMessage, EvalEventMessage, ClientMessage } from "../proto"; import { SendableConnection } from "../common/connection"; +import { EventEmitter } from "events"; + +export interface ActiveEvaluation { + onEvent(msg: EvalEventMessage): void; +} declare var __non_webpack_require__: typeof require; -export const evaluate = async (connection: SendableConnection, message: NewEvalMessage): Promise => { +export const evaluate = (connection: SendableConnection, message: NewEvalMessage, onDispose: () => void): ActiveEvaluation | void => { const argStr: string[] = []; message.getArgsList().forEach((value) => { argStr.push(value); @@ -50,18 +55,55 @@ export const evaluate = async (connection: SendableConnection, message: NewEvalM serverMsg.setEvalFailed(evalFailed); connection.send(serverMsg.serializeBinary()); }; + let eventEmitter: EventEmitter | undefined; try { - const value = vm.runInNewContext(`(${message.getFunction()})(${argStr.join(",")})`, { + if (message.getActive()) { + eventEmitter = new EventEmitter(); + } + + const value = vm.runInNewContext(`(${message.getFunction()})(${eventEmitter ? `eventEmitter, ` : ""}${argStr.join(",")})`, { + eventEmitter: eventEmitter ? { + on: (event: string, cb: (...args: any[]) => void): void => { + eventEmitter!.on(event, cb); + }, + emit: (event: string, ...args: any[]): void => { + const eventMsg = new EvalEventMessage(); + eventMsg.setEvent(event); + eventMsg.setArgsList(args.filter(a => a).map(a => JSON.stringify(a))); + eventMsg.setId(message.getId()); + const serverMsg = new ServerMessage(); + serverMsg.setEvalEvent(eventMsg); + connection.send(serverMsg.serializeBinary()); + }, + } : undefined, Buffer, require: typeof __non_webpack_require__ !== "undefined" ? __non_webpack_require__ : require, _require: typeof __non_webpack_require__ !== "undefined" ? __non_webpack_require__ : require, tslib_1: require("tslib"), // TODO: is there a better way to do this? setTimeout, }, { - timeout: message.getTimeout() || 30000, - }); - sendResp(await value); + timeout: message.getTimeout() || 15000, + }); + if (eventEmitter) { + // Is an active evaluation and should NOT be ended + eventEmitter.on("close", () => onDispose()); + eventEmitter.on("error", () => onDispose()); + } else { + if ((value as Promise).then) { + // Is promise + (value as Promise).then(r => sendResp(r)).catch(ex => sendErr(EvalFailedMessage.Reason.EXCEPTION, ex.toString())); + } else { + sendResp(value); + } + onDispose(); + } } catch (ex) { sendErr(EvalFailedMessage.Reason.EXCEPTION, ex.toString()); } + + return eventEmitter ? { + onEvent: (eventMsg: EvalEventMessage): void => { + eventEmitter!.emit(eventMsg.getEvent(), ...eventMsg.getArgsList().map(a => JSON.parse(a))); + }, + } : undefined; }; diff --git a/packages/protocol/src/node/server.ts b/packages/protocol/src/node/server.ts index 48be96c1..3120eaba 100644 --- a/packages/protocol/src/node/server.ts +++ b/packages/protocol/src/node/server.ts @@ -6,7 +6,7 @@ import { promisify } from "util"; import { TextDecoder } from "text-encoding"; import { logger, field } from "@coder/logger"; import { ClientMessage, WorkingInitMessage, ServerMessage, NewSessionMessage, WriteToSessionMessage } from "../proto"; -import { evaluate } from "./evaluate"; +import { evaluate, ActiveEvaluation } from "./evaluate"; import { ReadWriteConnection } from "../common/connection"; import { Process, handleNewSession, handleNewConnection, handleNewServer } from "./command"; import * as net from "net"; @@ -23,6 +23,7 @@ export class Server { private readonly sessions: Map = new Map(); private readonly connections: Map = new Map(); private readonly servers: Map = new Map(); + private readonly evals: Map = new Map(); private connectionId: number = Number.MAX_SAFE_INTEGER; @@ -114,7 +115,18 @@ export class Server { field("args", evalMessage.getArgsList()), field("function", evalMessage.getFunction()), ]); - evaluate(this.connection, evalMessage); + const resp = evaluate(this.connection, evalMessage, () => { + this.evals.delete(evalMessage.getId()); + }); + if (resp) { + this.evals.set(evalMessage.getId(), resp); + } + } else if (message.hasEvalEvent()) { + const e = this.evals.get(message.getEvalEvent()!.getId()); + if (!e) { + return; + } + e.onEvent(message.getEvalEvent()!); } else if (message.hasNewSession()) { const sessionMessage = message.getNewSession()!; logger.debug("NewSession", field("id", sessionMessage.getId())); diff --git a/packages/protocol/src/proto/client.proto b/packages/protocol/src/proto/client.proto index a1ca0f89..2c6dfca3 100644 --- a/packages/protocol/src/proto/client.proto +++ b/packages/protocol/src/proto/client.proto @@ -19,6 +19,7 @@ message ClientMessage { // node.proto NewEvalMessage new_eval = 11; + EvalEventMessage eval_event = 12; } } @@ -41,11 +42,12 @@ message ServerMessage { // node.proto EvalFailedMessage eval_failed = 13; EvalDoneMessage eval_done = 14; + EvalEventMessage eval_event = 15; - WorkingInitMessage init = 15; + WorkingInitMessage init = 16; // vscode.proto - SharedProcessActiveMessage shared_process_active = 16; + SharedProcessActiveMessage shared_process_active = 17; } } diff --git a/packages/protocol/src/proto/client_pb.d.ts b/packages/protocol/src/proto/client_pb.d.ts index b0eab263..a81b1fbb 100644 --- a/packages/protocol/src/proto/client_pb.d.ts +++ b/packages/protocol/src/proto/client_pb.d.ts @@ -62,6 +62,11 @@ export class ClientMessage extends jspb.Message { getNewEval(): node_pb.NewEvalMessage | undefined; setNewEval(value?: node_pb.NewEvalMessage): void; + hasEvalEvent(): boolean; + clearEvalEvent(): void; + getEvalEvent(): node_pb.EvalEventMessage | undefined; + setEvalEvent(value?: node_pb.EvalEventMessage): void; + getMsgCase(): ClientMessage.MsgCase; serializeBinary(): Uint8Array; toObject(includeInstance?: boolean): ClientMessage.AsObject; @@ -86,6 +91,7 @@ export namespace ClientMessage { newServer?: command_pb.NewServerMessage.AsObject, serverClose?: command_pb.ServerCloseMessage.AsObject, newEval?: node_pb.NewEvalMessage.AsObject, + evalEvent?: node_pb.EvalEventMessage.AsObject, } export enum MsgCase { @@ -101,6 +107,7 @@ export namespace ClientMessage { NEW_SERVER = 9, SERVER_CLOSE = 10, NEW_EVAL = 11, + EVAL_EVENT = 12, } } @@ -175,6 +182,11 @@ export class ServerMessage extends jspb.Message { getEvalDone(): node_pb.EvalDoneMessage | undefined; setEvalDone(value?: node_pb.EvalDoneMessage): void; + hasEvalEvent(): boolean; + clearEvalEvent(): void; + getEvalEvent(): node_pb.EvalEventMessage | undefined; + setEvalEvent(value?: node_pb.EvalEventMessage): void; + hasInit(): boolean; clearInit(): void; getInit(): WorkingInitMessage | undefined; @@ -212,6 +224,7 @@ export namespace ServerMessage { serverConnectionEstablished?: command_pb.ServerConnectionEstablishedMessage.AsObject, evalFailed?: node_pb.EvalFailedMessage.AsObject, evalDone?: node_pb.EvalDoneMessage.AsObject, + evalEvent?: node_pb.EvalEventMessage.AsObject, init?: WorkingInitMessage.AsObject, sharedProcessActive?: vscode_pb.SharedProcessActiveMessage.AsObject, } @@ -232,8 +245,9 @@ export namespace ServerMessage { SERVER_CONNECTION_ESTABLISHED = 12, EVAL_FAILED = 13, EVAL_DONE = 14, - INIT = 15, - SHARED_PROCESS_ACTIVE = 16, + EVAL_EVENT = 15, + INIT = 16, + SHARED_PROCESS_ACTIVE = 17, } } diff --git a/packages/protocol/src/proto/client_pb.js b/packages/protocol/src/proto/client_pb.js index 05effb8a..7f6453e0 100644 --- a/packages/protocol/src/proto/client_pb.js +++ b/packages/protocol/src/proto/client_pb.js @@ -42,7 +42,7 @@ if (goog.DEBUG && !COMPILED) { * @private {!Array>} * @const */ -proto.ClientMessage.oneofGroups_ = [[1,2,3,4,5,6,7,8,9,10,11]]; +proto.ClientMessage.oneofGroups_ = [[1,2,3,4,5,6,7,8,9,10,11,12]]; /** * @enum {number} @@ -59,7 +59,8 @@ proto.ClientMessage.MsgCase = { CONNECTION_CLOSE: 8, NEW_SERVER: 9, SERVER_CLOSE: 10, - NEW_EVAL: 11 + NEW_EVAL: 11, + EVAL_EVENT: 12 }; /** @@ -107,7 +108,8 @@ proto.ClientMessage.toObject = function(includeInstance, msg) { connectionClose: (f = msg.getConnectionClose()) && command_pb.ConnectionCloseMessage.toObject(includeInstance, f), newServer: (f = msg.getNewServer()) && command_pb.NewServerMessage.toObject(includeInstance, f), serverClose: (f = msg.getServerClose()) && command_pb.ServerCloseMessage.toObject(includeInstance, f), - newEval: (f = msg.getNewEval()) && node_pb.NewEvalMessage.toObject(includeInstance, f) + newEval: (f = msg.getNewEval()) && node_pb.NewEvalMessage.toObject(includeInstance, f), + evalEvent: (f = msg.getEvalEvent()) && node_pb.EvalEventMessage.toObject(includeInstance, f) }; if (includeInstance) { @@ -199,6 +201,11 @@ proto.ClientMessage.deserializeBinaryFromReader = function(msg, reader) { reader.readMessage(value,node_pb.NewEvalMessage.deserializeBinaryFromReader); msg.setNewEval(value); break; + case 12: + var value = new node_pb.EvalEventMessage; + reader.readMessage(value,node_pb.EvalEventMessage.deserializeBinaryFromReader); + msg.setEvalEvent(value); + break; default: reader.skipField(); break; @@ -325,6 +332,14 @@ proto.ClientMessage.prototype.serializeBinaryToWriter = function (writer) { node_pb.NewEvalMessage.serializeBinaryToWriter ); } + f = this.getEvalEvent(); + if (f != null) { + writer.writeMessage( + 12, + f, + node_pb.EvalEventMessage.serializeBinaryToWriter + ); + } }; @@ -667,6 +682,36 @@ proto.ClientMessage.prototype.hasNewEval = function() { }; +/** + * optional EvalEventMessage eval_event = 12; + * @return {proto.EvalEventMessage} + */ +proto.ClientMessage.prototype.getEvalEvent = function() { + return /** @type{proto.EvalEventMessage} */ ( + jspb.Message.getWrapperField(this, node_pb.EvalEventMessage, 12)); +}; + + +/** @param {proto.EvalEventMessage|undefined} value */ +proto.ClientMessage.prototype.setEvalEvent = function(value) { + jspb.Message.setOneofWrapperField(this, 12, proto.ClientMessage.oneofGroups_[0], value); +}; + + +proto.ClientMessage.prototype.clearEvalEvent = function() { + this.setEvalEvent(undefined); +}; + + +/** + * Returns whether this field is set. + * @return{!boolean} + */ +proto.ClientMessage.prototype.hasEvalEvent = function() { + return jspb.Message.getField(this, 12) != null; +}; + + /** * Generated by JsPbCodeGenerator. @@ -693,7 +738,7 @@ if (goog.DEBUG && !COMPILED) { * @private {!Array>} * @const */ -proto.ServerMessage.oneofGroups_ = [[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]]; +proto.ServerMessage.oneofGroups_ = [[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17]]; /** * @enum {number} @@ -714,8 +759,9 @@ proto.ServerMessage.MsgCase = { SERVER_CONNECTION_ESTABLISHED: 12, EVAL_FAILED: 13, EVAL_DONE: 14, - INIT: 15, - SHARED_PROCESS_ACTIVE: 16 + EVAL_EVENT: 15, + INIT: 16, + SHARED_PROCESS_ACTIVE: 17 }; /** @@ -767,6 +813,7 @@ proto.ServerMessage.toObject = function(includeInstance, msg) { serverConnectionEstablished: (f = msg.getServerConnectionEstablished()) && command_pb.ServerConnectionEstablishedMessage.toObject(includeInstance, f), evalFailed: (f = msg.getEvalFailed()) && node_pb.EvalFailedMessage.toObject(includeInstance, f), evalDone: (f = msg.getEvalDone()) && node_pb.EvalDoneMessage.toObject(includeInstance, f), + evalEvent: (f = msg.getEvalEvent()) && node_pb.EvalEventMessage.toObject(includeInstance, f), init: (f = msg.getInit()) && proto.WorkingInitMessage.toObject(includeInstance, f), sharedProcessActive: (f = msg.getSharedProcessActive()) && vscode_pb.SharedProcessActiveMessage.toObject(includeInstance, f) }; @@ -876,11 +923,16 @@ proto.ServerMessage.deserializeBinaryFromReader = function(msg, reader) { msg.setEvalDone(value); break; case 15: + var value = new node_pb.EvalEventMessage; + reader.readMessage(value,node_pb.EvalEventMessage.deserializeBinaryFromReader); + msg.setEvalEvent(value); + break; + case 16: var value = new proto.WorkingInitMessage; reader.readMessage(value,proto.WorkingInitMessage.deserializeBinaryFromReader); msg.setInit(value); break; - case 16: + case 17: var value = new vscode_pb.SharedProcessActiveMessage; reader.readMessage(value,vscode_pb.SharedProcessActiveMessage.deserializeBinaryFromReader); msg.setSharedProcessActive(value); @@ -1035,18 +1087,26 @@ proto.ServerMessage.prototype.serializeBinaryToWriter = function (writer) { node_pb.EvalDoneMessage.serializeBinaryToWriter ); } - f = this.getInit(); + f = this.getEvalEvent(); if (f != null) { writer.writeMessage( 15, f, + node_pb.EvalEventMessage.serializeBinaryToWriter + ); + } + f = this.getInit(); + if (f != null) { + writer.writeMessage( + 16, + f, proto.WorkingInitMessage.serializeBinaryToWriter ); } f = this.getSharedProcessActive(); if (f != null) { writer.writeMessage( - 16, + 17, f, vscode_pb.SharedProcessActiveMessage.serializeBinaryToWriter ); @@ -1484,18 +1544,48 @@ proto.ServerMessage.prototype.hasEvalDone = function() { /** - * optional WorkingInitMessage init = 15; + * optional EvalEventMessage eval_event = 15; + * @return {proto.EvalEventMessage} + */ +proto.ServerMessage.prototype.getEvalEvent = function() { + return /** @type{proto.EvalEventMessage} */ ( + jspb.Message.getWrapperField(this, node_pb.EvalEventMessage, 15)); +}; + + +/** @param {proto.EvalEventMessage|undefined} value */ +proto.ServerMessage.prototype.setEvalEvent = function(value) { + jspb.Message.setOneofWrapperField(this, 15, proto.ServerMessage.oneofGroups_[0], value); +}; + + +proto.ServerMessage.prototype.clearEvalEvent = function() { + this.setEvalEvent(undefined); +}; + + +/** + * Returns whether this field is set. + * @return{!boolean} + */ +proto.ServerMessage.prototype.hasEvalEvent = function() { + return jspb.Message.getField(this, 15) != null; +}; + + +/** + * optional WorkingInitMessage init = 16; * @return {proto.WorkingInitMessage} */ proto.ServerMessage.prototype.getInit = function() { return /** @type{proto.WorkingInitMessage} */ ( - jspb.Message.getWrapperField(this, proto.WorkingInitMessage, 15)); + jspb.Message.getWrapperField(this, proto.WorkingInitMessage, 16)); }; /** @param {proto.WorkingInitMessage|undefined} value */ proto.ServerMessage.prototype.setInit = function(value) { - jspb.Message.setOneofWrapperField(this, 15, proto.ServerMessage.oneofGroups_[0], value); + jspb.Message.setOneofWrapperField(this, 16, proto.ServerMessage.oneofGroups_[0], value); }; @@ -1509,23 +1599,23 @@ proto.ServerMessage.prototype.clearInit = function() { * @return{!boolean} */ proto.ServerMessage.prototype.hasInit = function() { - return jspb.Message.getField(this, 15) != null; + return jspb.Message.getField(this, 16) != null; }; /** - * optional SharedProcessActiveMessage shared_process_active = 16; + * optional SharedProcessActiveMessage shared_process_active = 17; * @return {proto.SharedProcessActiveMessage} */ proto.ServerMessage.prototype.getSharedProcessActive = function() { return /** @type{proto.SharedProcessActiveMessage} */ ( - jspb.Message.getWrapperField(this, vscode_pb.SharedProcessActiveMessage, 16)); + jspb.Message.getWrapperField(this, vscode_pb.SharedProcessActiveMessage, 17)); }; /** @param {proto.SharedProcessActiveMessage|undefined} value */ proto.ServerMessage.prototype.setSharedProcessActive = function(value) { - jspb.Message.setOneofWrapperField(this, 16, proto.ServerMessage.oneofGroups_[0], value); + jspb.Message.setOneofWrapperField(this, 17, proto.ServerMessage.oneofGroups_[0], value); }; @@ -1539,7 +1629,7 @@ proto.ServerMessage.prototype.clearSharedProcessActive = function() { * @return{!boolean} */ proto.ServerMessage.prototype.hasSharedProcessActive = function() { - return jspb.Message.getField(this, 16) != null; + return jspb.Message.getField(this, 17) != null; }; diff --git a/packages/protocol/src/proto/node.proto b/packages/protocol/src/proto/node.proto index 3a9449d5..d305dc3c 100644 --- a/packages/protocol/src/proto/node.proto +++ b/packages/protocol/src/proto/node.proto @@ -17,6 +17,15 @@ message NewEvalMessage { repeated string args = 3; // Timeout in ms uint32 timeout = 4; + // Create active eval message. + // Allows for dynamic communication for an eval + bool active = 5; +} + +message EvalEventMessage { + uint64 id = 1; + string event = 2; + repeated string args = 3; } message EvalFailedMessage { diff --git a/packages/protocol/src/proto/node_pb.d.ts b/packages/protocol/src/proto/node_pb.d.ts index 6b68db93..ef4052a7 100644 --- a/packages/protocol/src/proto/node_pb.d.ts +++ b/packages/protocol/src/proto/node_pb.d.ts @@ -49,6 +49,9 @@ export class NewEvalMessage extends jspb.Message { getTimeout(): number; setTimeout(value: number): void; + getActive(): boolean; + setActive(value: boolean): void; + serializeBinary(): Uint8Array; toObject(includeInstance?: boolean): NewEvalMessage.AsObject; static toObject(includeInstance: boolean, msg: NewEvalMessage): NewEvalMessage.AsObject; @@ -65,6 +68,37 @@ export namespace NewEvalMessage { pb_function: string, argsList: Array, timeout: number, + active: boolean, + } +} + +export class EvalEventMessage extends jspb.Message { + getId(): number; + setId(value: number): void; + + getEvent(): string; + setEvent(value: string): void; + + clearArgsList(): void; + getArgsList(): Array; + setArgsList(value: Array): void; + addArgs(value: string, index?: number): string; + + serializeBinary(): Uint8Array; + toObject(includeInstance?: boolean): EvalEventMessage.AsObject; + static toObject(includeInstance: boolean, msg: EvalEventMessage): EvalEventMessage.AsObject; + static extensions: {[key: number]: jspb.ExtensionFieldInfo}; + static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo}; + static serializeBinaryToWriter(message: EvalEventMessage, writer: jspb.BinaryWriter): void; + static deserializeBinary(bytes: Uint8Array): EvalEventMessage; + static deserializeBinaryFromReader(message: EvalEventMessage, reader: jspb.BinaryReader): EvalEventMessage; +} + +export namespace EvalEventMessage { + export type AsObject = { + id: number, + event: string, + argsList: Array, } } diff --git a/packages/protocol/src/proto/node_pb.js b/packages/protocol/src/proto/node_pb.js index d5907d58..e15bbef0 100644 --- a/packages/protocol/src/proto/node_pb.js +++ b/packages/protocol/src/proto/node_pb.js @@ -10,6 +10,7 @@ var goog = jspb; var global = Function('return this')(); goog.exportSymbol('proto.EvalDoneMessage', null, global); +goog.exportSymbol('proto.EvalEventMessage', null, global); goog.exportSymbol('proto.EvalFailedMessage', null, global); goog.exportSymbol('proto.EvalFailedMessage.Reason', null, global); goog.exportSymbol('proto.NewEvalMessage', null, global); @@ -267,7 +268,8 @@ proto.NewEvalMessage.toObject = function(includeInstance, msg) { id: msg.getId(), pb_function: msg.getFunction(), argsList: jspb.Message.getField(msg, 3), - timeout: msg.getTimeout() + timeout: msg.getTimeout(), + active: msg.getActive() }; if (includeInstance) { @@ -321,6 +323,10 @@ proto.NewEvalMessage.deserializeBinaryFromReader = function(msg, reader) { var value = /** @type {number} */ (reader.readUint32()); msg.setTimeout(value); break; + case 5: + var value = /** @type {boolean} */ (reader.readBool()); + msg.setActive(value); + break; default: reader.skipField(); break; @@ -387,6 +393,13 @@ proto.NewEvalMessage.prototype.serializeBinaryToWriter = function (writer) { f ); } + f = this.getActive(); + if (f) { + writer.writeBool( + 5, + f + ); + } }; @@ -466,6 +479,251 @@ proto.NewEvalMessage.prototype.setTimeout = function(value) { }; +/** + * optional bool active = 5; + * Note that Boolean fields may be set to 0/1 when serialized from a Java server. + * You should avoid comparisons like {@code val === true/false} in those cases. + * @return {boolean} + */ +proto.NewEvalMessage.prototype.getActive = function() { + return /** @type {boolean} */ (jspb.Message.getFieldProto3(this, 5, false)); +}; + + +/** @param {boolean} value */ +proto.NewEvalMessage.prototype.setActive = function(value) { + jspb.Message.setField(this, 5, value); +}; + + + +/** + * Generated by JsPbCodeGenerator. + * @param {Array=} opt_data Optional initial data array, typically from a + * server response, or constructed directly in Javascript. The array is used + * in place and becomes part of the constructed object. It is not cloned. + * If no data is provided, the constructed object will be empty, but still + * valid. + * @extends {jspb.Message} + * @constructor + */ +proto.EvalEventMessage = function(opt_data) { + jspb.Message.initialize(this, opt_data, 0, -1, proto.EvalEventMessage.repeatedFields_, null); +}; +goog.inherits(proto.EvalEventMessage, jspb.Message); +if (goog.DEBUG && !COMPILED) { + proto.EvalEventMessage.displayName = 'proto.EvalEventMessage'; +} +/** + * List of repeated fields within this message type. + * @private {!Array} + * @const + */ +proto.EvalEventMessage.repeatedFields_ = [3]; + + + +if (jspb.Message.GENERATE_TO_OBJECT) { +/** + * Creates an object representation of this proto suitable for use in Soy templates. + * Field names that are reserved in JavaScript and will be renamed to pb_name. + * To access a reserved field use, foo.pb_, eg, foo.pb_default. + * For the list of reserved names please see: + * com.google.apps.jspb.JsClassTemplate.JS_RESERVED_WORDS. + * @param {boolean=} opt_includeInstance Whether to include the JSPB instance + * for transitional soy proto support: http://goto/soy-param-migration + * @return {!Object} + */ +proto.EvalEventMessage.prototype.toObject = function(opt_includeInstance) { + return proto.EvalEventMessage.toObject(opt_includeInstance, this); +}; + + +/** + * Static version of the {@see toObject} method. + * @param {boolean|undefined} includeInstance Whether to include the JSPB + * instance for transitional soy proto support: + * http://goto/soy-param-migration + * @param {!proto.EvalEventMessage} msg The msg instance to transform. + * @return {!Object} + */ +proto.EvalEventMessage.toObject = function(includeInstance, msg) { + var f, obj = { + id: msg.getId(), + event: msg.getEvent(), + argsList: jspb.Message.getField(msg, 3) + }; + + if (includeInstance) { + obj.$jspbMessageInstance = msg; + } + return obj; +}; +} + + +/** + * Deserializes binary data (in protobuf wire format). + * @param {jspb.ByteSource} bytes The bytes to deserialize. + * @return {!proto.EvalEventMessage} + */ +proto.EvalEventMessage.deserializeBinary = function(bytes) { + var reader = new jspb.BinaryReader(bytes); + var msg = new proto.EvalEventMessage; + return proto.EvalEventMessage.deserializeBinaryFromReader(msg, reader); +}; + + +/** + * Deserializes binary data (in protobuf wire format) from the + * given reader into the given message object. + * @param {!proto.EvalEventMessage} msg The message object to deserialize into. + * @param {!jspb.BinaryReader} reader The BinaryReader to use. + * @return {!proto.EvalEventMessage} + */ +proto.EvalEventMessage.deserializeBinaryFromReader = function(msg, reader) { + while (reader.nextField()) { + if (reader.isEndGroup()) { + break; + } + var field = reader.getFieldNumber(); + switch (field) { + case 1: + var value = /** @type {number} */ (reader.readUint64()); + msg.setId(value); + break; + case 2: + var value = /** @type {string} */ (reader.readString()); + msg.setEvent(value); + break; + case 3: + var value = /** @type {string} */ (reader.readString()); + msg.getArgsList().push(value); + msg.setArgsList(msg.getArgsList()); + break; + default: + reader.skipField(); + break; + } + } + return msg; +}; + + +/** + * Class method variant: serializes the given message to binary data + * (in protobuf wire format), writing to the given BinaryWriter. + * @param {!proto.EvalEventMessage} message + * @param {!jspb.BinaryWriter} writer + */ +proto.EvalEventMessage.serializeBinaryToWriter = function(message, writer) { + message.serializeBinaryToWriter(writer); +}; + + +/** + * Serializes the message to binary data (in protobuf wire format). + * @return {!Uint8Array} + */ +proto.EvalEventMessage.prototype.serializeBinary = function() { + var writer = new jspb.BinaryWriter(); + this.serializeBinaryToWriter(writer); + return writer.getResultBuffer(); +}; + + +/** + * Serializes the message to binary data (in protobuf wire format), + * writing to the given BinaryWriter. + * @param {!jspb.BinaryWriter} writer + */ +proto.EvalEventMessage.prototype.serializeBinaryToWriter = function (writer) { + var f = undefined; + f = this.getId(); + if (f !== 0) { + writer.writeUint64( + 1, + f + ); + } + f = this.getEvent(); + if (f.length > 0) { + writer.writeString( + 2, + f + ); + } + f = this.getArgsList(); + if (f.length > 0) { + writer.writeRepeatedString( + 3, + f + ); + } +}; + + +/** + * Creates a deep clone of this proto. No data is shared with the original. + * @return {!proto.EvalEventMessage} The clone. + */ +proto.EvalEventMessage.prototype.cloneMessage = function() { + return /** @type {!proto.EvalEventMessage} */ (jspb.Message.cloneMessage(this)); +}; + + +/** + * optional uint64 id = 1; + * @return {number} + */ +proto.EvalEventMessage.prototype.getId = function() { + return /** @type {number} */ (jspb.Message.getFieldProto3(this, 1, 0)); +}; + + +/** @param {number} value */ +proto.EvalEventMessage.prototype.setId = function(value) { + jspb.Message.setField(this, 1, value); +}; + + +/** + * optional string event = 2; + * @return {string} + */ +proto.EvalEventMessage.prototype.getEvent = function() { + return /** @type {string} */ (jspb.Message.getFieldProto3(this, 2, "")); +}; + + +/** @param {string} value */ +proto.EvalEventMessage.prototype.setEvent = function(value) { + jspb.Message.setField(this, 2, value); +}; + + +/** + * repeated string args = 3; + * If you change this array by adding, removing or replacing elements, or if you + * replace the array itself, then you must call the setter to update it. + * @return {!Array.} + */ +proto.EvalEventMessage.prototype.getArgsList = function() { + return /** @type {!Array.} */ (jspb.Message.getField(this, 3)); +}; + + +/** @param {Array.} value */ +proto.EvalEventMessage.prototype.setArgsList = function(value) { + jspb.Message.setField(this, 3, value || []); +}; + + +proto.EvalEventMessage.prototype.clearArgsList = function() { + jspb.Message.setField(this, 3, []); +}; + + /** * Generated by JsPbCodeGenerator. diff --git a/packages/protocol/test/evaluate.test.ts b/packages/protocol/test/evaluate.test.ts index f8dfe926..46ef9996 100644 --- a/packages/protocol/test/evaluate.test.ts +++ b/packages/protocol/test/evaluate.test.ts @@ -55,4 +55,18 @@ describe("Evaluate", () => { expect(value).toEqual("donkey"); }, 250); + + it("should do active process", (done) => { + const runner = client.run((ae) => { + ae.on("1", () => { + ae.emit("2"); + ae.on("3", () => { + ae.emit("close"); + }); + }); + }); + runner.emit("1"); + runner.on("2", () => runner.emit("3")); + runner.on("close", () => done()); + }); }); \ No newline at end of file diff --git a/packages/protocol/test/modules/fs.test.ts b/packages/protocol/test/modules/fs.test.ts index a2879e11..839f23b7 100644 --- a/packages/protocol/test/modules/fs.test.ts +++ b/packages/protocol/test/modules/fs.test.ts @@ -117,6 +117,23 @@ describe("fs", () => { }); }); + describe("createWriteStream", () => { + it("should write to file", (done) => { + const file = tmpFile(); + const content = "howdy\nhow\nr\nu"; + const stream = fs.createWriteStream(file); + stream.on("open", (fd) => { + expect(fd).toBeDefined(); + stream.write(content); + stream.close(); + }); + stream.on("close", () => { + expect(nativeFs.readFileSync(file).toString()).toEqual(content); + done(); + }); + }); + }); + describe("exists", () => { it("should output file exists", (done) => { fs.exists(testFile, (exists) => {