Pattern for executing concurrent tasks just once

I am working on a Java server that dispatches XMPP messages; worker threads execute the tasks sent by my clients.

private static ExecutorService threadpool = Executors.newCachedThreadPool();

DispatchWorker worker = new DispatchWorker(connection, packet);
threadpool.execute(worker);

This works fine, but I need a bit more than that.

  1. I don't want to execute the same request multiple times.
  2. My worker may start another thread with a background task, which is also only allowed to run once at a time. In other words, a thread pool inside the worker threads.

I can identify the requests by a string, and I can also give the background tasks an id to identify them.

My solution would be a synchronized HashMap in which running tasks are registered under their id. A reference to the map would be passed to the worker threads so that they can remove their entry when they finish.

This solution feels a bit clumsy, so I wanted to know if there are more elegant patterns or best practices.

best regards, m


This is exactly what Quartz does (although it does a lot more, like scheduling jobs in the future).


You can use a singleton thread pool or pass the thread pool as an argument. (I would make the pool final.)

You can use a HashSet to guard against adding duplicate tasks.


I believe using a Map is okay for this. But instead of a synchronized HashMap you can also use ConcurrentHashMap, which lets you specify a concurrency level, i.e. roughly how many threads can update the map at the same time. It also has an atomic putIfAbsent operation.
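To make the putIfAbsent idea concrete, here is a minimal sketch. The class name OnceAtATimeDispatcher and its submit method are made up for illustration and are not part of the question's code; it assumes each task can be identified by a String id, as described in the question.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;

/** Hypothetical helper: allows at most one running task per id. */
class OnceAtATimeDispatcher {
    // Ids of tasks that are currently queued or running.
    private final ConcurrentMap<String, Boolean> running =
            new ConcurrentHashMap<String, Boolean>();
    private final ExecutorService pool;

    OnceAtATimeDispatcher(ExecutorService pool) {
        this.pool = pool;
    }

    /** Returns true if the task was submitted, false if the id is already registered. */
    boolean submit(final String id, final Runnable task) {
        // putIfAbsent is atomic: only one caller per id sees null here.
        if (running.putIfAbsent(id, Boolean.TRUE) != null) {
            return false;
        }
        try {
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        task.run();
                    } finally {
                        // Unregister even if the task throws.
                        running.remove(id);
                    }
                }
            });
        } catch (RuntimeException ex) {
            // E.g. the pool rejected the task; do not leave a stale entry behind.
            running.remove(id);
            throw ex;
        }
        return true;
    }
}
```

The worker does not need a reference to the map in this variant, because the wrapper Runnable removes the entry in a finally block. The same class could be instantiated a second time for the background tasks.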


I would use queues and daemon worker threads that are always running and wait for something to arrive in the queue. This way it is guaranteed that only one worker is working on a given request. If you only want one thread to run, turn POOLSIZE down to 1, or use newSingleThreadExecutor.

I do not quite understand your second requirement: do you mean only 1 thread is allowed to run as background task? If so, you could create another SingleThreadExecutor and use that for the background task. Then it would not make too much sense to have POOLSIZE>1, unless the work done in the background thread is very short compared to that done in the worker itself.

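import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

private static interface Request {}

private static final int POOLSIZE = 10;
private static final int QUEUESIZE = 1000;
private final BlockingQueue<Request> queue = new LinkedBlockingQueue<Request>(QUEUESIZE);

public void startWorkers() {
    ExecutorService threadPool = Executors.newFixedThreadPool(POOLSIZE);
    for (int i = 0; i < POOLSIZE; i++) {
        threadPool.execute(new Runnable() {

            @Override
            public void run() {
                try {
                    // Keep taking requests until the thread is interrupted.
                    while (true) {
                        final Request request = queue.take();
                        doStuffWithRequest(request);
                    }
                } catch (InterruptedException ex) {
                    // LOG
                    // Shutdown worker thread.
                    Thread.currentThread().interrupt();
                }
            }
        });
    }
}

public void handleRequest(Request request) {
    if (!queue.offer(request)) {
        // Cancel request, queue is full.
    }
}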

At startup time, startWorkers starts the workers (surprise!). handleRequest handles requests coming from a web service, servlet or whatever.

Of course you need to adapt "Request" and "doStuffWithRequest" to your needs, and add some additional logic for shutdown etc.


We originally wrote our own utilities to handle this, but if you want the results memoised, then Guava's ComputingMap encapsulates the initialisation by one and only one thread (with other threads blocking and waiting for the result), and the memoisation.

It also supports various expiration strategies.

Usage is simple, you construct it with an initialisation function:

Map<String, Foo> cache = new MapMaker().makeComputingMap(new Function<String, Foo>() {
  public Foo apply(String key) {
    return … // init with expensive calculation
  }
});

and then just call it:

Foo foo = cache.get("key");

The first thread to ask for "key" will be the one that performs the initialisation; other threads asking for the same key block and wait for the result.
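If pulling in Guava is not an option, a similar "compute once per key" effect can be sketched with the JDK alone. Note that this swaps in a different technique: it uses ConcurrentHashMap.computeIfAbsent (Java 8 or later) rather than the computing map above, and it has no expiration support. The class name Memoizer is hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical JDK-only memoizer; not the Guava API. */
class Memoizer<K, V> {
    private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();
    private final Function<K, V> init;

    Memoizer(Function<K, V> init) {
        this.init = init;
    }

    V get(K key) {
        // The mapping function is applied at most once per absent key;
        // concurrent callers for the same key wait for that result.
        return cache.computeIfAbsent(key, init);
    }
}
```

One caveat: the initialisation function should be short and must not modify the same map, since other updates to the map may be blocked while it runs.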
