MooseX::Workers::Engine - Provide the workhorse to MooseX::Workers


MooseX-Workers documentation Contained in the MooseX-Workers distribution.

Index


Code Index:

NAME

Top

MooseX::Workers::Engine - Provide the workhorse to MooseX::Workers

SYNOPSIS

Top

    package MooseX::Workers;

    has Engine => (
        isa      => 'MooseX::Workers::Engine',
        is       => 'ro',
        lazy     => 1,
        required => 1,
        default  => sub { MooseX::Workers::Engine->new( visitor => $_[0] ) },
        handles  => [
            qw(
              max_workers
              has_workers
              num_workers
              put_worker
              kill_worker
              )
        ],
    );

DESCRIPTION

Top

MooseX::Workers::Engine provides the main functionality to MooseX::Workers. It wraps a POE::Session and as many POE::Wheel::Run objects as it needs.

ATTRIBUTES

Top

visitor

Hold a reference to our main object so we can use the callbacks on it.

max_workers

An Integer specifying the maximum number of workers we have.

workers

An ArrayRef of POE::Wheel::Run objects that are our workers.

session

Contains the POE::Session that controls the workers.

METHODS

Top

yield

Helper method to post events to our internal manager session.

call

Helper method to call events to our internal manager session. This is synchronous and will block incoming data from the children if it takes too long to return.

set_worker($key)

Set the worker at $key

get_worker($key)

Retrieve the worker at $key

delete_worker($key)

Remove the worker atx $key

has_workers

Check to see if we have *any* workers currently. This is delegated to the MooseX::Workers::Engine object.

num_workers

Return the current number of workers. This is delegated to the MooseX::Workers::Engine object.

has_manager

Check to see if we have a manager session.

remove_manager

Remove the manager session.

meta

The Metaclass for MooseX::Workers::Engine see Moose's documentation.

EVENTS

Top

add_worker ($command)

Create a POE::Wheel::Run object to handle $command. If $command holds a scalar, it will be executed as exec($scalar). Shell metacharacters will be expanded in this form. If $command holds an array reference, it will executed as exec(@$array). This form of exec() doesn't expand shell metacharacters. If $command holds a code reference, it will be called in the forked child process, and then the child will exit.

See POE::Wheel::Run for more details.

INTERFACE

Top

MooseX::Worker::Engine fires the following callbacks to its visitor object:

worker_manager_start

Called when the managing session is started.

worker_manager_stop

Called when the managing session stops.

max_workers_reached

Called when we reach the maximum number of workers.

worker_stdout

Called when a child prints to STDOUT.

worker_stderr

Called when a child prints to STDERR.

worker_error

Called when there is an error condition detected with the child.

worker_done

Called when a worker completes $command.

worker_started

Called when a worker starts $command.

sig_child($PID, $ret)

Called when the managing session receives a SIG CHLD event.

sig_*

Called when the underlying POE Kernel receives a signal; this is not limited to OS signals (ie. what you'd usually handle in Perl's %SIG) so will also accept arbitrary POE signals (sent via POE::Kernel->signal), but does exclude SIGCHLD/SIGCHILD, which is instead handled by sig_child above.

These interface methods are automatically inserted when MooseX::Worker::Engine detects that the visitor object contains any methods beginning with sig_. Signals are case-sensitive, so if you wish to handle a TERM signal, you must define a sig_TERM() method. Note also that this action is performed upon MooseX::Worker::Engine startup, so any run-time modification of the visitor object is not likely to be detected.


MooseX-Workers documentation Contained in the MooseX-Workers distribution.

package MooseX::Workers::Engine;
use Moose;
use POE qw(Wheel::Run);

has visitor => (
    is       => 'ro',
    does     => 'MooseX::Workers',
);

has max_workers => (
    isa     => 'Int',
    is      => 'rw',
    default => sub { 5 },
);

# Processes currently running
has process_list => (
    traits     => [ 'Hash' ],
    isa        => 'HashRef',
    default    => sub { {} },
    handles    => {
        set_process    => 'set',
        get_process    => 'get',
        remove_process => 'delete',
        process_list   => 'kv',
    }
);

# Processes waiting to run
has process_queue => (
    traits     => [ 'Array' ],
    isa        => 'ArrayRef',
    default    => sub { [] },
    handles    => {
        enqueue_process => 'push',
        dequeue_process => 'shift',
        process_queue   => 'elements',
    }
);

