Add stdio sources (#17)

This commit is contained in:
Kyle Carberry 2019-01-23 11:52:58 -06:00
parent 704a0defc9
commit 4cd6bed8d2
No known key found for this signature in database
GPG Key ID: A0409BDB6B0B3EDB
10 changed files with 157 additions and 47 deletions

View File

@ -284,8 +284,18 @@ export class Client {
return;
}
const data = new TextDecoder().decode(output.getData_asU8());
const stream = output.getFd() === SessionOutputMessage.FD.STDOUT ? s.stdout : s.stderr;
stream.emit("data", data);
const source = output.getSource();
switch (source) {
case SessionOutputMessage.Source.STDOUT:
case SessionOutputMessage.Source.STDERR:
(source === SessionOutputMessage.Source.STDOUT ? s.stdout : s.stderr).emit("data", data);
break;
case SessionOutputMessage.Source.IPC:
s.emit("message", JSON.parse(data));
break;
default:
throw new Error(`Unknown source ${source}`);
}
} else if (message.hasIdentifySession()) {
const s = this.sessions.get(message.getIdentifySession()!.getId());
if (!s) {

View File

@ -23,8 +23,11 @@ export interface ChildProcess {
readonly pid: number | undefined;
kill(signal?: string): void;
send(message: string | Uint8Array): void;
send(message: string | Uint8Array, ipc?: false): void;
send(message: any, ipc: true): void;
on(event: "message", listener: (data: any) => void): void;
on(event: "error", listener: (err: Error) => void): void;
on(event: "exit", listener: (code: number, signal: string) => void): void;
@ -45,10 +48,6 @@ export class ServerProcess extends events.EventEmitter implements ChildProcess {
private readonly hasTty: boolean = false,
) {
super();
this.connection.onMessage((message) => {
this.emit("message", message);
});
if (!this.hasTty) {
delete this.resize;
}
@ -71,10 +70,15 @@ export class ServerProcess extends events.EventEmitter implements ChildProcess {
this._killed = true;
}
public send(message: string | Uint8Array): void {
public send(message: string | Uint8Array | any, ipc: boolean = false): void {
const send = new WriteToSessionMessage();
send.setId(this.id);
send.setSource(ipc ? WriteToSessionMessage.Source.IPC : WriteToSessionMessage.Source.STDIN);
if (ipc) {
send.setData(new TextEncoder().encode(JSON.stringify(message)));
} else {
send.setData(typeof message === "string" ? new TextEncoder().encode(message) : message);
}
const client = new ClientMessage();
client.setWriteToSession(send);
this.connection.send(client.serializeBinary());

View File

@ -8,6 +8,7 @@ import { SendableConnection } from "../common/connection";
import { ServerOptions } from "./server";
export interface Process {
stdio?: Array<stream.Readable | stream.Writable>;
stdin?: stream.Writable;
stdout?: stream.Readable;
stderr?: stream.Readable;
@ -69,27 +70,34 @@ export const handleNewSession = (connection: SendableConnection, newSession: New
};
}
const sendOutput = (_fd: SessionOutputMessage.FD, msg: string | Uint8Array): void => {
const sendOutput = (_source: SessionOutputMessage.Source, msg: string | Uint8Array): void => {
const serverMsg = new ServerMessage();
const d = new SessionOutputMessage();
d.setId(newSession.getId());
d.setData(typeof msg === "string" ? new TextEncoder().encode(msg) : msg);
d.setFd(SessionOutputMessage.FD.STDOUT);
d.setSource(_source);
serverMsg.setSessionOutput(d);
connection.send(serverMsg.serializeBinary());
};
if (process.stdout && process.stderr) {
process.stdout.on("data", (data) => {
sendOutput(SessionOutputMessage.FD.STDOUT, data);
sendOutput(SessionOutputMessage.Source.STDOUT, data);
});
process.stderr.on("data", (data) => {
sendOutput(SessionOutputMessage.FD.STDERR, data);
sendOutput(SessionOutputMessage.Source.STDERR, data);
});
} else {
process.on("data", (data) => {
sendOutput(SessionOutputMessage.FD.STDOUT, Buffer.from(data));
sendOutput(SessionOutputMessage.Source.STDOUT, Buffer.from(data));
});
}
if (process.stdio && process.stdio[3]) {
// We have ipc fd
process.stdio[3].on("data", (data) => {
sendOutput(SessionOutputMessage.Source.IPC, data);
});
}

View File

@ -1,11 +1,11 @@
import * as os from "os";
import * as cp from "child_process";
import * as path from "path";
import { mkdir } from "fs";
import { mkdir, WriteStream } from "fs";
import { promisify } from "util";
import { TextDecoder } from "text-encoding";
import { logger, field } from "@coder/logger";
import { ClientMessage, WorkingInitMessage, ServerMessage, NewSessionMessage } from "../proto";
import { ClientMessage, WorkingInitMessage, ServerMessage, NewSessionMessage, WriteToSessionMessage } from "../proto";
import { evaluate } from "./evaluate";
import { ReadWriteConnection } from "../common/connection";
import { Process, handleNewSession, handleNewConnection } from "./command";
@ -120,7 +120,16 @@ export class Server {
if (!s) {
return;
}
s.write(new TextDecoder().decode(message.getWriteToSession()!.getData_asU8()));
const data = new TextDecoder().decode(message.getWriteToSession()!.getData_asU8());
const source = message.getWriteToSession()!.getSource();
if (source === WriteToSessionMessage.Source.IPC) {
if (!s.stdio || !s.stdio[3]) {
throw new Error("Cannot send message via IPC to process without IPC");
}
(s.stdio[3] as WriteStream).write(data);
} else {
s.write(data);
}
} else if (message.hasNewConnection()) {
const socket = handleNewConnection(this.connection, message.getNewConnection()!, () => {
this.connections.delete(message.getNewConnection()!.getId());

View File

@ -45,6 +45,11 @@ message IdentifySessionMessage {
message WriteToSessionMessage {
uint64 id = 1;
bytes data = 2;
enum Source {
Stdin = 0;
Ipc = 1;
}
Source source = 3;
}
// Resizes the TTY of the session identified by the id.
@ -67,11 +72,12 @@ message ShutdownSessionMessage {
// SessionOutputMessage carries data read from the stdout or stderr of the session identified by the id.
message SessionOutputMessage {
uint64 id = 1;
enum FD {
enum Source {
Stdout = 0;
Stderr = 1;
Ipc = 2;
}
FD fd = 2;
Source source = 2;
bytes data = 3;
}

View File

@ -144,6 +144,9 @@ export class WriteToSessionMessage extends jspb.Message {
getData_asB64(): string;
setData(value: Uint8Array | string): void;
getSource(): WriteToSessionMessage.Source;
setSource(value: WriteToSessionMessage.Source): void;
serializeBinary(): Uint8Array;
toObject(includeInstance?: boolean): WriteToSessionMessage.AsObject;
static toObject(includeInstance: boolean, msg: WriteToSessionMessage): WriteToSessionMessage.AsObject;
@ -158,6 +161,12 @@ export namespace WriteToSessionMessage {
export type AsObject = {
id: number,
data: Uint8Array | string,
source: WriteToSessionMessage.Source,
}
export enum Source {
STDIN = 0,
IPC = 1,
}
}
@ -235,8 +244,8 @@ export class SessionOutputMessage extends jspb.Message {
getId(): number;
setId(value: number): void;
getFd(): SessionOutputMessage.FD;
setFd(value: SessionOutputMessage.FD): void;
getSource(): SessionOutputMessage.Source;
setSource(value: SessionOutputMessage.Source): void;
getData(): Uint8Array | string;
getData_asU8(): Uint8Array;
@ -256,13 +265,14 @@ export class SessionOutputMessage extends jspb.Message {
export namespace SessionOutputMessage {
export type AsObject = {
id: number,
fd: SessionOutputMessage.FD,
source: SessionOutputMessage.Source,
data: Uint8Array | string,
}
export enum FD {
export enum Source {
STDOUT = 0,
STDERR = 1,
IPC = 2,
}
}

View File

@ -22,10 +22,11 @@ goog.exportSymbol('proto.NewSessionMessage', null, global);
goog.exportSymbol('proto.ResizeSessionTTYMessage', null, global);
goog.exportSymbol('proto.SessionDoneMessage', null, global);
goog.exportSymbol('proto.SessionOutputMessage', null, global);
goog.exportSymbol('proto.SessionOutputMessage.FD', null, global);
goog.exportSymbol('proto.SessionOutputMessage.Source', null, global);
goog.exportSymbol('proto.ShutdownSessionMessage', null, global);
goog.exportSymbol('proto.TTYDimensions', null, global);
goog.exportSymbol('proto.WriteToSessionMessage', null, global);
goog.exportSymbol('proto.WriteToSessionMessage.Source', null, global);
/**
* Generated by JsPbCodeGenerator.
@ -1047,7 +1048,8 @@ proto.WriteToSessionMessage.prototype.toObject = function(opt_includeInstance) {
proto.WriteToSessionMessage.toObject = function(includeInstance, msg) {
var f, obj = {
id: msg.getId(),
data: msg.getData_asB64()
data: msg.getData_asB64(),
source: msg.getSource()
};
if (includeInstance) {
@ -1092,6 +1094,10 @@ proto.WriteToSessionMessage.deserializeBinaryFromReader = function(msg, reader)
var value = /** @type {!Uint8Array} */ (reader.readBytes());
msg.setData(value);
break;
case 3:
var value = /** @type {!proto.WriteToSessionMessage.Source} */ (reader.readEnum());
msg.setSource(value);
break;
default:
reader.skipField();
break;
@ -1144,6 +1150,13 @@ proto.WriteToSessionMessage.prototype.serializeBinaryToWriter = function (writer
f
);
}
f = this.getSource();
if (f !== 0.0) {
writer.writeEnum(
3,
f
);
}
};
@ -1210,6 +1223,29 @@ proto.WriteToSessionMessage.prototype.setData = function(value) {
};
/**
* optional Source source = 3;
* @return {!proto.WriteToSessionMessage.Source}
*/
proto.WriteToSessionMessage.prototype.getSource = function() {
return /** @type {!proto.WriteToSessionMessage.Source} */ (jspb.Message.getFieldProto3(this, 3, 0));
};
/** @param {!proto.WriteToSessionMessage.Source} value */
proto.WriteToSessionMessage.prototype.setSource = function(value) {
jspb.Message.setField(this, 3, value);
};
/**
* @enum {number}
*/
proto.WriteToSessionMessage.Source = {
STDIN: 0,
IPC: 1
};
/**
* Generated by JsPbCodeGenerator.
@ -1805,7 +1841,7 @@ proto.SessionOutputMessage.prototype.toObject = function(opt_includeInstance) {
proto.SessionOutputMessage.toObject = function(includeInstance, msg) {
var f, obj = {
id: msg.getId(),
fd: msg.getFd(),
source: msg.getSource(),
data: msg.getData_asB64()
};
@ -1848,8 +1884,8 @@ proto.SessionOutputMessage.deserializeBinaryFromReader = function(msg, reader) {
msg.setId(value);
break;
case 2:
var value = /** @type {!proto.SessionOutputMessage.FD} */ (reader.readEnum());
msg.setFd(value);
var value = /** @type {!proto.SessionOutputMessage.Source} */ (reader.readEnum());
msg.setSource(value);
break;
case 3:
var value = /** @type {!Uint8Array} */ (reader.readBytes());
@ -1900,7 +1936,7 @@ proto.SessionOutputMessage.prototype.serializeBinaryToWriter = function (writer)
f
);
}
f = this.getFd();
f = this.getSource();
if (f !== 0.0) {
writer.writeEnum(
2,
@ -1942,16 +1978,16 @@ proto.SessionOutputMessage.prototype.setId = function(value) {
/**
* optional FD fd = 2;
* @return {!proto.SessionOutputMessage.FD}
* optional Source source = 2;
* @return {!proto.SessionOutputMessage.Source}
*/
proto.SessionOutputMessage.prototype.getFd = function() {
return /** @type {!proto.SessionOutputMessage.FD} */ (jspb.Message.getFieldProto3(this, 2, 0));
proto.SessionOutputMessage.prototype.getSource = function() {
return /** @type {!proto.SessionOutputMessage.Source} */ (jspb.Message.getFieldProto3(this, 2, 0));
};
/** @param {!proto.SessionOutputMessage.FD} value */
proto.SessionOutputMessage.prototype.setFd = function(value) {
/** @param {!proto.SessionOutputMessage.Source} value */
proto.SessionOutputMessage.prototype.setSource = function(value) {
jspb.Message.setField(this, 2, value);
};
@ -1998,9 +2034,10 @@ proto.SessionOutputMessage.prototype.setData = function(value) {
/**
* @enum {number}
*/
proto.SessionOutputMessage.FD = {
proto.SessionOutputMessage.Source = {
STDOUT: 0,
STDERR: 1
STDERR: 1,
IPC: 2
};

View File

@ -1,3 +1,4 @@
import * as cp from "child_process";
import * as net from "net";
import * as os from "os";
import * as path from "path";
@ -8,7 +9,15 @@ import { createClient } from "./helpers";
(<any>global).TextEncoder = TextEncoder;
describe("spawn", () => {
const client = createClient();
const client = createClient({
dataDirectory: "",
workingDirectory: "",
forkProvider: (msg) => {
return cp.spawn(msg.getCommand(), msg.getArgsList(), {
stdio: [null, null, null, "pipe"],
});
},
});
it("should execute command and return output", (done) => {
const proc = client.spawn("echo", ["test"]);
@ -124,11 +133,13 @@ describe("spawn", () => {
proc.on("exit", () => done());
});
it("should fork", (done) => {
it("should fork and echo messages", (done) => {
const proc = client.fork(path.join(__dirname, "forker.js"));
proc.stdout.on("data", (data) => {
expect(data).toEqual("test");
proc.on("message", (msg) => {
expect(msg.bananas).toBeTruthy();
proc.kill();
});
proc.send({ bananas: true }, true);
proc.on("exit", () => done());
});
});

View File

@ -1 +1,3 @@
console.log("test");
process.on("message", (data) => {
process.send(data);
});

View File

@ -1,5 +1,6 @@
import * as cp from "child_process";
import * as fs from "fs";
import * as net from "net";
import * as path from "path";
import { logger, field } from "@coder/logger/src";
@ -8,6 +9,18 @@ declare var __non_webpack_require__: typeof require;
export const requireModule = (modulePath: string): void => {
process.env.AMD_ENTRYPOINT = modulePath;
process.env.VSCODE_ALLOW_IO = "true";
if (!process.send) {
const socket = new net.Socket({ fd: 3 });
socket.on("data", (data) => {
process.emit("message", JSON.parse(data.toString()), undefined);
});
process.send = (message: any): void => {
socket.write(JSON.stringify(message));
};
}
const content = fs.readFileSync(path.join(process.env.BUILD_DIR as string || path.join(__dirname, "../.."), "./build/bootstrap-fork.js"));
eval(content.toString());
};
@ -36,13 +49,13 @@ export const forkModule = (modulePath: string, stdio?: boolean): cp.ChildProcess
let proc: cp.ChildProcess | undefined;
const args = ["--bootstrap-fork", modulePath];
const options: cp.SpawnOptions = {
stdio: [null, null, null, "pipe"],
};
if (process.env.CLI === "true") {
proc = stdio ? cp.spawn(process.execPath, args) : cp.fork(process.execPath, args);
} else if (stdio) {
proc = cp.spawn("npm", ["start", "--scripts-prepend-node-path", "--", ...args]);
proc = stdio ? cp.spawn(process.execPath, args, options) : cp.fork(process.execPath, args, options);
} else {
// TODO: need to fork somehow so we get send/onMessage.
proc = cp.spawn("npm", ["start", "--scripts-prepend-node-path", "--", ...args]);
proc = cp.spawn(process.execArgv[0], ["-r", "tsconfig-paths/register", process.argv[1], ...args], options);
}
proc.stdout.on("data", (message) => {