开发者

C# Threading race condition

I'm attempting to write a function which starts a thread for each "contact" and then queries (over the network) for results from that contact. I want my waiting function to wait at most 1.5 second for responses, and after that, simply terminate any remaining threads.

The issue I'm having is that the function is returning before all the threads are done, even though according to the logic, this should not be possible. The while loop in the main function should be waiting until all threads have fully completed, yet I get the following output:

FAIL: Storage test 1 exists 0 times in the DHT.
    : Storage test 2 exists 0 times in the DHT.
Added storage test 1 to the entries.
Added storage test 2 to the entries.

(The FAIL lines comes from the main testing program seeing how many results were returned by Get().)

According to what I can see, this shouldn't be possible. Does anyone know where the race condition might be occurring (or any other assumptions I've made that are not correct)?

The function definition is as such:

    public IList<Entry> Get(ID key)
    {
        ConcurrentBag<Entry> entries = new ConcurrentBag<Entry>();
        List<Thread> threads = new List<Thread>();
        foreach (Contact c in this.p_Contacts)
        {
            Thread t = new Thread(delegate()
            {
                try
                {
                    FetchMessage fm = new FetchMessage(this, c, key);
                    fm开发者_如何学Go.Send();
                    int ticks = 0;

                    // Wait until we receive data, or timeout.
                    while (!fm.Received && ticks < 1500)
                    {
                        Thread.Sleep(100);
                        ticks += 100;
                    }

                    if (fm.Received)
                    {
                        foreach (Entry e in fm.Values)
                        {
                            Console.WriteLine("Added " + e.Value + " to the entries.");
                            entries.Add(e);
                        }

                        if (entries.Count == 0)
                            Console.WriteLine("There were no entries to add.");
                    }
                    else
                        Console.WriteLine("The node did not return in time.");
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                }
            }
            );
            t.IsBackground = false;
            t.Start();
        }

        while (true)
        {
            bool stopped = true;
            foreach (Thread t in threads)
            {
                if (t.ThreadState != ThreadState.Stopped)
                    stopped = false;
            }
            if (stopped)
                break;
            Thread.Sleep(100);
        }

        return new List<Entry>(entries.ToArray());
    }


Looks like you're never putting the Thread (t) in the List<Thread> (threads). Your foreach loop never executes.

The main thread just waits 100ms and continues.


@Toby has the correct answer, but if I may introduce some other things to improve the code. Essentially, you are manually managing your own ThreadPool and timeouts--and this is something that .Net provides for you out of the box. See: http://msdn.microsoft.com/en-us/library/system.threading.threadpool(v=VS.100).aspx

If you combine your ThreadPool with a .Net 4 Barrier, you can simplify the code a lot. Essentially, a Barrier will block all threads until they sync up. When you pass in the same barrier to your threads and sync up at the end, you can pause your main thread until all the worker threads are done. The refactored code would look like this:

// For the number of threads + 1 for the main thread
Barrier barrier = new Barrier(this.p_Contacts.count + 1);
ConcurrentBag<Entry> entries = new ConcurrentBag<Entry>();

foreach (Contact c in this.p_Contacts)
{
    ThreadPool.RegisterWaitForSingleObject(
        new EventWaitHandle(false, EventResetMode.AutoReset),
        (stateInfo,timedOut) => {
            try
            {
                FetchMessage fm = new FetchMessage(this, c, key);
                fm.Send();

                while(!fm.Received || !timedOut)
                {
                    Thread.Sleep(100);
                }

                if(fm.Received)
                {
                    foreach (Entry e in fm.Values)
                    {
                        entries.Add(e);
                        Console.WriteLine("Added " + e.Value + " to the entries.");
                    }

                    // avoid counting other thread's work
                    if (fm.Values.count == 0)
                    {
                        Console.WriteLine("There were no entries to add.");
                    }
                }
                else
                {
                    Console.WriteLine("The node did not return in time.");
                }

                barrier.SignalAndWait();
            }
            catch(Exception e)
            {
                Console.WriteLine(e);
            }
        }, null, TimeSpan.FromSeconds(1.5), true);
    );
}

// This limits total time waited to only 1.5 seconds
barrier.SignalAndWait(TimeSpan.FromSeconds(1.5));

return new List<Entry>(entries.ToArray());

Instead of manually managing the spin locks like you were doing, let .Net do it for you.


The Threads don't get added to your list so the while loop will break right away?


The solution to this problem was to use a ConcurrentDictionary to keep track of the contacts that had their threads finished:

    public IList<Entry> Get(ID key)
    {
        ConcurrentBag<Entry> entries = new ConcurrentBag<Entry>();
        ConcurrentDictionary<Contact, bool> done = new ConcurrentDictionary<Contact, bool>();
        List<Thread> threads = new List<Thread>();
        foreach (Contact c in this.p_Contacts)
        {
            Thread t;
            ThreadStart ts = delegate()
            {
                try
                {
                    FetchMessage fm = new FetchMessage(this, c, key);
                    fm.Send();
                    int ticks = 0;

                    // Wait until we receive data, or timeout.
                    while (!fm.Received && ticks < 1500)
                    {
                        Thread.Sleep(100);
                        ticks += 100;
                    }

                    if (fm.Received)
                    {
                        foreach (Entry e in fm.Values)
                        {
                            Console.WriteLine("Added " + e.Value + " to the entries.");
                            entries.Add(e);
                        }

                        if (entries.Count == 0)
                            Console.WriteLine("There were no entries to add.");
                    }
                    else
                        Console.WriteLine("The node did not return in time.");

                    Thread.MemoryBarrier();
                    done[c] = true;
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                }
            };
            t = new Thread(ts);
            done[c] = false;
            t.IsBackground = true;
            t.Start();
        }

        while (true)
        {
            bool stopped = true;
            foreach (Contact c in this.p_Contacts)
            {
                if (!done[c])
                    stopped = false;
            }
            if (stopped)
                break;
            Thread.Sleep(100);
        }

        return new List<Entry>(entries.ToArray());
    }
0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