Invoking operations asynchronously with expiration

I have a classic asynchronous message dispatching problem. Essentially, I need to dispatch messages asynchronously and then capture the message response when the dispatch is complete. The problem is that I can't figure out how to make any one request cycle self-expire and short-circuit.

Here is a sample of the pattern I am using at the moment:

Define the delegate for invocation

private delegate IResponse MessageDispatchDelegate(IRequest request);

Dispatch messages with a callback

var dispatcher = new MessageDispatchDelegate(DispatchMessage);
dispatcher.BeginInvoke(requestMessage, DispatchMessageCallback, null);

Dispatch the message

private IResponse DispatchMessage(IRequest request)
{
  //Dispatch the message and throw exception if it times out
}

Get results of dispatch as either a response or an exception

private void DispatchMessageCallback(IAsyncResult ar)
{
  //Get the result from EndInvoke(ar), which is either an IResponse or a timeout exception
}

What I can't figure out is how to cleanly implement the timeout/short-circuit process in the DispatchMessage method. Any ideas would be appreciated.


IResponse response = null;
var dispatcher = new MessageDispatchDelegate(DispatchMessage);

// No callback is passed here: EndInvoke may only be called once per BeginInvoke,
// and this code calls it directly below.
var asyncResult = dispatcher.BeginInvoke(requestMessage, null, null);
if (!asyncResult.AsyncWaitHandle.WaitOne(1000, false))
{
    /* Timeout action */
}
else
{
    response = dispatcher.EndInvoke(asyncResult);
}
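
For readers on .NET Core or later, where delegate BeginInvoke is not supported, the same wait-with-timeout idea can be sketched with Task instead. This is only an illustration under assumptions: TimeoutSketch, SendWithTimeout, and the string-based DispatchMessage are hypothetical stand-ins, not part of the original code. As with the snippet above, the caller blocks while it waits.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimeoutSketch
{
    // Hypothetical stand-in for the real dispatch work; sleeps to simulate latency.
    public static string DispatchMessage(string request, int workMs)
    {
        Thread.Sleep(workMs);
        return "response to " + request;
    }

    // Returns the response, or null if the work did not finish within timeoutMs.
    public static string SendWithTimeout(string request, int workMs, int timeoutMs)
    {
        Task<string> task = Task.Run(() => DispatchMessage(request, workMs));

        // Wait returns false on timeout. The task is not cancelled; it keeps
        // running in the background and its result is simply ignored.
        return task.Wait(timeoutMs) ? task.Result : null;
    }

    public static void Main()
    {
        Console.WriteLine(SendWithTimeout("fast", 10, 1000) ?? "timed out");
        Console.WriteLine(SendWithTimeout("slow", 2000, 100) ?? "timed out");
    }
}
```

Note that a timeout here only stops the caller from waiting. Actually abandoning the work would need cooperative cancellation inside the dispatch method.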


After lots of head-scratching I was finally able to find a solution for my original question. First off, let me say I got a lot of great responses and I tested all of them (commenting on each with the results). The main problem was that all the proposed solutions either led to deadlocks (leading to 100% timeout scenarios) or made an otherwise asynchronous process synchronous. I don't like answering my own question (first time ever), but in this case I took the advice of the StackOverflow FAQ, since I've truly learned my own lesson and wanted to share it with the community.

In the end, I combined the proposed solutions with the invocation of delegates in alternate AppDomains. It's a bit more code and a little more expensive, but it avoids the deadlocks and allows fully asynchronous invocations, which is what I required. Here are the bits...

First, I needed something to invoke a delegate in another AppDomain:

/// <summary>
/// Invokes actions in alternate AppDomains
/// </summary>
public static class DomainInvoker
{
    /// <summary>
    /// Invokes the supplied delegate in a new AppDomain and then unloads when it is complete
    /// </summary>
    public static T ExecuteInNewDomain<T>(Delegate delegateToInvoke, params object[] args)
    {
        AppDomain invocationDomain = AppDomain.CreateDomain("DomainInvoker_" + delegateToInvoke.GetHashCode());

        T returnValue = default(T);
        try
        {
            var context = new InvocationContext(delegateToInvoke, args);
            invocationDomain.DoCallBack(new CrossAppDomainDelegate(context.Invoke));

            returnValue = (T)invocationDomain.GetData("InvocationResult_" + invocationDomain.FriendlyName);
        }
        finally
        {
            AppDomain.Unload(invocationDomain);
        }
        return returnValue;
    }

    [Serializable]
    internal sealed class InvocationContext
    {
        private Delegate _delegateToInvoke;
        private object[] _arguments;

        public InvocationContext(Delegate delegateToInvoke, object[] args)
        {
            _delegateToInvoke = delegateToInvoke;
            _arguments = args;
        }

        public void Invoke()
        {
            if (_delegateToInvoke != null)
                AppDomain.CurrentDomain.SetData("InvocationResult_" + AppDomain.CurrentDomain.FriendlyName,
                    _delegateToInvoke.DynamicInvoke(_arguments));
        }
    }
}

Second, I needed something to orchestrate collecting the required parameters and resolving the results. This also defines the timeout and worker processes, which are called asynchronously in an alternate AppDomain.

Note: In my tests, I extended the dispatch worker method to take random amounts of time, to confirm that everything worked as expected in both the timeout and non-timeout cases.

public delegate IResponse DispatchMessageWithTimeoutDelegate(IRequest request, int timeout = MessageDispatcher.DefaultTimeoutMs);

[Serializable]
public sealed class MessageDispatcher
{
    public const int DefaultTimeoutMs = 500;

    /// <summary>
    /// Public method called on one or more threads to send a request with a timeout
    /// </summary>
    public IResponse SendRequest(IRequest request, int timeout)
    {
        var dispatcher = new DispatchMessageWithTimeoutDelegate(SendRequestWithTimeout);
        return DomainInvoker.ExecuteInNewDomain<Response>(dispatcher, request, timeout);
    }

    /// <summary>
    /// Worker method invoked by the <see cref="DomainInvoker.ExecuteInNewDomain{T}"/> process
    /// </summary>
    private IResponse SendRequestWithTimeout(IRequest request, int timeout)
    {
        IResponse response = null;

        var dispatcher = new DispatchMessageDelegate(DispatchMessage);

        //Request Dispatch
        var asyncResult = dispatcher.BeginInvoke(request, null, null);

        //Wait for dispatch to complete or short-circuit if it takes too long
        if (!asyncResult.AsyncWaitHandle.WaitOne(timeout, false))
        {
            /* Timeout action */
            response = null;
        }
        else
        {
            /* Invoked call ended within the timeout period */
            response = dispatcher.EndInvoke(asyncResult);
        }

        return response;
    }

    /// <summary>
    /// Worker method to do the actual dispatch work while being monitored for timeout
    /// </summary>
    private IResponse DispatchMessage(IRequest request)
    {
        /* Do real dispatch work here */
        return new Response();
    }
}
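
The note above mentions extending the dispatch worker to take random amounts of time. The original test code is not shown, so the following is only a guess at what such a test double might look like. SlowWorkerSketch and SimulateDispatch are hypothetical names, and the 0 to 1000 ms range is an assumption chosen to straddle the 500 ms default timeout.

```csharp
using System;
using System.Threading;

public static class SlowWorkerSketch
{
    // One shared generator; System.Random is not thread-safe, so guard it with a lock.
    private static readonly Random Rng = new Random();
    private static readonly object RngLock = new object();

    // Hypothetical helper: picks a delay in [minMs, maxMs), sleeps that long,
    // and returns the delay so a test can log or assert on it.
    public static int SimulateDispatch(int minMs, int maxMs)
    {
        int delay;
        lock (RngLock)
        {
            delay = Rng.Next(minMs, maxMs);
        }
        Thread.Sleep(delay);
        return delay;
    }

    public static void Main()
    {
        // With a 500 ms timeout, a 0-1000 ms range should exercise both outcomes.
        int delay = SimulateDispatch(0, 1000);
        Console.WriteLine(delay < 500 ? "would finish in time" : "would time out");
    }
}
```

One caveat when combining this with the AppDomain approach: statics are per AppDomain, and on .NET Framework a parameterless new Random() is seeded from the tick count, so workers created in the same tick may draw identical delays.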

Third, I needed something to stand in for the actual thing that triggers the dispatches asynchronously.

Note: This is just to demonstrate the asynchronous behaviour I required. In reality, the first and second items above demonstrate the isolation of timeout behaviour on alternate threads. This just demonstrates how the above resources are used.

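using System;
using System.Runtime.Remoting.Messaging; // AsyncResult
using System.Threading;                  // Thread, Interlocked

public delegate IResponse DispatchMessageDelegate(IRequest request);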

class Program
{
    static int _responsesReceived;

    static void Main()
    {
        const int max = 500;

        for (int i = 0; i < max; i++)
        {
            SendRequest(new Request());
        }

        while (_responsesReceived < max)
        {
            Thread.Sleep(5);
        }
    }

    static void SendRequest(IRequest request, int timeout = MessageDispatcher.DefaultTimeoutMs)
    {
        var dispatcher = new DispatchMessageWithTimeoutDelegate(SendRequestWithTimeout);
        dispatcher.BeginInvoke(request, timeout, SendMessageCallback, request);
    }

    static IResponse SendRequestWithTimeout(IRequest request, int timeout = MessageDispatcher.DefaultTimeoutMs)
    {
        var dispatcher = new MessageDispatcher();
        return dispatcher.SendRequest(request, timeout);
    }

    static void SendMessageCallback(IAsyncResult ar)
    {
        var result = (AsyncResult)ar;
        var caller = (DispatchMessageWithTimeoutDelegate)result.AsyncDelegate;

        Response response;

        try
        {
            response = (Response)caller.EndInvoke(ar);
        }
        catch (Exception)
        {
            response = null;
        }

        Interlocked.Increment(ref _responsesReceived);
    }
}
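
The Main method above polls a counter with Thread.Sleep until every callback has run. As an alternative, a blocking wait on a CountdownEvent (available since .NET 4.0) can replace the polling loop. This is a sketch, not the author's code: CompletionSketch and RunAll are hypothetical names, and the thread-pool work item stands in for the real callback.

```csharp
using System;
using System.Threading;

public static class CompletionSketch
{
    // Queues 'count' work items and blocks until all of them have signalled.
    public static int RunAll(int count)
    {
        int completed = 0;
        using (var pending = new CountdownEvent(count))
        {
            for (int i = 0; i < count; i++)
            {
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try
                    {
                        // Stand-in for handling one response.
                        Interlocked.Increment(ref completed);
                    }
                    finally
                    {
                        // Signal even if handling throws, so Wait cannot hang.
                        pending.Signal();
                    }
                });
            }

            // Blocks without polling until Signal has been called 'count' times.
            pending.Wait();
        }
        return completed;
    }

    public static void Main()
    {
        Console.WriteLine(RunAll(50));
    }
}
```

In the original program, the equivalent change would be to call Signal at the end of SendMessageCallback and Wait in Main.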

In retrospect, this approach has some unintended consequences. Since the worker method runs in an alternate AppDomain, it adds additional protection against exceptions (although it can also hide them), allows you to load and unload other managed assemblies (if required), and allows you to define highly constrained or specialized security contexts. It needs a bit more work to be production-ready, but it provided the framework to answer my original question. Hope this helps someone.
