开发者

Delayed delivery with ActiveMQ 5.3 and NMS

I'm attempting to use the new delayed delivery functionality from NMS.

The schedulerSupport attribute has been set in the config file, and I'm using the following code to attempt to delay delivery of a message until the date/time chosen by the user is reached.

The code (which does not seem to be working currently) is as follows:

var timeDelay = dateTimePicker.Value.Subtract(DateTime.Now).TotalMilliseconds;     
var message = topi开发者_Python百科cPublisher.CreateTextMessage();
message.Properties["AMQ_SCHEDULED_DELAY"] = timeDelay;
message.Text = CM.ToXMLString();

topicPublisher.Send(message);

Can you point out what might be incorrect within this example?

Many thanks!


I don't see anything obvious from the code provided.

You could try turning up the logging in the broker to see if the scheduler receives the message and that the values are correct, that would also confirm that you have indeed enabled scheduler support. You could also try creating a small java program that does something similar to determine if the NMS client is behaving correctly.

I assume you have a consumer running and its connection object has been started?

Regards Tim.

www.fusesource.com


Thanks for your help with this Tim. I have found the cause of the issue. Slightly delayed response on my part - I had to concentrate on other areas of work, but I have today managed to come back to this.

The issue is the C# ".TotalMilliseconds" function - this returns a fractional total.partial milliseconds value, which it is apparent that ActiveMQ is not fond of.

Now I'm converting this millisecond value to an integer, the delayed messaging is working as required.

Log Excerpt Below:

2011-03-07 10:14:44,186 | DEBUG | quasar adding destination: topic://REDACTED | org.apache.activemq.broker.region.AbstractRegion | ActiveMQ Transport: tcp:///REDACTED:50161
2011-03-07 10:14:44,576 | DEBUG | Error occured while processing async command: ActiveMQTextMessage {commandId = 4, responseRequired = false, messageId = ID:HL003323-50159-634350896828327757-0:0:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:HL003323-50159-634350896828327757-0:0:1:1, destination = topic://REDACTED, transactionId = null, expiration = 0, timestamp = 1299492884437, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@1ae0436, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {targetModule=HL.Services.Blackbird.OrderManager, AMQ_SCHEDULED_DELAY=53564.4233}, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = <?xml version="1.0" encoding="utf-16"?>
<Com...mandMessage>}, exception: java.lang.NullPointerException | org.apache.activemq.broker.TransportConnection.Service | ActiveMQ Transport: tcp:///REDACTED:50161
java.lang.NullPointerException
        at org.apache.activemq.broker.scheduler.SchedulerBroker.send(SchedulerBroker.java:179)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:129)
        at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
        at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:227)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:129)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:129)
        at org.apache.activemq.security.AuthorizationBroker.send(AuthorizationBroker.java:192)
        at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:135)
        at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:462)
        at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:677)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:311)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:185)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:228)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:220)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:202)
        at java.lang.Thread.run(Thread.java:595)
2011-03-07 10:14:44,577 | WARN  | Async error occurred: java.lang.NullPointerException | org.apache.activemq.broker.TransportConnection.Service | ActiveMQ Transport: tcp:///REDACTED:50161
java.lang.NullPointerException
        at org.apache.activemq.broker.scheduler.SchedulerBroker.send(SchedulerBroker.java:179)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:129)
        at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
        at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:227)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:129)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:129)
        at org.apache.activemq.security.AuthorizationBroker.send(AuthorizationBroker.java:192)
        at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:135)
        at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:462)
        at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:677)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:311)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:185)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:228)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:220)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:202)
        at java.lang.Thread.run(Thread.java:595)
0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