| Distributed-Process documentation | Contained in the Distributed-Process distribution. |
Distributed::Process::RemoteWorker - a class to control from the server side a worker object running on the client side.
The following list describes the attributes of this class. They must only be accessed through their accessors. When called with an argument, the accessor methods set their attribute's value to that argument and return its former value. When called without arguments, they return the current value.
Cédric Bouvier, <cbouvi@cpan.org>
Please report any bugs or feature requests to
bug-distributed-process@rt.cpan.org, or through the web interface at
http://rt.cpan.org. I will be notified, and then you'll automatically
be notified of progress on your bug as I make changes.
Copyright 2005 Cédric Bouvier, All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
| Distributed-Process documentation | Contained in the Distributed-Process distribution. |
package Distributed::Process::RemoteWorker; use strict; use warnings;
use Distributed::Process; import Distributed::Process; use threads; use Thread::Queue; use IO::Select; use Distributed::Process::Interface; use Distributed::Process::BaseWorker; our @ISA = qw/ Distributed::Process::BaseWorker Distributed::Process::Interface /; sub new { my $class = shift; my $self = $class->SUPER::new(@_); $self->ignore_queue(); $self; } sub out_handle { my $self = shift; $self->in_handle(@_); }
sub go_remote { my $self = shift; no strict 'refs'; no warnings 'redefine'; my $package = ref($self) || $self; *{$package . '::run'} = *run; } sub is_ready { my $self = shift; return defined($self->id()); } sub get_id { my $self = shift; $self->id((split /\s+/, ($self->wait_for_pattern(qr|^/worker|))[-1])[-1]); } sub available_for_reading { my $self = shift; return 1 if $self->is_ignoring_queue(); my $s = new IO::Select $self->in_handle(); while ( 1 ) { return 1 if $s->can_read($self->timeout() || .1); return 0 if $self->in_queue()->pending(); } } sub ignore_queue { shift->{_ignore_queue} = 1 } sub heed_queue { shift->{_ignore_queue} = 0 } sub is_ignoring_queue { shift->{_ignore_queue} } sub is_heeding_queue { !(shift->{_ignore_queue}) } sub run { my $self = shift; async { while ( 1 ) { # my $msg = $self->in_queue()->dequeue(); # die "Unexpected order from master" unless $msg eq '/run'; # $self->send('/run'); while ( 1 ) { $self->heed_queue(); my @res = $self->wait_for_pattern(qr{^/(?:run_method|synchro|run_done|delay)}); if ( @res ) { my ($command, @arg) = split /\s+/, $res[0]; for ( $command ) { $_ eq '/run_method' and do { my $method = shift @arg; my @r = $self->$method(@arg); $self->send('/begin_method_result', @r, 'ok'); last; }; $_ eq '/synchro' || $_ eq '/delay' and do { $self->out_queue()->enqueue($res[0]); $self->in_queue()->dequeue(); $self->send($res[0]); last; }; $_ eq '/run_done' and do { $self->out_queue()->enqueue('/run_done'); 1 until $self->in_queue()->pending(); last; }; } } else { my $cmd = $self->in_queue()->dequeue(); $cmd eq '/run' and $self->send('/run'); $cmd eq '/reset' and $self->send('/reset'); $cmd eq '/quit' and $self->send('/quit'), return; } } } }->detach(); } sub result { my $self = shift; $self->send('/get_results'); $self->ignore_queue(); $self->wait_for_pattern(qr{^/begin_result}); my @result = $self->wait_for_pattern(qr/^ok$/); pop @result; return @result; }
foreach my $method ( qw/ master in_queue out_queue timeout / ) { no strict 'refs'; *$method = sub { my $self = shift; my $old = $self->{"_$method"}; $self->{"_$method"} = $_[0] if @_; return $old; }; }
1; # End of Distributed::Process::RemoteWorker