How can I modify a queue collection in a loop?

I have a scenario where I need to remove an item from the queue as soon as it has been processed. I understand I cannot remove an item from a collection while iterating over it in a loop, but I was wondering if something could be done with the Enumerator, etc.

This is just a basic example that throws the error "Collection was modified after the enumerator was instantiated."

Any suggestions? Thanks a lot!!!

Code is as follows:

     class Program
        {
            static void Main()
            {

                Queue<Order> queueList = GetQueueList();

                foreach (Order orderItem in queueList)
                {
                    Save(orderItem);
                    Console.WriteLine("Id :{0} Name {1} ", orderItem.Id, orderItem.Name);
                    queueList.Dequeue();
                }
                Console.Read();

            }

            private static void Save(Order orderItem)
            {
               //we are pretending to save or do something.
            }

            private static Queue<Order> GetQueueList()
            {
                Queue<Order> orderQueue = new Queue<Order>();
                orderQueue.Enqueue(new Order { Id = 1, Name = "Order 1" });
                orderQueue.Enqueue(new Order { Id = 1, Name = "Order 2" });
                orderQueue.Enqueue(new Order { Id = 2, Name = "Order 3" });
                orderQueue.Enqueue(new Order { Id = 3, Name = "Order 4" });
                orderQueue.Enqueue(new Order { Id = 4, Name = "Order 5" });
                return orderQueue;
            }
        }

        public  class Order
        {
            public int Id { get; set; }
            public string Name { get; set; }
        }


Change your foreach to:

while (queueList.Count > 0)
{
    Order orderItem = queueList.Dequeue();
    Save(orderItem);
    Console.WriteLine("Id :{0} Name {1} ", orderItem.Id, orderItem.Name);
}

Edit:

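To reprocess if the save fails (this assumes Save is changed to return a bool indicating success, rather than void as in the question), do something like: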

while (queueList.Count > 0)
{
    Order orderItem = queueList.Dequeue();

    if (!Save(orderItem))
    {
        queueList.Enqueue(orderItem); // Reprocess the failed save, probably want more logic to prevent infinite loop
    }
    else
    {
        Console.WriteLine("Successfully saved: {0} Name {1} ", orderItem.Id, orderItem.Name);
    }
}
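
The comment above notes that re-enqueueing needs more logic to prevent an infinite loop. One possible approach is to count attempts per order and give up after a cap. This is only a sketch under assumptions: `RetryDemo`, `ProcessAll`, `MaxAttempts` and the `save` delegate are hypothetical names that are not in the original answer, and the cap of 3 is arbitrary.

```csharp
using System;
using System.Collections.Generic;

public class Order
{
    public int Id { get; set; }
    public string Name { get; set; }
}

public static class RetryDemo
{
    // Hypothetical cap on attempts per order (an assumption, not from the answer).
    public const int MaxAttempts = 3;

    // Drains the queue, re-enqueueing failed orders until MaxAttempts is reached.
    // Returns the orders that never saved successfully.
    public static List<Order> ProcessAll(Queue<Order> queue, Func<Order, bool> save)
    {
        var attempts = new Dictionary<Order, int>(); // keyed by object reference
        var failed = new List<Order>();

        while (queue.Count > 0)
        {
            Order order = queue.Dequeue();

            if (save(order))
            {
                Console.WriteLine("Successfully saved: {0} Name {1}", order.Id, order.Name);
                continue;
            }

            // count stays 0 if this order has not failed before
            attempts.TryGetValue(order, out int count);
            count++;
            attempts[order] = count;

            if (count < MaxAttempts)
            {
                queue.Enqueue(order); // retry later, behind the remaining items
            }
            else
            {
                failed.Add(order);    // give up so the loop can terminate
            }
        }

        return failed;
    }

    public static void Main()
    {
        var queue = new Queue<Order>();
        queue.Enqueue(new Order { Id = 1, Name = "Order 1" });
        queue.Enqueue(new Order { Id = 2, Name = "Order 2" });

        // Stand-in for Save: pretend that order 2 always fails.
        List<Order> failed = ProcessAll(queue, o => o.Id != 2);
        Console.WriteLine("Gave up on {0} order(s)", failed.Count);
    }
}
```

Because every order is dequeued at most MaxAttempts times, the loop always terminates, and the caller decides what to do with the returned failures.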

Edit:

John K mentions thread safety which is a valid concern if you have multiple threads accessing the same Queue. See http://ccutilities.codeplex.com/SourceControl/changeset/view/40529#678487 for a ThreadSafeQueue class that covers simple thread safety issues.


Edit: Here's the thread safety example I keep pointing everyone to :-)

Here's an example of the thread safety issues mentioned. As shown, the default Queue can "miss" items while still decreasing the count.

Updated: To better represent the problem. I never add a null item to the Queue, but the standard Queue.Dequeue() returns several null values when it is called from multiple threads. This alone would be fine, but in doing so a valid item is removed from the internal collection and the Count decreases. It's a safe assumption, in this specific example, that every null item returned from a Queue.Dequeue() operation represents a valid item that was never processed.

using System;
using System.Collections.Generic;
using System.Threading;

namespace SO_ThreadSafeQueue
{
    class Program
    {
        static int _QueueExceptions;
        static int _QueueNull;
        static int _QueueProcessed;

        static int _ThreadSafeQueueExceptions;
        static int _ThreadSafeQueueNull;
        static int _ThreadSafeQueueProcessed;

        static readonly Queue<Guid?> _Queue = new Queue<Guid?>();
        static readonly ThreadSafeQueue<Guid?> _ThreadSafeQueue = new ThreadSafeQueue<Guid?>();
        static readonly Random _Random = new Random();

        const int Expected = 10000000;

        static void Main()
        {
            Console.Clear();
            Console.SetCursorPosition(0, 0);
            Console.WriteLine("Creating queues...");

            for (int i = 0; i < Expected; i++)
            {
                Guid guid = Guid.NewGuid();
                _Queue.Enqueue(guid);
                _ThreadSafeQueue.Enqueue(guid);
            }

            Console.SetCursorPosition(0, 0);
            Console.WriteLine("Processing queues...");

            for (int i = 0; i < 100; i++)
            {
                ThreadPool.QueueUserWorkItem(ProcessQueue);
                ThreadPool.QueueUserWorkItem(ProcessThreadSafeQueue);
            }

            int progress = 0;

            while (_Queue.Count > 0 || _ThreadSafeQueue.Count > 0)
            {
                Console.SetCursorPosition(0, 0);

                switch (progress)
                {
                    case 0:
                        {
                            Console.WriteLine("Processing queues... |");
                            progress = 1;
                            break;
                        }
                    case 1:
                        {
                            Console.WriteLine("Processing queues... /");
                            progress = 2;
                            break;
                        }
                    case 2:
                        {
                            Console.WriteLine("Processing queues... -");
                            progress = 3;
                            break;
                        }
                    case 3:
                        {
                            Console.WriteLine("Processing queues... \\");
                            progress = 0;
                            break;
                        }
                }

                Thread.Sleep(200);
            }

            Console.SetCursorPosition(0, 0);
            Console.WriteLine("Finished processing queues...");
            Console.WriteLine("\r\nQueue Count:           {0} Processed: {1, " + Expected.ToString().Length + "} Exceptions: {2,4} Null: {3}", _Queue.Count, _QueueProcessed, _QueueExceptions, _QueueNull);
            Console.WriteLine("ThreadSafeQueue Count: {0} Processed: {1, " + Expected.ToString().Length + "} Exceptions: {2,4} Null: {3}", _ThreadSafeQueue.Count, _ThreadSafeQueueProcessed, _ThreadSafeQueueExceptions, _ThreadSafeQueueNull);

            Console.WriteLine("\r\nPress any key...");
            Console.ReadKey();
        }

        static void ProcessQueue(object nothing)
        {
            while (_Queue.Count > 0)
            {
                Guid? currentItem = null;

                try
                {
                    currentItem = _Queue.Dequeue();
                }
                catch (Exception)
                {
                    Interlocked.Increment(ref _QueueExceptions);
                }

                if (currentItem != null)
                {
                    Interlocked.Increment(ref _QueueProcessed);
                }
                else
                {
                    Interlocked.Increment(ref _QueueNull);
                }

                Thread.Sleep(_Random.Next(1, 10)); // Simulate different workload times
            }
        }

        static void ProcessThreadSafeQueue(object nothing)
        {
            while (_ThreadSafeQueue.Count > 0)
            {
                Guid? currentItem = null;

                try
                {
                    currentItem = _ThreadSafeQueue.Dequeue();
                }
                catch (Exception)
                {
                    Interlocked.Increment(ref _ThreadSafeQueueExceptions);
                }

                if (currentItem != null)
                {
                    Interlocked.Increment(ref _ThreadSafeQueueProcessed);
                }
                else
                {
                    Interlocked.Increment(ref _ThreadSafeQueueNull);
                }

                Thread.Sleep(_Random.Next(1, 10)); // Simulate different workload times
            }
        }

        /// <summary>
        /// Represents a thread safe <see cref="Queue{T}"/>
        /// </summary>
        /// <typeparam name="T"></typeparam>
        public class ThreadSafeQueue<T> : Queue<T>
        {
            #region Private Fields
            private readonly object _LockObject = new object();
            #endregion

            #region Public Properties
            /// <summary>
            /// Gets the number of elements contained in the <see cref="ThreadSafeQueue{T}"/>
            /// </summary>
            public new int Count
            {
                get
                {
                    int returnValue;

                    lock (_LockObject)
                    {
                        returnValue = base.Count;
                    }

                    return returnValue;
                }
            }
            #endregion

            #region Public Methods
            /// <summary>
            /// Removes all objects from the <see cref="ThreadSafeQueue{T}"/>
            /// </summary>
            public new void Clear()
            {
                lock (_LockObject)
                {
                    base.Clear();
                }
            }

            /// <summary>
            /// Removes and returns the object at the beginning of the <see cref="ThreadSafeQueue{T}"/>
            /// </summary>
            /// <returns></returns>
            public new T Dequeue()
            {
                T returnValue;

                lock (_LockObject)
                {
                    returnValue = base.Dequeue();
                }

                return returnValue;
            }

            /// <summary>
            /// Adds an object to the end of the <see cref="ThreadSafeQueue{T}"/>
            /// </summary>
            /// <param name="item">The object to add to the <see cref="ThreadSafeQueue{T}"/></param>
            public new void Enqueue(T item)
            {
                lock (_LockObject)
                {
                    base.Enqueue(item);
                }
            }

            /// <summary>
            /// Sets the capacity to the actual number of elements in the <see cref="ThreadSafeQueue{T}"/>, if that number is less than 90 percent of current capacity.
            /// </summary>
            public new void TrimExcess()
            {
                lock (_LockObject)
                {
                    base.TrimExcess();
                }
            }
            #endregion
        }

    }
}


foreach is a reasonable way to iterate through the queue when you are not removing items.

When you want to remove and process items, the proper way is to remove them one at a time and process each one after it has been removed.

One way is this:

// the non-thread safe way
//
while (queueList.Count > 0)
{
    Order orderItem = queueList.Dequeue();
    Save(orderItem);
    Console.WriteLine("Id :{0} Name {1} ", orderItem.Id, orderItem.Name);
}

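If another thread is also dequeuing, the number of items in the queue can change between queueList.Count and queueList.Dequeue(). To avoid that race you have to call Dequeue alone, but Dequeue throws an InvalidOperationException when the queue is empty, so you have to use an exception handler. Note that Queue<T> is not synchronized internally, so concurrent callers still need a lock around Dequeue (or a concurrent collection).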

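// the thread safe way (with respect to the Count/Dequeue race;
// Queue<T> itself still needs external locking across threads).
//
while (true)
{
    Order orderItem = null;
    try { orderItem = queueList.Dequeue(); }
    catch (InvalidOperationException) { break; } // thrown when the queue is empty
    if (null != orderItem)
    {
        Save(orderItem);
        Console.WriteLine("Id :{0} Name {1} ", orderItem.Id, orderItem.Name);
    }
}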
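
As an alternative to catching the exception, the framework's ConcurrentQueue<T> (System.Collections.Concurrent, .NET 4.0 and later) combines the emptiness check and the removal in a single TryDequeue call. The following is a minimal sketch, not the approach of any answer in this thread: `ConcurrentDrainDemo`, `Drain` and the worker count are hypothetical, and it uses plain ints instead of Order to stay short.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class ConcurrentDrainDemo
{
    // Drains the queue from several workers at once and
    // returns how many items were processed in total.
    public static int Drain(ConcurrentQueue<int> queue, int workerCount)
    {
        int processed = 0;
        var workers = new Task[workerCount];

        for (int i = 0; i < workerCount; i++)
        {
            workers[i] = Task.Run(() =>
            {
                // TryDequeue returns false when the queue is empty,
                // so there is no separate Count check to race against.
                while (queue.TryDequeue(out int item))
                {
                    // Real work on 'item' would go here.
                    Interlocked.Increment(ref processed);
                }
            });
        }

        Task.WaitAll(workers);
        return processed;
    }

    public static void Main()
    {
        var queue = new ConcurrentQueue<int>();
        for (int i = 0; i < 10000; i++)
        {
            queue.Enqueue(i);
        }

        int processed = Drain(queue, 4);
        Console.WriteLine("Processed: {0} Remaining: {1}", processed, queue.Count);
    }
}
```

Since every item is handed to exactly one worker, the processed count should match the number of items enqueued, with no nulls and no exceptions to handle.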


To me, it looks like you are trying to process the elements in the queue one by one.

How about wrapping this in a while loop and processing each element returned by Dequeue until the queue is empty?
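
On newer runtimes the same loop can be written with Queue<T>.TryDequeue, which is available in .NET Core 2.0+ and .NET Standard 2.1 but not in the .NET Framework. The sketch below assumes such a runtime; `TryDequeueDemo` and `DrainAll` are hypothetical names used only for illustration, and this is still single-threaded code.

```csharp
using System;
using System.Collections.Generic;

public static class TryDequeueDemo
{
    // Removes every item from the queue, passing each one to the callback.
    // Returns the number of items handled.
    public static int DrainAll<T>(Queue<T> queue, Action<T> handle)
    {
        int handled = 0;

        // TryDequeue returns false once the queue is empty, ending the loop.
        while (queue.TryDequeue(out T item))
        {
            handle(item);
            handled++;
        }

        return handled;
    }

    public static void Main()
    {
        var queue = new Queue<string>(new[] { "Order 1", "Order 2", "Order 3" });
        int handled = DrainAll(queue, name => Console.WriteLine("Processed {0}", name));
        Console.WriteLine("Handled: {0} Remaining: {1}", handled, queue.Count);
    }
}
```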
