开发者

how to use Net::Stomp and transactions with receive_frame

I'm in the process of adapting some existing code using Net::Stomp from being able to handle a single topic to being able to work on multiple topics. Can anyone tell me if this approach is even possible? It's not working now because where it expects a transaction receipt, it's getting the first message on another topic. I'd like to know if I'm just barking up the wrong tree before I go about trying to fix it.

Here's what the workflow looks like:

# first subscribe to three different queues
for $job (qw/ JOB1 JOB2 JOB3 /){
$stomp->subscribe({
   "ack" => "client",
   "destination" => "/queue/$job"
});

# listen on those three channels...
while($stomp->can_read){

   $frame = $stomp->receive_frame();

   # ... receives a message for JOB1
   # and to start a transaction send a BEGIN frame that looks like this:

    bless({
    command => "BEGIN",
    headers => {
             receipt => "0002",
            transaction => "0001",
       },
    }, "Net::Stomp::Frame")

   # Then looks for a receipt on that frame by calling
   $receipt = $stomp->receive_frame()

Unfortunately, where it's expecting a RECEIPT frame, it actually gets the next MESSAGE frame that's waiting in the JOB2 queue.

My question is, is there any way for that to work, to be both subscribed to multiple topics and to be able to receive receipts on transactions? Or is there a better/more standard way to handle it?

Any tips or suggestions would be most welcome, thanks! I'm cross-posting this question on the ActiveMQ list too, hope that's ok :-/

* update *

Here's a complete repro case:

use Net::Stomp;

use strict;

my $stomp = Net::Stomp->new( { hostname => 'bpdeb', port => '61612' } );
$stomp->connect( { login => 'hello', passcode => 'there' } );

# pre-populate the two queues
$stomp->send( { destination => '/queue/FOO.BAR', body => 'test message' } );
$stomp->send( { destina开发者_运维知识库tion => '/queue/FOO.BAR2', body => 'test message' } );


# now subscribe to them
$stomp->subscribe({ destination => '/queue/FOO.BAR',
                   'ack'        => 'client',
                   'activemq.prefetchSize' => 1
});
$stomp->subscribe({ destination => '/queue/FOO.BAR2',
                   'ack'        => 'client',
                   'activemq.prefetchSize' => 1
});

# read one frame, then start a transaction asking for a receipt of the 
# BEGIN message
while ($stomp->can_read()){

    my $frame = $stomp->receive_frame; 
    print STDERR "got frame ".$frame->as_string()."\n";


    print STDERR "sending a BEGIN\n";
    my($frame) = Net::Stomp::Frame->new({
        command => 'BEGIN',
            headers => {
            transaction => 123,
            receipt     => 456,
        },
    });

    $stomp->send_frame($frame);

    my $expected_receipt = $stomp->receive_frame;
    print STDERR "expected RECEIPT but got ".$expected_receipt->as_string()."\n";

    exit;
}

This outputs (with the details elided)

got frame MESSAGE
destination:/queue/FOO.BAR
....

sending a BEGIN

expected RECEIPT but got MESSAGE
destination:/queue/FOO.BAR2
....

Looking at the network traffic, as soon as a SUBSCRIBE request is sent, the first message in the queue goes over the wire to the client. So the first message from FOO.BAR2 is already waiting in the client's network buffer when I send the BEGIN message, and the client reads the FOO.BAR2 straight from its buffer.

So either I'm doing something wrong, or it can't work this way.


Ok, I tried it and it works fine. But you are the one receiving a frame. So why should the server send you a receipt frame?

You're setting "ack" => "client" and that means, that the server will consider the frame as "not delivered" until you say otherwise. Just change the line $receipt = $stomp->receive_frame() to $stomp->ack( { frame => $frame } );.

Update

Ah, ok you want to secure the ack by using a transaction. So let's have a look at the source: There is a method send_transactional which does probably what you want to do (but it's using a SEND frame instead of ACK).

Perhaps you should also have a look at the submitted patch from cloudmark, which adds several "security features" to the module (unfortunately the module author didn't say anything about merging that patch, when i asked him).

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