This tutorial will show how to use transform streams and how to create your own custom transform streams for manipulating textual, binary, or object stream data.

Version Information

  • Author: Jeff Barczewski
  • Published: August 20th, 2013
  • Tags: nodejs, streams
  • Level: Intermediate
  • Prerequisites: buffer, crypto, events, installing npm modules, zlib
  • Node.js v0.10+ (latest stable is v0.10.17 as of this writing), but streams have generally been a part of Node.js from its early days
  • The Streams2 Transform abstract class can be used with versions of Node.js prior to v0.10 via the npm module readable-stream (tested with v1.0.15)
  • Updated: September 1st, 2015

What are transform streams?

Node.js transform streams are streams which read input, transform the data, and then output new data.

transform-stream graphic

You can chain streams together to create complex processes by piping from one to the next.

double-transform-stream graphic
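For example, here is a minimal sketch that chains two of the built-in transforms covered below, compressing a file and then encrypting the compressed output (the filenames and password are placeholders for illustration):

var crypto = require('crypto');
var fs = require('fs');
var zlib = require('zlib');

var rstream = fs.createReadStream('myfile.txt');
var wstream = fs.createWriteStream('myfile.txt.gz.enc');

rstream   // reads from myfile.txt
  .pipe(zlib.createGzip())  // first transform: compresses
  .pipe(crypto.createCipher('aes-256-cbc', 'my secret'))  // second transform: encrypts
  .pipe(wstream)  // writes to myfile.txt.gz.enc
  .on('finish', function () {  // finished
    console.log('done');
  });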

Using built-in Node.js transform streams

Node.js comes with a variety of transform streams in the Core API:

  • zlib - for gzip compressing and uncompressing
  • crypto - for encrypting, decrypting, and calculating message digests

Compressing stream with gzip

To gzip a stream, simply create a gzip transform stream with zlib and pipe a stream through it. You can customize the compression level and buffers by passing an options object to the zlib.createGzip(options) factory. See the gzip options for details.

This example reads myfile.txt, compresses the streaming data on the fly, and writes it to myfile.txt.gz.

var fs = require('fs');
var zlib = require('zlib');

var gzip = zlib.createGzip();
var rstream = fs.createReadStream('myfile.txt');
var wstream = fs.createWriteStream('myfile.txt.gz');

rstream   // reads from myfile.txt
  .pipe(gzip)  // compresses
  .pipe(wstream)  // writes to myfile.txt.gz
  .on('finish', function () {  // finished
    console.log('done compressing');
  });
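If you want maximum compression at the cost of speed, the same example can be tuned by passing options when creating the gzip stream; a minimal sketch (the rest of the pipeline stays the same):

var zlib = require('zlib');

// favor smallest output over speed
var gzip = zlib.createGzip({ level: zlib.Z_BEST_COMPRESSION });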

Uncompressing stream with gzip

This example reads the compressed file myfile.txt.gz from the previous example, decompresses it, and writes the output to standard output.

var fs = require('fs');
var zlib = require('zlib');

var gunzip = zlib.createGunzip();
var rstream = fs.createReadStream('myfile.txt.gz');

rstream   // reads from myfile.txt.gz
  .pipe(gunzip)  // uncompresses
  .pipe(process.stdout);  // writes to stdout

Encrypting data stream with aes-256

Encrypting is very similar since we can use the crypto module which is built into Node.js. It provides all of the ciphers available in your OpenSSL installation; you can get a listing by running openssl list-cipher-algorithms from the command line.

Some of the more notable options available in my version of openssl are:

  • aes-256-cbc - AES 256 bit - cipher block chaining
  • bf-ofb - Blowfish - output feedback mode
  • cast5-ofb - Cast 5 - output feedback mode
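You can also get the same listing from within Node.js (v0.10+) using crypto.getCiphers(), which returns an array of the cipher names supported by your build:

var crypto = require('crypto');

// print the supported cipher names, one per line
console.log(crypto.getCiphers().join('\n'));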

Here is a simple example of creating an aes-256 encrypting transform stream. Note that for this example I am not securely storing the password, but in your real code you would want to keep it in a secure store.

var crypto = require('crypto');
var fs = require('fs');

// get your password from safe store
var password = new Buffer('my secret');
var aes = crypto.createCipher('aes-256-cbc', password);
var rstream = fs.createReadStream('myfile.txt');
var wstream = fs.createWriteStream('myfile.encrypted');

rstream   // reads from myfile.txt
  .pipe(aes)  // encrypts with aes256
  .pipe(wstream)  // writes to myfile.encrypted
  .on('finish', function () {  // finished
    console.log('done encrypting');
  });

Decrypting data stream with aes-256

Here is the code to decrypt the file and write the output to stdout:

var crypto = require('crypto');
var fs = require('fs');

// get your password from safe store
var password = new Buffer('my secret');
var aes = crypto.createDecipher('aes-256-cbc',
                                password);
var rstream = fs.createReadStream('myfile.encrypted');

rstream   // reads from myfile.encrypted
  .pipe(aes)  // decrypt with aes256
  .pipe(process.stdout);  // output stdout

Note that the above example works on Node.js v0.10+, but it appears to encounter a bug when running on v0.8.25 which I haven't looked into yet.

Creating custom transform streams

Often you will want to do your own transformations on a stream, so Node.js makes it easy to create custom transform streams with the v0.10 Transform abstract class. By using a polyfill npm module readable-stream we can make the code work with earlier versions of Node.js too, which I demonstrate below.

To implement a custom transform, subclass Transform and implement these prototype methods:

  • _transform(chunk, enc, cb) - called with each chunk of input; transform the data, push the output, and call cb when done
  • _flush(cb) - called once after all input has been consumed; implement it if you need to push additional data at the end

Creating a transform stream which uppercases all text

var stream = require('stream');
var util = require('util');

// node v0.10+ use native Transform, else polyfill
var Transform = stream.Transform ||
  require('readable-stream').Transform;

function Upper(options) {
  // allow use without new
  if (!(this instanceof Upper)) {
    return new Upper(options);
  }

  // init Transform
  Transform.call(this, options);
}
util.inherits(Upper, Transform);

Upper.prototype._transform = function (chunk, enc, cb) {
  var upperChunk = chunk.toString().toUpperCase();
  this.push(upperChunk);
  cb();
};


// try it out
var upper = new Upper();
upper.pipe(process.stdout); // output to stdout
upper.write('hello world\n'); // input line 1
upper.write('another line');  // input line 2
upper.end();  // finish
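Running this example writes the uppercased input to stdout:

HELLO WORLD
ANOTHER LINE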

Creating a transform stream which calculates sha1 digest

var crypto = require('crypto');
var stream = require('stream');
var util = require('util');

// node v0.10+ use native Transform, else polyfill
var Transform = stream.Transform ||
  require('readable-stream').Transform;

function ShaSum(options) {
  // allow use without new
  if (!(this instanceof ShaSum)) {
    return new ShaSum(options);
  }

  // init Transform
  Transform.call(this, options);

  this.digester = crypto.createHash('sha1');
}
util.inherits(ShaSum, Transform);

/* during each chunk, update the digest */
ShaSum.prototype._transform = function (chunk, enc, cb) {
  // if chunk is a Buffer, use it; otherwise coerce it
  var buffer = (Buffer.isBuffer(chunk)) ?
    chunk :
    new Buffer(chunk, enc);
  this.digester.update(buffer); // update hash

  // we are not writing anything out at this
  // time, only at end during _flush
  // so we don't need to call push
  cb();
};

/* at the end, output the hex digest */
ShaSum.prototype._flush = function (cb) {
  this.push(this.digester.digest('hex'));
  cb();
};


// try it out
var shasum = new ShaSum();
shasum.pipe(process.stdout); // output to stdout
shasum.write('hello world\n'); // input line 1
shasum.write('another line');  // input line 2
shasum.end();  // finish
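Since ShaSum is just another transform stream, you can also pipe a file through it to compute that file's digest, much like the shasum command line tool. Here is a quick sketch reusing myfile.txt from the earlier examples:

var fs = require('fs');

var shafile = new ShaSum();
fs.createReadStream('myfile.txt')
  .pipe(shafile)  // computes sha1 of the file contents
  .pipe(process.stdout);  // writes the hex digest to stdout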

Creating an object stream which filters out data

Transform streams can be used to manipulate a stream of objects in a very similar fashion.

This example filters sensitive properties from a stream of objects:

var stream = require('stream');
var util = require('util');

// node v0.10+ use native Transform, else polyfill
var Transform = stream.Transform ||
  require('readable-stream').Transform;

/*
 * Filters properties out of an object stream
 *
 * @param filterProps array of props to filter
 */
function Filter(filterProps, options) {
  // allow use without new
  if (!(this instanceof Filter)) {
    return new Filter(filterProps, options);
  }

  // init Transform
  if (!options) options = {}; // ensure object
  options.objectMode = true; // forcing object mode
  Transform.call(this, options);
  this.filterProps = filterProps;
}
util.inherits(Filter, Transform);

/* filter each object's sensitive properties */
Filter.prototype._transform = function (obj, enc, cb) {
  var self = this;
  // determine what keys to keep
  var filteredKeys = Object.keys(obj).filter(
    function (key) {
      // only those keys not in this list
      return (self.filterProps.indexOf(key) === -1);
    }
  );

  // create clone with only these keys
  var filteredObj = filteredKeys.reduce(
    function (accum, key) {
      accum[key] = obj[key];
      return accum;
    },
    {}
  );

  // push the filtered obj out
  this.push(filteredObj);
  cb();
};


// try it out, output to stdout
// filter phone and email from objects
var filter = new Filter([ 'phone', 'email' ]);
filter
  .on('readable', function () {
    var obj;
    while (null !== (obj = filter.read())) {
      console.log(obj);
    }
  });

// now send some objects to filter through
filter.write({ name: 'Foo', phone: '555-1212',
               email: 'foo@foo.com', id: 123 });
filter.write({ name: 'Bar', phone: '555-1313',
               email: 'bar@bar.com', id: 456 });
filter.end();  // finish
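Running this logs each object with the phone and email properties removed, something like:

{ name: 'Foo', id: 123 }
{ name: 'Bar', id: 456 }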

Summary

Transform streams are another powerful feature of Node.js. I hope you can begin to see their power from the simple examples I have provided. By building up functionality as small modules that can be piped together, you can build complex systems while keeping each module focused and lean.

For additional reading