Version Information

  • Author: Jeff Barczewski
  • Published: August 30th, 2013
  • Updated: 2014-09-06 - Correct duplex through stream, add handlers once
  • Tags: nodejs, streams
  • Level: Intermediate
  • Prerequisites: events, setInterval, setTimeout, install npm modules, util
  • Node.js v0.10+ (latest stable is v0.10.17 as of this writing); streams in general have been part of Node.js since its early days
  • Streams2 Duplex abstract class can be used with older versions (prior to v0.10) of node by using npm module readable-stream (tested with v1.0.15)

What is a duplex stream?

The general definition of a duplex stream is one which is both readable and writable, similar to a Transform stream. Most often, however, the term Duplex stream refers to a stream which has two fully independent streams embedded in it, one flowing out and one flowing in.
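For contrast, here is a minimal sketch of the Transform case, assuming Node.js v0.10+ where stream.PassThrough is available. PassThrough is the simplest Transform, so what you read from it is exactly what you wrote to it. A Duplex stream with independent channels has no such coupling between its two sides.

```javascript
var stream = require('stream');

// PassThrough is the simplest Transform: its readable side
// yields exactly what was written to its writable side
var through = new stream.PassThrough();
through.write('ping');

// read back what was just written
var echoed = through.read();
console.log('transform read: ', echoed.toString());
```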

Duplex graphic

If stdin and stdout were packaged into a Duplex stream then from this one single duplex stream you could both read and write data simultaneously.

A concrete example of a duplex stream is a network socket. A Node.js socket builds on a duplex stream to implement the ability to transmit and receive data over the network. This one socket instance has two independent channels, one for sending data, and one for receiving data.
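As a quick check of this, the sketch below creates an unconnected net.Socket and inspects it. No network connection is made; the only point is to confirm that the socket inherits from stream.Duplex and therefore carries both the readable and the writable methods.

```javascript
var net = require('net');
var stream = require('stream');

// an unconnected socket is enough to inspect the class hierarchy
var socket = new net.Socket();

var isDuplex = socket instanceof stream.Duplex;
console.log('socket is a Duplex: ', isDuplex);
console.log('has read(): ', typeof socket.read === 'function');
console.log('has write(): ', typeof socket.write === 'function');

socket.destroy(); // release the unused socket
```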

Creating a custom duplex stream

With Node.js v0.10+, creating a custom duplex stream is simple:

  1. Create a class which inherits from the Duplex abstract class
  2. Implement _write(chunk, encoding, cb) method for sending data
  3. Implement _read(n) method for receiving data
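
The three steps above can be sketched in a few lines. CountAndCollect is a hypothetical class invented for this illustration: its read side counts upward to a limit and its write side simply collects whatever it is given. It assumes Node.js v0.10+ so that stream.Duplex exists.

```javascript
var stream = require('stream');
var util = require('util');

// Step 1: hypothetical class inheriting from Duplex
function CountAndCollect(limit, options) {
  stream.Duplex.call(this, options);
  this.limit = limit;  // last number the read side will produce
  this.count = 0;      // numbers produced so far
  this.received = [];  // everything given to the write side
}
util.inherits(CountAndCollect, stream.Duplex);

// Step 2: write side, collect each chunk as a string
CountAndCollect.prototype._write = function (chunk, encoding, cb) {
  this.received.push(chunk.toString());
  cb(); // signal that this chunk has been handled
};

// Step 3: read side, push the next number or end the stream
CountAndCollect.prototype._read = function (n) {
  if (this.count < this.limit) {
    this.count += 1;
    this.push(String(this.count));
  } else {
    this.push(null); // no more data
  }
};

var cc = new CountAndCollect(2);
cc.write('a');
cc.write('b');
var first = cc.read();
var second = cc.read();
console.log('written: ', cc.received.join(','));
console.log('read: ', first.toString(), second.toString());
```

Nothing written to the stream shows up on its read side, and nothing the read side produces depends on what was written, which is the independence the two channels are meant to have.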

For backward compatibility with Node.js prior to v0.10, use the npm module readable-stream to polyfill. If stream.Duplex does not exist (old Node.js) then use Duplex from readable-stream:

var stream = require('stream');
var Duplex = stream.Duplex ||
  require('readable-stream').Duplex;

Creating duplex stream with read timer and write logging

To show the independent nature of the two embedded streams, this example creates a duplex class which:

  1. generates the current time string every second on the read stream
  2. outputs the write stream to stdout

var stream = require('stream');
var util = require('util');
var Duplex = stream.Duplex ||
  require('readable-stream').Duplex;


/**
 * Duplex stream which:
 *  - generates current time every sec for rstream
 *  - outputs the write stream to stdout
 *
 * Stop the read stream by calling stopTimer
 */
function DRTimeWLog(options) {
  // allow use without new operator
  if (!(this instanceof DRTimeWLog)) {
    return new DRTimeWLog(options);
  }
  Duplex.call(this, options); // init
  this.readArr = []; // array of times to read

  // every second, add new time string to array
  this.timer = setInterval(addTime, 1000, this.readArr);
}
util.inherits(DRTimeWLog, Duplex);

/* add new time string to array to read */
function addTime(readArr) {
  readArr.push((new Date()).toString());
}

DRTimeWLog.prototype._read = function readBytes(n) {
  var self = this;
  while (self.readArr.length) {
    var chunk = self.readArr.shift();
    if (!self.push(chunk)) {
      break; // false from push, stop reading
    }
  }
  if (self.timer) { // continuing if have timer
    // check again after a second has passed to see
    // if more data is available, keeping at most one
    // check pending so timers do not pile up
    if (!self.pollTimer) {
      self.pollTimer = setTimeout(function () {
        self.pollTimer = null;
        readBytes.call(self, n);
      }, 1000);
    }
  } else { // we are done, push null to end stream
    self.push(null);
  }
};

/* stops the timer and ends the read stream */
DRTimeWLog.prototype.stopTimer = function () {
  if (this.timer) clearInterval(this.timer);
  this.timer = null;
};

/* for write stream just output to stdout */
DRTimeWLog.prototype._write =
  function (chunk, enc, cb) {
    console.log('write: ', chunk.toString());
    cb();
  };


// try out DRTimeWLog
var duplex = new DRTimeWLog();
duplex.on('readable', function () {
  var chunk;
  while (null !== (chunk = duplex.read())) {
    console.log('read: ', chunk.toString());
  }
});
duplex.write('Hello \n');
duplex.write('World');
duplex.end();

// after 3 seconds stop the timer
setTimeout(function () {
  duplex.stopTimer();
}, 3000);

The above example has output similar to the following:

write:  Hello

write:  World
read:  Mon Aug 25 2013 17:57:14 GMT-0500 (CDT)
read:  Mon Aug 25 2013 17:57:15 GMT-0500 (CDT)
read:  Mon Aug 25 2013 17:57:16 GMT-0500 (CDT)

Creating duplex passthrough stream

For this next example, I will show how you could create a duplex passthrough stream which gives you access to each of its embedded streams (channels). This would allow you to independently pipe the streams through additional transforms without having to build all of the logic directly into a single module. By keeping our streams light and focused, we make it easy to test and evolve our software over time. For instance, one might filter, compress, encrypt or manipulate data as it flows through one or both channels.
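
As a rough sketch of that idea, the snippet below pipes one channel through a small transform. UpperCase is a hypothetical transform written only for this illustration, and a plain PassThrough stands in for one of the channels a duplex passthrough would expose; any other Transform (compression, encryption, filtering) could be piped in the same way.

```javascript
var stream = require('stream');
var util = require('util');

// Hypothetical transform which upper-cases text as it flows through
function UpperCase(options) {
  stream.Transform.call(this, options);
}
util.inherits(UpperCase, stream.Transform);

UpperCase.prototype._transform = function (chunk, enc, cb) {
  this.push(chunk.toString().toUpperCase());
  cb();
};

// stand-in for one exposed channel of a duplex passthrough
var channel = new stream.PassThrough();
var upper = new UpperCase();
var seen = []; // collects what comes out of the transform

upper.on('data', function (chunk) {
  seen.push(chunk.toString());
});
upper.on('end', function () {
  console.log('transformed: ', seen.join(''));
});

// pipe the channel through the transform, then feed it
channel.pipe(upper);
channel.write('hello ');
channel.end('world');
```

Because the transform lives outside the duplex class, it can be tested on its own and swapped out without touching the stream that exposes the channel.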

DuplexThrough graphic

As the graphic indicates, you access the left side of the duplex via the normal stream methods pipe(), read(), write() and the other side of those channels is exposed as inRStream and outWStream.

The implementation of our duplex passthrough stream involves embedding two PassThrough streams into our class which are then wired up in our _read() and _write() methods.

For brevity, this example outputs each channel to stdout, prefixing with in or out to indicate the channel being displayed; however, you could just as easily pipe the streams through additional transforms on the way in or out.

var fs = require('fs');
var stream = require('stream');
var util = require('util');

var Duplex = stream.Duplex ||
  require('readable-stream').Duplex;

var PassThrough = stream.PassThrough ||
  require('readable-stream').PassThrough;

/**
 * Duplex stream created with two transform streams
 * - inRStream - inbound side read stream
 * - outWStream - outbound side write stream
 */
function DuplexThrough(options) {
  if (!(this instanceof DuplexThrough)) {
    return new DuplexThrough(options);
  }
  Duplex.call(this, options);
  this.inRStream = new PassThrough();
  this.outWStream = new PassThrough();
  this.leftHandlersSetup = false; // only setup the handlers once
}
util.inherits(DuplexThrough, Duplex);

/* left inbound side */
DuplexThrough.prototype._write =
  function (chunk, enc, cb) {
    this.inRStream.write(chunk, enc, cb);
  };

/* left outbound side */
/**
 * The first time read is called we setup listeners
 */
DuplexThrough.prototype.setupLeftHandlersAndRead = function (n) {
  var self = this;
  self.leftHandlersSetup = true; // only set handlers up once
  self.outWStream
    .on('readable', function () {
      self.readLeft(n);
    })
    .on('end', function () {
      self.push(null); // EOF
    });
};

DuplexThrough.prototype.readLeft = function (n) {
  var chunk;
  while (null !==
         (chunk = this.outWStream.read(n))) {
    // if push returns false, stop pushing
    if (!this.push(chunk)) break;
  }
};

DuplexThrough.prototype._read = function (n) {
  // first time, setup handlers then read
  if (!this.leftHandlersSetup) {
    return this.setupLeftHandlersAndRead(n);
  }
  // otherwise just read
  this.readLeft(n);
};


// try out DuplexThrough w/fileReadStream and writes
var rstream = fs.createReadStream('myfile.txt');
var duplex = new DuplexThrough();

// inbound side - pipe file through
duplex.inRStream
  .on('readable', function () {
    var chunk;
    while (null !==
           (chunk = duplex.inRStream.read())) {
      console.log('in: ', chunk.toString());
    }
  });
rstream.pipe(duplex);


// outbound side - write Hello \nworld
duplex
  .on('readable', function () {
    var chunk;
    while (null !== (chunk = duplex.read())) {
      console.log('out: ', chunk.toString());
    }
  });
duplex.outWStream.write('Hello \n');
duplex.outWStream.write('world');
duplex.outWStream.end();

Running the example produces output like:

out:  Hello
world
in:  Simple text file

 - one
 - two
 - three

Duplex streams summary

Streams are a powerful feature of Node.js, and Duplex streams make it easy to encapsulate a combination of two channels into one instance. Most often we use prebuilt duplex streams like Socket, but where additional control is necessary, Node.js gives you the capability to create your own by inheriting from Duplex and implementing the _read and _write methods.

If you enjoyed reading this article, I have a series of related articles on Node.js streams each focused on a different aspect which are linked at the end of this article.

For additional reading