Safely synchronizing a COM thread
I've done a lot with multithreading in the past, but I'm fairly new to COM. Anyway, here's my issue:
I create a worker thread, which registers as an STA, and creates a COM object. Then the worker thread and the main thread try to communicate with each other. Using CoMarshalInterThreadInterfaceInStream
and CoGetInterfaceAndReleaseStream
, I can get the threads to call methods on the COM objects in the other thread.
Here's what the worker thread looks like:
void workerThread()
{
CoInitialize(NULL);
MyLib::IFooPtr foo = ...; // create my COM object
// Marshall it so the main thread can talk to it
HRESULT hr = CoMarshalInterThreadInterfaceInStream(foo.GetIID(),
foo.GetInterfacePtr(),
&m_stream);
if (FAILED(hr)) {
// handle failure
}
// begin message loop, to keep this STA alive
MSG msg;
BOOL bRet;
while( (bRet = GetMessage( &msg, NULL, 0, 0 开发者_运维技巧)) != 0)
{
if (bRet == -1) break;
DispatchMessage(&msg);
}
}
In the main thread:
// launch the thread
m_worker = boost::thread (&workerThread);
// get the interface proxy
MyLib::IFooPtr foo;
LPVOID vp (NULL);
HRESULT hr = CoGetInterfaceAndReleaseStream(m_stream, foo.GetIID(), &vp);
if (SUCCEEDED(hr)) foo.Attach(static_cast<MyLib::IFoo*>(vp));
This creates the object (which takes a while to initialize), and allows the main thread to talk to it, and everything is properly synchronized with the COM Apartment stuff. As far as I can tell from reading msdn, this seems to be the right way to do things. Now the main thread can use its proxy to call methods on my COM object, and the worker thread will receive those calls over the message queue, properly dispatching them.
However, what about synchronizing these threads?
Obviously in this case I want the main thread to wait to call CoGetInterfaceAndReleaseStream
until after the worker thread has created that stream via CoMarshalInterThreadInterfaceInStream
. But how can I safely do that?
From MSDN, I should be using something like MsgWaitForMultipleObjects
, so I can wait for my_condition OR new_message_arrived, and then I can do something like:
// verbatim from msdn
while (TRUE)
{
// wait for the event and for messages
DWORD dwReturn = ::MsgWaitForMultipleObjects(1,
&m_hDoneLoading, FALSE, INFINITE, QS_ALLINPUT);
// this thread has been reawakened. Determine why
// and handle appropriately.
if (dwReturn == WAIT_OBJECT_0)
// our event happened.
break ;
else if (dwReturn == WAIT_OBJECT_0 + 1)
{
// handle windows messages to maintain
// client liveness
MSG msg ;
while(::PeekMessage(&msg, NULL, 0, 0, PM_REMOVE))
::DispatchMessage(&msg) ;
}
}
But how do I mix boost::thread.join()
and boost::condition.wait()
with MsgWaitForMultipleObjects
? Is that even possible, or do I have to do something else to avoid a race condition?
Your main thread has a message queue (must be, since is an STA host), why not simply post a message to it, PostThreadMessage
? Post an user message (WM_USER +X) and your normal main thread message pump can handle this user message as a notification that the COM object has marshaled the interface into the stream and the main thread is safe to call CoGetInterfaceAndReleaseStream
.
I must call out though that with your current design your worker thread does basically nothing more than just run an additional message pump. Any call to any method on your interface from the main thread will block, wait for the worker thread to pick up the message from its message queue, process the call, respond, and then the main thread will resume. All operations will be at least as slow as having the COM object hosted in the main thread, plus the overhead of COM marshaling back and forth between the two STAs. Basically there is no concurrency whatsoever between the two threads, because of how COM STA works. Are you sure is this what you want?
Edit
(omitting a bunch of details like number of threads, timeout handling, assignment of a stream/IID/CLSID for each worker etc etc)
in the .h:
HANDLE m_startupDone;
volatile int m_threadStartCount;
worker thread:
void workerThread()
{
CoInitialize(NULL);
MyLib::IFooPtr foo = ...; // create my COM object
// Marshall it so the main thread can talk to it
HRESULT hr = CoMarshalInterThreadInterfaceInStream(foo.GetIID(),
foo.GetInterfacePtr(),
&m_stream);
if (FAILED(hr)) {
// handle failure
// remember to decrement and signal *even on failure*
}
if (0 == InterlockedDecrement(&m_threadStartCount))
{
SetEvent (m_startupDone);
}
// begin message loop, to keep this STA alive
MSG msg;
BOOL bRet;
while( (bRet = GetMessage( &msg, NULL, 0, 0 )) != 0)
{
if (bRet == -1) break;
DispatchMessage(&msg);
}
}
in the main thread:
m_startupDone = CreateEvent (NULL, FALSE, FALSE, NULL);
m_threadStartCount = <number of workerthreads>
// launch the thread(s)
m_worker = boost::thread (&workerThread);
m_worker2 = boost::thread (&workerThread);
...
// now wait for tall the threads to create the COM object(s)
if (WAIT_OBJECT0 != WaitForSingleObject(m_startupDone, ...))
{
// handle failure like timeout
}
// By now all COM objects are guaranteed created and marshaled, unmarshall them all in main
// here must check if all threads actually succeeded (could be as simple as m_stream is not NULL)
// get the interface proxy
MyLib::IFooPtr foo;
LPVOID vp (NULL);
HRESULT hr = CoGetInterfaceAndReleaseStream(m_stream, foo.GetIID(), &vp);
if (SUCCEEDED(hr)) foo.Attach(static_cast<MyLib::IFoo*>(vp));
精彩评论