Transforming data with Node.js transform streams
This tutorial will show you how to use the built-in transform streams and how to create your own custom transform streams for manipulating textual, binary, or object stream data.
Version Information
- Author: Jeff Barczewski
- Published: August 20th, 2013
- Tags: nodejs, streams
- Level: Intermediate
- Prerequisites: buffer, crypto, events, install npm modules, zlib
- Node.js v0.10+ (latest stable is v0.10.17 as of this writing), but streams have generally been a part of Node.js from its early days
- The Streams2 Transform abstract class can be used with versions of Node.js prior to v0.10 via the npm module readable-stream (tested with v1.0.15)
- Updated: September 1st, 2015
What are transform streams?
Node.js transform streams are streams which read input, transform that data, and then output the new data.
You can chain streams together to create complex processes by piping from one to the next.
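For example, here is a minimal sketch of such a pipeline, chaining compression and encryption in one pass using the built-in zlib and crypto modules covered below (the file names are just placeholders):

var fs = require('fs');
var zlib = require('zlib');
var crypto = require('crypto');

// read -> compress -> encrypt -> write, all streaming
fs.createReadStream('myfile.txt')
  .pipe(zlib.createGzip()) // compress first
  .pipe(crypto.createCipher('aes-256-cbc', 'my secret')) // then encrypt
  .pipe(fs.createWriteStream('myfile.txt.gz.enc'));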
Using built-in Node.js transform streams
Node.js comes with a variety of transform streams in the Core API:
- zlib - for gzip compressing and uncompressing
- crypto - for encrypting, decrypting, and calculating message digests
Compressing stream with gzip
To gzip a stream, simply create a gzip transform stream with zlib and pipe a stream through it. You can customize the compression level and buffers by passing an options object to the zlib.createGzip(options) factory; see gzip options for details.
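For example, a quick sketch asking for maximum compression (level is one of the documented gzip options, and the constant is exposed on the zlib module):

var zlib = require('zlib');

// trade speed for size with the highest compression level
var gzip = zlib.createGzip({ level: zlib.Z_BEST_COMPRESSION });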
This example reads myfile.txt, compresses the streaming data on the fly, and writes it to myfile.txt.gz:
var fs = require('fs');
var zlib = require('zlib');

var gzip = zlib.createGzip();
var rstream = fs.createReadStream('myfile.txt');
var wstream = fs.createWriteStream('myfile.txt.gz');

rstream // reads from myfile.txt
  .pipe(gzip) // compresses
  .pipe(wstream) // writes to myfile.txt.gz
  .on('finish', function () { // finished
    console.log('done compressing');
  });
Uncompressing a stream with gunzip
This example reads the compressed file myfile.txt.gz from the previous example, decompresses it, and writes the result to standard output:
var fs = require('fs');
var zlib = require('zlib');

var gunzip = zlib.createGunzip();
var rstream = fs.createReadStream('myfile.txt.gz');

rstream // reads from myfile.txt.gz
  .pipe(gunzip) // uncompresses
  .pipe(process.stdout); // writes to stdout
Encrypting data stream with aes-256
Encrypting is very similar, since we can use the crypto module, which is built into Node.js. It provides all of the ciphers installed via openssl; you can get a listing by running openssl list-cipher-algorithms from the bash command line.
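You can also get the list from within Node.js itself using crypto.getCiphers() (available since v0.9.3):

var crypto = require('crypto');

// prints an array of available cipher names
console.log(crypto.getCiphers());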
Some of the more notable options available in my version of openssl are:
- aes-256-cbc - AES 256 bit - cipher block chaining
- bf-ofb - Blowfish - output feedback mode
- cast5-ofb - Cast 5 - output feedback mode
Here is a simple example of creating an aes-256 encrypting transform stream. Note that for this example I am not storing the password securely; in your real code you would want to keep it in a secure store.
var crypto = require('crypto');
var fs = require('fs');

// get your password from safe store
var password = new Buffer('my secret');
var aes = crypto.createCipher('aes-256-cbc', password);
var rstream = fs.createReadStream('myfile.txt');
var wstream = fs.createWriteStream('myfile.encrypted');

rstream // reads from myfile.txt
  .pipe(aes) // encrypts with aes256
  .pipe(wstream) // writes to myfile.encrypted
  .on('finish', function () { // finished
    console.log('done encrypting');
  });
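Note that crypto.createCipher derives the key and IV from your password for you. If you need explicit control over both, crypto.createCipheriv accepts them directly; here is a minimal sketch (the randomly generated key and IV are placeholders, and in real code you would need to store them to decrypt later):

var crypto = require('crypto');

// aes-256-cbc requires a 32 byte key and a 16 byte IV
var key = crypto.randomBytes(32); // placeholder; keep your real key in a safe store
var iv = crypto.randomBytes(16); // placeholder; must be kept to decrypt later
var aes = crypto.createCipheriv('aes-256-cbc', key, iv);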
Decrypting data stream with aes-256
Here is the code to decrypt the file and output on stdout:
var crypto = require('crypto');
var fs = require('fs');

// get your password from safe store
var password = new Buffer('my secret');
var aes = crypto.createDecipher('aes-256-cbc', password);
var rstream = fs.createReadStream('myfile.encrypted');

rstream // reads from myfile.encrypted
  .pipe(aes) // decrypts with aes256
  .pipe(process.stdout); // outputs to stdout
Note that the above example works on Node.js v0.10+, but it appears to trigger a bug when run on v0.8.25 which I haven't looked into yet.
Creating custom transform streams
Often you will want to do your own transformations on a stream, and Node.js makes it easy to create custom transform streams with the v0.10 Transform abstract class. By using the polyfill npm module readable-stream we can make the code work with earlier versions of Node.js too, as I demonstrate below.
To implement, subclass Transform and implement these prototype methods:
- _transform(chunk, enc, cb) - read incoming chunks and push out your transformed data
- _flush(cb) - write any additional data at the end, after the input is finished
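As a minimal sketch of the contract (the full examples below subclass Transform properly), a do-nothing pass-through transform could be written like this:

var Transform = require('stream').Transform;

var passthru = new Transform();
passthru._transform = function (chunk, enc, cb) {
  this.push(chunk); // pass the chunk through unchanged
  cb(); // signal that this chunk has been processed
};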
Creating a transform stream which uppercases all text
var stream = require('stream');
var util = require('util');

// node v0.10+ use native Transform, else polyfill
var Transform = stream.Transform ||
  require('readable-stream').Transform;

function Upper(options) {
  // allow use without new
  if (!(this instanceof Upper)) {
    return new Upper(options);
  }
  // init Transform
  Transform.call(this, options);
}
util.inherits(Upper, Transform);

Upper.prototype._transform = function (chunk, enc, cb) {
  var upperChunk = chunk.toString().toUpperCase();
  this.push(upperChunk);
  cb();
};

// try it out
var upper = new Upper();
upper.pipe(process.stdout); // output to stdout
upper.write('hello world\n'); // input line 1
upper.write('another line'); // input line 2
upper.end(); // finish
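Since Upper is an ordinary transform stream, you can just as easily pipe a file through it; a quick sketch reusing myfile.txt from the earlier examples:

var fs = require('fs');

fs.createReadStream('myfile.txt')
  .pipe(new Upper()) // uppercase the contents
  .pipe(process.stdout); // write to stdout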
Creating a transform stream which calculates a sha1 digest
var crypto = require('crypto');
var stream = require('stream');
var util = require('util');

// node v0.10+ use native Transform, else polyfill
var Transform = stream.Transform ||
  require('readable-stream').Transform;

function ShaSum(options) {
  // allow use without new
  if (!(this instanceof ShaSum)) {
    return new ShaSum(options);
  }
  // init Transform
  Transform.call(this, options);
  this.digester = crypto.createHash('sha1');
}
util.inherits(ShaSum, Transform);

/* during each chunk, update the digest */
ShaSum.prototype._transform = function (chunk, enc, cb) {
  // if it is a Buffer use it, otherwise coerce
  var buffer = (Buffer.isBuffer(chunk)) ?
    chunk :
    new Buffer(chunk, enc);
  this.digester.update(buffer); // update hash
  // we are not writing anything out at this
  // time, only at the end during _flush,
  // so we don't need to call push
  cb();
};

/* at the end, output the hex digest */
ShaSum.prototype._flush = function (cb) {
  this.push(this.digester.digest('hex'));
  cb();
};

// try it out
var shasum = new ShaSum();
shasum.pipe(process.stdout); // output to stdout
shasum.write('hello world\n'); // input line 1
shasum.write('another line'); // input line 2
shasum.end(); // finish
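Piping a file through ShaSum gives you its sha1 hex digest, much like the shasum command-line tool; a quick sketch, again reusing myfile.txt:

var fs = require('fs');

fs.createReadStream('myfile.txt')
  .pipe(new ShaSum()) // digest is updated chunk by chunk
  .pipe(process.stdout); // hex digest is written at the end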
Creating an object stream which filters out data
Transform streams can be used to manipulate a stream of objects in a very similar fashion.
This example filters sensitive properties from a stream of objects:
var stream = require('stream');
var util = require('util');

// node v0.10+ use native Transform, else polyfill
var Transform = stream.Transform ||
  require('readable-stream').Transform;

/*
 * Filters the properties of an object stream
 *
 * @param filterProps array of property names to filter out
 * @param options stream options passed to Transform
 */
function Filter(filterProps, options) {
  // allow use without new
  if (!(this instanceof Filter)) {
    return new Filter(filterProps, options);
  }
  // init Transform
  if (!options) options = {}; // ensure object
  options.objectMode = true; // forcing object mode
  Transform.call(this, options);
  this.filterProps = filterProps;
}
util.inherits(Filter, Transform);

/* filter each object's sensitive properties */
Filter.prototype._transform = function (obj, enc, cb) {
  var self = this;
  // determine which keys to keep
  var filteredKeys = Object.keys(obj).filter(function (key) {
    // only those keys not in the filter list
    return (self.filterProps.indexOf(key) === -1);
  });
  // create a clone with only these keys
  var filteredObj = filteredKeys.reduce(function (accum, key) {
    accum[key] = obj[key];
    return accum;
  }, {});
  // push the filtered obj out
  this.push(filteredObj);
  cb();
};

// try it out, output to stdout
// filter phone and email from objects
var filter = new Filter([ 'phone', 'email' ]);
filter.on('readable', function () {
  var obj;
  while (null !== (obj = filter.read())) {
    console.log(obj);
  }
});

// now send some objects to filter through
filter.write({ name: 'Foo', phone: '555-1212', email: 'foo@foo.com', id: 123 });
filter.write({ name: 'Bar', phone: '555-1313', email: 'bar@bar.com', id: 456 });
filter.end(); // finish
Summary
Transform streams are another powerful feature of Node.js. I hope you can begin to see the power of this feature from the simple examples I have provided. By building up functionality as small modules that can be piped together you can build complex systems while keeping each module focused and lean.
For additional reading
- Crypto API
- File System API
- gzip options
- readable-stream - polyfill for streams2 on node.js prior to v0.10
- Stream API
- Transform
- Util API
- Zlib API