Parallel::SubArray - Execute forked subref array and join return values, timeouts and error captures.


Parallel-SubArray documentation Contained in the Parallel-SubArray distribution.

Index


Code Index:

NAME

Top

Parallel::SubArray - Execute forked subref array and join return values, timeouts and error captures.

SYNOPSIS

Top

  use Parallel::SubArray 'par';

  my $sub_arrayref = [
    sub{ sleep(1); [1]  }, # simple structure
    sub{ bless {}, 'a'  }, # blessed structure
    sub{ while(1){$_++} }, # runaway routine
    sub{ die 'TEST'     }, # bad code
    sub{ par()->([         # nested parallelism
           sub{ sleep(1);  [1]  },
           sub{ sleep(2); {2,3} },
         ]),
       },
  ];

  my($result_arrayref, $error_arrayref) = par(3)->($sub_arrayref);
  ## or you can ignore errors
  # my $result_arrayref = par(3)->($sub_arrayref);

  $result_arrayref == [
    [ 1 ],
    {}, # blessed into 'a'
    undef,
    undef,
    [ [1], { 2 => 3 } ]
  ];

  $error_arrayref == [
    undef,
    undef,
    'TIMEOUT',
    # can capture normal blessed exceptions aswell
    'TEST at some file on some line',
    undef,
  ];

DESCRIPTION

Top

I want fast, safe, and simple parallelism. Current offerings did not satisfy me. Most are not enough while remaining are too complex or tedious. Palallel::SubArray scratches my itch: forking, joining, timeouts, return values, and error handling done simply.

EXPORTS

Top

Nothing automatically. I don't like magic.

par

Takes one argument that represents timeout in seconds and evaluates into a subref that will execute subarrayref in parallel returning resultarrayref in scalar context or resultarrayref and errorarrayref in list context.

Timeout can be undef or zero. In this case timeout is disabled and you might never join forks.

When nesting parallelism, keep in mind the outer timeout. Because if inner par is cut off by the timeout of the outer, you will not be able to recover return nor error values of finished inner processes as the whole inner par is considered to be a runaway process.

par can die if unable to fork.

SEE ALSO

Top

perlipc "fork" in perlfunc perlfork subs::parallel Parallel::Simple Parallel::Queue Parallel::Forker Parallel::Jobs Parallel::Workers Parallel::SubFork Parallel::Pvm Parallel::Performing Parallel::Fork::BossWorker Parallel::ForkControl Proc::Fork

BUGS

Top

Expect lots if Windows and/or OOP is used. Windows because of the lack of forking and OOP because it requires locking. This module is designed functionally and is relying on copy-on-write forking.

The joining mechanism of this module can be incompatible with other forking modules because it's waiting for child processes to finish. Nesting of par works as expected.

Subroutines passed to par that return anything other than references to simple Perl structures may behave unexpectedly. Joining relies on Storable.

AUTHOR

Top

Eugene Grigoriev,

  let a b c d e f g  = concat [e,f,d,g,c,f,b] in
      a "com" "gmail" "grigoriev" "eugene" "." "@"

LICENSE

Top

BSD


Parallel-SubArray documentation Contained in the Parallel-SubArray distribution.

package Parallel::SubArray;
require v5.8.6;
our $VERSION = 0.6;
use strict;
use Storable qw(store_fd fd_retrieve);
use Exporter 'import';
our @EXPORT_OK = qw(par);

sub par {
  my( $timeout ) = @_;
  sub {
    my @subs = @{ shift(@_) };
    my %rets;
    my $c = 0;
    for my $sub ( @subs ) {
      $c++;
      my( $parent_w, $child_r );
      pipe( $child_r, $parent_w );
      select((select($parent_w ), $| = 1)[0]);
      if( my $pid = fork ) {
	close $parent_w;
	$rets{ $pid } = { fd  => $child_r,
			  ord => $c
			};
      } else {
	die "Cannot fork: $!" unless defined $pid;
	close $child_r;
	my $exit = sub {
	  my( $save_val, $exit_val ) = @_;
	  store_fd $save_val, $parent_w;
	  close $parent_w;
	  exit $exit_val;
	};
	local $SIG{ALRM} = sub { $exit->( ['TIMEOUT'], 1 ) };
	alarm( $timeout || 0 );
	my $ret = eval{ $sub->() };
	my $err = $@;
	alarm( 0 );
	$exit->( [$err], 1 ) if $err;
	$exit->(  $ret , 0 );
      }
    }
    while(1) {
      my $pid = wait();
      my $err = $?;
      last if( $pid == -1 or not @subs );
      next if not exists $rets{ $pid };
      if( $err ) {
	$rets{ $pid }->{err} = fd_retrieve( $rets{ $pid }->{fd} )->[0];
      } else {
	$rets{ $pid }->{val} = fd_retrieve( $rets{ $pid }->{fd} );
      }
      close $rets{ $pid }->{fd};
      pop @subs;
    }
    my $r = sub {
      my( $key ) = @_;
      # can be optimized
      [ map  { $rets{$_}->{ $key } }
	sort { $rets{$a}->{ord} <=> $rets{$b}->{ord} }
	keys %rets
      ];
    };
    return wantarray ? ( $r->('val'), $r->('err') ) : $r->('val');
  }
}

1;
__END__