
Producer-Consumer with a variation - How to synchronize with thread signal/wait?

While working on a large project I realized I was making a lot of calls to be scheduled in the future. Since these were fairly light-weight, I thought it might be better to use a separate scheduler.

ThreadPool.QueueUserWorkItem (() => 
{
    Thread.Sleep (5000);
    Foo (); // Call is to be executed after some time
});

So I created a separate scheduler class that runs on its own thread and executes these events. I have 2 functions that access a shared queue from separate threads. I'd use a lock, but since one of the threads needs to sleep-wait, I wasn't sure how to release the lock.

class Scheduler
{
    SortedDictionary <DateTime, Action> _queue;
    EventWaitHandle _sync;

    // Runs on its own thread
    void Run ()
    {
        while (true)
        {
            // Calculate time till first event
            // If queue empty, use pre-defined value
            TimeSpan timeDiff = _queue.First().Key - DateTime.Now;

            // Execute action if in the next 100ms
            if (timeDiff < TimeSpan.FromMilliseconds (100))
                ...
            // Wait on event handle for time
            else
                _sync.WaitOne (timeDiff);
        }
    }

    // Can be called by any thread
    void ScheduleEvent (Action action, DateTime time)
    {
        _queue.Add (time, action);
        // Signal thread to wake up and check again
        _sync.Set ();
    }
}

  • The trouble is, I'm not sure how to synchronize access to the queue between the 2 functions. I can't use a monitor or mutex, because Run() will sleep-wait while holding it, thus causing a deadlock. What is the right synchronization mechanism to use here? (If there is a mechanism to atomically start the sleep-wait and release the lock at the same time, that might solve my problem.)
  • How can I verify there is no race condition?
  • Is this a variation of the producer-consumer problem, or is there a more relevant synchronization problem description?

    While this is somewhat geared towards C#, I'd be happy to hear a general solution to this. Thanks!


    OK, take 2 with Monitor/Pulse.

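        void Run ()
        {
            while (true)
            {
                Action doit = null;
                // Declared outside the lock so it is still in scope for WaitOne below
                TimeSpan timeDiff;

                lock (_queueLock)
                {
                    // Monitor.Wait releases _queueLock while blocked and
                    // reacquires it before returning
                    while (_queue.Count == 0)
                        Monitor.Wait (_queueLock);

                    var first = _queue.First ();
                    timeDiff = first.Key - DateTime.Now;
                    if (timeDiff < TimeSpan.FromMilliseconds (100))
                    {
                        // SortedDictionary has no Dequeue; take the first entry and remove it
                        doit = first.Value;
                        _queue.Remove (first.Key);
                    }
                }

                if (doit != null)
                    doit (); // execute outside the lock
                else
                    _sync.WaitOne (timeDiff);
            }
        }


    void ScheduleEvent (Action action, DateTime time)
    {
        lock (_queueLock)
        {
            _queue.Add (time, action);
            // Signal thread to wake up and check again
            _sync.Set ();
            // Wake Run() if it is blocked waiting on an empty queue
            if (_queue.Count == 1)
                Monitor.Pulse (_queueLock);
        }
    }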
    


    The problem is easily solved: make sure the WaitOne is outside the lock.

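      //untested
      while (true)
      {
          Action doit = null;
          TimeSpan timeDiff;

          // Calculate time till first event
          // If queue empty, use pre-defined value
          lock (_queueLock)
          {
              if (_queue.Count == 0)
              {
                  timeDiff = _defaultWait; // assumed field holding the pre-defined value
              }
              else
              {
                  var first = _queue.First ();
                  timeDiff = first.Key - DateTime.Now;
                  if (timeDiff < TimeSpan.FromMilliseconds (100))
                  {
                      doit = first.Value;
                      _queue.Remove (first.Key);
                  }
              }
          }
          if (doit != null)
              doit (); // execute it
          else
              _sync.WaitOne (timeDiff); // the lock is no longer held here
      }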
    

    _queueLock is a private helper object.
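
For reference, here is how the pieces above might fit together in one class. This is only a sketch under some assumptions: the names SimpleScheduler and DefaultWait are made up for illustration, the event is an AutoResetEvent, and an action runs once its time has arrived rather than within 100 ms of it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Sketch only: SimpleScheduler and DefaultWait are illustrative names.
sealed class SimpleScheduler
{
    // Assumed "pre-defined value" used when the queue is empty.
    static readonly TimeSpan DefaultWait = TimeSpan.FromSeconds(1);

    readonly object _queueLock = new object();
    readonly SortedDictionary<DateTime, Action> _queue =
        new SortedDictionary<DateTime, Action>();
    readonly AutoResetEvent _sync = new AutoResetEvent(false);

    public SimpleScheduler()
    {
        new Thread(Run) { IsBackground = true }.Start();
    }

    // Can be called by any thread.
    public void ScheduleEvent(Action action, DateTime time)
    {
        lock (_queueLock)
        {
            // Add throws if two events share the exact same DateTime key.
            _queue.Add(time, action);
        }
        _sync.Set(); // wake the scheduler so it recomputes its wait
    }

    void Run()
    {
        while (true)
        {
            Action doit = null;
            TimeSpan timeDiff = DefaultWait;

            lock (_queueLock)
            {
                if (_queue.Count > 0)
                {
                    var first = _queue.First();
                    timeDiff = first.Key - DateTime.Now;
                    if (timeDiff <= TimeSpan.Zero)
                    {
                        doit = first.Value;
                        _queue.Remove(first.Key);
                    }
                }
            }

            if (doit != null)
                doit();                  // run the action without holding the lock
            else
                _sync.WaitOne(timeDiff); // sleep outside the lock; Set() ends the wait early
        }
    }
}
```

Because the event stays signaled until a waiter consumes it, a Set() that arrives between leaving the lock and calling WaitOne is not lost: the next WaitOne returns immediately and the loop re-reads the queue.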


    Since your goal is to schedule a task after a particular period of time, why not just use the System.Threading.Timer? It doesn't require dedicating a thread for the scheduling and takes advantage of the OS to wake up a worker thread. I've used this (removed some comments and other timer service functionality):

    public sealed class TimerService : ITimerService
    {
        public void WhenElapsed(TimeSpan duration, Action callback)
        {
            if (callback == null) throw new ArgumentNullException("callback");
    
            //Set up state to allow cleanup after timer completes
            var timerState = new TimerState(callback);
            var timer = new Timer(OnTimerElapsed, timerState, Timeout.Infinite, Timeout.Infinite);
            timerState.Timer = timer;
    
            //Start the timer
            timer.Change((int) duration.TotalMilliseconds, Timeout.Infinite);
        }
    
        private void OnTimerElapsed(Object state)
        {
            var timerState = (TimerState)state;
            timerState.Timer.Dispose();
            timerState.Callback();
        }
    
        private class TimerState
        {
            public Timer Timer { get; set; }
    
            public Action Callback { get; private set; }
    
            public TimerState(Action callback)
            {
                Callback = callback;
            }
        }
    }
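
If the full service is more than you need, the same one-shot idea can be sketched in a few lines. The Delayed class and its Run method are hypothetical names for illustration. The sketch relies on the one-argument Timer constructor, which is documented to use the new Timer itself as the state object.

```csharp
using System;
using System.Threading;

// Sketch only: Delayed.Run is a hypothetical helper, not part of the answer's TimerService.
static class Delayed
{
    // Runs callback once, after 'delay', on a thread-pool thread.
    public static void Run(TimeSpan delay, Action callback)
    {
        if (callback == null) throw new ArgumentNullException(nameof(callback));

        // The one-argument constructor creates the timer with an infinite due time
        // and passes the Timer itself to the callback as 'state'.
        var timer = new Timer(state =>
        {
            ((Timer)state).Dispose(); // one-shot: clean up before running the callback
            callback();
        });

        // Start the timer: fire once after 'delay', never repeat.
        timer.Change(delay, Timeout.InfiniteTimeSpan);
    }
}
```

Usage would look like Delayed.Run(TimeSpan.FromSeconds(5), Foo); in place of the Thread.Sleep call from the question.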
    


    Monitors were created for exactly this kind of situation: simple problems that can cost the application a lot. Here is my solution, which is very simple and also makes a shutdown easy to implement:

        void Run()
        {
            while (true)
            {
                lock (this)
                {
                    int timeToSleep = getTimeToSleep(); // check your list and return a value (ms)
                    if (timeToSleep <= 100)
                    {
                        action...
                    }
                    else
                    {
                        DateTime startTime = DateTime.Now;
                        int currCount = yourList.Count;
                        try
                        {
                            do
                            {
                                // Releases the lock while waiting and reacquires it before returning
                                Monitor.Wait(this, timeToSleep);

                                if (DateTime.Now >= startTime.AddMilliseconds(timeToSleep))
                                    break; // time passed

                                else if (yourList.Count != currCount)
                                    break; // new element added, go check it
                            } while (true);
                        }
                        catch (ThreadInterruptedException)
                        {
                            // do cleanup code or check for shutdown notification
                        }
                    }
                }
            }
        }

    void ScheduleEvent (Action action, DateTime time)
    {
        lock (this)
        {
            yourList.Add ...
            Monitor.Pulse(this);
        }
    }
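
To make the idea concrete, here is a compilable sketch of a Monitor-only scheduler. It is not the exact code above: the names MonitorScheduler, _gate and Stop are illustrative, shutdown uses a flag instead of Thread.Interrupt, and it locks a private object rather than this. The key point is that Monitor.Wait releases the lock for the duration of the wait and reacquires it before returning, which is the "sleep and release atomically" mechanism the question asks about.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Sketch only: MonitorScheduler, _gate and Stop are illustrative names.
sealed class MonitorScheduler
{
    readonly object _gate = new object();
    readonly SortedDictionary<DateTime, Action> _queue =
        new SortedDictionary<DateTime, Action>();
    bool _stopped;

    public MonitorScheduler()
    {
        new Thread(Run) { IsBackground = true }.Start();
    }

    public void ScheduleEvent(Action action, DateTime time)
    {
        lock (_gate)
        {
            _queue.Add(time, action); // throws on a duplicate DateTime key
            Monitor.Pulse(_gate);     // wake Run so it re-reads the head of the queue
        }
    }

    public void Stop()
    {
        lock (_gate)
        {
            _stopped = true;
            Monitor.Pulse(_gate);
        }
    }

    void Run()
    {
        while (true)
        {
            Action due;
            lock (_gate)
            {
                while (true)
                {
                    if (_stopped) return;
                    if (_queue.Count == 0)
                    {
                        Monitor.Wait(_gate); // releases _gate while blocked
                        continue;
                    }

                    var first = _queue.First();
                    TimeSpan wait = first.Key - DateTime.Now;
                    if (wait <= TimeSpan.Zero)
                    {
                        due = first.Value;
                        _queue.Remove(first.Key);
                        break;
                    }

                    Monitor.Wait(_gate, wait); // timed wait; also releases _gate
                }
            }
            due(); // run outside the lock so ScheduleEvent is never blocked by an action
        }
    }
}
```

After every wake-up the loop recomputes the wait from the current head of the queue, so it does not matter whether the wait ended because of a timeout, a newly scheduled event, or a stop request.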
