| Gearman-Driver documentation | Contained in the Gearman-Driver distribution. |
Gearman::Driver::Job - Handles the POE magic
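This class is responsible for starting/stopping processes as well as handling all pipes (STDOUT/STDERR/STDIN) of the processes. All events are written to a logfile. Possible events are: a process being started, a process being killed, output a process writes to STDERR, and a process exiting (together with its exit status).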
The current interface may only be interesting for people subclassing Gearman::Driver or for people writing commands/extensions for Gearman::Driver::Console.
Reference to the Gearman::Driver instance.
The job's name.
ArrayRef of Gearman::Driver::Job::Method objects.
Maximum number of concurrent processes this job may have.
Minimum number of concurrent processes this job may have.
This attribute stores a key/value pair containing:
$pid => $job
It provides the following methods:
count_processes(), delete_process($pid), get_process($pid), get_processes(), get_pids(), set_process($pid => $job)
Instance of Gearman::Driver::Adaptor.
Instance of POE::Session.
Each time this job is called it stores time() in this attribute.
Each time this job failed it stores time() in this attribute.
Each time this job failed it stores the error message in this attribute.
Reference to the worker object.
Starts/forks/adds another process of this job.
Removes/kills one process of this job.
See Gearman::Driver.
See Gearman::Driver.
| Gearman-Driver documentation | Contained in the Gearman-Driver distribution. |
package Gearman::Driver::Job; use Moose; use Gearman::Driver::Adaptor; use POE qw(Wheel::Run);
# Owning Gearman::Driver instance. Stored as a weak reference; log() is
# delegated to it.
has driver => (
    is       => 'rw',
    isa      => 'Gearman::Driver',
    required => 1,
    weak_ref => 1,
    handles  => { log => 'log' },
);

# The job's name.
has name => (
    is       => 'rw',
    isa      => 'Str',
    required => 1,
);

# Gearman::Driver::Job::Method objects belonging to this job.
has methods => (
    is       => 'rw',
    isa      => 'ArrayRef[Gearman::Driver::Job::Method]',
    required => 1,
);

# Maximum number of concurrent processes this job may have.
has max_processes => (
    is       => 'rw',
    isa      => 'Int',
    required => 1,
    default  => 1,
);

# Minimum number of concurrent processes this job may have.
has min_processes => (
    is       => 'rw',
    isa      => 'Int',
    required => 1,
    default  => 1,
);

# Bookkeeping hash keyed by PID; _add_process stores the POE::Wheel::Run
# object of each started process here. The native Hash trait supplies the
# accessor methods listed under 'handles'.
has processes => (
    is      => 'ro',
    isa     => 'HashRef',
    traits  => ['Hash'],
    default => sub { return {}; },
    handles => {
        set_process     => 'set',
        get_process     => 'get',
        delete_process  => 'delete',
        count_processes => 'count',
        get_pids        => 'keys',
        get_processes   => 'values',
    },
);

# Gearman::Driver::Adaptor instance; assigned in BUILD.
has gearman => (
    is  => 'ro',
    isa => 'Gearman::Driver::Adaptor',
);

# POE::Session instance; assigned in BUILD.
has session => (
    is  => 'ro',
    isa => 'POE::Session',
);

# time() of the most recent call of this job (0 until first set).
has lastrun => (
    is      => 'rw',
    isa     => 'Int',
    default => 0,
);

# time() of the most recent failure of this job (0 until first set).
has lasterror => (
    is      => 'rw',
    isa     => 'Int',
    default => 0,
);

# Error message of the most recent failure (empty until first set).
has lasterror_msg => (
    is      => 'rw',
    isa     => 'Str',
    default => '',
);

# Reference to the worker object.
has worker => (
    is       => 'rw',
    isa      => 'Any',
    required => 1,
);
# Starts/forks/adds another process of this job by posting the
# 'add_process' event to the job's POE session. Returns whatever
# POE::Kernel->post returns.
sub add_process {
    my $self    = shift;
    my $session = $self->session;
    return POE::Kernel->post( $session, 'add_process' );
}
# Removes/kills one process of this job by posting the 'remove_process'
# event to the job's POE session.
sub remove_process {
    my ($self) = @_;
    POE::Kernel->post( $self->session => 'remove_process' );
}

# Moose construction hook: creates the Gearman adaptor, registers every
# method of this job with it and creates the POE session whose events are
# handled by the private methods below.
sub BUILD {
    my ($self) = @_;

    # 'gearman' and 'session' are declared read-only, so they are filled in
    # by writing to the instance hash directly.
    $self->{gearman} = Gearman::Driver::Adaptor->new( server => $self->driver->server );

    foreach my $method ( @{ $self->methods } ) {
        $self->gearman->add_function( $method->name => $method->wrapper );
    }

    $self->{session} = POE::Session->create(
        object_states => [
            $self => {
                _start             => '_start',
                got_process_stdout => '_on_process_stdout',
                got_process_stderr => '_on_process_stderr',
                got_process_close  => '_on_process_close',
                got_process_signal => '_on_process_signal',
                add_process        => '_add_process',
                remove_process     => '_remove_process',
            }
        ]
    );
}

# POE _start handler: registers the job's name as an alias of the session.
sub _start {
    $_[KERNEL]->alias_set( $_[OBJECT]->name );
}

# Handler for 'add_process': forks a child via POE::Wheel::Run that runs the
# Gearman work loop, and records the wheel both in the session heap (by
# wheel ID) and in the 'processes' attribute (by PID).
sub _add_process {
    my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];

    my $process = POE::Wheel::Run->new(
        Program => sub {

            # This code runs in the forked child: stop the POE kernel
            # there before entering the Gearman work loop.
            POE::Kernel->stop();

            if ( my $process_name = $self->worker->process_name( $0, $self->name ) ) {
                $0 = $process_name;
            }

            $self->gearman->work;
        },
        StdoutEvent => "got_process_stdout",
        StderrEvent => "got_process_stderr",
        CloseEvent  => "got_process_close",
    );

    $kernel->sig_child( $process->PID, "got_process_signal" );

    # Wheel events include the wheel's ID.
    $heap->{wheels}{ $process->ID } = $process;

    $self->log->info( sprintf '(%d) [%s] Process started', $process->PID, $self->name );

    $self->set_process( $process->PID => $process );
}

# Handler for 'remove_process': takes one PID out of the 'processes'
# attribute and kills that process. Does nothing when no process is
# registered.
sub _remove_process {
    my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];

    my ($pid) = ( $self->get_pids )[0];
    return unless $pid;

    my $process = $self->delete_process($pid);
    $process->kill();

    $self->log->info( sprintf '(%d) [%s] Process killed', $process->PID, $self->name );
}

# Attributes a worker process may update by printing "<attribute> <value>"
# on its STDOUT. Anything else on STDOUT is ignored, so stray output such as
# "remove_process x" or "name foo" can never invoke an arbitrary method on
# the job object.
# NOTE(review): the list is taken from the attributes documented as being
# updated on every call/failure; confirm against the code that prints them.
my %REPORTABLE_ATTRIBUTE = map { $_ => 1 } qw(lastrun lasterror lasterror_msg);

# Handler for a line of STDOUT from a worker process. A line of the form
# "<attribute> <value>" updates the named status attribute of this job.
sub _on_process_stdout {
    my ( $self, $stdout ) = @_[ OBJECT, ARG0 ];

    my ( $attr, $value ) = $stdout =~ /^(\w+) (.*?)$/;
    return if !defined $attr || !defined $value;
    return unless $REPORTABLE_ATTRIBUTE{$attr};

    # The setter dies when the value fails the attribute's type constraint
    # (e.g. "lastrun abc"); catch that here so one malformed line cannot
    # take the event handler down with it.
    my $ok = eval { $self->$attr($value); 1 };
    unless ($ok) {
        my $error = $@;
        $self->log->info( sprintf '[%s] Ignoring invalid value for %s: %s', $self->name, $attr, $error );
    }
}

# Handler for a line of STDERR from a worker process: writes it to the log.
sub _on_process_stderr {
    my ( $self, $heap, $stderr, $wid ) = @_[ OBJECT, HEAP, ARG0, ARG1 ];

    my $process = $heap->{wheels}{$wid};

    # Same guard as in _on_process_close: the wheel may already be gone.
    # The message is still logged, with PID 0.
    my $pid = defined $process ? $process->PID : 0;

    $self->log->info( sprintf '(%d) [%s] STDERR: %s', $pid, $self->name, $stderr );
}

# Handler for the wheel's close event: forgets the wheel and its process.
sub _on_process_close {
    my ( $self, $heap, $wid ) = @_[ OBJECT, HEAP, ARG0 ];

    my $process = delete $heap->{wheels}{$wid};

    # May have been reaped by got_process_signal
    return unless defined $process;

    $self->delete_process( $process->PID );
}

# Handler for the child signal registered via sig_child: logs the exit
# status and forgets the process and its wheel.
sub _on_process_signal {
    my ( $self, $heap, $pid, $status ) = @_[ OBJECT, HEAP, ARG1 .. ARG2 ];

    my $process = $self->delete_process($pid);

    $self->log->info( sprintf '(%d) [%s] Exited with status %s', $pid, $self->name, $status );

    # May have been reaped by got_process_close
    return unless defined $process;

    delete $heap->{wheels}{ $process->ID };
}

no Moose;

__PACKAGE__->meta->make_immutable;
1;