diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index c2e8abc4efdadd..20d57ffe84c232 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -45,6 +45,7 @@ const kIsPerformingIO = Symbol('kIsPerformingIO'); const kFs = Symbol('kFs'); const kHandle = Symbol('kHandle'); +const kHandleCleanup = Symbol('kHandleCleanup'); function _construct(callback) { const stream = this; @@ -152,7 +153,12 @@ function importFd(stream, options) { stream[kHandle] = options.fd; stream[kFs] = FileHandleOperations(stream[kHandle]); stream[kHandle][kRef](); - options.fd.on('close', FunctionPrototypeBind(stream.close, stream)); + if (options.autoClose === false) { + stream[kHandleCleanup] = FunctionPrototypeBind(cleanupFileHandleRef, stream); + stream.once('end', stream[kHandleCleanup]); + stream.once('finish', stream[kHandleCleanup]); + stream.once('error', stream[kHandleCleanup]); + } return options.fd.fd; } @@ -160,6 +166,18 @@ function importFd(stream, options) { ['number', 'FileHandle'], options.fd); } +function cleanupFileHandleRef() { + if (this[kHandleCleanup] === undefined) { + return; + } + + this[kHandle][kUnref](); + this.removeListener('end', this[kHandleCleanup]); + this.removeListener('finish', this[kHandleCleanup]); + this.removeListener('error', this[kHandleCleanup]); + this[kHandleCleanup] = undefined; +} + function ReadStream(path, options) { if (!(this instanceof ReadStream)) return new ReadStream(path, options); diff --git a/test/parallel/test-fs-promises-file-handle-stream.js b/test/parallel/test-fs-promises-file-handle-stream.js index 71f312b6f9d78c..c4f2ae0c9812a6 100644 --- a/test/parallel/test-fs-promises-file-handle-stream.js +++ b/test/parallel/test-fs-promises-file-handle-stream.js @@ -42,7 +42,87 @@ async function validateRead() { ); } +async function validateReadStreamReleasesFileHandleCloseListener() { + const filePathForHandle = path.resolve(tmpDir, 'tmp-read-listener.txt'); + const buf = Buffer.from('Hello world', 'utf8'); + + fs.writeFileSync(filePathForHandle, buf); + + const fileHandle = await open(filePathForHandle); + + for (let i = 0; i < buf.length; i++) { + await buffer(fileHandle.createReadStream({ + start: i, + end: i, + autoClose: false, + })); + + assert.strictEqual(fileHandle.listenerCount('close'), 0); + } + + await fileHandle.close(); +} + +async function validateWriteStreamReleasesFileHandleCloseListener() { + const filePathForHandle = path.resolve(tmpDir, 'tmp-write-listener.txt'); + const buf = Buffer.from('Hello world', 'utf8'); + + const fileHandle = await open(filePathForHandle, 'w'); + + for (let i = 0; i < buf.length; i++) { + const stream = fileHandle.createWriteStream({ + start: i, + autoClose: false, + }); + stream.end(buf.subarray(i, i + 1)); + await finished(stream); + + assert.strictEqual(fileHandle.listenerCount('close'), 0); + } + + await fileHandle.close(); + assert.deepStrictEqual(fs.readFileSync(filePathForHandle), buf); +} + +async function validateReadStreamAutoCloseClosesFileHandle() { + const filePathForHandle = path.resolve(tmpDir, 'tmp-read-auto-close.txt'); + const buf = Buffer.from('Hello world', 'utf8'); + + fs.writeFileSync(filePathForHandle, buf); + + const fileHandle = await open(filePathForHandle); + const closed = new Promise((resolve) => { + fileHandle.once('close', common.mustCall(resolve)); + }); + + assert.deepStrictEqual(await buffer(fileHandle.createReadStream()), buf); + await closed; + assert.strictEqual(fileHandle.listenerCount('close'), 0); +} + +async function validateWriteStreamAutoCloseClosesFileHandle() { + const filePathForHandle = path.resolve(tmpDir, 'tmp-write-auto-close.txt'); + const buf = Buffer.from('Hello world', 'utf8'); + + const fileHandle = await open(filePathForHandle, 'w'); + const closed = new Promise((resolve) => { + fileHandle.once('close', common.mustCall(resolve)); + }); + const stream = fileHandle.createWriteStream(); + + stream.end(buf); + await finished(stream); + await closed; + + assert.strictEqual(fileHandle.listenerCount('close'), 0); + assert.deepStrictEqual(fs.readFileSync(filePathForHandle), buf); +} + Promise.all([ validateWrite(), validateRead(), + validateReadStreamReleasesFileHandleCloseListener(), + validateWriteStreamReleasesFileHandleCloseListener(), + validateReadStreamAutoCloseClosesFileHandle(), + validateWriteStreamAutoCloseClosesFileHandle(), ]).then(common.mustCall());