| Net-Stomp documentation | Contained in the Net-Stomp distribution. |
Net::Stomp - A Streaming Text Orientated Messaging Protocol Client
# send a message to the queue 'foo'
use Net::Stomp;
my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );
$stomp->connect( { login => 'hello', passcode => 'there' } );
$stomp->send(
{ destination => '/queue/foo', body => 'test message' } );
$stomp->disconnect;
# subscribe to messages from the queue 'foo'
use Net::Stomp;
my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );
$stomp->connect( { login => 'hello', passcode => 'there' } );
$stomp->subscribe(
{ destination => '/queue/foo',
'ack' => 'client',
'activemq.prefetchSize' => 1
}
);
while (1) {
my $frame = $stomp->receive_frame;
warn $frame->body; # do something here
$stomp->ack( { frame => $frame } );
}
$stomp->disconnect;
# write your own frame
my $frame = Net::Stomp::Frame->new(
{ command => $command, headers => $conf, body => $body } );
$self->send_frame($frame);
# connect with failover supporting similar URI to ActiveMQ
$stomp = Net::Stomp->new({ failover => "failover://tcp://primary:61616" })
# "?randomize=..." and other parameters are ignored currently
$stomp = Net::Stomp->new({ failover => "failover:(tcp://primary:61616,tcp://secondary:61616)?randomize=false" })
# Or in a more natural perl way
$stomp = Net::Stomp->new({ hosts => [
{ hostname => 'primary', port => 61616 },
{ hostname => 'secondary', port => 61616 },
] });
This module allows you to write a Stomp client. Stomp is the Streaming Text Orientated Messaging Protocol (or the Protocol Briefly Known as TTMP and Represented by the symbol :ttmp). It's a simple and easy to implement protocol for working with Message Orientated Middleware from any language. Net::Stomp is useful for talking to Apache ActiveMQ, an open source (Apache 2.0 licensed) Java Message Service 1.1 (JMS) message broker packed with many enterprise features.
A Stomp frame consists of a command, a series of headers and a body - see Net::Stomp::Frame for more details.
For details on the protocol see http://stomp.codehaus.org/Protocol.
To enable the ActiveMQ Broker for Stomp add the following to the activemq.xml configuration inside the <transportConnectors> section:
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
To enable the ActiveMQ Broker for Stomp and SSL add the following inside the <transportConnectors> section:
<transportConnector name="stomp+ssl" uri="stomp+ssl://localhost:61612"/>
For details on Stomp in ActiveMQ See http://activemq.apache.org/stomp.html.
The constructor creates a new object. You must pass in a hostname and a port or set a failover configuration:
my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );
If you want to use SSL, make sure you have IO::Socket::SSL and pass in the SSL flag:
my $stomp = Net::Stomp->new( {
hostname => 'localhost',
port => '61612',
ssl => 1,
} );
If you want to pass in IO::Socket::SSL options:
my $stomp = Net::Stomp->new( {
hostname => 'localhost',
port => '61612',
ssl => 1,
ssl_options => { SSL_cipher_list => 'ALL:!EXPORT' },
} );
There is experiemental failover support in Net::Stomp. You can specify failover
in a similar maner to ActiveMQ
(http://activemq.apache.org/failover-transport-reference.html) for
similarity with Java configs or using a more natural method to perl of passing
in an array-of-hashrefs in the hosts parameter.
Currently when ever Net::Stomp connects or reconnects it will simply try the next host in the list.
This connects to the Stomp server. You may pass in a login and
passcode options.
You may also pass in 'client-id', which specifies the JMS Client ID which is used in combination to the activemqq.subscriptionName to denote a durable subscriber.
$stomp->connect( { login => 'hello', passcode => 'there' } );
This sends a message to a queue or topic. You must pass in a destination and a body.
$stomp->send(
{ destination => '/queue/foo', body => 'test message' } );
To send a BytesMessage, you should set the field 'bytes_message' to 1.
This sends a message in transactional mode and fails if the receipt of the message is not acknowledged by the server:
$stomp->send_transactional(
{ destination => '/queue/foo', body => 'test message' }
) or die "Couldn't send the message!";
If using ActiveMQ, you might also want to make the message persistent:
$stomp->send_transactional(
{ destination => '/queue/foo', body => 'test message', persistent => 'true' }
) or die "Couldn't send the message!";
This disconnects from the Stomp server:
$stomp->disconnect;
This subscribes you to a queue or topic. You must pass in a destination.
The acknowledge mode defaults to 'auto', which means that frames will be considered delivered after they have been sent to a client. The other option is 'client', which means that messages will only be considered delivered after the client specifically acknowledges them with an ACK frame.
Other options:
'selector': which specifies a JMS Selector using SQL 92 syntax as specified in the JMS 1.1 specificiation. This allows a filter to be applied to each message as part of the subscription.
'activemq.dispatchAsync': should messages be dispatched synchronously or asynchronously from the producer thread for non-durable topics in the broker. For fast consumers set this to false. For slow consumers set it to true so that dispatching will not block fast consumers.
'activemq.exclusive': Would I like to be an Exclusive Consumer on a queue.
'activemq.maximumPendingMessageLimit': For Slow Consumer Handlingon non-durable topics by dropping old messages - we can set a maximum pending limit which once a slow consumer backs up to this high water mark we begin to discard old messages.
'activemq.noLocal': Specifies whether or not locally sent messages should be ignored for subscriptions. Set to true to filter out locally sent messages.
'activemq.prefetchSize': Specifies the maximum number of pending messages that will be dispatched to the client. Once this maximum is reached no more messages are dispatched until the client acknowledges a message. Set to 1 for very fair distribution of messages across consumers where processing messages can be slow.
'activemq.priority': Sets the priority of the consumer so that dispatching can be weighted in priority order.
'activemq.retroactive': For non-durable topics do you wish this subscription to the retroactive.
'activemq.subscriptionName': For durable topic subscriptions you must specify the same clientId on the connection and subscriberName on the subscribe.
$stomp->subscribe(
{ destination => '/queue/foo',
'ack' => 'client',
'activemq.prefetchSize' => 1
}
);
This unsubscribes you to a queue or topic. You must pass in a destination:
$stomp->unsubcribe({ destination => '/queue/foo' });
This blocks and returns you the next Stomp frame.
my $frame = $stomp->receive_frame; warn $frame->body; # do something here
The header bytes_message is 1 if the message was a BytesMessage.
By default this method will block until a frame can be returned. If you wish to
wait for a specified time pass a timeout argument:
# Wait half a second for a frame, else return undef
$stomp->receive_frame({ timeout => 0.5 })
This returns whether there is new data is waiting to be read from the STOMP server. Optionally takes a timeout in seconds:
my $can_read = $stomp->can_read;
my $can_read = $stomp->can_read({ timeout => '0.1' });
undef says block until something can be read, 0 says to poll and return
immediately.
This acknowledges that you have received and processed a frame (if you are using client acknowledgements):
$stomp->ack( { frame => $frame } );
If this module does not provide enough help for sending frames, you may construct your own frame and send it:
# write your own frame
my $frame = Net::Stomp::Frame->new(
{ command => $command, headers => $conf, body => $body } );
$self->send_frame($frame);
Leon Brocard <acme@astray.com>, Thom May <thom.may@betfair.com>, Ash Berlin <ash_github@firemirror.com>, Michael S. Fischer <michael@dynamine.net> Vigith Maurice <vigith@yahoo-inc.com>
Copyright (C) 2006-9, Leon Brocard Copyright (C) 2009, Thom May, Betfair.com Copyright (C) 2010, Ash Berlin, Net-a-Porter.com Copyright (C) 2010, Michael S. Fischer
This module is free software; you can redistribute it or modify it under the same terms as Perl itself.
| Net-Stomp documentation | Contained in the Net-Stomp distribution. |
package Net::Stomp; use strict; use warnings; use IO::Socket::INET; use IO::Select; use Net::Stomp::Frame; use Carp; use base 'Class::Accessor::Fast'; our $VERSION = '0.41'; __PACKAGE__->mk_accessors( qw( _cur_host failover hostname hosts port select serial session_id socket ssl ssl_options subscriptions _connect_headers bufsize ) ); sub new { my $class = shift; my $self = $class->SUPER::new(@_); $self->bufsize(8192) unless $self->bufsize; $self->{_framebuf} = ""; # We are not subscribed to anything at the start $self->subscriptions( {} ); $self->select( IO::Select->new ); my @hosts = (); # failover://tcp://primary:61616 # failover:(tcp://primary:61616,tcp://secondary:61616)?randomize=false if ($self->failover) { my ($uris, $opts) = $self->failover =~ m{^failover:(?://)? \(? (.*?) \)? (?: \? (.*?) ) ?$}ix; confess "Unable to parse failover uri: " . $self->failover unless $uris; foreach my $host (split(/,/,$uris)) { $host =~ m{^\w+://([a-zA-Z0-9\-./]+):([0-9]+)$} || confess "Unable to parse failover component: '$host'"; my ($hostname, $port) = ($1, $2); push(@hosts, {hostname => $hostname, port => $port}); } } elsif ($self->hosts) { ## @hosts is used inside the while loop later to decide whether we have ## cycled through all setup hosts. @hosts = @{$self->hosts}; } $self->hosts(@hosts); my $err; { local $@ = 'run me!'; while($@) { eval { $self->_get_connection }; last unless $@; if (!@hosts || $self->_cur_host == $#hosts ) { # We've cycled through all setup hosts. Die now. Can't die because # $@ is localized. $err = $@; last; } sleep(5); } } die $err if $err; return $self; } sub _get_connection { my $self = shift; if (my $hosts = $self->hosts) { if (defined $self->_cur_host && ($self->_cur_host < $#{$hosts} ) ) { $self->_cur_host($self->_cur_host+1); } else { $self->_cur_host(0); } $self->hostname($hosts->[$self->_cur_host]->{hostname}); $self->port($hosts->[$self->_cur_host]->{port}); } my ($socket); my %sockopts = ( PeerAddr => $self->hostname, PeerPort => $self->port, Proto => 'tcp', Timeout => 5 ); if ( $self->ssl ) { eval { require IO::Socket::SSL }; die "You should install the IO::Socket::SSL module for SSL support in Net::Stomp" if $@; %sockopts = ( %sockopts, %{ $self->ssl_options || {} } ); $socket = IO::Socket::SSL->new(%sockopts); } else { $socket = IO::Socket::INET->new(%sockopts); binmode($socket) if $socket; } die "Error connecting to " . $self->hostname . ':' . $self->port . ": $@" unless $socket; $self->select->remove($self->socket) if $self->socket; $self->select->add($socket); $self->socket($socket); } sub connect { my ( $self, $conf ) = @_; my $frame = Net::Stomp::Frame->new( { command => 'CONNECT', headers => $conf } ); $self->send_frame($frame); $frame = $self->receive_frame; # Setting initial values for session id, as given from # the stomp server $self->session_id( $frame->headers->{session} ); $self->_connect_headers( $conf ); return $frame; } sub disconnect { my $self = shift; my $frame = Net::Stomp::Frame->new( { command => 'DISCONNECT' } ); $self->send_frame($frame); $self->socket->close; $self->select->remove($self->socket); } sub _reconnect { my $self = shift; if ($self->socket) { $self->socket->close; } eval { $self->_get_connection }; while ($@) { sleep(5); eval { $self->_get_connection }; } $self->connect( $self->_connect_headers ); for my $sub(keys %{$self->subscriptions}) { $self->subscribe($self->subscriptions->{$sub}); } } sub can_read { my ( $self, $conf ) = @_; # If there is any data left in the framebuffer that we haven't read, return # 'true'. But we don't want to spin endlessly, so only return true the # first time. (Anything touching the _framebuf should update this flag when # it does something. if ( $self->{_framebuf_changed} && length $self->{_framebuf} ) { $self->{_framebuf_changed} = 0; return 1; } $conf ||= {}; my $timeout = exists $conf->{timeout} ? $conf->{timeout} : undef; return $self->select->can_read($timeout) || 0; } sub send { my ( $self, $conf ) = @_; my $body = $conf->{body}; delete $conf->{body}; my $frame = Net::Stomp::Frame->new( { command => 'SEND', headers => $conf, body => $body } ); $self->send_frame($frame); } sub send_transactional { my ( $self, $conf ) = @_; my $body = $conf->{body}; delete $conf->{body}; # begin the transaction my $transaction_id = $self->_get_next_transaction; my $begin_frame = Net::Stomp::Frame->new( { command => 'BEGIN', headers => { transaction => $transaction_id } } ); $self->send_frame($begin_frame); # send the message my $receipt_id = $self->_get_next_transaction; $conf->{receipt} = $receipt_id; my $message_frame = Net::Stomp::Frame->new( { command => 'SEND', headers => $conf, body => $body } ); $self->send_frame($message_frame); # check the receipt my $receipt_frame = $self->receive_frame; if ( $receipt_frame->command eq 'RECEIPT' && $receipt_frame->headers->{'receipt-id'} eq $receipt_id ) { # success, commit the transaction my $frame_commit = Net::Stomp::Frame->new( { command => 'COMMIT', headers => { transaction => $transaction_id } } ); return $self->send_frame($frame_commit); } else { # some failure, abort transaction my $frame_abort = Net::Stomp::Frame->new( { command => 'ABORT', headers => { transaction => $transaction_id } } ); $self->send_frame($frame_abort); return 0; } } sub subscribe { my ( $self, $conf ) = @_; my $frame = Net::Stomp::Frame->new( { command => 'SUBSCRIBE', headers => $conf } ); $self->send_frame($frame); my $subs = $self->subscriptions; $subs->{$conf->{'destination'}} = $conf; } sub unsubscribe { my ( $self, $conf ) = @_; my $frame = Net::Stomp::Frame->new( { command => 'UNSUBSCRIBE', headers => $conf } ); $self->send_frame($frame); my $subs = $self->subscriptions; delete $subs->{$conf->{'destination'}}; } sub ack { my ( $self, $conf ) = @_; my $id = $conf->{frame}->headers->{'message-id'}; my $frame = Net::Stomp::Frame->new( { command => 'ACK', headers => { 'message-id' => $id } } ); $self->send_frame($frame); } sub send_frame { my ( $self, $frame ) = @_; # warn "send [" . $frame->as_string . "]\n"; $self->socket->syswrite( $frame->as_string ); my $connected = $self->socket->connected; unless (defined $connected) { $self->_reconnect; $self->send_frame($frame); } } sub _read_data { my ($self, $timeout) = @_; return unless $self->select->can_read($timeout); my $len = $self->socket->sysread($self->{_framebuf}, $self->bufsize, length($self->{_framebuf} || '')); if ($len && $len > 0) { $self->{_framebuf_changed} = 1; } else { # EOF detected - connection is gone. We have to reset the framebuf in # case we had a partial frame in there that will never arrive. $self->{_framebuf} = ""; delete $self->{_command}; delete $self->{_headers}; } return $len; } sub _read_headers { my ($self) = @_; if ($self->{_framebuf} =~ s/^\n*([^\n].*?)\n\n//s) { $self->{_framebuf_changed} = 1; my $raw_headers = $1; if ($raw_headers =~ s/^(.+)\n//) { $self->{_command} = $1; } foreach my $line (split(/\n/, $raw_headers)) { my ($key, $value) = split(/\s*:\s*/, $line, 2); $self->{_headers}->{$key} = $value; } return 1; } return 0; } sub _read_body { my ($self) = @_; my $h = $self->{_headers}; if ($h->{'content-length'}) { if (length($self->{_framebuf}) >= $h->{'content-length'}) { $self->{_framebuf_changed} = 1; my $body = substr($self->{_framebuf}, 0, $h->{'content-length'}, '' ); # Trim the trailer off the frame. $self->{_framebuf} =~ s/^.*?\000\n*//s; return Net::Stomp::Frame->new({ command => delete $self->{_command}, headers => delete $self->{_headers}, body => $body }); } } elsif ($self->{_framebuf} =~ s/^(.*?)\000\n*//s) { # No content-length header. my $body = $1; $self->{_framebuf_changed} = 1; return Net::Stomp::Frame->new({ command => delete $self->{_command}, headers => delete $self->{_headers}, body => $body }); } return 0; } sub receive_frame { my ($self, $conf) = @_; my $timeout = exists $conf->{timeout} ? $conf->{timeout} : undef; my $connected = $self->socket->connected; unless (defined $connected) { $self->_reconnect; } my $done = 0; while ( not $done = $self->_read_headers ) { return undef unless $self->_read_data($timeout); } while ( not $done = $self->_read_body ) { return undef unless $self->_read_data($timeout); } return $done; } sub _get_next_transaction { my $self = shift; my $serial = $self->serial || 0; $serial++; $self->serial($serial); return $self->session_id . '-' . $serial; } 1; __END__