Getting the client to run (#12)
* Clean up workbench and integrate initialization data * Uncomment Electron fill * Run server & client together * Clean up Electron fill & patch * Bind fs methods This makes them usable with the promise form: `promisify(access)(...)`. * Add space between tag and title to browser logger * Add typescript dep to server and default __dirname for path * Serve web files from server * Adjust some dev options * Rework workbench a bit to use a class and catch unexpected errors * No mkdirs for now, fix util fill, use bash with exec * More fills, make general client abstract * More fills * Fix cp.exec * Fix require calls in fs fill being aliased * Create data and storage dir * Implement fs.watch Using exec for now. * Implement storage database fill * Fix os export and homedir * Add comment to use navigator.sendBeacon * Fix fs callbacks (some args are optional) * Make sure data directory exists when passing it back * Update patch * Target es5 * More fills * Add APIs required for bootstrap-fork to function (#15) * Add bootstrap-fork execution * Add createConnection * Bundle bootstrap-fork into cli * Remove .node directory created from spdlog * Fix npm start * Remove unnecessary comment * Add webpack-hot-middleware if CLI env is not set * Add restarting to shared process * Fix starting with yarn
This commit is contained in:
@@ -1,8 +1,9 @@
|
||||
import * as cp from "child_process";
|
||||
import * as net from "net";
|
||||
import * as nodePty from "node-pty";
|
||||
import * as stream from "stream";
|
||||
import { TextEncoder } from "text-encoding";
|
||||
import { NewSessionMessage, ServerMessage, SessionDoneMessage, SessionOutputMessage, ShutdownSessionMessage, IdentifySessionMessage, ClientMessage } from "../proto";
|
||||
import { NewSessionMessage, ServerMessage, SessionDoneMessage, SessionOutputMessage, ShutdownSessionMessage, IdentifySessionMessage, ClientMessage, NewConnectionMessage, ConnectionEstablishedMessage, NewConnectionFailureMessage, ConnectionCloseMessage, ConnectionOutputMessage } from "../proto";
|
||||
import { SendableConnection } from "../common/connection";
|
||||
|
||||
export interface Process {
|
||||
@@ -59,7 +60,7 @@ export const handleNewSession = (connection: SendableConnection, newSession: New
|
||||
};
|
||||
}
|
||||
|
||||
const sendOutput = (fd: SessionOutputMessage.FD, msg: string | Uint8Array): void => {
|
||||
const sendOutput = (_fd: SessionOutputMessage.FD, msg: string | Uint8Array): void => {
|
||||
const serverMsg = new ServerMessage();
|
||||
const d = new SessionOutputMessage();
|
||||
d.setId(newSession.getId());
|
||||
@@ -90,7 +91,7 @@ export const handleNewSession = (connection: SendableConnection, newSession: New
|
||||
sm.setIdentifySession(id);
|
||||
connection.send(sm.serializeBinary());
|
||||
|
||||
process.on("exit", (code, signal) => {
|
||||
process.on("exit", (code) => {
|
||||
const serverMsg = new ServerMessage();
|
||||
const exit = new SessionDoneMessage();
|
||||
exit.setId(newSession.getId());
|
||||
@@ -103,3 +104,61 @@ export const handleNewSession = (connection: SendableConnection, newSession: New
|
||||
|
||||
return process;
|
||||
};
|
||||
|
||||
export const handleNewConnection = (connection: SendableConnection, newConnection: NewConnectionMessage, onExit: () => void): net.Socket => {
|
||||
const id = newConnection.getId();
|
||||
let socket: net.Socket;
|
||||
let didConnect = false;
|
||||
const connectCallback = () => {
|
||||
didConnect = true;
|
||||
const estab = new ConnectionEstablishedMessage();
|
||||
estab.setId(id);
|
||||
const servMsg = new ServerMessage();
|
||||
servMsg.setConnectionEstablished(estab);
|
||||
connection.send(servMsg.serializeBinary());
|
||||
};
|
||||
|
||||
if (newConnection.getPath()) {
|
||||
socket = net.createConnection(newConnection.getPath(), connectCallback);
|
||||
} else if (newConnection.getPort()) {
|
||||
socket = net.createConnection(newConnection.getPort(), undefined, connectCallback);
|
||||
} else {
|
||||
throw new Error("No path or port provided for new connection");
|
||||
}
|
||||
|
||||
socket.addListener("error", (err) => {
|
||||
if (!didConnect) {
|
||||
const errMsg = new NewConnectionFailureMessage();
|
||||
errMsg.setId(id);
|
||||
errMsg.setMessage(err.message);
|
||||
const servMsg = new ServerMessage();
|
||||
servMsg.setConnectionFailure(errMsg);
|
||||
connection.send(servMsg.serializeBinary());
|
||||
|
||||
onExit();
|
||||
}
|
||||
});
|
||||
|
||||
socket.addListener("close", () => {
|
||||
if (didConnect) {
|
||||
const closed = new ConnectionCloseMessage();
|
||||
closed.setId(id);
|
||||
const servMsg = new ServerMessage();
|
||||
servMsg.setConnectionClose(closed);
|
||||
connection.send(servMsg.serializeBinary());
|
||||
|
||||
onExit();
|
||||
}
|
||||
});
|
||||
|
||||
socket.addListener("data", (data) => {
|
||||
const dataMsg = new ConnectionOutputMessage();
|
||||
dataMsg.setId(id);
|
||||
dataMsg.setData(data);
|
||||
const servMsg = new ServerMessage();
|
||||
servMsg.setConnectionOutput(dataMsg);
|
||||
connection.send(servMsg.serializeBinary());
|
||||
});
|
||||
|
||||
return socket;
|
||||
}
|
||||
@@ -29,8 +29,7 @@ export const evaluate = async (connection: SendableConnection, message: NewEvalM
|
||||
t = TypedValue.Type.NUMBER;
|
||||
break;
|
||||
default:
|
||||
sendErr(EvalFailedMessage.Reason.EXCEPTION, `unsupported response type ${tof}`);
|
||||
return;
|
||||
return sendErr(EvalFailedMessage.Reason.EXCEPTION, `unsupported response type ${tof}`);
|
||||
}
|
||||
tv.setValue(tof === "string" ? resp : JSON.stringify(resp));
|
||||
tv.setType(t);
|
||||
@@ -52,11 +51,17 @@ export const evaluate = async (connection: SendableConnection, message: NewEvalM
|
||||
connection.send(serverMsg.serializeBinary());
|
||||
};
|
||||
try {
|
||||
const value = vm.runInNewContext(`(${message.getFunction()})(${argStr.join(",")})`, { Buffer, require: typeof __non_webpack_require__ !== "undefined" ? __non_webpack_require__ : require, setTimeout }, {
|
||||
const value = vm.runInNewContext(`(${message.getFunction()})(${argStr.join(",")})`, {
|
||||
Buffer,
|
||||
require: typeof __non_webpack_require__ !== "undefined" ? __non_webpack_require__ : require,
|
||||
_require: typeof __non_webpack_require__ !== "undefined" ? __non_webpack_require__ : require,
|
||||
tslib_1: require("tslib"), // TODO: is there a better way to do this?
|
||||
setTimeout,
|
||||
}, {
|
||||
timeout: message.getTimeout() || 30000,
|
||||
});
|
||||
sendResp(await value);
|
||||
} catch (ex) {
|
||||
sendErr(EvalFailedMessage.Reason.EXCEPTION, ex.toString());
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
@@ -1,10 +1,14 @@
|
||||
import { logger, field } from "@coder/logger";
|
||||
import * as os from "os";
|
||||
import * as path from "path";
|
||||
import { mkdir } from "fs";
|
||||
import { promisify } from "util";
|
||||
import { TextDecoder } from "text-encoding";
|
||||
import { ClientMessage, InitMessage, ServerMessage } from "../proto";
|
||||
import { logger, field } from "@coder/logger";
|
||||
import { ClientMessage, WorkingInitMessage, ServerMessage } from "../proto";
|
||||
import { evaluate } from "./evaluate";
|
||||
import { ReadWriteConnection } from "../common/connection";
|
||||
import { Process, handleNewSession } from "./command";
|
||||
import { Process, handleNewSession, handleNewConnection } from "./command";
|
||||
import * as net from "net";
|
||||
|
||||
export interface ServerOptions {
|
||||
readonly workingDirectory: string;
|
||||
@@ -13,14 +17,13 @@ export interface ServerOptions {
|
||||
|
||||
export class Server {
|
||||
|
||||
private readonly sessions: Map<number, Process>;
|
||||
private readonly sessions: Map<number, Process> = new Map();
|
||||
private readonly connections: Map<number, net.Socket> = new Map();
|
||||
|
||||
public constructor(
|
||||
private readonly connection: ReadWriteConnection,
|
||||
options?: ServerOptions,
|
||||
) {
|
||||
this.sessions = new Map();
|
||||
|
||||
connection.onMessage((data) => {
|
||||
try {
|
||||
this.handleMessage(ClientMessage.deserializeBinary(data));
|
||||
@@ -35,22 +38,43 @@ export class Server {
|
||||
return;
|
||||
}
|
||||
|
||||
const initMsg = new InitMessage();
|
||||
// Ensure the data directory exists.
|
||||
const mkdirP = async (path: string): Promise<void> => {
|
||||
const split = path.replace(/^\/*|\/*$/g, "").split("/");
|
||||
let dir = "";
|
||||
while (split.length > 0) {
|
||||
dir += "/" + split.shift();
|
||||
try {
|
||||
await promisify(mkdir)(dir);
|
||||
} catch (error) {
|
||||
if (error.code !== "EEXIST") {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
Promise.all([ mkdirP(path.join(options.dataDirectory, "User", "workspaceStorage")) ]).then(() => {
|
||||
logger.info("Created data directory");
|
||||
}).catch((error) => {
|
||||
logger.error(error.message, field("error", error));
|
||||
});
|
||||
|
||||
const initMsg = new WorkingInitMessage();
|
||||
initMsg.setDataDirectory(options.dataDirectory);
|
||||
initMsg.setWorkingDirectory(options.workingDirectory);
|
||||
initMsg.setHomeDirectory(os.homedir());
|
||||
initMsg.setTmpDirectory(os.tmpdir());
|
||||
const platform = os.platform();
|
||||
let operatingSystem: InitMessage.OperatingSystem;
|
||||
let operatingSystem: WorkingInitMessage.OperatingSystem;
|
||||
switch (platform) {
|
||||
case "win32":
|
||||
operatingSystem = InitMessage.OperatingSystem.WINDOWS;
|
||||
operatingSystem = WorkingInitMessage.OperatingSystem.WINDOWS;
|
||||
break;
|
||||
case "linux":
|
||||
operatingSystem = InitMessage.OperatingSystem.LINUX;
|
||||
operatingSystem = WorkingInitMessage.OperatingSystem.LINUX;
|
||||
break;
|
||||
case "darwin":
|
||||
operatingSystem = InitMessage.OperatingSystem.MAC;
|
||||
operatingSystem = WorkingInitMessage.OperatingSystem.MAC;
|
||||
break;
|
||||
default:
|
||||
throw new Error(`unrecognized platform "${platform}"`);
|
||||
@@ -66,9 +90,8 @@ export class Server {
|
||||
evaluate(this.connection, message.getNewEval()!);
|
||||
} else if (message.hasNewSession()) {
|
||||
const session = handleNewSession(this.connection, message.getNewSession()!, () => {
|
||||
this.sessions.delete(message.getNewSession()!.getId());
|
||||
this.sessions.delete(message.getNewSession()!.getId());
|
||||
});
|
||||
|
||||
this.sessions.set(message.getNewSession()!.getId(), session);
|
||||
} else if (message.hasCloseSessionInput()) {
|
||||
const s = this.getSession(message.getCloseSessionInput()!.getId());
|
||||
@@ -95,9 +118,30 @@ export class Server {
|
||||
return;
|
||||
}
|
||||
s.write(new TextDecoder().decode(message.getWriteToSession()!.getData_asU8()));
|
||||
} else if (message.hasNewConnection()) {
|
||||
const socket = handleNewConnection(this.connection, message.getNewConnection()!, () => {
|
||||
this.connections.delete(message.getNewConnection()!.getId());
|
||||
});
|
||||
this.connections.set(message.getNewConnection()!.getId(), socket);
|
||||
} else if (message.hasConnectionOutput()) {
|
||||
const c = this.getConnection(message.getConnectionOutput()!.getId());
|
||||
if (!c) {
|
||||
return;
|
||||
}
|
||||
c.write(Buffer.from(message.getConnectionOutput()!.getData_asU8()));
|
||||
} else if (message.hasConnectionClose()) {
|
||||
const c = this.getConnection(message.getConnectionClose()!.getId());
|
||||
if (!c) {
|
||||
return;
|
||||
}
|
||||
c.end();
|
||||
}
|
||||
}
|
||||
|
||||
private getConnection(id: number): net.Socket | undefined {
|
||||
return this.connections.get(id);
|
||||
}
|
||||
|
||||
private getSession(id: number): Process | undefined {
|
||||
return this.sessions.get(id);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user