| GearmanX-Worker documentation | Contained in the GearmanX-Worker distribution. |
GearmanX::Worker - Working class for the Gearmand job server
#-- define a class for your workers:
package MyWorker;
use base qw(GearmanX::Worker);
sub echo :Expose {
    my $param = shift;
    # ... compute something from $param
    my $result = $param;
    return $result;
}
1;
#-- in the meantime in the worker server
my $w = new MyWorker;
$w->run_as_thread;
#-- or alternatively
$w->run; # block here
# somewhere else in a gearman client
use GearmanX::Client;
my $c = new GearmanX::Client;
my $r = $c->job ('echo', '1+2');
This class implements the necessary infrastructure to comfortably write a gearman
(http://www.gearman.org/) worker. Instead of messing around with a task object to get at your
arguments, you simply derive a subclass of GearmanX::Worker and define some methods (plain subs,
actually) which handle certain gearman jobs. To do that, you mark these methods with the
attribute Expose.
Every job handler receives exactly one parameter. That can be a scalar, a list reference or a hash reference. This is the data sent from the client, which may use GearmanX::Client.
Every job handler is supposed to return a result. That should be a scalar, a list reference or a hash reference. This data will be sent back to the client.
As the gearman system only allows strings to be passed between clients and workers, there is a special encoding for list and hash references. See the implementation for details.
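The one-parameter, one-result contract described above could look like the following inside a handler. This is only a minimal sketch, not code from the distribution: the sub name describe is hypothetical, and in a real worker it would carry the Expose attribute inside a GearmanX::Worker subclass. It is a plain sub here so that it runs without a job server.

```perl
use strict;
use warnings;

# Hypothetical handler: takes exactly one parameter (a scalar, a list
# reference or a hash reference) and returns one result of those kinds.
sub describe {
    my $param = shift;

    if (ref($param) eq 'HASH') {
        # hash reference in, list reference of sorted keys out
        return [ sort keys %$param ];
    }
    elsif (ref($param) eq 'ARRAY') {
        # list reference in, hash reference with an element count out
        return { count => scalar @$param };
    }
    # plain scalar in, plain scalar out
    return "got: $param";
}

my $keys  = describe({ b => 2, a => 1 });
my $count = describe([ 10, 20, 30 ]);
my $text  = describe('1+2');

print join(',', @$keys), "\n";
print $count->{count}, "\n";
print $text, "\n";
```

Checking ref($param) first keeps the handler independent of how the data was encoded on the wire, since by the time the handler runs it only sees ordinary Perl data.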
The constructor expects the following fields:
SERVERS (optional, default: 127.0.0.1)
This field controls where the job servers are.
With the Expose attribute you signal to the constructor that you intend a certain method to be exposed to the gearman system as a job handler. The constructor picks up all exposed methods of your class; they are registered with the gearman server when the worker is started.
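How an attribute of this kind can collect job handlers is sketched below with the core Attribute::Handlers module. The packages Demo::Base and Demo::Worker and the %registry hash are hypothetical names chosen for illustration. This is not the module's own implementation, and it leaves out the task unwrapping and serialization that a real worker needs.

```perl
use strict;
use warnings;

package Demo::Base;                 # hypothetical stand-in for a worker base class
use Attribute::Handlers;

our %registry;                      # package name => { job name => code ref }

# Attribute handler: invoked once for every sub that carries :Expose
sub Expose :ATTR(CODE) {
    my ($package, $symbol, $referent, $attr, $data) = @_;
    # use the name of the sub itself as the job name
    my $name = *{$symbol}{NAME};
    $registry{$package}{$name} = $referent;
}

package Demo::Worker;               # hypothetical subclass
use parent -norequire, 'Demo::Base';

sub echo :Expose {
    my $param = shift;
    return $param;
}

sub helper {                        # carries no attribute, so it is never registered
    return 'internal';
}

package main;

my @jobs = sort keys %{ $Demo::Base::registry{'Demo::Worker'} };
print "exposed: @jobs\n";
print $Demo::Base::registry{'Demo::Worker'}{echo}->('1+2'), "\n";
```

Because the handlers are applied while the program is still being compiled, the registry is already filled when the first statement in main runs. That is why %registry is declared without a run-time initializer, which would empty it again.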
The run method starts the worker and blocks there. This method will never return. Well, unless the world explodes.
The run_as_thread method launches a thread and detaches it. It does not block and returns the thread object.
Robert Barta, <rho at devc.at>
Please report any bugs or feature requests to bug-gearmanx-worker at rt.cpan.org, or through the
web interface at http://rt.cpan.org/NoAuth/ReportBug.html?Queue=GearmanX-Worker. I will be
notified, and then you'll automatically be notified of progress on your bug as I make changes.
Copyright 2009 Robert Barta, all rights reserved.
This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
package GearmanX::Worker;

use warnings;
use strict;

use Attribute::Handlers;
use Data::Dumper;

our $VERSION = '0.02';

my %exposed;

sub _register {
    my $self = shift;
    foreach my $ref (keys %{$self->{exposes}}) {
        next unless $self->{exposes}->{$ref}->{name}; # nonames we do not like
        $self->{worker}->register_function ( $self->{exposes}->{$ref}->{name} => $self->{exposes}->{$ref}->{'exec'} );
    }
}

sub new {
    my $class = shift;
    my $exposes = $exposed{$class}; # TODO die if empty
    my %options = @_;
    $options{SERVERS} ||= "127.0.0.1" ;
    return bless { exposes => $exposes, %options }, $class;
}

sub Expose :ATTR {
    my ($package, $symbol, $referent, $attr, $data, $phase, $filename, $linenum) = @_;
    my $name;
    $name ||= shift @$data if ref($data) eq 'ARRAY';
    $name ||= *{$symbol}{NAME}; # fall back to name of routine
    $exposed{$package}->{"$referent"}->{'name'} ||= $name;
    $exposed{$package}->{"$referent"}->{'exec'} ||= sub {
        my $task = $_[0];
        my $s = $task->arg;
        # warn "in default stub arg ".Dumper $s;
        use GearmanX::Serialization;
        my $p = GearmanX::Serialization::_deserialize (\ $s);
        # warn "before referent ".Dumper $p;
        my $r = &$referent ($p);
        # warn "after referent ".Dumper $r;
        $s = GearmanX::Serialization::_serialize ($r);
        # warn "after serializ ".Dumper $s;
        return \ $s;
    };
}

sub run {
    my $self = shift;
    # warn "in run self $self". Dumper $self->{SERVERS};
    use Gearman::Worker;
    $self->{worker} = Gearman::Worker->new;
    $self->{worker}->job_servers( $self->{SERVERS} );
    $self->_register;
    # warn "WORKER starting worker\n";
    $self->{worker}->work while (1);
    # warn "WORKER ending worker\n";
}

sub run_as_thread {
    my $self = shift;
    use threads;
    my $thr = threads->new (\&run, $self);
    $thr->detach;
    return $thr;
}

"against all gods";

__END__

sub storable :ATTR {
    warn "storable data ".Dumper \@_;
    my ($package, $symbol, $referent, $attr, $data, $phase, $filename, $linenum) = @_;
    $data ||= [ ];
    warn Dumper $data;
    my $in_storable = grep { $_ eq 'in' } @$data;
    my $out_storable = grep { $_ eq 'out' } @$data;
    $exposed{$package}->{"$referent"}->{'exec'} = sub {
        warn "before xxxx ".Dumper \@_;
        my $task = $_[0];
        my $s;
        if ($in_storable) {
            warn "taking care of in storable";
            use Storable qw(thaw);
            $s = thaw $task->arg;
        } else {
            $s = $task->arg;
        }
        warn "before return ".Dumper $s;
        return \ $s;
    }
}