Improve protocol class

- Move destroy logic into the class itself
- Improve logging a bit
- Remove the record option; we should always do this when using
  permessage-deflate.
- Let debug port be null (it can be null in the message args).
- Add setSocket so we don't have to initiate a connection to set it.
- Move inflate bytes logic into the class itself.
This commit is contained in:
Asher 2021-04-20 11:52:21 -05:00
parent cbc2e8bc92
commit ae6089f852
No known key found for this signature in database
GPG Key ID: D63C1EF81242354A
4 changed files with 72 additions and 32 deletions

View File

@ -743,6 +743,11 @@ export class PersistentProtocol implements IMessagePassingProtocol {
}, Math.max(ProtocolConstants.KeepAliveTimeoutTime - timeSinceLastIncomingMsg, 0) + 5); }, Math.max(ProtocolConstants.KeepAliveTimeoutTime - timeSinceLastIncomingMsg, 0) + 5);
} }
// NOTE@coder: Set the socket without initiating a reconnect.
public setSocket(socket: ISocket): void {
this._socket = socket;
}
public getSocket(): ISocket { public getSocket(): ISocket {
return this._socket; return this._socket;
} }

View File

@ -4,7 +4,6 @@ import { VSBuffer } from 'vs/base/common/buffer';
import { Emitter } from 'vs/base/common/event'; import { Emitter } from 'vs/base/common/event';
import { FileAccess } from 'vs/base/common/network'; import { FileAccess } from 'vs/base/common/network';
import { ISocket } from 'vs/base/parts/ipc/common/ipc.net'; import { ISocket } from 'vs/base/parts/ipc/common/ipc.net';
import { WebSocketNodeSocket } from 'vs/base/parts/ipc/node/ipc.net';
import { INativeEnvironmentService } from 'vs/platform/environment/common/environment'; import { INativeEnvironmentService } from 'vs/platform/environment/common/environment';
import { getNlsConfiguration } from 'vs/server/node/nls'; import { getNlsConfiguration } from 'vs/server/node/nls';
import { Protocol } from 'vs/server/node/protocol'; import { Protocol } from 'vs/server/node/protocol';
@ -59,9 +58,7 @@ export class ManagementConnection extends Connection {
} }
protected doDispose(): void { protected doDispose(): void {
this.protocol.sendDisconnect(); this.protocol.destroy();
this.protocol.dispose();
this.protocol.getUnderlyingSocket().destroy();
} }
protected doReconnect(socket: ISocket, buffer: VSBuffer): void { protected doReconnect(socket: ISocket, buffer: VSBuffer): void {
@ -99,35 +96,34 @@ export class ExtensionHostConnection extends Connection {
} }
protected doDispose(): void { protected doDispose(): void {
this.protocol.destroy();
if (this.process) { if (this.process) {
this.process.kill(); this.process.kill();
} }
this.protocol.getUnderlyingSocket().destroy();
} }
protected doReconnect(socket: ISocket, buffer: VSBuffer): void { protected doReconnect(socket: ISocket, buffer: VSBuffer): void {
// This is just to set the new socket. this.protocol.setSocket(socket);
this.protocol.beginAcceptReconnection(socket, null);
this.protocol.dispose(); this.protocol.dispose();
this.sendInitMessage(buffer); this.sendInitMessage(buffer);
} }
private sendInitMessage(buffer: VSBuffer): void { private sendInitMessage(buffer: VSBuffer): void {
const socket = this.protocol.getUnderlyingSocket(); if (!this.process) {
socket.pause(); throw new Error("Tried to initialize VS Code before spawning");
}
const wrapperSocket = this.protocol.getSocket(); this.protocol.getUnderlyingSocket().pause();
this.logger.trace('Sending socket'); this.logger.debug('Sending socket');
this.process!.send({ // Process must be set at this point.
this.process.send({
type: 'VSCODE_EXTHOST_IPC_SOCKET', type: 'VSCODE_EXTHOST_IPC_SOCKET',
initialDataChunk: Buffer.from(buffer.buffer).toString('base64'), initialDataChunk: Buffer.from(buffer.buffer).toString('base64'),
skipWebSocketFrames: !(wrapperSocket instanceof WebSocketNodeSocket), skipWebSocketFrames: this.protocol.options.skipWebSocketFrames,
permessageDeflate: this.protocol.options.permessageDeflate, permessageDeflate: this.protocol.options.permessageDeflate,
inflateBytes: wrapperSocket instanceof WebSocketNodeSocket inflateBytes: this.protocol.inflateBytes,
? Buffer.from(wrapperSocket.recordedInflateBytes.buffer).toString('base64') }, this.protocol.getUnderlyingSocket());
: undefined,
}, socket);
} }
private async spawn(locale: string, buffer: VSBuffer): Promise<cp.ChildProcess> { private async spawn(locale: string, buffer: VSBuffer): Promise<cp.ChildProcess> {

View File

@ -1,21 +1,31 @@
import { field } from '@coder/logger'; import { field, logger, Logger } from '@coder/logger';
import * as net from 'net'; import * as net from 'net';
import { VSBuffer } from 'vs/base/common/buffer'; import { VSBuffer } from 'vs/base/common/buffer';
import { PersistentProtocol } from 'vs/base/parts/ipc/common/ipc.net'; import { PersistentProtocol } from 'vs/base/parts/ipc/common/ipc.net';
import { NodeSocket, WebSocketNodeSocket } from 'vs/base/parts/ipc/node/ipc.net'; import { NodeSocket, WebSocketNodeSocket } from 'vs/base/parts/ipc/node/ipc.net';
import { AuthRequest, ConnectionTypeRequest, HandshakeMessage } from 'vs/platform/remote/common/remoteAgentConnection'; import { AuthRequest, ConnectionTypeRequest, HandshakeMessage } from 'vs/platform/remote/common/remoteAgentConnection';
import { logger } from 'vs/server/node/logger';
export interface SocketOptions { export interface SocketOptions {
/** The token is how we identify and connect to existing sessions. */
readonly reconnectionToken: string; readonly reconnectionToken: string;
/** Specifies that the client is trying to reconnect. */
readonly reconnection: boolean; readonly reconnection: boolean;
/** If true assume this is not a web socket (always false for code-server). */
readonly skipWebSocketFrames: boolean; readonly skipWebSocketFrames: boolean;
/** Whether to support compression (web socket only). */
readonly permessageDeflate?: boolean; readonly permessageDeflate?: boolean;
/**
* Seed zlib with these bytes (web socket only). If parts of inflating was
* done in a different zlib instance we need to pass all those bytes into zlib
* otherwise the inflate might hit an inflated portion referencing a distance
* too far back.
*/
readonly inflateBytes?: VSBuffer; readonly inflateBytes?: VSBuffer;
readonly recordInflateBytes?: boolean;
} }
export class Protocol extends PersistentProtocol { export class Protocol extends PersistentProtocol {
private readonly logger: Logger;
public constructor(socket: net.Socket, public readonly options: SocketOptions) { public constructor(socket: net.Socket, public readonly options: SocketOptions) {
super( super(
options.skipWebSocketFrames options.skipWebSocketFrames
@ -24,9 +34,12 @@ export class Protocol extends PersistentProtocol {
new NodeSocket(socket), new NodeSocket(socket),
options.permessageDeflate || false, options.permessageDeflate || false,
options.inflateBytes || null, options.inflateBytes || null,
options.recordInflateBytes || false, // Always record inflate bytes if using permessage-deflate.
options.permessageDeflate || false,
), ),
); );
this.logger = logger.named('protocol', field('token', this.options.reconnectionToken));
} }
public getUnderlyingSocket(): net.Socket { public getUnderlyingSocket(): net.Socket {
@ -40,17 +53,17 @@ export class Protocol extends PersistentProtocol {
* Perform a handshake to get a connection request. * Perform a handshake to get a connection request.
*/ */
public handshake(): Promise<ConnectionTypeRequest> { public handshake(): Promise<ConnectionTypeRequest> {
logger.trace('Protocol handshake', field('token', this.options.reconnectionToken)); this.logger.debug('Initiating handshake...');
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const timeout = setTimeout(() => { const timeout = setTimeout(() => {
logger.error('Handshake timed out', field('token', this.options.reconnectionToken)); this.logger.debug('Handshake timed out');
reject(new Error('timed out')); reject(new Error('protocol handshake timed out'));
}, 10000); // Matches the client timeout. }, 10000); // Matches the client timeout.
const handler = this.onControlMessage((rawMessage) => { const handler = this.onControlMessage((rawMessage) => {
try { try {
const raw = rawMessage.toString(); const raw = rawMessage.toString();
logger.trace('Protocol message', field('token', this.options.reconnectionToken), field('message', raw)); this.logger.trace('Got message', field('message', raw));
const message = JSON.parse(raw); const message = JSON.parse(raw);
switch (message.type) { switch (message.type) {
case 'auth': case 'auth':
@ -58,6 +71,7 @@ export class Protocol extends PersistentProtocol {
case 'connectionType': case 'connectionType':
handler.dispose(); handler.dispose();
clearTimeout(timeout); clearTimeout(timeout);
this.logger.debug('Handshake completed');
return resolve(message); return resolve(message);
default: default:
throw new Error('Unrecognized message type'); throw new Error('Unrecognized message type');
@ -90,10 +104,38 @@ export class Protocol extends PersistentProtocol {
} }
/** /**
* Send a handshake message. In the case of the extension host, it just sends * Send a handshake message. In the case of the extension host it should just
* back a debug port. * send a debug port.
*/ */
public sendMessage(message: HandshakeMessage | { debugPort?: number } ): void { public sendMessage(message: HandshakeMessage | { debugPort?: number | null } ): void {
this.sendControl(VSBuffer.fromString(JSON.stringify(message))); this.sendControl(VSBuffer.fromString(JSON.stringify(message)));
} }
/**
* Disconnect and dispose everything including the underlying socket.
*/
public destroy(reason?: string): void {
try {
if (reason) {
this.sendMessage({ type: 'error', reason });
}
// If still connected try notifying the client.
this.sendDisconnect();
} catch (error) {
// I think the write might fail if already disconnected.
this.logger.warn(error.message || error);
}
this.dispose(); // This disposes timers and socket event handlers.
this.getSocket().dispose(); // This will destroy() the socket.
}
/**
* Get inflateBytes in base64 format from the current socket.
*/
public get inflateBytes(): string | undefined {
const socket = this.getSocket();
return socket instanceof WebSocketNodeSocket
? Buffer.from(socket.recordedInflateBytes.buffer).toString('base64')
: undefined;
}
} }

View File

@ -123,14 +123,11 @@ export class Vscode {
reconnection: query.reconnection === 'true', reconnection: query.reconnection === 'true',
skipWebSocketFrames: query.skipWebSocketFrames === 'true', skipWebSocketFrames: query.skipWebSocketFrames === 'true',
permessageDeflate, permessageDeflate,
recordInflateBytes: permessageDeflate,
}); });
try { try {
await this.connect(await protocol.handshake(), protocol); await this.connect(await protocol.handshake(), protocol);
} catch (error) { } catch (error) {
protocol.sendMessage({ type: 'error', reason: error.message }); protocol.destroy(error.message);
protocol.dispose();
protocol.getSocket().dispose();
} }
return true; return true;
} }