| MooseX-Workers documentation | Contained in the MooseX-Workers distribution. |
MooseX::Workers::Engine - Provide the workhorse to MooseX::Workers
package MooseX::Workers;
has Engine => (
isa => 'MooseX::Workers::Engine',
is => 'ro',
lazy => 1,
required => 1,
default => sub { MooseX::Workers::Engine->new( visitor => $_[0] ) },
handles => [
qw(
max_workers
has_workers
num_workers
put_worker
kill_worker
)
],
);
MooseX::Workers::Engine provides the main functionality to MooseX::Workers. It wraps a POE::Session and as many POE::Wheel::Run objects as it needs.
Hold a reference to our main object so we can use the callbacks on it.
An Integer specifying the maximum number of workers we have.
An ArrayRef of POE::Wheel::Run objects that are our workers.
Contains the POE::Session that controls the workers.
Helper method to post events to our internal manager session.
Helper method to call events to our internal manager session. This is synchronous and will block incoming data from the children if it takes too long to return.
Set the worker at $key
Retrieve the worker at $key
Remove the worker atx $key
Check to see if we have *any* workers currently. This is delegated to the MooseX::Workers::Engine object.
Return the current number of workers. This is delegated to the MooseX::Workers::Engine object.
Check to see if we have a manager session.
Remove the manager session.
The Metaclass for MooseX::Workers::Engine see Moose's documentation.
Create a POE::Wheel::Run object to handle $command. If $command holds a scalar, it will be executed as exec($scalar). Shell metacharacters will be expanded in this form. If $command holds an array reference, it will executed as exec(@$array). This form of exec() doesn't expand shell metacharacters. If $command holds a code reference, it will be called in the forked child process, and then the child will exit.
See POE::Wheel::Run for more details.
MooseX::Worker::Engine fires the following callbacks to its visitor object:
Called when the managing session is started.
Called when the managing session stops.
Called when we reach the maximum number of workers.
Called when a child prints to STDOUT.
Called when a child prints to STDERR.
Called when there is an error condition detected with the child.
Called when a worker completes $command.
Called when a worker starts $command.
Called when the managing session receives a SIG CHLD event.
Called when the underlying POE Kernel receives a signal; this is not limited to OS signals (ie. what you'd usually handle in Perl's %SIG) so will also accept arbitrary POE signals (sent via POE::Kernel->signal), but does exclude SIGCHLD/SIGCHILD, which is instead handled by sig_child above.
These interface methods are automatically inserted when MooseX::Worker::Engine detects that the visitor object contains any methods beginning with sig_. Signals are case-sensitive, so if you wish to handle a TERM signal, you must define a sig_TERM() method. Note also that this action is performed upon MooseX::Worker::Engine startup, so any run-time modification of the visitor object is not likely to be detected.
| MooseX-Workers documentation | Contained in the MooseX-Workers distribution. |
package MooseX::Workers::Engine; use Moose; use POE qw(Wheel::Run); has visitor => ( is => 'ro', does => 'MooseX::Workers', ); has max_workers => ( isa => 'Int', is => 'rw', default => sub { 5 }, ); # Processes currently running has process_list => ( traits => [ 'Hash' ], isa => 'HashRef', default => sub { {} }, handles => { set_process => 'set', get_process => 'get', remove_process => 'delete', process_list => 'kv', } ); # Processes waiting to run has process_queue => ( traits => [ 'Array' ], isa => 'ArrayRef', default => sub { [] }, handles => { enqueue_process => 'push', dequeue_process => 'shift', process_queue => 'elements', } ); has workers => ( traits => [ 'Hash' ], isa => 'HashRef', is => 'rw', lazy => 1, required => 1, default => sub { {} }, handles => { set_worker => 'set', get_worker => 'get', remove_worker => 'delete', has_workers => 'count', num_workers => 'count', get_worker_ids => 'keys', }, ); has jobs => ( traits => [ 'Hash' ], isa => 'HashRef', is => 'rw', lazy => 1, required => 1, default => sub { {} }, handles => { set_job => 'set', get_job => 'get', remove_job => 'delete', has_jobs => 'count', num_jobs => 'count', }, ); has session => ( isa => 'POE::Session', is => 'ro', required => 1, lazy => 1, default => sub { POE::Session->create( object_states => [ $_[0] => [ qw( _start _stop _worker_stdout _worker_stderr _worker_error _worker_done _worker_started _sig_child add_worker _kill_worker ) ], ], ); }, clearer => 'remove_manager', predicate => 'has_manager', ); sub yield { my $self = shift; $poe_kernel->post( $self->session => @_ ); } sub call { my $self = shift; return $poe_kernel->call( $self->session => @_ ); } sub put_worker { my ( $self, $wheel_id ) = splice @_, 0, 2; $self->get_worker($wheel_id)->put(@_); } sub kill_worker { my ( $self, $wheel_id ) = splice @_, 0, 2; $self->get_worker($wheel_id)->kill(@_); $self->remove_worker($wheel_id); } sub stdout_filter { my $self = $_[OBJECT]; $self->visitor->stdout_filter; } sub stderr_filter { my $self = $_[OBJECT]; $self->visitor->stderr_filter; } # # EVENTS # sub add_worker { my ( $self, $job, $args, $kernel, $heap ) = @_[ OBJECT, ARG0, ARG1, KERNEL, HEAP ]; # if we've reached the worker threashold, set off a warning if ( $self->num_workers >= $self->max_workers ) { if ( $args->{enqueue} ) { $self->enqueue_process([$job, $args]); return; } else { $self->visitor->max_workers_reached($job); return; } } my $command; if ( blessed($job) && $job->isa('MooseX::Workers::Job') ) { $command = $job->command; $args = $job->args; } else { $command = $job; } my @optional_io_filters; push @optional_io_filters, 'StdoutFilter', $self->stdout_filter if $self->stdout_filter; push @optional_io_filters, 'StderrFilter', $self->stderr_filter if $self->stderr_filter; $args = [$args] unless ref $args eq 'ARRAY'; my $wheel = POE::Wheel::Run->new( Program => $command, ProgramArgs => $args, @optional_io_filters, StdoutEvent => '_worker_stdout', StderrEvent => '_worker_stderr', ErrorEvent => '_worker_error', CloseEvent => '_worker_done', ); $kernel->sig_child($wheel->PID, "_sig_child"); $self->set_worker( $wheel->ID => $wheel ); $self->set_process( $wheel->PID => $wheel->ID ); if ( blessed($job) && $job->isa('MooseX::Workers::Job') ) { $job->ID($wheel->ID); $job->PID($wheel->PID); $self->set_job( $wheel->ID => $job ); if ($job->timeout) { $heap->{wheel_to_timer}{$wheel->ID} = $kernel->delay_set('_kill_worker', $job->timeout, $wheel->ID); } } $self->yield( '_worker_started' => $wheel->ID => $job ); return ( $wheel->ID => $wheel->PID ); } sub _kill_worker { my ( $self, $wheel_id ) = @_[ OBJECT, ARG0 ]; my $job = $self->get_job($wheel_id); $self->visitor->worker_timeout( $job ) if $self->visitor->can('worker_timeout'); $self->get_worker($wheel_id)->kill; } sub _start { my ($self) = $_[OBJECT]; $self->visitor->worker_manager_start() if $self->visitor->can('worker_manager_start'); # Set an alias to ensure our manager session is not cleaned up. $_[KERNEL]->alias_set("manager"); # Register the generic signal handler for any signals our visitor # class wishes to receive. my @visitor_methods = map { $_->name } $self->visitor->meta->get_all_methods; for my $sig_handler (grep { /^sig_/ } @visitor_methods){ (my $sig) = ($sig_handler =~ /^sig_(.*)/); next if uc($sig) eq 'CHLD' or uc($sig) eq 'CHILD'; $poe_kernel->state( $sig_handler, $self, '_sig_handler' ); $poe_kernel->sig( $sig => $sig_handler ); } } sub _stop { my ($self) = $_[OBJECT]; $self->visitor->worker_manager_stop() if $self->visitor->can('worker_manager_stop'); $self->remove_manager; } sub _sig_child { my ($self) = $_[OBJECT]; $self->visitor->sig_child( $self->get_process($_[ARG1]), $_[ARG2] ) if $self->visitor->can('sig_child'); $self->remove_process( $_[ARG1] ); $_[KERNEL]->sig_handled(); } # A generic sig handler (for everything except SIGCHLD) sub _sig_handler { my ($self, $state) = @_[OBJECT,STATE]; $self->visitor->$state( @_[ARG0..ARG9] ); $_[KERNEL]->sig_handled(); } sub _worker_stdout { my ($self, $input, $wheel_id) = @_[ OBJECT, ARG0, ARG1 ]; my $job = $self->get_job($wheel_id); $self->visitor->worker_stdout( $input, $job || $wheel_id ) if $self->visitor->can('worker_stdout'); } sub _worker_stderr { my ($self, $input, $wheel_id) = @_[ OBJECT, ARG0, ARG1 ]; $wheel_id =~ tr[ -~][]cd; my $job = $self->get_job($wheel_id); $self->visitor->worker_stderr( $input, $job || $wheel_id ) if $self->visitor->can('worker_stderr'); } sub _worker_error { my ($self) = $_[OBJECT]; return if $_[ARG0] eq "read" && $_[ARG1] == 0; # $operation, $errnum, $errstr, $wheel_id $self->visitor->worker_error( @_[ ARG0 .. ARG3 ] ) if $self->visitor->can('worker_error'); } sub _worker_done { my ($self, $wheel_id, $kernel, $heap) = @_[ OBJECT, ARG0, KERNEL, HEAP ]; my $job = $self->get_job($wheel_id); $kernel->alarm_remove(delete $heap->{wheel_to_timer}{$wheel_id}) if $heap->{wheel_to_timer}{$wheel_id}; if ($self->visitor->can('worker_done')) { if ($job) { $self->visitor->worker_done( $job ); } else { $self->visitor->worker_done( $wheel_id ); } } $self->delete_worker( $wheel_id ); # If we have free workers and processes in queue, then dequeue one of them. while ( $self->num_workers < $self->max_workers && (my $jobref = $self->dequeue_process) ) { my ($cmd, $args) = @$jobref; # This has to be call(), not yield() so num_workers increments before # next loop above. $self->call(add_worker => $cmd, $args); } } sub delete_worker { my ( $self, $wheelID ) = @_; my $wheel = $self->get_worker($wheelID); $self->remove_worker( $wheel->ID ); } sub _worker_started { my ( $self, $wheel_id, $command ) = @_[ OBJECT, ARG0, ARG1 ]; my $job = $self->get_job($wheel_id); if ($self->visitor->can('worker_started')) { if ($job) { $self->visitor->worker_started( $job ) } else { $self->visitor->worker_started( $wheel_id, $command ) } } } no Moose; 1; __END__
1;