Removing a specific item from a BlockingCollection<>
Is there a way to remove a specific item from a BlockingCollection, like this:
IMyItem mySpecificItem = controller.getTopRequestedItem();
bool took = myBlockingCollection<IMyItem>.TryTake(out mySpecificItem);
if(took)
开发者_如何转开发process(mySpecificItem);
.....
in other words: I want to remove an item from a BlockingCollection<>, which is identified by some field (e.g. ID), not just the next item availible.
Do I need to implement getHashCode() or an IComparer ?
BlockingCollection<>
won't help you here. I think you need ConcurrentDictionary<>
.
I think the answer to the straight question here is "no, there isn't". But I had the same underlying scenario problem. Below is how I solved it.
TL;DR I used an array of BlockingCollections, with each array elements being for a different priority level. That seems like the simplest solution. It fits perfectly with the BlockingCollection methods available.
I had the same challenge: a queue which occasionally gets higher priority elements added. When there are higher priority elements, they need to be handled before all the normal priority elements. Both higher priority elements, and normal priority elements need to respectively be handled in FIFO order within their priority buckets. For flexibility in how the processing is used, the priority levels should ideally not be a finite small set, but as you'll see below I ended up compromising on that need.
The code I ended up with: In class declaration. This assumes we need 4 priority levels:
BlockingCollection<ImageToLoad>[] imagesToLoad = new BlockingCollection<ImageToLoad>[4];
In class constructor (to create the 4 instances of BlockingCollection with fancy LINQ syntax - it's frankly easier to read when just written out as a 0->3 for loop :-) ):
imagesToLoad = Enumerable.Repeat(0, 4).Select(bc => new BlockingCollection<ImageToLoad>()).ToArray();
When an element gets added to the work queue:
imagesToLoad[priority].Add(newImageToLoad);
The worker task, picking tasks with high prio (4 is higher prio than 3, etc.), and waking up from its no-work-queued block when work arrives in any of the 4 prio queues:
while (true)
{
ImageToLoad nextImageToLoad = null;
for (int i = 3; i >= 0; i--)
if (imagesToLoad[i].TryTake(out nextImageToLoad))
break;
if (nextImageToLoad == null)
BlockingCollection<ImageToLoad>.TakeFromAny(imagesToLoad, out nextImageToLoad);
if (nextImageToLoad != null)
await LoadOneImage(nextImageToLoad);
}
BlockingCollection backed by a FIFO ConcurrentQueue is a great base for this. It lets you do a FirstOrDefault(Func predicate) to find items with higher priority. But, as mentioned in this question, it doesn't have a Remove(Func predicate) or a Take(Func predicate) method to get the higher priority items out of the queue when they have been processed out-of-order.
I considered 4 solutions, in order of decreasing work and complexity: A. Writing my own version of BlockingCollection. This is the most elegant solution. I don't think it would be too bad in complexity when you only need Add, Take and Take(Func predicate), but it'll take time and there'll be at least some concurrency or lock bugs which will be hard to find. I would use a SemaphoreSlim to handle the blocking while waiting for new elements to arrive. I would probably use a OrderedDictionary to keep the elements in. B. Give up on BlockingCollection. Use a OrderedDictionary<> directly (similar to what David suggests above with a ConcurrentDictionary). The logic would be very similar to option A, just not encapsulated in a separate class. C. This is something of a hack: add a property to the elements which are getting queued, to indicate if the element has already been processed. When you process an element out-of-order due to it having higher priority, set this property to "already done with this one". When you later Take this element because it happens to become the one at the front of the queue, just discard it (instead of processing it, like you would do with elements you Take at the front of the queue, which haven't already been processed). This gets complex though, because: (1) you end up having to treat these elements which don't really belong in the BlockingCollection specially in many contexts. (2) confusing from a concurrency perspective, to rely partially on the locks inside BlockingCollection and BlockingCollection.Take(), and partially on the locks I need to add to do the logic around finding high-prio items and updating them. It'll be too easy to end up with deadlocks due to conflicts between the two separate locks. D. Instead of letting the number of priority levels be open-ended, lock it to just 2, or 3, or whatever I think I might need, and create that many separate BlockingCollections. You can then do the blocking with the TakeFromAny call. When not blocking (because there are elements already available when you're ready to process another one), you can inspect each BlockingCollection and Take from the highest priority one which isn't empty.
I spent a fair bit of time trying to make option (C) work before realizing all the complexity it was creating.
I ended up going with option (D).
It would be straight forward to make the number of priority levels dynamic so it's set when the class is created. But making it completely run-time dynamic with priorities as any int is not practical with this approach, since there would be so many unused BlockingCollection objects created.
精彩评论