C#, BlockingCollection: How to wait until collection has less than N items
Hi everyone. I'm using BlockingCollection in the traditional producer-consumer scenario. To process items from the collection one by one, I write this code:
while (...)
{
    var item = collection.Take(cancellationTokenSource.Token);
    ProcessItem(item);
}
But how do I process items in batches of N (wait until the collection has less than N items)? My solution uses a temporary buffer:
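var buffer = new List<MyType>(N);
while (...)
{
    var item = collection.Take(cancellationTokenSource.Token);
    buffer.Add(item);
    if (buffer.Count == N)
    {
        // Process the full batch, then start collecting the next one.
        // (The loop variable needs its own name: 'item' is already declared above.)
        foreach (var bufferedItem in buffer)
        {
            ProcessItem(bufferedItem);
        }
        buffer.Clear();
    }
}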
But this seems very ugly to me. Is there a better approach?
[UPDATE]: Here's a prototype extension method that makes the solution more readable. Maybe someone will find it useful:
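using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class BlockingCollectionExtensions
{
    public static IEnumerable<T> TakeBuffer<T>(this BlockingCollection<T> collection,
        CancellationToken cancellationToken, Int32 bufferSize)
    {
        var buffer = new List<T>(bufferSize);
        while (buffer.Count < bufferSize)
        {
            try
            {
                // Blocks until an item is available.
                buffer.Add(collection.Take(cancellationToken));
            }
            catch (OperationCanceledException)
            {
                // we need to handle the rest of buffer,
                // even if the task has been cancelled.
                break;
            }
            catch (InvalidOperationException)
            {
                // CompleteAdding() was called and the collection is empty:
                // return the partial buffer instead of losing it.
                break;
            }
        }
        return buffer;
    }
}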
And here's the usage:
foreach (var item in collection.TakeBuffer(cancellationTokenSource.Token, 5))
{
    // TODO: process items here...
}
Of course, this is not a complete solution. For example, I would add timeout support: if there aren't enough items but the time has elapsed, we need to stop waiting and process the items already added to the buffer.
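The timeout support described above could look roughly like this. It is a sketch, not a finished implementation: it assumes the overload BlockingCollection<T>.TryTake(out T, int millisecondsTimeout, CancellationToken), which returns false when the wait times out or when the collection is completed and empty, and it tracks the remaining time budget with a Stopwatch. The class and method names (BlockingCollectionTimeoutExtensions, TakeBufferWithTimeout) are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

public static class BlockingCollectionTimeoutExtensions
{
    // Hypothetical helper: collects up to bufferSize items, but stops waiting once
    // 'timeout' (assumed non-negative and finite) has elapsed and returns the partial batch.
    public static List<T> TakeBufferWithTimeout<T>(this BlockingCollection<T> collection,
        int bufferSize, TimeSpan timeout, CancellationToken cancellationToken)
    {
        var buffer = new List<T>(bufferSize);
        var stopwatch = Stopwatch.StartNew();

        while (buffer.Count < bufferSize)
        {
            // Time left for this batch.
            long remainingMs = (long)timeout.TotalMilliseconds - stopwatch.ElapsedMilliseconds;
            if (remainingMs <= 0)
                break; // time is up: return what we have

            try
            {
                // False means the wait timed out, or the collection is completed and empty.
                if (!collection.TryTake(out T item, (int)Math.Min(remainingMs, int.MaxValue), cancellationToken))
                    break;
                buffer.Add(item);
            }
            catch (OperationCanceledException)
            {
                // As in the prototype above: keep the items already buffered.
                break;
            }
        }
        return buffer;
    }
}
```

With this shape the consumer gets a full batch when the producer keeps up, and a smaller batch after at most roughly `timeout` when it doesn't.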
I don't find that solution all that ugly. Batch processing is a requirement orthogonal to what the blocking collection does and should be treated as such. I would encapsulate the batch processing behaviour in a BatchProcessor class with a clean interface, but other than that I don't really see a problem with the approach.
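One possible shape for such a BatchProcessor is sketched below. The constructor parameters, the Action<IReadOnlyList<T>> callback, and the Add/Flush methods are assumptions for illustration, not an established API. It is not thread-safe and is meant to be driven by the single consumer thread that takes items from the BlockingCollection.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical BatchProcessor: accumulates items and hands them to a callback
// in groups of batchSize. Flush() processes a partial batch on demand.
public sealed class BatchProcessor<T>
{
    private readonly int _batchSize;
    private readonly Action<IReadOnlyList<T>> _processBatch;
    private readonly List<T> _buffer;

    public BatchProcessor(int batchSize, Action<IReadOnlyList<T>> processBatch)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        _batchSize = batchSize;
        _processBatch = processBatch ?? throw new ArgumentNullException(nameof(processBatch));
        _buffer = new List<T>(batchSize);
    }

    // Adds one item; once batchSize items are buffered, the batch is processed.
    public void Add(T item)
    {
        _buffer.Add(item);
        if (_buffer.Count == _batchSize)
            Flush();
    }

    // Processes whatever is buffered, possibly fewer than batchSize items.
    public void Flush()
    {
        if (_buffer.Count == 0) return;
        var batch = _buffer.ToArray(); // copy, so the callback may keep the batch
        _buffer.Clear();
        _processBatch(batch);
    }
}
```

In the question's consumer loop, each item returned by collection.Take(...) (or yielded by GetConsumingEnumerable) would be passed to Add, with a final Flush() after the loop so that a trailing partial batch is not lost.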
You may find BlockingCollection, which by default wraps the lock-free ConcurrentQueue, to be a premature optimization here. You might be able to write cleaner code if you take a step back and use a Queue<T> with Monitor-based locking.
First of all, I'm not sure your logic is correct. You say you want to wait until the collection has less than N items; isn't it the other way around? You want the collection to have N or more items in order to process N items. Or perhaps I'm misunderstanding.
I also suggest that you process items one by one if there are fewer than N items, or you may find that your application seems to hang at N-1 items. Of course, if this is a steady stream of data, processing only when buffer.Count >= N could be good enough.
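One way to avoid hanging at N-1 items, sketched here as a variant rather than what this answer prescribes, is to block only for the first item and then drain whatever else is already queued, up to N, without waiting. It relies on BlockingCollection<T>.Take(CancellationToken) and the non-blocking TryTake(out T). The names BlockingCollectionAvailableExtensions and TakeAvailable are hypothetical, and maxItems is assumed to be at least 1.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class BlockingCollectionAvailableExtensions
{
    // Hypothetical helper: waits for at least one item, then grabs up to
    // maxItems - 1 more that are already available, without further waiting.
    public static List<T> TakeAvailable<T>(this BlockingCollection<T> collection,
        int maxItems, CancellationToken cancellationToken)
    {
        var batch = new List<T>(maxItems);

        // Blocking wait for the first item. Throws OperationCanceledException on
        // cancellation, or InvalidOperationException if the collection is completed and empty.
        batch.Add(collection.Take(cancellationToken));

        // Non-blocking drain: TryTake(out item) returns false immediately when empty.
        while (batch.Count < maxItems && collection.TryTake(out T item))
            batch.Add(item);

        return batch;
    }
}
```

Under a heavy stream this yields full batches of maxItems; under a trickle it degrades to small batches (down to one item) instead of stalling.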
I'd suggest going with a Queue and Monitor, as GregC suggests.
Something like this:
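// Assumes _queue is a Queue<object> field and N is the batch threshold.
public object Dequeue()
{
    lock (_queue)
    {
        // Monitor.Wait must be called while holding the lock on _queue,
        // otherwise it throws SynchronizationLockException.
        while (_queue.Count < N)
        {
            Monitor.Wait(_queue);
        }
        return _queue.Dequeue();
    }
}
public void Enqueue(object q)
{
    lock (_queue)
    {
        _queue.Enqueue(q);
        if (_queue.Count == N)
        {
            // wake up any blocked dequeue call(s)
            Monitor.PulseAll(_queue);
        }
    }
}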
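A self-contained, generic variant of this Queue + Monitor idea might return a whole batch per call instead of a single item. This is a sketch under stated assumptions: the class name MonitorBatchQueue<T> and the DequeueBatch method are hypothetical, and there is no shutdown or cancellation support, so a consumer blocked in DequeueBatch waits until enough items arrive.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical generic Queue + Monitor batch queue: DequeueBatch blocks
// until batchSize items are queued, then removes exactly batchSize of them.
public sealed class MonitorBatchQueue<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly int _batchSize;

    public MonitorBatchQueue(int batchSize)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        _batchSize = batchSize;
    }

    public void Enqueue(T item)
    {
        lock (_queue)
        {
            _queue.Enqueue(item);
            // Wake waiting consumers whenever a full batch is available.
            if (_queue.Count >= _batchSize)
                Monitor.PulseAll(_queue);
        }
    }

    public List<T> DequeueBatch()
    {
        lock (_queue)
        {
            // Monitor.Wait releases the lock while waiting and re-acquires it before
            // returning; the loop re-checks the condition after every wake-up.
            while (_queue.Count < _batchSize)
                Monitor.Wait(_queue);

            var batch = new List<T>(_batchSize);
            for (int i = 0; i < _batchSize; i++)
                batch.Add(_queue.Dequeue());
            return batch;
        }
    }
}
```

Pulsing on `>=` rather than `==` is a small robustness choice: it costs an extra PulseAll per enqueue while a full batch sits unclaimed, but it does not depend on the count passing exactly through N while a consumer is waiting.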