
JavaScript Streams: From Beginner to Expert
Streams in JavaScript are an elegant pattern for handling data, allowing us to process data as if we were dealing with a flowing stream—bit by bit, rather than loading everything all at once. Imagine the water in a stream, continuously flowing; this is the approach streams take, enabling efficient handling of videos, large files, and real-time data, even in resource-constrained environments.
Do you remember the last time you tried to upload a large video file in a browser? The waiting, the freezing, and even the crashes can be quite frustrating. This is where the limitations of the traditional "load everything at once" approach become apparent.
When dealing with large files, the traditional method loads the entire file into memory at once, like pouring a bucket of water into a small cup—overflow is inevitable.
I encountered this issue in a project where users were uploading large files, causing the server's memory usage to spike instantly, eventually leading to a complete service outage. In such scenarios, stream programming is clearly the most appropriate solution. Let's compare the traditional method with stream processing:
// Traditional method - Load the entire file into memory
fs.readFile('huge-video.mp4', (err, data) => {
if (err) throw err;
// At this point, the data variable contains the entire file content
processFile(data);
});
// Stream processing - Read the file in chunks
const readStream = fs.createReadStream('huge-video.mp4');
readStream.on('data', (chunk) => {
// Process a small portion of data at a time
processChunk(chunk);
});
After switching to stream-based processing, you'll notice that the server's memory usage drops significantly and the freezes simply stop happening. This is the power of stream programming.
In fact, the strength of streams lies in "processing on demand"—you don't have to wait for all the data to start working. It's like sipping coffee while continuously refilling the cup, rather than waiting for a whole pot of coffee to brew before drinking. With streams, the memory footprint for a 1GB file might only be 64KB (default chunk size), a significant efficiency improvement compared to the traditional method's 1GB usage.
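To make that claim concrete, here is a minimal sketch (the file name is a placeholder) that logs the size of each chunk a file read stream delivers; with the default 64KB highWaterMark, memory stays flat regardless of the file size:
const fs = require('fs');

// fs.createReadStream uses a 64KB highWaterMark by default,
// so each 'data' event carries at most 64KB no matter how big the file is.
const stream = fs.createReadStream('huge-video.mp4');

let chunkCount = 0;
stream.on('data', (chunk) => {
  chunkCount++;
  console.log(`Chunk ${chunkCount}: ${chunk.length} bytes`);
});
stream.on('end', () => console.log(`Finished after ${chunkCount} chunks`));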
Even if the client has enough memory, processing large files all at once can cause noticeable performance issues. The main thread gets blocked, the user interface may freeze, and the overall responsiveness of the application is affected. Similarly, in a Node.js server environment, this blocking directly impacts the ability to handle other requests.
Moreover, modern applications often need to handle real-time data streams, such as live video streaming, real-time log analysis, or financial transaction data. Traditional data processing methods can no longer meet the demand for "processing as it is generated," leading to increased data processing delays and reduced real-time performance.
Therefore, what we need is a data processing mechanism that can quickly handle real-time data, has low memory costs, and is feature-rich. This mechanism is implemented in JavaScript as streams.
Note: Unless otherwise specified, the streams mentioned below refer to streams in the Node.js environment by default.
Streams provide a new paradigm for data processing, allowing us to split data into small chunks (chunks) and process them step by step, just like a continuous flow of water.
The best way to understand the concept of streams is through analogies from everyday life. Imagine water flowing through a pipe: water flows from the source (such as a reservoir) through the pipe system, eventually reaching the destination (such as a faucet). Throughout the process, the water is continuously flowing, not all delivered at once.
Technically, a stream is an abstract representation of an asynchronous sequence of data. It provides a standardized interface for handling continuously arriving data chunks. These data chunks can be:
- Binary data in files
- HTTP request bodies
- Terminal input
- Any segmentable continuous data
In Node.js, the stream module provides the core API for stream processing. Almost all I/O operations are built on streams: from the file system to HTTP requests and responses, streams are everywhere.
const { Readable, Writable, Transform, Duplex } = require('stream');
Stream processing enables a "pipeline" operation of data, where producers and consumers can work in parallel. Data can be processed immediately as it is generated, greatly improving the system's responsiveness and real-time performance. This is particularly prominent in scenarios such as video transcoding and real-time data analysis.
Performance Comparison Example:
Processing Method | Memory Usage | Processing Delay | Applicable Scenarios |
---|---|---|---|
Traditional Method | ~1GB | High | Small Files |
Stream Processing | ~64KB (default chunk size) | Low | Large Files/Real-time Data |
The main types of streams in Node.js are (we will cover these in detail in the next chapter):
- Readable Streams
- Writable Streams
- Duplex Streams
- Transform Streams
Each type of stream is an instance of EventEmitter, meaning they communicate through events. Common stream events include:
- data - Triggered when there is data available to read
- end - Triggered when no more data is available to read
- error - Triggered when an error occurs
- finish - Triggered when all data has been flushed to the underlying system
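As a minimal sketch of these events in action (file names are placeholders), copying one file to another and logging each event:
const fs = require('fs');

const source = fs.createReadStream('input.txt');
const destination = fs.createWriteStream('output.txt');

source.on('data', (chunk) => console.log(`data: ${chunk.length} bytes`));
source.on('end', () => console.log('end: no more data to read'));
source.on('error', (err) => console.error('error:', err));
destination.on('finish', () => console.log('finish: all data flushed'));

source.pipe(destination);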
In the following chapters, we will delve into the various types of streams in JavaScript, their usage methods, and best practices, helping you master this powerful data processing tool.
Readable streams are the source of data, like a reservoir or water source. File reading, HTTP requests, and user input are typical scenarios for readable streams. To describe it professionally, a readable stream is a producer of data, representing a source of data. Its core characteristics are:
- Data can only be read from the stream, not written to it
- Supports two data consumption modes: flowing mode and paused mode
- Automatically handles the backpressure mechanism
- Data can be piped to writable streams
Typical Use Cases:
- Reading data from files
- Receiving HTTP request bodies
- Reading database query results
- Any scenario that requires sequential reading of data
Here are some common implementations of readable streams:
const fs = require("fs");
// 1. File Read Stream
const fileStream = fs.createReadStream("./data.txt");
// 2. HTTP Request Stream
const http = require("http");
http.createServer((req, res) => {
// req is a readable stream
req.on("data", (chunk) => {
console.log(`Received ${chunk.length} bytes of data`);
});
});
// 3. Custom Readable Stream
const { Readable } = require("stream");
const myReadable = new Readable({
read(size) {
// Custom data generation logic
this.push("some data");
this.push(null); // Indicates the end of data
},
});
Writable streams are the consumers of data, representing the destination of data. Their characteristics are:
- Data can only be written to the stream, not read from it
- Supports buffering to handle differences in write speeds
- Provides a drain event to handle backpressure
- Can receive data piped from readable streams
Typical Use Cases:
- Writing to files
- Sending HTTP responses
- Writing to databases
- Any scenario that requires sequential writing of data
Here are some common implementations of writable streams:
// 1. File Write Stream
const fileWriter = fs.createWriteStream("./output.txt");
// 2. HTTP Response Stream
http.createServer((req, res) => {
// res is a writable stream
res.write("Hello World");
res.end();
});
// 3. Custom Writable Stream
const { Writable } = require("stream");
const myWritable = new Writable({
write(chunk, encoding, callback) {
console.log("Writing data:", chunk.toString());
callback(); // Indicates the write is complete
},
});
Duplex streams are bidirectional streams that implement both the readable and writable interfaces. Their characteristics are:
- Can read and write
- The read and write ends are independent
- Commonly used in bidirectional communication scenarios
Here are some common implementations of duplex streams, such as TCP socket servers:
const net = require('net');
// Create a TCP server
const server = net.createServer((socket) => {
// socket is a duplex stream
socket.write('Welcome to the connection!\n');
socket.on('data', (data) => {
console.log('Received data from client:', data.toString());
socket.write('Server received your message\n');
});
});
server.listen(8080, () => {
console.log('Server started');
});
// Custom Duplex Stream
const { Duplex } = require('stream');
const myDuplex = new Duplex({
write(chunk, encoding, callback) {
console.log('Writing:', chunk.toString());
callback();
},
read(size) {
this.push('Data from duplex stream\n');
this.push(null);
}
});
Transform streams are a special type of duplex stream, specifically used for data transformation. Their characteristics are:
- Can read and write simultaneously
- Data written to the write end is transformed and appears at the read end
- Commonly used in data format conversion, encryption/decryption, etc.
There are many types of transform stream implementations, such as compression/decompression streams, encryption streams, etc.:
const { Transform } = require("stream");
const zlib = require("zlib");
// 1. Custom Transform Stream: Convert input to uppercase
class UpperCaseTransform extends Transform {
_transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
}
// Usage Example
process.stdin.pipe(new UpperCaseTransform()).pipe(process.stdout);
// 2. Gzip Compression Stream
fs.createReadStream("input.txt")
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream("input.txt.gz"));
// 3. Encryption Stream
const crypto = require("crypto");
// createCipher is deprecated; use createCipheriv with an explicit key and IV
const key = crypto.scryptSync("secret-key", "salt", 24); // 24-byte key for AES-192
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipheriv("aes-192-cbc", key, iv);
process.stdin.pipe(cipher).pipe(fs.createWriteStream("secret.enc"));
Summary of the Four Stream Types:
Stream Type | Readable | Writable | Characteristics | Typical Applications |
---|---|---|---|---|
Readable | ✓ | ✗ | Data source | File reading, HTTP requests |
Writable | ✗ | ✓ | Data destination | File writing, HTTP responses |
Duplex | ✓ | ✓ | Bidirectional | TCP sockets, bidirectional communication |
Transform | ✓ | ✓ | Data transformation | Compression/decompression, encryption/decryption |
Understanding these four basic stream types is the foundation of mastering Node.js stream programming. In practical applications, we often combine them to build complex data processing pipelines.
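As a small illustration of such a combination (a hedged sketch with arbitrary file names), a Readable file stream piped through a Transform (gzip) into a Writable file stream:
const fs = require('fs');
const zlib = require('zlib');

// Readable (file) -> Transform (gzip) -> Writable (file)
fs.createReadStream('app.log')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('app.log.gz'))
  .on('finish', () => console.log('compressed copy written'));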
Stream Events: data, end, error, etc.
Node.js streams are based on an event-driven model using EventEmitter. Key events include:
- Readable Stream Events:
Readable streams in Node.js have two core states: flowing mode and paused mode.
Flowing Mode: Data is automatically read from the underlying system and pushed to consumers through events (such as 'data'), without manual intervention. Suitable for high-speed continuous data processing. If no listener is attached and the stream is not piped to a destination, data may be lost.
Paused Mode: The default state, where data must be explicitly fetched by calling stream.read() or by listening to the 'readable' event. Suitable for fine-grained control of the data flow. Switching to flowing mode happens by listening to the 'data' event, calling resume(), or using the pipe() method. (A paused-mode sketch follows this event list.)
- data - Triggered when the stream passes a data chunk to the consumer. Once a 'data' event listener is attached, the readable stream automatically switches to flowing mode.
readable.on("data", (chunk) => { console.log(`Received ${chunk.length} bytes of data`); });
- end - Triggered when there is no more data to consume from the stream
- error - Triggered when an underlying system operation fails or an error occurs in the stream implementation
- close - Triggered when the stream or its underlying resource (such as a file descriptor) is closed
- Writable Stream Events:
- drain - Triggered when the writable stream can receive more data (key event for backpressure)
- finish - Triggered when end() is called and all data has been flushed to the underlying system
- pipe / unpipe - Triggered when a readable stream is piped to or unpiped from the writable stream
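Here is the paused-mode sketch mentioned above (assuming an input.txt file exists): data is pulled explicitly with read() inside the 'readable' event instead of arriving through 'data' events:
const fs = require('fs');

const readable = fs.createReadStream('input.txt');

readable.on('readable', () => {
  let chunk;
  // read() returns null once the internal buffer is drained
  while ((chunk = readable.read()) !== null) {
    console.log(`Pulled ${chunk.length} bytes`);
  }
});

readable.on('end', () => console.log('No more data'));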
Event-Driven Programming Model
The event mechanism of streams enables an efficient producer-consumer pattern:
// Typical event-driven stream processing
const readable = getReadableStreamSomehow();
const writable = getWritableStreamSomehow(); // placeholder: any writable destination
readable.on("data", (chunk) => {
console.log("Processing data chunk:", chunk);
// Simulate asynchronous processing
process.nextTick(() => {
if (!writable.write(chunk)) {
// Handle backpressure
readable.pause();
writable.once("drain", () => readable.resume());
}
});
});
readable.on("end", () => {
console.log("Data reading completed");
writable.end();
});
readable.on("error", (err) => {
console.error("Reading error:", err);
});
pipe()
pipe() is one of the most powerful features of streams, establishing a direct channel from a readable stream to a writable stream:
// Basic usage
readable.pipe(writable);
// Chained piping
sourceStream
.pipe(transformStream1)
.pipe(transformStream2)
.pipe(destinationStream);
// Modern alternative (recommended)
const { pipeline } = require("stream/promises");
await pipeline(
sourceStream,
transformStream1,
transformStream2,
destinationStream
);
pipe() vs pipeline():
Feature | pipe() | pipeline() |
---|---|---|
Automatic error propagation | ✗ | ✓ |
Promise support | ✗ | ✓ |
Backpressure handling | ✓ | ✓ |
Stream cleanup | Manual | Automatic |
read()/write()
- read(size) - Explicitly reads data from a readable stream
const data = readable.read(1024); // Try to read 1024 bytes
if (data !== null) {
  processData(data);
}
- write(chunk[, encoding][, callback]) - Writes data to a writable stream
write() returns a boolean indicating whether more data can be written right away: it returns false when the internal buffer has reached or exceeded the highWaterMark, and true when there is still room.
Returning false signals backpressure; pause writing or wait for the 'drain' event before continuing. Returning true means the buffer still has space and more data can be safely written.
const canWriteMore = writable.write(chunk, "utf8", () => {
  console.log("Write completed");
});
if (!canWriteMore) {
  // Handle backpressure
}
pause()/resume()
Control the data flow of a readable stream:
// Pause data flow
readable.pause();
// Resume data flow
readable.resume();
// Practical application: Combined with backpressure management
writable.on("drain", () => {
console.log("drain event: Can continue writing");
readable.resume();
});
readable.on("data", (chunk) => {
if (!writable.write(chunk)) {
readable.pause();
}
});
What is Backpressure?
Backpressure is the pressure generated in a stream system when the data production speed exceeds the consumption speed. Just like water flowing too fast in a pipe can increase pressure, data flowing too fast can cause:
- Memory Accumulation: Unprocessed data accumulates in the buffer
- Resource Exhaustion: May exhaust memory or file descriptors
- Performance Degradation: Frequent GC triggers, slowing down the system
How to Handle Mismatched Data Production and Consumption Speeds?
- Automatic Backpressure Handling with pipe()
// pipe() internally handles backpressure
readable.pipe(writable);
- Manual Backpressure Control Mode
readable.on("data", (chunk) => {
  const canContinue = writable.write(chunk);
  if (!canContinue) {
    readable.pause(); // Pause reading
    writable.once("drain", () => {
      readable.resume(); // Resume reading
    });
  }
});
- High Water Mark Configuration
// Custom buffer threshold (default 16KB)
const readable = new Readable({
  highWaterMark: 64 * 1024, // 64KB
});
const writable = new Writable({
  highWaterMark: 128 * 1024, // 128KB
});
- Modern Asynchronous Iteration Approach
// Node.js 10+ supports asynchronous iterators
async function processStream() {
  for await (const chunk of readable) {
    const canContinue = writable.write(chunk);
    if (!canContinue) {
      await new Promise((resolve) => {
        writable.once("drain", resolve);
      });
    }
  }
}
Backpressure Handling Flowchart:
[Readable Stream] --(data too fast)--> [Buffer Full]
↓
[write() returns false]
↓
[Readable Stream Paused] --waiting--> [drain event]
↓
[Readable Stream Resumed]
Practical Example: Custom Backpressure-Aware Transform Stream
const { Transform } = require("stream");
class PressureAwareTransform extends Transform {
constructor(options) {
super({ ...options, highWaterMark: 32 * 1024 });
this.processItem = this.processItem.bind(this);
this.pending = 0;
this.concurrency = 4;
}
_transform(chunk, enc, cb) {
this.pending++;
this.processItem(chunk, cb);
// Backpressure control
if (this.pending >= this.concurrency) {
this.pause();
}
}
async processItem(chunk, cb) {
try {
const result = await this.asyncOperation(chunk);
this.push(result);
if (--this.pending < this.concurrency) {
this.resume();
}
cb();
} catch (err) {
this.emit("error", err);
}
}
async asyncOperation(data) {
// Simulate asynchronous operation
return new Promise((resolve) => {
setTimeout(() => resolve(data.toString().toUpperCase()), 100);
});
}
}
Understanding and correctly implementing the backpressure mechanism is key to building robust stream applications. Modern Node.js versions provide various automated handling solutions, but manual control of backpressure is still a necessary optimization in high-performance scenarios.
Inheriting from the Readable Class
Creating a custom readable stream requires inheriting from the stream.Readable class and implementing the _read() method. This is a common pattern in Node.js stream implementations:
const { Readable } = require("stream");
class MyReadable extends Readable {
constructor(options) {
// Call the parent class constructor
super(options);
// Initialize custom state
this.current = 0;
this.max = options.max || 100;
}
// Must implement the _read method
_read(size) {
// size parameter indicates the number of bytes requested by the consumer (for reference only)
if (this.current >= this.max) {
// Push null to indicate the end of the stream
this.push(null);
} else {
// Generate a data chunk
const chunk = `Data chunk ${this.current++}\n`;
// Push the data chunk into the read queue
this.push(chunk);
}
}
}
Implementing the _read Method
Key points:
- When called, it should synchronously or asynchronously push data
- Must call the push() method to provide data
- When data ends, push null
- The size parameter is just a suggestion and can be ignored
Usage Example:
// Create an instance of a custom readable stream
const myStream = new MyReadable({ max: 5 });
// Consume data
myStream.on("data", (chunk) => {
console.log("Received:", chunk.toString());
});
myStream.on("end", () => {
console.log("Stream ended");
});
Of course, you might also be curious about how the createReadStream method turns a file into a stream. Let's try to implement a simplified version ourselves.
import { Readable } from "stream";
import { open } from "fs/promises";
class FileReadStream extends Readable {
private fd: number | null = null;
private position: number = 0;
private filePath: string;
constructor(filePath: string) {
super();
this.filePath = filePath;
}
async _construct(callback: (error?: Error | null) => void) {
try {
const fileHandle = await open(this.filePath);
this.fd = fileHandle.fd;
callback();
} catch (error) {
callback(error as Error);
}
}
_read(size: number) {
const buffer = Buffer.alloc(size);
if (this.fd === null) {
this.push(null);
return;
}
// Read the next chunk using the callback-based fs.read
read(this.fd, buffer, 0, size, this.position, (err: NodeJS.ErrnoException | null, bytesRead: number) => {
if (err) {
this.destroy(err);
return;
}
// If no data is read, it means the end of the file has been reached
if (bytesRead === 0) {
this.push(null);
return;
}
// Update the position pointer
this.position += bytesRead;
// Push data to the stream
this.push(bytesRead < size ? buffer.slice(0, bytesRead) : buffer);
});
}
}
// Usage Example
const customStream = new FileReadStream('./weather_station.csv');
customStream.on('data', (chunk) => {
console.log(chunk.toString());
});
customStream.on('end', () => {
console.log('Reading completed');
});
customStream.on('error', (err) => {
console.error('An error occurred:', err);
});
Inheriting from the Writable Class
Custom writable streams need to inherit from stream.Writable and implement the _write() method:
const { Writable } = require("stream");
class MyWritable extends Writable {
constructor(options) {
super(options);
// Initialize custom state
this.data = [];
}
// Must implement the _write method
_write(chunk, encoding, callback) {
// chunk: The data buffer to be written
// encoding: If chunk is a string, specify the character encoding
console.log("Writing data:", chunk.toString());
// Store data
this.data.push(chunk.toString());
// Simulate an asynchronous operation
setTimeout(() => {
// Call callback upon completion (must be called)
callback();
}, 100);
}
// Optional _final method (triggered when end() is called)
_final(callback) {
console.log("All data has been written");
console.log("Complete data:", this.data.join(""));
callback();
}
}
Implementing the _write Method
Key points:
- _write() must eventually call callback
- callback can be called synchronously or asynchronously
- If the write fails, pass an Error to callback (see the sketch after the usage example below)
- encoding indicates the string encoding (when chunk is a Buffer, it is 'buffer')
Usage Example:
const writer = new MyWritable();
writer.write('First data chunk\n');
writer.write('Second data chunk\n');
writer.end('Final data'); // Triggers _final
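To illustrate the error-handling key point above, here is a hedged sketch with a made-up validation rule: the write passes an Error to the callback, and the consumer listens for the resulting 'error' event:
const { Writable } = require("stream");

class ValidatingWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().includes("bad")) {
      // Passing an Error makes the stream emit 'error' and stop accepting writes
      callback(new Error("Invalid data rejected"));
      return;
    }
    console.log("Stored:", chunk.toString());
    callback();
  }
}

const validator = new ValidatingWritable();
validator.on("error", (err) => console.error("Write failed:", err.message));
validator.write("good line\n");
validator.write("bad line\n"); // triggers the error path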
Inheriting from the Transform Class
Transform streams are the most flexible type of custom stream, inheriting from stream.Transform and implementing the _transform() method:
const { Transform } = require("stream");
class MyTransform extends Transform {
constructor(options) {
super(options);
// Initialize transformation state
this.lineCount = 0;
}
// Must implement the _transform method
_transform(chunk, encoding, callback) {
// Transformation logic
const data = chunk.toString();
this.lineCount += data.split("\n").length - 1;
// Push transformed data to the readable end
this.push(data.toUpperCase());
// Complete processing
callback();
}
// Optional _flush method (triggered before the stream ends)
_flush(callback) {
// Output statistics before the stream ends
this.push(`\nTotal lines: ${this.lineCount}\n`);
callback();
}
}
Implementing the _transform Method
Key points:
- Must call push() zero or more times to output transformation results
- Must call callback to indicate processing is complete
- Can use _flush() to perform cleanup operations before the stream ends
Usage Example:
const transform = new MyTransform();
// Use piping to connect
process.stdin.pipe(transform).pipe(process.stdout);
Here is a complete custom transform stream implementation for converting CSV data into JSON objects:
const { Transform } = require("stream");
class CSVToJSON extends Transform {
constructor(options) {
super({ ...options, readableObjectMode: true });
this.headers = null;
this.buffer = "";
}
_transform(chunk, encoding, callback) {
this.buffer += chunk.toString();
// Process complete lines in the buffer
let lines = this.buffer.split("\n");
// Keep the last incomplete line
this.buffer = lines.pop() || "";
if (!this.headers) {
// The first line is the header
this.headers = lines.shift().split(",");
}
// Process data lines
for (const line of lines) {
if (line.trim()) {
try {
const values = this.parseCSVLine(line);
const obj = {};
this.headers.forEach((header, i) => {
obj[header.trim()] = values[i] ? values[i].trim() : "";
});
this.push(obj);
} catch (err) {
this.emit("error", err);
}
}
}
callback();
}
_flush(callback) {
// Process the remaining last line in the buffer
if (this.buffer && this.headers) {
try {
const values = this.parseCSVLine(this.buffer);
const obj = {};
this.headers.forEach((header, i) => {
obj[header.trim()] = values[i] ? values[i].trim() : "";
});
this.push(obj);
} catch (err) {
this.emit("error", err);
}
}
callback();
}
parseCSVLine(line) {
// Simple CSV parsing (actual projects should use more robust parsing logic)
return line.split(",").map((field) => {
// Remove quotes from the beginning and end of the field (if present)
return field.replace(/^"|"$/g, "");
});
}
}
Usage Example:
const fs = require("fs");
const { pipeline } = require("stream/promises");
async function processCSV(inputFile) {
const csvParser = new CSVToJSON();
csvParser.on("data", (obj) => {
console.log("Parsed object:", obj);
});
csvParser.on("error", (err) => {
console.error("CSV parsing error:", err);
});
await pipeline(fs.createReadStream(inputFile), csvParser);
console.log("CSV parsing completed");
}
// Assume there is a data.csv file
processCSV("data.csv").catch(console.error);
Key Points for Custom Stream Implementation:
- Mode Selection: objectMode processes JavaScript objects instead of Buffers/strings; binary mode processes raw binary data (see the sketch after this list)
- Error Handling: always pass errors to the callback, or use emit('error') to notify consumers
- Performance Considerations: avoid CPU-intensive operations in _transform; consider worker threads for complex transformations
- Stream Lifecycle: initialization → data processing → end cleanup; implement _flush properly for resource cleanup
By creating custom streams, you can integrate any data source or processing logic into Node.js's stream ecosystem, seamlessly working with other stream implementations.
Large File Read and Write Example
When dealing with large files, stream-based methods can significantly reduce memory usage. Here is an example of file copying, demonstrating how to efficiently handle large files:
const fs = require("fs");
const { pipeline } = require("stream/promises");
async function copyLargeFile(src, dest) {
console.time("File copy time");
await pipeline(fs.createReadStream(src), fs.createWriteStream(dest));
console.timeEnd("File copy time");
}
// Usage Example (assuming bigfile.iso is a 2GB large file)
copyLargeFile("./bigfile.iso", "./copy.iso");
Performance Comparison: Stream vs Traditional Method
We compare the two methods through a benchmark test:
const fs = require("fs");
const { performance } = require("perf_hooks");
// Test file: 500MB test file
const testFile = "./test.data";
// Traditional method
async function traditionalMethod() {
const start = performance.now();
const data = await fs.promises.readFile(testFile);
await fs.promises.writeFile("./traditional-copy.data", data);
return performance.now() - start;
}
// Stream-based method
async function streamMethod() {
const start = performance.now();
await pipeline(
fs.createReadStream(testFile),
fs.createWriteStream("./stream-copy.data")
);
return performance.now() - start;
}
// Run the test
(async () => {
console.log("Traditional method time:", await traditionalMethod(), "ms");
console.log("Stream method time:", await streamMethod(), "ms");
// Memory snapshot (for a true peak, sample process.memoryUsage().rss while each method runs)
console.log(
"Current RSS after both runs:",
process.memoryUsage().rss / 1024 / 1024,
"MB"
);
})();
Typical Test Results:
Method | Time for 500MB File | Memory Peak | Applicable Scenarios |
---|---|---|---|
Traditional Method | 1200ms | ~500MB | Small file processing |
Stream Method | 800ms | ~30MB | Large file processing |
Stream Processing of Request Bodies
When handling large HTTP request bodies, stream processing can prevent memory overflow:
const http = require("http");
const { pipeline } = require("stream/promises");
const fs = require("fs");
// File upload server
http
.createServer(async (req, res) => {
if (req.method === "POST" && req.url === "/upload") {
const fileWriter = fs.createWriteStream("./upload.data");
try {
await pipeline(req, fileWriter);
res.end("File uploaded successfully");
} catch (err) {
res.statusCode = 500;
res.end("Upload failed");
}
} else {
res.end("Please use POST method to upload files");
}
})
.listen(3000);
Stream-Generated Response Bodies
For dynamically generating large responses, stream processing can significantly improve TTFB (Time To First Byte):
// Stream a large JSON response
http
.createServer((req, res) => {
res.writeHead(200, {
"Content-Type": "application/json",
"Transfer-Encoding": "chunked",
});
// Start the JSON array
res.write("[\n");
// Simulate generating 10,000 records
let count = 0;
const max = 10000;
const interval = setInterval(() => {
if (count++ < max) {
res.write(
JSON.stringify({ id: count, data: "..." }) +
(count < max ? ",\n" : "\n")
);
} else {
clearInterval(interval);
res.end("]"); // End the JSON array
}
}, 10);
})
.listen(3001);
Combining Multiple Streams
The true power of streams lies in their ability to connect multiple processing steps into a pipeline:
const { pipeline } = require("stream/promises");
const zlib = require("zlib");
const crypto = require("crypto");
async function processFile(input, output) {
await pipeline(
fs.createReadStream(input),
// Step 1: Decompress
zlib.createGunzip(),
// Step 2: Decrypt
crypto.createDecipheriv("aes-256-cbc", key, iv), // key: 32-byte Buffer, iv: 16-byte Buffer matching the values used to encrypt
// Step 3: Convert to uppercase
new Transform({
transform(chunk, enc, cb) {
this.push(chunk.toString().toUpperCase());
cb();
},
}),
// Step 4: Compress
zlib.createGzip(),
fs.createWriteStream(output)
);
}
Practical Example: Log Processing Pipeline
Here is a complete log processing system example, demonstrating the application of streams in real projects:
const { Transform, pipeline } = require("stream");
const byline = require("byline"); // Library for line-by-line stream processing
// 1. Log Parsing Transform Stream
class LogParser extends Transform {
_transform(chunk, enc, cb) {
const logEntry = chunk.toString();
try {
const parsed = this.parseLog(logEntry);
this.push(JSON.stringify(parsed) + "\n");
} catch (err) {
// Log parsing errors to another stream
this.emit("parseError", { err, logEntry });
}
cb();
}
parseLog(entry) {
// Actual projects would have more complex parsing logic
const [timestamp, level, ...message] = entry.split(" ");
return { timestamp, level, message: message.join(" ") };
}
}
// 2. Log Filtering Transform Stream
class LogFilter extends Transform {
constructor(options) {
super(options);
this.level = options.level || "error";
}
_transform(chunk, enc, cb) {
const log = JSON.parse(chunk);
if (log.level === this.level) {
this.push(chunk);
}
cb();
}
}
// 3. Use a pipeline to process logs
async function processLogs(inputFile, outputFile) {
const startTime = Date.now();
// Create processing pipeline
const logStream = byline(fs.createReadStream(inputFile));
const parser = new LogParser();
const filter = new LogFilter({ level: "error" });
const output = fs.createWriteStream(outputFile);
// Error handling stream
const errorStream = new Writable({
objectMode: true, // receives { err, logEntry } objects from the parser
write(chunk, enc, cb) {
fs.appendFileSync("./parse-errors.log", chunk.logEntry + "\n");
cb();
},
});
// Listen for parsing errors
parser.on("parseError", (errObj) => {
errorStream.write(errObj);
});
// Build processing pipeline
await pipeline(logStream, parser, filter, output);
console.log(`Log processing completed in ${Date.now() - startTime}ms`);
}
// Run log processing
processLogs("./app.log", "./errors.json").catch(console.error);
Log Processing Pipeline Workflow:
- Read: Create a readable stream from the log file
- Split Lines: Use byline to split the stream by lines
- Parse: Parse each line into a structured JSON object
- Filter: Only keep logs at the error level
- Output: Write the results to a new file
- Error Handling: Handle parsing errors separately
This example demonstrates the powerful combination capabilities of streams, allowing you to build complex but efficient data processing systems. With stream processing, even when handling GB-level log files, memory usage can remain stable.
Centralized Error Handling
In stream-based applications, it is recommended to use a centralized error handling mechanism to manage errors uniformly:
const { pipeline } = require('stream/promises');
async function processWithPipeline() {
try {
await pipeline(
fs.createReadStream('input.txt'),
new TransformStream(), // Custom transform stream
fs.createWriteStream('output.txt')
);
} catch (err) {
// Centralized handling of all potential stream errors
console.error('Pipeline processing failed:', err);
// Perform cleanup operations
await cleanupResources();
// Retry or exit based on the error
if (shouldRetry(err)) {
return processWithPipeline();
}
process.exit(1);
}
}
// Traditional pipe() error handling comparison
readable
.pipe(transform)
.pipe(writable)
.on('error', (err) => {
// Manual cleanup of all streams is required
readable.destroy();
transform.destroy();
writable.destroy();
console.error('Processing failed:', err);
});
Error Propagation Mechanism
Understanding the error propagation behavior in Node.js streams is crucial:
- Automatic Propagation: pipeline() automatically propagates errors and cleans up all streams; pipe() does not propagate errors automatically, so manual handling is required.
- Custom Error Handling:
class SafeTransform extends Transform {
  _transform(chunk, enc, cb) {
    try {
      const result = this.processData(chunk);
      cb(null, result); // First argument is the error object
    } catch (err) {
      // Handle transformation error
      this.emit('error', new Error(`Data processing failed: ${err.message}`));
      cb(); // Still need to call the callback
    }
  }
}
- Error Categorization Handling:
// Handle errors based on their type
stream.on('error', (err) => {
  if (err.code === 'ENOENT') {
    console.error('File not found');
  } else if (err instanceof CustomParseError) {
    console.error('Parsing error:', err.details);
  } else {
    console.error('Unknown error:', err);
  }
});
High Water Mark Configuration
The high water mark is a key parameter for controlling stream memory usage:
// Readable stream configuration (default 16KB)
const readable = new Readable({
highWaterMark: 64 * 1024 // 64KB
});
// Writable stream configuration (default 16KB)
const writable = new Writable({
highWaterMark: 128 * 1024 // 128KB
});
// Object mode configuration (default 16 objects)
const objectStream = new Transform({
objectMode: true,
highWaterMark: 100 // 100 objects
});
Configuration Recommendations:
- Adjust based on data chunk size: use higher values for larger chunks.
- Balance memory and throughput: higher values increase memory usage but improve performance.
- Monitor actual usage:
setInterval(() => {
console.log('Memory usage:', process.memoryUsage().rss);
console.log('Readable stream buffer:', readable.readableLength);
console.log('Writable stream buffer:', writable.writableLength);
}, 1000);
Buffer Management
Optimizing buffer usage can significantly enhance performance:
- Avoid Unnecessary Buffering:
// Bad practice: Accumulating data in the transform stream
class BadTransform extends Transform {
  constructor() {
    super();
    this.data = [];
  }
  _transform(chunk, enc, cb) {
    this.data.push(chunk); // Memory usage keeps growing
    cb();
  }
}
// Good practice: Process data immediately
class GoodTransform extends Transform {
  _transform(chunk, enc, cb) {
    this.push(process(chunk)); // Process and push immediately
    cb();
  }
}
- Reuse Buffers:
const pool = require('buffer-pool'); // Third-party buffer pool
class PooledTransform extends Transform {
  _transform(chunk, enc, cb) {
    // Allocate a buffer from the pool
    const buffer = pool.alloc(1024);
    // Process data into the buffer...
    processDataInto(chunk, buffer);
    this.push(buffer);
    pool.free(buffer); // Return the buffer to the pool (only safe once downstream has consumed it)
    cb();
  }
}
Best Practices for Piping
- Prefer pipeline over pipe for error handling:
const { pipeline } = require('stream/promises');
// Best practice: Automatic error propagation and cleanup
async function processData() {
  await pipeline(
    fs.createReadStream('input'),
    zlib.createGzip(),
    encryptStream,
    fs.createWriteStream('output.gz.enc')
  );
}
- Readability in piping:
// Bad practice: Long pipeline is hard to read
input.pipe(a).pipe(b).pipe(c).pipe(d).pipe(output);
// Good practice: Organize in lines
input
  .pipe(transformA)
  .pipe(transformB)
  .pipe(transformC)
  .pipe(output);
- Adding middleware for processing:
function createProcessingPipeline() {
  const stream = require('stream');
  const passThrough = new stream.PassThrough();
  // Add monitoring middleware
  passThrough.on('data', (chunk) => {
    monitor.recordChunk(chunk.length);
  });
  return passThrough;
}
input
  .pipe(createProcessingPipeline())
  .pipe(output);
Reusable Stream Design Patterns
- Factory Pattern for Streams:
function createCSVParser(options = {}) {
  const parser = new Transform({
    objectMode: true,
    ...options
  });
  // Initialization logic...
  return parser;
}
// Usage example
const csvParser = createCSVParser({ delimiter: '|' });
- Decorator Pattern to Enhance Streams:
function withLogging(stream) {
  const duplex = new Duplex({
    write(chunk, enc, cb) {
      console.log('Writing:', chunk.length, 'bytes');
      stream.write(chunk, enc, cb);
    },
    read(size) {
      const chunk = stream.read(size);
      if (chunk) console.log('Reading:', chunk.length, 'bytes');
      this.push(chunk);
    }
  });
  // Pass through original stream events
  stream.on('error', err => duplex.emit('error', err));
  return duplex;
}
// Usage example
const loggedStream = withLogging(fs.createReadStream('data'));
- Composite Pattern for Complex Streams:
class CompositeTransform extends Transform {
  constructor() {
    super();
    this.step1 = new Transform1();
    this.step2 = new Transform2();
    this.step3 = new Transform3();
    // Internal piping
    this.step1.pipe(this.step2).pipe(this.step3);
    // Proxy data flow
    this.step3.on('data', chunk => this.push(chunk));
  }
  _transform(chunk, enc, cb) {
    this.step1.write(chunk, enc, cb);
  }
  _flush(cb) {
    this.step1.end(() => {
      this.step3.once('end', cb);
    });
  }
}
Performance Optimization Checklist:
- Use pipeline instead of pipe for error handling.
- Adjust high water mark based on data characteristics.
- Avoid accumulating large amounts of data in streams.
- Consider using buffer pools for buffer reuse.
- Design reusable stream components.
- Implement composite streams for complex processing.
- Add monitoring middleware to track stream performance.
By applying these best practices, you can build efficient and robust stream-based systems capable of handling various data processing needs.
Browser Support for Streams
The Web Streams API is a modern, standardized stream processing API supported in contemporary browsers. It offers a similar abstraction to Node.js streams but with some differences:
// Creating a readable stream
const readableStream = new ReadableStream({
start(controller) {
// Stream initialization logic
controller.enqueue('Data chunk 1');
controller.enqueue('Data chunk 2');
controller.close();
},
pull(controller) {
// Called when the consumer requests more data
},
cancel(reason) {
// Cleanup resources when the stream is canceled
}
});
// Creating a writable stream
const writableStream = new WritableStream({
write(chunk) {
console.log('Writing data:', chunk);
},
close() {
console.log('Stream closed');
},
abort(err) {
console.error('Stream error:', err);
}
});
// Connecting streams
readableStream.pipeTo(writableStream);
Browser Support:
- Modern browsers like Chrome, Firefox, Edge, and Safari have implemented the Web Streams API.
- Suitable for use in Service Workers, Fetch API, and other web platform features.
Differences from Node.js Streams
Similarities:
- Both are based on the same data stream processing concepts.
- Both support backpressure mechanisms.
- Both can be piped together.
Key Differences:
Feature | Node.js Streams | Web Streams API |
---|---|---|
Creation Method | Inherit base classes | Use constructors |
Error Handling | EventEmitter | Promise-based |
Data Chunk Types | Buffers/Strings | Uint8Array/Strings |
Piping Methods | pipe() | pipeTo() , pipeThrough() |
Backpressure Signals | pause() , resume() | Built-in Promise mechanism |
Transform Streams | Transform | TransformStream |
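To illustrate the pipeThrough() and TransformStream entries in the table, here is a minimal browser-side sketch that upper-cases text on its way from a source to a sink:
// A Web TransformStream that upper-cases text chunks
const upperCase = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

const source = new ReadableStream({
  start(controller) {
    controller.enqueue('hello ');
    controller.enqueue('streams');
    controller.close();
  },
});

const sink = new WritableStream({
  write(chunk) {
    console.log('received:', chunk);
  },
});

// pipeThrough() returns the readable side of the transform; pipeTo() returns a Promise
source.pipeThrough(upperCase).pipeTo(sink).then(() => console.log('done'));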
In Node.js, a Readable stream can be converted to a Web ReadableStream with Readable.toWeb(), and a Web stream can be converted back with Readable.fromWeb() (or Duplex.fromWeb() for duplex streams).
// Node.js Stream to Web Stream
import { Readable, Duplex } from 'stream';
const nodeReadable = Readable.from(['hello', 'world']);
const webReadableStream = Readable.toWeb(nodeReadable);
// Web Stream to Node.js Stream (webDuplexStream is a { readable, writable } pair)
const nodeDuplex = Duplex.fromWeb(webDuplexStream);
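Going the other way, recent Node.js versions also provide Readable.fromWeb(). A hedged sketch (assuming a Node.js version with a global fetch, 18+, and placeholder URL and path) that turns a fetch() response body into a Node.js stream and writes it to disk:
import { Readable } from 'stream';
import { createWriteStream } from 'fs';
import { pipeline } from 'stream/promises';

async function saveResponse(url, file) {
  const response = await fetch(url);
  // response.body is a Web ReadableStream; convert it into a Node.js Readable
  const nodeReadable = Readable.fromWeb(response.body);
  await pipeline(nodeReadable, createWriteStream(file));
}

saveResponse('https://example.com/data.json', './data.json').catch(console.error);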
Stream-Based Processing of Response Bodies
The Fetch API's response body is inherently a readable stream, enabling stream-based processing of large responses:
// Stream-based processing of a JSON response
async function processLargeJSON(url) {
const response = await fetch(url);
const reader = response.body.getReader();
const decoder = new TextDecoder();
let result = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
// Process each chunk
const chunk = decoder.decode(value, { stream: true });
result += chunk;
// Perform incremental processing
const partialData = tryParseJSON(result);
if (partialData) updateUI(partialData);
}
return JSON.parse(result);
}
// Stream-based file download
async function downloadFile(url, outputStream) {
const response = await fetch(url);
const reader = response.body.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
outputStream.write(value);
}
outputStream.end();
}
Progressive Rendering Applications
Using stream-based responses allows for progressive content rendering:
// Server-side code (Node.js)
app.get('/streaming-page', (req, res) => {
// Immediately send HTML headers
res.write(`
<!DOCTYPE html>
<html>
<head><title>Progressive Rendering</title></head>
<body>
`);
// Simulate asynchronous content loading
const items = ['Item 1', 'Item 2', 'Item 3'];
let index = 0;
const timer = setInterval(() => {
if (index < items.length) {
res.write(`<div>${items[index++]}</div>`);
} else {
clearInterval(timer);
res.end('</body></html>');
}
}, 500);
});
// Client-side code
async function renderStreamingResponse() {
const response = await fetch('/streaming-page');
const reader = response.body.getReader();
const decoder = new TextDecoder();
const contentEl = document.getElementById('content');
while (true) {
const { done, value } = await reader.read();
if (done) break;
const htmlChunk = decoder.decode(value);
contentEl.innerHTML += htmlChunk;
// Scroll to the latest content
window.scrollTo(0, document.body.scrollHeight);
}
}
Service Workers, combined with the Stream API, enable powerful offline caching and response handling:
// Cache-first stream-based response strategy
self.addEventListener('fetch', (event) => {
const url = new URL(event.request.url);
if (url.pathname.endsWith('.mp4')) {
event.respondWith(
caches.match(event.request).then((cachedResponse) => {
// Return cached response if available
if (cachedResponse) {
return cachedResponse;
}
// Fetch, cache a clone, and return the response stream
return fetch(event.request).then((response) => {
// Asynchronously cache a clone of the full response (cache name is arbitrary)
const responseClone = response.clone();
caches.open('media-cache').then((cache) => cache.put(event.request, responseClone));
// Immediately return the network response stream to the page
return response;
});
})
);
}
});
// Stream multiple responses into one
self.addEventListener('fetch', (event) => {
if (event.request.url.endsWith('combined.json')) {
event.respondWith(
(async () => {
const [part1, part2] = await Promise.all([
fetch('/api/part1'),
fetch('/api/part2')
]);
const { readable, writable } = new TransformStream();
const writer = writable.getWriter();
// Write JSON start
writer.write(new TextEncoder().encode('{"combined":['));
// Stream-merge the two responses in order (pipeTo is asynchronous, so await each part)
(async () => {
await part1.body.pipeTo(new WritableStream({
write(chunk) { return writer.write(chunk); }
}));
await writer.write(new TextEncoder().encode(','));
await part2.body.pipeTo(new WritableStream({
write(chunk) { return writer.write(chunk); }
}));
await writer.write(new TextEncoder().encode(']}'));
await writer.close();
})();
return new Response(readable, {
headers: { 'Content-Type': 'application/json' }
});
})()
);
}
});
Advantages of Stream Applications in Service Workers:
- Faster Content Presentation: Content can be displayed as it is downloaded.
- Memory Efficiency: No need to cache the entire response.
- Bandwidth Savings: Interrupt unnecessary downloads.
- Offline Experience: Combined with Cache API for seamless offline functionality.
Modern Stream-Based Application Architecture Example:
[Client] ← Stream Response → [Service Worker]
↓ ↑
[Edge Cache] ← Stream Processing → [Origin Server]
Through the Web Streams API, modern JavaScript applications can achieve consistent stream processing across both browser and Node.js environments, providing a standardized solution for handling large data and real-time applications.