| AnyEvent-MPRPC documentation | Contained in the AnyEvent-MPRPC distribution. |
AnyEvent::MPRPC::Server - Simple TCP-based MessagePack RPC server
use AnyEvent::MPRPC::Server;
my $server = AnyEvent::MPRPC::Server->new( port => 4423 );
$server->reg_cb(
echo => sub {
my ($res_cv, @params) = @_;
$res_cv->result(@params);
},
sum => sub {
my ($res_cv, @params) = @_;
$res_cv->result( $params[0] + $params[1] );
},
);
This module is server part of AnyEvent::MPRPC.
Create server object, start listening socket, and return object.
my $server = AnyEvent::MPRPC::Server->new(
port => 4423,
);
Available %options are:
Listening port or path to unix socket (Required)
Bind address. Default to undef: This means server binds all interfaces by default.
If you want to use unix socket, this option should be set to "unix/"
Error callback which is called when some errors occured. This is actually AnyEvent::Handle's on_error.
EOF callback. same as AnyEvent::Handle's on_eof callback.
Hashref options of AnyEvent::Handle that is used to handle client connections.
Register MessagePack RPC methods.
$server->reg_cb(
echo => sub {
my ($res_cv, @params) = @_;
$res_cv->result(@params);
},
sum => sub {
my ($res_cv, @params) = @_;
$res_cv->result( $params[0] + $params[1] );
},
);
MessagePack RPC callback arguments consists of $result_cv, and request @params.
my ($result_cv, @params) = @_;
$result_cv is AnyEvent::MPRPC::CondVar object.
Callback must be call <$result_cv-result>> to return result or <$result_cv-error>> to return error.
If $result_cv is not defined, it is notify request, so you don't have to return response. See AnyEvent::MPRPC::Client notify method.
@params is same as request parameter.
Tokuhiro Matsuno <tokuhirom@cpan.org>
Copyright (c) 2009 by tokuhirom.
This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
The full text of the license can be found in the LICENSE file included with this module.
| AnyEvent-MPRPC documentation | Contained in the AnyEvent-MPRPC distribution. |
package AnyEvent::MPRPC::Server; use Any::Moose; use Carp; use Scalar::Util 'weaken'; use AnyEvent::Handle; use AnyEvent::Socket; use AnyEvent::MPRPC::CondVar; use AnyEvent::MessagePack; use AnyEvent::MPRPC::Constant; has address => ( is => 'ro', isa => 'Maybe[Str]', default => undef, ); has port => ( is => 'ro', isa => 'Int|Str', required => 1, ); has server => ( is => 'rw', isa => 'Object', ); has on_error => ( is => 'rw', isa => 'CodeRef', lazy => 1, default => sub { return sub { my ($handle, $fatal, $message) = @_; carp sprintf "Server got error: %s", $message; }; }, ); has on_eof => ( is => 'rw', isa => 'CodeRef', lazy => 1, default => sub { return sub { }; }, ); has handler_options => ( is => 'ro', isa => 'HashRef', default => sub { {} }, ); has _handlers => ( is => 'ro', isa => 'ArrayRef', default => sub { [] }, ); has _callbacks => ( is => 'ro', isa => 'HashRef', lazy => 1, default => sub { {} }, ); no Any::Moose; sub BUILD { my $self = shift; my $server = tcp_server $self->address, $self->port, sub { my ($fh, $host, $port) = @_; my $indicator = "$host:$port"; my $handle = AnyEvent::Handle->new( on_error => sub { my ($h, $fatal, $msg) = @_; $self->on_error->(@_); $h->destroy; }, on_eof => sub { my ($h) = @_; # client disconnected $self->on_eof->(@_); $h->destroy; }, %{ $self->handler_options }, fh => $fh, ); $handle->on_read(sub { shift->unshift_read( msgpack => sub { $self->_dispatch($indicator, @_); }), }); $self->_handlers->[ fileno($fh) ] = $handle; }; $self->server($server); weaken $self; $self; } sub reg_cb { my ($self, %callbacks) = @_; while (my ($method, $callback) = each %callbacks) { $self->_callbacks->{ $method } = $callback; } } sub _dispatch { my ($self, $indicator, $handle, $request) = @_; return unless $request and ref $request eq 'ARRAY'; my $target = $self->_callbacks->{ $request->[MP_REQ_METHOD] }; my $id = $request->[MP_REQ_MSGID]; $indicator = "$indicator:$id"; my $res_cb = sub { my $type = shift; my $result = @_ > 1 ? \@_ : $_[0]; $handle->push_write( msgpack => [ MP_TYPE_RESPONSE, int($id), # should be IV. $type eq 'error' ? $result : undef, $type eq 'result' ? $result : undef, ]) if $handle; }; weaken $handle; my $cv = AnyEvent::MPRPC::CondVar->new; $cv->_cb( sub { $res_cb->( result => $_[0]->recv ) }, sub { $res_cb->( error => $_[0]->recv ) }, ); $target ||= sub { shift->error(qq/No such method "@{[ $request->[MP_REQ_METHOD] ]}" found/) }; $target->( $cv, $request->[MP_REQ_PARAMS] ); } __PACKAGE__->meta->make_immutable; __END__