producer/consumer work queues
I'm wrestling with the best way to implement my processing pipeline.
My producers feed work to a BlockingQueue. On the consumer side, I poll the queue, wrap what I get in a Runnable task, and submit it to an ExecutorService.
while (!isStopping())
{
    String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS);
    if (work == null)
    {
        break;
    }
    executorService.execute(new Worker(work)); // needs to block if no threads!
}
This is not ideal; the ExecutorService has its own queue, of course, so what's really happening is that I'm always fully draining my work queue and filling the task queue, which slowly empties as the tasks complete.
I realize that I could queue tasks at the producer end, but I'd really rather not do that - I like the indirection/isolation of my work queue being dumb strings; it really isn't any business of the producer what's going to happen to them. Forcing the producer to queue a Runnable or Callable breaks an abstraction, IMHO.
But I do want the shared work queue to represent the current processing state. I want to be able to block the producers if the consumers aren't keeping up.
I'd love to use Executors, but I feel like I'm fighting their design. Can I partially drink the Kool-Aid, or do I have to gulp it? Am I being wrong-headed in resisting queueing tasks? (I suspect I could set up ThreadPoolExecutor to use a 1-task queue and override its execute method to block rather than reject-on-queue-full, but that feels gross.)
Suggestions?
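For reference, the "block rather than reject" idea does not need an overridden execute method. Here is a minimal sketch, assuming a fixed-size pool with a small bounded task queue and a RejectedExecutionHandler that waits for room in that queue. The names BlockingSubmitSketch, newBlockingPool and runTasks are made up for illustration. The handler also ties the caller to the pool's internal queue, so treat it as one possible approach rather than the recommended one.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BlockingSubmitSketch {

    // Hypothetical helper: a fixed-size pool whose execute() makes the
    // caller wait when the bounded task queue is full, instead of throwing.
    static ThreadPoolExecutor newBlockingPool(int threads, int queueCapacity) {
        RejectedExecutionHandler blockUntilRoom = (task, pool) -> {
            if (pool.isShutdown()) {
                throw new RejectedExecutionException("pool is shut down");
            }
            try {
                // Wait for room in the pool's own queue rather than rejecting.
                pool.getQueue().put(task);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting", e);
            }
        };
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity), blockUntilRoom);
    }

    // Pushes `count` trivial tasks through a 2-thread pool with a 1-slot
    // queue and returns how many of them ran.
    static int runTasks(int count) {
        ThreadPoolExecutor pool = newBlockingPool(2, 1);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < count; i++) {
            pool.execute(done::incrementAndGet); // waits here while the queue is full
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks completed: " + runTasks(20));
    }
}
```

With this in place, the consumer loop in the question stops draining the work queue as soon as the pool's single queue slot is taken, so the backlog stays in the shared work queue.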
I want the shared work queue to represent the current processing state.
Try using a shared BlockingQueue and have a pool of Worker threads take work items off the queue.
I want to be able to block the producers if the consumers aren't keeping up.
Both ArrayBlockingQueue and LinkedBlockingQueue support bounded queues such that they will block on put when full. Using the blocking put() methods ensures that producers are blocked if the queue is full.
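To see the bound in action without hanging a demo, here is a small sketch that fills a queue and then tries one more insert with the timed offer(), which gives up where put() would keep waiting. The class and method names are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedQueueSketch {

    // Fills a queue of the given capacity, then reports whether one more
    // element is accepted within 100 ms while nothing is consuming.
    static boolean acceptsWhenFull(int capacity) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        try {
            for (int i = 0; i < capacity; i++) {
                queue.put("item-" + i); // returns immediately while there is room
            }
            // put() would wait indefinitely at this point; the timed offer()
            // waits up to the timeout and then returns false.
            return queue.offer("one-too-many", 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted when full: " + acceptsWhenFull(2));
    }
}
```

A producer that calls put() instead simply stalls at that point until a consumer takes an item, which is the back-pressure the question asks for.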
Here is a rough start. You can tune the number of workers and queue size:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class WorkerTest<T> {
    private final BlockingQueue<T> workQueue;
    private final ExecutorService service;

    public WorkerTest(int numWorkers, int workQueueSize) {
        // Bounded queue: put() blocks producers once workQueueSize items are pending
        workQueue = new LinkedBlockingQueue<T>(workQueueSize);
        service = Executors.newFixedThreadPool(numWorkers);
        // Start one long-running Worker per pool thread
        for (int i = 0; i < numWorkers; i++) {
            service.submit(new Worker<T>(workQueue));
        }
    }

    public void produce(T item) {
        try {
            workQueue.put(item);
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so the caller can react to it
            Thread.currentThread().interrupt();
        }
    }

    private static class Worker<T> implements Runnable {
        private final BlockingQueue<T> workQueue;

        public Worker(BlockingQueue<T> workQueue) {
            this.workQueue = workQueue;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Blocks until a work item is available
                    T item = workQueue.take();
                    // Process item
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}
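The class above never stops its workers, so here is a self-contained sketch of the same pattern with a shutdown path. It assumes the number of items is known up front, counts them down with a CountDownLatch, and then interrupts the workers with shutdownNow(). PipelineSketch and process are made-up names, and the counter is only a stand-in for real processing.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PipelineSketch {

    // Produces `items` strings into a bounded queue, lets `numWorkers`
    // workers drain it, and returns how many items were processed.
    static int process(int items, int numWorkers, int queueSize) {
        BlockingQueue<String> workQueue = new LinkedBlockingQueue<>(queueSize);
        ExecutorService service = Executors.newFixedThreadPool(numWorkers);
        CountDownLatch remaining = new CountDownLatch(items);
        AtomicInteger processed = new AtomicInteger();

        for (int i = 0; i < numWorkers; i++) {
            service.execute(() -> {
                try {
                    while (true) {
                        workQueue.take();            // blocks until work arrives
                        processed.incrementAndGet(); // stand-in for real processing
                        remaining.countDown();
                    }
                } catch (InterruptedException e) {
                    // shutdownNow() interrupts take() and ends the loop
                    Thread.currentThread().interrupt();
                }
            });
        }

        try {
            for (int i = 0; i < items; i++) {
                workQueue.put("work-" + i); // blocks while the queue is full
            }
            remaining.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            service.shutdownNow();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("processed: " + process(50, 3, 5));
    }
}
```

If the item count is not known in advance, a sentinel "poison pill" item per worker is a common alternative way to signal the end of the stream.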
"find an available existing worker thread if one exists, create one if necessary, kill them if they go idle."
Managing all those worker states is as unnecessary as it is perilous. I would create one monitor thread that constantly runs in the background, whose only task is to fill up the queue and spawn consumers... why not make the worker threads daemons so they die as soon as they complete? If you attach them all to one ThreadGroup you can dynamically re-size the pool... for example:
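// workerGroup is the ThreadGroup that all workers are attached to;
// activeCount() is an instance method and only returns an estimate
while (!queue.isEmpty() && workerGroup.activeCount() < UPPER_LIMIT) {
    spawnDaemonWorkers(queue.poll());
}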
You could have your consumer execute Runnable::run directly instead of starting up a new thread. Combine this with a blocking queue with a maximum size and I think you will get what you want. Your consumer becomes a worker that executes tasks inline based on the work items on the queue. Consumers will only dequeue items as fast as they process them, so your producer will block when your consumers stop consuming.
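Applied to the loop in the question, that could look roughly like the sketch below. The Worker here is a made-up stand-in that just upper-cases its string, the queue bound of 2 is arbitrary, and the consumer stops once the queue stays empty for a second, as the original loop does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class InlineConsumerSketch {

    // Hypothetical stand-in for the question's Worker task.
    static class Worker implements Runnable {
        private final String work;
        private final List<String> results;

        Worker(String work, List<String> results) {
            this.work = work;
            this.results = results;
        }

        @Override
        public void run() {
            results.add(work.toUpperCase());
        }
    }

    // Consumer loop: dequeue an item, then run the task on this same thread.
    static void consume(BlockingQueue<String> workQueue, List<String> results)
            throws InterruptedException {
        String work;
        while ((work = workQueue.poll(1000L, TimeUnit.MILLISECONDS)) != null) {
            new Worker(work, results).run(); // inline, no second queue involved
        }
    }

    static List<String> demo() {
        BlockingQueue<String> workQueue = new ArrayBlockingQueue<>(2);
        List<String> results = Collections.synchronizedList(new ArrayList<>());
        Thread consumer = new Thread(() -> {
            try {
                consume(workQueue, results);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            for (String s : new String[] {"a", "b", "c", "d", "e"}) {
                workQueue.put(s); // blocks whenever 2 items are already pending
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

To get parallelism back, start several threads that each run the same consume loop against the shared queue.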