stream.pipeline() & Error Handling
Chaining streams with .pipe() looks tidy, but it hides a dangerous flaw: if any stream in the chain errors, the others are not destroyed and your process can leak file descriptors, sockets, or memory. stream.pipeline() fixes this by wiring streams together with full error forwarding and automatic cleanup, and stream.finished() lets you observe when a single stream is truly done. For any production pipeline that handles real I/O, these are the APIs you should reach for.
Why pipe() is not enough
source.pipe(dest) only forwards data and the end event. It does not forward errors, and it does not destroy the source if the destination fails (or vice versa). That means a half-finished transfer can leave open handles dangling, and you must manually attach an 'error' listener to every stream to avoid an unhandled exception crashing the process.
import { createReadStream, createWriteStream } from 'node:fs';
// Fragile: an error on either stream is not propagated or cleaned up
createReadStream('input.txt')
.pipe(createWriteStream('output.txt'));
// If the write fails mid-stream, the read stream is never destroyed.
stream.pipeline() solves all of this in one call.
stream.pipeline() — the callback version
pipeline(source, ...transforms, destination, callback) connects each stream to the next, propagates errors, and destroys every stream when the chain finishes or fails. The final callback receives an error (or null on success).
import { pipeline } from 'node:stream';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
pipeline(
createReadStream('input.txt'),
createGzip(),
createWriteStream('input.txt.gz'),
(err) => {
if (err) {
console.error('Pipeline failed:', err.message);
process.exitCode = 1;
} else {
console.log('Pipeline succeeded.');
}
},
);
Output:
Pipeline succeeded.
If the source file does not exist, the same callback handles it and every stream is already destroyed for you:
Output:
Pipeline failed: ENOENT: no such file or directory, open 'input.txt'
The promises version
The node:stream/promises module exposes a promise-returning pipeline(). This is the idiomatic choice for async/await code: the promise rejects on error, so a single try/catch covers the whole chain.
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
async function compress(src, dest) {
try {
await pipeline(
createReadStream(src),
createGzip(),
createWriteStream(dest),
);
console.log(`Compressed ${src} -> ${dest}`);
} catch (err) {
console.error('Compression failed:', err.message);
}
}
await compress('input.txt', 'input.txt.gz');
In CommonJS, use
const { pipeline } = require('node:stream/promises');. The API is identical; only the import syntax differs.
Cancelling with an AbortSignal
The promises pipeline() accepts an options object with a signal. Aborting destroys every stream and rejects with an AbortError — ideal for request timeouts.
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
const ac = new AbortController();
const timer = setTimeout(() => ac.abort(), 5000);
try {
await pipeline(
createReadStream('huge.bin'),
createWriteStream('copy.bin'),
{ signal: ac.signal },
);
} catch (err) {
if (err.name === 'AbortError') console.error('Timed out, streams cleaned up.');
else throw err;
} finally {
clearTimeout(timer);
}
Automatic cleanup on error
The key guarantee of pipeline() is that when any stream emits 'error', or the chain is aborted, all participating streams are destroyed via stream.destroy(). There are no dangling file descriptors and no unhandled error events. You handle failure in exactly one place — the callback or the catch block.
| Behaviour | .pipe() | stream.pipeline() |
|---|---|---|
| Forwards data | Yes | Yes |
| Forwards errors | No | Yes |
| Destroys all streams on failure | No | Yes |
| Single completion handler | No | Yes (callback / promise) |
| Supports AbortSignal | No | Yes (promises version) |
| Returns the destination stream | Yes | The last stream / a promise |
stream.finished() — observing one stream
Sometimes you do not need to connect streams, only to know when a single stream has fully ended, errored, or closed. stream.finished() registers a one-shot completion handler and is also available in a promise form.
import { finished } from 'node:stream/promises';
import { createReadStream } from 'node:fs';
const rs = createReadStream('input.txt');
rs.resume(); // drain the data so the stream can end
try {
await finished(rs);
console.log('Read stream finished cleanly.');
} catch (err) {
console.error('Read stream failed:', err.message);
}
Output:
Read stream finished cleanly.
Both
pipeline()andfinished()already calldestroy()for you. Do not manually destroy streams inside their callbacks — doing so can mask the real error or trigger a double-destroy.
Best Practices
- Prefer
stream.pipeline()over.pipe()for any chain that touches real I/O — it is the only way to get reliable error propagation and cleanup. - Use the
node:stream/promisesversion withasync/awaitso a singletry/catchcovers the entire chain. - Always handle the error path: log it, set
process.exitCode, or surface it to the caller. Never leave a pipeline error silently swallowed. - Pass an
AbortSignalto enforce timeouts or cancel transfers; the streams are destroyed automatically on abort. - Reach for
stream.finished()when you only need to know a single stream is done, not to connect a chain. - Let
pipeline()/finished()own stream destruction — avoid callingdestroy()yourself inside their handlers.