This tutorial explains how to use and create Node.js readable streams.

Version Information

  • Author: Jeff Barczewski
  • Published: August 3rd, 2013
  • Tags: nodejs, streams
  • Level: Intermediate
  • Prerequisites: crypto, events, install npm modules
  • Node.js v0.10+ (latest stable is v0.10.15 as of this writing), but streams have generally been a part of Node.js from its early days
  • Streams2 can be used with older versions of Node.js via the npm module readable-stream

Consuming or using readable streams

Simple example of reading a file and echoing it to stdout:

var fs = require('fs');
var readStream = fs.createReadStream('myfile.txt');
readStream.pipe(process.stdout);

Creating a sha1 digest of a file and echoing the result to stdout (similar to shasum):

var crypto = require('crypto');
var fs = require('fs');

var readStream = fs.createReadStream('myfile.txt');
var hash = crypto.createHash('sha1');
readStream
  .on('data', function (chunk) {
    hash.update(chunk);
  })
  .on('end', function () {
    console.log(hash.digest('hex'));
  });

The data event is fired on the readable stream for each chunk of data, so you update the digest with each chunk as you go. The end event is fired when the stream has ended, at which point you can output the final result.

Note that each time you call .on() to register a listener it returns the original stream so you can chain methods easily.
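To see why the chaining works, here is a minimal sketch using a plain EventEmitter, which is where streams get .on() from. The emitter variable and the 'ping' event name are made up for illustration only.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical emitter, standing in for a readable stream
var emitter = new EventEmitter();
var calls = [];

// .on() returns the emitter it was called on, so calls can be chained
var returned = emitter
  .on('ping', function () { calls.push('first listener'); })
  .on('ping', function () { calls.push('second listener'); });

emitter.emit('ping');

console.log(returned === emitter); // the same object comes back from .on()
console.log(calls);                // listeners run in registration order
```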

With Node.js 0.10+ there is a better way to consume streams. The Readable interface makes it easier to work with streams, especially when you want to do other things between creating a stream and using it. These newer Readable streams are pull streams, where you request the data when you are ready for it rather than having the data pushed to you.

var crypto = require('crypto');
var fs = require('fs');
var readStream = fs.createReadStream('myfile.txt');
var hash = crypto.createHash('sha1');
readStream
  .on('readable', function () {
    var chunk;
    while (null !== (chunk = readStream.read())) {
      hash.update(chunk);
    }
  })
  .on('end', function () {
    console.log(hash.digest('hex'));
  });

The key to understanding this example is that with the new streams2 Readable interface, a readable event is emitted as soon as data is available to be read, and you can then call .read() to read chunks of it. Once no more data is available, .read() returns null, and another readable event is fired when more data arrives. This continues until the end of the file, when end is fired as before.
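The same readable / read() loop can be packaged into a small reusable helper. This is only a sketch: collectStream and the demoStream source are hypothetical names invented here, and the helper assumes the stream delivers Buffers (no encoding set, not object mode).

```javascript
var Readable = require('stream').Readable;

// Hypothetical helper: gathers every chunk using the 'readable' + read() loop
function collectStream(readStream, callback) {
  var chunks = [];
  readStream
    .on('readable', function () {
      var chunk;
      // drain whatever is buffered; read() returns null once it is empty
      while (null !== (chunk = readStream.read())) {
        chunks.push(chunk);
      }
    })
    .on('error', callback)
    .on('end', function () {
      callback(null, Buffer.concat(chunks));
    });
}

// Made-up source for the demo: two string chunks, then end of stream
var parts = ['hello ', 'world'];
var demoStream = new Readable();
demoStream._read = function () {
  this.push(parts.length ? parts.shift() : null);
};

collectStream(demoStream, function (err, data) {
  if (err) throw err;
  console.log(data.toString());
});
```

Because nothing is read until the helper attaches its listeners, you can create the stream early and hand it to the helper whenever you are ready.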

Producing a readable stream

To use streams with the file system or from http, you can use the core fs and http methods to construct a stream, but how would you create your own stream and fill it with data? This might be data from a database or from any number of sources.

Here is an example of creating a readable stream which is generated from random binary data, then hashing it like before. This would be useful in creating streams for testing:

var crypto = require('crypto');
var stream = require('stream');
var util = require('util');

var Readable = stream.Readable;

function RandomStream(length, options) {
  // allow calling with or without new
  if (!(this instanceof RandomStream)) {
    return new RandomStream(length, options);
  }

  // init Readable
  Readable.call(this, options);

  // save the length to generate
  this.lenToGenerate = length;
}
util.inherits(RandomStream, Readable);

RandomStream.prototype._read = function (size) {
  if (!size) size = 1024; // default size
  var ready = true;
  while (ready) { // only cont while push returns true
    if (size > this.lenToGenerate) { // only this left
      size = this.lenToGenerate;
    }
    if (size) {
      ready = this.push(crypto.randomBytes(size));
      this.lenToGenerate -= size;
    }

    // when done, push null and exit loop
    if (!this.lenToGenerate) {
      this.push(null);
      ready = false;
    }
  }
};


// now use our RandomStream and compute digest of it
var readStream = new RandomStream(204800);
var hash = crypto.createHash('sha1');
readStream
  .on('readable', function () {
    var chunk;
    while (null !== (chunk = readStream.read())) {
      console.log('chunk: ', chunk);
      hash.update(chunk);
    }
  })
  .on('end', function () {
    console.log('digest: ', hash.digest('hex'));
  });

Note: once _read() is called, you should keep generating data and pushing it until you are done or until push() returns false.
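To get a feel for when push() returns false, here is a small sketch that pushes by hand into a stream with a deliberately tiny buffer. The 4-byte highWaterMark and the tinyStream name are arbitrary choices for this demo, and the no-op _read() exists only so the stream has an implementation to call.

```javascript
var Readable = require('stream').Readable;

// Tiny 4-byte buffer limit (an arbitrary value chosen for this demo)
var tinyStream = new Readable({ highWaterMark: 4 });
tinyStream._read = function () {}; // no-op: chunks are pushed by hand below

// 2 bytes buffered, still under the limit, so push() asks for more
var first = tinyStream.push('ab');

// 6 bytes buffered, limit reached, so push() signals us to stop for now
var second = tinyStream.push('cdef');

console.log(first, second);
```

The data is still accepted when push() returns false; the return value is only a hint that the internal buffer is full and you should wait for the next _read() call.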

Using Streams2 with older Node.js versions

If you want to make this code work with Node.js versions older than 0.10, add readable-stream as a dependency in your package.json and change the line that defines Readable (line 5 of the RandomStream example) to read:

var Readable = stream.Readable ||
  require('readable-stream').Readable;

This uses the native Readable stream when running on Node.js 0.10+; otherwise it loads the readable-stream polyfill module and uses its Readable instead.

Pause / resume of stream and Streams2

Since streams can sometimes provide data more quickly than an application can consume it, streams include the ability to pause, and the data is buffered until the stream is resumed. Prior to streams2, you needed to pay careful attention to the pause and resume methods, as well as to buffering the data until the stream was resumed. However, Readable from streams2 (Node.js 0.10+ or via the readable-stream package) implements that functionality for you, and streams are automatically paused until .read() is called.
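A rough way to observe this is to count how often the stream asks its source for data before anyone consumes it. In this sketch, lazyStream, readCalls and the 20 ms delay are all made up for the demo.

```javascript
var Readable = require('stream').Readable;

var readCalls = 0; // counts how often the stream asks us for data
var words = ['one', 'two', 'three'];

var lazyStream = new Readable();
lazyStream._read = function () {
  readCalls += 1;
  this.push(words.length ? words.shift() : null);
};

var callsBeforeConsuming;
var received = '';

// Attach the consumer later; the source is not touched until we ask for data
setTimeout(function () {
  callsBeforeConsuming = readCalls; // snapshot taken before any listener exists
  lazyStream
    .on('readable', function () {
      var chunk;
      while (null !== (chunk = lazyStream.read())) {
        received += chunk.toString();
      }
    })
    .on('end', function () {
      console.log('calls before consuming:', callsBeforeConsuming);
      console.log('received:', received);
    });
}, 20);
```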

You can also wrap old streams with a Readable to implement the new interface on the old stream:

var readStream = new Readable().wrap(oldStream);
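Here is a slightly fuller sketch of wrapping. Since there is no real legacy stream at hand, oldStream is a hypothetical stand-in built from a bare EventEmitter that emits 'data' and 'end' and has no-op pause() and resume() methods.

```javascript
var EventEmitter = require('events').EventEmitter;
var Readable = require('stream').Readable;

// Hypothetical old-style stream: emits 'data' and 'end', has pause()/resume()
var oldStream = new EventEmitter();
oldStream.pause = function () {};
oldStream.resume = function () {};

// Wrap it to get the streams2 'readable' + read() interface
var wrapped = new Readable().wrap(oldStream);
var text = '';
wrapped
  .on('readable', function () {
    var chunk;
    while (null !== (chunk = wrapped.read())) {
      text += chunk.toString();
    }
  })
  .on('end', function () {
    console.log(text);
  });

// Simulate the old stream pushing data at us, then finishing
oldStream.emit('data', 'old ');
oldStream.emit('data', 'data');
oldStream.emit('end');
```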

Another situation where you need to worry about pause and resume is when your consuming code uses the old push-style interface by calling .on('data', listener). This puts the stream in backwards-compatibility mode, and you need to call .pause() and .resume() to control the rate of data coming to your application. See the Stream API docs for details if you are using the older interface in your code.
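As a sketch of what that looks like, the consumer below pauses the stream on every chunk, pretends to do some slow work with a timer, and then resumes. The names makeLetters and consumeSlowly, the letters source, and the 10 ms delay are all invented for this example.

```javascript
var Readable = require('stream').Readable;

// Hypothetical source: emits the strings 'a', 'b', 'c' and then ends
function makeLetters() {
  var letters = ['a', 'b', 'c'];
  var rs = new Readable();
  rs._read = function () {
    this.push(letters.length ? letters.shift() : null);
  };
  return rs;
}

// Old push-style consumer that throttles itself with pause()/resume()
function consumeSlowly(readStream, delayMs, callback) {
  var seen = [];
  readStream
    .on('data', function (chunk) {
      readStream.pause(); // stop further 'data' events while we work
      seen.push(chunk.toString());
      setTimeout(function () {
        readStream.resume(); // ready for the next chunk
      }, delayMs);
    })
    .on('end', function () {
      callback(seen);
    });
}

consumeSlowly(makeLetters(), 10, function (seen) {
  console.log(seen.join(''));
});
```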

Object Streams

When streams were first introduced, the official API indicated that the data chunks being streamed would be Buffers or strings. However, many users found it useful to stream objects as well. Streams2 in Node.js 0.10+ added an object mode to formalize how this should work. In object mode, .read(n) simply returns the next object (ignoring n).

To switch a stream into object mode, set the objectMode property to true in the options used to create your Readable stream:

var Readable = require('stream').Readable;
var util = require('util');

function CountingObjectStream(length, options) {
  if (!(this instanceof CountingObjectStream)) {
    return new CountingObjectStream(length, options);
  }
  if (!options) options = {}; // ensure object
  options.objectMode = true; // forcing object mode
  Readable.call(this, options);
  this.lenToCount = length;  // how far to count
  this.index = 0;  // to track our count
}
util.inherits(CountingObjectStream, Readable);

CountingObjectStream.prototype._read = function () {
  this.index += 1;
  if (this.index > this.lenToCount) {
    return this.push(null); // done, return
  }

  // pushing number, but could be any non-null obj
  this.push(this.index);
};


// consume this stream and output to stdout
// coercing it to a string
var readStream = new CountingObjectStream(10);
readStream
  .on('readable', function () {
    var obj;
    while (null !== (obj = readStream.read())) {
      console.log(obj);
    }
  });

So you can use objects in streams just as easily as Buffers and strings. The only limitation is that the objects you push cannot be null, since null indicates that the stream has ended.
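To illustrate that null is the only special value, here is a small sketch that streams a few falsy values in object mode and also passes a size to read(). The values array and the objStream name are made up for the demo.

```javascript
var Readable = require('stream').Readable;

// Made-up values: falsy ones such as 0 and false are still valid objects
var values = [0, false, { name: 'last' }];

var objStream = new Readable({ objectMode: true });
objStream._read = function () {
  // only null signals the end of the stream
  this.push(values.length ? values.shift() : null);
};

var received = [];
objStream
  .on('readable', function () {
    var obj;
    // read(2) still hands back one object at a time in object mode
    while (null !== (obj = objStream.read(2))) {
      received.push(obj);
    }
  })
  .on('end', function () {
    console.log(received);
  });
```

Note that the loop compares against null explicitly; a plain truthiness check such as while (obj = objStream.read()) would stop early at 0 or false.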

Node.js readable streams are flexible and simple

Node.js readable streams are easy to consume and even simple to construct. You can not only stream binary and string data but also objects and still take advantage of stream functionality.

I hope you enjoyed this quick tour of readable streams. Let me know if you have any questions.

For additional reading