Implement fs module (#3)
* Implements the fs module * Add stats object * Add not implemented to createWriteStream * Update mkdtemp to use tmp dir * Unexport Stats * Add client web socket for commands and restructure
This commit is contained in:
105
packages/protocol/src/node/command.ts
Normal file
105
packages/protocol/src/node/command.ts
Normal file
@@ -0,0 +1,105 @@
|
||||
import * as cp from "child_process";
|
||||
import * as nodePty from "node-pty";
|
||||
import * as stream from "stream";
|
||||
import { TextEncoder } from "text-encoding";
|
||||
import { NewSessionMessage, ServerMessage, SessionDoneMessage, SessionOutputMessage, ShutdownSessionMessage, IdentifySessionMessage, ClientMessage } from "../proto";
|
||||
import { SendableConnection } from "../common/connection";
|
||||
|
||||
export interface Process {
|
||||
stdin?: stream.Writable;
|
||||
stdout?: stream.Readable;
|
||||
stderr?: stream.Readable;
|
||||
|
||||
pid: number;
|
||||
killed?: boolean;
|
||||
|
||||
on(event: "data", cb: (data: string) => void): void;
|
||||
on(event: 'exit', listener: (exitCode: number, signal?: number) => void): void;
|
||||
write(data: string | Uint8Array): void;
|
||||
resize?(cols: number, rows: number): void;
|
||||
kill(signal?: string): void;
|
||||
title?: number;
|
||||
}
|
||||
|
||||
export const handleNewSession = (connection: SendableConnection, newSession: NewSessionMessage, onExit: () => void): Process => {
|
||||
let process: Process;
|
||||
|
||||
const env = {} as any;
|
||||
newSession.getEnvMap().forEach((value: any, key: any) => {
|
||||
env[key] = value;
|
||||
});
|
||||
if (newSession.getTtyDimensions()) {
|
||||
// Spawn with node-pty
|
||||
process = nodePty.spawn(newSession.getCommand(), newSession.getArgsList(), {
|
||||
cols: newSession.getTtyDimensions()!.getWidth(),
|
||||
rows: newSession.getTtyDimensions()!.getHeight(),
|
||||
cwd: newSession.getCwd(),
|
||||
env,
|
||||
});
|
||||
} else {
|
||||
const options = {
|
||||
cwd: newSession.getCwd(),
|
||||
env,
|
||||
};
|
||||
let proc: cp.ChildProcess;
|
||||
if (newSession.getIsFork()) {
|
||||
proc = cp.fork(newSession.getCommand(), newSession.getArgsList());
|
||||
} else {
|
||||
proc = cp.spawn(newSession.getCommand(), newSession.getArgsList(), options);
|
||||
}
|
||||
|
||||
process = {
|
||||
stdin: proc.stdin,
|
||||
stderr: proc.stderr,
|
||||
stdout: proc.stdout,
|
||||
on: (...args: any[]) => (<any>proc.on)(...args),
|
||||
write: (d) => proc.stdin.write(d),
|
||||
kill: (s) => proc.kill(s || "SIGTERM"),
|
||||
pid: proc.pid,
|
||||
};
|
||||
}
|
||||
|
||||
const sendOutput = (fd: SessionOutputMessage.FD, msg: string | Uint8Array): void => {
|
||||
const serverMsg = new ServerMessage();
|
||||
const d = new SessionOutputMessage();
|
||||
d.setId(newSession.getId());
|
||||
d.setData(typeof msg === "string" ? new TextEncoder().encode(msg) : msg);
|
||||
d.setFd(SessionOutputMessage.FD.STDOUT);
|
||||
serverMsg.setSessionOutput(d);
|
||||
connection.send(serverMsg.serializeBinary());
|
||||
};
|
||||
|
||||
if (process.stdout && process.stderr) {
|
||||
process.stdout.on("data", (data) => {
|
||||
sendOutput(SessionOutputMessage.FD.STDOUT, data);
|
||||
});
|
||||
|
||||
process.stderr.on("data", (data) => {
|
||||
sendOutput(SessionOutputMessage.FD.STDERR, data);
|
||||
});
|
||||
} else {
|
||||
process.on("data", (data) => {
|
||||
sendOutput(SessionOutputMessage.FD.STDOUT, Buffer.from(data));
|
||||
});
|
||||
}
|
||||
|
||||
const id = new IdentifySessionMessage();
|
||||
id.setId(newSession.getId());
|
||||
id.setPid(process.pid);
|
||||
const sm = new ServerMessage();
|
||||
sm.setIdentifySession(id);
|
||||
connection.send(sm.serializeBinary());
|
||||
|
||||
process.on("exit", (code, signal) => {
|
||||
const serverMsg = new ServerMessage();
|
||||
const exit = new SessionDoneMessage();
|
||||
exit.setId(newSession.getId());
|
||||
exit.setExitStatus(code);
|
||||
serverMsg.setSessionDone(exit);
|
||||
connection.send(serverMsg.serializeBinary());
|
||||
|
||||
onExit();
|
||||
});
|
||||
|
||||
return process;
|
||||
};
|
||||
61
packages/protocol/src/node/evaluate.ts
Normal file
61
packages/protocol/src/node/evaluate.ts
Normal file
@@ -0,0 +1,61 @@
|
||||
import * as vm from "vm";
|
||||
import { NewEvalMessage, TypedValue, EvalFailedMessage, EvalDoneMessage, ServerMessage } from "../proto";
|
||||
import { SendableConnection } from "../common/connection";
|
||||
|
||||
export const evaluate = async (connection: SendableConnection, message: NewEvalMessage): Promise<void> => {
|
||||
const argStr: string[] = [];
|
||||
message.getArgsList().forEach((value) => {
|
||||
argStr.push(value);
|
||||
});
|
||||
const sendResp = (resp: any): void => {
|
||||
const evalDone = new EvalDoneMessage();
|
||||
evalDone.setId(message.getId());
|
||||
const tof = typeof resp;
|
||||
if (tof !== "undefined") {
|
||||
const tv = new TypedValue();
|
||||
let t: TypedValue.Type;
|
||||
switch (tof) {
|
||||
case "string":
|
||||
t = TypedValue.Type.STRING;
|
||||
break;
|
||||
case "boolean":
|
||||
t = TypedValue.Type.BOOLEAN;
|
||||
break;
|
||||
case "object":
|
||||
t = TypedValue.Type.OBJECT;
|
||||
break;
|
||||
case "number":
|
||||
t = TypedValue.Type.NUMBER;
|
||||
break;
|
||||
default:
|
||||
sendErr(EvalFailedMessage.Reason.EXCEPTION, `unsupported response type ${tof}`);
|
||||
return;
|
||||
}
|
||||
tv.setValue(tof === "string" ? resp : JSON.stringify(resp));
|
||||
tv.setType(t);
|
||||
evalDone.setResponse(tv);
|
||||
}
|
||||
|
||||
const serverMsg = new ServerMessage();
|
||||
serverMsg.setEvalDone(evalDone);
|
||||
connection.send(serverMsg.serializeBinary());
|
||||
};
|
||||
const sendErr = (reason: EvalFailedMessage.Reason, msg: string): void => {
|
||||
const evalFailed = new EvalFailedMessage();
|
||||
evalFailed.setId(message.getId());
|
||||
evalFailed.setReason(reason);
|
||||
evalFailed.setMessage(msg);
|
||||
|
||||
const serverMsg = new ServerMessage();
|
||||
serverMsg.setEvalFailed(evalFailed);
|
||||
connection.send(serverMsg.serializeBinary());
|
||||
};
|
||||
try {
|
||||
const value = vm.runInNewContext(`(${message.getFunction()})(${argStr.join(",")})`, { Buffer, require, setTimeout }, {
|
||||
timeout: message.getTimeout() || 30000,
|
||||
});
|
||||
sendResp(await value);
|
||||
} catch (ex) {
|
||||
sendErr(EvalFailedMessage.Reason.EXCEPTION, ex.toString());
|
||||
}
|
||||
};
|
||||
67
packages/protocol/src/node/server.ts
Normal file
67
packages/protocol/src/node/server.ts
Normal file
@@ -0,0 +1,67 @@
|
||||
import { logger, field } from "@coder/logger";
|
||||
import { TextDecoder } from "text-encoding";
|
||||
import { ClientMessage } from "../proto";
|
||||
import { evaluate } from "./evaluate";
|
||||
import { ReadWriteConnection } from "../common/connection";
|
||||
import { Process, handleNewSession } from "./command";
|
||||
|
||||
export class Server {
|
||||
|
||||
private readonly sessions: Map<number, Process>;
|
||||
|
||||
public constructor(
|
||||
private readonly connection: ReadWriteConnection,
|
||||
) {
|
||||
this.sessions = new Map();
|
||||
|
||||
connection.onMessage((data) => {
|
||||
try {
|
||||
this.handleMessage(ClientMessage.deserializeBinary(data));
|
||||
} catch (ex) {
|
||||
logger.error("Failed to handle client message", field("length", data.byteLength), field("exception", ex));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private handleMessage(message: ClientMessage): void {
|
||||
if (message.hasNewEval()) {
|
||||
evaluate(this.connection, message.getNewEval()!);
|
||||
} else if (message.hasNewSession()) {
|
||||
const session = handleNewSession(this.connection, message.getNewSession()!, () => {
|
||||
this.sessions.delete(message.getNewSession()!.getId());
|
||||
});
|
||||
|
||||
this.sessions.set(message.getNewSession()!.getId(), session);
|
||||
} else if (message.hasCloseSessionInput()) {
|
||||
const s = this.getSession(message.getCloseSessionInput()!.getId());
|
||||
if (!s || !s.stdin) {
|
||||
return;
|
||||
}
|
||||
s.stdin.end();
|
||||
} else if (message.hasResizeSessionTty()) {
|
||||
const s = this.getSession(message.getResizeSessionTty()!.getId());
|
||||
if (!s || !s.resize) {
|
||||
return;
|
||||
}
|
||||
const tty = message.getResizeSessionTty()!.getTtyDimensions()!;
|
||||
s.resize(tty.getWidth(), tty.getHeight());
|
||||
} else if (message.hasShutdownSession()) {
|
||||
const s = this.getSession(message.getShutdownSession()!.getId());
|
||||
if (!s) {
|
||||
return;
|
||||
}
|
||||
s.kill(message.getShutdownSession()!.getSignal());
|
||||
} else if (message.hasWriteToSession()) {
|
||||
const s = this.getSession(message.getWriteToSession()!.getId());
|
||||
if (!s) {
|
||||
return;
|
||||
}
|
||||
s.write(new TextDecoder().decode(message.getWriteToSession()!.getData_asU8()));
|
||||
}
|
||||
}
|
||||
|
||||
private getSession(id: number): Process | undefined {
|
||||
return this.sessions.get(id);
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user