Initial Save

jackbeeby
2025-03-28 12:30:19 +11:00
parent e381994f19
commit d8773925e8
9910 changed files with 982718 additions and 0 deletions

node_modules/fs-capacitor/lib/index.js generated vendored Normal file

@@ -0,0 +1,230 @@
"use strict";
exports.__esModule = true;
exports.default = exports.WriteStream = exports.ReadStream = exports.ReadAfterDestroyedError = void 0;
var _crypto = _interopRequireDefault(require("crypto"));
var _fs = _interopRequireDefault(require("fs"));
var _os = _interopRequireDefault(require("os"));
var _path = _interopRequireDefault(require("path"));
function _interopRequireDefault(obj) {
return obj && obj.__esModule ? obj : { default: obj };
}
class ReadAfterDestroyedError extends Error {}
exports.ReadAfterDestroyedError = ReadAfterDestroyedError;
class ReadStream extends _fs.default.ReadStream {
constructor(writeStream, name) {
super("", {});
this.name = name;
this._writeStream = writeStream;
this.error = this._writeStream.error;
this.addListener("error", error => {
this.error = error;
});
this.open();
}
get ended() {
return this._readableState.ended;
}
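// Pull bytes from the temp file, but never read past what the write
// stream has flushed; when no unread bytes exist yet, retry on the
// writer's next "write" or "finish" event.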
_read(n) {
if (typeof this.fd !== "number")
return this.once("open", function() {
this._read(n);
});
if (this._writeStream.finished || this._writeStream.closed)
return super._read(n);
const unread = this._writeStream.bytesWritten - this.bytesRead;
if (unread === 0) {
const retry = () => {
this._writeStream.removeListener("finish", retry);
this._writeStream.removeListener("write", retry);
this._read(n);
};
this._writeStream.addListener("finish", retry);
this._writeStream.addListener("write", retry);
return;
}
return super._read(Math.min(n, unread));
}
_destroy(error, callback) {
if (typeof this.fd !== "number") {
this.once("open", this._destroy.bind(this, error, callback));
return;
}
_fs.default.close(this.fd, closeError => {
callback(closeError || error);
this.fd = null;
this.closed = true;
this.emit("close");
});
}
open() {
if (!this._writeStream) return;
if (typeof this._writeStream.fd !== "number") {
this._writeStream.once("open", () => this.open());
return;
}
this.path = this._writeStream.path;
super.open();
}
}
exports.ReadStream = ReadStream;
class WriteStream extends _fs.default.WriteStream {
constructor() {
super("", {
autoClose: false
});
this._readStreams = new Set();
this.error = null;
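// Best-effort synchronous cleanup, registered for process exit/SIGINT:
// close the descriptor and unlink the temp file, ignoring errors.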
this._cleanupSync = () => {
process.removeListener("exit", this._cleanupSync);
process.removeListener("SIGINT", this._cleanupSync);
if (typeof this.fd === "number")
try {
_fs.default.closeSync(this.fd);
} catch (error) {}
try {
_fs.default.unlinkSync(this.path);
} catch (error) {}
};
}
get finished() {
return this._writableState.finished;
}
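// Create the backing file at a random path in the OS temp directory;
// the "wx" flag fails on collision rather than clobbering an existing file.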
open() {
_crypto.default.randomBytes(16, (error, buffer) => {
if (error) {
this.destroy(error);
return;
}
this.path = _path.default.join(
_os.default.tmpdir(),
`capacitor-${buffer.toString("hex")}.tmp`
);
_fs.default.open(this.path, "wx", this.mode, (error, fd) => {
if (error) {
this.destroy(error);
return;
}
process.addListener("exit", this._cleanupSync);
process.addListener("SIGINT", this._cleanupSync);
this.fd = fd;
this.emit("open", fd);
this.emit("ready");
});
});
}
_write(chunk, encoding, callback) {
super._write(chunk, encoding, error => {
if (!error) this.emit("write");
callback(error);
});
}
_destroy(error, callback) {
if (typeof this.fd !== "number") {
this.once("open", this._destroy.bind(this, error, callback));
return;
}
process.removeListener("exit", this._cleanupSync);
process.removeListener("SIGINT", this._cleanupSync);
const unlink = error => {
_fs.default.unlink(this.path, unlinkError => {
callback(unlinkError || error);
this.fd = null;
this.closed = true;
this.emit("close");
});
};
if (typeof this.fd === "number") {
_fs.default.close(this.fd, closeError => {
unlink(closeError || error);
});
return;
}
unlink(error);
}
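// Public destroy: while read streams are still attached, only mark
// destruction as pending (completed in _deleteReadStream); a passed
// error is propagated to every attached read stream immediately.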
destroy(error, callback) {
if (error) this.error = error;
if (this.destroyed) return super.destroy(error, callback);
if (typeof callback === "function")
this.once("close", callback.bind(this, error));
if (this._readStreams.size === 0) {
super.destroy(error, callback);
return;
}
this._destroyPending = true;
if (error)
for (let readStream of this._readStreams) readStream.destroy(error);
}
createReadStream(name) {
if (this.destroyed)
throw new ReadAfterDestroyedError(
"A ReadStream cannot be created from a destroyed WriteStream."
);
const readStream = new ReadStream(this, name);
this._readStreams.add(readStream);
const remove = () => {
this._deleteReadStream(readStream);
readStream.removeListener("end", remove);
readStream.removeListener("close", remove);
};
readStream.addListener("end", remove);
readStream.addListener("close", remove);
return readStream;
}
_deleteReadStream(readStream) {
if (this._readStreams.delete(readStream) && this._destroyPending)
this.destroy();
}
}
exports.WriteStream = WriteStream;
var _default = WriteStream;
exports.default = _default;
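
A minimal usage sketch of the API above, for orientation. It assumes the
package's main entry resolves to this CommonJS build and a Node version
with Readable.from (v12+); the stream contents and names are illustrative:

const WriteStream = require("fs-capacitor").default;
const { Readable } = require("stream");

const source = Readable.from(["hello ", "world"]); // hypothetical source
const capacitor = new WriteStream();
source.pipe(capacitor); // buffers to a random temp file

// Each read stream independently replays the full buffered contents,
// even one created after the source has ended.
capacitor.createReadStream("a").pipe(process.stdout);
capacitor.createReadStream("b").pipe(process.stdout);

// Destruction is deferred until every read stream ends or is destroyed;
// the temp file is then closed and unlinked.
capacitor.destroy(null);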

node_modules/fs-capacitor/lib/index.mjs generated vendored Normal file

@@ -0,0 +1,206 @@
import crypto from "crypto";
import fs from "fs";
import os from "os";
import path from "path";
export class ReadAfterDestroyedError extends Error {}
export class ReadStream extends fs.ReadStream {
constructor(writeStream, name) {
super("", {});
this.name = name;
this._writeStream = writeStream;
this.error = this._writeStream.error;
this.addListener("error", error => {
this.error = error;
});
this.open();
}
get ended() {
return this._readableState.ended;
}
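// Pull bytes from the temp file, but never read past what the write
// stream has flushed; when no unread bytes exist yet, retry on the
// writer's next "write" or "finish" event.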
_read(n) {
if (typeof this.fd !== "number")
return this.once("open", function() {
this._read(n);
});
if (this._writeStream.finished || this._writeStream.closed)
return super._read(n);
const unread = this._writeStream.bytesWritten - this.bytesRead;
if (unread === 0) {
const retry = () => {
this._writeStream.removeListener("finish", retry);
this._writeStream.removeListener("write", retry);
this._read(n);
};
this._writeStream.addListener("finish", retry);
this._writeStream.addListener("write", retry);
return;
}
return super._read(Math.min(n, unread));
}
_destroy(error, callback) {
if (typeof this.fd !== "number") {
this.once("open", this._destroy.bind(this, error, callback));
return;
}
fs.close(this.fd, closeError => {
callback(closeError || error);
this.fd = null;
this.closed = true;
this.emit("close");
});
}
open() {
if (!this._writeStream) return;
if (typeof this._writeStream.fd !== "number") {
this._writeStream.once("open", () => this.open());
return;
}
this.path = this._writeStream.path;
super.open();
}
}
export class WriteStream extends fs.WriteStream {
constructor() {
super("", {
autoClose: false
});
this._readStreams = new Set();
this.error = null;
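// Best-effort synchronous cleanup, registered for process exit/SIGINT:
// close the descriptor and unlink the temp file, ignoring errors.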
this._cleanupSync = () => {
process.removeListener("exit", this._cleanupSync);
process.removeListener("SIGINT", this._cleanupSync);
if (typeof this.fd === "number")
try {
fs.closeSync(this.fd);
} catch (error) {}
try {
fs.unlinkSync(this.path);
} catch (error) {}
};
}
get finished() {
return this._writableState.finished;
}
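// Create the backing file at a random path in the OS temp directory;
// the "wx" flag fails on collision rather than clobbering an existing file.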
open() {
crypto.randomBytes(16, (error, buffer) => {
if (error) {
this.destroy(error);
return;
}
this.path = path.join(
os.tmpdir(),
`capacitor-${buffer.toString("hex")}.tmp`
);
fs.open(this.path, "wx", this.mode, (error, fd) => {
if (error) {
this.destroy(error);
return;
}
process.addListener("exit", this._cleanupSync);
process.addListener("SIGINT", this._cleanupSync);
this.fd = fd;
this.emit("open", fd);
this.emit("ready");
});
});
}
_write(chunk, encoding, callback) {
super._write(chunk, encoding, error => {
if (!error) this.emit("write");
callback(error);
});
}
_destroy(error, callback) {
if (typeof this.fd !== "number") {
this.once("open", this._destroy.bind(this, error, callback));
return;
}
process.removeListener("exit", this._cleanupSync);
process.removeListener("SIGINT", this._cleanupSync);
const unlink = error => {
fs.unlink(this.path, unlinkError => {
callback(unlinkError || error);
this.fd = null;
this.closed = true;
this.emit("close");
});
};
if (typeof this.fd === "number") {
fs.close(this.fd, closeError => {
unlink(closeError || error);
});
return;
}
unlink(error);
}
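// Public destroy: while read streams are still attached, only mark
// destruction as pending (completed in _deleteReadStream); a passed
// error is propagated to every attached read stream immediately.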
destroy(error, callback) {
if (error) this.error = error;
if (this.destroyed) return super.destroy(error, callback);
if (typeof callback === "function")
this.once("close", callback.bind(this, error));
if (this._readStreams.size === 0) {
super.destroy(error, callback);
return;
}
this._destroyPending = true;
if (error)
for (let readStream of this._readStreams) readStream.destroy(error);
}
createReadStream(name) {
if (this.destroyed)
throw new ReadAfterDestroyedError(
"A ReadStream cannot be created from a destroyed WriteStream."
);
const readStream = new ReadStream(this, name);
this._readStreams.add(readStream);
const remove = () => {
this._deleteReadStream(readStream);
readStream.removeListener("end", remove);
readStream.removeListener("close", remove);
};
readStream.addListener("end", remove);
readStream.addListener("close", remove);
return readStream;
}
_deleteReadStream(readStream) {
if (this._readStreams.delete(readStream) && this._destroyPending)
this.destroy();
}
}
export default WriteStream;
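
The .mjs build above is the same implementation re-exported as native ES
modules; a sketch of the equivalent ESM usage, assuming the resolver picks
this file for the "fs-capacitor" specifier:

import WriteStream, { ReadAfterDestroyedError } from "fs-capacitor";

const capacitor = new WriteStream();
try {
  capacitor.destroy();
  capacitor.createReadStream(); // throws once destroyed
} catch (error) {
  console.log(error instanceof ReadAfterDestroyedError); // true
}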

node_modules/fs-capacitor/lib/test.js generated vendored Normal file

@@ -0,0 +1,374 @@
"use strict";
require("leaked-handles");
var _fs = _interopRequireDefault(require("fs"));
var _stream = _interopRequireDefault(require("stream"));
var _tap = _interopRequireDefault(require("tap"));
var _ = _interopRequireDefault(require("."));
function _interopRequireDefault(obj) {
return obj && obj.__esModule ? obj : { default: obj };
}
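// Drain a readable stream and resolve with its full contents as a string,
// rejecting on a stream error.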
const streamToString = stream =>
new Promise((resolve, reject) => {
let ended = false;
let data = "";
stream
.on("error", reject)
.on("data", chunk => {
if (ended) throw new Error("`data` emitted after `end`");
data += chunk;
})
.on("end", () => {
ended = true;
resolve(data);
});
});
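// Poll on the macrotask queue until `stream.bytesWritten` reaches `bytes`,
// then resolve.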
const waitForBytesWritten = (stream, bytes, resolve) => {
if (stream.bytesWritten >= bytes) {
setImmediate(resolve);
return;
}
setImmediate(() => waitForBytesWritten(stream, bytes, resolve));
};
_tap.default.test("Data from a complete stream.", async t => {
let data = "";
const source = new _stream.default.Readable({
read() {}
});
const chunk1 = "1".repeat(10);
source.push(chunk1);
source.push(null);
data += chunk1;
let capacitor1 = new _.default();
t.strictSame(
capacitor1._readStreams.size,
0,
"should start with 0 read streams"
);
source.pipe(capacitor1);
const capacitor1Stream1 = capacitor1.createReadStream("capacitor1Stream1");
t.strictSame(
capacitor1._readStreams.size,
1,
"should attach a new read stream before receiving data"
);
const result = await streamToString(capacitor1Stream1);
t.strictSame(result, data, "should stream all data");
t.strictSame(
capacitor1._readStreams.size,
0,
"should no longer have any attached read streams"
);
});
_tap.default.test(
"Data from an open stream, 1 chunk, no read streams.",
async t => {
let data = "";
const source = new _stream.default.Readable({
read() {}
});
let capacitor1 = new _.default();
t.strictSame(
capacitor1._readStreams.size,
0,
"should start with 0 read streams"
);
source.pipe(capacitor1);
const chunk1 = "1".repeat(10);
source.push(chunk1);
source.push(null);
data += chunk1;
const capacitor1Stream1 = capacitor1.createReadStream("capacitor1Stream1");
t.strictSame(
capacitor1._readStreams.size,
1,
"should attach a new read stream before receiving data"
);
const result = await streamToString(capacitor1Stream1);
t.strictSame(result, data, "should stream all data");
t.strictSame(
capacitor1._readStreams.size,
0,
"should no longer have any attached read streams"
);
}
);
_tap.default.test(
"Data from an open stream, 1 chunk, 1 read stream.",
async t => {
let data = "";
const source = new _stream.default.Readable({
read() {}
});
let capacitor1 = new _.default();
t.strictSame(
capacitor1._readStreams.size,
0,
"should start with 0 read streams"
);
source.pipe(capacitor1);
const capacitor1Stream1 = capacitor1.createReadStream("capacitor1Stream1");
t.strictSame(
capacitor1._readStreams.size,
1,
"should attach a new read stream before receiving data"
);
const chunk1 = "1".repeat(10);
source.push(chunk1);
source.push(null);
data += chunk1;
const result = await streamToString(capacitor1Stream1);
t.strictSame(result, data, "should stream all data");
t.strictSame(
capacitor1._readStreams.size,
0,
"should no longer have any attached read streams"
);
}
);
const withChunkSize = size =>
_tap.default.test(`--- with chunk size: ${size}`, async t => {
let data = "";
const source = new _stream.default.Readable({
read() {}
});
let capacitor1;
let capacitor1Stream1;
await t.test(
"can add a read stream before any data has been written",
async t => {
capacitor1 = new _.default();
t.strictSame(
capacitor1._readStreams.size,
0,
"should start with 0 read streams"
);
capacitor1Stream1 = capacitor1.createReadStream("capacitor1Stream1");
t.strictSame(
capacitor1._readStreams.size,
1,
"should attach a new read stream before receiving data"
);
await t.test("creates a temporary file", async t => {
t.plan(3);
await new Promise(resolve => capacitor1.on("open", resolve));
t.type(
capacitor1.path,
"string",
"capacitor1.path should be a string"
);
t.type(capacitor1.fd, "number", "capacitor1.fd should be a number");
t.ok(_fs.default.existsSync(capacitor1.path), "creates a temp file");
});
}
);
source.pipe(capacitor1);
const chunk1 = "1".repeat(size);
source.push(chunk1);
data += chunk1;
await new Promise(resolve =>
waitForBytesWritten(capacitor1, size, resolve)
);
let capacitor1Stream2;
t.test("can add a read stream after data has been written", t => {
capacitor1Stream2 = capacitor1.createReadStream("capacitor1Stream2");
t.strictSame(
capacitor1._readStreams.size,
2,
"should attach a new read stream after first write"
);
t.end();
});
const writeEventBytesWritten = new Promise(resolve => {
capacitor1.once("write", () => {
resolve(capacitor1.bytesWritten);
});
});
const chunk2 = "2".repeat(size);
source.push(chunk2);
data += chunk2;
await new Promise(resolve =>
waitForBytesWritten(capacitor1, 2 * size, resolve)
);
await t.test("write event emitted after bytes are written", async t => {
t.strictSame(
await writeEventBytesWritten,
2 * size,
"bytesWritten should include new chunk"
);
});
const finished = new Promise(resolve => capacitor1.once("finish", resolve));
source.push(null);
await finished;
let capacitor1Stream3;
let capacitor1Stream4;
t.test("can create a read stream after the source has ended", t => {
capacitor1Stream3 = capacitor1.createReadStream("capacitor1Stream3");
capacitor1Stream4 = capacitor1.createReadStream("capacitor1Stream4");
t.strictSame(
capacitor1._readStreams.size,
4,
"should attach new read streams after end"
);
t.end();
});
await t.test("streams complete data to a read stream", async t => {
const result2 = await streamToString(capacitor1Stream2);
t.strictSame(
capacitor1Stream2.ended,
true,
"should mark read stream as ended"
);
t.strictSame(result2, data, "should stream complete data");
const result4 = await streamToString(capacitor1Stream4);
t.strictSame(
capacitor1Stream4.ended,
true,
"should mark read stream as ended"
);
t.strictSame(result4, data, "should stream complete data");
t.strictSame(
capacitor1._readStreams.size,
2,
"should detach an ended read stream"
);
});
await t.test("can destroy a read stream", async t => {
await new Promise(resolve => {
capacitor1Stream1.once("error", resolve);
capacitor1Stream1.destroy(new Error("test"));
});
t.strictSame(
capacitor1Stream1.destroyed,
true,
"should mark read stream as destroyed"
);
t.type(
capacitor1Stream1.error,
Error,
"should store an error on read stream"
);
t.strictSame(
capacitor1._readStreams.size,
1,
"should detach a destroyed read stream"
);
});
t.test("can delay destruction of a capacitor", t => {
capacitor1.destroy(null);
t.strictSame(
capacitor1.destroyed,
false,
"should not destroy while read streams exist"
);
t.strictSame(
capacitor1._destroyPending,
true,
"should mark for future destruction"
);
t.end();
});
await t.test("destroys capacitor once no read streams exist", async t => {
const readStreamDestroyed = new Promise(resolve =>
capacitor1Stream3.on("close", resolve)
);
const capacitorDestroyed = new Promise(resolve =>
capacitor1.on("close", resolve)
);
capacitor1Stream3.destroy(null);
await readStreamDestroyed;
t.strictSame(
capacitor1Stream3.destroyed,
true,
"should mark read stream as destroyed"
);
t.strictSame(
capacitor1Stream3.error,
null,
"should not store an error on read stream"
);
t.strictSame(
capacitor1._readStreams.size,
0,
"should detach a destroyed read stream"
);
await capacitorDestroyed;
t.strictSame(capacitor1.closed, true, "should mark capacitor as closed");
t.strictSame(capacitor1.fd, null, "should set fd to null");
t.strictSame(
capacitor1.destroyed,
true,
"should mark capacitor as destroyed"
);
t.notOk(_fs.default.existsSync(capacitor1.path), "removes its temp file");
});
t.test("cannot create a read stream after destruction", t => {
try {
capacitor1.createReadStream();
} catch (error) {
t.ok(
error instanceof _.ReadAfterDestroyedError,
"should not create a read stream once destroyed"
);
t.end();
}
});
const capacitor2 = new _.default();
const capacitor2Stream1 = capacitor2.createReadStream("capacitor2Stream1");
const capacitor2Stream2 = capacitor2.createReadStream("capacitor2Stream2");
const capacitor2ReadStream1Destroyed = new Promise(resolve =>
capacitor2Stream1.on("close", resolve)
);
const capacitor2Destroyed = new Promise(resolve =>
capacitor2.on("close", resolve)
);
capacitor2Stream1.destroy();
await capacitor2ReadStream1Destroyed;
await t.test("propagates errors to attached read streams", async t => {
capacitor2.destroy();
await new Promise(resolve => setImmediate(resolve));
t.strictSame(
capacitor2Stream2.destroyed,
false,
"should not immediately mark attached read streams as destroyed"
);
capacitor2.destroy(new Error("test"));
await capacitor2Destroyed;
t.type(capacitor2.error, Error, "should store an error on capacitor");
t.strictSame(
capacitor2.destroyed,
true,
"should mark capacitor as destroyed"
);
t.type(
capacitor2Stream2.error,
Error,
"should store an error on attached read streams"
);
t.strictSame(
capacitor2Stream2.destroyed,
true,
"should mark attached read streams as destroyed"
);
t.strictSame(
capacitor2Stream1.error,
null,
"should not store an error on detached read streams"
);
});
});
withChunkSize(10);
withChunkSize(100000);

node_modules/fs-capacitor/lib/test.mjs generated vendored Normal file

@@ -0,0 +1,356 @@
import "leaked-handles";
import fs from "fs";
import stream from "stream";
import t from "tap";
import WriteStream, { ReadAfterDestroyedError } from ".";
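// Drain a readable stream and resolve with its full contents as a string,
// rejecting on a stream error.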
const streamToString = stream =>
new Promise((resolve, reject) => {
let ended = false;
let data = "";
stream
.on("error", reject)
.on("data", chunk => {
if (ended) throw new Error("`data` emitted after `end`");
data += chunk;
})
.on("end", () => {
ended = true;
resolve(data);
});
});
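// Poll on the macrotask queue until `stream.bytesWritten` reaches `bytes`,
// then resolve.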
const waitForBytesWritten = (stream, bytes, resolve) => {
if (stream.bytesWritten >= bytes) {
setImmediate(resolve);
return;
}
setImmediate(() => waitForBytesWritten(stream, bytes, resolve));
};
t.test("Data from a complete stream.", async t => {
let data = "";
const source = new stream.Readable({
read() {}
});
const chunk1 = "1".repeat(10);
source.push(chunk1);
source.push(null);
data += chunk1;
let capacitor1 = new WriteStream();
t.strictSame(
capacitor1._readStreams.size,
0,
"should start with 0 read streams"
);
source.pipe(capacitor1);
const capacitor1Stream1 = capacitor1.createReadStream("capacitor1Stream1");
t.strictSame(
capacitor1._readStreams.size,
1,
"should attach a new read stream before receiving data"
);
const result = await streamToString(capacitor1Stream1);
t.strictSame(result, data, "should stream all data");
t.strictSame(
capacitor1._readStreams.size,
0,
"should no longer have any attached read streams"
);
});
t.test("Data from an open stream, 1 chunk, no read streams.", async t => {
let data = "";
const source = new stream.Readable({
read() {}
});
let capacitor1 = new WriteStream();
t.strictSame(
capacitor1._readStreams.size,
0,
"should start with 0 read streams"
);
source.pipe(capacitor1);
const chunk1 = "1".repeat(10);
source.push(chunk1);
source.push(null);
data += chunk1;
const capacitor1Stream1 = capacitor1.createReadStream("capacitor1Stream1");
t.strictSame(
capacitor1._readStreams.size,
1,
"should attach a new read stream before receiving data"
);
const result = await streamToString(capacitor1Stream1);
t.strictSame(result, data, "should stream all data");
t.strictSame(
capacitor1._readStreams.size,
0,
"should no longer have any attached read streams"
);
});
t.test("Data from an open stream, 1 chunk, 1 read stream.", async t => {
let data = "";
const source = new stream.Readable({
read() {}
});
let capacitor1 = new WriteStream();
t.strictSame(
capacitor1._readStreams.size,
0,
"should start with 0 read streams"
);
source.pipe(capacitor1);
const capacitor1Stream1 = capacitor1.createReadStream("capacitor1Stream1");
t.strictSame(
capacitor1._readStreams.size,
1,
"should attach a new read stream before receiving data"
);
const chunk1 = "1".repeat(10);
source.push(chunk1);
source.push(null);
data += chunk1;
const result = await streamToString(capacitor1Stream1);
t.strictSame(result, data, "should stream all data");
t.strictSame(
capacitor1._readStreams.size,
0,
"should no longer have any attached read streams"
);
});
const withChunkSize = size =>
t.test(`--- with chunk size: ${size}`, async t => {
let data = "";
const source = new stream.Readable({
read() {}
});
let capacitor1;
let capacitor1Stream1;
await t.test(
"can add a read stream before any data has been written",
async t => {
capacitor1 = new WriteStream();
t.strictSame(
capacitor1._readStreams.size,
0,
"should start with 0 read streams"
);
capacitor1Stream1 = capacitor1.createReadStream("capacitor1Stream1");
t.strictSame(
capacitor1._readStreams.size,
1,
"should attach a new read stream before receiving data"
);
await t.test("creates a temporary file", async t => {
t.plan(3);
await new Promise(resolve => capacitor1.on("open", resolve));
t.type(
capacitor1.path,
"string",
"capacitor1.path should be a string"
);
t.type(capacitor1.fd, "number", "capacitor1.fd should be a number");
t.ok(fs.existsSync(capacitor1.path), "creates a temp file");
});
}
);
source.pipe(capacitor1);
const chunk1 = "1".repeat(size);
source.push(chunk1);
data += chunk1;
await new Promise(resolve =>
waitForBytesWritten(capacitor1, size, resolve)
);
let capacitor1Stream2;
t.test("can add a read stream after data has been written", t => {
capacitor1Stream2 = capacitor1.createReadStream("capacitor1Stream2");
t.strictSame(
capacitor1._readStreams.size,
2,
"should attach a new read stream after first write"
);
t.end();
});
const writeEventBytesWritten = new Promise(resolve => {
capacitor1.once("write", () => {
resolve(capacitor1.bytesWritten);
});
});
const chunk2 = "2".repeat(size);
source.push(chunk2);
data += chunk2;
await new Promise(resolve =>
waitForBytesWritten(capacitor1, 2 * size, resolve)
);
await t.test("write event emitted after bytes are written", async t => {
t.strictSame(
await writeEventBytesWritten,
2 * size,
"bytesWritten should include new chunk"
);
});
const finished = new Promise(resolve => capacitor1.once("finish", resolve));
source.push(null);
await finished;
let capacitor1Stream3;
let capacitor1Stream4;
t.test("can create a read stream after the source has ended", t => {
capacitor1Stream3 = capacitor1.createReadStream("capacitor1Stream3");
capacitor1Stream4 = capacitor1.createReadStream("capacitor1Stream4");
t.strictSame(
capacitor1._readStreams.size,
4,
"should attach new read streams after end"
);
t.end();
});
await t.test("streams complete data to a read stream", async t => {
const result2 = await streamToString(capacitor1Stream2);
t.strictSame(
capacitor1Stream2.ended,
true,
"should mark read stream as ended"
);
t.strictSame(result2, data, "should stream complete data");
const result4 = await streamToString(capacitor1Stream4);
t.strictSame(
capacitor1Stream4.ended,
true,
"should mark read stream as ended"
);
t.strictSame(result4, data, "should stream complete data");
t.strictSame(
capacitor1._readStreams.size,
2,
"should detach an ended read stream"
);
});
await t.test("can destroy a read stream", async t => {
await new Promise(resolve => {
capacitor1Stream1.once("error", resolve);
capacitor1Stream1.destroy(new Error("test"));
});
t.strictSame(
capacitor1Stream1.destroyed,
true,
"should mark read stream as destroyed"
);
t.type(
capacitor1Stream1.error,
Error,
"should store an error on read stream"
);
t.strictSame(
capacitor1._readStreams.size,
1,
"should detach a destroyed read stream"
);
});
t.test("can delay destruction of a capacitor", t => {
capacitor1.destroy(null);
t.strictSame(
capacitor1.destroyed,
false,
"should not destroy while read streams exist"
);
t.strictSame(
capacitor1._destroyPending,
true,
"should mark for future destruction"
);
t.end();
});
await t.test("destroys capacitor once no read streams exist", async t => {
const readStreamDestroyed = new Promise(resolve =>
capacitor1Stream3.on("close", resolve)
);
const capacitorDestroyed = new Promise(resolve =>
capacitor1.on("close", resolve)
);
capacitor1Stream3.destroy(null);
await readStreamDestroyed;
t.strictSame(
capacitor1Stream3.destroyed,
true,
"should mark read stream as destroyed"
);
t.strictSame(
capacitor1Stream3.error,
null,
"should not store an error on read stream"
);
t.strictSame(
capacitor1._readStreams.size,
0,
"should detach a destroyed read stream"
);
await capacitorDestroyed;
t.strictSame(capacitor1.closed, true, "should mark capacitor as closed");
t.strictSame(capacitor1.fd, null, "should set fd to null");
t.strictSame(
capacitor1.destroyed,
true,
"should mark capacitor as destroyed"
);
t.notOk(fs.existsSync(capacitor1.path), "removes its temp file");
});
t.test("cannot create a read stream after destruction", t => {
try {
capacitor1.createReadStream();
} catch (error) {
t.ok(
error instanceof ReadAfterDestroyedError,
"should not create a read stream once destroyed"
);
t.end();
}
});
const capacitor2 = new WriteStream();
const capacitor2Stream1 = capacitor2.createReadStream("capacitor2Stream1");
const capacitor2Stream2 = capacitor2.createReadStream("capacitor2Stream2");
const capacitor2ReadStream1Destroyed = new Promise(resolve =>
capacitor2Stream1.on("close", resolve)
);
const capacitor2Destroyed = new Promise(resolve =>
capacitor2.on("close", resolve)
);
capacitor2Stream1.destroy();
await capacitor2ReadStream1Destroyed;
await t.test("propagates errors to attached read streams", async t => {
capacitor2.destroy();
await new Promise(resolve => setImmediate(resolve));
t.strictSame(
capacitor2Stream2.destroyed,
false,
"should not immediately mark attached read streams as destroyed"
);
capacitor2.destroy(new Error("test"));
await capacitor2Destroyed;
t.type(capacitor2.error, Error, "should store an error on capacitor");
t.strictSame(
capacitor2.destroyed,
true,
"should mark capacitor as destroyed"
);
t.type(
capacitor2Stream2.error,
Error,
"should store an error on attached read streams"
);
t.strictSame(
capacitor2Stream2.destroyed,
true,
"should mark attached read streams as destroyed"
);
t.strictSame(
capacitor2Stream1.error,
null,
"should not store an error on detached read streams"
);
});
});
withChunkSize(10);
withChunkSize(100000);