147 lines
4.3 KiB
JavaScript
147 lines
4.3 KiB
JavaScript
|
const stream = require("stream");
|
|||
|
|
|||
|
class TunnelRequest extends stream.Readable {
|
|||
|
constructor({ socket, requestId }) {
|
|||
|
super();
|
|||
|
this._socket = socket;
|
|||
|
this._requestId = requestId;
|
|||
|
const onRequestPipe = (requestId, data) => {
|
|||
|
if (this._requestId === requestId) {
|
|||
|
this.push(data);
|
|||
|
}
|
|||
|
};
|
|||
|
const onRequestPipes = (requestId, data) => {
|
|||
|
if (this._requestId === requestId) {
|
|||
|
data.forEach((chunk) => {
|
|||
|
this.push(chunk);
|
|||
|
});
|
|||
|
}
|
|||
|
};
|
|||
|
const onRequestPipeError = (requestId, error) => {
|
|||
|
if (this._requestId === requestId) {
|
|||
|
this._socket.off("request-pipe", onRequestPipe);
|
|||
|
this._socket.off("request-pipes", onRequestPipes);
|
|||
|
this._socket.off("request-pipe-error", onRequestPipeError);
|
|||
|
this._socket.off("request-pipe-end", onRequestPipeEnd);
|
|||
|
this.destroy(new Error(error));
|
|||
|
}
|
|||
|
};
|
|||
|
const onRequestPipeEnd = (requestId, data) => {
|
|||
|
if (this._requestId === requestId) {
|
|||
|
this._socket.off("request-pipe", onRequestPipe);
|
|||
|
this._socket.off("request-pipes", onRequestPipes);
|
|||
|
this._socket.off("request-pipe-error", onRequestPipeError);
|
|||
|
this._socket.off("request-pipe-end", onRequestPipeEnd);
|
|||
|
if (data) {
|
|||
|
this.push(data);
|
|||
|
}
|
|||
|
this.push(null);
|
|||
|
}
|
|||
|
};
|
|||
|
this._socket.on("request-pipe", onRequestPipe);
|
|||
|
this._socket.on("request-pipes", onRequestPipes);
|
|||
|
this._socket.on("request-pipe-error", onRequestPipeError);
|
|||
|
this._socket.on("request-pipe-end", onRequestPipeEnd);
|
|||
|
}
|
|||
|
|
|||
|
_read() {}
|
|||
|
}
|
|||
|
|
|||
|
class TunnelResponse extends stream.Duplex {
|
|||
|
constructor({ socket, responseId, duplex }) {
|
|||
|
super();
|
|||
|
this._socket = socket;
|
|||
|
this._responseId = responseId;
|
|||
|
if (duplex) {
|
|||
|
// for websocket request: bidirection
|
|||
|
const onResponsePipe = (responseId, data) => {
|
|||
|
if (this._responseId === responseId) {
|
|||
|
this.push(data);
|
|||
|
}
|
|||
|
};
|
|||
|
const onResponsePipes = (responseId, data) => {
|
|||
|
if (this._responseId === responseId) {
|
|||
|
data.forEach((chunk) => {
|
|||
|
this.push(chunk);
|
|||
|
});
|
|||
|
}
|
|||
|
};
|
|||
|
const onResponsePipeError = (responseId, error) => {
|
|||
|
if (this._responseId === responseId) {
|
|||
|
this._socket.off("response-pipe", onResponsePipe);
|
|||
|
this._socket.off("response-pipes", onResponsePipes);
|
|||
|
this._socket.off("response-pipe-error", onResponsePipeError);
|
|||
|
this._socket.off("response-pipe-end", onResponsePipeEnd);
|
|||
|
this.destroy(new Error(error));
|
|||
|
}
|
|||
|
};
|
|||
|
const onResponsePipeEnd = (responseId, data) => {
|
|||
|
if (this._responseId === responseId) {
|
|||
|
this._socket.off("response-pipe", onResponsePipe);
|
|||
|
this._socket.off("response-pipes", onResponsePipes);
|
|||
|
this._socket.off("response-pipe-error", onResponsePipeError);
|
|||
|
this._socket.off("response-pipe-end", onResponsePipeEnd);
|
|||
|
if (data) {
|
|||
|
this.push(data);
|
|||
|
}
|
|||
|
this.push(null);
|
|||
|
}
|
|||
|
};
|
|||
|
this._socket.on("response-pipe", onResponsePipe);
|
|||
|
this._socket.on("response-pipes", onResponsePipes);
|
|||
|
this._socket.on("response-pipe-error", onResponsePipeError);
|
|||
|
this._socket.on("response-pipe-end", onResponsePipeEnd);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
_write(chunk, encoding, callback) {
|
|||
|
this._socket.emit("response-pipe", this._responseId, chunk);
|
|||
|
this._socket.io.engine.once("drain", () => {
|
|||
|
callback();
|
|||
|
});
|
|||
|
}
|
|||
|
|
|||
|
_writev(chunks, callback) {
|
|||
|
this._socket.emit("response-pipes", this._responseId, chunks);
|
|||
|
this._socket.io.engine.once("drain", () => {
|
|||
|
callback();
|
|||
|
});
|
|||
|
}
|
|||
|
|
|||
|
_final(callback) {
|
|||
|
this._socket.emit("response-pipe-end", this._responseId);
|
|||
|
this._socket.io.engine.once("drain", () => {
|
|||
|
callback();
|
|||
|
});
|
|||
|
}
|
|||
|
|
|||
|
_destroy(e, callback) {
|
|||
|
if (e) {
|
|||
|
this._socket.emit(
|
|||
|
"response-pipe-error",
|
|||
|
this._responseId,
|
|||
|
e && e.message
|
|||
|
);
|
|||
|
this._socket.io.engine.once("drain", () => {
|
|||
|
callback();
|
|||
|
});
|
|||
|
return;
|
|||
|
}
|
|||
|
callback();
|
|||
|
}
|
|||
|
|
|||
|
writeHead(statusCode, statusMessage, headers, httpVersion) {
|
|||
|
this._socket.emit("response", this._responseId, {
|
|||
|
statusCode,
|
|||
|
statusMessage,
|
|||
|
headers,
|
|||
|
httpVersion,
|
|||
|
});
|
|||
|
}
|
|||
|
|
|||
|
_read(size) {}
|
|||
|
}
|
|||
|
|
|||
|
exports.TunnelRequest = TunnelRequest;
|
|||
|
exports.TunnelResponse = TunnelResponse;
|