How to implement a suspendable BlockingCollection

I am writing a WCF service that receives notifications from several modules (DB, other services..) and adds them to a blocking collection, to be processed on a consumer thread that publishes the relevant data to the clients.

A client can request the full data stored on the server, and during this operation I don't want to accept any new notifications. Basically, I want to suspend the blocking collection (or the consumer thread) and resume receiving and processing notifications after the client's request has completed.

What is a good way to implement this behavior?


If I have understood you correctly, you want to prevent the consumer thread from consuming data from the BlockingCollection while some other query operation is taking place, but during this time the producers can continue to push data into the collection.

If that is correct, then I think the best course is to use a ManualResetEvent that is normally signaled, so the consumer threads are not blocked. When you want to pause the consumers, reset the event; each consumer will then block until the event becomes signaled again.

Update: Here is a quick console application demonstrating what I describe above. This is just a quick demo, but it shows one producer thread and two consumer threads. The state of the consumers can be toggled between Running and Paused by pressing the space bar.

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Threading;

namespace ProducerConsumerDemo
{
  class Program
  {
    static BlockingCollection<int> _queue = new BlockingCollection<int>();
    static ManualResetEvent _pauseConsumers = new ManualResetEvent(true);
    // volatile: written by the main thread, read by the consumer threads
    static volatile bool _paused = false;
    static int _itemsEnqueued = 0;
    static int _itemsDequeued = 0;

    static void Main(string[] args)
    {
      Thread producerThread = new Thread(Producer);
      Thread consumerThread1 = new Thread(Consumer);
      Thread consumerThread2 = new Thread(Consumer);
      producerThread.Start();
      consumerThread1.Start();
      consumerThread2.Start();

      while (true)
      {
        WriteAt(0, 0, "State: " + (_paused ? "Paused" : "Running"));
        WriteAt(0, 1, "Items In Queue: " + _queue.Count);
        WriteAt(0, 2, "Total enqueued: " + _itemsEnqueued);
        WriteAt(0, 3, "Total dequeued: " + _itemsDequeued);

        Thread.Sleep(100);
        if (Console.KeyAvailable)
        {
          if (Console.ReadKey().Key == ConsoleKey.Spacebar)
          {
            if (_paused)
            {
              _paused = false;
              _pauseConsumers.Set();
            }
            else
            {
              _paused = true;
              _pauseConsumers.Reset();
            }
          }
        }
      }
    }

    static void WriteAt(int x, int y, string format, params object[] args)
    {
      Console.SetCursorPosition(x, y);
      Console.Write("                                         ");
      Console.SetCursorPosition(x, y);
      Console.Write(format, args);
    }

    static void Consumer()
    {
      while (true)
      {
        // Blocks only while paused: WaitOne() returns immediately
        // when the event is signaled, so no separate flag check is needed
        _pauseConsumers.WaitOne();

        int value;
        if (_queue.TryTake(out value))
        {
          Interlocked.Increment(ref _itemsDequeued);
          // Do something with the data
        }
        Thread.Sleep(500);
      }
    }

    static void Producer()
    {
      Random rnd = new Random();
      while (true)
      {
        if (_queue.TryAdd(rnd.Next(100)))
        {
          Interlocked.Increment(ref _itemsEnqueued);
        }
        Thread.Sleep(500);
      }
    }
  }
}
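
For the scenario in the question, the same gate could be wrapped in a small helper so that the request handler pauses the consumer, reads the full data, and then resumes it. The following is only a minimal sketch under assumptions: the class name `PausableConsumer`, its `Pause`/`Resume`/`Pending` members and the `handler` callback are hypothetical, and it uses ManualResetEventSlim with GetConsumingEnumerable() instead of the polling loop in the demo above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper: one consumer thread that can be paused and resumed.
class PausableConsumer<T> : IDisposable
{
    private readonly BlockingCollection<T> _queue = new BlockingCollection<T>();
    // Signaled (true) means "running"; Reset() pauses the consumer.
    private readonly ManualResetEventSlim _gate = new ManualResetEventSlim(true);
    private readonly Thread _thread;

    public PausableConsumer(Action<T> handler)
    {
        _thread = new Thread(() =>
        {
            // Blocks while the queue is empty; the loop ends after CompleteAdding().
            foreach (T item in _queue.GetConsumingEnumerable())
            {
                _gate.Wait();   // blocks here while paused, holding the item
                handler(item);
            }
        });
        _thread.IsBackground = true;
        _thread.Start();
    }

    // Producers are never blocked; items pile up while the consumer is paused.
    public void Add(T item) { _queue.Add(item); }
    public int Pending { get { return _queue.Count; } }
    public void Pause() { _gate.Reset(); }
    public void Resume() { _gate.Set(); }

    public void Dispose()
    {
        _queue.CompleteAdding();
        _gate.Set();            // release a paused consumer so it can drain and exit
        _thread.Join();
        _queue.Dispose();
        _gate.Dispose();
    }
}
```

In the service, the full-data operation would call Pause(), build its response, and call Resume() in a finally block. Two limitations of this sketch: Pause() does not wait for an item that is already inside `handler`, and the consumer may already have taken one item that it holds at the gate, so Pending can be one lower than the number of unprocessed items.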


You can wrap the BlockingCollection and add a Queue for the producer and some sync mechanism internally.

Here is a sample:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Windows.Forms;
using System.Threading;
using System.Collections;
using System.Collections.Concurrent;

namespace WindowsFormsApplication2
{
    static class Program
    {
        private static void Producer()
        {
            int i = 0;
            while (i < 500)
            {
                Thread.Sleep(50); // can remove the sleep just for debugging
                bc.ProducerAdd("Notification " + i);
                i++;
            }
        }

        private static void Consumer()
        {
            while (true)
            {
                foreach (var it in bc)
                {
                    Console.WriteLine(String.Format("{0} : CONSUMES {1}", Thread.CurrentThread.Name, it));

                    // this is just a check test
                    lock (consumed)
                    {
                        consumed.Add(it, Convert.ToInt32(it.Split(' ')[1])); // this will fail if we consume the same twice
                    }
                }
            }
        }

        private static void TogglePause()
        {
            while (true)
            {
                Thread.Sleep(3000); //every 3 seconds
                bc.Paused = !bc.Paused;
                Console.WriteLine("PAUSE is now: " + bc.Paused);
            }
        }

        private static QueuedBlockingCollection<string> bc = new QueuedBlockingCollection<string>();
        private static Dictionary<string, int> consumed = new Dictionary<string, int>();

        /// <summary>
        /// The main entry point for the application.
        /// </summary>
        [STAThread]
        static void Main()
        {
            Thread producer = new Thread(Producer);
            producer.Start();

            Thread consumer1 = new Thread(Consumer);
            consumer1.Name = "Consumer 1";
            consumer1.Start();

            Thread consumer2 = new Thread(Consumer);
            consumer2.Name = "Consumer 2";
            consumer2.Start();

            Thread pauser = new Thread(TogglePause);
            pauser.Start();

            while (true)
            {
                // wait and observe console writelines
                Application.DoEvents();
            }
        }
    }

    class QueuedBlockingCollection<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
    {
        private Queue<T> queue = new Queue<T>();
        private BlockingCollection<T> collection = new BlockingCollection<T>();
        private Thread syncThread;

        public bool Paused { get; set; }
        public bool Exiting { get; set; }

        private void SyncLoop()
        {
            // this while will wait for the class to be destroyed
            while (!Exiting)
            {
                try
                {
                    // this while will finish when queue is synched to collection, or paused or exiting
                    while (queue.Count > 0 && !Exiting && !Paused)
                    {
                        lock (collection)
                        {
                            T item = queue.Dequeue();
                            collection.Add(item);
                            Console.WriteLine(String.Format("SYNCHED {0} TO COLLECTION", item));
                        }
                    }
                }
                catch (ObjectDisposedException)
                {
                    // collection has been disposed, exit this thread
                    break;
                }

                // nothing to move right now: sleep briefly instead of spinning on the CPU
                Thread.Sleep(10);
            }
        }

        public void ProducerAdd(T item)
        {
            // producer always adds to the queue if the collection is paused so it wont block the collection
            // the sync thread will block the collection only when adding from the queue
            // consumers automatically block this collection when enumerating because it is a wrapper to the internal blocked coll...

            if (!Paused)
            {
                lock (collection)
                {
                    collection.Add(item);
                    Console.WriteLine(String.Format("Producer ADDED '{0}', status collection {1} queue {2}", item, this.Count, queue.Count));
                }
            }
            else
            {
                // Queue<T> is not thread safe, so enqueue under the same lock
                // the sync thread holds while it dequeues
                lock (collection)
                {
                    queue.Enqueue(item);
                }
                Console.WriteLine(String.Format("Producer ENQUEUED '{0}', Status -> Collection: {1}, Queue: {2}", item, this.Count, queue.Count));
            }
        }

        public QueuedBlockingCollection()
        {
            //collection.CompleteAdding();
            syncThread = new Thread(SyncLoop);
            syncThread.Start();
        }

        ~QueuedBlockingCollection()
        {
            Exiting = true;
            syncThread.Join(200);
        }

        public IEnumerator<T> GetEnumerator()
        {
            return collection.GetConsumingEnumerable().GetEnumerator();
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return collection.GetConsumingEnumerable().GetEnumerator();
        }

        public void CopyTo(Array array, int index)
        {
            // Take a snapshot with ToArray(): GetConsumingEnumerable() would remove
            // the items and then block waiting for more
            T[] items = collection.ToArray();

            if (array == null) throw new ArgumentNullException("array");
            if (array.Rank > 1) throw new ArgumentException("Array must have 1 dimension", "array");
            if (array.GetLength(0) - index < items.Length) throw new ArgumentException("Array is too small", "array");

            Array.Copy(items, 0, array, index, items.Length);
        }

        public int Count
        {
            get
            {
                return collection.Count;
            }
        }

        public bool IsSynchronized
        {
            get { return queue.Count == 0; }
        }

        public object SyncRoot
        {
            get { return collection; }
        }

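        public void Dispose()
        {
            // stop the sync thread before disposing; the finalizer cannot be relied on
            // to run while that thread is still alive
            Exiting = true;
            syncThread.Join(200);
            collection.Dispose();
        }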

    }
}

You can expose any other BlockingCollection methods you need by adding wrapper methods that forward to the internal collection.

Here is some output:

Producer ADDED 'Notification 0', status collection 1 queue 0
Producer ADDED 'Notification 1', status collection 2 queue 0
Producer ADDED 'Notification 2', status collection 3 queue 0
Producer ADDED 'Notification 3', status collection 4 queue 0
Consumer 2 : CONSUMES Notification 0
Consumer 2 : CONSUMES Notification 1
Consumer 2 : CONSUMES Notification 2
Consumer 2 : CONSUMES Notification 3
Consumer 2 : CONSUMES Notification 4
Producer ADDED 'Notification 4', status collection 0 queue 0
Producer ADDED 'Notification 5', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 5
Producer ADDED 'Notification 6', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 6
Producer ADDED 'Notification 7', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 7
Producer ADDED 'Notification 8', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 8
Producer ADDED 'Notification 9', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 9
Producer ADDED 'Notification 10', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 10
Producer ADDED 'Notification 11', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 11
Consumer 2 : CONSUMES Notification 12
Producer ADDED 'Notification 12', status collection 0 queue 0
Producer ADDED 'Notification 13', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 13
Producer ADDED 'Notification 14', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 14
Producer ADDED 'Notification 15', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 15
Producer ADDED 'Notification 16', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 16
Producer ADDED 'Notification 17', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 17
Producer ADDED 'Notification 18', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 18
Producer ADDED 'Notification 19', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 19
Producer ADDED 'Notification 20', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 20
Producer ADDED 'Notification 21', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 21
Producer ADDED 'Notification 22', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 22
Producer ADDED 'Notification 23', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 23
Producer ADDED 'Notification 24', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 24
Producer ADDED 'Notification 25', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 25
Producer ADDED 'Notification 26', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 26
Producer ADDED 'Notification 27', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 27
Producer ADDED 'Notification 28', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 28
Producer ADDED 'Notification 29', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 29
Producer ADDED 'Notification 30', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 30
Producer ADDED 'Notification 31', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 31
Producer ADDED 'Notification 32', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 32
Producer ADDED 'Notification 33', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 33
Producer ADDED 'Notification 34', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 34
Producer ADDED 'Notification 35', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 35
Producer ADDED 'Notification 36', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 36
Producer ADDED 'Notification 37', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 37
Producer ADDED 'Notification 38', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 38
Producer ADDED 'Notification 39', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 39
Producer ADDED 'Notification 40', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 40
Producer ADDED 'Notification 41', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 41
Producer ADDED 'Notification 42', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 42
Producer ADDED 'Notification 43', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 43
Producer ADDED 'Notification 44', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 44
Producer ADDED 'Notification 45', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 45
Producer ADDED 'Notification 46', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 46
Producer ADDED 'Notification 47', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 47
Producer ADDED 'Notification 48', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 48
Producer ADDED 'Notification 49', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 49
Producer ADDED 'Notification 50', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 50
Producer ADDED 'Notification 51', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 51
Producer ADDED 'Notification 52', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 52
Producer ADDED 'Notification 53', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 53
Producer ADDED 'Notification 54', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 54
Producer ADDED 'Notification 55', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 55
Producer ADDED 'Notification 56', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 56
Producer ADDED 'Notification 57', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 57
Producer ADDED 'Notification 58', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 58
Producer ADDED 'Notification 59', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 59
Producer ADDED 'Notification 60', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 60
PAUSE is now: True
Producer ENQUEUED 'Notification 61', Status -> Collection: 0, Queue: 1
Producer ENQUEUED 'Notification 62', Status -> Collection: 0, Queue: 2
Producer ENQUEUED 'Notification 63', Status -> Collection: 0, Queue: 3
Producer ENQUEUED 'Notification 64', Status -> Collection: 0, Queue: 4
Producer ENQUEUED 'Notification 65', Status -> Collection: 0, Queue: 5
Producer ENQUEUED 'Notification 66', Status -> Collection: 0, Queue: 6
Producer ENQUEUED 'Notification 67', Status -> Collection: 0, Queue: 7
Producer ENQUEUED 'Notification 68', Status -> Collection: 0, Queue: 8
Producer ENQUEUED 'Notification 69', Status -> Collection: 0, Queue: 9
Producer ENQUEUED 'Notification 70', Status -> Collection: 0, Queue: 10
Producer ENQUEUED 'Notification 71', Status -> Collection: 0, Queue: 11
Producer ENQUEUED 'Notification 72', Status -> Collection: 0, Queue: 12
Producer ENQUEUED 'Notification 73', Status -> Collection: 0, Queue: 13
Producer ENQUEUED 'Notification 74', Status -> Collection: 0, Queue: 14
Producer ENQUEUED 'Notification 75', Status -> Collection: 0, Queue: 15
Producer ENQUEUED 'Notification 76', Status -> Collection: 0, Queue: 16
Producer ENQUEUED 'Notification 77', Status -> Collection: 0, Queue: 17
Producer ENQUEUED 'Notification 78', Status -> Collection: 0, Queue: 18
Producer ENQUEUED 'Notification 79', Status -> Collection: 0, Queue: 19
Producer ENQUEUED 'Notification 80', Status -> Collection: 0, Queue: 20
Producer ENQUEUED 'Notification 81', Status -> Collection: 0, Queue: 21
Producer ENQUEUED 'Notification 82', Status -> Collection: 0, Queue: 22
Producer ENQUEUED 'Notification 83', Status -> Collection: 0, Queue: 23
Producer ENQUEUED 'Notification 84', Status -> Collection: 0, Queue: 24
Producer ENQUEUED 'Notification 85', Status -> Collection: 0, Queue: 25
Producer ENQUEUED 'Notification 86', Status -> Collection: 0, Queue: 26
Producer ENQUEUED 'Notification 87', Status -> Collection: 0, Queue: 27
Producer ENQUEUED 'Notification 88', Status -> Collection: 0, Queue: 28
Producer ENQUEUED 'Notification 89', Status -> Collection: 0, Queue: 29
Producer ENQUEUED 'Notification 90', Status -> Collection: 0, Queue: 30
Producer ENQUEUED 'Notification 91', Status -> Collection: 0, Queue: 31
Producer ENQUEUED 'Notification 92', Status -> Collection: 0, Queue: 32
Producer ENQUEUED 'Notification 93', Status -> Collection: 0, Queue: 33
Producer ENQUEUED 'Notification 94', Status -> Collection: 0, Queue: 34
Producer ENQUEUED 'Notification 95', Status -> Collection: 0, Queue: 35
Producer ENQUEUED 'Notification 96', Status -> Collection: 0, Queue: 36
Producer ENQUEUED 'Notification 97', Status -> Collection: 0, Queue: 37
The thread '<No Name>' (0x1100) has exited with code 0 (0x0).
Producer ENQUEUED 'Notification 98', Status -> Collection: 0, Queue: 38
Producer ENQUEUED 'Notification 99', Status -> Collection: 0, Queue: 39
PAUSE is now: False
Consumer 1 : CONSUMES Notification 61
SYNCHED Notification 61 TO COLLECTION
SYNCHED Notification 62 TO COLLECTION
Consumer 2 : CONSUMES Notification 62
Consumer 1 : CONSUMES Notification 63
SYNCHED Notification 63 TO COLLECTION
SYNCHED Notification 64 TO COLLECTION
Consumer 2 : CONSUMES Notification 64
Consumer 1 : CONSUMES Notification 65
SYNCHED Notification 65 TO COLLECTION
SYNCHED Notification 66 TO COLLECTION
Consumer 2 : CONSUMES Notification 66
Consumer 1 : CONSUMES Notification 67
SYNCHED Notification 67 TO COLLECTION
SYNCHED Notification 68 TO COLLECTION
Consumer 2 : CONSUMES Notification 68
Consumer 1 : CONSUMES Notification 69
SYNCHED Notification 69 TO COLLECTION
SYNCHED Notification 70 TO COLLECTION
Consumer 2 : CONSUMES Notification 70
Consumer 1 : CONSUMES Notification 71
SYNCHED Notification 71 TO COLLECTION
SYNCHED Notification 72 TO COLLECTION
Consumer 2 : CONSUMES Notification 72
Consumer 1 : CONSUMES Notification 73
SYNCHED Notification 73 TO COLLECTION
SYNCHED Notification 74 TO COLLECTION
SYNCHED Notification 75 TO COLLECTION
SYNCHED Notification 76 TO COLLECTION
SYNCHED Notification 77 TO COLLECTION
SYNCHED Notification 78 TO COLLECTION
SYNCHED Notification 79 TO COLLECTION
SYNCHED Notification 80 TO COLLECTION
SYNCHED Notification 81 TO COLLECTION
SYNCHED Notification 82 TO COLLECTION
SYNCHED Notification 83 TO COLLECTION
SYNCHED Notification 84 TO COLLECTION
SYNCHED Notification 85 TO COLLECTION
SYNCHED Notification 86 TO COLLECTION
SYNCHED Notification 87 TO COLLECTION
SYNCHED Notification 88 TO COLLECTION
SYNCHED Notification 89 TO COLLECTION
SYNCHED Notification 90 TO COLLECTION
SYNCHED Notification 91 TO COLLECTION
SYNCHED Notification 92 TO COLLECTION
SYNCHED Notification 93 TO COLLECTION
SYNCHED Notification 94 TO COLLECTION
SYNCHED Notification 95 TO COLLECTION
SYNCHED Notification 96 TO COLLECTION
SYNCHED Notification 97 TO COLLECTION
SYNCHED Notification 98 TO COLLECTION
SYNCHED Notification 99 TO COLLECTION
Consumer 1 : CONSUMES Notification 74
Consumer 2 : CONSUMES Notification 75
Consumer 1 : CONSUMES Notification 76
Consumer 2 : CONSUMES Notification 77
Consumer 2 : CONSUMES Notification 79
Consumer 1 : CONSUMES Notification 78
Consumer 2 : CONSUMES Notification 80
Consumer 1 : CONSUMES Notification 81
Consumer 2 : CONSUMES Notification 82
Consumer 1 : CONSUMES Notification 83
Consumer 2 : CONSUMES Notification 84
Consumer 1 : CONSUMES Notification 85
Consumer 2 : CONSUMES Notification 86
Consumer 1 : CONSUMES Notification 87
Consumer 2 : CONSUMES Notification 88
Consumer 1 : CONSUMES Notification 89
Consumer 2 : CONSUMES Notification 90
Consumer 1 : CONSUMES Notification 91
Consumer 2 : CONSUMES Notification 92
Consumer 1 : CONSUMES Notification 93
Consumer 2 : CONSUMES Notification 94
Consumer 1 : CONSUMES Notification 95
Consumer 2 : CONSUMES Notification 96
Consumer 1 : CONSUMES Notification 97
Consumer 2 : CONSUMES Notification 98
Consumer 1 : CONSUMES Notification 99
PAUSE is now: True
PAUSE is now: False

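This is meant to be thread safe, provided every access to the internal Queue<T> (which is not thread safe on its own) is made while holding the same lock.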

So you could use X producers and Y consumers if you wish...

If blocking in ProducerAdd is unacceptable when there are several producers, you can use

Monitor.TryEnter(collection)

instead of

lock (collection)

and if it fails use

queue.Enqueue()

which will be synched back to the collection as soon as the locks have been released.

Or, if you wish, just always use queue.Enqueue() for every producer and let the sync thread do its job.
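
The Monitor.TryEnter variant could look roughly like the sketch below. It rests on some assumptions: the class name `NonBlockingAdder`, its `Sync()` method and the two count properties are hypothetical stand-ins for the wrapper above, and the overflow queue is a ConcurrentQueue<T> rather than a Queue<T>, because enqueueing without holding the lock is only safe on a thread-safe queue.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical stand-in for the ProducerAdd part of the wrapper above.
class NonBlockingAdder<T>
{
    private readonly BlockingCollection<T> collection = new BlockingCollection<T>();
    // ConcurrentQueue<T> so that Enqueue is safe without holding the lock.
    private readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
    private volatile bool paused;

    public bool Paused
    {
        get { return paused; }
        set { paused = value; }
    }

    public int CollectionCount { get { return collection.Count; } }
    public int QueueCount { get { return queue.Count; } }

    public void ProducerAdd(T item)
    {
        // Try to take the lock without waiting; fall back to the queue otherwise.
        if (!paused && Monitor.TryEnter(collection))
        {
            try { collection.Add(item); }
            finally { Monitor.Exit(collection); }
        }
        else
        {
            queue.Enqueue(item);
        }
    }

    // What the sync thread would do on each pass: move queued items over.
    public void Sync()
    {
        if (paused) return;
        lock (collection)
        {
            T item;
            while (queue.TryDequeue(out item))
            {
                collection.Add(item);
            }
        }
    }
}
```

One trade-off to be aware of: an item that falls back to the queue can be overtaken by a later item that is added directly, so this variant does not preserve arrival order. Always enqueueing and letting the sync thread move the items avoids that reordering.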
