开发者

Idiomatic way to wait for multiple callbacks in Node.js

Suppose you need to do some operations that depend on some temp file. Since we're talking about Node here, those operations are obviously asynchronous. What is the idiomatic way to wait for all operations to finish in order to know when the temp file can be deleted?

Here is some code showing what I want to do:

do_something(tmp_file_name, function(err) {});
do_something_other(tmp_file_name, function(err) {});
fs.unlink(tmp_file_name);

But if I write it this way, the third call can be executed before the first two get a chance to use the file. I need some way to guarantee that the first two calls already finished (invoked their callbacks) before moving on without nesting the calls (and making them synchronous in practice).

I thought about using event emitters on the callbacks and registering a counter as receiver. The counter would receive the finished events and count how many operations were still pending. When the last one finished, it would delete the file. But there is the risk of a开发者_JAVA百科 race condition and I'm not sure this is usually how this stuff is done.

How do Node people solve this kind of problem?


Update:

Now I would advise to have a look at:

  • Promises

    The Promise object is used for deferred and asynchronous computations. A Promise represents an operation that hasn't completed yet, but is expected in the future.

    A popular promises library is bluebird. A would advise to have a look at why promises.

    You should use promises to turn this:

    fs.readFile("file.json", function (err, val) {
        if (err) {
            console.error("unable to read file");
        }
        else {
            try {
                val = JSON.parse(val);
                console.log(val.success);
            }
            catch (e) {
                console.error("invalid json in file");
            }
        }
    });
    

    Into this:

    fs.readFileAsync("file.json").then(JSON.parse).then(function (val) {
        console.log(val.success);
    })
    .catch(SyntaxError, function (e) {
        console.error("invalid json in file");
    })
    .catch(function (e) {
        console.error("unable to read file");
    });
    
  • generators: For example via co.

    Generator based control flow goodness for nodejs and the browser, using promises, letting you write non-blocking code in a nice-ish way.

    var co = require('co');
    
    co(function *(){
      // yield any promise
      var result = yield Promise.resolve(true);
    }).catch(onerror);
    
    co(function *(){
      // resolve multiple promises in parallel
      var a = Promise.resolve(1);
      var b = Promise.resolve(2);
      var c = Promise.resolve(3);
      var res = yield [a, b, c];
      console.log(res);
      // => [1, 2, 3]
    }).catch(onerror);
    
    // errors can be try/catched
    co(function *(){
      try {
        yield Promise.reject(new Error('boom'));
      } catch (err) {
        console.error(err.message); // "boom"
     }
    }).catch(onerror);
    
    function onerror(err) {
      // log any uncaught errors
      // co will not throw any errors you do not handle!!!
      // HANDLE ALL YOUR ERRORS!!!
      console.error(err.stack);
    }
    

If I understand correctly I think you should have a look at the very good async library. You should especially have a look at the series. Just a copy from the snippets from github page:

async.series([
    function(callback){
        // do some stuff ...
        callback(null, 'one');
    },
    function(callback){
        // do some more stuff ...
        callback(null, 'two');
    },
],
// optional callback
function(err, results){
    // results is now equal to ['one', 'two']
});


// an example using an object instead of an array
async.series({
    one: function(callback){
        setTimeout(function(){
            callback(null, 1);
        }, 200);
    },
    two: function(callback){
        setTimeout(function(){
            callback(null, 2);
        }, 100);
    },
},
function(err, results) {
    // results is now equals to: {one: 1, two: 2}
});

As a plus this library can also run in the browser.


The simplest way increment an integer counter when you start an async operation and then, in the callback, decrement the counter. Depending on the complexity, the callback could check the counter for zero and then delete the file.

A little more complex would be to maintain a list of objects, and each object would have any attributes that you need to identify the operation (it could even be the function call) as well as a status code. The callbacks would set the status code to completed.

Then you would have a loop that waits (using process.nextTick) and checks to see if all tasks are completed. The advantage of this method over the counter, is that if it is possible for all outstanding tasks to complete, before all tasks are issued, the counter technique would cause you to delete the file prematurely.


// simple countdown latch
function CDL(countdown, completion) {
    this.signal = function() { 
        if(--countdown < 1) completion(); 
    };
}

// usage
var latch = new CDL(10, function() {
    console.log("latch.signal() was called 10 times.");
});


There is no "native" solution, but there are a million flow control libraries for node. You might like Step:

Step(
  function(){
      do_something(tmp_file_name, this.parallel());
      do_something_else(tmp_file_name, this.parallel());
  },
  function(err) {
    if (err) throw err;
    fs.unlink(tmp_file_name);
  }
)

