How would you implement this "WorkerChain" functionality in .NET?

EDIT: It kind of occurred to me too late (?) that all the code I posted in my first update to this question was way too much for most readers. I've actually gone ahead and written a blog post about this topic for anyone who cares to read it.

In the meantime, I've left the original question in place, to give a brief glimpse at the problem I'd like to solve.

I'll also just note that the code I have posted (on my blog) has, thus far, stood up pretty well to testing. But I'm still interested in any and all feedback people are willing to give me on how clean/robust/performant* it is.

*I love how that word doesn't really mean what we think, but we developers use it all the time anyway.


Original Question

Sorry for the vague question title -- not sure how to encapsulate what I'm asking below succinctly. (If someone with editing privileges can think of a more descriptive title, feel free to change it.)

The behavior I need is this. I am envisioning a worker class that accepts a single delegate task in its constructor (for simplicity, I would make it immutable -- no more tasks can be added after instantiation). I'll call this task T. The class should have a simple method, something like GetToWork, that will exhibit this behavior:

  1. If the worker is not currently running T, then it will start doing so right now.
  2. If the worker is currently running T, then once it is finished, it will start T again immediately.
  3. GetToWork can be called any number of times while the worker is running T; the simple rule is that, during any execution of T, if GetToWork was called at least once, T will run again upon completion (and then if GetToWork is called while T is running that time, it will repeat itself again, etc.).

Now, this is pretty straightforward with a boolean switch. But this class needs to be thread-safe, by which I mean, steps 1 and 2 above need to comprise atomic operations (at least I think they do).

There is an added layer of complexity. I need a "worker chain" class that will consist of many of these workers linked together. As soon as the first worker completes, it essentially calls GetToWork on the worker after it; meanwhile, if its own GetToWork has been called, it restarts itself as well. Logically, calling GetToWork on the chain is essentially the same as calling GetToWork on the first worker in the chain (I would fully intend that the chain's workers not be publicly accessible).

One way to imagine how this hypothetical "worker chain" would behave is by comparing it to a team in a relay race. Suppose there are four runners, W1 through W4, and let the chain be called C. If I call C.StartWork(), what should happen is this:

  1. If W1 is at his starting point (i.e., doing nothing), he will start running towards W2.
  2. If W1 is already running towards W2 (i.e., executing his task), then once he reaches W2, he will signal to W2 to get started, immediately return to his starting point and, since StartWork has been called, start running towards W2 again.
  3. When W1 reaches W2's starting point, he'll immediately return to his own starting point.
    1. If W2 is just sitting around, he'll start running immediately towards W3.
    2. If W2 is already off running towards W3, then W2 will simply go again once he's reached W3 and returned to his starting point.

The above is probably a little convoluted and written out poorly. But hopefully you get the basic idea. Obviously, these workers will be running on their own threads.
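
To make the relay-race description a bit more concrete, here is a rough sketch of one way the worker and the chain could fit together. Everything in it is an assumption for illustration: the names Worker and WorkerChain are hypothetical, a plain lock guards the two flags so that "check whether running" and "start or mark pending" happen as one atomic step, and each run is handed to Task.Run. It does not handle exceptions thrown by a task.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical single worker: runs its task, and runs it once more if
// GetToWork was called at least once during the current run.
public sealed class Worker
{
    private readonly object _gate = new object();
    private readonly Action _task;
    private bool _running;  // true while some thread is inside RunLoop
    private bool _pending;  // true if GetToWork arrived during a run

    public Worker(Action task)
    {
        if (task == null) throw new ArgumentNullException("task");
        _task = task;
    }

    public void GetToWork()
    {
        lock (_gate)
        {
            // Already running: just remember that another run was requested.
            if (_running) { _pending = true; return; }
            // Idle: claim the worker before leaving the lock.
            _running = true;
        }
        Task.Run(() => RunLoop());
    }

    private void RunLoop()
    {
        while (true)
        {
            _task(); // assumption: the task does not throw
            lock (_gate)
            {
                // Nobody asked again: go back to idle and stop.
                if (!_pending) { _running = false; return; }
                // Somebody asked again: clear the request and repeat.
                _pending = false;
            }
        }
    }
}

// Hypothetical chain: each worker signals its successor after every run.
public sealed class WorkerChain
{
    private readonly Worker _first;

    public WorkerChain(params Action[] tasks)
    {
        if (tasks == null || tasks.Length == 0)
            throw new ArgumentException("At least one task is required.", "tasks");

        Worker next = null;
        // Build back to front so each worker can capture its successor.
        for (int i = tasks.Length - 1; i >= 0; i--)
        {
            Action task = tasks[i];
            Worker successor = next;
            next = new Worker(() =>
            {
                task();
                if (successor != null) successor.GetToWork();
            });
        }
        _first = next;
    }

    // Calling the chain is the same as calling its first worker.
    public void GetToWork() { _first.GetToWork(); }
}
```

With this shape, new WorkerChain(a, b, c).GetToWork() starts a; when a finishes it signals b and then either goes idle or repeats, depending on whether the chain was called again in the meantime.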

Also, I guess it's possible this functionality already exists somewhere? If that's the case, definitely let me know!


Use semaphores. Each worker is a thread with the following code (pseudocode):

WHILE(TRUE)
    WAIT_FOR_SEMAPHORE(WORKER_ID) //The semaphore for the current worker
    RESET_SEMAPHORE(WORKER_ID)
    /* DO WORK */
    POST_SEMAPHORE(NEXT_WORKER_ID) //The semaphore for the next worker
END

A non-zero semaphore means that someone has signaled the current thread to do the work. Once the worker sees a non-zero semaphore at its wait line, it resets the semaphore (marking it as not signaled), does the work (during which the semaphore can be posted again), and then posts the semaphore for the next worker. The same sequence repeats in each subsequent worker.
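
In .NET, the pseudocode above might translate to something like the following sketch. It assumes an AutoResetEvent stands in for the semaphore, because WaitOne on an AutoResetEvent waits and resets in a single step and repeated Set calls collapse into one pending signal. The class name SignaledWorker is hypothetical, and there is no shutdown logic; the thread is simply marked as a background thread.

```csharp
using System;
using System.Threading;

// Hypothetical worker: one dedicated thread per worker, woken by a signal.
public sealed class SignaledWorker
{
    private readonly AutoResetEvent _signal = new AutoResetEvent(false);
    private readonly Action _work;
    private readonly SignaledWorker _next; // null for the last worker

    public SignaledWorker(Action work, SignaledWorker next)
    {
        if (work == null) throw new ArgumentNullException("work");
        _work = work;
        _next = next;

        // Background thread so it does not keep the process alive.
        var thread = new Thread(Loop) { IsBackground = true };
        thread.Start();
    }

    // POST_SEMAPHORE: any number of calls during a run leave one pending signal.
    public void GetToWork() { _signal.Set(); }

    private void Loop()
    {
        while (true)
        {
            // WAIT_FOR_SEMAPHORE and RESET_SEMAPHORE in one step.
            _signal.WaitOne();

            _work(); // DO WORK (the signal may be set again meanwhile)

            // POST_SEMAPHORE(NEXT_WORKER_ID)
            if (_next != null) _next.GetToWork();
        }
    }
}
```

Workers are wired back to front: create the last one with next set to null, then pass each worker as next to the one before it, and call GetToWork on the first.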


A naive implementation that you may get some mileage from.

Note:

It is my understanding that scalar types, i.e. the bool flags controlling execution, have atomic assignment, making them as thread-safe as you would need/want in this scenario.

There are much more complex possibilities involving semaphores and other strategies, but if simple works....

using System;
using System.Threading;

namespace FlaggedWorkerChain
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            FlaggedChainedWorker innerWorker = new FlaggedChainedWorker("inner", () => Thread.Sleep(1000), null);
            FlaggedChainedWorker outerWorker = new FlaggedChainedWorker("outer", () => Thread.Sleep(500), innerWorker);

            Thread t = new Thread(outerWorker.GetToWork);
            t.Start();

            // flag outer to do work again
            outerWorker.GetToWork();

            Console.WriteLine("press the any key");
            Console.ReadKey();
        }
    }

    public sealed class FlaggedChainedWorker
    {
        private readonly string _id;
        private readonly FlaggedChainedWorker _innerWorker;
        private readonly Action _work;
        private bool _busy;
        private bool _flagged;

        public FlaggedChainedWorker(string id, Action work, FlaggedChainedWorker innerWorker)
        {
            _id = id;
            _work = work;
            _innerWorker = innerWorker;
        }

        public void GetToWork()
        {
            if (_busy)
            {
                _flagged = true;
                return;
            }

            do
            {
                _flagged = false;
                _busy = true;
                Console.WriteLine(String.Format("{0} begin", _id));

                _work.Invoke();

                if (_innerWorker != null)
                {
                    _innerWorker.GetToWork();
                }
                Console.WriteLine(String.Format("{0} end", _id));

                _busy = false;
            } while (_flagged);
        }
    }
}
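
One caveat with plain bool flags: reading _busy and then writing _flagged are two separate steps, so a call that reads _busy as true just before the running thread clears it can set _flagged after the loop has already checked it, and that request is lost. A possible way to close the gap without locks is to fold both flags into one int and change it only with Interlocked.CompareExchange. The sketch below is an assumption-laden variant, not a drop-in from any library: the class name InterlockedChainedWorker and the three state constants are hypothetical, and, like the code above, it runs the work on the calling thread.

```csharp
using System;
using System.Threading;

// Hypothetical variant of the flagged worker using a single atomic state.
public sealed class InterlockedChainedWorker
{
    private const int Idle = 0;        // nobody is running the work
    private const int Busy = 1;        // running, no repeat requested
    private const int BusyFlagged = 2; // running, repeat requested

    private readonly Action _work;
    private readonly InterlockedChainedWorker _innerWorker;
    private int _state = Idle;

    public InterlockedChainedWorker(Action work, InterlockedChainedWorker innerWorker)
    {
        if (work == null) throw new ArgumentNullException("work");
        _work = work;
        _innerWorker = innerWorker;
    }

    public void GetToWork()
    {
        while (true)
        {
            // Idle -> Busy: this caller becomes the runner.
            int seen = Interlocked.CompareExchange(ref _state, Busy, Idle);
            if (seen == Idle) break;

            // A repeat is already requested; nothing more to record.
            if (seen == BusyFlagged) return;

            // Busy -> BusyFlagged: request a repeat and leave.
            if (Interlocked.CompareExchange(ref _state, BusyFlagged, Busy) == Busy)
                return;

            // The state changed between the two attempts; try again.
        }

        do
        {
            // Clear any repeat request; the run that starts now covers it.
            Interlocked.Exchange(ref _state, Busy);

            _work();

            if (_innerWorker != null)
                _innerWorker.GetToWork();
        }
        // Busy -> Idle succeeds only if no repeat was requested during the run.
        while (Interlocked.CompareExchange(ref _state, Idle, Busy) != Busy);
    }
}
```

Because only the running thread ever moves the state out of BusyFlagged, a request either lands before the final compare-exchange (and triggers another run) or after it (and finds the worker idle, so the caller runs the work itself).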


Seems to me that you're overcomplicating this. I've written these "pipeline" classes before; all you need is a queue of workers each with a wait handle that gets signaled after the action is complete.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public class Pipeline : IDisposable
{
    private readonly IEnumerable<Stage> stages;

    public Pipeline(IEnumerable<Action> actions)
    {
        if (actions == null)
            throw new ArgumentNullException("actions");
        stages = actions.Select(a => new Stage(a)).ToList();
    }

    public Pipeline(params Action[] actions)
        : this(actions as IEnumerable<Action>)
    {
    }

    public void Dispose()
    {
        foreach (Stage stage in stages)
            stage.Dispose();
    }

    public void Start()
    {
        foreach (Stage currentStage in stages)
            currentStage.Execute();
    }

    class Stage : IDisposable
    {
        private readonly Action action;
        private readonly EventWaitHandle readyEvent;

        public Stage(Action action)
        {
            this.action = action;
            this.readyEvent = new AutoResetEvent(true);
        }

        public void Dispose()
        {
            readyEvent.Close();
        }

        public void Execute()
        {
            readyEvent.WaitOne();
            action();
            readyEvent.Set();
        }
    }
}

And here's a test program, which you can use to verify that actions always get executed in the correct order and only one of the same action can ever execute at once:

class Program
{
    static void Main(string[] args)
    {
        Action firstAction = GetTestAction(1);
        Action secondAction = GetTestAction(2);
        Action thirdAction = GetTestAction(3);
        Pipeline pipeline = new Pipeline(firstAction, secondAction, thirdAction);
        for (int i = 0; i < 10; i++)
        {
            ThreadPool.QueueUserWorkItem(s => pipeline.Start());
        }
        // Keep the process alive; thread-pool threads are background threads.
        Console.ReadLine();
    }

    static Action GetTestAction(int index)
    {
        return () =>
        {
            Console.WriteLine("Action started: {0}", index);
            Thread.Sleep(100);
            Console.WriteLine("Action finished: {0}", index);
        };
    }
}

Short, simple, completely thread-safe.

If for some reason you need to start working at a specific step in the chain instead, then you can just add an overload for Start:

public void Start(int index)
{
    // index is zero-based: Skip(index) begins execution at that stage.
    foreach (Stage currentStage in stages.Skip(index))
        currentStage.Execute();
}

Edit

Based on comments, I think a few minor changes to the inner Stage class should be enough to get the kind of behaviour you want. We just need to add a "queued" event in addition to the "ready" event.

    class Stage : IDisposable
    {
        private readonly Action action;
        private readonly EventWaitHandle readyEvent;
        private readonly EventWaitHandle queuedEvent;

        public Stage(Action action)
        {
            this.action = action;
            this.readyEvent = new AutoResetEvent(true);
            this.queuedEvent = new AutoResetEvent(true);
        }

        public void Dispose()
        {
            readyEvent.Close();
            queuedEvent.Close();
        }

        private bool CanExecute()
        {
            if (readyEvent.WaitOne(0, true))
                return true;
            if (!queuedEvent.WaitOne(0, true))
                return false;
            readyEvent.WaitOne();
            queuedEvent.Set();
            return true;
        }

        public bool Execute()
        {
            if (!CanExecute())
                return false;
            action();
            readyEvent.Set();
            return true;
        }
    }

Also change the pipeline's Start method to break if a stage can't execute (i.e. is already queued):

public void Start(int index)
{
    // index is zero-based: Skip(index) begins execution at that stage.
    foreach (Stage currentStage in stages.Skip(index))
        if (!currentStage.Execute())
            break;
}

The concept here is pretty simple, again:

  • A stage first tries to immediately acquire the ready state. If it succeeds, then it starts running.
  • If it fails to acquire the ready state (i.e. the task is already running), then it tries to acquire the queued state.
    • If it gets the queued state, then it waits for the ready state to become available and then releases the queued state.
    • If it can't get the queued state either, then it gives up.

I've read over your question and comments again and I'm pretty sure this is exactly what you're trying to do, and gives the best trade-off between safety, throughput, and throttling.

Because the ThreadPool can sometimes take a while to respond, you should up the delay in the test program to 1000 instead of 100 if you want to really see the "skips" happening.
