Gearman::Driver::Job - Handles the POE magic


Gearman-Driver documentation Contained in the Gearman-Driver distribution.

Index


Code Index:

NAME

Top

Gearman::Driver::Job - Handles the POE magic

DESCRIPTION

Top

This class is responsible for starting/stopping processes as well as handling all pipes (STDOUT/STDERR/STDIN) of the processes. All events are written to a logfile. Possible events are:

* Starting processes
* STDOUT of processes
* STDERR of processes
* Stopping processes

The current interface may only be interesting for people subclassing Gearman::Driver or for people writing commands/extensions for Gearman::Driver::Console.

ATTRIBUTES

Top

driver

Reference to the Gearman::Driver instance.

name

The job's name.

methods

ArrayRef of Gearman::Driver::Job::Method objects.

max_processes

Maximum number of concurrent processes this job may have.

min_processes

Minimum number of concurrent processes this job may have.

processes

This attribute stores a key/value pair containing: $pid => $job

It provides following methods:

* count_processes()
* delete_process($pid)
* get_process($pid)
* get_processes()
* get_pids()
* set_process($pid = $job)>

gearman

Instance of Gearman::Driver::Adaptor.

session

Instance of POE::Session.

lastrun

Each time this job is called it stores time() in this attribute.

lasterror

Each time this job failed it stores time() in this attribute.

lasterror_msg

Each time this job failed it stores the error message in this attribute.

worker

Reference to the worker object.

METHODS

Top

add_process

Starts/forks/adds another process of this job.

remove_process

Removes/kills one process of this job.

AUTHOR

Top

See Gearman::Driver.

COPYRIGHT AND LICENSE

Top

SEE ALSO

Top

* Gearman::Driver
* Gearman::Driver::Adaptor
* Gearman::Driver::Console
* Gearman::Driver::Console::Basic
* Gearman::Driver::Console::Client
* Gearman::Driver::Job::Method
* Gearman::Driver::Loader
* Gearman::Driver::Observer
* Gearman::Driver::Worker

Gearman-Driver documentation Contained in the Gearman-Driver distribution.
package Gearman::Driver::Job;

use Moose;
use Gearman::Driver::Adaptor;
use POE qw(Wheel::Run);

has 'driver' => (
    handles  => { log => 'log' },
    is       => 'rw',
    isa      => 'Gearman::Driver',
    required => 1,
    weak_ref => 1,
);

has 'name' => (
    is       => 'rw',
    isa      => 'Str',
    required => 1,
);

has 'methods' => (
    is       => 'rw',
    isa      => 'ArrayRef[Gearman::Driver::Job::Method]',
    required => 1,
);

has 'max_processes' => (
    default  => 1,
    is       => 'rw',
    isa      => 'Int',
    required => 1,
);

has 'min_processes' => (
    default  => 1,
    is       => 'rw',
    isa      => 'Int',
    required => 1,
);

has 'processes' => (
    default => sub { {} },
    handles => {
        count_processes => 'count',
        delete_process  => 'delete',
        get_process     => 'get',
        get_processes   => 'values',
        get_pids        => 'keys',
        set_process     => 'set',
    },
    is     => 'ro',
    isa    => 'HashRef',
    traits => [qw(Hash)],
);

has 'gearman' => (
    is  => 'ro',
    isa => 'Gearman::Driver::Adaptor',
);

has 'session' => (
    is  => 'ro',
    isa => 'POE::Session',
);

has 'lastrun' => (
    default => 0,
    is      => 'rw',
    isa     => 'Int',
);

has 'lasterror' => (
    default => 0,
    is      => 'rw',
    isa     => 'Int',
);

has 'lasterror_msg' => (
    default => '',
    is      => 'rw',
    isa     => 'Str',
);

has 'worker' => (
    is       => 'rw',
    isa      => 'Any',
    required => 1,
);

sub add_process {
    my ($self) = @_;
    POE::Kernel->post( $self->session => 'add_process' );
}

sub remove_process {
    my ($self) = @_;
    POE::Kernel->post( $self->session => 'remove_process' );
}

sub BUILD {
    my ($self) = @_;

    $self->{gearman} = Gearman::Driver::Adaptor->new( server => $self->driver->server );

    foreach my $method ( @{ $self->methods } ) {
        $self->gearman->add_function( $method->name => $method->wrapper );
    }

    $self->{session} = POE::Session->create(
        object_states => [
            $self => {
                _start             => '_start',
                got_process_stdout => '_on_process_stdout',
                got_process_stderr => '_on_process_stderr',
                got_process_close  => '_on_process_close',
                got_process_signal => '_on_process_signal',
                add_process        => '_add_process',
                remove_process     => '_remove_process',
            }
        ]
    );
}

sub _start {
    $_[KERNEL]->alias_set( $_[OBJECT]->name );
}

sub _add_process {
    my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
    my $process = POE::Wheel::Run->new(
        Program => sub {
            POE::Kernel->stop();

            if ( my $process_name = $self->worker->process_name( $0, $self->name ) ) {
                $0 = $process_name;
            }

            $self->gearman->work;
        },
        StdoutEvent => "got_process_stdout",
        StderrEvent => "got_process_stderr",
        CloseEvent  => "got_process_close",
    );
    $kernel->sig_child( $process->PID, "got_process_signal" );

    # Wheel events include the wheel's ID.
    $heap->{wheels}{ $process->ID } = $process;

    $self->log->info( sprintf '(%d) [%s] Process started', $process->PID, $self->name );

    $self->set_process( $process->PID => $process );
}

sub _remove_process {
    my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
    my ($pid) = ( $self->get_pids )[0];
    return unless $pid;
    my $process = $self->delete_process($pid);
    $process->kill();
    $self->log->info( sprintf '(%d) [%s] Process killed', $process->PID, $self->name );
}

sub _on_process_stdout {
    my ( $self, $heap, $stdout, $wid ) = @_[ OBJECT, HEAP, ARG0, ARG1 ];
    my $process = $heap->{wheels}{$wid};
    my ( $attr, $value ) = $stdout =~ /^(\w+) (.*?)$/;
    return if !defined $attr || !defined $value;
    $self->$attr($value) if $self->can($attr);
}

sub _on_process_stderr {
    my ( $self, $heap, $stderr, $wid ) = @_[ OBJECT, HEAP, ARG0, ARG1 ];
    my $process = $heap->{wheels}{$wid};
    $self->log->info( sprintf '(%d) [%s] STDERR: %s', $process->PID, $self->name, $stderr );
}

sub _on_process_close {
    my ( $self, $heap, $wid ) = @_[ OBJECT, HEAP, ARG0 ];

    my $process = delete $heap->{wheels}{$wid};

    # May have been reaped by got_process_signal
    return unless defined $process;

    $self->delete_process( $process->PID );
}

sub _on_process_signal {
    my ( $self, $heap, $pid, $status ) = @_[ OBJECT, HEAP, ARG1 .. ARG2 ];

    my $process = $self->delete_process($pid);

    $self->log->info( sprintf '(%d) [%s] Exited with status %s', $pid, $self->name, $status );

    # May have been reaped by got_process_close
    return unless defined $process;

    delete $heap->{wheels}{ $process->ID };
}

no Moose;

__PACKAGE__->meta->make_immutable;

1;