Merge pull request #3178 from code-asher/connections

Minor connections refactor
This commit is contained in:
Asher 2021-04-21 12:22:45 -05:00 committed by GitHub
commit b9c80b8520
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 193 additions and 108 deletions

View File

@ -743,6 +743,11 @@ export class PersistentProtocol implements IMessagePassingProtocol {
}, Math.max(ProtocolConstants.KeepAliveTimeoutTime - timeSinceLastIncomingMsg, 0) + 5); }, Math.max(ProtocolConstants.KeepAliveTimeoutTime - timeSinceLastIncomingMsg, 0) + 5);
} }
// NOTE@coder: Set the socket without initiating a reconnect.
public setSocket(socket: ISocket): void {
this._socket = socket;
}
public getSocket(): ISocket { public getSocket(): ISocket {
return this._socket; return this._socket;
} }

View File

@ -6,7 +6,9 @@ import { logger } from 'vs/server/node/logger';
import { enableCustomMarketplace } from 'vs/server/node/marketplace'; import { enableCustomMarketplace } from 'vs/server/node/marketplace';
import { Vscode } from 'vs/server/node/server'; import { Vscode } from 'vs/server/node/server';
setUnexpectedErrorHandler((error) => logger.warn(error instanceof Error ? error.message : error)); setUnexpectedErrorHandler((error) => {
logger.warn('Uncaught error', field('error', error instanceof Error ? error.message : error));
});
enableCustomMarketplace(); enableCustomMarketplace();
proxyAgent.monkeyPatch(true); proxyAgent.monkeyPatch(true);

View File

