How to implement a message pump in Non-UI thread in .NET?

I am reading this blog: Use Threads Correctly, and I am wondering:

How do I implement a message pump (note: I don't mean Windows messages here) in a non-UI thread?

What I want is for a message to be either an object or a command, say an Action<T>/Func<T>, etc. Do I have to use a separate queue for each type of message? Say, one queue for objects and one queue for Action<T>/Func<T>?

Given that the types of messages vary, how should I implement it?

EDIT:

What I want to build is a producer/consumer model in which the producer and consumer share a queue for communication. The queue should be able to contain not only objects for the consumer to consume, but also 'commands' that are passed to the consumer for execution.


The term "message pump" refers specifically to GUI threads in Windows (even if they don't actually create a GUI).

It sounds like maybe you're talking about a message queue, but even a message queue isn't going to accept actions or functions, it's going to accept specific kinds of messages.

I've certainly never heard of passing a Func<T> to a message queue - what would be done with the result? Are you going to store it somewhere? Send a message back to the caller? It doesn't seem very useful to me.

Whatever it is that you're trying to do sounds like it would be better handled by the built-in .NET Thread Pool. That is actually designed for executing code asynchronously, as opposed to queuing and processing messages.
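To illustrate the thread pool route, here is a minimal sketch rather than a definitive implementation. ThreadPoolSketch, Post and PostWithResult are hypothetical names invented for this example; ThreadPool.QueueUserWorkItem and Task.Run are the real framework calls (Task.Run assumes .NET 4.5 or later). It also shows one possible answer to "what would be done with the result" of a Func<T>: hand it back to the caller as a Task<T>.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the framework.
public static class ThreadPoolSketch
{
    // Fire-and-forget: a pool thread runs the command and nothing is returned.
    // An unhandled exception inside the command would take down the process,
    // so real code would wrap the call in try/catch.
    public static void Post(Action command)
    {
        ThreadPool.QueueUserWorkItem(_ => command());
    }

    // For a Func<T>, Task.Run (which also uses the thread pool) gives the
    // caller a Task<T> from which the result can be read later.
    public static Task<T> PostWithResult<T>(Func<T> command)
    {
        return Task.Run<T>(command);
    }
}
```

A caller could write `ThreadPoolSketch.PostWithResult(() => 6 * 7)` and read `.Result` (or await the task) when the value is needed. Note that the pool gives no ordering guarantee between work items, which is one difference from a single consumer draining a queue.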


Made this a separate answer for code formatting.

OK, so after reading your update I think you want what I describe in the "second case". You simply want

Broadcast<T>("Foo") 

where T is a delegate.

Then your consumer will do

Subscribe<T>("Foo",HandlerMethod)

So a producer consumer scenario would look like this

internal static class MessagePump
{
    public static void Subscribe<T>(String foo, Action<String> handlerMethod)
    {
        throw new NotImplementedException();
    }

    public static void BroadcastMessage<T>(String foo, Action<String> someAction)
    {
        throw new NotImplementedException();
    }
}

public class Producer
{
    void SendMessage()
    {
        MessagePump.BroadcastMessage<Action<String>>("Foo", SomeAction);
    }

    void SomeAction(String param)
    {
        // Do something
    }
}

public class Consumer
{
    public Consumer()
    {
        MessagePump.Subscribe<Action<String>>("Foo", HandlerMethod);
    }

    void HandlerMethod(String param)
    {
        // Do something
    }
}

This is just something off the top of my head and is a contrived example so take it with a grain of salt. This is nearly exactly what I am doing in the courier framework I posted earlier. You may want to dive into that code to get a more concrete implementation example.

You need to think about how you will manage consumers, how you validate Broadcast and subscriptions and for your specific case how are you going to ensure the delegate you are passing around is invoked correctly? Or do you care?

Does this help?


I am a little confused on what your end goal is but it seems like you are looking for two things.

First in the case where you want to pass objects around (presumably you are thinking of a message queue for the purposes of cross thread communication?) it sounds like you want a pub/sub type implementation? That is pretty well documented and there are many examples out there in C#

For the second case where you want to pass a delegate as the payload of the message I am guessing you are trying to implement some sort of two way communication between publisher and subscriber? Something like a callback?

This is where I am getting confused. What exactly is the question / problem here? You could implement a single message queue that understands how to handle different message payload types. Something like BroadcastMessage<T> where T is object for your first case and delegate (Func / Action) for the second case.
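As a rough sketch of one queue that handles both payload kinds (an illustration under stated assumptions, not the Courier implementation): MixedQueue, Post and the onData callback are hypothetical names, and BlockingCollection<object> (available from .NET 4) is swapped in as the shared queue. The consumer thread executes anything that is an Action and passes every other object to the data handler.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical class: one shared queue, one consumer thread.
public sealed class MixedQueue : IDisposable
{
    private readonly BlockingCollection<object> _queue = new BlockingCollection<object>();
    private readonly Thread _consumer;
    private readonly Action<object> _onData;

    public MixedQueue(Action<object> onData)
    {
        _onData = onData;
        _consumer = new Thread(Consume) { IsBackground = true };
        _consumer.Start();
    }

    // Producers may call this from any thread.
    public void Post(object message)
    {
        _queue.Add(message);
    }

    private void Consume()
    {
        // Blocks while the queue is empty; ends after CompleteAdding().
        foreach (object message in _queue.GetConsumingEnumerable())
        {
            Action command = message as Action;
            if (command != null)
                command();          // a command: execute it on the consumer thread
            else
                _onData(message);   // plain data: hand it to the data handler
        }
    }

    public void Dispose()
    {
        _queue.CompleteAdding();    // let the consumer drain the queue and exit
        _consumer.Join();
        _queue.Dispose();
    }
}
```

A producer that needs to pass arguments can close over them in a lambda, for example `queue.Post((Action)(() => Handle(order)))`, so a single Action payload type covers Action<T>-style commands as well.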

I have a codeplex project that is a simple implementation of a messaging queue that I use for specific purposes within MVC / MVVM applications. Not sure this is what you are looking for but it might help you clarify your question further?

http://courier.codeplex.com/


Well, you'll need Thread.SetApartmentState() to switch the Thread to STA before you start it, then call Application.Run() in the thread function to start the message loop.

This has nothing to do with the Action and Func delegates though. The message queue is an internal Windows structure that stores mouse and keyboard messages. It is not suitable to store your own objects. Not sure where you want to go with this.


A regular System.Collections.Generic.Queue<> object can store any kind of object, including an object of a class that represents a "command". Make it thread-safe with a ReaderWriterLockSlim.
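A minimal sketch of that idea follows; LockedQueue<T> and its members are hypothetical names chosen for this example. Because both Enqueue and Dequeue modify the queue, both take the write lock, and only Count uses the read lock, so a plain lock statement would probably serve equally well here.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical wrapper around Queue<T>.
public sealed class LockedQueue<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();

    public void Enqueue(T item)
    {
        _lock.EnterWriteLock();
        try { _items.Enqueue(item); }
        finally { _lock.ExitWriteLock(); }
    }

    // Returns false instead of throwing when the queue is empty.
    public bool TryDequeue(out T item)
    {
        _lock.EnterWriteLock();
        try
        {
            if (_items.Count == 0)
            {
                item = default(T);
                return false;
            }
            item = _items.Dequeue();
            return true;
        }
        finally { _lock.ExitWriteLock(); }
    }

    public int Count
    {
        get
        {
            _lock.EnterReadLock();
            try { return _items.Count; }
            finally { _lock.ExitReadLock(); }
        }
    }
}
```

With T set to object (or to a hypothetical command base class), the same queue can carry both data and commands. This sketch does not block an idle consumer; that would need a wait handle or a different collection.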


Example of a rudimentary message queue

I'd like to share a rudimentary message queue that I once wrote for my own research. It might help give an idea of a possible message queue implementation. There is no claim of completeness, integrity or absence of errors, and in most cases there are probably better solutions than using a custom message queue.

public void run()
{
    while (running)
    {
        mainLoopWaitHandle.WaitOne();

        EventHandlerFunction f = null;
        while (running)
        {
            f = popEvent();
            if (f == null) break;

            f();
        }
    }
}

private void pushEvent(EventHandlerFunction handlerFunction)
{
    lock (eventQueueLock)
    {
        int b = (queueInIndex + 1) & 255;
        if (b == queueOutIndex)
        {
            throw new Exception("Buffer overflow in event queue.");
        }

        eventQueue[queueInIndex] = handlerFunction;
        queueInIndex = b;

        mainLoopWaitHandle.Set();
    }
}


private EventHandlerFunction popEvent()
{
    EventHandlerFunction ret = null;

    lock(eventQueueLock)
    {
        int b = (queueOutIndex + 1) & 255;

        if (queueOutIndex == queueInIndex)
        {
            mainLoopWaitHandle.Reset();
            return null;
        }

        ret = eventQueue[queueOutIndex];
        eventQueue[queueOutIndex] = null;
        queueOutIndex = b;
    }

    return ret;
}

The main thread starts using the message queue by running run(). run() is a blocking method that does not return until the class attribute running is set to false. This can be done from an invoker method, explained below.

For invoking a method on the main thread, two methods are actually needed: an EventHandlerFunction (let's say method A()) that actually gets invoked on the main thread, and a method B() that is executed on the caller's thread. I see this as analogous to the UI's functions, where B() is a method of the form and A() gets invoked from within B().

B() invokes A() by calling

pushEvent(A);

where pushEvent() and popEvent() are thread-safe.

The purpose of method B() is to store any objects or parameters in some data structure for data transfer that represents the parameters (or to-dos) for method A(). This structure can be a List<> of work items, for example. Methods A() and B() both need to take care of properly locking this structure for thread safety.
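The A()/B() pairing described above could look roughly like the following sketch. WorkTransfer, Processed and the constructor parameter are hypothetical; the Action<Action> delegate stands in for pushEvent(), and Action stands in for EventHandlerFunction, which is assumed here to take no parameters and return nothing.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical class illustrating the invoker pattern.
public sealed class WorkTransfer
{
    private readonly object _itemsLock = new object();
    private readonly List<string> _items = new List<string>(); // data transfer structure
    private readonly Action<Action> _pushEvent;                // stand-in for pushEvent()

    // Only touched by A(), that is, only on the main thread.
    public readonly List<string> Processed = new List<string>();

    public WorkTransfer(Action<Action> pushEvent)
    {
        _pushEvent = pushEvent;
    }

    // B(): runs on the caller's thread. Stores the parameter, then queues A().
    public void B(string item)
    {
        lock (_itemsLock) { _items.Add(item); }
        _pushEvent(A);
    }

    // A(): runs on the main thread when the message loop pops it.
    private void A()
    {
        List<string> batch;
        lock (_itemsLock)
        {
            // Copy and clear under the lock, then work outside it.
            batch = new List<string>(_items);
            _items.Clear();
        }
        foreach (string item in batch)
            Processed.Add(item);
    }
}
```

Since A() drains every pending item, a later invocation of A() may find the list already empty, which is harmless in this sketch.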

Method A() should also take into account that a buffer could run full (either the message queue or its own data transfer structure) and needs to deal with the consequences (e.g. drop the invoke or block the call until there is room in the queue).

Hope it helps. Contributions welcome.
