POE::Component::Gearman::Client - Asynchronous client module for Gearman for POE applications


POE-Component-Gearman-Client documentation Contained in the POE-Component-Gearman-Client distribution.

Index


Code Index:

NAME

Top

POE::Component::Gearman::Client - Asynchronous client module for Gearman for POE applications

SYNOPSIS

Top

    use POE qw(Component::Gearman::Client);

    # Instantiate a new client session.
    POE::Component::Gearman::Client->spawn(
        alias => 'my_gearman_client',
        job_servers => [ '127.0.0.1', '192.168.0.1:123' ],
    );

    # Overwrite job server list with a new one.
    POE::Kernel->post('my_gearman_client' => 'set_job_servers', ['10.0.0.1']);

    # Start a task
    $task = Gearman::Task->new(...); # with callbacks, etc
    POE::Kernel->post('my_gearman_client' => 'add_task', $task);

    # if you keep a reference to the client object you can also 
    # get a list of job servers during runtime:
    my $client = POE::Component::Gearman::Client->spawn(...);
    $arrayref = $client->job_servers;
    @array = $client->job_servers;

ABSTRACT

Top

This module lets provides an asynchronous interface to submit jobs to Gearman servers in a POE application.

PUBLIC METHODS

Top

spawn

A program must spawn at least one POE::Component::Gearman::Client instance before it can submit jobs to Gearman servers. A reference to the object is returned if you need to call methods such as job_servers, otherwise you won't need to store it.

The following parameters can be passed to the spawn constructor.

alias

(Optional) This parameter will be used to set POE's internal session alias. This is useful to post events and is also very important if you instantiate multiple clients. If left empty, the alias will be set to "Gearman".

job_servers

(Optional) This parameter can contain an arrayref of IP:port host specifications.

job_servers

This method returns an ARRAY or ARRAYREF (depending on the calling context) containing IP:port specification of the configured job servers.

POE EVENTS

Top

set_job_servers

Posting this event to your POE::Component::Gearman::Client client lets you set the current job server list (by overriding the existing one if any).

	$kernel->post('Gearman', 'set_job_servers', ['10.0.0.1']);

Gearman is the alias name (see above about alias parameter), and the passed argument is an ARRAYREF containing the server definitions in IP:port syntax.

add_task

Posting this event to your POE::Component::Gearman::Client client lets you submit a task.

	$kernel->post('Gearman', 'add_task', $task);

Gearman is the alias name (see above about alias parameter), and $task is a Gearman::Task object.

Warning: you can't call POE::Kernel's methods like yield(), delay() etc. from within a task callback, because callbacks will be executed within POE::Component::Gearman::Client's session instead of yours. Thus, the only methods you can call are post() and call() because they let you specify the destination session. See example:

    # WRONG
    sub submit_task {
        my $kernel = $_[KERNEL];
        my $cb = sub {
            $kernel->delay('submit_task', 60);  # this won't be called within your session!
        };
        my $task = Gearman::Task->new('do_task', \'', { on_complete => $cb });
        POE::Kernel->post('Gearman' => 'add_task', $task);
    }

    # CORRECT
    sub submit_task {
        my ($kernel, $session) = @_[KERNEL, SESSION];
        my $cb = sub {
            $kernel->post($session => 'task_done');
        };
        my $task = Gearman::Task->new('do_task', \'', { on_complete => $cb });
        POE::Kernel->post('Gearman' => 'add_task', $task);
    }
    sub task_done {
        $_[KERNEL]->delay('submit_task', 60);
    }

disconnect_all

Posting this event to your POE::Component::Gearman::Client client will disconnect the client from all job servers, allowing your POE application to shutdown if you want so.

	$kernel->post('Gearman', 'disconnect_all');

Gearman is the alias name (see above about alias parameter).

SEE ALSO

Top

Gearman::Task

Gearman::Client::Async

COPYRIGHT

Top

WARRANTY

Top

This is free software. This comes with no warranty whatsoever.

AUTHORS

Top

 Alessandro Ranellucci (aar@cpan.org)
 based on code by Brad Fitzpatrick (brad@danga.com)


POE-Component-Gearman-Client documentation Contained in the POE-Component-Gearman-Client distribution.

# ===========================================================================
# POE::Component::Gearman::Client
# 
# POE-based client for Gearman servers
# 
# Author: Alessandro Ranellucci <aar@cpan.org>
# 
# See below for documentation.
# 

package POE::Component::Gearman::Client;

use strict;
use vars qw($VERSION);

use Carp qw(croak);
use fields (
            'job_servers',   # arrayref of POE::Component::Gearman::Client::Connection objects
            't_no_random',   # don't randomize job server to use:  use first alive one.
            't_offline_host', # hashref: hostname -> $bool, if host should act as offline, for testing
            );
use Gearman::Objects;
use Gearman::Task;
use Gearman::JobStatus;
use List::Util qw(first);
use POE::Component::Gearman::Client::Connection;
use POE;

$VERSION = '0.03';

