Filtering Stream object in node.js

It seems to me that an elegant way to process certain kinds of data in Node.js would be to chain processing objects, like UNIX pipes.

For example, grep:

function Grep(pattern) {
    ...
}
util.inherits(Grep, stream.Stream);

Grep.prototype.???? = ???????  // What goes here?

grep = new Grep(/foo/);

process.stdin.pipe(grep);
grep.pipe(process.stdout);

However it's not at all clear to me how the various Stream methods need to be overridden in order for this to work.

How can I create a Stream object that simply copies from its input to its output? Presumably with that answered, more sophisticated filtering streams become trivial.

Update: it feels as if the following should work (expressed in CoffeeScript, so I don't fill this box with JS syntax!):

class Forwarder extends stream.Stream
    write: (chunk, encoding) ->
        @emit 'data', chunk
    end: (chunk, encoding) =>
        if chunk?
            @emit 'data', chunk
        @emit 'end'

fwd = new Forwarder()
fwd.pipe(process.stdout);
process.stdin.pipe(fwd);
process.stdin.resume();

However, piping a file into this script with cat produces no output. Calling fwd.write() explicitly in the script does cause output on stdout.


You are so very close.

Because you are using the very low-level Stream class, you need to set the stream's writable property to true to make it a writable stream; otherwise pipe() will not write to it. If you were reading from the stream, you'd need to set the readable property as well. Also, the end event doesn't take any arguments.

class Forwarder extends stream.Stream
  constructor: ->
    super()
    @writable = true
  write: (chunk, encoding) ->
    @emit 'data', chunk
  end: ->
    @emit 'end'

fwd = new Forwarder()
fwd.pipe(process.stdout);
process.stdin.pipe(fwd);
process.stdin.resume();

Update

The answer above applies to the old-style (V1) streams in Node <= 0.8. If you are using a later version (0.10 or newer), Node has added more specific classes that are designed to be extended, so you would use something more like this:

class Forwarder extends stream.Transform
  _transform: (chunk, encoding, callback) ->
    @push chunk
    callback()

For a real filter, process the chunk inside _transform and push only the pieces you actually want to pass downstream.
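
For the plain copy-input-to-output case asked about in the question, here is a minimal JavaScript sketch of the same Transform subclass. It assumes Node 0.10 or newer plus ES2015 class syntax; the name Forwarder is carried over from the snippets above. Node's stream module also ships a built-in stream.PassThrough class that does the same job, so the hand-written version is mostly useful as a starting point for real filters.

```javascript
const { PassThrough, Transform } = require('stream');

// Hand-written forwarder: every chunk is passed downstream unchanged.
class Forwarder extends Transform {
  _transform(chunk, encoding, callback) {
    // Passing the chunk as the second callback argument is shorthand
    // for this.push(chunk) followed by callback().
    callback(null, chunk);
  }
}

// Built-in equivalent: PassThrough copies its input to its output.
const builtinForwarder = new PassThrough();

// Usage (not executed here, since it would read from stdin):
//   process.stdin.pipe(new Forwarder()).pipe(process.stdout);
```

Either object can sit in the middle of a pipe chain; replacing the body of _transform is all that is needed to turn the forwarder into a filter.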


Although the existing answer is good, it still leaves a bit of digging for anyone looking for a complete solution.

The following code completes the example the OP gave, using the Node 0.10 stream API.

var stream = require('stream')
var util = require('util')

function Grep(pattern) {
  stream.Transform.call(this)

  this.pattern = pattern
}

util.inherits(Grep, stream.Transform)

Grep.prototype._transform = function(chunk, encoding, callback) {
  var string = chunk.toString()

  if (string.match(this.pattern)) {
    this.push(chunk)
  }

  callback()
}

var grep = new Grep(/foo/)
process.stdin.pipe(grep)
grep.pipe(process.stdout)
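
One caveat with the code above: _transform receives arbitrary chunks rather than lines, so a match emits the whole chunk, and a pattern that straddles a chunk boundary is missed. Below is a minimal sketch of a line-oriented variant, assuming newline-delimited UTF-8 text. The names LineGrep, splitLines and pushMatches are hypothetical, chosen for illustration; Transform, _flush and StringDecoder are part of Node's standard library.

```javascript
const { Transform } = require('stream');
const { StringDecoder } = require('string_decoder');

// Split text into complete lines plus the trailing partial line.
function splitLines(text) {
  const parts = text.split('\n');
  const rest = parts.pop(); // text after the last newline (may be '')
  return { lines: parts, rest };
}

class LineGrep extends Transform {
  constructor(pattern) {
    super();
    this.pattern = pattern;
    // StringDecoder keeps multi-byte characters intact across chunks.
    this.decoder = new StringDecoder('utf8');
    this.rest = ''; // partial line carried over from the previous chunk
  }

  // Push each matching line downstream, restoring its newline.
  pushMatches(lines) {
    for (const line of lines) {
      if (line.search(this.pattern) !== -1) this.push(line + '\n');
    }
  }

  _transform(chunk, encoding, callback) {
    const text = this.rest + this.decoder.write(chunk);
    const { lines, rest } = splitLines(text);
    this.rest = rest;
    this.pushMatches(lines);
    callback();
  }

  // Called once the input ends: handle a final line with no newline.
  _flush(callback) {
    const tail = this.rest + this.decoder.end();
    if (tail) this.pushMatches([tail]);
    callback();
  }
}

// Usage (not executed here, since it would read from stdin):
//   process.stdin.pipe(new LineGrep(/foo/)).pipe(process.stdout);
```

The partial line is held back until its newline arrives, so output is delayed by at most one line compared with the chunk-based version.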