http-tunnel-client/lib.js

157 lines
4.3 KiB
JavaScript
Raw Normal View History

2022-06-24 23:44:12 +07:00
const stream = require("stream");
class TunnelRequest extends stream.Readable {
constructor({ socket, requestId }) {
super();
this._socket = socket;
this._requestId = requestId;
2022-06-24 23:44:12 +07:00
const onRequestPipe = (requestId, data) => {
if (this._requestId === requestId) {
this.push(data);
}
};
2022-06-24 23:44:12 +07:00
const onRequestPipes = (requestId, data) => {
if (this._requestId === requestId) {
data.forEach((chunk) => {
this.push(chunk);
});
}
};
2022-06-24 23:44:12 +07:00
const onRequestPipeError = (requestId, error) => {
if (this._requestId === requestId) {
this._socket.off("request-pipe", onRequestPipe);
this._socket.off("request-pipes", onRequestPipes);
this._socket.off("request-pipe-error", onRequestPipeError);
this._socket.off("request-pipe-end", onRequestPipeEnd);
this.destroy(new Error(error));
}
};
2022-06-24 23:44:12 +07:00
const onRequestPipeEnd = (requestId, data) => {
if (this._requestId === requestId) {
this._socket.off("request-pipe", onRequestPipe);
this._socket.off("request-pipes", onRequestPipes);
this._socket.off("request-pipe-error", onRequestPipeError);
this._socket.off("request-pipe-end", onRequestPipeEnd);
if (data) {
this.push(data);
}
this.push(null);
}
};
2022-06-24 23:44:12 +07:00
this._socket.on("request-pipe", onRequestPipe);
this._socket.on("request-pipes", onRequestPipes);
this._socket.on("request-pipe-error", onRequestPipeError);
this._socket.on("request-pipe-end", onRequestPipeEnd);
}
_read() {}
}
class TunnelResponse extends stream.Duplex {
constructor({ socket, responseId, duplex }) {
super();
this._socket = socket;
this._responseId = responseId;
2022-06-24 23:44:12 +07:00
if (duplex) {
// for websocket request: bidirection
const onResponsePipe = (responseId, data) => {
if (this._responseId === responseId) {
this.push(data);
}
};
2022-06-24 23:44:12 +07:00
const onResponsePipes = (responseId, data) => {
if (this._responseId === responseId) {
data.forEach((chunk) => {
this.push(chunk);
});
}
};
2022-06-24 23:44:12 +07:00
const onResponsePipeError = (responseId, error) => {
if (this._responseId === responseId) {
this._socket.off("response-pipe", onResponsePipe);
this._socket.off("response-pipes", onResponsePipes);
this._socket.off("response-pipe-error", onResponsePipeError);
this._socket.off("response-pipe-end", onResponsePipeEnd);
this.destroy(new Error(error));
}
};
2022-06-24 23:44:12 +07:00
const onResponsePipeEnd = (responseId, data) => {
if (this._responseId === responseId) {
this._socket.off("response-pipe", onResponsePipe);
this._socket.off("response-pipes", onResponsePipes);
this._socket.off("response-pipe-error", onResponsePipeError);
this._socket.off("response-pipe-end", onResponsePipeEnd);
if (data) {
this.push(data);
}
this.push(null);
}
};
2022-06-24 23:44:12 +07:00
this._socket.on("response-pipe", onResponsePipe);
this._socket.on("response-pipes", onResponsePipes);
this._socket.on("response-pipe-error", onResponsePipeError);
this._socket.on("response-pipe-end", onResponsePipeEnd);
}
}
_write(chunk, encoding, callback) {
this._socket.emit("response-pipe", this._responseId, chunk);
this._socket.io.engine.once("drain", () => {
callback();
});
}
_writev(chunks, callback) {
this._socket.emit("response-pipes", this._responseId, chunks);
this._socket.io.engine.once("drain", () => {
callback();
});
}
_final(callback) {
this._socket.emit("response-pipe-end", this._responseId);
this._socket.io.engine.once("drain", () => {
callback();
});
}
_destroy(e, callback) {
if (e) {
this._socket.emit(
"response-pipe-error",
this._responseId,
e && e.message
);
this._socket.io.engine.once("drain", () => {
callback();
});
return;
}
callback();
}
writeHead(statusCode, statusMessage, headers, httpVersion) {
this._socket.emit("response", this._responseId, {
statusCode,
statusMessage,
headers,
httpVersion,
});
}
_read(size) {}
}
exports.TunnelRequest = TunnelRequest;
exports.TunnelResponse = TunnelResponse;