开发者

What type of IProducerConsumerCollection<T> to use for my task?

I have exactly 100 Sensors each "measuring" own data. I have exactly one DataSender which should send information from "sensors". The most recent information should be sent.

Bandwidth of the channel may be less than data produced by 100 sensors. In this case some data can be skipped - but we should be "roughly fair". For example, we could skip every second measurement from each sensor.

I don't know how often each sensor generates data, but in general they generate data pretty often.

After my other posts:

  • how to create singleton which always running in separate thread?
  • Modified Producer/Consumer example, any problems with it?

I have decided that I have classical Producer/Consumer problem, with:

  • 100 Producers, and
  • 1 Consumer

I've been suggested to use BlockingCollection for this. The only problem with BlockingCollection - once you have added item, you cannot replace it. But in my application, if sensor produces a new value, and previous value was not processed by the Consumer, the value should be replaced.

Should I use use a ConcurentDictionary or ConcurentBag for that task?

Conceptually, all I need is an array of 100 elements.

Sensor #33 should replace it's value into array[33]:

| Sensor | Value |
|--------|-------|
|      1 |       |
|      2 |       |
|      3 |       |
/......../......./
|     32 |       |
|     33 | 101.9 |
|     34 |       |
/......../......./
|     98 |       |
|     99 |       |
|    100 |       |

Consumer should take value from array[33] and if not null, then send it and set array[33] t开发者_如何学JAVAo null. Consumer should react on any not null values in array asap.


I think you should implement your own IProducerConsumerCollection<T>. That's why it's an interface: so that you could easily make your own.

You could do it using Dictionary<K,V> and Queue<T> to make sure receiving the data is fair, i.e. if you have just one device that produces data very fast, you won't send data just from this one.

public class DeviceDataQueue<TDevice, TData>
    : IProducerConsumerCollection<Tuple<TDevice, TData>>
{
    private readonly object m_lockObject = new object();
    private readonly Dictionary<TDevice, TData> m_data
        = new Dictionary<TDevice, TData>();
    private readonly Queue<TDevice> m_queue = new Queue<TDevice>();

    //some obviously implemented methods elided, just make sure they are thread-safe

    public int Count { get { return m_queue.Count; } }

    public object SyncRoot { get { return m_lockObject; } }

    public bool IsSynchronized { get { return true; } }

    public bool TryAdd(Tuple<TDevice, TData> item)
    {
        var device = item.Item1;
        var data = item.Item2;

        lock (m_lockObject)
        {
            if (!m_data.ContainsKey(device))
                m_queue.Enqueue(device);

            m_data[device] = data;
        }

        return true;
    }

    public bool TryTake(out Tuple<TDevice, TData> item)
    {
        lock (m_lockObject)
        {
            if (m_queue.Count == 0)
            {
                item = null;
                return false;
            }

            var device = m_queue.Dequeue();
            var data = m_data[device];
            m_data.Remove(device);
            item = Tuple.Create(device, data);
            return true;
        }
    }
}

When used along these lines:

Queue = new BlockingCollection<Tuple<IDevice, Data>>(
    new DeviceDataQueue<IDevice, Data>());

Device1 = new Device(1, TimeSpan.FromSeconds(3), Queue);
Device2 = new Device(2, TimeSpan.FromSeconds(5), Queue);

while (true)
{
    var tuple = Queue.Take();
    var device = tuple.Item1;
    var data = tuple.Item2;

    Console.WriteLine("{0}: Device {1} produced data at {2}.",
        DateTime.Now, device.Id, data.Created);

    Thread.Sleep(TimeSpan.FromSeconds(2));
}

it produces the following output:

30.4.2011 20:40:43: Device 1 produced data at 30.4.2011 20:40:43.
30.4.2011 20:40:45: Device 2 produced data at 30.4.2011 20:40:44.
30.4.2011 20:40:47: Device 1 produced data at 30.4.2011 20:40:47.
30.4.2011 20:40:49: Device 2 produced data at 30.4.2011 20:40:49.
30.4.2011 20:40:51: Device 1 produced data at 30.4.2011 20:40:51.
30.4.2011 20:40:54: Device 2 produced data at 30.4.2011 20:40:54.


Instead of using another data structure, do another trick. The element in your collection cannot be replaced but you can, instead of storing the actual value, store a mini container. When you want to replace, you actually replace the value in the container, instead of replacing the container.

class ElementFromQueue
{
     public object SensorData;
}

...

ElementFromQueue elem = new ElementFromQueue();
elem.SensorData = new object();
...
queue.Add(elem); //Element is in queue now
...
elem.SensorData = new object(); //Update the data, simulating replace

Or just create a queue of indices that will point to a sensor number. When a value is popped, the latest sensor value is queried from another, update-able collection

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