| POE-Component-Server-Bayeux documentation | Contained in the POE-Component-Server-Bayeux distribution. |
POE::Component::Client::Bayeux - Bayeux/cometd client implementation in POE
use POE qw(Component::Client::Bayeux);
POE::Component::Client::Bayeux->spawn(
Host => '127.0.0.1',
Alias => 'comet',
);
POE::Session->create(
inline_states => {
_start => sub {
my ($kernel, $heap) = @_[KERNEL, HEAP];
$kernel->alias_set('my_client');
$kernel->post('comet', 'init');
$kernel->post('comet', 'subscribe', '/chat/demo', 'events');
$kernel->post('comet', 'publish', '/chat/demo', {
user => "POE",
chat => "POE has joined",
join => JSON::XS::true,
});
},
events => sub {
my ($kernel, $heap, $message) = @_[KERNEL, HEAP, ARG0];
print STDERR "Client got subscribed message:\n" . Dumper($message);
},
},
);
$poe_kernel->run();
This module implements the Bayeux Protocol (1.0draft1) from the Dojo Foundation. Also called cometd, Bayeux is a low-latency routing protocol for JSON encoded events between clients and servers in a publish-subscribe model.
This is the client implementation. It is not feature complete, but works at the moment for testing a Bayeux server.
Create a new Bayeux client. Arguments to this method:
Connect to this host.
Connect to this port.
Use SSL on connection
The POE session alias for local sessions to interact with.
Either 0 or 1, indicates level of logging.
Logfile to write output to.
If false, no logger output to STDOUT.
Enables cross domain protocol of messaging.
Provide a coderef that will receive a message hashref of any failed messages (erorrs in protocol, or simply unhandled messages).
Returns a class object with methods of interest:
The POE::Session object returned from an internal create() call.
The following are states you can post to to interact with the client.
Initializes the client, connecting to the server, and sets up long polling.
Publishes arbitrary message to the channel given. Message will have 'clientId' and 'id' fields auto-populated.
Subscribes client to the channel given. Callback can either be a coderef or the name of a state in the calling session. Callback will get one arg, the message that was posted to the channel subscribed to.
Unsubscribes from channel.
Sends a disconnect request.
Disconnect and reconnect
Lots of stuff.
The code currently implements only the long-polling transport and doesn't yet strictly follow all the directives in the protocol document http://svn.xantus.org/shortbus/trunk/bayeux/bayeux.html
No known bugs, but I'm sure you can find some.
Copyright (c) 2008 Eric Waters and XMission LLC (http://www.xmission.com/). All rights reserved. This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
The full text of the license can be found in the LICENSE file included with this module.
Eric Waters <ewaters@uarc.com>
| POE-Component-Server-Bayeux documentation | Contained in the POE-Component-Server-Bayeux distribution. |
package POE::Component::Client::Bayeux;
use strict; use warnings; use POE qw(Component::Client::HTTP Component::Client::Bayeux::Transport); use Params::Validate; use Data::Dumper; use JSON::Any; use Data::UUID; use HTTP::Request::Common; use Log::Log4perl qw(get_logger :levels); use Log::Log4perl::Appender; use Log::Log4perl::Layout; use POE::Component::Client::Bayeux::Utilities qw(decode_json_response); use POE::Component::Server::Bayeux::Utilities qw(channel_match); use base qw(Class::Accessor Exporter); __PACKAGE__->mk_accessors(qw(session clientId logger)); our @EXPORT_OK = qw(decode_json_response); my $protocol_version = '1.0'; our $VERSION = '0.03';
sub spawn { my $class = shift; my %args = validate(@_, { Host => 1, Port => { default => 80 }, Path => { default => '/cometd' }, SSL => { default => 0 }, Alias => { default => 'bayeux_client' }, CrossDomain => { default => 0 }, Debug => { default => 0 }, ErrorCallback => 0, LogFile => 0, LogStdout => { default => 1 }, }); if ($args{CrossDomain}) { # TODO die __PACKAGE__ . " doesn't yet support cross domain protocol.\n"; } my $ua_alias = $args{Alias} . '_ua'; my $cometd_url = sprintf 'http%s://%s:%s%s', ($args{SSL} ? 's' : ''), $args{Host}, $args{Port}, $args{Path}; POE::Component::Client::HTTP->spawn( Alias => $ua_alias, ); my $self = bless { %args }, $class; my $session = POE::Session->create( inline_states => { _start => \&client_start, _stop => \&client_stop, shutdown => \&client_shutdown, # Public methods init => \&init, publish => \&publish, subscribe => \&subscribe, unsubscribe => \&unsubscribe, disconnect => \&disconnect, reconnect => \&reconnect, # Internal handshake => \&handshake, handshake_response => \&handshake_response, send_message => \&send_message, ua_response => \&ua_response, deliver => \&deliver, flush_queue => \&flush_queue, send_transport => \&send_transport, }, heap => { args => \%args, ua => $ua_alias, remote_url => $cometd_url, json => JSON::Any->new(), uuid => Data::UUID->new(), subscriptions => {}, client => $self, }, ($ENV{POE_DEBUG} ? ( options => { trace => 1, debug => 1 }, ) : ()), ); # Setup logger my $logger = Log::Log4perl->get_logger('bayeux_client'); { my $logger_layout = Log::Log4perl::Layout::PatternLayout->new("[\%d] \%p: \%m\%n"); $logger->level($args{Debug} ? $DEBUG : $INFO); if ($args{LogFile}) { my $file_appender = Log::Log4perl::Appender->new( 'Log::Log4perl::Appender::File', name => 'filelog', filename => $args{LogFile}, ); $file_appender->layout( $logger_layout ); $logger->add_appender($file_appender); } if ($args{LogStdout}) { my $stdout_appender = Log::Log4perl::Appender->new( 'Log::Log4perl::Appender::Screen', name => 'screenlog', stderr => 0, ); $stdout_appender->layout($logger_layout); $logger->add_appender($stdout_appender); } } $self->{logger} = $logger; $self->{session} = $session->ID; return $self; } sub client_start { my ($kernel, $heap) = @_[KERNEL, HEAP]; $kernel->alias_set( $heap->{args}{Alias} ); if ($ENV{POE_DEBUG}) { $kernel->alias_resolve($heap->{ua})->option( trace => 1, debug => 1 ); } } sub client_stop { my ($kernel, $heap) = @_[KERNEL, HEAP]; } sub client_shutdown { my ($kernel, $heap) = @_[KERNEL, HEAP]; $heap->{_shutdown} = 1; $kernel->call( $heap->{ua}, 'shutdown' ); if ($heap->{transport}) { $kernel->call( $heap->{transport}, 'shutdown' ); } $kernel->alias_remove( $heap->{args}{Alias} ); } ## Public States ###
sub init { my ($kernel, $heap) = @_[KERNEL, HEAP]; $kernel->yield('handshake'); } sub handshake { my ($kernel, $heap, %ext) = @_[KERNEL, HEAP, ARG0 .. $#_]; my %handshake = ( channel => '/meta/handshake', version => $protocol_version, minimumVersion => $protocol_version, supportedConnectionTypes => [ 'long-polling' ], ext => { 'json-comment-filtered' => 1, %ext, } ); $kernel->yield('send_message', 'handshake_response', \%handshake); # Unsubscribe from all TODO $heap->{_initialized} = 1; $heap->{_connected} = 0; }
sub publish { my ($kernel, $heap, $channel, $message) = @_[KERNEL, HEAP, ARG0, ARG1]; $kernel->call($_[SESSION], 'send_transport', { channel => $channel, data => $message, }); }
sub subscribe { my ($kernel, $heap, $channel, $callback) = @_[KERNEL, HEAP, ARG0, ARG1]; return if $heap->{subscriptions}{$channel} && $heap->{subscriptions}{$channel}{callback} eq $callback && $heap->{subscriptions}{$channel}{session} eq $_[SENDER]; $heap->{subscriptions}{$channel} = { callback => $callback, session => $_[SENDER], }; $kernel->call($_[SESSION], 'send_transport', { channel => '/meta/subscribe', subscription => $channel, }); }
sub unsubscribe { my ($kernel, $heap, $channel) = @_[KERNEL, HEAP, ARG0]; delete $heap->{subscriptions}{$channel}; $kernel->call($_[SESSION], 'send_transport', { channel => '/meta/unsubscribe', subscription => $channel, }); }
sub disconnect { my ($kernel, $heap) = @_[KERNEL, HEAP]; $kernel->call($_[SESSION], 'send_transport', { channel => '/meta/disconnect', }); $heap->{_disconnect} = 1; }
sub reconnect { my ($kernel, $heap) = @_[KERNEL, HEAP]; $kernel->call($_[SESSION], 'send_transport', { channel => '/meta/disconnect', }); $heap->{_reconnect} = 1; } ## Internal Main States ### sub handshake_response { my ($kernel, $heap, $session, $response) = @_[KERNEL, HEAP, SESSION, ARG0]; if (! $response || ! ref $response || ! ref $response eq 'HASH') { die "Invalid response from handshake\n"; } if ($response->{version} && $protocol_version < $response->{version}) { die "Can't connect to server: version $$response{version} is > my supported version $protocol_version\n"; } if (! $response->{successful}) { die "Unsuccessful handshake.\n" . Dumper($response); } # Store client id for all future requests $heap->{clientId} = $response->{clientId}; $heap->{client}->clientId( $heap->{clientId} ); # Store advice $heap->{advice} = $response->{advice} || {}; # Choose a transport, build it, and ask it to connect # TODO: make sure it's one of the returned supportedConnectionTypes $heap->{transport} = POE::Component::Client::Bayeux::Transport->spawn( type => 'long-polling', parent => $session, parent_heap => $heap, ); $kernel->post($heap->{transport}, 'tunnelInit'); } sub deliver { my ($kernel, $heap, $message) = @_[KERNEL, HEAP, ARG0]; if (! $message || ! ref $message || ! ref $message eq 'HASH' || ! $message->{channel}) { die "deliver(): Invalid message\n"; } # If the message has an id, see if I have a record of the instigating request my $request; if ($message->{id}) { $request = delete $heap->{messages}{ $message->{id} }; } # Handle /meta/ channel responses if (my ($meta_channel) = $message->{channel} =~ m{^/meta/(.+)$}) { if ($meta_channel eq 'connect') { if ($message->{successful} && ! $heap->{_connected}) { $heap->{_connected} = 1; } elsif (! $heap->{_initialized}) { $heap->{_connected} = 0; } $kernel->yield('flush_queue'); return; } } # Publishes to a non-private channel MAY yield a simple successful message. Ignore those. if ($request && $request->{caller_state} eq 'publish' && $message->{successful} && $message->{channel} !~ m{^/service/}) { return; } # Check if I have a subscription for the channel my $matching_subscription; foreach my $subscription (keys %{ $heap->{subscriptions} }) { next unless channel_match($message->{channel}, $subscription); $matching_subscription = $subscription; last; } # Call the callback if so for each subscription if ($matching_subscription) { my $sub_details = $heap->{subscriptions}{$matching_subscription}; if ($sub_details->{callback}) { if (ref $sub_details->{callback}) { $sub_details->{callback}($message, $heap); return; } elsif ($_[SESSION] ne $sub_details->{session}) { $kernel->post( $sub_details->{session}, $sub_details->{callback}, $message, $heap ); return; } } } # Call generic callback for all non-successful messages if (defined $message->{successful} && ! $message->{successful} && $heap->{args}{ErrorCallback}) { $heap->{args}{ErrorCallback}($message); } $heap->{client}->logger->debug("deliver() couldn't handle message:\n" . Dumper($message)); } ## Utilities ### sub send_message { my ($kernel, $heap, $callback_state, @args) = @_[KERNEL, HEAP, ARG0 .. $#_]; $heap->{client}->logger->debug(" >>> Pre-transport >>>\n" . Dumper(\@args)); # Create an HTTP POST request, encoding the args into JSON my $request = POST $heap->{remote_url}, [ message => $heap->{json}->encode(\@args) ]; # Create a UUID so I can collect meta info about this request my $uuid = $heap->{uuid}->create_str(); $heap->{_ua_requests}{$uuid} = { json_callback => $callback_state }; # Send the request to the user agent $kernel->post( $heap->{ua}, 'request', 'ua_response', $request, $uuid ); } sub ua_response { my ($kernel, $heap, $request_packet, $response_packet) = @_[KERNEL, HEAP, ARG0, ARG1]; my $request_object = $request_packet->[0]; my $request_tag = $request_packet->[1]; # from the 'request' post my $response_object = $response_packet->[0]; my $meta = delete $heap->{_ua_requests}{$request_tag}; if ($meta && $meta->{json_callback}) { my $json; eval { $json = decode_json_response($response_object); }; if ($@) { # Ignore errors if shutting down return if $heap->{_shutdown}; die $@; } $heap->{client}->logger->debug("<<< Pre-transport <<<\n" . Dumper($json)); $kernel->yield( $meta->{json_callback}, @$json ); } } sub send_transport { my ($kernel, $heap, $message) = @_[KERNEL, HEAP, ARG0]; # Add unique ID to each message my $msg_id = ++$heap->{message_id}; $message->{id} = $msg_id; # Store a copy of this message $heap->{messages}{$msg_id} = { %$message, caller_session => $_[SENDER], caller_state => $_[CALLER_STATE], }; if ($heap->{transport}) { $kernel->post( $heap->{transport}, 'sendMessages', [ $message ]); } else { $heap->{client}->logger->debug("Queueing message ".Dumper($message)." as no active transport"); push @{ $heap->{message_queue} }, $message; } return $msg_id; } sub flush_queue { my ($kernel, $heap) = @_[KERNEL, HEAP]; return unless $heap->{message_queue} && ref $heap->{message_queue} && int @{ $heap->{message_queue} }; return unless $heap->{transport}; $heap->{client}->logger->debug("Flushing queue to transport"); $kernel->post($heap->{transport}, 'sendMessages', [ @{ $heap->{message_queue} } ]); $heap->{message_queue} = []; }
1;