/usr/local/CPAN/Forks-Super/Forks/Super/Wait.pm
#
# Forks::Super::Wait - implementation of Forks::Super:: wait, waitpid,
# and waitall methods
#
package Forks::Super::Wait;
use Forks::Super::Job;
use Forks::Super::Util qw(is_number isValidPid pause);
use Forks::Super::Debug qw(:all);
use Forks::Super::Config;
use Forks::Super::Queue;
use Forks::Super::Tie::Enum;
use POSIX ':sys_wait_h';
use Exporter;
use Carp;
use strict;
use warnings;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(wait waitpid waitall TIMEOUT WREAP_BG_OK);
our %EXPORT_TAGS = (all => \@EXPORT_OK);
our $VERSION = '0.52';
our ($productive_pause_code, $productive_waitpid_code);
tie our $WAIT_ACTION_ON_SUSPENDED_JOBS,
'Forks::Super::Tie::Enum', qw(wait fail resume);
sub set_productive_pause_code (&) {
$productive_pause_code = shift;
return;
}
sub set_productive_waitpid_code (&) {
$productive_waitpid_code = shift;
return;
}
use constant TIMEOUT => -1.5;
use constant ONLY_SUSPENDED_JOBS_LEFT => -1.75;
use constant WREAP_BG_OK => WNOHANG() << 1;
sub wait {
my $timeout = shift || 0;
$timeout = 1E-6 if $timeout < 0;
debug("invoked Forks::Super::wait") if $DEBUG;
return Forks::Super::Wait::waitpid(-1, 0, $timeout);
}
sub waitpid {
my ($target,$flags,$timeout,@dummy) = @_;
$productive_waitpid_code->() if $productive_waitpid_code;
$timeout = 0 if !defined $timeout;
$timeout = 1E-6 if $timeout < 0;
if (@dummy > 0) {
carp "Forks::Super::waitpid: Too many arguments\n";
}
if (not defined $flags) {
carp "Forks::Super::waitpid: Not enough arguments\n";
$flags = 0;
}
# waitpid:
# -1: wait on any process
# t>0: wait on process #t
# 0: wait on any process in current process group
# -t: wait on any process in process group #t
# return -1 if there are no eligible procs to wait for
my $no_hang = ($flags & WNOHANG) != 0;
my $reap_bg_ok = $flags == WREAP_BG_OK;
if (is_number($target) && $target == -1) {
return _waitpid_any($no_hang, $reap_bg_ok, $timeout);
}
if (defined $ALL_JOBS{$target}) {
return _waitpid_target($no_hang, $reap_bg_ok, $target, $timeout);
}
if (0 < (my @wantarray = Forks::Super::Job::getByName($target))) {
return _waitpid_name($no_hang, $reap_bg_ok, $target, $timeout);
}
if (!is_number($target)) {
return -1;
}
if ($target > 0) {
# invalid pid
return -1;
}
if ($Forks::Super::SysInfo::CONFIG{'getpgrp'}) {
if ($target == 0) {
unless (eval { $target = getpgrp(0) } ) {
$target = -$$;
}
} else {
$target = -$target;
}
return _waitpid_pgrp($no_hang, $reap_bg_ok, $target, $timeout);
} else {
return -1;
}
}
sub waitall {
my $timeout = shift || 9E9; # 285 years should be long enough to wait
$timeout = 1E-6 if $timeout < 0;
my $waited_for = 0;
my $expire = Time::HiRes::time() + $timeout ;
debug("Forks::Super::waitall(): waiting on all procs") if $DEBUG;
my $pid;
do {
# $productive_waitpid_code->() if $productive_waitpid_code;
$pid = Forks::Super::Wait::wait($expire - Time::HiRes::time());
if ($DEBUG) {
debug("Forks::Super::waitall: caught pid $pid");
}
} while isValidPid($pid,1)
&& ++$waited_for
&& Time::HiRes::time() < $expire;
return $waited_for;
}
# is return value from _reap/waitpid/wait a simple scalar or an
# overloaded Forks::Super::Job object?
our $OVERLOAD_RETURN;
sub _reap_return {
my ($job) = @_;
if (!defined $OVERLOAD_RETURN) {
$OVERLOAD_RETURN = $Forks::Super::Job::OVERLOAD_ENABLED;
}
# return $OVERLOAD_RETURN ? $job : $job->{real_pid};
my $pid = $job->{real_pid};
return $OVERLOAD_RETURN ? Forks::Super::Job::get($pid) : $pid;
}
#
# The handle_CHLD() subroutine takes care of reaping
# processes from the operating system. This method's
# part of the relay is taking the reaped process
# and updating the job's state.
#
# Optionally takes a process group ID to reap processes
# from that specific group.
#
# return the process id of the job that was reaped, or
# -1 if no eligible jobs were reaped. In wantarray mode,
# return the number of eligible processes (state == ACTIVE
# or state == COMPLETE or STATE == SUSPENDED) that were
# not reaped.
#
sub _reap {
my ($reap_bg_ok, $optional_pgid) = @_; # to reap procs from specific group
$productive_waitpid_code->() if $productive_waitpid_code;
Forks::Super::Sigchld::handle_bastards();
my @j = @ALL_JOBS;
if (defined $optional_pgid) {
@j = grep { $_->{pgid} == $optional_pgid } @ALL_JOBS;
}
# see if any jobs are complete (signaled the SIGCHLD handler)
# but have not been reaped.
my @waiting = grep { $_->{state} eq 'COMPLETE' } @j;
if (!$reap_bg_ok) {
@waiting = grep { $_->{_is_bg} == 0 } @waiting;
}
debug('Forks::Super::_reap(): found ', scalar @waiting,
' complete & unreaped processes') if $DEBUG;
if (@waiting > 0) {
@waiting = sort { $a->{end} <=> $b->{end} } @waiting;
my $job = shift @waiting;
my $real_pid = $job->{real_pid};
my $pid = $job->{pid};
if ($job->{debug}) {
debug("Forks::Super::_reap(): reaping $pid/$real_pid.");
}
if (not wantarray) {
return _reap_return($job);
}
# return $real_pid if not wantarray;
my ($nactive1, $nalive, $nactive2)
= Forks::Super::Job::count_processes($reap_bg_ok, $optional_pgid);
debug("Forks::Super::_reap(): $nalive remain.") if $DEBUG;
$job->_mark_reaped;
return (_reap_return($job), $nactive1, $nalive, $nactive2);
}
# the failure to reap active jobs may occur because the jobs are still
# running, or it may occur because the relevant signals arrived at a
# time when the signal handler was overwhelmed
my ($nactive1, $nalive, $nactive2)
= Forks::Super::Job::count_processes($reap_bg_ok, $optional_pgid);
return -1 if not wantarray;
if ($DEBUG) {
debug('Forks::Super::_reap(): nothing to reap now. ',
"$nactive1 remain.");
}
return (-1, $nactive1, $nalive, $nactive2);
}
# wait on any process
sub _waitpid_any {
my ($no_hang,$reap_bg_ok,$timeout) = @_;
my $expire = Time::HiRes::time() + ($timeout || 9E9);
my ($pid, $nactive2, $nalive, $nactive) = _reap($reap_bg_ok);
unless ($no_hang) {
while (!isValidPid($pid,1) && $nalive > 0) {
if (Time::HiRes::time() >= $expire) {
return TIMEOUT;
}
if ($nactive == 0) {
if ($WAIT_ACTION_ON_SUSPENDED_JOBS eq 'fail') {
return ONLY_SUSPENDED_JOBS_LEFT;
} elsif ($WAIT_ACTION_ON_SUSPENDED_JOBS eq 'resume') {
_active_one_suspended_job($reap_bg_ok);
}
}
pause();
($pid, $nactive2, $nalive, $nactive) = _reap($reap_bg_ok);
}
}
if (defined $ALL_JOBS{$pid}) {
my $job = Forks::Super::Job::get($ALL_JOBS{$pid});
pause() while not defined $job->{status};
$? = $job->{status};
}
return $pid;
}
sub _active_one_suspended_job {
my @suspended = grep { $_->{state} eq 'SUSPENDED' } @Forks::Super::ALL_JOBS;
if (@suspended == 0) {
@suspended = grep { $_->{state} =~ /SUSPENDED/ } @Forks::Super::ALL_JOBS;
}
@suspended = sort {
$b->{queue_priority} <=> $a->{queue_priority} } @suspended;
if (@suspended == 0) {
warn "Forks::Super::_activate_one_suspended_job(): ",
" can't find an appropriate suspended job to resume\n";
return;
}
my $j1 = $suspended[0];
$j1->{queue_priority} -= 1E-4;
$j1->resume;
return;
}
# wait on a specific process
sub _waitpid_target {
my ($no_hang, $reap_bg_ok, $target, $timeout) = @_;
my $expire = Time::HiRes::time() + ($timeout || 9E9);
my $job = $ALL_JOBS{$target};
if (not defined $job) {
return -1;
}
if ($job->{state} eq 'COMPLETE') {
$job->_mark_reaped;
return _reap_return($job);
} elsif ($no_hang or
$job->{state} eq 'REAPED') {
return -1;
} else {
# block until job is complete.
while ($job->{state} ne 'COMPLETE' and $job->{state} ne 'REAPED') {
if (Time::HiRes::time() >= $expire) {
return TIMEOUT;
}
pause();
Forks::Super::Queue::check_queue() if $job->{state} =~ /DEFER|SUSPEND/;
}
$job->_mark_reaped;
return _reap_return($job);
}
}
sub _waitpid_name {
my ($no_hang, $reap_bg_ok, $target, $timeout) = @_;
my $expire = Time::HiRes::time() + ($timeout || 9E9);
my @jobs = Forks::Super::Job::getByName($target);
if (@jobs == 0) {
return -1;
}
my @jobs_to_wait_for = ();
foreach my $job (@jobs) {
if ($job->{state} eq 'COMPLETE') {
$job->_mark_reaped;
return _reap_return($job);
} elsif ($job->{state} ne 'REAPED' && $job->{state} ne 'DEFERRED') {
push @jobs_to_wait_for, $job;
}
}
if ($no_hang || @jobs_to_wait_for == 0) {
return -1;
}
# otherwise block until a job is complete
@jobs = grep {
$_->{state} eq 'COMPLETE' || $_->{state} eq 'REAPED'
} @jobs_to_wait_for;
while (@jobs == 0) {
if (Time::HiRes::time() >= $expire) {
return TIMEOUT;
}
pause();
Forks::Super::Queue::run_queue()
if grep {$_->{state} eq 'DEFERRED'} @jobs_to_wait_for;
@jobs = grep { $_->{state} eq 'COMPLETE' || $_->{state} eq 'REAPED'} @jobs_to_wait_for;
}
$jobs[0]->_mark_reaped;
return _reap_return($jobs[0]);
}
# wait on any process from a specific process group
sub _waitpid_pgrp {
my ($no_hang, $reap_bg_ok, $target, $timeout) = @_;
my $expire = Time::HiRes::time() + ($timeout || 9E9);
my ($pid, $nactive) = _reap($reap_bg_ok,$target);
unless ($no_hang) {
while (!isValidPid($pid,1) && $nactive > 0) {
if (Time::HiRes::time() >= $expire) {
return TIMEOUT;
}
pause();
($pid, $nactive) = _reap($reap_bg_ok,$target);
}
}
$? = $ALL_JOBS{$pid}->{status}
if defined $ALL_JOBS{$pid};
return $pid;
}
1;