producer-consumer with a resource
I'm trying to implement the producer/consumer pattern with a set of resources, so each thread has one resource associated with it. For example, I may have a queue of tasks where each task requires a StreamWriter
to write its result. Each task also has to have parameters passed to it.
I started with Joseph Albahari's implementation (see below for my modified version).
I replaced the queue of Action
with a queue of Action<T>
where T
is the resource, and pass the resource associated with the thread to the Action
. But, this leaves me with the problem of how to pass parameters to the Action
. Obviously, the Action
must be replaced with a delegate but this leaves the problem of how to pass parameters when tasks are enqueued (from outside the ProducerConsumerQueue
class). Any ideas on how to do this?
class ProducerConsumerQueue<T>
{
readonly object _locker = new object();
Thread[] _workers;
Queue<Action<T>> _itemQ = new Queue<Action<T>>();
public ProducerConsumerQueue(T[] resources)
{
_workers = new Thread[resources.Length];
// Create and start a separate thread for each worker
for (int i = 0; i < resources.Length; i++)
{
Thread thread = new Thread(() => Consume(resources[i]));
thread.SetApartmentState(ApartmentState.STA);
_workers[i] = thread;
_workers[i].Start();
}
}
public void Shutdown(bool waitForWorkers)
{
// Enqueue one null item per worker to make each exit.
for开发者_运维知识库each (Thread worker in _workers)
EnqueueItem(null);
// Wait for workers to finish
if (waitForWorkers)
foreach (Thread worker in _workers)
worker.Join();
}
public void EnqueueItem(Action<T> item)
{
lock (_locker)
{
_itemQ.Enqueue(item); // We must pulse because we're
Monitor.Pulse(_locker); // changing a blocking condition.
}
}
void Consume(T parameter)
{
while (true) // Keep consuming until
{ // told otherwise.
Action<T> item;
lock (_locker)
{
while (_itemQ.Count == 0) Monitor.Wait(_locker);
item = _itemQ.Dequeue();
}
if (item == null) return; // This signals our exit.
item(parameter); // Execute item.
}
}
}
The type T
in ProducerConsumerQueue<T>
doesn't have to be your resource it can be a composite type that contains your resource. With .NET4 the easiest way to do this is with Tuple<StreamWriter, YourParameterType>
. The produce/consumer queue just eats and spits out T
so in your Action<T>
you can just use properties to get the resource and the parameter. If you are using Tuple
you would use Item1
to get the resource and Item2
to get the parameter.
If you are not use .NET4, the process is similar but you just create your own class:
public class WorkItem<T>
{
private StreamWriter resource;
private T parameter;
public WorkItem(StreamWriter resource, T parameter)
{
this.resource = resource;
this.parameter = parameter;
}
public StreamWriter Resource { get { return resource; } }
public T Parameter { get { return parameter; } }
}
In fact, making it generic may be overdesigning for your situation. You can just define T to be the type you want it to be.
Also, for reference, there are new ways to do multi-threading included in .NET4 that may applicable to your use case such as concurrent queues and the Parallel Task Library. They can also be combined with traditional approaches such as semaphores.
Edit:
Continuing with this approach, here is a small sample class that demonstrates using:
- a semaphore to control access to a limited resource
- a concurrent queue to manage that resource safely between threads
- task management using the Task Parallel Library
Here is the Processor
class:
public class Processor
{
private const int count = 3;
private ConcurrentQueue<StreamWriter> queue = new ConcurrentQueue<StreamWriter>();
private Semaphore semaphore = new Semaphore(count, count);
public Processor()
{
// Populate the resource queue.
for (int i = 0; i < count; i++) queue.Enqueue(new StreamWriter("sample" + i));
}
public void Process(int parameter)
{
// Wait for one of our resources to become free.
semaphore.WaitOne();
StreamWriter resource;
queue.TryDequeue(out resource);
// Dispatch the work to a task.
Task.Factory.StartNew(() => Process(resource, parameter));
}
private Random random = new Random();
private void Process(StreamWriter resource, int parameter)
{
// Do work in background with resource.
Thread.Sleep(random.Next(10) * 100);
resource.WriteLine("Parameter = {0}", parameter);
queue.Enqueue(resource);
semaphore.Release();
}
}
and now we can use the class like this:
var processor = new Processor();
for (int i = 0; i < 10; i++)
processor.Process(i);
and no more than three tasks will be scheduled at the same time, each with their own StreamWriter
resource which is recycled.
精彩评论