sub DEBUGGING () { 0 }

sub spawn {
    my ($class, %opts) = @_;
    my $self = $class;
    $self = fields::new($class) unless ref $self;
    
    $self->{job_servers}    = [];
    $self->{t_offline_host} = {};

    my $js = delete $opts{job_servers};
    my $alias = delete $opts{alias};
    croak "Unknown parameters: " . join(", ", keys %opts) if %opts;
    
    # register session with POE
	POE::Session->create(
	    inline_states => {
	        _start => sub {
            	$_[KERNEL]->alias_set( $alias || 'Gearman' );
            	
            	# call instead of yield so that the job_servers method is 
            	# instantly available
            	$_[KERNEL]->call($_[SESSION], 'set_job_servers', $js) if $js;
			},
	    },
		object_states => [
			$self => [qw(t_set_disable_random t_set_offline_host
				        set_job_servers add_task disconnect_all)]
		]
	);
	
	return $self;
}

# for testing.
sub t_set_disable_random {
    my $self = $_[OBJECT];
    $self->{t_no_random} = shift;
}

sub t_set_offline_host {
    my ($self, $host, $val) = @_[OBJECT, ARG0, ARG1];
    $val = 1 unless defined $val;
    $self->{t_offline_host}{$host} = $val;

    my $conn = first { $_->hostspec eq $host } @{ $self->{job_servers} }
        or die "No host found with that spec to mark offline";

    $conn->t_set_offline($val);
}

# set job servers, without shutting down dups, and shutting down old ones gracefully
sub set_job_servers {
    my ($self, $js) = @_[OBJECT, ARG0];

    my %being_set; # hostspec -> 1
    %being_set = map { $_, 1 } @$js;

    my %exist;   # hostspec -> existing conn
    foreach my $econn (@{ $self->{job_servers} }) {
        my $spec = $econn->hostspec;
        if ($being_set{$spec}) {
            $exist{$spec} = $econn;
        } else {
            $econn->close_when_finished;
        }
    }

    my @newlist;
    foreach (@$js) {
        push @newlist, $exist{$_} || POE::Component::Gearman::Client::Connection->new( hostspec => $_ );
    }
    $self->{job_servers} = \@newlist;
}

# getter
sub job_servers {
    my $self = shift;
    croak "Not a setter" if @_;
    my @list = map { $_->hostspec } @{ $self->{job_servers} };
    return wantarray ? @list : \@list;
}

sub add_task {
    my $self = $_[OBJECT];
    my Gearman::Task $task = $_[ARG0] or return;

    my $try_again;
    $try_again = sub {

        my @job_servers = grep { $_->alive } @{$self->{job_servers}};
        warn "Alive servers: " . @job_servers . " out of " . @{$self->{job_servers}} . "\n" if DEBUGGING;
        unless (@job_servers) {
            $task->final_fail;
            $try_again = undef;
            return;
        }

        my $js;
        if (defined( my $hash = $task->hash )) {
            # Task is hashed, use key to fetch job server
            $js = @job_servers[$hash % @job_servers];
        }
        else {
            # Task is not hashed, random job server
            $js = @job_servers[$self->{t_no_random} ? 0 :
                               int( rand( @job_servers ))];
        }

        # TODO Fix this violation of object privacy.
        $task->{taskset} = $self;

        $js->get_in_ready_state(
                                # on_ready:
                                sub {
                                    my $timer;
                                    if (my $timeout = $task->{timeout}) {
                                        # TODO: setup timer
                                        #$timer = Danga::Socket->AddTimer($timeout, sub {
                                        #    $task->final_fail('timeout');
                                        #});
                                    }
                                    $task->set_on_post_hooks(sub {
                                        $timer->cancel if $timer;

                                        # ALSO clean up our $js (connection's) waiting stuff:
                                        $js->give_up_on($task);
                                    });
                                    $js->add_task( $task );
                                    $try_again = undef;
                                },
                                # on_error:
                                $try_again,
                                );
    };
    $try_again->();
}

sub disconnect_all {
    my $self = $_[OBJECT];
    warn "Disconnecting all server sockets\n" if DEBUGGING;
    my @job_servers = grep { $_->alive } @{$self->{job_servers}};
    warn "Alive servers: " . @job_servers . " out of " . @{$self->{job_servers}} . "\n" if DEBUGGING;
    
    # TODO: we should better use close_when_finished
    $_->close for @job_servers;
}

# POE::Component::Gearman::Client sometimes fakes itself duck-typing style as a
# Gearman::Taskset, since a task"set" makes no sense in an async
# world, where there's no need to wait on a set of things... since
# everything happens at its own pace.  so for duck-typing reasons (or,
# er, "implementing an interface", say), we need to implement a the
# "taskset client method" but in our case, that's just us.
sub client { $_[0] }

# as a Gearman::Client-like thing, we'll be asked for our prefix, which this module
# currently doesn't support, but the base Gearman libraries expect.
sub prefix { "" }


1;
__END__