Files
2025-03-28 12:30:19 +11:00

207 lines
4.8 KiB
JavaScript

import crypto from "crypto";
import fs from "fs";
import os from "os";
import path from "path";
export class ReadAfterDestroyedError extends Error {}
export class ReadStream extends fs.ReadStream {
constructor(writeStream, name) {
super("", {});
this.name = name;
this._writeStream = writeStream;
this.error = this._writeStream.error;
this.addListener("error", error => {
this.error = error;
});
this.open();
}
get ended() {
return this._readableState.ended;
}
_read(n) {
if (typeof this.fd !== "number")
return this.once("open", function() {
this._read(n);
});
if (this._writeStream.finished || this._writeStream.closed)
return super._read(n);
const unread = this._writeStream.bytesWritten - this.bytesRead;
if (unread === 0) {
const retry = () => {
this._writeStream.removeListener("finish", retry);
this._writeStream.removeListener("write", retry);
this._read(n);
};
this._writeStream.addListener("finish", retry);
this._writeStream.addListener("write", retry);
return;
}
return super._read(Math.min(n, unread));
}
_destroy(error, callback) {
if (typeof this.fd !== "number") {
this.once("open", this._destroy.bind(this, error, callback));
return;
}
fs.close(this.fd, closeError => {
callback(closeError || error);
this.fd = null;
this.closed = true;
this.emit("close");
});
}
open() {
if (!this._writeStream) return;
if (typeof this._writeStream.fd !== "number") {
this._writeStream.once("open", () => this.open());
return;
}
this.path = this._writeStream.path;
super.open();
}
}
export class WriteStream extends fs.WriteStream {
constructor() {
super("", {
autoClose: false
});
this._readStreams = new Set();
this.error = null;
this._cleanupSync = () => {
process.removeListener("exit", this._cleanupSync);
process.removeListener("SIGINT", this._cleanupSync);
if (typeof this.fd === "number")
try {
fs.closeSync(this.fd);
} catch (error) {}
try {
fs.unlinkSync(this.path);
} catch (error) {}
};
}
get finished() {
return this._writableState.finished;
}
open() {
crypto.randomBytes(16, (error, buffer) => {
if (error) {
this.destroy(error);
return;
}
this.path = path.join(
os.tmpdir(),
`capacitor-${buffer.toString("hex")}.tmp`
);
fs.open(this.path, "wx", this.mode, (error, fd) => {
if (error) {
this.destroy(error);
return;
}
process.addListener("exit", this._cleanupSync);
process.addListener("SIGINT", this._cleanupSync);
this.fd = fd;
this.emit("open", fd);
this.emit("ready");
});
});
}
_write(chunk, encoding, callback) {
super._write(chunk, encoding, error => {
if (!error) this.emit("write");
callback(error);
});
}
_destroy(error, callback) {
if (typeof this.fd !== "number") {
this.once("open", this._destroy.bind(this, error, callback));
return;
}
process.removeListener("exit", this._cleanupSync);
process.removeListener("SIGINT", this._cleanupSync);
const unlink = error => {
fs.unlink(this.path, unlinkError => {
callback(unlinkError || error);
this.fd = null;
this.closed = true;
this.emit("close");
});
};
if (typeof this.fd === "number") {
fs.close(this.fd, closeError => {
unlink(closeError || error);
});
return;
}
unlink(error);
}
destroy(error, callback) {
if (error) this.error = error;
if (this.destroyed) return super.destroy(error, callback);
if (typeof callback === "function")
this.once("close", callback.bind(this, error));
if (this._readStreams.size === 0) {
super.destroy(error, callback);
return;
}
this._destroyPending = true;
if (error)
for (let readStream of this._readStreams) readStream.destroy(error);
}
createReadStream(name) {
if (this.destroyed)
throw new ReadAfterDestroyedError(
"A ReadStream cannot be created from a destroyed WriteStream."
);
const readStream = new ReadStream(this, name);
this._readStreams.add(readStream);
const remove = () => {
this._deleteReadStream(readStream);
readStream.removeListener("end", remove);
readStream.removeListener("close", remove);
};
readStream.addListener("end", remove);
readStream.addListener("close", remove);
return readStream;
}
_deleteReadStream(readStream) {
if (this._readStreams.delete(readStream) && this._destroyPending)
this.destroy();
}
}
export default WriteStream;