
C# threading pattern that will let me flush

I have a class that implements the Begin/End invocation pattern, where I initially used ThreadPool.QueueUserWorkItem() to run my work on a background thread. The work done on the thread doesn't loop, but it does take a while to process, so the work itself is not easily stopped.

I now have a side effect: someone using my class calls Begin (with a callback) a ton of times to do a lot of processing, so ThreadPool.QueueUserWorkItem() ends up creating a ton of threads to do the work. That in itself isn't bad, but there are cases where the caller wants to abandon the processing and start a new batch, and is forced to wait for the earlier requests to finish.

Since ThreadPool.QueueUserWorkItem() doesn't let me cancel queued work, I am trying to come up with a better way to queue it up, perhaps with an explicit FlushQueue() method on my class that lets the caller abandon work still sitting in my queue.

Anyone have any suggestion on a threading pattern that fits my needs?

Edit: I'm currently targeting the 2.0 framework. I'm currently thinking that a Producer/Consumer queue might work. Does anyone have thoughts on the idea of flushing the queue?

Edit 2, problem clarification: Since I'm using the Begin/End pattern in my class, every time the caller uses Begin with a callback I queue a whole new work item on the thread pool. The Begin call itself does a very small amount of processing and is not what I want to cancel. It's the uncompleted jobs in the queue I wish to stop.

The ThreadPool allows up to 250 worker threads per processor by default (as of .NET 2.0 SP1), so if you ask it to queue a large number of items with QueueUserWorkItem() you can end up with a huge number of concurrent threads that you have no way of stopping.

The caller is able to push the CPU to 100% with not only the work but the creation of the work because of the way I queued the threads.

I was thinking that by using the Producer/Consumer pattern I could put these jobs into my own queue, which would let me limit how many threads I create and avoid the CPU spike from creating all the concurrent threads. I might also be able to let the caller of my class flush all the jobs in the queue when they abandon their requests.
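The idea above could be sketched roughly as follows. This is only a minimal illustration that compiles against .NET 2.0, not a finished implementation: the names WorkItem, FlushableWorkQueue, Enqueue and Flush are made up for this example, and a job that has already started running is left to finish on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical delegate for one unit of work (not from the original post).
public delegate void WorkItem();

// A fixed set of consumer threads drains a queue that the caller can flush.
public sealed class FlushableWorkQueue : IDisposable
{
    private readonly Queue<WorkItem> pending = new Queue<WorkItem>();
    private readonly object sync = new object();
    private bool shuttingDown;

    public FlushableWorkQueue(int workerCount)
    {
        // The worker count caps concurrency, unlike unbounded QueueUserWorkItem calls.
        for (int i = 0; i < workerCount; i++)
        {
            Thread t = new Thread(ConsumeLoop);
            t.IsBackground = true;
            t.Start();
        }
    }

    // Producer side: add a job and wake one waiting consumer.
    public void Enqueue(WorkItem item)
    {
        lock (sync)
        {
            pending.Enqueue(item);
            Monitor.Pulse(sync);
        }
    }

    // Discards every job that has not started yet; returns how many were dropped.
    public int Flush()
    {
        lock (sync)
        {
            int dropped = pending.Count;
            pending.Clear();
            return dropped;
        }
    }

    private void ConsumeLoop()
    {
        while (true)
        {
            WorkItem item;
            lock (sync)
            {
                while (pending.Count == 0 && !shuttingDown)
                {
                    Monitor.Wait(sync); // releases the lock while waiting
                }
                if (shuttingDown) return;
                item = pending.Dequeue();
            }
            try { item(); } // run outside the lock so Flush() is never blocked by a job
            catch (Exception) { /* a real implementation would report this to the caller */ }
        }
    }

    // Stops the consumers and drops whatever is still queued.
    public void Dispose()
    {
        lock (sync)
        {
            shuttingDown = true;
            pending.Clear();
            Monitor.PulseAll(sync);
        }
    }
}
```

With one worker and four queued jobs, calling Flush() while the first job is still running drops the other three. Stopping a job that is already in progress would need a cooperative cancel flag inside the job itself.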

I am currently trying to implement this myself, but figured SO was a good place to have someone say "look at this code", or "you won't be able to flush because of this", or "flushing isn't the right term, you mean this".


EDIT: My answer does not apply since the OP is using 2.0. Leaving it up and switching to CW for anyone who reads this question and is using 4.0.

If you are using C# 4.0, or can take a dependency on one of the earlier versions of the parallel frameworks, you can use their built-in cancellation support. It's not as easy as cancelling a thread, but the framework is much more reliable (cancelling a thread is very attractive but also very dangerous).

Reed wrote an excellent article on this that you should take a look at:

  • http://reedcopsey.com/2010/02/17/parallelism-in-net-part-10-cancellation-in-plinq-and-the-parallel-class/
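For readers on .NET 4.0, the built-in cancellation support looks roughly like the sketch below. The class and method names (CancellationSketch, ProcessItems, RunAndCancel) and the Thread.Sleep stand-in for real work are assumptions made for illustration. CancellationTokenSource, CancellationToken and Task are the framework types.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationSketch
{
    // Cooperative worker: checks the token between items and throws
    // OperationCanceledException once cancellation has been requested.
    public static int ProcessItems(int itemCount, CancellationToken token)
    {
        int done = 0;
        for (int i = 0; i < itemCount; i++)
        {
            token.ThrowIfCancellationRequested();
            Thread.Sleep(10); // stand-in for one slice of real work
            done++;
        }
        return done;
    }

    // Starts the work, cancels it, and reports whether the task ended as cancelled.
    public static bool RunAndCancel()
    {
        using (CancellationTokenSource cts = new CancellationTokenSource())
        {
            Task<int> task = Task.Factory.StartNew(
                () => ProcessItems(1000, cts.Token), cts.Token);

            cts.Cancel(); // request cancellation; the worker notices at its next check

            try
            {
                task.Wait();
                return false; // finished before the cancel was observed
            }
            catch (AggregateException ex)
            {
                // A cancelled task surfaces as a TaskCanceledException,
                // which derives from OperationCanceledException.
                return ex.InnerException is OperationCanceledException;
            }
        }
    }
}
```

Passing the same token to both StartNew and the worker lets the task end in the Canceled state rather than Faulted, and a task that has not started yet is never run at all.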


A method I've used in the past, though it's certainly not a best practice, is to dedicate a class instance to each thread and give the class an abort flag. Then add a ThrowIfAborting method to the class and call it periodically from the thread (if the thread runs a loop, just call it every iteration). If the flag has been set, ThrowIfAborting simply throws an exception, which is caught in the thread's main method. Just make sure to clean up your resources as you're aborting.
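A rough sketch of that approach, compatible with .NET 2.0, might look like this. Only ThrowIfAborting is named in the answer. AbortableJob, JobAbortedException, RequestAbort and the Thread.Sleep stand-in for real work are hypothetical names chosen for the example.

```csharp
using System;
using System.Threading;

// Hypothetical exception type, used only to unwind the worker's call stack.
public class JobAbortedException : Exception { }

// One instance per thread; the owner sets the flag, the worker polls it.
public class AbortableJob
{
    private volatile bool abortRequested;
    private volatile bool wasAborted;
    private int stepsCompleted;

    public bool WasAborted { get { return wasAborted; } }
    public int StepsCompleted { get { return stepsCompleted; } }

    // Called from another thread to ask the job to stop.
    public void RequestAbort() { abortRequested = true; }

    private void ThrowIfAborting()
    {
        if (abortRequested) throw new JobAbortedException();
    }

    // Thread entry point: the abort exception is caught here, not by the caller.
    public void Run()
    {
        try
        {
            for (int i = 0; i < 1000; i++)
            {
                ThrowIfAborting();  // checked once per iteration
                Thread.Sleep(5);    // stand-in for one slice of real work
                stepsCompleted++;
            }
        }
        catch (JobAbortedException)
        {
            wasAborted = true;
        }
        finally
        {
            // Release connections, file handles, etc. here; this runs on abort too.
        }
    }
}
```

A caller would start it with new Thread(job.Run).Start(), call job.RequestAbort() when the result is no longer wanted, and optionally Join the thread before reading WasAborted.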


You could extend the Begin/End pattern to become a Begin/Cancel/End pattern. The Cancel method could set a cancel flag that the worker thread polls periodically. When the worker thread detects a cancel request, it can stop its work, clean up resources as needed, and report that the operation was canceled as part of the End arguments.
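As a simplified illustration of Begin/Cancel/End, assuming .NET 2.0, the sketch below uses a hypothetical CancellableOperation class that sums numbers as a stand-in for real work. A full implementation would normally return an IAsyncResult from Begin and invoke the caller's callback. Both are left out here to keep the cancel path visible.

```csharp
using System;
using System.Threading;

public class CancellableOperation
{
    private readonly ManualResetEvent finished = new ManualResetEvent(false);
    private volatile bool cancelRequested;
    private bool wasCancelled;
    private int result;

    // Begin: queue the work and return immediately.
    public void Begin(int iterations)
    {
        ThreadPool.QueueUserWorkItem(delegate(object state)
        {
            int sum = 0;
            for (int i = 0; i < iterations; i++)
            {
                if (cancelRequested)      // polled once per iteration
                {
                    wasCancelled = true;  // clean up resources here if needed
                    break;
                }
                Thread.Sleep(5);          // stand-in for one slice of real work
                sum += i;
            }
            result = sum;
            finished.Set();               // lets End() return
        });
    }

    // Cancel: only sets the flag; the worker stops at its next check.
    public void Cancel() { cancelRequested = true; }

    // End: blocks until the worker is done. Returns false if the operation
    // was cancelled, in which case 'value' holds only a partial result.
    public bool End(out int value)
    {
        finished.WaitOne();
        value = result;
        return !wasCancelled;
    }
}
```

Reporting cancellation through End's return value means the caller does not have to treat a cancel as an error. Throwing from End would be the other reasonable choice.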


I've solved what I believe to be your exact problem by using a wrapper class around 1+ BackgroundWorker instances.

Unfortunately, I'm not able to post my entire class, but here's the basic concept along with its limitations.

Usage: You simply create an instance and call RunOrReplace(...) when you want to cancel your old worker and start a new one. If the old worker was busy, it is asked to cancel and then another worker is used to immediately execute your request.

public class BackgroundWorkerReplaceable : IDisposable
{

BackgroundWorker activeWorker = null;
object activeWorkerSyncRoot = new object();
List<BackgroundWorker> workerPool = new List<BackgroundWorker>();

DoWorkEventHandler doWork;
RunWorkerCompletedEventHandler runWorkerCompleted;

public bool IsBusy
{
    get { return activeWorker != null ? activeWorker.IsBusy : false; }
}

public BackgroundWorkerReplaceable(DoWorkEventHandler doWork, RunWorkerCompletedEventHandler runWorkerCompleted)
{
    this.doWork = doWork;
    this.runWorkerCompleted = runWorkerCompleted;
    ResetActiveWorker();
}

public void RunOrReplace(object param) // Overloads could add a ProgressChangedEventHandler and other parameters
{
    try
    {
        lock(activeWorkerSyncRoot)
        {
            if(activeWorker.IsBusy)
            {
                ResetActiveWorker();
            }

            // This works because if IsBusy was false above, there is no way for it to become true without another thread obtaining a lock
            if(!activeWorker.IsBusy)
            {
                // Optionally handle ProgressChangedEventHandler and other features (under the lock!)

                // Work on this new param
                activeWorker.RunWorkerAsync(param);
            }
            else
            { // This should never happen since we create new workers when there's none available!
                throw new InvalidOperationException("No idle worker available."); // assert or similar
            }
        }
    }
    catch(Exception) // Also covers InvalidOperationException
    { // In my experience, it's safe to just show the user an error and ignore these, but that's going to depend on what you use this for and where you want the exception handling to be
    }
}

public void Cancel()
{
    ResetActiveWorker();
}

public void Dispose()
{ // You should implement a proper Dispose/Finalizer pattern
    if(activeWorker != null)
    {
        activeWorker.CancelAsync();
    }
    foreach(BackgroundWorker worker in workerPool)
    {
        worker.CancelAsync();
        worker.Dispose();
        // perhaps use a for loop instead so you can set worker to null?  This might help the GC, but it's probably not needed
    }
}

void ResetActiveWorker()
{
    lock(activeWorkerSyncRoot)
    {
        if(activeWorker == null)
        {
            activeWorker = GetAvailableWorker();
        }
        else if(activeWorker.IsBusy)
        { // Current worker is busy - issue a cancel and set another active worker
            activeWorker.CancelAsync(); // Requires WorkerSupportsCancellation to be true
            // Optionally handle ProgressEventHandler -=
            activeWorker = GetAvailableWorker(); // Ensure that the activeWorker is available
        }
        //else - do nothing, activeWorker is already ready for work!
    }
}
BackgroundWorker GetAvailableWorker()
{
    // Loop through workerPool and return a worker if IsBusy is false
    // if the loop exits without returning...
    if(activeWorker != null)
    {
        workerPool.Add(activeWorker); // Save the old worker for possible future use
    }
    return GenerateNewWorker();
}
BackgroundWorker GenerateNewWorker()
{
    BackgroundWorker worker = new BackgroundWorker();
    worker.WorkerSupportsCancellation = true; // Required for CancelAsync()
    //worker.WorkerReportsProgress
    worker.DoWork += doWork;
    worker.RunWorkerCompleted += runWorkerCompleted;
    // Other stuff
    return worker;
}

} // class

Pro/Con:

This has the benefit of having a very low delay in starting your new execution, since new threads don't have to wait for old ones to finish.

This comes at the cost of a theoretically never-ending growth of BackgroundWorker objects that never get GC'd. However, in practice the code above attempts to recycle old workers, so you shouldn't normally end up with a large pool of idle workers. If you are worried about this because of how you plan to use this class, you could implement a Timer which fires a CleanUpExcessWorkers(...) method, or have ResetActiveWorker() do this cleanup (at the cost of a longer RunOrReplace(...) delay).

The main cost of using this is precisely why it's beneficial: it doesn't wait for the previous thread to exit. For example, if DoWork is performing a database call and you execute RunOrReplace(...) 10 times in rapid succession, the database call might not be canceled immediately when the worker is, so you'll have 10 queries running, making all of them slow! This generally tends to work fine with Oracle, causing only minor delays, but I do not have experience with other databases (to speed up the cleanup, I have the canceled worker tell Oracle to cancel the command). Proper use of the EventArgs described below mostly solves this.

Another minor cost is that whatever code this BackgroundWorker runs must be compatible with this concept: it must be able to recover safely from being canceled. DoWorkEventArgs has a Cancel property and RunWorkerCompletedEventArgs has a Cancelled property, and you should use both. For example, if you make database calls in the DoWork method (mainly what I use this class for), you need to periodically check the worker's CancellationPending property, perform the appropriate clean-up, and set Cancel to true so that the completed handler sees Cancelled.
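A pair of handlers that cooperate with cancellation might look like the following sketch. The class name CancellableHandlers, the loop, and the Thread.Sleep stand-in for a database call are assumptions for illustration. CancellationPending, DoWorkEventArgs.Cancel and RunWorkerCompletedEventArgs.Cancelled are the real BackgroundWorker members.

```csharp
using System;
using System.ComponentModel;
using System.Threading;

public static class CancellableHandlers
{
    // DoWork handler: polls CancellationPending between slices of work.
    public static void DoWork(object sender, DoWorkEventArgs e)
    {
        BackgroundWorker worker = (BackgroundWorker)sender;
        for (int i = 0; i < 1000; i++)
        {
            if (worker.CancellationPending)
            {
                // Clean up here, e.g. cancel the pending database command.
                e.Cancel = true; // surfaces as Cancelled in RunWorkerCompleted
                return;
            }
            Thread.Sleep(5); // stand-in for one slice of real work
        }
        e.Result = "done";
    }

    // RunWorkerCompleted handler: check Cancelled and Error before touching
    // Result, because reading Result in either of those cases throws.
    public static void Completed(object sender, RunWorkerCompletedEventArgs e)
    {
        if (e.Cancelled)
        {
            return; // a replaced request; nothing to show
        }
        if (e.Error != null)
        {
            Console.WriteLine("Failed: " + e.Error.Message);
            return;
        }
        Console.WriteLine("Result: " + e.Result);
    }
}
```

These have the DoWorkEventHandler and RunWorkerCompletedEventHandler signatures, so they could be passed straight to the BackgroundWorkerReplaceable constructor above.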
