Accessing the youngest element of a ConcurrentQueue in the presence of multiple threads operating on the queue
We have a ConcurrentQueue which is used to share data among 3 threads. Thread A continuously fills the queue with data. Thread B is designed to record this data to a file. Thread C is supposed to retrieve the youngest entry in the queue (or as close to youngest as possible), perform some operations on it and display results on the screen.
Thread B, in order to cluster the file write operations in time, does something like this:
    if (cq.Count > 100)
    {
        while (cq.Count > 1)
        {
            qElement = PopFromCq(cq); // PopFromCq uses cq.TryDequeue()
            bw.Write(qElement.data);  // bw is a binary writer
        }
    }
    else
    {
        System.Threading.Thread.Sleep(10);
    }
That is, it waits for at least 100 elements to queue up, then writes them to disk. It always leaves at least one item in the queue, because we want Thread C to always have access to at least one item.
The loop in thread C looks like:
    while (threadsRunning)
    {
        System.Threading.Thread.Sleep(500); // Update twice per second
        ProcessDataAndUpdateScreen(cq.ElementAt(cq.Count - 1)); // our terrible attempt at looking at the latest (or close to latest) entry in the queue
    }
In this loop, we sometimes get an exception due to the race between the thread that writes the data to disk, and the cq.ElementAt(cq.Count-1) call. I believe what is happening is as follows:
- cq.Count is calculated to be, say 90.
- By that time, Thread B already started its loop and it is dequeueing data from the queue to write to the disk
- By the time cq.ElementAt() is called, Thread B consumed a number of items such that (cq.Count - 1) no longer points to a valid entry in the queue.
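The sequence above can be reproduced deterministically on a single thread, which at least confirms where the exception comes from. This is only a sketch: RaceDemo and StaleIndexThrows are made-up names, the queue holds plain ints instead of our real element type, and the draining loop stands in for Thread B.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

static class RaceDemo
{
    // Returns true if ElementAt throws once the queue has shrunk
    // after Count was read (hypothetical helper, not our real code).
    public static bool StaleIndexThrows()
    {
        var cq = new ConcurrentQueue<int>();
        for (int i = 0; i < 90; i++) cq.Enqueue(i);

        // Thread C's first step: Count is read, giving index 89.
        int staleIndex = cq.Count - 1;

        // Stand-in for Thread B draining the queue down to one item.
        while (cq.Count > 1) cq.TryDequeue(out _);

        try
        {
            // Thread C's second step: index 89 no longer exists.
            cq.ElementAt(staleIndex);
            return false;
        }
        catch (ArgumentOutOfRangeException)
        {
            return true;
        }
    }
}
```

Calling RaceDemo.StaleIndexThrows() runs the three steps in order and reports whether the stale index caused the failure.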
Any ideas on what would be a nice way of accessing the youngest entry in the queue in the presence of multiple threads operating on the queue?
Regards,
Is it necessary for the A-B communication and A-C communication to both go through the queue? What if you have thread A write each entry to the queue (for B to read and log) and also save the entry it has just queued in a volatile field somewhere (C# allows volatile on fields, not on properties)? Every time C wants to get the youngest element, it can just read directly from that field.
EDIT: Instead of just relying on a volatile field, you should actually use the Interlocked methods to set and read the "youngest entry": Interlocked.Exchange<T>(ref T, T) to set it and Interlocked.CompareExchange<T>(ref T, T, T) to read it.
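Here is a rough sketch of what I mean, not a drop-in implementation. Sample, LatestSample, Publish and Read are names I made up for illustration, and I'm assuming the queue element is a reference type (the generic Interlocked overloads require one).

```csharp
using System.Threading;

// Hypothetical stand-in for the queue element type in the question.
public sealed class Sample
{
    public readonly byte[] Data;
    public Sample(byte[] data) { Data = data; }
}

public static class LatestSample
{
    // Stays null until thread A publishes the first entry.
    private static Sample _latest;

    // Thread A: call this right after cq.Enqueue(sample).
    public static void Publish(Sample sample)
    {
        Interlocked.Exchange(ref _latest, sample);
    }

    // Thread C: returns the most recently published entry,
    // or null if nothing has been published yet.
    public static Sample Read()
    {
        // With value and comparand both null, the field is never
        // actually changed; the call just returns its current value
        // with full-fence semantics.
        return Interlocked.CompareExchange<Sample>(ref _latest, null, null);
    }
}
```

Thread C's loop would then call ProcessDataAndUpdateScreen(LatestSample.Read()) after a null check, and thread B would be free to drain the queue completely, since C no longer depends on an item being left in it.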