POEx::WorkerPool::Role::WorkerPool::Worker::Guts - A role that provides common semantics for Worker guts


POEx-WorkerPool documentation Contained in the POEx-WorkerPool distribution.

Index


Code Index:

NAME

Top

POEx::WorkerPool::Role::WorkerPool::Worker::Guts - A role that provides common semantics for Worker guts

VERSION

Top

version 1.102740

PUBLIC_METHODS

Top

process_job

 (DoesJob $job) is Event

process_job takes the initialized job and calls ->execute_step on the job. If there is more than one step, another process_job will be queued up via POE with the same job as the argument. Each step along the way returns a JobStatus which is then sent on to send_message which communicates with the parent process

send_message

 (JobStatus $status) is Event

send_messge communicates with the parent process each JobStatus it receives.

PROTECTED_METHODS

Top

after _start

 is Event

_start is advised to buid the communication wheel back to the parent process and also register a signal handler for DIE so we can communicate exceptions back to the parent

init_job

 (DoesJob $job, WheelID $wheel) is Event

init_job is the InputEvent on the ReadWrite wheel that accepts input from the parent process. It attempts to call ->init_job on the job it receives. If that is successful it will then proceed on to send a return message of PXWP_JOB_START and yield to process_job()

die_signal

 (Str $signal, HashRef $stuff) is Event

die_signal is our signal handler if something unexpected happens.

AUTHOR

Top

Nicholas R. Perez <nperez@cpan.org>

COPYRIGHT AND LICENSE

Top


POEx-WorkerPool documentation Contained in the POEx-WorkerPool distribution.

package POEx::WorkerPool::Role::WorkerPool::Worker::Guts;
BEGIN {
  $POEx::WorkerPool::Role::WorkerPool::Worker::Guts::VERSION = '1.102740';
}

#ABSTRACT: A role that provides common semantics for Worker guts

use MooseX::Declare;

role POEx::WorkerPool::Role::WorkerPool::Worker::Guts {
    with 'POEx::Role::SessionInstantiation';
    
    use Try::Tiny;
    
    use POEx::Types(':all');
    use POEx::WorkerPool::Types(':all');
    use MooseX::Types::Moose(':all');
    
    use Data::UUID;
    use POE::Filter::Reference;
    use POE::Wheel::ReadWrite;
    
    use POEx::WorkerPool::Error::JobError;

    use POEx::WorkerPool::WorkerEvents(':all');

    use aliased 'POEx::Role::Event';
    use aliased 'POEx::WorkerPool::Error::JobError';
    
    # As of POE v1.266 Wheel::ReadWrite does not subclass from POE::Wheel. bleh
    has host => ( is => 'rw', isa => Object );


    after _start is Event {
        my $wheel = POE::Wheel::ReadWrite->new
        (   
            'InputHandle'   => \*STDIN,
            'OutputHandle'  => \*STDOUT,
            'Filter'        => POE::Filter::Reference->new(),
            'InputEvent'    => 'init_job',
        );

        $self->host($wheel);
        $self->poe->kernel->sig( 'DIE' => 'die_signal');
    }


    method init_job(DoesJob $job, WheelID $wheel) is Event {
        try {
            $job->init_job();
            $self->yield('send_message', { ID => $job->ID, type => +PXWP_JOB_START, msg => \time() });
            $self->yield('process_job', $job);
            return $job;
        }
        catch {
            my $err = $_;
            $self->call($self, 'send_message', { ID => $job->ID, type => +PXWP_JOB_FAILED, msg => \$err, } );
        }
    }


    method process_job(DoesJob $job) is Event {
        try {
            my $status = $job->execute_step();
            die "No Status" if not is_JobStatus($status);
            $self->yield('send_message', $status);
            
            if($job->count_steps > 0) {
                $self->yield('process_job', $job);
            }

            return $status;
        }
        catch {
            my $err = $_;

            if(is_IsaError($err))
            {
                $self->call($self, 'send_message', $err->job_status);
            }
            else
            {
                $self->call($self, 'send_message', { ID => $job->ID, type => +PXWP_JOB_FAILED, msg => \$err } );
            }
        }
    }


    method send_message(JobStatus $status) is Event {
        if(!defined($self->host)) {
            die "Unable to communicate with the host";
        }

        $self->host()->put($status);
    }


    method die_signal(Str $signal, HashRef $stuff) is Event {
        $self->call($self, 'send_message', { ID => 0, type => +PXWP_WORKER_INTERNAL_ERROR, msg => $stuff });
    }
}

1;



__END__