
stream.pipeline() & Error Handling

Chaining streams with .pipe() looks tidy, but it hides a dangerous flaw: if any stream in the chain errors, the others are not destroyed and your process can leak file descriptors, sockets, or memory. stream.pipeline() fixes this by wiring streams together with full error forwarding and automatic cleanup, and stream.finished() lets you observe when a single stream is truly done. For any production pipeline that handles real I/O, these are the APIs you should reach for.

Why pipe() is not enough

source.pipe(dest) only forwards data and the end event. It does not forward errors, and it does not destroy the source if the destination fails (or vice versa). That means a half-finished transfer can leave open handles dangling, and you must manually attach an 'error' listener to every stream to avoid an unhandled exception crashing the process.

import { createReadStream, createWriteStream } from 'node:fs';

// Fragile: an error on either stream is not propagated or cleaned up
createReadStream('input.txt')
  .pipe(createWriteStream('output.txt'));
// If the write fails mid-stream, the read stream is never destroyed.
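
The gap can be observed without touching the file system. The following is a minimal sketch, assuming two hypothetical in-memory streams (a source that never ends and a destination that rejects every write); it uses CommonJS require so it runs as a plain script. It reports whether the source was destroyed after the destination failed.

```javascript
const { Readable, Writable } = require('node:stream');

// Resolves with what happened after the destination of a .pipe() chain failed.
function demoPipeFailure() {
  return new Promise((resolve) => {
    // Hypothetical source: stays open until someone destroys it.
    const source = new Readable({ read() {} });
    // Hypothetical destination: every write fails.
    const dest = new Writable({
      write(chunk, encoding, callback) {
        callback(new Error('disk full'));
      },
    });

    // Without this listener the error would surface as an uncaught exception.
    dest.on('error', (err) => {
      const state = {
        message: err.message,
        sourceDestroyed: source.destroyed, // pipe() only unpiped the source
      };
      source.destroy(); // the manual cleanup that .pipe() leaves to the caller
      resolve(state);
    });

    source.pipe(dest);
    source.push('some data');
  });
}

demoPipeFailure().then((state) => console.log(state));
```

In this sketch sourceDestroyed is reported as false: the failure reached only the listener on dest, and the source had to be destroyed by hand.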

stream.pipeline() solves all of this in one call.

stream.pipeline() — the callback version

pipeline(source, ...transforms, destination, callback) connects each stream to the next, propagates errors, and destroys the streams when the chain finishes or fails. The final callback is called once: with the error if the chain failed, or with a falsy value if it succeeded, which is why the example below tests if (err).

import { pipeline } from 'node:stream';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

pipeline(
  createReadStream('input.txt'),
  createGzip(),
  createWriteStream('input.txt.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err.message);
      process.exitCode = 1;
    } else {
      console.log('Pipeline succeeded.');
    }
  },
);

Output:

Pipeline succeeded.

If the source file does not exist, the same callback handles it and every stream is already destroyed for you:

Output:

Pipeline failed: ENOENT: no such file or directory, open 'input.txt'
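
Both outcomes of the callback can also be exercised in memory. This is a sketch under stated assumptions: runPipeline, upperCase, and collect are hypothetical names introduced for illustration, and the code uses CommonJS require so it runs as a plain script.

```javascript
const { pipeline, Readable, Transform, Writable } = require('node:stream');

// Runs a three-stage in-memory pipeline and reports how the callback was invoked.
function runPipeline(shouldFail, done) {
  const chunks = [];

  // Hypothetical transform: upper-cases each chunk, or fails on demand.
  const upperCase = new Transform({
    transform(chunk, encoding, callback) {
      if (shouldFail) callback(new Error('transform failed'));
      else callback(null, chunk.toString().toUpperCase());
    },
  });

  // Hypothetical destination: collects everything it receives.
  const collect = new Writable({
    write(chunk, encoding, callback) {
      chunks.push(chunk.toString());
      callback();
    },
  });

  pipeline(Readable.from(['a', 'b', 'c']), upperCase, collect, (err) => {
    // err is falsy on success and holds the first error on failure.
    done(err ? `failed: ${err.message}` : `ok: ${chunks.join('')}`);
  });
}

runPipeline(false, console.log);
runPipeline(true, console.log);
```

A failure in the middle stage reaches the same callback as a failure in the source or the destination, so there is still only one place to handle it.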

The promises version

The node:stream/promises module exposes a promise-returning pipeline(). This is the idiomatic choice for async/await code: the promise rejects on error, so a single try/catch covers the whole chain.

import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

async function compress(src, dest) {
  try {
    await pipeline(
      createReadStream(src),
      createGzip(),
      createWriteStream(dest),
    );
    console.log(`Compressed ${src} -> ${dest}`);
  } catch (err) {
    console.error('Compression failed:', err.message);
  }
}

await compress('input.txt', 'input.txt.gz');

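In CommonJS, use const { pipeline } = require('node:stream/promises');. The API is identical; only the import syntax differs. CommonJS modules do not support top-level await, so call compress() from inside an async function there.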
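
A CommonJS variant might look like the sketch below. The roundTrip helper, the temporary directory prefix, and the sample text are assumptions added for illustration: the helper writes a small file, compresses it with the promise-based pipeline(), and decompresses the result to confirm the content survived.

```javascript
const { pipeline } = require('node:stream/promises');
const { createReadStream, createWriteStream } = require('node:fs');
const { mkdtemp, readFile, writeFile, rm } = require('node:fs/promises');
const { createGzip, gunzipSync } = require('node:zlib');
const { tmpdir } = require('node:os');
const { join } = require('node:path');

// Same chain as above; errors are left to reject so the caller decides what to do.
async function compress(src, dest) {
  await pipeline(createReadStream(src), createGzip(), createWriteStream(dest));
}

// Hypothetical helper: compresses a throwaway file and returns its decompressed text.
async function roundTrip(text) {
  const dir = await mkdtemp(join(tmpdir(), 'pipeline-demo-'));
  try {
    const src = join(dir, 'input.txt');
    const dest = `${src}.gz`;
    await writeFile(src, text);
    await compress(src, dest);
    return gunzipSync(await readFile(dest)).toString();
  } finally {
    // Remove the temporary directory whether or not the pipeline succeeded.
    await rm(dir, { recursive: true, force: true });
  }
}

roundTrip('hello streams\n')
  .then((text) => console.log('Round trip matched:', text === 'hello streams\n'))
  .catch((err) => {
    console.error('Round trip failed:', err.message);
    process.exitCode = 1;
  });
```

Because nothing here awaits at the top level, the file runs unchanged as a CommonJS script.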

Cancelling with an AbortSignal

The promises pipeline() accepts an options object with a signal. Aborting destroys every stream and rejects with an AbortError — ideal for request timeouts.

import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';

const ac = new AbortController();
const timer = setTimeout(() => ac.abort(), 5000);

try {
  await pipeline(
    createReadStream('huge.bin'),
    createWriteStream('copy.bin'),
    { signal: ac.signal },
  );
} catch (err) {
  if (err.name === 'AbortError') console.error('Timed out, streams cleaned up.');
  else throw err;
} finally {
  clearTimeout(timer);
}
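
The abort path is easier to try with a transfer that is guaranteed to stall. The sketch below assumes a hypothetical source that never produces data and a sink that discards whatever it receives; copyWithTimeout is an illustrative name, and the code uses CommonJS require so it runs as a plain script.

```javascript
const { pipeline } = require('node:stream/promises');
const { Readable, Writable } = require('node:stream');

// Starts a transfer that can never finish and aborts it after `ms` milliseconds.
async function copyWithTimeout(ms) {
  // Hypothetical stalled source: never pushes data and never ends.
  const source = new Readable({ read() {} });
  // Hypothetical sink: accepts and discards every chunk.
  const sink = new Writable({
    write(chunk, encoding, callback) {
      callback();
    },
  });

  const ac = new AbortController();
  const timer = setTimeout(() => ac.abort(), ms);

  try {
    await pipeline(source, sink, { signal: ac.signal });
    return { aborted: false };
  } catch (err) {
    if (err.name !== 'AbortError') throw err;
    return {
      aborted: true,
      sourceDestroyed: source.destroyed,
      sinkDestroyed: sink.destroyed,
    };
  } finally {
    clearTimeout(timer);
  }
}

copyWithTimeout(50).then((result) => console.log(result));
```

Checking the destroyed flags inside the catch block is only for demonstration; real code would normally just report the timeout.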

Automatic cleanup on error

The key guarantee of pipeline() is that when any stream emits 'error', or the chain is aborted, every participating stream that has not already finished is destroyed via stream.destroy(err). That leaves no dangling file descriptors and no unhandled 'error' events, and you handle failure in exactly one place: the callback or the catch block.

Behaviour                        | .pipe() | stream.pipeline()
---------------------------------|---------|---------------------------
Forwards data                    | Yes     | Yes
Forwards errors                  | No      | Yes
Destroys all streams on failure  | No      | Yes
Single completion handler        | No      | Yes (callback / promise)
Supports AbortSignal             | No      | Yes (promises version)
Returns the destination stream   | Yes     | The last stream / a promise
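
The cleanup guarantee can be checked by inspecting each stream's destroyed flag after a failure. This is a minimal sketch, assuming three hypothetical in-memory streams where only the middle one fails; failAndInspect is an illustrative name, and the code uses CommonJS require so it runs as a plain script.

```javascript
const { pipeline } = require('node:stream/promises');
const { Readable, Transform, Writable } = require('node:stream');

// Fails the middle stage of a pipeline and reports the state of every stream.
async function failAndInspect() {
  // Hypothetical source: stays open until it is destroyed.
  const source = new Readable({ read() {} });
  // Hypothetical transform: rejects the first chunk it sees.
  const failing = new Transform({
    transform(chunk, encoding, callback) {
      callback(new Error('bad chunk'));
    },
  });
  // Hypothetical sink: accepts and discards every chunk.
  const sink = new Writable({
    write(chunk, encoding, callback) {
      callback();
    },
  });

  source.push('first chunk');

  let message = null;
  try {
    await pipeline(source, failing, sink);
  } catch (err) {
    message = err.message;
  }

  return {
    message,
    destroyed: [source.destroyed, failing.destroyed, sink.destroyed],
  };
}

failAndInspect().then((result) => console.log(result));
```

Only the transform failed, yet the source and the sink are torn down as well, which is the row of the table that .pipe() cannot match.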

stream.finished() — observing one stream

Sometimes you do not need to connect streams, only to know when a single stream has fully ended, errored, or closed. stream.finished() registers a one-shot completion handler and is also available in a promise form.

import { finished } from 'node:stream/promises';
import { createReadStream } from 'node:fs';

const rs = createReadStream('input.txt');
rs.resume(); // drain the data so the stream can end

try {
  await finished(rs);
  console.log('Read stream finished cleanly.');
} catch (err) {
  console.error('Read stream failed:', err.message);
}

Output:

Read stream finished cleanly.
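
The callback form from node:stream works the same way. The sketch below wraps it in a hypothetical watch() helper so that both outcomes can be printed; the two sample streams are assumptions for illustration, and the code uses CommonJS require so it runs as a plain script.

```javascript
const { finished, Readable } = require('node:stream');

// Hypothetical helper: reports how a single stream ended, using the callback form.
function watch(stream) {
  return new Promise((resolve) => {
    finished(stream, (err) => {
      resolve(err ? `failed: ${err.message}` : 'done');
    });
  });
}

// A stream that ends normally once its data has been drained.
const healthy = Readable.from(['a', 'b']);
healthy.resume();
watch(healthy).then((outcome) => console.log('healthy:', outcome));

// A stream that is torn down with an error before it can end.
const broken = new Readable({ read() {} });
watch(broken).then((outcome) => console.log('broken:', outcome));
broken.destroy(new Error('boom'));
```

The listener registered by finished() also counts as an 'error' handler, so the failing stream here does not crash the process.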

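pipeline() already calls destroy() on its streams for you, so do not destroy them manually inside its callback or catch block; doing so can mask the real error. finished() is different: it only observes a stream and never destroys it, so cleaning up a stream you watch with finished() remains your responsibility.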

Best Practices

  • Prefer stream.pipeline() over .pipe() for any chain that touches real I/O. It gives you error propagation and cleanup in one call, without hand-wiring 'error' listeners and destroy() calls on every stream.
  • Use the node:stream/promises version with async/await so a single try/catch covers the entire chain.
  • Always handle the error path: log it, set process.exitCode, or surface it to the caller. Never leave a pipeline error silently swallowed.
  • Pass an AbortSignal to enforce timeouts or cancel transfers; the streams are destroyed automatically on abort.
  • Reach for stream.finished() when you only need to know a single stream is done, not to connect a chain.
  • Let pipeline() own stream destruction and avoid calling destroy() yourself inside its handlers. finished() only observes, so a stream you watch with it is still yours to clean up.
Last updated June 14, 2026