Reading text and binary data with Node.js readable streams
This tutorial will explain the use and creation of Node.js readable streams.
Version Information
- Author: Jeff Barczewski
- Published: August 4th, 2013
- Tags: nodejs, streams
- Level: Intermediate
- Prerequisites: familiarity with crypto, events, and installing npm modules
- Node.js v0.10+ (latest stable is v0.10.15 as of this writing), but streams have generally been a part of Node.js from its early days
- Streams2 can be used with older versions of Node.js by using the npm module readable-stream
Consuming or using readable streams
Simple example of reading a file and echoing it to stdout:
var fs = require('fs');
var readStream = fs.createReadStream('myfile.txt');
readStream.pipe(process.stdout);
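The same pipe pattern works with any writable destination. As a quick sketch (the file names here are just examples), you could gzip a file by piping it through the core zlib module:

var fs = require('fs');
var zlib = require('zlib');

// read myfile.txt, gzip it, and write the result to a new file
fs.createReadStream('myfile.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('myfile.txt.gz'));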
Creating a sha1 digest of a file and echoing the result to stdout (similar to shasum):
var crypto = require('crypto');
var fs = require('fs');
var readStream = fs.createReadStream('myfile.txt');
var hash = crypto.createHash('sha1');
readStream
  .on('data', function (chunk) {
    hash.update(chunk);
  })
  .on('end', function () {
    console.log(hash.digest('hex'));
  });
The data event is fired on the readable stream for each chunk of data, so you update the digest with each chunk as you go; then the end event is fired when the stream has ended, so you can output the final result. Note that each time you call .on() to register a listener, it returns the original stream so you can chain methods easily.
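If you are reading text rather than binary data, you can also call setEncoding() on the stream so that data chunks arrive as strings instead of Buffers. A small sketch (the file name is just an example):

var fs = require('fs');

var readStream = fs.createReadStream('myfile.txt');
readStream.setEncoding('utf8'); // chunks arrive as utf8 strings, not Buffers

readStream
  .on('data', function (chunk) {
    console.log('read %d characters', chunk.length);
  })
  .on('end', function () {
    console.log('finished reading');
  });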
With Node.js 0.10+ there is a better way to consume streams. The Readable interface makes it easier to work with streams, especially streams where you want to do other things between creating a stream and using it. These newer Readable streams are pull streams, where you request the data when you are ready for it rather than having the data pushed to you.
var crypto = require('crypto');
var fs = require('fs');
var readStream = fs.createReadStream('myfile.txt');
var hash = crypto.createHash('sha1');
readStream
  .on('readable', function () {
    var chunk;
    while (null !== (chunk = readStream.read())) {
      hash.update(chunk);
    }
  })
  .on('end', function () {
    console.log(hash.digest('hex'));
  });
The key to understanding this example is that with the new streams2 Readable interface, a readable event is emitted as soon as data is available to be read, and you can call .read() to read chunks of it. Once there is no more data available, .read() returns null, but another readable event is fired when more data becomes available. This continues until the end of the file, when end is fired as before.
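You can also pass a size argument to .read() if you want chunks of a particular length. Roughly speaking, .read(size) returns null until that many bytes are buffered, and once the stream ends, whatever remains is returned. A small sketch (the 64-byte size and file name are arbitrary):

var fs = require('fs');

var readStream = fs.createReadStream('myfile.txt');
readStream
  .on('readable', function () {
    var chunk;
    // ask for 64 bytes at a time
    while (null !== (chunk = readStream.read(64))) {
      console.log('got %d bytes', chunk.length);
    }
  })
  .on('end', function () {
    console.log('done');
  });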
Producing a readable stream
To use streams with the file system or from http, you can use the core fs and http methods to construct a stream, but how would you create your own stream and fill it with data? This might be data from a database or from any number of sources.
Here is an example of creating a readable stream which is generated from random binary data, then hashing it like before. This would be useful in creating streams for testing:
var crypto = require('crypto');
var stream = require('stream');
var util = require('util');
var Readable = stream.Readable;
function RandomStream(length, options) {
  // allow calling with or without new
  if (!(this instanceof RandomStream)) {
    return new RandomStream(length, options);
  }
  // init Readable
  Readable.call(this, options);
  // save the length to generate
  this.lenToGenerate = length;
}
util.inherits(RandomStream, Readable);
RandomStream.prototype._read = function (size) {
  if (!size) size = 1024; // default size
  var ready = true;
  while (ready) { // only continue while push() returns true
    if (size > this.lenToGenerate) { // only this much left
      size = this.lenToGenerate;
    }
    if (size) {
      ready = this.push(crypto.randomBytes(size));
      this.lenToGenerate -= size;
    }
    // when done, push null and exit loop
    if (!this.lenToGenerate) {
      this.push(null);
      ready = false;
    }
  }
};
// now use our RandomStream and compute digest of it
var readStream = new RandomStream(204800);
var hash = crypto.createHash('sha1');
readStream
  .on('readable', function () {
    var chunk;
    while (null !== (chunk = readStream.read())) {
      console.log('chunk: ', chunk);
      hash.update(chunk);
    }
  })
  .on('end', function () {
    console.log('digest: ', hash.digest('hex'));
  });
Note: after _read() is called, we should continue reading until we are done or until push() returns false.
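An equally valid pattern, if it suits your data source better, is to push a single chunk per _read() call and let the stream call _read() again whenever it wants more data. Here is a minimal sketch of the same random-data stream written that way (SingleChunkRandomStream is just a hypothetical name):

var crypto = require('crypto');
var stream = require('stream');
var util = require('util');
var Readable = stream.Readable;

function SingleChunkRandomStream(length, options) {
  if (!(this instanceof SingleChunkRandomStream)) {
    return new SingleChunkRandomStream(length, options);
  }
  Readable.call(this, options);
  this.lenToGenerate = length;
}
util.inherits(SingleChunkRandomStream, Readable);

SingleChunkRandomStream.prototype._read = function (size) {
  // clamp the requested size to what is left to generate
  if (!size || size > this.lenToGenerate) size = this.lenToGenerate;
  if (size) {
    this.push(crypto.randomBytes(size)); // one chunk per _read() call
    this.lenToGenerate -= size;
  } else {
    this.push(null); // nothing left, end the stream
  }
};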
Using Streams2 with older Node.js versions
If you want to make this code work with Node.js older than 0.10, you can include a dependency on readable-stream in your package.json and change the line near the top that assigns Readable to read:
var Readable = stream.Readable ||
require('readable-stream').Readable;
This will use the native Readable stream if the Node.js version is 0.10+; if not, it will load the readable-stream polyfill module and use it from there.
Pause / resume of stream and Streams2
Since streams can sometimes provide data more quickly than an application can consume it, streams include the ability to pause, and the data is buffered until the stream is resumed. Prior to streams2, you would need to pay careful attention to the pause and resume methods as well as buffering the data until resumed. However, Readable from streams2 (Node.js 0.10+ or via the readable-stream package) implements that functionality for you, and streams are automatically paused until .read() is called.
You can also wrap old streams with a Readable to implement the new interface on the old stream:
var readStream = new Readable().wrap(oldStream);
Another situation where you need to worry about pause and resume is when your consuming code uses the old push-style interface by calling .on('data', listener). This puts the stream into backwards-compatibility mode, and you will need to call .pause() and .resume() to control the rate of data coming into your application. See the Stream API docs for details if you are using the older interface in your code.
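For example, here is a rough sketch (the file name and the one-second delay are arbitrary) of throttling an old-style data listener with .pause() and .resume():

var fs = require('fs');

var readStream = fs.createReadStream('myfile.txt');
readStream.on('data', function (chunk) {
  console.log('got %d bytes, pausing for a second', chunk.length);
  readStream.pause(); // stop the flow of data events while we work
  setTimeout(function () {
    readStream.resume(); // ask for more data when we are ready
  }, 1000);
});
readStream.on('end', function () {
  console.log('done');
});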
Object Streams
Initially, when streams were introduced, the official API indicated that the data chunks being streamed would be Buffers or strings; however, many users found that it would be great to be able to stream objects as well. Streams2 in Node.js 0.10+ added an object mode to streams to formalize how this should work. When in object mode, .read(n) simply returns the next object (ignoring n).
To switch a stream into object mode, set the objectMode property to true in the options used to create your Readable stream:
var Readable = require('stream').Readable;
var util = require('util');
function CountingObjectStream(length, options) {
  if (!(this instanceof CountingObjectStream)) {
    return new CountingObjectStream(length, options);
  }
  if (!options) options = {}; // ensure object
  options.objectMode = true; // forcing object mode
  Readable.call(this, options);
  this.lenToCount = length; // how far to count
  this.index = 0; // to track our count
}
util.inherits(CountingObjectStream, Readable);
CountingObjectStream.prototype._read = function () {
  this.index += 1;
  if (this.index > this.lenToCount) {
    return this.push(null); // done, return
  }
  // pushing a number, but could be any non-null obj
  this.push(this.index);
};
// consume this stream and output to stdout
// coercing it to a string
var readStream = new CountingObjectStream(10);
readStream
  .on('readable', function () {
    var obj;
    while (null !== (obj = readStream.read())) {
      console.log(obj);
    }
  });
So you can use objects in streams just as easily as Buffers and strings; the only limitation is that the objects you pass cannot be null, since that indicates the stream has ended.
Node.js readable streams are flexible and simple
Node.js readable streams are easy to consume and even simple to construct. You can not only stream binary and string data but also objects and still take advantage of stream functionality.
I hope you enjoyed this quick tour of readable streams. Let me know if you have any questions.