Add createServer (#18)

This commit is contained in:
Kyle Carberry
2019-01-23 18:00:38 -06:00
parent ec909bdd0c
commit 6c178d615d
11 changed files with 1671 additions and 50 deletions

View File

@@ -1,8 +1,8 @@
import { ReadWriteConnection, InitData, OperatingSystem, ISharedProcessData } from "../common/connection";
import { NewEvalMessage, ServerMessage, EvalDoneMessage, EvalFailedMessage, TypedValue, ClientMessage, NewSessionMessage, TTYDimensions, SessionOutputMessage, CloseSessionInputMessage, WorkingInitMessage, NewConnectionMessage } from "../proto";
import { NewEvalMessage, ServerMessage, EvalDoneMessage, EvalFailedMessage, TypedValue, ClientMessage, NewSessionMessage, TTYDimensions, SessionOutputMessage, CloseSessionInputMessage, WorkingInitMessage, NewConnectionMessage, NewServerMessage } from "../proto";
import { Emitter, Event } from "@coder/events";
import { logger, field } from "@coder/logger";
import { ChildProcess, SpawnOptions, ServerProcess, ServerSocket, Socket } from "./command";
import { ChildProcess, SpawnOptions, ServerProcess, ServerSocket, Socket, ServerListener, Server } from "./command";
/**
* Client accepts an arbitrary connection intended to communicate with the Server.
@@ -18,6 +18,9 @@ export class Client {
private connectionId: number = 0;
private readonly connections: Map<number, ServerSocket> = new Map();
private serverId: number = 0;
private readonly servers: Map<number, ServerListener> = new Map();
private _initData: InitData | undefined;
private initDataEmitter = new Emitter<InitData>();
private initDataPromise: Promise<InitData>;
@@ -189,6 +192,14 @@ export class Client {
return socket;
}
public createServer(callback?: () => void): Server {
const id = this.serverId++;
const server = new ServerListener(this.connection, id, callback);
this.servers.set(id, server);
return server;
}
private doSpawn(command: string, args: string[] = [], options?: SpawnOptions, isFork: boolean = false, isBootstrapFork: boolean = true): ChildProcess {
const id = this.sessionId++;
const newSess = new NewSessionMessage();
@@ -333,6 +344,36 @@ export class Client {
this.sharedProcessActiveEmitter.emit({
socketPath: message.getSharedProcessActive()!.getSocketPath(),
});
} else if (message.hasServerEstablished()) {
const s = this.servers.get(message.getServerEstablished()!.getId());
if (!s) {
return;
}
s.emit("connect");
} else if (message.hasServerConnectionEstablished()) {
const s = this.servers.get(message.getServerConnectionEstablished()!.getServerId());
if (!s) {
return;
}
const conId = message.getServerConnectionEstablished()!.getConnectionId();
const serverSocket = new ServerSocket(this.connection, conId);
serverSocket.emit("connect");
this.connections.set(conId, serverSocket);
s.emit("connection", serverSocket);
} else if (message.getServerFailure()) {
const s = this.servers.get(message.getServerFailure()!.getId());
if (!s) {
return;
}
s.emit("error", new Error(message.getNewSessionFailure()!.getReason().toString()));
this.servers.delete(message.getNewSessionFailure()!.getId());
} else if (message.hasServerClose()) {
const s = this.servers.get(message.getServerClose()!.getId());
if (!s) {
return;
}
s.emit("close");
this.servers.delete(message.getServerClose()!.getId());
}
}
}

View File

@@ -1,7 +1,7 @@
import * as events from "events";
import * as stream from "stream";
import { ReadWriteConnection } from "../common/connection";
import { ShutdownSessionMessage, ClientMessage, WriteToSessionMessage, ResizeSessionTTYMessage, TTYDimensions as ProtoTTYDimensions, ConnectionOutputMessage, ConnectionCloseMessage } from "../proto";
import { ShutdownSessionMessage, ClientMessage, WriteToSessionMessage, ResizeSessionTTYMessage, TTYDimensions as ProtoTTYDimensions, ConnectionOutputMessage, ConnectionCloseMessage, ServerCloseMessage, NewServerMessage } from "../proto";
export interface TTYDimensions {
readonly columns: number;
@@ -237,3 +237,83 @@ export class ServerSocket extends events.EventEmitter implements Socket {
throw new Error("Method not implemented.");
}
}
export interface Server {
addListener(event: "close", listener: () => void): this;
addListener(event: "connect", listener: (socket: Socket) => void): this;
addListener(event: "error", listener: (err: Error) => void): this;
on(event: "close", listener: () => void): this;
on(event: "connection", listener: (socket: Socket) => void): this;
on(event: "error", listener: (err: Error) => void): this;
once(event: "close", listener: () => void): this;
once(event: "connection", listener: (socket: Socket) => void): this;
once(event: "error", listener: (err: Error) => void): this;
removeListener(event: "close", listener: () => void): this;
removeListener(event: "connection", listener: (socket: Socket) => void): this;
removeListener(event: "error", listener: (err: Error) => void): this;
emit(event: "close"): boolean;
emit(event: "connection"): boolean;
emit(event: "error"): boolean;
listen(path: string, listeningListener?: () => void): this;
close(callback?: () => void): this;
readonly listening: boolean;
}
export class ServerListener extends events.EventEmitter implements Server {
private _listening: boolean = false;
public constructor(
private readonly connection: ReadWriteConnection,
private readonly id: number,
connectCallback?: () => void,
) {
super();
this.on("connect", () => {
this._listening = true;
if (connectCallback) {
connectCallback();
}
});
}
public get listening(): boolean {
return this._listening;
}
public listen(path: string, listener?: () => void): this {
const ns = new NewServerMessage();
ns.setId(this.id);
ns.setPath(path!);
const cm = new ClientMessage();
cm.setNewServer(ns);
this.connection.send(cm.serializeBinary());
if (typeof listener !== "undefined") {
this.once("connect", listener);
}
return this;
}
public close(callback?: Function | undefined): this {
const closeMsg = new ServerCloseMessage();
closeMsg.setId(this.id);
closeMsg.setReason("Manually closed");
const clientMsg = new ClientMessage();
clientMsg.setServerClose(closeMsg);
this.connection.send(clientMsg.serializeBinary());
if (callback) {
callback();
}
return this;
}
}