| POE-Component-Gearman-Client documentation | Contained in the POE-Component-Gearman-Client distribution. |
POE::Component::Gearman::Client - Asynchronous client module for Gearman for POE applications
use POE qw(Component::Gearman::Client);
# Instantiate a new client session.
POE::Component::Gearman::Client->spawn(
alias => 'my_gearman_client',
job_servers => [ '127.0.0.1', '192.168.0.1:123' ],
);
# Overwrite job server list with a new one.
POE::Kernel->post('my_gearman_client' => 'set_job_servers', ['10.0.0.1']);
# Start a task
$task = Gearman::Task->new(...); # with callbacks, etc
POE::Kernel->post('my_gearman_client' => 'add_task', $task);
# if you keep a reference to the client object you can also
# get a list of job servers during runtime:
my $client = POE::Component::Gearman::Client->spawn(...);
$arrayref = $client->job_servers;
@array = $client->job_servers;
This module lets provides an asynchronous interface to submit jobs to Gearman servers in a POE application.
spawnA program must spawn at least one POE::Component::Gearman::Client instance before
it can submit jobs to Gearman servers. A reference to the object is returned if you
need to call methods such as job_servers, otherwise you won't need to store it.
The following parameters can be passed to the spawn constructor.
(Optional) This parameter will be used to set POE's internal session alias. This is useful to post events and is also very important if you instantiate multiple clients. If left empty, the alias will be set to "Gearman".
(Optional) This parameter can contain an arrayref of IP:port host specifications.
job_serversThis method returns an ARRAY or ARRAYREF (depending on the calling context) containing IP:port specification of the configured job servers.
set_job_serversPosting this event to your POE::Component::Gearman::Client client lets you set the current job server list (by overriding the existing one if any).
$kernel->post('Gearman', 'set_job_servers', ['10.0.0.1']);
Gearman is the alias name (see above about alias parameter), and the passed
argument is an ARRAYREF containing the server definitions in IP:port syntax.
add_taskPosting this event to your POE::Component::Gearman::Client client lets you submit a task.
$kernel->post('Gearman', 'add_task', $task);
Gearman is the alias name (see above about alias parameter), and $task is a
Gearman::Task object.
Warning: you can't call POE::Kernel's methods like yield(), delay() etc.
from within a task callback, because callbacks will be executed within
POE::Component::Gearman::Client's session instead of yours. Thus, the only methods
you can call are post() and call() because they let
you specify the destination session. See example:
# WRONG
sub submit_task {
my $kernel = $_[KERNEL];
my $cb = sub {
$kernel->delay('submit_task', 60); # this won't be called within your session!
};
my $task = Gearman::Task->new('do_task', \'', { on_complete => $cb });
POE::Kernel->post('Gearman' => 'add_task', $task);
}
# CORRECT
sub submit_task {
my ($kernel, $session) = @_[KERNEL, SESSION];
my $cb = sub {
$kernel->post($session => 'task_done');
};
my $task = Gearman::Task->new('do_task', \'', { on_complete => $cb });
POE::Kernel->post('Gearman' => 'add_task', $task);
}
sub task_done {
$_[KERNEL]->delay('submit_task', 60);
}
disconnect_allPosting this event to your POE::Component::Gearman::Client client will disconnect the client from all job servers, allowing your POE application to shutdown if you want so.
$kernel->post('Gearman', 'disconnect_all');
Gearman is the alias name (see above about alias parameter).
Copyright Alessandro Ranellucci Some code copyright Six Apart, Ltd.
License granted to use/distribute under the same terms as Perl itself.
This is free software. This comes with no warranty whatsoever.
Alessandro Ranellucci (aar@cpan.org) based on code by Brad Fitzpatrick (brad@danga.com)
| POE-Component-Gearman-Client documentation | Contained in the POE-Component-Gearman-Client distribution. |
# =========================================================================== # POE::Component::Gearman::Client # # POE-based client for Gearman servers # # Author: Alessandro Ranellucci <aar@cpan.org> # # See below for documentation. # package POE::Component::Gearman::Client; use strict; use vars qw($VERSION); use Carp qw(croak); use fields ( 'job_servers', # arrayref of POE::Component::Gearman::Client::Connection objects 't_no_random', # don't randomize job server to use: use first alive one. 't_offline_host', # hashref: hostname -> $bool, if host should act as offline, for testing ); use Gearman::Objects; use Gearman::Task; use Gearman::JobStatus; use List::Util qw(first); use POE::Component::Gearman::Client::Connection; use POE; $VERSION = '0.03'; sub DEBUGGING () { 0 } sub spawn { my ($class, %opts) = @_; my $self = $class; $self = fields::new($class) unless ref $self; $self->{job_servers} = []; $self->{t_offline_host} = {}; my $js = delete $opts{job_servers}; my $alias = delete $opts{alias}; croak "Unknown parameters: " . join(", ", keys %opts) if %opts; # register session with POE POE::Session->create( inline_states => { _start => sub { $_[KERNEL]->alias_set( $alias || 'Gearman' ); # call instead of yield so that the job_servers method is # instantly available $_[KERNEL]->call($_[SESSION], 'set_job_servers', $js) if $js; }, }, object_states => [ $self => [qw(t_set_disable_random t_set_offline_host set_job_servers add_task disconnect_all)] ] ); return $self; } # for testing. sub t_set_disable_random { my $self = $_[OBJECT]; $self->{t_no_random} = shift; } sub t_set_offline_host { my ($self, $host, $val) = @_[OBJECT, ARG0, ARG1]; $val = 1 unless defined $val; $self->{t_offline_host}{$host} = $val; my $conn = first { $_->hostspec eq $host } @{ $self->{job_servers} } or die "No host found with that spec to mark offline"; $conn->t_set_offline($val); } # set job servers, without shutting down dups, and shutting down old ones gracefully sub set_job_servers { my ($self, $js) = @_[OBJECT, ARG0]; my %being_set; # hostspec -> 1 %being_set = map { $_, 1 } @$js; my %exist; # hostspec -> existing conn foreach my $econn (@{ $self->{job_servers} }) { my $spec = $econn->hostspec; if ($being_set{$spec}) { $exist{$spec} = $econn; } else { $econn->close_when_finished; } } my @newlist; foreach (@$js) { push @newlist, $exist{$_} || POE::Component::Gearman::Client::Connection->new( hostspec => $_ ); } $self->{job_servers} = \@newlist; } # getter sub job_servers { my $self = shift; croak "Not a setter" if @_; my @list = map { $_->hostspec } @{ $self->{job_servers} }; return wantarray ? @list : \@list; } sub add_task { my $self = $_[OBJECT]; my Gearman::Task $task = $_[ARG0] or return; my $try_again; $try_again = sub { my @job_servers = grep { $_->alive } @{$self->{job_servers}}; warn "Alive servers: " . @job_servers . " out of " . @{$self->{job_servers}} . "\n" if DEBUGGING; unless (@job_servers) { $task->final_fail; $try_again = undef; return; } my $js; if (defined( my $hash = $task->hash )) { # Task is hashed, use key to fetch job server $js = @job_servers[$hash % @job_servers]; } else { # Task is not hashed, random job server $js = @job_servers[$self->{t_no_random} ? 0 : int( rand( @job_servers ))]; } # TODO Fix this violation of object privacy. $task->{taskset} = $self; $js->get_in_ready_state( # on_ready: sub { my $timer; if (my $timeout = $task->{timeout}) { # TODO: setup timer #$timer = Danga::Socket->AddTimer($timeout, sub { # $task->final_fail('timeout'); #}); } $task->set_on_post_hooks(sub { $timer->cancel if $timer; # ALSO clean up our $js (connection's) waiting stuff: $js->give_up_on($task); }); $js->add_task( $task ); $try_again = undef; }, # on_error: $try_again, ); }; $try_again->(); } sub disconnect_all { my $self = $_[OBJECT]; warn "Disconnecting all server sockets\n" if DEBUGGING; my @job_servers = grep { $_->alive } @{$self->{job_servers}}; warn "Alive servers: " . @job_servers . " out of " . @{$self->{job_servers}} . "\n" if DEBUGGING; # TODO: we should better use close_when_finished $_->close for @job_servers; } # POE::Component::Gearman::Client sometimes fakes itself duck-typing style as a # Gearman::Taskset, since a task"set" makes no sense in an async # world, where there's no need to wait on a set of things... since # everything happens at its own pace. so for duck-typing reasons (or, # er, "implementing an interface", say), we need to implement a the # "taskset client method" but in our case, that's just us. sub client { $_[0] } # as a Gearman::Client-like thing, we'll be asked for our prefix, which this module # currently doesn't support, but the base Gearman libraries expect. sub prefix { "" } 1; __END__