GearmanX::Worker - Working class for the Gearmand job server


GearmanX-Worker documentation Contained in the GearmanX-Worker distribution.

Index


Code Index:

NAME

Top

GearmanX::Worker - Working class for the Gearmand job server

SYNOPSIS

Top

    #-- define a class for your workers:
    package MyWorker;
    use base qw(GearmanX::Worker);

    sub echo  :Expose  {
	my $param = shift;
        # ... compute something
        return $result;
    }

    1;

    #-- in the meantime in the worker server
    my $w = new MyWorker;
    $w->run_as_thread;

    #-- or alternatively
    $w->run; # block here

    # somewhere else in a gearman client
    use GearmanX::Client;
    my $c = new GearmanX::Client;
    my $r = $c->job ('echo', '1+2');

DESCRIPTION

Top

This class implements the necessary infrastructure to comfortably write a gearman (http://www.gearman.org/) server. Instead of messing around with a task object where you get your arguments, you simply derive a subclass of GearmanX::Worker and define some methods (subs actually) which can handle certain gearman jobs. For that you mark these methods with an attribute Expose.

Parameter Handling

Every job handler receives exactly one parameter. That can be a scalar, a list reference or a hash reference. This is the data sent from the client, which may use GearmanX::Client.

Result Handling

Every job handler is supposed to return a result. That should be a scalar, a list reference or a hash reference. This data will be sent back to the client.

Owning the Protocol

As the gearman system only allows strings to be passed between clients and workers, there is a special encoding for list and hash references. See the implementation for details.

INTERFACE

Top

Constructor

The constructor expects the following fields:

SERVERS (optional, default: 127.0.0.1)

This field controls where the jobs servers are.

Attributes

Expose

With this attribute you signal to the constructor that you intent a certain method to be exposed to the gearman systems as a job handler. At constructor time of your worker this method will be registered with the gearman server.

Methods

run

This starts the worker and blocks there. This method will never terminate. Well, unless the world explodes.

run_as_thread

This method launches a thread and detaches it. It will not block and returns the thread object.

AUTHOR

Top

Robert Barta, <rho at devc.at>

BUGS

Top

Please report any bugs or feature requests to bug-gearmanx-worker at rt.cpan.org, or through the web interface at http://rt.cpan.org/NoAuth/ReportBug.html?Queue=GearmanX-Worker. I will be notified, and then you'll automatically be notified of progress on your bug as I make changes.

COPYRIGHT & LICENSE

Top


GearmanX-Worker documentation Contained in the GearmanX-Worker distribution.
package GearmanX::Worker;

use warnings;
use strict;

use Attribute::Handlers;
use Data::Dumper;

our $VERSION = '0.02';

my %exposed;
    
sub _register {
    my $self = shift;
    
    foreach my $ref (keys %{$self->{exposes}}) {
	next unless $self->{exposes}->{$ref}->{name};                                  # nonames we do not like
	$self->{worker}->register_function ( 
	    $self->{exposes}->{$ref}->{name} => $self->{exposes}->{$ref}->{'exec'}
	    );
    }
}

sub new {
    my $class = shift;
    my $exposes = $exposed{$class};                                                     # TODO die if empty
    my %options = @_;
    $options{SERVERS} ||= "127.0.0.1" ;
    return bless { exposes => $exposes, %options }, $class;
}

sub Expose :ATTR {
    my ($package, $symbol, $referent, $attr, $data, $phase, $filename, $linenum) = @_;
    my $name;
    $name ||= shift @$data if ref($data) eq 'ARRAY';
    $name ||= *{$symbol}{NAME};                                                         # fall back to name of routine

    $exposed{$package}->{"$referent"}->{'name'} ||= $name;
    $exposed{$package}->{"$referent"}->{'exec'} ||= sub {
	my $task = $_[0];
	my $s = $task->arg;
#	warn "in default stub arg ".Dumper $s;

	use GearmanX::Serialization;
	my $p = GearmanX::Serialization::_deserialize (\ $s);
#	warn "before referent ".Dumper $p;
	my $r = &$referent ($p);
#	warn "after referent ".Dumper $r;
	$s = GearmanX::Serialization::_serialize ($r);
#	warn "after serializ ".Dumper $s;

	return \ $s;
    };
}

sub run {
    my $self = shift;
#    warn "in run self $self". Dumper $self->{SERVERS};
    use Gearman::Worker;
    $self->{worker} = Gearman::Worker->new;
    $self->{worker}->job_servers( $self->{SERVERS} );
    $self->_register;
#    warn "WORKER starting worker\n";
    
    $self->{worker}->work while (1);
#    warn "WORKER ending worker\n";
}


sub run_as_thread {
    my $self = shift;
    
    use threads;
    my $thr = threads->new (\&run, $self);
    $thr->detach;
    return $thr;
}

"against all gods";

__END__



sub storable :ATTR {
    warn "storable data ".Dumper \@_;
    my ($package, $symbol, $referent, $attr, $data, $phase, $filename, $linenum) = @_;
    $data ||= [ ];
    warn Dumper $data;
    my $in_storable  = grep { $_ eq 'in'  } @$data;
    my $out_storable = grep { $_ eq 'out' } @$data;

    $exposed{$package}->{"$referent"}->{'exec'} = sub {
	warn "before xxxx ".Dumper \@_;
	my $task = $_[0];
	my $s;
	if ($in_storable) {
	    warn "taking care of in storable";
	    use Storable qw(thaw);
	    $s = thaw $task->arg;
	} else {
	    $s = $task->arg;
	}

	warn "before return ".Dumper $s;
	return \ $s;
    }
}