Azure Queue Worker Role Multi Thread Example

We have 4 Azure Queues which get populated either by a direct REST API call or by a WCF service that we provide.

  1. We would like to have ONE worker role to monitor all these 4 queues.
  2. I'm thinking of using multiple threads that read the queue names etc. from config and spin up the process method (which reads messages from the queue and processes them)

Could someone please provide an example or guidance on how to achieve this in a Worker Role?

Not too sure if the above can be achieved without multi-threading, as I'm quite new to multi-threading.

Thank you


You can fire off different threads for the different tasks, but also consider the non-threaded approach (which may perform better or worse depending on what you do with the messages):

while (true)
{
    var didSomething = false; // reset each pass so we only sleep when every queue was empty

    var msg = queue1.GetMessage();
    if (msg != null)
    {
        didSomething = true;
        // do something with it
        queue1.DeleteMessage(msg);
    }
    msg = queue2.GetMessage();
    if (msg != null)
    {
        didSomething = true;
        // do something with it
        queue2.DeleteMessage(msg);
    }
    // ...
    if (!didSomething) Thread.Sleep(TimeSpan.FromSeconds(1)); // so I don't enter a tight loop with nothing to do
}
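The same single-threaded pattern generalizes to any number of queues read from configuration. Here is a sketch that abstracts each queue behind delegates so the list can be built dynamically; the `QueueSource` shape is my own framing for illustration, not an Azure API, and in a real role `TryGet`/`Delete` would wrap `CloudQueue.GetMessage`/`DeleteMessage`:

```csharp
using System;
using System.Collections.Generic;

public class PollingLoop
{
    // Each source wraps one queue: TryGet returns the next message (or null
    // if the queue is empty), and Process handles and then removes it.
    public class QueueSource
    {
        public Func<string> TryGet;
        public Action<string> Process;
    }

    // One sweep across all queues; didSomething tells the caller whether
    // to sleep before the next sweep.
    public static void RunOnce(IEnumerable<QueueSource> sources, out bool didSomething)
    {
        didSomething = false;
        foreach (var source in sources)
        {
            var msg = source.TryGet();
            if (msg != null)
            {
                source.Process(msg);
                didSomething = true;
            }
        }
    }
}
```

The caller loops `RunOnce` forever, sleeping only when `didSomething` comes back false, exactly as in the loop above.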


Here is our current implementation, which does exactly what you're requesting but in a better way (or so we think). That said, this code still needs some heavy cleaning up; consider it functional version 0.1.

public class WorkerRole : RoleEntryPoint
{
    public override void Run()
    {
        var logic = new WorkerAgent();
        logic.Go(false);
    }

    public override bool OnStart()
    {
        // Initialize our Cloud Storage Configuration.
        AzureStorageObject.Initialize(AzureConfigurationLocation.AzureProjectConfiguration);

        return base.OnStart();
    }
}

public class WorkerAgent
{
    private const int _resistance_to_scaling_larger_queues = 9;
    private Dictionary<Type, int> _queueWeights = new Dictionary<Type, int>
                                                       {
                                                           {typeof (Queue1.Processor), 1},
                                                           {typeof (Queue2.Processor), 1},
                                                           {typeof (Queue3.Processor), 1},
                                                           {typeof (Queue4.Processor), 1},
                                                       };

    private readonly TimeSpan _minDelay = TimeSpan.FromMinutes(Convert.ToDouble(RoleEnvironment.GetConfigurationSettingValue("MinDelay")));
    private readonly TimeSpan _maxDelay = TimeSpan.FromMinutes(Convert.ToDouble(RoleEnvironment.GetConfigurationSettingValue("MaxDelay")));
    protected TimeSpan CurrentDelay { get; set; }

    public Func<string> GetSpecificQueueTypeToProcess { get; set; }

    /// <summary>
    /// This is a superset collection of all Queues that this WorkerAgent knows how to process, and the weight of focus it should receive.
    /// </summary>
    public Dictionary<Type, int> QueueWeights
    {
        get
        {
            return _queueWeights;
        }
        set
        {
            _queueWeights = value;
        }
    }

    public static TimeSpan QueueWeightCalibrationDelay
    {
        get { return TimeSpan.FromMinutes(15); }
    }


    protected Dictionary<Type, DateTime> QueueDelays = new Dictionary<Type, DateTime>();


    protected Dictionary<Type, AzureQueueMetaData> QueueMetaData { get; set; }

    public WorkerAgent(Func<string> getSpecificQueueTypeToProcess = null)
    {
        CurrentDelay = _minDelay;
        GetSpecificQueueTypeToProcess = getSpecificQueueTypeToProcess;
    }

    protected IProcessQueues CurrentProcessor { get; set; }

    /// <summary>
    /// Processes queue request(s).
    /// </summary>
    /// <param name="onlyProcessOnce">True to only process one time. False to process infinitely.</param>
    public void Go(bool onlyProcessOnce)
    {
        if (onlyProcessOnce)
        {
            ProcessOnce(false);
        }
        else
        {
            ProcessContinuously();
        }
    }

    public void ProcessContinuously()
    {
        while (true)
        {
            // temporary hack to get this started.
            ProcessOnce(true);
        }
    }

    /// <summary>
    /// Attempts to fetch and process a single queued request.
    /// </summary>
    public void ProcessOnce(bool shouldDelay)
    {
        PopulateQueueMetaData(QueueWeightCalibrationDelay);

        if (shouldDelay)
        {
            Thread.Sleep(CurrentDelay);
        }

        var typesToPickFrom = new List<Type>();
        foreach(var item in QueueWeights)
        {
            for (var i = 0; i < item.Value; i++)
            {
                typesToPickFrom.Add(item.Key);
            }
        }

        // Note: constructing a new Random per call can repeat seeds under rapid iteration; a shared instance would be safer.
        var randomIndex = new Random().Next(typesToPickFrom.Count);
        var typeToTryAndProcess = typesToPickFrom[randomIndex];

        CurrentProcessor = ObjectFactory.GetInstance(typeToTryAndProcess) as IProcessQueues;
        CleanQueueDelays();

        if (CurrentProcessor != null && !QueueDelays.ContainsKey(typeToTryAndProcess))
        {
            var errors = CurrentProcessor.Go();

            var amountToDelay = CurrentProcessor.NumberProcessed == 0 && !errors.Any()
                               ? _maxDelay // the queue was empty
                               : _minDelay; // else

            QueueDelays[CurrentProcessor.GetType()] = DateTime.Now + amountToDelay;
        }
        else
        {
            ProcessOnce(true);
        }
    }

    /// <summary>
    /// This method populates/refreshes the QueueMetaData collection.
    /// </summary>
    /// <param name="queueMetaDataCacheLimit">Specifies the length of time to cache the MetaData before refreshing it.</param>
    private void PopulateQueueMetaData(TimeSpan queueMetaDataCacheLimit)
    {
        if (QueueMetaData == null)
        {
            QueueMetaData = new Dictionary<Type, AzureQueueMetaData>();
        }

        var queuesWithoutMetaData = QueueWeights.Keys.Except(QueueMetaData.Keys).ToList();
        var expiredQueueMetaData = QueueMetaData.Where(qmd => qmd.Value.TimeMetaDataWasPopulated < (DateTime.Now - queueMetaDataCacheLimit)).Select(qmd => qmd.Key).ToList();
        var validQueueData = QueueMetaData.Where(x => !expiredQueueMetaData.Contains(x.Key)).ToList();
        var results = new Dictionary<Type, AzureQueueMetaData>();

        foreach (var queueProcessorType in queuesWithoutMetaData)
        {
            if (!results.ContainsKey(queueProcessorType))
            {
                var queueProcessor = ObjectFactory.GetInstance(queueProcessorType) as IProcessQueues;
                if (queueProcessor != null)
                {
                    var queue = new AzureQueue(queueProcessor.PrimaryQueueName);
                    var metaData = queue.GetMetaData();
                    results.Add(queueProcessorType, metaData);

                    QueueWeights[queueProcessorType] = (metaData.ApproximateMessageCount) == 0
                                                  ? 1
                                                  : (int)Math.Log(metaData.ApproximateMessageCount, _resistance_to_scaling_larger_queues) + 1;
                }
            }
        }

        foreach (var queueProcessorType in expiredQueueMetaData)
        {
            if (!results.ContainsKey(queueProcessorType))
            {
                var queueProcessor = ObjectFactory.GetInstance(queueProcessorType) as IProcessQueues;
                if (queueProcessor != null)
                {
                    var queue = new AzureQueue(queueProcessor.PrimaryQueueName);
                    var metaData = queue.GetMetaData();
                    results.Add(queueProcessorType, metaData);
                }
            }
        }

        QueueMetaData = results.Union(validQueueData).ToDictionary(data => data.Key, data => data.Value);
    }

    private void CleanQueueDelays()
    {
        QueueDelays = QueueDelays.Except(QueueDelays.Where(x => x.Value < DateTime.Now)).ToDictionary(x => x.Key, x => x.Value);
    }
}

With this, we have a separate class that knows how to process each queue, and each implements IProcessQueues. We load the _queueWeights collection with each of the types we want this worker to process, and the _resistance_to_scaling_larger_queues constant controls how the weighting scales. Note that it scales logarithmically (see the PopulateQueueMetaData method), and no queue ever has a weight of less than 1, even with 0 items.

If you set _resistance_to_scaling_larger_queues to 10, then every order-of-magnitude increase in a queue's size increases that type's "weight" by 1. For example, with QueueA at 0 items, QueueB at 0 items, and QueueC at 10 items, the respective weights are 1, 1, and 2, so QueueC has a 50% chance of being processed next while QueueA and QueueB each have only a 25% chance. If QueueC has 100 items, the weights are 1, 1, and 3 and the chances become 20%, 20%, and 60%. This ensures your empty queues don't get forgotten about.
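The weighting rule can be sketched in isolation. The `ComputeWeight` helper below is hypothetical (it just mirrors the formula inside PopulateQueueMetaData); the sample counts use 15 and 150 rather than exact powers of 10 simply to stay clear of floating-point boundary effects in Math.Log:

```csharp
using System;

public class WeightSketch
{
    // Mirrors the formula in PopulateQueueMetaData:
    // weight = 1 for an empty queue, otherwise floor(log_base(count)) + 1.
    public static int ComputeWeight(int approximateMessageCount, int resistance)
    {
        return approximateMessageCount == 0
            ? 1
            : (int)Math.Log(approximateMessageCount, resistance) + 1;
    }

    public static void Main()
    {
        // With resistance = 10, each order of magnitude adds 1 to the weight:
        Console.WriteLine(ComputeWeight(0, 10));   // empty queue        -> 1
        Console.WriteLine(ComputeWeight(15, 10));  // tens of items      -> 2
        Console.WriteLine(ComputeWeight(150, 10)); // hundreds of items  -> 3
    }
}
```

A queue's share of the random pick is then its weight divided by the sum of all weights, which is exactly where the 20%/20%/60% split above comes from.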

The other piece is the _minDelay and _maxDelay pair. If this code believes a queue has at least one item in it, it keeps processing that queue as fast as the _minDelay rate allows. If the queue was last seen empty, however, it won't be processed more often than the _maxDelay rate: when the random pick lands on such a queue (regardless of weight), we simply skip it and move on to the next iteration. (This part could be optimized further for storage-transaction efficiency, but it's a neat little addition.)

We have a couple of custom classes in here (such as AzureQueue and AzureQueueMetaData): one is essentially a wrapper for a CloudQueue and the other stores some info, such as the approximate message count of the queue. Nothing interesting in there, just a way to simplify the code.

Again, I don't call this "pretty" code, but some fairly clever concepts are both implemented and functional in it. Use it however you wish. :)

Lastly, writing the code this way lets a single project process MANY more queues. If we find it simply isn't keeping up, we can easily scale to a larger number of instances, and that scales up for ALL queues. In a minimal scenario, you could deploy one instance to monitor 3 queues; if a 4th queue starts to affect performance (or you need higher availability), increase to 2 instances. Once you hit 15 queues, add a third; at 25 queues, add a fourth. Get a new customer and need to process MANY queue requests across the system? That's fine: spin this one role up to 20 instances until it's done, then spin them back down. Have a particularly nasty queue? Comment that queue out of the _queueWeights collection and deploy to manage the rest of your queues; then redeploy a build with all other queues commented out of _queueWeights to a different set of instances, and do your debugging without a) the other QueueProcessors interfering with your debugging and b) your debugging interfering with the other QueueProcessors. Ultimately, this provides a LOT of flexibility and efficiency.


Inside the worker role's Run method, start 4 threads just as you would in any multi-threaded C# application. Of course, you need four thread functions defined, each with its own while loop that polls one queue. At the end of Run, wait for the threads to finish.
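A minimal sketch of that thread-per-queue approach follows. The queue names and the commented-out polling body are placeholders (in a real role they'd come from RoleEnvironment configuration and wrap CloudQueue calls), and the `Stop` flag is an addition of mine so the loops can shut down cleanly:

```csharp
using System;
using System.Threading;

public class QueueWorker
{
    private readonly string[] _queueNames;  // e.g. read from RoleEnvironment configuration
    private volatile bool _stopRequested;   // lets the role ask the loops to exit

    public QueueWorker(string[] queueNames)
    {
        _queueNames = queueNames;
    }

    public void Stop()
    {
        _stopRequested = true;
    }

    // One polling loop per queue; each runs on its own thread.
    private void PollQueue(object queueName)
    {
        while (!_stopRequested)
        {
            // var msg = queue.GetMessage();
            // if (msg != null) { /* process it */ queue.DeleteMessage(msg); }
            // else back off so an empty queue doesn't burn storage transactions:
            Thread.Sleep(TimeSpan.FromMilliseconds(100));
        }
    }

    // Call this from the worker role's Run() override.
    public void Run()
    {
        var threads = new Thread[_queueNames.Length];
        for (var i = 0; i < _queueNames.Length; i++)
        {
            threads[i] = new Thread(PollQueue) { IsBackground = true };
            threads[i].Start(_queueNames[i]);
        }
        foreach (var t in threads)
        {
            t.Join(); // keep Run() from returning, which would recycle the role
        }
    }
}
```

The Join at the end is what "wait for the threads to finish" means in practice: Run() must block forever in a worker role, because returning from it causes Azure to recycle the instance.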