@ -3,31 +3,49 @@ import * as cp from 'child_process';
import { VSBuffer } from 'vs/base/common/buffer'; import { VSBuffer } from 'vs/base/common/buffer';
import { Emitter } from 'vs/base/common/event'; import { Emitter } from 'vs/base/common/event';
import { FileAccess } from 'vs/base/common/network'; import { FileAccess } from 'vs/base/common/network';
import { ISocket } from 'vs/base/parts/ipc/common/ipc.net';
import { WebSocketNodeSocket } from 'vs/base/parts/ipc/node/ipc.net';
import { INativeEnvironmentService } from 'vs/platform/environment/common/environment'; import { INativeEnvironmentService } from 'vs/platform/environment/common/environment';
import { IRemoteExtensionHostStartParams } from 'vs/platform/remote/common/remoteAgentConnection';
import { getNlsConfiguration } from 'vs/server/node/nls'; import { getNlsConfiguration } from 'vs/server/node/nls';
import { Protocol } from 'vs/server/node/protocol'; import { Protocol } from 'vs/server/node/protocol';
import { IExtHostReadyMessage } from 'vs/workbench/services/extensions/common/extensionHostProtocol'; import { IExtHostReadyMessage } from 'vs/workbench/services/extensions/common/extensionHostProtocol';
export abstract class Connection { export abstract class Connection {
private readonly _onClose = new Emitter<void>(); private readonly _onClose = new Emitter<void>();
/**
* Fire when the connection is closed (not just disconnected). This should
* only happen when the connection is offline and old or has an error.
*/
public readonly onClose = this._onClose.event; public readonly onClose = this._onClose.event;
private disposed = false; private disposed = false;
private _offline: number | undefined; private _offline: number | undefined;
public constructor(protected protocol: Protocol, public readonly token: string) {} protected readonly logger: Logger;
public constructor(
protected readonly protocol: Protocol,
public readonly name: string,
) {
this.logger = logger.named(
this.name,
field('token', this.protocol.options.reconnectionToken),
);
this.logger.debug('Connecting...');
this.onClose(() => this.logger.debug('Closed'));
}
public get offline(): number | undefined { public get offline(): number | undefined {
return this._offline; return this._offline;
} }
public reconnect(socket: ISocket, buffer: VSBuffer): void { public reconnect(protocol: Protocol): void {
this.logger.debug('Reconnecting...');
this._offline = undefined; this._offline = undefined;
this.doReconnect(socket, buffer); this.doReconnect(protocol);
} }
public dispose(): void { public dispose(reason?: string): void {
this.logger.debug('Disposing...', field('reason', reason));
if (!this.disposed) { if (!this.disposed) {
this.disposed = true; this.disposed = true;
this.doDispose(); this.doDispose();
@ -36,6 +54,7 @@ export abstract class Connection {
} }
protected setOffline(): void { protected setOffline(): void {
this.logger.debug('Disconnected');
if (!this._offline) { if (!this._offline) {
this._offline = Date.now(); this._offline = Date.now();
} }
@ -44,7 +63,11 @@ export abstract class Connection {
/** /**
* Set up the connection on a new socket. * Set up the connection on a new socket.
*/ */
protected abstract doReconnect(socket: ISocket, buffer: VSBuffer): void; protected abstract doReconnect(protcol: Protocol): void;
/**
* Dispose/destroy everything permanently.
*/
protected abstract doDispose(): void; protected abstract doDispose(): void;
} }
@ -52,21 +75,22 @@ export abstract class Connection {
* Used for all the IPC channels. * Used for all the IPC channels.
*/ */
export class ManagementConnection extends Connection { export class ManagementConnection extends Connection {
public constructor(protected protocol: Protocol, token: string) { public constructor(protocol: Protocol) {
super(protocol, token); super(protocol, 'management');
protocol.onDidDispose(() => this.dispose()); // Explicit close. protocol.onDidDispose(() => this.dispose()); // Explicit close.
protocol.onSocketClose(() => this.setOffline()); // Might reconnect. protocol.onSocketClose(() => this.setOffline()); // Might reconnect.
protocol.sendMessage({ type: 'ok' });
} }
protected doDispose(): void { protected doDispose(): void {
this.protocol.sendDisconnect(); this.protocol.destroy();
this.protocol.dispose();
this.protocol.getUnderlyingSocket().destroy();
} }
protected doReconnect(socket: ISocket, buffer: VSBuffer): void { protected doReconnect(protocol: Protocol): void {
this.protocol.beginAcceptReconnection(socket, buffer); protocol.sendMessage({ type: 'ok' });
this.protocol.beginAcceptReconnection(protocol.getSocket(), protocol.readEntireBuffer());
this.protocol.endAcceptReconnection(); this.protocol.endAcceptReconnection();
protocol.dispose();
} }
} }
@ -85,55 +109,62 @@ type ExtHostMessage = DisconnectedMessage | ConsoleMessage | IExtHostReadyMessag
export class ExtensionHostConnection extends Connection { export class ExtensionHostConnection extends Connection {
private process?: cp.ChildProcess; private process?: cp.ChildProcess;
private readonly logger: Logger;
public constructor( public constructor(
locale: string, protocol: Protocol, buffer: VSBuffer, token: string, protocol: Protocol,
private readonly params: IRemoteExtensionHostStartParams,
private readonly environment: INativeEnvironmentService, private readonly environment: INativeEnvironmentService,
) { ) {
super(protocol, token); super(protocol, 'exthost');
this.logger = logger.named('exthost', field('token', token));
this.protocol.dispose(); protocol.sendMessage({ debugPort: this.params.port });
this.spawn(locale, buffer).then((p) => this.process = p); const buffer = protocol.readEntireBuffer();
this.protocol.getUnderlyingSocket().pause(); const inflateBytes = protocol.inflateBytes;
protocol.dispose();
protocol.getUnderlyingSocket().pause();
this.spawn(buffer, inflateBytes).then((p) => this.process = p);
} }
protected doDispose(): void { protected doDispose(): void {
this.protocol.destroy();
if (this.process) { if (this.process) {
this.process.kill(); this.process.kill();
} }
this.protocol.getUnderlyingSocket().destroy();
} }
protected doReconnect(socket: ISocket, buffer: VSBuffer): void { protected doReconnect(protocol: Protocol): void {
// This is just to set the new socket. protocol.sendMessage({ debugPort: this.params.port });
this.protocol.beginAcceptReconnection(socket, null); const buffer = protocol.readEntireBuffer();
this.protocol.dispose(); const inflateBytes = protocol.inflateBytes;
this.sendInitMessage(buffer); protocol.dispose();
protocol.getUnderlyingSocket().pause();
this.protocol.setSocket(protocol.getSocket());
this.sendInitMessage(buffer, inflateBytes);
} }
private sendInitMessage(buffer: VSBuffer): void { private sendInitMessage(buffer: VSBuffer, inflateBytes: Uint8Array | undefined): void {
const socket = this.protocol.getUnderlyingSocket(); if (!this.process) {
socket.pause(); throw new Error('Tried to initialize VS Code before spawning');
}
const wrapperSocket = this.protocol.getSocket(); this.logger.debug('Sending socket');
this.logger.trace('Sending socket'); // TODO: Do something with the debug port.
this.process!.send({ // Process must be set at this point. this.process.send({
type: 'VSCODE_EXTHOST_IPC_SOCKET', type: 'VSCODE_EXTHOST_IPC_SOCKET',
initialDataChunk: Buffer.from(buffer.buffer).toString('base64'), initialDataChunk: Buffer.from(buffer.buffer).toString('base64'),
skipWebSocketFrames: !(wrapperSocket instanceof WebSocketNodeSocket), skipWebSocketFrames: this.protocol.options.skipWebSocketFrames,
permessageDeflate: this.protocol.options.permessageDeflate, permessageDeflate: this.protocol.options.permessageDeflate,
inflateBytes: wrapperSocket instanceof WebSocketNodeSocket inflateBytes: inflateBytes ? Buffer.from(inflateBytes).toString('base64') : undefined,
? Buffer.from(wrapperSocket.recordedInflateBytes.buffer).toString('base64') }, this.protocol.getUnderlyingSocket());
: undefined,
}, socket);
} }
private async spawn(locale: string, buffer: VSBuffer): Promise<cp.ChildProcess> { private async spawn(buffer: VSBuffer, inflateBytes: Uint8Array | undefined): Promise<cp.ChildProcess> {
this.logger.trace('Getting NLS configuration...'); this.logger.debug('Getting NLS configuration...');
const config = await getNlsConfiguration(locale, this.environment.userDataPath); const config = await getNlsConfiguration(this.params.language, this.environment.userDataPath);
this.logger.trace('Spawning extension host...'); this.logger.debug('Spawning extension host...');
const proc = cp.fork( const proc = cp.fork(
FileAccess.asFileUri('bootstrap-fork', require).fsPath, FileAccess.asFileUri('bootstrap-fork', require).fsPath,
// While not technically necessary, makes it easier to tell which process // While not technically necessary, makes it easier to tell which process
@ -162,7 +193,7 @@ export class ExtensionHostConnection extends Connection {
this.dispose(); this.dispose();
}); });
proc.on('exit', (code) => { proc.on('exit', (code) => {
this.logger.trace('Exited', field('code', code)); this.logger.debug('Exited', field('code', code));
this.dispose(); this.dispose();
}); });
if (proc.stdout && proc.stderr) { if (proc.stdout && proc.stderr) {
@ -181,12 +212,12 @@ export class ExtensionHostConnection extends Connection {
} }
break; break;
case 'VSCODE_EXTHOST_DISCONNECTED': case 'VSCODE_EXTHOST_DISCONNECTED':
this.logger.trace('Going offline'); this.logger.debug('Got disconnected message');
this.setOffline(); this.setOffline();
break; break;
case 'VSCODE_EXTHOST_IPC_READY': case 'VSCODE_EXTHOST_IPC_READY':
this.logger.trace('Got ready message'); this.logger.debug('Handshake completed');
this.sendInitMessage(buffer); this.sendInitMessage(buffer, inflateBytes);
break; break;
default: default:
this.logger.error('Unexpected message', field('event', event)); this.logger.error('Unexpected message', field('event', event));
@ -194,7 +225,7 @@ export class ExtensionHostConnection extends Connection {
} }
}); });
this.logger.trace('Waiting for handshake...'); this.logger.debug('Waiting for handshake...');
return proc; return proc;
} }
} }

