node:stream

Node.js streams — Readable, Writable, Duplex, and Transform, the modern pipeline() API, backpressure, async iterators, Web Streams interop, and patterns for piping files, HTTP bodies, and gzip compression.

node:stream — Streams

What it is

A Node.js stream is an abstraction for processing data in chunks rather than loading it all at once. Streams let you read a 10 GB file, gzip it, and upload it to object storage within a bounded amount of memory, because data flows through the pipeline one buffer at a time. Streams pre-date native Promises by several years and historically came with sharp edges, but the modern API (stream/promises.pipeline(), async iterators, and Web Streams interop) makes them genuinely pleasant.

Install

node:stream is built into Node.js. The promises sub-module (node:stream/promises) shipped stable in Node 15.

bash
node --version

Output:

text
v22.14.0

Stream types

Four base classes; each solves a different problem. Almost every concrete stream (a file reader, an HTTP response, a zlib compressor) extends one of these.

| Class | Direction | Example |
| --- | --- | --- |
| Readable | Out → consumer | fs.createReadStream, process.stdin, http.IncomingMessage |
| Writable | Consumer → out | fs.createWriteStream, process.stdout, http.ServerResponse |
| Duplex | Both, independent | TCP socket (net.Socket) |
| Transform | Duplex; output is a function of input | zlib.createGzip, crypto.createCipheriv, crypto.createHash |

javascript
import { Readable, Writable, Transform } from 'node:stream';

const r = new Readable({ read() {} });   // Readable
const w = new Writable({ write(chunk, enc, cb) { cb(); } });  // Writable
const t = new Transform({ transform(chunk, enc, cb) { cb(null, chunk); } });  // Transform

console.log(r instanceof Readable, w instanceof Writable, t instanceof Transform);

Output:

text
true true true

Reading: the modern way

For a Readable, async iteration is the simplest, most modern way to consume chunks. The for await loop handles backpressure and end-of-stream automatically — no event handlers required.

javascript
import { createReadStream } from 'node:fs';

const stream = createReadStream('large.txt', { encoding: 'utf8', highWaterMark: 64 * 1024 });
let lineCount = 0;
let remainder = '';

for await (const chunk of stream) {
  const lines = (remainder + chunk).split('\n');
  remainder = lines.pop();
  lineCount += lines.length;
}
if (remainder.length) lineCount++;

console.log(`${lineCount} lines`);

Output:

text
1048576 lines

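The highWaterMark option sets the buffering threshold at which a stream stops pulling more data from its source; it is a threshold, not a hard memory cap. For byte streams the generic default is 64 KiB in Node 22 and later (16 KiB in earlier releases), and fs.createReadStream defaults to 64 KiB. Object-mode streams default to 16 objects. Raising the value trades memory for fewer round-trips through the event loop.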
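As a quick check of the threshold described above, the sketch below reads the configured value back through the readableHighWaterMark property and shows push() reporting that the buffer is over the limit. The stream names and the 1 KiB value are illustrative, and the byte-mode default is deliberately not printed because it depends on the Node version.

```javascript
import { Readable } from 'node:stream';

// Byte stream with an explicit 1 KiB threshold (illustrative value).
const byteStream = new Readable({ highWaterMark: 1024, read() {} });

// Object-mode stream: the threshold counts objects, not bytes.
const objectStream = new Readable({ objectMode: true, read() {} });

console.log(byteStream.readableHighWaterMark);   // 1024
console.log(objectStream.readableHighWaterMark); // 16

// push() returns false once the buffered amount reaches the threshold,
// which is the producer-side backpressure signal.
const keepGoing = byteStream.push(Buffer.alloc(2048));
console.log(keepGoing); // false
```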

Writing chunks

A Writable accepts data via write() and signals end with end(). write() returns false when the internal buffer is full — that's the backpressure signal. Either respect it manually or let pipeline() handle it for you.

javascript
import { createWriteStream } from 'node:fs';

const out = createWriteStream('out.txt');

for (let i = 0; i < 5; i++) {
  const ok = out.write(`line ${i}\n`);
  if (!ok) {
    // Internal buffer is full — wait for 'drain' before writing more
    await new Promise((resolve) => out.once('drain', resolve));
  }
}

out.end();
await new Promise((resolve) => out.on('finish', resolve));
console.log('written');

Output:

text
written

The pattern above is verbose because it's manual. The next section covers the much shorter pipeline form.

pipeline() — the right way to compose streams

pipeline() from node:stream/promises wires multiple streams together end-to-end. It propagates errors, handles backpressure, and resolves a Promise when everything has finished — replacing the historical pipe() + manual error handler dance.

javascript
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

await pipeline(
  createReadStream('book.txt'),
  createGzip(),
  createWriteStream('book.txt.gz')
);

console.log('gzipped');

Output:

text
gzipped

Why this beats .pipe() chains:

| Concern | pipeline() | .pipe() |
| --- | --- | --- |
| Errors propagate | Yes, single await/try block | No, must attach an error listener to every stream |
| Resource cleanup on error | Automatic | Manual (destroy() each stream) |
| Returns a Promise | Yes | No |
| Backpressure | Yes | Yes |

The rule: never use .pipe() for new code. It silently leaks resources on error.
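The error-handling difference can be sketched with in-memory streams. In the example below, failingSource and sink are hypothetical stand-ins for real file or network streams; the point is that a single try/catch sees the failure and that pipeline() destroys the other stage.

```javascript
import { Readable, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Hypothetical source that fails after its first chunk.
async function* failingSource() {
  yield 'first chunk\n';
  throw new Error('source failed');
}

const source = Readable.from(failingSource());
const sink = new Writable({
  write(chunk, encoding, callback) {
    callback(); // discard the data; only the error path matters here
  },
});

let caught = null;
try {
  await pipeline(source, sink);
} catch (err) {
  caught = err; // one catch block covers every stage
}

console.log(caught.message); // source failed
console.log(sink.destroyed); // true: the sink was cleaned up for us
```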

Backpressure — why this matters

Backpressure is the mechanism that prevents a fast producer from overwhelming a slow consumer. If a Readable produces 100 MB/s and a Writable accepts 10 MB/s, something has to throttle — otherwise memory grows unbounded.

Streams handle backpressure automatically when you use pipeline() or pipe(). When the Writable's buffer fills, it returns false from write(); the Readable pauses; the Writable emits drain when it has room; the Readable resumes. Manually plumbing this is error-prone, which is exactly why pipeline() exists.

javascript
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';

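// Source produces faster than dest can write; backpressure keeps memory bounded.
// /dev/urandom never ends, so `end` caps the read at 256 MiB (an illustrative size).
await pipeline(
  createReadStream('/dev/urandom', { highWaterMark: 1024 * 1024, end: 256 * 1024 * 1024 - 1 }),
  createWriteStream('random.bin', { highWaterMark: 64 * 1024 })
);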

Output: (none — exits 0 on success)
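To make the write()/drain handshake visible without pipeline(), here is a minimal sketch using a deliberately slow in-memory Writable. The 4-byte threshold and 10 ms delay are arbitrary values chosen so the effect shows up immediately; slowSink is a hypothetical name.

```javascript
import { Writable } from 'node:stream';
import { once } from 'node:events';

// Hypothetical slow sink: tiny 4-byte threshold, 10 ms per chunk.
const slowSink = new Writable({
  highWaterMark: 4,
  write(chunk, encoding, callback) {
    setTimeout(callback, 10);
  },
});

// 8 bytes exceeds the 4-byte threshold, so write() asks the producer to stop.
const accepted = slowSink.write('12345678');
console.log(accepted); // false

// 'drain' fires once the buffered data has been flushed.
await once(slowSink, 'drain');

// 2 bytes fits under the threshold, so writing may continue.
const acceptedAfterDrain = slowSink.write('ab');
console.log(acceptedAfterDrain); // true
slowSink.end();
```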

Readable.from() — turn anything into a stream

Readable.from() adapts any iterable, async iterable, or generator into a Readable. It's the bridge between "I have an array of data" and "I want to push it through a streaming pipeline".

javascript
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { createWriteStream } from 'node:fs';

async function* lines() {
  for (let i = 0; i < 1000; i++) {
    yield `record ${i}\n`;
  }
}

await pipeline(Readable.from(lines()), createWriteStream('records.txt'));
console.log('done');

Output:

text
done

Works with synchronous iterables too:

javascript
import { Readable } from 'node:stream';

const stream = Readable.from(['hello\n', 'world\n']);
for await (const chunk of stream) process.stdout.write(chunk);

Output:

text
hello
world

Writing a custom Transform

A Transform consumes input chunks and produces output chunks. Implement the transform step, either as _transform(chunk, encoding, callback) in a subclass or as the transform option passed to the constructor, and call callback(error, output) to emit downstream. Use it for streaming line splitting, encryption, parsing, or filtering.

javascript
import { Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';

class Uppercase extends Transform {
  _transform(chunk, encoding, callback) {
    callback(null, chunk.toString('utf8').toUpperCase());
  }
}

await pipeline(
  createReadStream('input.txt'),
  new Uppercase(),
  createWriteStream('output.txt')
);

console.log('uppercased');

Output:

text
uppercased
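One caveat for text transforms like this: a multi-byte UTF-8 character can be split across two chunks, and calling toString() on each chunk separately would then corrupt it. A minimal sketch of one way to handle that, using StringDecoder from node:string_decoder, follows. SafeUppercase is a hypothetical variant of the class above, and the two hand-built buffers simulate an unlucky chunk boundary.

```javascript
import { Transform } from 'node:stream';
import { StringDecoder } from 'node:string_decoder';

// Hypothetical variant of Uppercase that tolerates split characters.
class SafeUppercase extends Transform {
  constructor(options) {
    super(options);
    this.decoder = new StringDecoder('utf8');
  }
  _transform(chunk, encoding, callback) {
    // write() holds back an incomplete trailing byte sequence until the next chunk
    callback(null, this.decoder.write(chunk).toUpperCase());
  }
  _flush(callback) {
    // end() returns whatever is still buffered in the decoder
    callback(null, this.decoder.end().toUpperCase());
  }
}

const upper = new SafeUppercase();
const eAcute = Buffer.from('é'); // two bytes in UTF-8
upper.write(Buffer.from([0x63, 0x61, 0x66, eAcute[0]])); // "caf" + first byte of "é"
upper.end(Buffer.from([eAcute[1]]));                      // second byte of "é"

let result = '';
for await (const chunk of upper) result += chunk.toString('utf8');
console.log(result); // CAFÉ
```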

For Transforms that consume entire lines instead of arbitrary byte chunks, buffer a remainder between calls:

javascript
import { Transform } from 'node:stream';

class LineFilter extends Transform {
  constructor(predicate) {
    super();
    this.predicate = predicate;
    this.remainder = '';
  }
  _transform(chunk, encoding, callback) {
    const lines = (this.remainder + chunk.toString('utf8')).split('\n');
    this.remainder = lines.pop();
    const kept = lines.filter(this.predicate).join('\n');
    callback(null, kept ? kept + '\n' : '');
  }
  _flush(callback) {
    // Emit the last line only if one is pending (the input may end with '\n')
    if (this.remainder && this.predicate(this.remainder)) callback(null, this.remainder);
    else callback();
  }
}

Output: (none — exits 0 on success)
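pipeline() also accepts async generator functions as intermediate stages, which can be a lighter alternative to subclassing Transform. The sketch below re-expresses the line filter that way and runs it end to end. filterLines and the sample log lines are illustrative names and data, not part of node:stream.

```javascript
import { Readable, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Hypothetical helper: returns a pipeline stage that keeps matching lines.
function filterLines(predicate) {
  return async function* (source) {
    let remainder = '';
    for await (const chunk of source) {
      const lines = (remainder + chunk.toString('utf8')).split('\n');
      remainder = lines.pop(); // possibly incomplete last line
      for (const line of lines) {
        if (predicate(line)) yield line + '\n';
      }
    }
    // Emit a final line that had no trailing newline.
    if (remainder && predicate(remainder)) yield remainder;
  };
}

const kept = [];
await pipeline(
  // The second line is split across two chunks on purpose.
  Readable.from(['GET /a 200\nGET /b 5', '00\nGET /c 200']),
  filterLines((line) => line.endsWith('200')),
  new Writable({
    write(chunk, encoding, callback) {
      kept.push(chunk.toString());
      callback();
    },
  })
);
console.log(kept); // [ 'GET /a 200\n', 'GET /c 200' ]
```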

Object mode

By default, streams carry Buffer or string chunks. Setting objectMode: true lets a stream emit arbitrary JavaScript objects — useful for pipelines that parse records (CSV rows, JSON lines) and pass them downstream as structured data.

javascript
import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

const source = Readable.from([{ id: 1 }, { id: 2 }, { id: 3 }]);

const doubler = new Transform({
  objectMode: true,
  transform(obj, enc, cb) {
    cb(null, { id: obj.id * 2 });
  },
});

// The final stage only consumes, so a Writable is the natural fit
const sink = new Writable({
  objectMode: true,
  write(obj, enc, cb) {
    console.log(obj);
    cb();
  },
});

await pipeline(source, doubler, sink);

Output:

text
{ id: 2 }
{ id: 4 }
{ id: 6 }

Readable.from() sets objectMode to true by default, whatever the iterable yields; pass { objectMode: false } as the second argument to get a byte stream.
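The mode of a stream created by Readable.from() can be inspected through readableObjectMode. A small sketch with illustrative variable names:

```javascript
import { Readable } from 'node:stream';

const objects = Readable.from([{ id: 1 }]);
const strings = Readable.from(['a', 'b']);
// Explicit opt-out: chunks must then be strings, Buffers, or typed arrays.
const bytes = Readable.from([Buffer.from('a')], { objectMode: false });

console.log(objects.readableObjectMode); // true
console.log(strings.readableObjectMode); // true
console.log(bytes.readableObjectMode);   // false
```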

HTTP request and response bodies

In Node's http/https modules, IncomingMessage is a Readable and ServerResponse is a Writable — so streaming a file response, or piping a request body to disk, is just pipeline():

javascript
import { createServer } from 'node:http';
import { createReadStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

const server = createServer(async (req, res) => {
  res.setHeader('Content-Type', 'application/octet-stream');
  try {
    await pipeline(createReadStream('./big.bin'), res);
  } catch (err) {
    console.error('stream failed', err.message);
  }
});

server.listen(3000, () => console.log('listening on 3000'));

Output:

text
listening on 3000

Uploading a request body to disk is the mirror image:

javascript
import { createServer } from 'node:http';
import { createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

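createServer(async (req, res) => {
  if (req.method !== 'POST') return res.end();
  try {
    // The ./uploads directory must already exist
    await pipeline(req, createWriteStream(`./uploads/${Date.now()}.bin`));
    res.end('saved');
  } catch (err) {
    // Without this, a failed upload becomes an unhandled rejection
    console.error('upload failed', err.message);
    if (!res.headersSent) res.statusCode = 500;
    res.end();
  }
}).listen(3000);

Output: (none; the server keeps running until stopped)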

Web Streams interop

Modern browsers and fetch use the Web Streams standard (ReadableStream, WritableStream, TransformStream), which is similar to but not identical with Node streams. Node provides converters in both directions.

javascript
import { Readable } from 'node:stream';

// Node Readable → Web ReadableStream
const nodeReadable = Readable.from(['hello ', 'world']);
const webReadable = Readable.toWeb(nodeReadable);

// Web ReadableStream → Node Readable
const webStream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
    controller.enqueue('b');
    controller.close();
  },
});
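// objectMode keeps the enqueued strings as-is; in byte mode they arrive as Buffers
const nodeStream = Readable.fromWeb(webStream, { objectMode: true });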
for await (const chunk of nodeStream) console.log(chunk);

Output:

text
a
b

A common reason to convert: fetch().body is a Web ReadableStream, but most Node sinks expect a Node Readable. Wrap it once with Readable.fromWeb() and pipe it as usual:

javascript
import { Readable } from 'node:stream';
import { createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

const response = await fetch('https://example.com/large.bin');
await pipeline(Readable.fromWeb(response.body), createWriteStream('downloaded.bin'));
console.log('downloaded');

Output:

text
downloaded

Comparison: classic events vs modern API

The old way, events and .pipe(), still works but is error-prone. You mainly need to recognize it when reading legacy code.

javascript
import { createReadStream } from 'node:fs';

const stream = createReadStream('file.txt', 'utf8');

// Old style — manual event handlers
stream.on('data', (chunk) => process.stdout.write(chunk));
stream.on('end', () => console.log('\n--EOF--'));
stream.on('error', (err) => console.error(err));

Output:

text
hello
world
--EOF--

Equivalent modern code:

javascript
import { createReadStream } from 'node:fs';

try {
  for await (const chunk of createReadStream('file.txt', 'utf8')) {
    process.stdout.write(chunk);
  }
  console.log('\n--EOF--');
} catch (err) {
  console.error(err);
}

Output:

text
hello
world
--EOF--

Common pitfalls

  1. Using .pipe() instead of pipeline(): .pipe() silently leaks file descriptors and event listeners on error. Prefer pipeline() from node:stream/promises.
  2. Forgetting to await pipeline(): Without the await, the function returns immediately and the caller's finally cleanup may run before the streams have actually finished.
  3. Ignoring backpressure in manual write() loops: A loop that calls out.write(...) thousands of times without checking the return value buffers everything in RAM. Listen for drain or use pipeline().
  4. Reading large files with readFile: readFile loads the whole file into memory. For anything over a few MB, switch to createReadStream.
  5. Mixing async iteration with event listeners: Once you start iterating with for await, do not also attach 'data' handlers. A 'data' listener switches the stream into flowing mode, so the listener and the iterator compete for the same chunks and the results are unpredictable.
  6. Object-mode streams crossing boundaries: A binary-mode Writable downstream of an object-mode Readable throws on the first object chunk. Match modes throughout the pipeline.
  7. Confusing Web and Node streams: pipeThrough works on Web streams; pipe/pipeline works on Node streams. Use the converters at the boundary.
  8. Not destroying on error: A Readable still consuming a network socket leaks if you abandon it without .destroy(). pipeline() handles this automatically.
  9. Encoding surprises: createReadStream without an encoding yields Buffer chunks. Comparing a Buffer to a string with === always fails; either set encoding: 'utf8' or call .toString('utf8') per chunk.
  10. Using sync zlib on big files: zlib.gzipSync() blocks the event loop for the duration. Stream large inputs through createGzip() in a pipeline instead.
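The encoding pitfall (item 9) is easy to reproduce in memory. In this sketch a byte-mode Readable.from() stands in for createReadStream without an encoding; the variable names are illustrative.

```javascript
import { Readable } from 'node:stream';

// In-memory stand-in for createReadStream('file.txt') with no encoding set.
const stream = Readable.from([Buffer.from('ok')], { objectMode: false });

let rawMatches;
let decodedMatches;
for await (const chunk of stream) {
  rawMatches = chunk === 'ok';                      // Buffer vs string: never equal
  decodedMatches = chunk.toString('utf8') === 'ok'; // decode first, then compare
}
console.log(rawMatches, decodedMatches); // false true
```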

Real-world recipes

Read, gzip, and upload to S3

Compose a three-stage pipeline: read from disk → gzip → upload. Memory stays bounded regardless of file size because backpressure stops each stage from running far ahead of the one after it.

javascript
import { createReadStream } from 'node:fs';
import { createGzip } from 'node:zlib';
import { pipeline } from 'node:stream/promises';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';

const s3 = new S3Client({ region: 'us-east-1' });

const gzip = createGzip();
const source = createReadStream('./big-export.csv');

const upload = new Upload({
  client: s3,
  params: {
    Bucket: 'my-backups',
    Key: `exports/${Date.now()}.csv.gz`,
    Body: gzip,
    ContentType: 'application/gzip',
  },
});

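// Run the upload and the pipeline together: the upload is what drains gzip,
// so awaiting the pipeline first would stall once gzip's buffer fills.
await Promise.all([pipeline(source, gzip), upload.done()]);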
console.log('uploaded');

Output:

text
uploaded

Line-by-line processing of a huge file

Wrap a file stream in readline.createInterface and iterate over it line by line. Memory use is bounded by the longest line, not the file size.

javascript
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';

const rl = createInterface({
  input: createReadStream('access.log'),
  crlfDelay: Infinity,
});

let errors = 0;
for await (const line of rl) {
  if (line.includes(' 500 ') || line.includes(' 502 ')) errors++;
}
console.log(`${errors} server errors`);

Output:

text
247 server errors

readline.createInterface wraps any line-oriented Readable; it's the standard pattern for log scanners.

Compute a file's SHA-256 without loading it

crypto.createHash is a Transform — feed bytes in, read the digest at the end. Pipe a file through it and grab the result.

javascript
import { createReadStream } from 'node:fs';
import { createHash } from 'node:crypto';
import { pipeline } from 'node:stream/promises';

const hash = createHash('sha256');
await pipeline(createReadStream('./image.iso'), hash);
console.log(hash.digest('hex'));

Output:

text
e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
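An equivalent approach, for cases where treating the Hash object as a stream feels opaque, is to iterate the source and call hash.update() per chunk. The sketch below assumes nothing beyond node:crypto and node:stream; sha256Hex is a hypothetical helper, and an in-memory stream stands in for createReadStream('./image.iso') so the digest can be checked against the well-known SHA-256 of "abc".

```javascript
import { createHash } from 'node:crypto';
import { Readable } from 'node:stream';

// Hypothetical helper: hashes any async iterable of Buffer or string chunks.
async function sha256Hex(source) {
  const hash = createHash('sha256');
  for await (const chunk of source) hash.update(chunk);
  return hash.digest('hex');
}

// Three chunks that together spell "abc".
const digest = await sha256Hex(Readable.from(['a', 'b', 'c']));
console.log(digest);
// ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad
```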

Concurrent transform with bounded parallelism

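The default Transform processes one chunk at a time. For asynchronous work such as network calls, or CPU-bound work like image conversion handed off to a worker pool, you can raise concurrency by acknowledging a chunk before its work finishes. Be careful not to lose backpressure: hold back the callback once the concurrency limit is reached, and release it when a slot frees up. Results are pushed in completion order, which may differ from input order.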

javascript
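import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

function concurrentMap(concurrency, fn) {
  let active = 0;
  let pendingCb = null; // transform callback held back while at the limit
  let flushCb = null;   // flush callback waiting for in-flight work
  return new Transform({
    objectMode: true,
    transform(item, enc, cb) {
      active++;
      Promise.resolve()
        .then(() => fn(item))
        .then((result) => {
          active--;
          this.push(result);
          if (pendingCb) {
            // A slot freed up: accept the next chunk
            const next = pendingCb;
            pendingCb = null;
            next();
          } else if (flushCb && active === 0) {
            flushCb();
          }
        })
        .catch((err) => this.destroy(err));
      // Acknowledge immediately while under the limit; otherwise hold the
      // callback so upstream stays paused until a slot frees up
      if (active < concurrency) cb();
      else pendingCb = cb;
    },
    flush(cb) {
      // Finish only after every in-flight item has been pushed
      if (active === 0) cb();
      else flushCb = cb;
    },
  });
}

const items = Array.from({ length: 6 }, (_, i) => ({ id: i }));
const sink = new Writable({
  objectMode: true,
  write(item, enc, cb) {
    console.log('done', item.id);
    cb();
  },
});

await pipeline(
  Readable.from(items),
  concurrentMap(3, async (item) => {
    await new Promise((r) => setTimeout(r, 100));
    return item;
  }),
  sink
);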

Output:

text
done 0
done 1
done 2
done 3
done 4
done 5

Forward an upload to another service

Take an incoming HTTP body (a Node Readable) and proxy it to another service via fetch, which accepts a Web ReadableStream as body when duplex: 'half' is set. No temp file involved.

javascript
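import { createServer } from 'node:http';
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

createServer(async (req, res) => {
  try {
    const upstream = await fetch('https://storage.example.com/upload', {
      method: 'POST',
      body: Readable.toWeb(req),
      duplex: 'half',
    });
    res.writeHead(upstream.status, { 'Content-Type': 'application/json' });
    // .pipe() returns the destination, not a Promise; pipeline() can be awaited
    await pipeline(Readable.fromWeb(upstream.body), res);
  } catch (err) {
    console.error('proxy failed', err.message);
    if (!res.headersSent) res.statusCode = 502;
    res.end();
  }
}).listen(3000);

Output: (none; the server keeps running until stopped)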

Split a CSV into per-key files

Stream-parse a CSV; route each row to a different Writable based on a column value. Useful for sharding a giant export by region, tenant, or month.

javascript
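import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline, finished } from 'node:stream/promises';
import { Writable } from 'node:stream';
import { parse } from 'csv-parse';

const sinks = new Map();

// The router only consumes rows, so it is a Writable rather than a Transform
const router = new Writable({
  objectMode: true,
  write(row, enc, cb) {
    const key = row.region;
    let sink = sinks.get(key);
    if (!sink) {
      sink = createWriteStream(`./out/${key}.csv`); // ./out must already exist
      sinks.set(key, sink);
    }
    // Respect backpressure from the per-key file: wait for 'drain' when full
    if (sink.write(`${row.id},${row.amount}\n`)) cb();
    else sink.once('drain', cb);
  },
  final(cb) {
    // Close every file and wait until each one has finished writing
    const closing = [...sinks.values()].map((sink) => {
      sink.end();
      return finished(sink);
    });
    Promise.all(closing).then(() => cb(), cb);
  },
});

await pipeline(
  createReadStream('./sales.csv'),
  parse({ columns: true }),
  router
);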

console.log(`split into ${sinks.size} files`);

Output:

text
split into 12 files

Tee — duplicate a stream to two destinations

There's no single built-in tee for Node streams, but two PassThroughs can pipe the same source to two sinks. Useful for writing to disk and uploading at the same time.

javascript
import { createReadStream, createWriteStream } from 'node:fs';
import { PassThrough } from 'node:stream';
import { pipeline } from 'node:stream/promises';

const source = createReadStream('./report.csv');
const a = new PassThrough();
const b = new PassThrough();

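// .pipe() does not forward errors, so propagate a source failure by hand
source.on('error', (err) => {
  a.destroy(err);
  b.destroy(err);
});
source.pipe(a);
source.pipe(b);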

await Promise.all([
  pipeline(a, createWriteStream('./report.local.csv')),
  pipeline(b, createWriteStream('./report.backup.csv')),
]);

console.log('tee complete');

Output:

text
tee complete