has workers => (
    traits    => [ 'Hash' ],
    isa       => 'HashRef',
    is        => 'rw',
    lazy      => 1,
    required  => 1,
    default   => sub { {} },
    handles   => {
        set_worker     => 'set',
        get_worker     => 'get',
        remove_worker  => 'delete',
        has_workers    => 'count',
        num_workers    => 'count',
        get_worker_ids => 'keys',
    },
);

has jobs => (
    traits    => [ 'Hash' ],
    isa       => 'HashRef',
    is        => 'rw',
    lazy      => 1,
    required  => 1,
    default   => sub { {} },
    handles   => {
        set_job    => 'set',
        get_job    => 'get',
        remove_job => 'delete',
        has_jobs   => 'count',
        num_jobs   => 'count',
    },
);

has session => (
    isa      => 'POE::Session',
    is       => 'ro',
    required => 1,
    lazy     => 1,
    default  => sub {
        POE::Session->create(
            object_states => [
                $_[0] => [
                    qw(
                      _start
                      _stop
                      _worker_stdout
                      _worker_stderr
                      _worker_error
                      _worker_done
                      _worker_started
                      _sig_child
                      add_worker
                      _kill_worker
                      )
                ],
            ],
        );
    },
    clearer   => 'remove_manager',
    predicate => 'has_manager',
);

sub yield {
    my $self = shift;
    $poe_kernel->post( $self->session => @_ );
}

sub call {
    my $self = shift;
    return $poe_kernel->call( $self->session => @_ );
}

sub put_worker {
    my ( $self, $wheel_id ) = splice @_, 0, 2;
    $self->get_worker($wheel_id)->put(@_);
}

sub kill_worker {
    my ( $self, $wheel_id ) = splice @_, 0, 2;
    $self->get_worker($wheel_id)->kill(@_);
    $self->remove_worker($wheel_id);
}

sub stdout_filter {
	my $self = $_[OBJECT];
	$self->visitor->stdout_filter;
}

sub stderr_filter {
	my $self = $_[OBJECT];
	$self->visitor->stderr_filter;
}

#
# EVENTS
#

sub add_worker {
    my ( $self, $job, $args, $kernel, $heap ) = @_[ OBJECT, ARG0, ARG1, KERNEL, HEAP ];

    # if we've reached the worker threashold, set off a warning
    if ( $self->num_workers >= $self->max_workers ) {
        if ( $args->{enqueue} ) {
            $self->enqueue_process([$job, $args]);
            return;
        } else {
            $self->visitor->max_workers_reached($job);
            return;
        }
    }

    my $command;
    if ( blessed($job) && $job->isa('MooseX::Workers::Job') ) {
        $command = $job->command;
        $args = $job->args;
    }
    else {
        $command = $job;
    }

	my @optional_io_filters;
	push @optional_io_filters, 'StdoutFilter', $self->stdout_filter   if $self->stdout_filter;
	push @optional_io_filters, 'StderrFilter', $self->stderr_filter   if $self->stderr_filter;
	
    $args = [$args] unless ref $args eq 'ARRAY';

    my $wheel = POE::Wheel::Run->new(
        Program     => $command,
        ProgramArgs => $args,
		@optional_io_filters,
        StdoutEvent => '_worker_stdout',
        StderrEvent => '_worker_stderr',
        ErrorEvent  => '_worker_error',
        CloseEvent  => '_worker_done',
    );
    $kernel->sig_child($wheel->PID, "_sig_child");

    $self->set_worker( $wheel->ID => $wheel );
    $self->set_process( $wheel->PID => $wheel->ID );
    if ( blessed($job) && $job->isa('MooseX::Workers::Job') ) {
       $job->ID($wheel->ID);
       $job->PID($wheel->PID);
       $self->set_job( $wheel->ID => $job );
       if ($job->timeout) {
          $heap->{wheel_to_timer}{$wheel->ID} =
             $kernel->delay_set('_kill_worker', $job->timeout, $wheel->ID);
       }
    } 
    $self->yield( '_worker_started' => $wheel->ID => $job );
    return ( $wheel->ID => $wheel->PID );
}

