Add active evals (#25)

* Add active evals

* Convert type of stats to date or string

* Fix generic overloads for run

* Lower evaluate timeout

* Add comment for createWriteStream
This commit is contained in:
Kyle Carberry
2019-01-29 18:48:02 -06:00
parent 3a88ae5fb2
commit 20f5d8eeed
13 changed files with 640 additions and 40 deletions

View File

@@ -1,8 +1,9 @@
import { ReadWriteConnection, InitData, OperatingSystem, ISharedProcessData } from "../common/connection";
import { NewEvalMessage, ServerMessage, EvalDoneMessage, EvalFailedMessage, TypedValue, ClientMessage, NewSessionMessage, TTYDimensions, SessionOutputMessage, CloseSessionInputMessage, WorkingInitMessage } from "../proto";
import { NewEvalMessage, ServerMessage, EvalDoneMessage, EvalFailedMessage, TypedValue, ClientMessage, NewSessionMessage, TTYDimensions, SessionOutputMessage, CloseSessionInputMessage, WorkingInitMessage, EvalEventMessage } from "../proto";
import { Emitter, Event } from "@coder/events";
import { logger, field } from "@coder/logger";
import { ChildProcess, SpawnOptions, ForkOptions, ServerProcess, ServerSocket, Socket, ServerListener, Server } from "./command";
import { ChildProcess, SpawnOptions, ForkOptions, ServerProcess, ServerSocket, Socket, ServerListener, Server, ActiveEval } from "./command";
import { EventEmitter } from "events";
/**
* Client accepts an arbitrary connection intended to communicate with the Server.
@@ -14,6 +15,7 @@ export class Client {
private evalId: number = 0;
private evalDoneEmitter: Emitter<EvalDoneMessage> = new Emitter();
private evalFailedEmitter: Emitter<EvalFailedMessage> = new Emitter();
private evalEventEmitter: Emitter<EvalEventMessage> = new Emitter();
private sessionId: number = 0;
private readonly sessions: Map<number, ServerProcess> = new Map();
@@ -66,6 +68,47 @@ export class Client {
return this.sharedProcessActiveEmitter.event;
}
public run(func: (ae: ActiveEval) => void | Promise<void>): ActiveEval;
public run<T1>(func: (ae: ActiveEval, a1: T1) => void | Promise<void>, a1: T1): ActiveEval;
public run<T1, T2>(func: (ae: ActiveEval, a1: T1, a2: T2) => void | Promise<void>, a1: T1, a2: T2): ActiveEval;
public run<T1, T2, T3>(func: (ae: ActiveEval, a1: T1, a2: T2, a3: T3) => void | Promise<void>, a1: T1, a2: T2, a3: T3): ActiveEval;
public run<T1, T2, T3, T4>(func: (ae: ActiveEval, a1: T1, a2: T2, a3: T3, a4: T4) => void | Promise<void>, a1: T1, a2: T2, a3: T3, a4: T4): ActiveEval;
public run<T1, T2, T3, T4, T5>(func: (ae: ActiveEval, a1: T1, a2: T2, a3: T3, a4: T4, a5: T5) => void | Promise<void>, a1: T1, a2: T2, a3: T3, a4: T4, a5: T5): ActiveEval;
public run<T1, T2, T3, T4, T5, T6>(func: (ae: ActiveEval, a1: T1, a2: T2, a3: T3, a4: T4, a5: T5, a6: T6) => void | Promise<void>, a1: T1, a2: T2, a3: T3, a4: T4, a5: T5, a6: T6): ActiveEval;
public run<T1, T2, T3, T4, T5, T6>(func: (ae: ActiveEval, a1?: T1, a2?: T2, a3?: T3, a4?: T4, a5?: T5, a6?: T6) => void | Promise<void>, a1?: T1, a2?: T2, a3?: T3, a4?: T4, a5?: T5, a6?: T6): ActiveEval {
const doEval = this.doEvaluate(func, a1, a2, a3, a4, a5, a6, true);
const eventEmitter = new EventEmitter();
const d1 = this.evalEventEmitter.event((msg) => {
if (msg.getId() !== doEval.id) {
return;
}
eventEmitter.emit(msg.getEvent(), ...msg.getArgsList().filter(a => a).map(s => JSON.parse(s)));
});
doEval.completed.then(() => {
d1.dispose();
eventEmitter.emit("close");
}).catch((ex) => {
d1.dispose();
eventEmitter.emit("error", ex);
});
return {
on: (event: string, cb: (...args: any[]) => void) => eventEmitter.on(event, cb),
emit: (event: string, ...args: any[]) => {
const eventsMsg = new EvalEventMessage();
eventsMsg.setId(doEval.id);
eventsMsg.setEvent(event);
eventsMsg.setArgsList(args.filter(a => a).map(a => JSON.stringify(a)));
const clientMsg = new ClientMessage();
clientMsg.setEvalEvent(eventsMsg);
this.connection.send(clientMsg.serializeBinary());
},
};
}
public evaluate<R>(func: () => R | Promise<R>): Promise<R>;
public evaluate<R, T1>(func: (a1: T1) => R | Promise<R>, a1: T1): Promise<R>;
public evaluate<R, T1, T2>(func: (a1: T1, a2: T2) => R | Promise<R>, a1: T1, a2: T2): Promise<R>;
@@ -86,9 +129,17 @@ export class Client {
* @returns Promise rejected or resolved from the evaluated function
*/
public evaluate<R, T1, T2, T3, T4, T5, T6>(func: (a1?: T1, a2?: T2, a3?: T3, a4?: T4, a5?: T5, a6?: T6) => R | Promise<R>, a1?: T1, a2?: T2, a3?: T3, a4?: T4, a5?: T5, a6?: T6): Promise<R> {
return this.doEvaluate(func, a1, a2, a3, a4, a5, a6, false).completed;
}
private doEvaluate<R, T1, T2, T3, T4, T5, T6>(func: (...args: any[]) => void | Promise<void> | R | Promise<R>, a1?: T1, a2?: T2, a3?: T3, a4?: T4, a5?: T5, a6?: T6, active: boolean = false): {
readonly completed: Promise<R>;
readonly id: number;
} {
const newEval = new NewEvalMessage();
const id = this.evalId++;
newEval.setId(id);
newEval.setActive(active);
newEval.setArgsList([a1, a2, a3, a4, a5, a6].filter(a => typeof a !== "undefined").map(a => JSON.stringify(a)));
newEval.setFunction(func.toString());
@@ -148,7 +199,10 @@ export class Client {
}
});
return prom;
return {
completed: prom,
id,
};
}
/**
@@ -279,6 +333,8 @@ export class Client {
this.evalDoneEmitter.emit(message.getEvalDone()!);
} else if (message.hasEvalFailed()) {
this.evalFailedEmitter.emit(message.getEvalFailed()!);
} else if (message.hasEvalEvent()) {
this.evalEventEmitter.emit(message.getEvalEvent()!);
} else if (message.hasNewSessionFailure()) {
const s = this.sessions.get(message.getNewSessionFailure()!.getId());
if (!s) {

View File

@@ -360,3 +360,11 @@ export class ServerListener extends events.EventEmitter implements Server {
}
}
export interface ActiveEval {
emit(event: string, ...args: any[]): void;
on(event: "close", cb: () => void): void;
on(event: "error", cb: (err: Error) => void): void;
on(event: string, cb: (...args: any[]) => void): void;
}

View File

@@ -1,6 +1,7 @@
import { exec, ChildProcess } from "child_process";
import { EventEmitter } from "events";
import * as fs from "fs";
import * as stream from "stream";
import { IEncodingOptions, IEncodingOptionsCallback, escapePath, useBuffer } from "../../common/util";
import { Client } from "../client";
@@ -112,8 +113,51 @@ export class FS {
});
}
public createWriteStream = (): void => {
throw new Error("createWriteStream not implemented");
/**
* This should NOT be used for long-term writes.
* The runnable will be killed after the timeout specified in evaluate.ts
*/
public createWriteStream = (path: fs.PathLike, options?: any): fs.WriteStream => {
const ae = this.client.run((ae, path, options) => {
const fs = _require("fs") as typeof import("fs");
const str = fs.createWriteStream(path, options);
ae.on("write", (d, e) => str.write(Buffer.from(d, e)));
ae.on("close", () => str.close());
str.on("close", () => ae.emit("close"));
str.on("open", (fd) => ae.emit("open", fd));
str.on("error", (err) => ae.emit(err));
}, path, options);
return new (class WriteStream extends stream.Writable implements fs.WriteStream {
private _bytesWritten: number = 0;
public constructor() {
super({
write: (data, encoding, cb) => {
this._bytesWritten += data.length;
ae.emit("write", Buffer.from(data, encoding), encoding);
cb();
},
});
ae.on("open", (a) => this.emit("open", a));
ae.on("close", () => this.emit("close"));
}
public get bytesWritten(): number {
return this._bytesWritten;
}
public get path(): string | Buffer {
return "";
}
public close(): void {
ae.emit("close");
}
}) as fs.WriteStream;
}
public exists = (path: fs.PathLike, callback: (exists: boolean) => void): void => {
@@ -667,10 +711,10 @@ interface IStats {
mtimeMs: number;
ctimeMs: number;
birthtimeMs: number;
atime: string;
mtime: string;
ctime: string;
birthtime: string;
atime: Date | string;
mtime: Date | string;
ctime: Date | string;
birthtime: Date | string;
_isFile: boolean;
_isDirectory: boolean;
_isBlockDevice: boolean;
@@ -687,7 +731,7 @@ class Stats implements fs.Stats {
public readonly ctime: Date;
public readonly birthtime: Date;
private constructor(private readonly stats: IStats) {
public constructor(private readonly stats: IStats) {
this.atime = new Date(stats.atime);
this.mtime = new Date(stats.mtime);
this.ctime = new Date(stats.ctime);