Node.js streams — Readable, Writable, Duplex, and Transform, the modern pipeline() API, backpressure, async iterators, Web Streams interop, and patterns for piping files, HTTP bodies, and gzip compression.
node:stream — Streams
What it is
A Node.js stream is an abstraction for processing data in chunks rather than loading it all at once. Streams let you read a 10 GB file, gzip it, and upload it to object storage using a fixed amount of memory, because data flows through the pipeline one buffer at a time. Streams predate native Promises (standardized in ES2015) by several years and historically came with sharp edges, but the modern API (stream/promises.pipeline(), async iterators, and Web Streams interop) makes them genuinely pleasant.
Install
node:stream is built into Node.js. The promises sub-module (node:stream/promises) shipped stable in Node 15.
node --version
Output:
v22.14.0
Stream types
Four base classes; each solves a different problem. Almost every concrete stream (a file reader, an HTTP response, a zlib compressor) extends one of these.
| Class | Direction | Example |
|---|---|---|
| Readable | Source → your code | fs.createReadStream, process.stdin, http.IncomingMessage |
| Writable | Your code → destination | fs.createWriteStream, process.stdout, http.ServerResponse |
| Duplex | Both directions, independent of each other | net.Socket (TCP), tls.TLSSocket |
| Transform | Duplex whose output is computed from its input | zlib.createGzip, crypto.createHash, crypto.createCipheriv |
import { Readable, Writable, Transform } from 'node:stream';
const r = new Readable({ read() {} }); // Readable
const w = new Writable({ write(chunk, enc, cb) { cb(); } }); // Writable
const t = new Transform({ transform(chunk, enc, cb) { cb(null, chunk); } }); // Transform
console.log(r instanceof Readable, w instanceof Writable, t instanceof Transform);
Output:
true true true
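To make the Duplex/Transform distinction concrete, here is a minimal sketch of a toy Duplex (not a real library stream; the variable names are illustrative). Its writable side only records what it receives, while its readable side emits an unrelated message. A Transform would instead derive its output from its input.

```javascript
import { Duplex } from 'node:stream';

// Toy Duplex: the writable and readable sides share no data.
const received = [];
const duplex = new Duplex({
  // Writable side: remember each chunk written to us.
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    callback();
  },
  // Readable side: emit one fixed message, then end.
  read() {
    this.push('hello from the readable side');
    this.push(null);
  },
});

duplex.write('ping');
duplex.end();

// Consume the readable side; what was written has no effect on it.
const chunks = [];
for await (const chunk of duplex) chunks.push(chunk.toString());

console.log(received, chunks);
```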
Reading: the modern way
For a Readable, async iteration is the simplest, most modern way to consume chunks. The for await loop handles backpressure and end-of-stream automatically — no event handlers required.
import { createReadStream } from 'node:fs';
const stream = createReadStream('large.txt', { encoding: 'utf8', highWaterMark: 64 * 1024 });
let lineCount = 0;
let remainder = '';
for await (const chunk of stream) {
  const lines = (remainder + chunk).split('\n');
  remainder = lines.pop();
  lineCount += lines.length;
}
if (remainder.length) lineCount++;
console.log(`${lineCount} lines`);
Output:
1048576 lines
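The highWaterMark option sets a buffering threshold rather than a hard cap: a Readable stops pulling from its source once this much data is buffered, and a Writable's write() starts returning false. The default for byte streams is 64 KiB in Node 22 (16 KiB in earlier releases, although fs.createReadStream has long used 64 KiB); object-mode streams default to 16 objects. Raising it trades memory for fewer round-trips through the event loop.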
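A quick way to check the defaults on your own Node version is stream.getDefaultHighWaterMark() (added around Node 19.9 and backported to 18.17), together with the readableHighWaterMark property of a specific stream. A small sketch:

```javascript
import { Readable, getDefaultHighWaterMark } from 'node:stream';

// Process-wide defaults: a byte count for byte streams, an object count for object mode.
const byteDefault = getDefaultHighWaterMark(false);
const objectDefault = getDefaultHighWaterMark(true);

// A per-stream override is reported back by readableHighWaterMark.
const big = new Readable({ read() {}, highWaterMark: 1024 * 1024 });
const objects = new Readable({ read() {}, objectMode: true });

console.log({
  byteDefault,
  objectDefault,
  big: big.readableHighWaterMark,
  objects: objects.readableHighWaterMark,
});
```

On Node 22 the byte default should print as 65536; older releases report 16384.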
Writing chunks
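A Writable accepts data via write() and signals the end with end(). write() returns false once the buffered data reaches highWaterMark; that's the backpressure signal. The chunk is still accepted, but you should stop writing until the stream emits 'drain'. Either respect the signal manually or let pipeline() handle it for you.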
import { createWriteStream } from 'node:fs';
import { once } from 'node:events';
import { finished } from 'node:stream/promises';
const out = createWriteStream('out.txt');
for (let i = 0; i < 5; i++) {
  const ok = out.write(`line ${i}\n`);
  if (!ok) {
    // Buffer reached highWaterMark: wait for 'drain' before writing more.
    // once() rejects if 'error' fires first, so a failed write can't hang here.
    await once(out, 'drain');
  }
}
out.end();
await finished(out); // resolves after 'finish', rejects on error
console.log('written');
Output:
written
The pattern above is verbose because it's manual. The next section covers the much shorter pipeline form.
pipeline() — the right way to compose streams
pipeline() from node:stream/promises wires multiple streams together end-to-end. It propagates errors, handles backpressure, and resolves a Promise when everything has finished — replacing the historical pipe() + manual error handler dance.
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
await pipeline(
  createReadStream('book.txt'),
  createGzip(),
  createWriteStream('book.txt.gz')
);
console.log('gzipped');
Output:
gzipped
Why this beats .pipe() chains:
| Concern | pipeline() | .pipe() |
|---|---|---|
| Errors propagate | Yes — single await/try block | No — must attach error listener to every stream |
| Resource cleanup on error | Automatic | Manual (destroy() each stream) |
| Returns a Promise | Yes | No |
| Backpressure | Yes | Yes |
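The rule: avoid .pipe() in new code. When one stream in a .pipe() chain errors, the others are neither notified nor destroyed, so file descriptors and listeners leak unless you clean up by hand.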
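The error-propagation and cleanup rows of the table can be checked directly. This sketch (the failing stage and its error message are made up for illustration) throws from a middle stage, catches the rejection from the single await, and then inspects the other streams:

```javascript
import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

const source = Readable.from(['a', 'b', 'c']);

// A middle stage that fails on one particular chunk.
const failing = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    if (chunk === 'b') callback(new Error('bad chunk: b'));
    else callback(null, chunk);
  },
});

// A sink that simply accepts whatever reaches it.
const sink = new Writable({
  objectMode: true,
  write(chunk, encoding, callback) {
    callback();
  },
});

let caught;
try {
  await pipeline(source, failing, sink);
} catch (err) {
  caught = err; // one try/catch sees the error from any stage
}

// pipeline() has torn down every stream, not only the one that failed.
console.log(caught.message, source.destroyed, failing.destroyed, sink.destroyed);
```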
Backpressure — why this matters
Backpressure is the mechanism that prevents a fast producer from overwhelming a slow consumer. If a Readable produces 100 MB/s and a Writable accepts 10 MB/s, something has to throttle — otherwise memory grows unbounded.
Streams handle backpressure automatically when you use pipeline() or pipe(). When the Writable's buffer fills, it returns false from write(); the Readable pauses; the Writable emits drain when it has room; the Readable resumes. Manually plumbing this is error-prone, which is exactly why pipeline() exists.
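import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
// If the source is faster than the destination, backpressure keeps memory bounded.
// /dev/urandom never ends on its own (and exists only on Unix-like systems),
// so `end` caps the read at 100 MiB (the offset is inclusive).
await pipeline(
  createReadStream('/dev/urandom', { highWaterMark: 1024 * 1024, end: 100 * 1024 * 1024 - 1 }),
  createWriteStream('random.bin', { highWaterMark: 64 * 1024 })
);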
Output: (none — exits 0 on success)
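The write()/'drain' handshake described above can be observed on a deliberately slow Writable. In this sketch the 4-byte highWaterMark and the 10 ms delay are arbitrary values chosen only to make the signal visible:

```javascript
import { Writable } from 'node:stream';
import { once } from 'node:events';

// A deliberately slow sink with a tiny 4-byte highWaterMark.
const slow = new Writable({
  highWaterMark: 4,
  write(chunk, encoding, callback) {
    setTimeout(callback, 10); // pretend each write takes 10 ms
  },
});

// write() reports whether buffered bytes are still below highWaterMark.
const results = ['ab', 'cd', 'ef'].map((piece) => slow.write(piece));
const bufferedBytes = slow.writableLength;

// 'drain' fires once everything buffered has been handed to write().
await once(slow, 'drain');
console.log(results, bufferedBytes, slow.writableLength); // [ true, false, false ] 6 0
```

The second and third writes return false yet are still buffered (writableLength is 6), which is why false means "please stop" rather than "rejected".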
Readable.from() — turn anything into a stream
Readable.from() adapts any iterable, async iterable, or generator into a Readable. It's the bridge between "I have an array of data" and "I want to push it through a streaming pipeline".
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { createWriteStream } from 'node:fs';
async function* lines() {
  for (let i = 0; i < 1000; i++) {
    yield `record ${i}\n`;
  }
}
await pipeline(Readable.from(lines()), createWriteStream('records.txt'));
console.log('done');
Output:
done
Works with synchronous iterables too:
import { Readable } from 'node:stream';
const stream = Readable.from(['hello\n', 'world\n']);
for await (const chunk of stream) process.stdout.write(chunk);
Output:
hello
world
Writing a custom Transform
A Transform consumes input chunks and produces output chunks. Implement transform(chunk, encoding, callback), either as the transform option to new Transform() or as a _transform() method on a subclass, and call callback(error, output) to emit each result downstream. Use it for streaming line splitting, encryption, parsing, or filtering.
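import { Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { StringDecoder } from 'node:string_decoder';
class Uppercase extends Transform {
  // A multi-byte UTF-8 character can straddle two chunks; StringDecoder holds
  // the partial bytes back until the rest arrives instead of emitting U+FFFD.
  decoder = new StringDecoder('utf8');
  _transform(chunk, encoding, callback) {
    callback(null, this.decoder.write(chunk).toUpperCase());
  }
  _flush(callback) {
    callback(null, this.decoder.end().toUpperCase());
  }
}
await pipeline(
  createReadStream('input.txt'),
  new Uppercase(),
  createWriteStream('output.txt')
);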
console.log('uppercased');
Output:
uppercased
For Transforms that consume entire lines instead of arbitrary byte chunks, buffer a remainder between calls:
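import { Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { StringDecoder } from 'node:string_decoder';
class LineFilter extends Transform {
  constructor(predicate) {
    super();
    this.predicate = predicate;
    this.remainder = '';
    this.decoder = new StringDecoder('utf8'); // keeps multi-byte characters intact
  }
  _transform(chunk, encoding, callback) {
    const lines = (this.remainder + this.decoder.write(chunk)).split('\n');
    this.remainder = lines.pop(); // the last piece may be an incomplete line
    const kept = lines.filter(this.predicate);
    callback(null, kept.map((line) => line + '\n').join(''));
  }
  _flush(callback) {
    // Whatever follows the last newline is the final, unterminated line.
    const last = this.remainder + this.decoder.end();
    if (last && this.predicate(last)) callback(null, last);
    else callback();
  }
}
// Example use: keep only lines containing ERROR (file names are illustrative)
await pipeline(
  createReadStream('app.log'),
  new LineFilter((line) => line.includes('ERROR')),
  createWriteStream('errors.log')
);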
Output: (none — exits 0 on success)
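A lighter alternative to subclassing: pipeline() from node:stream/promises also accepts an async generator function as a middle stage. It receives the previous stage as an async iterable, and whatever it yields flows downstream. A sketch of the uppercase step written that way, with an in-memory source and a collecting Writable standing in for real files:

```javascript
import { Readable, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

const output = [];

await pipeline(
  Readable.from(['hello ', 'world']),
  // Generator stage: consumes the upstream iterable, yields transformed chunks.
  async function* (source) {
    for await (const chunk of source) {
      yield String(chunk).toUpperCase();
    }
  },
  // Collecting sink (a stand-in for createWriteStream).
  new Writable({
    write(chunk, encoding, callback) {
      output.push(chunk.toString());
      callback();
    },
  }),
);

console.log(output.join('')); // HELLO WORLD
```

With a file source the chunks are Buffers, so the same multi-byte caveat applies to String(chunk); passing encoding: 'utf8' to createReadStream is the simplest fix there.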
Object mode
By default, streams carry Buffer or string chunks. Setting objectMode: true lets a stream emit arbitrary JavaScript objects — useful for pipelines that parse records (CSV rows, JSON lines) and pass them downstream as structured data.
import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
const source = Readable.from([{ id: 1 }, { id: 2 }, { id: 3 }]);
const doubler = new Transform({
  objectMode: true,
  transform(obj, enc, cb) {
    cb(null, { id: obj.id * 2 });
  },
});
// The final stage only consumes, so a Writable is the natural fit
const sink = new Writable({
  objectMode: true,
  write(obj, enc, cb) {
    console.log(obj);
    cb();
  },
});
await pipeline(source, doubler, sink);
Output:
{ id: 2 }
{ id: 4 }
{ id: 6 }
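Readable.from() creates an object-mode stream by default, whatever the iterable contains; pass { objectMode: false } to opt out. A string or Buffer argument is emitted as a single chunk rather than being iterated.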
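A short check of those defaults; the values in the arrays are arbitrary:

```javascript
import { Readable } from 'node:stream';

// Iterables become object-mode streams by default.
const fromArray = Readable.from(['a', 'b']);

// Opting out gives a regular byte stream.
const bytes = Readable.from(['a', 'b'], { objectMode: false });

// A string argument is emitted as one chunk, not character by character.
const chunks = [];
for await (const chunk of Readable.from('abc')) chunks.push(chunk);

console.log(fromArray.readableObjectMode, bytes.readableObjectMode, chunks); // true false [ 'abc' ]
```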
HTTP request and response bodies
In Node's http/https modules, IncomingMessage is a Readable and ServerResponse is a Writable — so streaming a file response, or piping a request body to disk, is just pipeline():
import { createServer } from 'node:http';
import { createReadStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
const server = createServer(async (req, res) => {
  res.setHeader('Content-Type', 'application/octet-stream');
  try {
    await pipeline(createReadStream('./big.bin'), res);
  } catch (err) {
    console.error('stream failed', err.message);
  }
});
server.listen(3000, () => console.log('listening on 3000'));
Output:
listening on 3000
Uploading a request body to disk is the mirror image:
import { createServer } from 'node:http';
import { createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
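createServer(async (req, res) => {
  if (req.method !== 'POST') return res.end();
  try {
    await pipeline(req, createWriteStream(`./uploads/${Date.now()}.bin`));
    res.end('saved');
  } catch (err) {
    // e.g. the client aborted, the disk is full, or ./uploads is missing; without
    // this catch the rejected promise would be unhandled and crash the process.
    res.statusCode = 500;
    res.end('upload failed');
  }
}).listen(3000);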
Output: (none; the server keeps running until you stop it)
Web Streams interop
Modern browsers and fetch use the Web Streams standard (ReadableStream, WritableStream, TransformStream), which is similar to, but not identical to, Node streams. Node provides converters in both directions, such as Readable.toWeb() and Readable.fromWeb(), with matching pairs on Writable and Duplex.
import { Readable, Writable } from 'node:stream';
// Node Readable → Web ReadableStream
const nodeReadable = Readable.from(['hello ', 'world']);
const webReadable = Readable.toWeb(nodeReadable);
// Web ReadableStream → Node Readable
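const webStream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
    controller.enqueue('b');
    controller.close();
  },
});
// objectMode keeps each enqueued string as its own chunk; in the default byte
// mode the strings are converted to Buffers and may be merged when read.
const nodeStream = Readable.fromWeb(webStream, { objectMode: true });
for await (const chunk of nodeStream) console.log(chunk);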
Output:
a
b
A common reason to convert: fetch().body is a Web ReadableStream, but most Node sinks expect a Node Readable. Wrap it once with Readable.fromWeb() and pipe it as usual:
import { Readable } from 'node:stream';
import { createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
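const response = await fetch('https://example.com/large.bin');
// fetch() also resolves for 4xx/5xx responses; check before saving an error page to disk
if (!response.ok || !response.body) throw new Error(`download failed: HTTP ${response.status}`);
await pipeline(Readable.fromWeb(response.body), createWriteStream('downloaded.bin'));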
console.log('downloaded');
Output:
downloaded
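The converters also let a Web TransformStream sit in the middle of otherwise Node-based code. A sketch, assuming Node 18 or later (where TransformStream is a global); the uppercasing step is purely illustrative:

```javascript
import { Readable } from 'node:stream';

// A Web TransformStream that uppercases each chunk.
const upper = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(String(chunk).toUpperCase());
  },
});

// Node Readable → Web ReadableStream → through the Web transform.
const nodeSource = Readable.from(['web ', 'streams']);
const webResult = Readable.toWeb(nodeSource).pipeThrough(upper);

// Back to a Node Readable; objectMode keeps each string as its own chunk.
const parts = [];
for await (const chunk of Readable.fromWeb(webResult, { objectMode: true })) {
  parts.push(chunk);
}
console.log(parts.join('')); // WEB STREAMS
```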
Comparison: classic events vs modern API
The old way (events and .pipe()) still works but is error-prone; you will mostly meet it in legacy code.
import { createReadStream } from 'node:fs';
const stream = createReadStream('file.txt', 'utf8');
// Old style — manual event handlers
stream.on('data', (chunk) => process.stdout.write(chunk));
stream.on('end', () => console.log('\n--EOF--'));
stream.on('error', (err) => console.error(err));
Output:
hello
world
--EOF--
Equivalent modern code:
import { createReadStream } from 'node:fs';
try {
  for await (const chunk of createReadStream('file.txt', 'utf8')) {
    process.stdout.write(chunk);
  }
  console.log('\n--EOF--');
} catch (err) {
  console.error(err);
}
Output:
hello
world
--EOF--
Common pitfalls
- Using .pipe() instead of pipeline(): .pipe() does not forward errors or destroy the other streams, so it silently leaks file descriptors and event listeners on error. Prefer pipeline() from node:stream/promises.
- Forgetting to await pipeline(): without the await, the function returns immediately, a rejection goes unhandled, and the caller's finally cleanup may run before the streams have actually finished.
- Ignoring backpressure in manual write() loops: a loop that calls out.write(...) thousands of times without checking the return value buffers everything in RAM. Wait for 'drain' or use pipeline().
- Reading large files with readFile: readFile loads the whole file into memory. For anything over a few MB, switch to createReadStream.
- Mixing async iteration with event listeners: once you start iterating with for await, do not also attach 'data' handlers. Both consume the same stream through different code paths, and the interaction is hard to predict.
- Object-mode streams crossing boundaries: a byte-mode Writable downstream of an object-mode Readable errors on the first chunk that is not a string or binary data (Buffer, TypedArray). Match modes throughout the pipeline.
- Confusing Web and Node streams: pipeTo() and pipeThrough() exist only on Web streams, and .pipe() only on Node streams. Recent Node versions let pipeline() accept Web streams as well, but APIs that expect one kind still need a converter at the boundary.
- Not destroying on error: a Readable still consuming a network socket leaks if you abandon it without calling .destroy(). pipeline() handles this automatically.
- Encoding surprises: createReadStream without an encoding yields Buffer chunks, and comparing a Buffer to a string with === always fails. Prefer setting encoding: 'utf8' over calling .toString('utf8') per chunk, because per-chunk decoding mangles multi-byte characters split across chunk boundaries.
- Using sync zlib on big files: zlib.gzipSync() needs the whole input in memory and blocks the event loop for the duration. Stream it through createGzip() in a pipeline instead.
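The encoding pitfall is easy to reproduce. In this sketch the two UTF-8 bytes of 'é' are deliberately split across two chunks: decoding each chunk separately corrupts the character, while a StringDecoder (which is what setEncoding() installs internally) carries the partial character over to the next chunk.

```javascript
import { Readable } from 'node:stream';
import { StringDecoder } from 'node:string_decoder';

const bytes = Buffer.from('café', 'utf8'); // 'é' is the two bytes C3 A9
// Object mode keeps the two Buffers as separate chunks, with 'é' split between them.
const split = () => Readable.from([bytes.subarray(0, 4), bytes.subarray(4)]);

// Naive: decode each chunk on its own.
let naive = '';
for await (const chunk of split()) naive += chunk.toString('utf8');

// Correct: one decoder for the whole stream.
const decoder = new StringDecoder('utf8');
let decoded = '';
for await (const chunk of split()) decoded += decoder.write(chunk);
decoded += decoder.end();

console.log(naive === 'café', decoded === 'café'); // false true
```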
Real-world recipes
Read, gzip, and upload to S3
Compose a three-stage pipeline: read from disk → gzip → upload. Memory stays bounded regardless of file size because the upload consumes gzip's output as it is produced, and backpressure pauses the file read whenever the upload falls behind.
import { createReadStream } from 'node:fs';
import { createGzip } from 'node:zlib';
import { pipeline } from 'node:stream/promises';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
const s3 = new S3Client({ region: 'us-east-1' });
const gzip = createGzip();
const source = createReadStream('./big-export.csv');
const upload = new Upload({
  client: s3,
  params: {
    Bucket: 'my-backups',
    Key: `exports/${Date.now()}.csv.gz`,
    Body: gzip, // the upload reads gzip's output side
    ContentType: 'application/gzip',
  },
});
// Run both halves concurrently: awaiting pipeline() first would stall once gzip's
// output buffer fills, because nothing reads it until upload.done() starts.
await Promise.all([pipeline(source, gzip), upload.done()]);
console.log('uploaded');
Output:
uploaded
Line-by-line processing of a huge file
Use readline.createInterface() as the line splitter: give it a file stream as input and iterate over the lines with for await. Memory use is bounded by the longest line plus the stream's buffer, not the file size.
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';
const rl = createInterface({
  input: createReadStream('access.log'),
  crlfDelay: Infinity,
});
let errors = 0;
for await (const line of rl) {
  if (line.includes(' 500 ') || line.includes(' 502 ')) errors++;
}
console.log(`${errors} server errors`);
Output:
247 server errors
readline.createInterface wraps any line-oriented Readable; it's the standard pattern for log scanners.
Compute a file's SHA-256 without loading it
crypto.createHash is a Transform — feed bytes in, read the digest at the end. Pipe a file through it and grab the result.
import { createReadStream } from 'node:fs';
import { createHash } from 'node:crypto';
import { pipeline } from 'node:stream/promises';
const hash = createHash('sha256');
await pipeline(createReadStream('./image.iso'), hash);
console.log(hash.digest('hex'));
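Output (this particular digest is the SHA-256 of empty input, so it is what an empty image.iso would print):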
e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
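To convince yourself the streamed digest matches a one-shot hash, compare the two on in-memory data. A sketch; sha256Hex is a helper name introduced here, not a Node API:

```javascript
import { createHash } from 'node:crypto';
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Hypothetical helper: hash any Readable by piping it into the Hash Transform.
async function sha256Hex(readable) {
  const hash = createHash('sha256');
  await pipeline(readable, hash);
  return hash.digest('hex');
}

const parts = ['hello ', 'streaming ', 'world'];
const streamed = await sha256Hex(Readable.from(parts));
const oneShot = createHash('sha256').update(parts.join('')).digest('hex');

console.log(streamed === oneShot); // true
```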
Concurrent transform with bounded parallelism
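The default Transform processes one chunk at a time. For async work that spends its time waiting (network calls, or native libraries such as image converters that run on a thread pool), you can run several chunks at once by releasing the transform callback early, but you must hold it back at the concurrency limit or backpressure is lost. Results are pushed in completion order, so output order matches input order only when tasks finish in order (they do here, because every task takes the same 100 ms).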
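import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
function concurrentMap(concurrency, fn) {
  let active = 0;
  let heldCallback = null; // transform callback withheld while at the limit
  let flushCallback = null; // flush callback withheld until in-flight work ends
  return new Transform({
    objectMode: true,
    transform(item, enc, cb) {
      active++;
      Promise.resolve()
        .then(() => fn(item))
        .then(
          (result) => {
            active--;
            this.push(result);
            // A slot is free again: accept the next chunk from upstream.
            if (heldCallback) {
              const next = heldCallback;
              heldCallback = null;
              next();
            }
            if (active === 0 && flushCallback) flushCallback();
          },
          // destroy() rather than cb(err): cb may already have been called.
          (err) => this.destroy(err)
        );
      // Below the limit, ask for more input right away; at the limit, hold the
      // callback so upstream pauses (this is what preserves backpressure).
      if (active < concurrency) cb();
      else heldCallback = cb;
    },
    flush(cb) {
      // Input has ended; finish only after every in-flight task has pushed.
      if (active === 0) cb();
      else flushCallback = cb;
    },
  });
}
const items = Array.from({ length: 6 }, (_, i) => ({ id: i }));
const sink = new Writable({
  objectMode: true,
  write(item, enc, cb) {
    console.log('done', item.id);
    cb();
  },
});
await pipeline(
  Readable.from(items),
  concurrentMap(3, async (item) => {
    await new Promise((r) => setTimeout(r, 100));
    return item;
  }),
  sink
);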
Output:
done 0
done 1
done 2
done 3
done 4
done 5
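Node also ships readable.map(fn, { concurrency }), an experimental stream operator that runs several async calls at once and, unlike the hand-rolled helper above, yields results in input order. A sketch with made-up delays chosen so that later items finish first:

```javascript
import { Readable } from 'node:stream';

const items = Array.from({ length: 6 }, (_, i) => ({ id: i }));
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Up to 3 mapper calls run at once; readable.map() is marked experimental.
const results = Readable.from(items).map(
  async (item) => {
    await sleep(10 * (6 - item.id)); // later items finish sooner, on purpose
    return item.id;
  },
  { concurrency: 3 },
);

const order = [];
for await (const id of results) order.push(id);
console.log(order); // [ 0, 1, 2, 3, 4, 5 ]
```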
Forward an upload to another service
Take an incoming HTTP body (a Readable) and proxy it to another service with fetch, which accepts a Web ReadableStream as the request body when you also pass duplex: 'half'. No temp file involved.
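import { createServer } from 'node:http';
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
createServer(async (req, res) => {
  try {
    const upstream = await fetch('https://storage.example.com/upload', {
      method: 'POST',
      body: Readable.toWeb(req), // stream the incoming body, no buffering
      duplex: 'half', // required by fetch for streaming request bodies
    });
    res.writeHead(upstream.status, { 'Content-Type': 'application/json' });
    // .pipe() returns the destination, not a Promise; pipeline() can be awaited
    if (upstream.body) await pipeline(Readable.fromWeb(upstream.body), res);
    else res.end();
  } catch (err) {
    // Upstream unreachable, or a stream failed midway
    if (!res.headersSent) res.writeHead(502);
    res.end();
  }
}).listen(3000);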
Output: (none; the server keeps running until you stop it)
Split a CSV into per-key files
Stream-parse a CSV; route each row to a different Writable based on a column value. Useful for sharding a giant export by region, tenant, or month.
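import { createReadStream, createWriteStream } from 'node:fs';
import { once } from 'node:events';
import { pipeline, finished } from 'node:stream/promises';
import { Writable } from 'node:stream';
import { parse } from 'csv-parse';
const sinks = new Map();
// The router is the last stage and produces nothing, so it is a Writable.
const router = new Writable({
  objectMode: true,
  write(row, enc, cb) {
    const key = row.region; // assumes the CSV has a "region" column
    let sink = sinks.get(key);
    if (!sink) {
      sink = createWriteStream(`./out/${key}.csv`); // ./out must already exist
      sink.on('error', (err) => this.destroy(err)); // fail the whole pipeline
      sinks.set(key, sink);
    }
    // Respect the per-file stream's backpressure before taking the next row.
    if (sink.write(`${row.id},${row.amount}\n`)) cb();
    else once(sink, 'drain').then(() => cb(), cb);
  },
  final(cb) {
    // Close every file and wait until all of them have flushed to disk.
    for (const sink of sinks.values()) sink.end();
    Promise.all([...sinks.values()].map((sink) => finished(sink))).then(() => cb(), cb);
  },
});
await pipeline(
  createReadStream('./sales.csv'),
  parse({ columns: true }),
  router
);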
console.log(`split into ${sinks.size} files`);
Output:
split into 12 files
Tee — duplicate a stream to two destinations
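There's no single built-in tee for Node streams, but piping one source into two PassThroughs feeds the same data to two sinks, which is useful for writing to disk and uploading at the same time. The source pauses whenever either branch is full, so both branches advance at the speed of the slower one.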
import { createReadStream, createWriteStream } from 'node:fs';
import { PassThrough } from 'node:stream';
import { pipeline } from 'node:stream/promises';
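const source = createReadStream('./report.csv');
const a = new PassThrough();
const b = new PassThrough();
// .pipe() does not forward errors, so fail both branches if the source fails;
// otherwise the two pipelines below would wait forever.
source.on('error', (err) => {
  a.destroy(err);
  b.destroy(err);
});
source.pipe(a);
source.pipe(b);
await Promise.all([
  pipeline(a, createWriteStream('./report.local.csv')),
  pipeline(b, createWriteStream('./report.backup.csv')),
]);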
console.log('tee complete');
Output:
tee complete
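Web streams do have a built-in tee(). A sketch that converts a Node Readable with Readable.toWeb() and splits it; the collect helper is introduced here for illustration. Note that tee() keeps pulling for the faster branch and buffers chunks for the slower one in memory, so unlike the PassThrough version it does not slow the source down to the slower consumer.

```javascript
import { Readable } from 'node:stream';

// Convert to a Web stream and split it; each branch sees every chunk.
const source = Readable.from(['alpha ', 'beta']);
const [left, right] = Readable.toWeb(source).tee();

// Hypothetical helper: concatenate every chunk of a Web ReadableStream.
async function collect(webStream) {
  let text = '';
  for await (const chunk of webStream) text += chunk;
  return text;
}

// Read both branches concurrently.
const [a, b] = await Promise.all([collect(left), collect(right)]);
console.log(a, b); // alpha beta alpha beta
```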