Proper use of Message Queue on 2008R2
I am not a programmer, but I am trying to help our programmers by giving them some guidance. We no longer have any in-house expertise on MSMQ. We are trying to use it to integrate some functions with a scheduling application.
The scheduling app fires off a job by making a web call through a custom-built DLL. The DLL calls the web URL. The web app runs its task and sends updates about the task to a website, and the website writes each update as a message to the queue. The DLL that called the site monitors the queue for messages carrying the label that was assigned to that job. When it receives the final status message, it closes.
We get the following exception every few hours. We run close to 100 jobs per hour that use this method. In the code listed at the bottom, the jobid corresponds to the label of the message in the message queue. Each job is issued a jobid at the start and uses it as the label for every message it sends to MSMQ for that job.
System.Messaging.MessageQueueException (0x80004005): Message that the cursor is currently pointing to has been removed from the queue by another process or by another call to Receive without the use of this cursor.
at System.Messaging.MessageQueue.ReceiveCurrent(TimeSpan timeout, Int32 action, CursorHandle cursor, MessagePropertyFilter filter, MessageQueueTransaction internalTransaction, MessageQueueTransactionType transactionType)
at System.Messaging.MessageEnumerator.get_Current()
Here is the code for it.
    while ( running )
    {
        // System.Console.WriteLine( "Begin Peek" );
        messageQueue.Peek();
        //System.Console.WriteLine( "End Peek" );
        messageQueue.MessageReadPropertyFilter.SetAll();
        using ( MessageEnumerator enumerator = messageQueue.GetMessageEnumerator2() )
        {
            enumerator.Reset();
            while ( enumerator.MoveNext() )
            {
                Message msg = enumerator.Current;
                if ( msg.Label.Equals( this.jobid ) )
                {
                    StringBuilder sb = new StringBuilder();
                    /*
                    try
                    {
                    sb.Append( "Message Source: " );
                    //sb.Append( msg.SourceMachine );
                    sb.Append( " Sent: " );
                    sb.Append( msg.SentTime );
                    sb.Append( " Label " );
                    sb.Append( msg.Label );
                    sb.Append( " ID: " );
                    sb.Append( msg.Id );
                    sb.Append( " CorrelationID: " );
                    sb.Append( msg.CorrelationId );
                    sb.Append( " Body Type: " );
                    sb.Append( msg.BodyType );
                    }
                    catch ( Exception )
                    {
                    throw;
                    }
                    finally
                    {
                    System.Console.WriteLine( sb.ToString() );
                    }
                    */
                    //System.Console.WriteLine( "Receiving Message started" );
                    using ( Message message = messageQueue.ReceiveById( msg.Id ) )
                    {
                        //System.Console.WriteLine( "Receiving Message Complete" );
                        //sb = new StringBuilder();
                        string bodyText = string.Empty;
                        try
                        {
                            System.IO.StringWriter sw = new System.IO.StringWriter( sb );
                            System.IO.StreamReader sr = new System.IO.StreamReader( message.BodyStream );
                            while ( !sr.EndOfStream )
                            {
                                sw.WriteLine( sr.ReadLine() );
                            }
                            sr.Close();
                            sw.Close();
                            bodyText = ( string ) FromXml( sb.ToString(), typeof( string ) );
                            int indx = bodyText.IndexOf( ',' );
                            string tokens = bodyText.Substring( indx + 1 );
                            indx = tokens.IndexOf( ',' );
                            string command = tokens.Substring( 0, indx );
                            tokens = tokens.Substring( indx + 1 );
                            if ( command.Equals( COMMAND_STARTED ) )
                            {
                                System.Console.WriteLine( "STARTED " + tokens );
                            }
                            else if ( command.Equals( COMMAND_UPDATE ) )
                            {
                                System.Console.WriteLine( tokens );
                            }
                            else if ( command.Equals( COMMAND_ENDED_OK ) )
                            {
                                System.Console.WriteLine( tokens );
                                System.Console.WriteLine( "WEBJOB: Success" );
                                finalResults = new FinalResults( 0, 0, "Success" );
                                running = false;
                            }
                            else if ( command.Equals( COMMAND_ENDED_WARNING ) )
                            {
                                System.Console.WriteLine( tokens );
                                System.Console.WriteLine( "WEBJOB: Warning Issued" );
                                finalResults = new FinalResults( 1, 1, "Warning" );
                                running = false;
                            }
                            else if ( command.Equals( COMMAND_ENDED_FAIL ) )
                            {
                                System.Console.WriteLine( tokens );
                                System.Console.WriteLine( "WEBJOB: Failure" );
                                finalResults = new FinalResults( 2, 16, "Failure" );
                                running = false;
                            }
                        }
                        catch ( Exception )
                        {
                            throw;
                        }
                        finally
                        {
                            //System.Console.WriteLine( "Body: " + bodyText );
                        }
                    }
                }
            }
        }
    }
    return finalResults;
}
MessageQueue messageQueue = null;
string webServiceURL = "";
Dictionary<string, string> parms = new Dictionary<string, string>();
string jobid = "NONE";
kprobst's explanation is likely what is happening. Even if you can see that this particular message is still in the queue, the cursor is invalidated as soon as a different application (or a different instance of the same application) picks up any message from this queue.
Inherently, this code is not designed to work if multiple processes are reading from the same queue.
This typically means that the message you are trying to Receive() is being removed by something else before the receive operation can complete: another application, or another thread in the same process as your code that uses a different queue reference.
Is it possible that you might have two instances of the processor code (I guess it's a console app) running at the same time? On the same or different machines? Or some other application or tool removing messages from the queue?
There used to be a bug in one of the pre-release versions of .NET 2.0 that would cause this under some stress conditions but as far as I remember it was fixed before they shipped.
This is failing because of a concurrency issue in ReceiveCurrent, an internal method of MessageQueue. The stack trace shows that the call originated at the enumerator.Current line and that the exception was thrown inside ReceiveCurrent. Enumerator.Current calls ReceiveCurrent with the "peek" option. You may ask, as I did when I hit the same problem, how a peek can fail with a "message already received" error when it is only trying to peek at the next message that has not been received yet. The answer lies in the ReceiveCurrent code, which is available for review here: https://referencesource.microsoft.com/#System.Messaging/System/Messaging/MessageQueue.cs,02c33cc512659fd7,references
ReceiveCurrent first makes a StaleSafeReceive call to peek at the next message. If that call reports that it needs more memory to hold the whole message (the line with "while (MessageQueue.IsMemoryError(status)" in the source), ReceiveCurrent allocates the required memory and makes a second StaleSafeReceive call to get the message. This two-call pattern is classic Win32 API usage, a consequence of the underlying API being C-based.
The problem is that if another process or thread "receives" the message, i.e. removes it from the queue, between the first and second StaleSafeReceive calls inside ReceiveCurrent, the second call throws this exact exception. That is how a "peek" operation can fail. Note that the exception can be caused by any message the enumerator is scanning over, not only the message being looked for. This explains why the message with that job id is still in the queue after the exception is thrown and the method fails.
One fix is to guard the enumerator.Current call with a try/catch and, if this particular exception is caught, simply continue the enumeration with the next available message in the queue.
I used the Cursor object rather than the enumerator, but it runs into the same problem. With Cursor there is an additional way to reduce the risk: while scanning/peeking, turn off every property you do not need in the queue object's MessagePropertyFilter, especially the Body property. The body is usually not needed while peeking, yet it is most often the body that forces the memory reallocation and therefore the second StaleSafeReceive call inside ReceiveCurrent. A try/catch for this exception is still needed around the peek calls when using Cursor directly.
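A minimal sketch of that guard, assuming the queue's property filter includes Label and Id (they are part of the default filter). The class and method names (JobQueueScanner, FindMessageId) are hypothetical and not from the original code; the check uses MessageQueueErrorCode.MessageAlreadyReceived, which is the error code this exception message corresponds to. It is untested against a live queue, so treat it as an outline rather than a drop-in fix.

```csharp
using System;
using System.Messaging;

// Hypothetical helper: scans the queue for a message whose label equals
// jobId and returns its Id, or null if no such message is present.
static class JobQueueScanner
{
    public static string FindMessageId(MessageQueue queue, string jobId)
    {
        using (MessageEnumerator enumerator = queue.GetMessageEnumerator2())
        {
            while (enumerator.MoveNext())
            {
                Message msg;
                try
                {
                    // Current performs a peek; it can fail if another
                    // consumer removes the message mid-peek.
                    msg = enumerator.Current;
                }
                catch (MessageQueueException ex)
                {
                    if (ex.MessageQueueErrorCode ==
                        MessageQueueErrorCode.MessageAlreadyReceived)
                    {
                        // The message under the cursor is gone; skip it
                        // and move on to the next one.
                        continue;
                    }
                    throw; // any other queue error is still a real failure
                }

                using (msg)
                {
                    if (string.Equals(msg.Label, jobId, StringComparison.Ordinal))
                    {
                        return msg.Id;
                    }
                }
            }
        }
        return null;
    }
}
```

The caller would then pass the returned id to ReceiveById, as the original loop does. ReceiveById is documented to throw InvalidOperationException when the message is no longer in the queue, so that call probably deserves its own guard if more than one consumer can read the same queue.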
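A sketch of the trimmed-filter scan with a Cursor, under a few assumptions: a separate MessageQueue instance is opened just for scanning so that the lean filter does not affect the instance used to receive bodies, and the end of the queue is detected through the IOTimeout error raised by a zero-timeout Peek. The names LeanPeekScanner and FindMessageId are hypothetical, and the code has not been run against a live queue.

```csharp
using System;
using System.Messaging;

// Hypothetical helper: peeks through the queue with only Id and Label
// enabled, so the body is never fetched during the scan.
static class LeanPeekScanner
{
    public static string FindMessageId(string queuePath, string jobId)
    {
        using (MessageQueue scanQueue = new MessageQueue(queuePath))
        {
            // Request only the properties needed to match a job.
            scanQueue.MessageReadPropertyFilter.ClearAll();
            scanQueue.MessageReadPropertyFilter.Id = true;
            scanQueue.MessageReadPropertyFilter.Label = true;

            using (Cursor cursor = scanQueue.CreateCursor())
            {
                PeekAction action = PeekAction.Current;
                while (true)
                {
                    try
                    {
                        using (Message msg = scanQueue.Peek(TimeSpan.Zero, cursor, action))
                        {
                            if (msg.Label == jobId)
                            {
                                return msg.Id;
                            }
                        }
                    }
                    catch (MessageQueueException ex)
                    {
                        if (ex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                        {
                            return null; // no more messages to peek at
                        }
                        if (ex.MessageQueueErrorCode !=
                            MessageQueueErrorCode.MessageAlreadyReceived)
                        {
                            throw;
                        }
                        // Message vanished mid-peek: fall through and advance.
                    }
                    action = PeekAction.Next;
                }
            }
        }
    }
}
```

The matching message would then be received through a second MessageQueue instance whose filter still includes the body. Trimming the filter only makes the race less likely, which is why the catch block stays in place.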