View File

@ -1,21 +1,31 @@
import { field } from '@coder/logger'; import { field, logger, Logger } from '@coder/logger';
import * as net from 'net'; import * as net from 'net';
import { VSBuffer } from 'vs/base/common/buffer'; import { VSBuffer } from 'vs/base/common/buffer';
import { PersistentProtocol } from 'vs/base/parts/ipc/common/ipc.net'; import { PersistentProtocol } from 'vs/base/parts/ipc/common/ipc.net';
import { NodeSocket, WebSocketNodeSocket } from 'vs/base/parts/ipc/node/ipc.net'; import { NodeSocket, WebSocketNodeSocket } from 'vs/base/parts/ipc/node/ipc.net';
import { AuthRequest, ConnectionTypeRequest, HandshakeMessage } from 'vs/platform/remote/common/remoteAgentConnection'; import { AuthRequest, ConnectionTypeRequest, HandshakeMessage } from 'vs/platform/remote/common/remoteAgentConnection';
import { logger } from 'vs/server/node/logger';
export interface SocketOptions { export interface SocketOptions {
/** The token is how we identify and connect to existing sessions. */
readonly reconnectionToken: string; readonly reconnectionToken: string;
/** Specifies that the client is trying to reconnect. */
readonly reconnection: boolean; readonly reconnection: boolean;
/** If true assume this is not a web socket (always false for code-server). */
readonly skipWebSocketFrames: boolean; readonly skipWebSocketFrames: boolean;
/** Whether to support compression (web socket only). */
readonly permessageDeflate?: boolean; readonly permessageDeflate?: boolean;
/**
* Seed zlib with these bytes (web socket only). If parts of inflating was
* done in a different zlib instance we need to pass all those bytes into zlib
* otherwise the inflate might hit an inflated portion referencing a distance
* too far back.
*/
readonly inflateBytes?: VSBuffer; readonly inflateBytes?: VSBuffer;
readonly recordInflateBytes?: boolean;
} }
export class Protocol extends PersistentProtocol { export class Protocol extends PersistentProtocol {
private readonly logger: Logger;
public constructor(socket: net.Socket, public readonly options: SocketOptions) { public constructor(socket: net.Socket, public readonly options: SocketOptions) {
super( super(
options.skipWebSocketFrames options.skipWebSocketFrames
@ -24,9 +34,12 @@ export class Protocol extends PersistentProtocol {
new NodeSocket(socket), new NodeSocket(socket),
options.permessageDeflate || false, options.permessageDeflate || false,
options.inflateBytes || null, options.inflateBytes || null,
options.recordInflateBytes || false, // Always record inflate bytes if using permessage-deflate.
options.permessageDeflate || false,
), ),
); );
this.logger = logger.named('protocol', field('token', this.options.reconnectionToken));
} }
public getUnderlyingSocket(): net.Socket { public getUnderlyingSocket(): net.Socket {
@ -40,31 +53,44 @@ export class Protocol extends PersistentProtocol {
* Perform a handshake to get a connection request. * Perform a handshake to get a connection request.
*/ */
public handshake(): Promise<ConnectionTypeRequest> { public handshake(): Promise<ConnectionTypeRequest> {
logger.trace('Protocol handshake', field('token', this.options.reconnectionToken)); this.logger.debug('Initiating handshake...');
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const cleanup = () => {
handler.dispose();
onClose.dispose();
clearTimeout(timeout);
};
const onClose = this.onSocketClose(() => {
cleanup();
this.logger.debug('Handshake failed');
reject(new Error('Protocol socket closed unexpectedly'));
});
const timeout = setTimeout(() => { const timeout = setTimeout(() => {
logger.error('Handshake timed out', field('token', this.options.reconnectionToken)); cleanup();
reject(new Error('timed out')); this.logger.debug('Handshake timed out');
reject(new Error('Protocol handshake timed out'));
}, 10000); // Matches the client timeout. }, 10000); // Matches the client timeout.
const handler = this.onControlMessage((rawMessage) => { const handler = this.onControlMessage((rawMessage) => {
try { try {
const raw = rawMessage.toString(); const raw = rawMessage.toString();
logger.trace('Protocol message', field('token', this.options.reconnectionToken), field('message', raw)); this.logger.trace('Got message', field('message', raw));
const message = JSON.parse(raw); const message = JSON.parse(raw);
switch (message.type) { switch (message.type) {
case 'auth': case 'auth':
return this.authenticate(message); return this.authenticate(message);
case 'connectionType': case 'connectionType':
handler.dispose(); cleanup();
clearTimeout(timeout); this.logger.debug('Handshake completed');
return resolve(message); return resolve(message);
default: default:
throw new Error('Unrecognized message type'); throw new Error('Unrecognized message type');
} }
} catch (error) { } catch (error) {
handler.dispose(); cleanup();
clearTimeout(timeout);
reject(error); reject(error);
} }
}); });
@ -90,10 +116,38 @@ export class Protocol extends PersistentProtocol {
} }
/** /**
* Send a handshake message. In the case of the extension host, it just sends * Send a handshake message. In the case of the extension host it should just
* back a debug port. * send a debug port.
*/ */
public sendMessage(message: HandshakeMessage | { debugPort?: number } ): void { public sendMessage(message: HandshakeMessage | { debugPort?: number | null } ): void {
this.sendControl(VSBuffer.fromString(JSON.stringify(message))); this.sendControl(VSBuffer.fromString(JSON.stringify(message)));
} }
/**
* Disconnect and dispose everything including the underlying socket.
*/
public destroy(reason?: string): void {
try {
if (reason) {
this.sendMessage({ type: 'error', reason });
}
// If still connected try notifying the client.
this.sendDisconnect();
} catch (error) {
// I think the write might fail if already disconnected.
this.logger.warn(error.message || error);
}
this.dispose(); // This disposes timers and socket event handlers.
this.getSocket().dispose(); // This will destroy() the socket.
}
/**
* Get inflateBytes from the current socket.
*/
public get inflateBytes(): Uint8Array | undefined {
const socket = this.getSocket();
return socket instanceof WebSocketNodeSocket
? socket.recordedInflateBytes.buffer
: undefined;
}
} }

View File

@ -1,4 +1,3 @@
import { field } from '@coder/logger';
import * as fs from 'fs'; import * as fs from 'fs';
import * as net from 'net'; import * as net from 'net';
import { release } from 'os'; import { release } from 'os';
@ -123,14 +122,11 @@ export class Vscode {
reconnection: query.reconnection === 'true', reconnection: query.reconnection === 'true',
skipWebSocketFrames: query.skipWebSocketFrames === 'true', skipWebSocketFrames: query.skipWebSocketFrames === 'true',
permessageDeflate, permessageDeflate,
recordInflateBytes: permessageDeflate,
}); });
try { try {
await this.connect(await protocol.handshake(), protocol); await this.connect(await protocol.handshake(), protocol);
} catch (error) { } catch (error) {
protocol.sendMessage({ type: 'error', reason: error.message }); protocol.destroy(error.message);
protocol.dispose();
protocol.getSocket().dispose();
} }
return true; return true;
} }
@ -143,56 +139,61 @@ export class Vscode {
switch (message.desiredConnectionType) { switch (message.desiredConnectionType) {
case ConnectionType.ExtensionHost: case ConnectionType.ExtensionHost:
case ConnectionType.Management: case ConnectionType.Management:
// Initialize connection map for this type of connection.
if (!this.connections.has(message.desiredConnectionType)) { if (!this.connections.has(message.desiredConnectionType)) {
this.connections.set(message.desiredConnectionType, new Map()); this.connections.set(message.desiredConnectionType, new Map());
} }
const connections = this.connections.get(message.desiredConnectionType)!; const connections = this.connections.get(message.desiredConnectionType)!;
const ok = async () => {
return message.desiredConnectionType === ConnectionType.ExtensionHost
? { debugPort: await this.getDebugPort() }
: { type: 'ok' };
};
const token = protocol.options.reconnectionToken; const token = protocol.options.reconnectionToken;
if (protocol.options.reconnection && connections.has(token)) { let connection = connections.get(token);
protocol.sendMessage(await ok()); if (protocol.options.reconnection && connection) {
const buffer = protocol.readEntireBuffer(); return connection.reconnect(protocol);
protocol.dispose();
return connections.get(token)!.reconnect(protocol.getSocket(), buffer);
} else if (protocol.options.reconnection || connections.has(token)) {
throw new Error(protocol.options.reconnection
? 'Unrecognized reconnection token'
: 'Duplicate reconnection token'
);
} }
logger.debug('New connection', field('token', token)); // This probably means the process restarted so the session was lost
protocol.sendMessage(await ok()); // while the browser remained open.
if (protocol.options.reconnection) {
throw new Error(`Unable to reconnect; session no longer exists (${token})`);
}
let connection: Connection; // This will probably never happen outside a chance collision.
if (connection) {
throw new Error('Unable to connect; token is already in use');
}
// Now that the initial exchange has completed we can create the actual
// connection on top of the protocol then send it to whatever uses it.
if (message.desiredConnectionType === ConnectionType.Management) { if (message.desiredConnectionType === ConnectionType.Management) {
connection = new ManagementConnection(protocol, token); // The management connection is used by firing onDidClientConnect
// which makes the IPC server become aware of the connection.
connection = new ManagementConnection(protocol);
this._onDidClientConnect.fire({ this._onDidClientConnect.fire({
protocol, onDidClientDisconnect: connection.onClose, protocol,
onDidClientDisconnect: connection.onClose,
}); });
} else { } else {
const buffer = protocol.readEntireBuffer(); // The extension host connection is used by spawning an extension host
// and passing the socket into it.
connection = new ExtensionHostConnection( connection = new ExtensionHostConnection(
message.args ? message.args.language : 'en', protocol,
protocol, buffer, token, {
language: 'en',
...message.args,
},
this.services.get(IEnvironmentService) as INativeEnvironmentService, this.services.get(IEnvironmentService) as INativeEnvironmentService,
); );
} }
connections.set(token, connection); connections.set(token, connection);
connection.onClose(() => { connection.onClose(() => connections.delete(token));
logger.debug('Connection closed', field('token', token));
connections.delete(token);
});
this.disposeOldOfflineConnections(connections); this.disposeOldOfflineConnections(connections);
logger.debug(`${connections.size} active ${connection.name} connection(s)`);
break; break;
case ConnectionType.Tunnel: return protocol.tunnel(); case ConnectionType.Tunnel:
default: throw new Error('Unrecognized connection type'); return protocol.tunnel();
default:
throw new Error(`Unrecognized connection type ${message.desiredConnectionType}`);
} }
} }
@ -200,8 +201,7 @@ export class Vscode {
const offline = Array.from(connections.values()) const offline = Array.from(connections.values())
.filter((connection) => typeof connection.offline !== 'undefined'); .filter((connection) => typeof connection.offline !== 'undefined');
for (let i = 0, max = offline.length - this.maxExtraOfflineConnections; i < max; ++i) { for (let i = 0, max = offline.length - this.maxExtraOfflineConnections; i < max; ++i) {
logger.debug('Disposing offline connection', field('token', offline[i].token)); offline[i].dispose('old');
offline[i].dispose();
} }
} }
@ -295,11 +295,4 @@ export class Vscode {
}); });
}); });
} }
/**
* TODO: implement.
*/
private async getDebugPort(): Promise<number | undefined> {
return undefined;
}
} }

View File

@ -41,7 +41,7 @@ export const register = async (
if (error) { if (error) {
return reject(error) return reject(error)
} }
logger.trace(plural(count, `${count} active connection`)) logger.debug(plural(count, `${count} active connection`))
resolve(count > 0) resolve(count > 0)
}) })
}) })