Or, as Michael suggested, counters could be a simpler solution. Take a look at this semaphore mock-up. You'd use it like this:

do_something1(file, queue('myqueue'));
do_something2(file, queue('myqueue'));

queue.done('myqueue', function(){
  fs.unlink(file);
});


I'd like to offer another solution that utilizes the speed and efficiency of the programming paradigm at the very core of Node: events.

Everything you can do with Promises or modules designed to manage flow-control, like async, can be accomplished using events and a simple state-machine, which I believe offers a methodology that is, perhaps, easier to understand than other options.

For example assume you wish to sum the length of multiple files in parallel:

const EventEmitter = require('events').EventEmitter;

// simple event-driven state machine
const sm = new EventEmitter();

// running state
let context={
  tasks:    0,    // number of total tasks
  active:   0,    // number of active tasks
  results:  []    // task results
};

const next = (result) => { // must be called when each task chain completes

  if(result) { // preserve result of task chain
    context.results.push(result);
  }

  // decrement the number of running tasks
  context.active -= 1; 

  // when all tasks complete, trigger done state
  if(!context.active) { 
    sm.emit('done');
  }
};

// operational states
// start state - initializes context
sm.on('start', (paths) => {
  const len=paths.length;

  console.log(`start: beginning processing of ${len} paths`);

  context.tasks = len;              // total number of tasks
  context.active = len;             // number of active tasks

  sm.emit('forEachPath', paths);    // go to next state
});

// start processing of each path
sm.on('forEachPath', (paths)=>{

  console.log(`forEachPath: starting ${paths.length} process chains`);

  paths.forEach((path) => sm.emit('readPath', path));
});

// read contents from path
sm.on('readPath', (path) => {

  console.log(`  readPath: ${path}`);

  fs.readFile(path,(err,buf) => {
    if(err) {
      sm.emit('error',err);
      return;
    }
    sm.emit('processContent', buf.toString(), path);
  });

});

// compute length of path contents
sm.on('processContent', (str, path) => {

  console.log(`  processContent: ${path}`);

  next(str.length);
});

// when processing is complete
sm.on('done', () => { 
  const total = context.results.reduce((sum,n) => sum + n);
  console.log(`The total of ${context.tasks} files is ${total}`);
});

// error state
sm.on('error', (err) => { throw err; });

// ======================================================
// start processing - ok, let's go
// ======================================================
sm.emit('start', ['file1','file2','file3','file4']);

Which will output:

start: beginning processing of 4 paths
forEachPath: starting 4 process chains
  readPath: file1
  readPath: file2
  processContent: file1
  readPath: file3
  processContent: file2
  processContent: file3
  readPath: file4
  processContent: file4
The total of 4 files is 4021

Note that the ordering of the process chain tasks is dependent upon system load.

You can envision the program flow as:

start -> forEachPath -+-> readPath1 -> processContent1 -+-> done
                      +-> readFile2 -> processContent2 -+
                      +-> readFile3 -> processContent3 -+
                      +-> readFile4 -> processContent4 -+

For reuse, it would be trivial to create a module to support the various flow-control patterns, i.e. series, parallel, batch, while, until, etc.


The simplest solution is to run the do_something* and unlink in sequence as follows:

do_something(tmp_file_name, function(err) {
    do_something_other(tmp_file_name, function(err) {
        fs.unlink(tmp_file_name);
    });
});

Unless, for performance reasons, you want to execute do_something() and do_something_other() in parallel, I suggest to keep it simple and go this way.


Wait.for https://github.com/luciotato/waitfor

using Wait.for:

var wait=require('wait.for');

...in a fiber...

wait.for(do_something,tmp_file_name);
wait.for(do_something_other,tmp_file_name);
fs.unlink(tmp_file_name);


With pure Promises it could be a bit more messy, but if you use Deferred Promises then it's not so bad:

Install:

npm install --save @bitbar/deferred-promise

Modify your code:

const DeferredPromise = require('@bitbar/deferred-promise');

const promises = [
  new DeferredPromise(),
  new DeferredPromise()
];

do_something(tmp_file_name, (err) => {
  if (err) {
    promises[0].reject(err);
  } else {
    promises[0].resolve();
  }
});

do_something_other(tmp_file_name, (err) => {
  if (err) {
    promises[1].reject(err);
  } else {
    promises[1].resolve();
  }
});

Promise.all(promises).then( () => {
  fs.unlink(tmp_file_name);
});

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