sub _kill_worker {
    my ( $self, $wheel_id ) = @_[ OBJECT, ARG0 ];
    my $job = $self->get_job($wheel_id);
    $self->visitor->worker_timeout( $job )
      if $self->visitor->can('worker_timeout');
    $self->get_worker($wheel_id)->kill;
}

sub _start {
    my ($self) = $_[OBJECT];
    $self->visitor->worker_manager_start()
      if $self->visitor->can('worker_manager_start');

    # Set an alias to ensure our manager session is not cleaned up.
    $_[KERNEL]->alias_set("manager");

    # Register the generic signal handler for any signals our visitor
    # class wishes to receive.
    my @visitor_methods = map { $_->name } $self->visitor->meta->get_all_methods;
    for my $sig_handler (grep { /^sig_/ } @visitor_methods){
        (my $sig) = ($sig_handler =~ /^sig_(.*)/);
        next if uc($sig) eq 'CHLD' or uc($sig) eq 'CHILD';

        $poe_kernel->state( $sig_handler, $self, '_sig_handler' );
        $poe_kernel->sig( $sig => $sig_handler );
    }
}

sub _stop {
    my ($self) = $_[OBJECT];
    $self->visitor->worker_manager_stop()
      if $self->visitor->can('worker_manager_stop');
    $self->remove_manager;
}

sub _sig_child {
    my ($self) = $_[OBJECT];
    $self->visitor->sig_child( $self->get_process($_[ARG1]), $_[ARG2] )
      if $self->visitor->can('sig_child');
    $self->remove_process( $_[ARG1] );
    $_[KERNEL]->sig_handled();
}

# A generic sig handler (for everything except SIGCHLD)
sub _sig_handler {
    my ($self, $state) = @_[OBJECT,STATE];
    $self->visitor->$state( @_[ARG0..ARG9] );
    $_[KERNEL]->sig_handled();
}

sub _worker_stdout {
    my ($self, $input, $wheel_id) = @_[ OBJECT, ARG0, ARG1 ];
    my $job = $self->get_job($wheel_id);
    $self->visitor->worker_stdout( $input, $job || $wheel_id )
      if $self->visitor->can('worker_stdout');
}

sub _worker_stderr {
    my ($self, $input, $wheel_id) = @_[ OBJECT, ARG0, ARG1 ];
    $wheel_id =~ tr[ -~][]cd;
    my $job = $self->get_job($wheel_id);
    $self->visitor->worker_stderr( $input, $job || $wheel_id )
      if $self->visitor->can('worker_stderr');
}

sub _worker_error {
    my ($self) = $_[OBJECT];
    return if $_[ARG0] eq "read" && $_[ARG1] == 0;

    # $operation, $errnum, $errstr, $wheel_id
    $self->visitor->worker_error( @_[ ARG0 .. ARG3 ] )
      if $self->visitor->can('worker_error');
}

sub _worker_done {
    my ($self, $wheel_id, $kernel, $heap) = @_[ OBJECT, ARG0, KERNEL, HEAP ];
    my $job = $self->get_job($wheel_id);
    $kernel->alarm_remove(delete $heap->{wheel_to_timer}{$wheel_id}) if $heap->{wheel_to_timer}{$wheel_id};
    if ($self->visitor->can('worker_done')) {
        if ($job) {
            $self->visitor->worker_done( $job );
        } else {
            $self->visitor->worker_done( $wheel_id );
        }
    }
    $self->delete_worker( $wheel_id );

    # If we have free workers and processes in queue, then dequeue one of them.
    while ( $self->num_workers < $self->max_workers && 
            (my $jobref = $self->dequeue_process)
    ) {
        my ($cmd, $args) = @$jobref;
        # This has to be call(), not yield() so num_workers increments before
        # next loop above.
        $self->call(add_worker => $cmd, $args);
    }
}

sub delete_worker {
    my ( $self, $wheelID ) = @_;
    my $wheel = $self->get_worker($wheelID);
    $self->remove_worker( $wheel->ID );
}

sub _worker_started {
    my ( $self, $wheel_id, $command ) = @_[ OBJECT, ARG0, ARG1 ];
    my $job = $self->get_job($wheel_id);
    if ($self->visitor->can('worker_started')) {
        if ($job) {
            $self->visitor->worker_started( $job )
        } else {
            $self->visitor->worker_started( $wheel_id, $command )
        }
    }
}


no Moose;
1;
__END__

1;