Forks::Super::Job - object representing a background task


Forks-Super documentation Contained in the Forks-Super distribution.

Index


Code Index:

NAME

Top

Forks::Super::Job - object representing a background task

VERSION

Top

0.52

SYNOPSIS

Top

    use Forks::Super;

    $pid = Forks::Super::fork( \%options );  # see Forks::Super
    $job = Forks::Super::Job::get($pid);
    $job = Forks::Super::Job::getByName($name);

    print "Current job state is $job->{state}\n";
    print "Job was created at ", scalar localtime($job->{created}), "\n";

with overloading

See "OVERLOADING".

    use Forks::Super 'overload';
    $job = Forks::Super::fork( \%options );
    print "Process id of new job is $job\n";
    print "Current state is ", $job->state, "\n";
    waitpid $job, 0;
    print "Exit status was ", $job->status, "\n";

DESCRIPTION

Top

Calls to Forks::Super::fork() that successfully spawn a child process or create a deferred job (see "Deferred processes" in Forks::Super) will cause a Forks::Super::Job instance to be created to track the job's state. For many uses of fork(), it will not be necessary to query the state of a background job. But access to these objects is provided for users who want to exercise even greater control over their use of background processes.

Calls to Forks::Super::fork() that fail (return undef or small negative numbers) generally do not cause a new Forks::Super::Job instance to be created.

ATTRIBUTES

Top

Use the Forks::Super::Job::get or Forks::Super::Job::getByName methods to obtain a Forks::Super::Job object for examination. The Forks::Super::Job::get method takes a process ID or job ID as an input (a value that may have been returned from a previous call to Forks::Super::fork() and returns a reference to a Forks::Super::Job object, or undef if the process ID or job ID was not associated with any known Job object. The Forks::Super::Job::getByName looks up job objects by the name parameter that may have been passed in the Forks::Super::fork() call.

A Forks::Super::Job object has many attributes, some of which may be of interest to an end-user. Most of these should not be overwritten.

pid

Process ID or job ID. For deferred processes, this will be a unique large negative number (a job ID). For processes that were not deferred, this valud is the process ID of the child process that performed this job's task.

real_pid

The process ID of the child process that performed this job's task. For deferred processes, this value is undefined until the job is launched and the child process is spawned.

pgid

The process group ID of the child process. For deferred processes, this value is undefined until the child process is spawned. It is also undefined for systems that do not implement getpgrp ("getpgrp" in perlfunc).

created

The time (since the epoch) at which the instance was created.

start

The time at which a child process was created for the job. This value will be undefined until the child process is spawned.

end

The time at which the child process completed and the parent process received a SIGCHLD signal for the end of this process. This value will be undefined until the child process is complete.

reaped

The time at which a job was reaped via a call to Forks::Super::wait, Forks::Super::waitpid, or Forks::Super::waitall. Will be undefined until the job is reaped.

state

A string value indicating the current state of the job. Current allowable values are

DEFERRED

For jobs that are on the job queue and have not started yet.

ACTIVE

For jobs that have started in a child process and are, to the knowledge of the parent process, still running.

COMPLETE

For jobs that have completed and caused the parent process to receive a SIGCHLD signal, but have not been reaped.

The difference between a COMPLETE job and a REAPED job is whether the job's process identifier has been returned in a call to Forks::Super::wait or Forks::Super::waitpid (or implicitly returned in a call to Forks::Super::waitall). When the process gets reaped, the global variable $? (see "$CHILD_ERROR" in perlvar) will contain the exit status of the process, until the next time a process is reaped.

REAPED

For jobs that have been reaped by a call to Forks::Super::wait, Forks::Super::waitpid, or Forks::Super::waitall.

SUSPENDED

The job has started but it has been suspended (with a SIGSTOP or other appropriate mechanism for your operating system) and is not currently running. A suspended job will not consume CPU resources but my tie up memory resources.

SUSPENDED-DEFERRED

Job is in the job queue and has not started yet, and also the job has been suspended.

status

The exit status of a job. See CHILD_ERROR ("CHILD_ERROR" in perlvar) in perlvar. Will be undefined until the job is complete.

style

One of the strings natural, cmd, or sub, indicating whether the initial fork call returned from the child process or whether the child process was going to run a shell command or invoke a Perl subroutine and then exit.

cmd

The shell command to run that was supplied in the fork call.

sub
args

The name of or reference to CODE to run and the subroutine arguments that were supplied in the fork call.

_on_busy

The behavior of this job in the event that the system was too "busy" to enable the job to launch. Will have one of the string values block, fail, or queue.

queue_priority

If this job was deferred, the relative priority of this job.

can_launch

By default undefined, but could be a CODE reference supplied in the fork() call. If defined, it is the code that runs when a job is ready to start to determine whether the system is too busy or not.

depend_on

If defined, contains a list of process IDs and job IDs that must complete before this job will be allowed to start.

depend_start

If defined, contains a list of process IDs and job IDs that must start before this job will be allowed to start.

start_after

Indicates the earliest time (since the epoch) at which this job may start.

expiration

Indicates the latest time that this job may be allowed to run. Jobs that run past their expiration parameter will be killed.

os_priority

Value supplied to the fork call about desired operating system priority for the job.

cpu_affinity

Value supplied to the fork call about desired CPU's for this process to prefer.

child_stdin
child_stdout
child_stderr

If the job has been configured for interprocess communication, these attributes correspond to the handles for passing standard input to the child process, and reading standard output and standard error from the child process, respectively.

Note that the standard read/write operations on these filehandles can also be accomplished through the write_stdin, read_stdout, and read_stderr methods of this class. Since these methods can adjust their behavior based on the type of IPC channel (file, socket, or pipe) or other idiosyncracies of your operating system (#@$%^&*! Windows), using these methods is preferred to using the filehandles directly.

FUNCTIONS

Top

get

$job = Forks::Super::Job::get($pidOrName)

Looks up a Forks::Super::Job object by a process ID/job ID or name attribute and returns the job object. Returns undef for an unrecognized pid or job name.

count_active_processes

$n = Forks::Super::Job::count_active_processes()

Returns the current number of active background processes. This includes only

1. First generation processes. Not the children and grandchildren of child processes.
2. Processes spawned by the Forks::Super module, and not processes that may have been created outside the Forks::Super framework, say, by an explicit call to CORE::fork(), a call like system("./myTask.sh &"), or a form of Perl's open function that launches an external command.

METHODS

Top

A Forks::Super::Job object recognizes the following methods. In general, these methods should only be used from the foreground process (the process that spawned the background job).

waitpid

$job->wait( [$timeout] )
$job->waitpid( $flags [,$timeout] )

Convenience method to wait until or test whether the specified job has completed. See Forks::Super::waitpid.

The calls $job->wait and $job->wait() will block until a job has completed. But $job->wait(0) will call wait with a timeout of zero seconds, so it will be equivalent to a call of waitpid $job, &WNOHANG.

kill

$job->kill($signal)

Convenience method to send a signal to a background job. See Forks::Super::kill.

suspend

$job->suspend

When called on an active job, suspends the background process with SIGSTOP or other mechanism appropriate for the operating system.

resume

$job->resume

When called on a suspended job (see suspend|"$job->suspend", above), resumes the background process with SIGCONT or other mechanism appropriate for the operating system.

is_<state>

$job->is_complete

Indicates whether the job is in the COMPLETE or REAPED state.

$job->is_started

Indicates whether the job has started in a background process. While return a false value while the job is still in a deferred state.

$job->is_active

Indicates whether the specified job is currently running in a background process.

$job->is_suspended

Indicates whether the specified job has started but is currently in a suspended state.

toString

$job->toString()
$job->toShortString()

Outputs a string description of the important features of the job.

write_stdin

$job->write_stdin(@msg)

Writes the specified message to the child process's standard input stream, if the child process has been configured to receive input from interprocess communication. Writing to a closed handle or writing to a process that is not configured for IPC will result in a warning.

Using this method may be preferrable to calling print with the process's child_stdin attribute, as the write_stdin method will take into account the type of IPC channel (file, socket, or pipe) and may alter its behavior because of it. In a near future release, it is hoped that the simple print to the child stdin filehandle will do the right thing, using tied filehandles and other Perl magic.

read_stdout

read_stderr

$line = $job->read_stdout()
@lines = $job->read_stdout()
$line = $job->read_stderr()
@lines = $job->read_stderr()

In scalar context, attempts to read a single line, and in list context, attempts to read all available lines from a child process's standard output or standard error stream.

If there is no available input, and if the Forks::Super module detects that the background job has completed (such that no more input will be created), then the file handle will automatically be closed. In scalar context, these methods will return undef if there is no input currently available on an inactive process, and "" (empty string) if there is no input available on an active process.

Reading from a closed handle, or calling these methods on a process that has not been configured for IPC will result in a warning.

close_fh

$job->close_fh([@handle_id])

Closes IPC filehandles for the specified job. Optional input is one or more values from the set stdin, stdout, stderr, and all to specify which filehandles to close. If no parameters are provided, the default behavior is to close all configured file handles.

The close_fh method may perform certain cleanup operations that are specific to the type and settings of the specified handle, so using this method is preferred to:

    # not as good as:  $job->close_fh('stdin','stderr')
    close $job->{child_stdin};
    close $Forks::Super::CHILD_STDERR{$job};

On most systems, open filehandles are a scarce resource and it is a very good practice to close filehandles when the jobs that created them are finished running and you are finished processing input and output on those filehandles.

reuse

$pid = $job->reuse( \%new_opts )

Creates a new background process by calling Forks::Super::fork, using all of the existing settings of the current Forks::Super::Job object. Additional options may be provided which will override the original settings.

Use this method to launch multiple instances of identical or similar jobs.

    $job = fork { child_fh => "all",
              callback => { start => sub { print "I started!" },
                            finish => sub { print "I finished!" } },
              sub => sub {
                 do_something();
                 do_something_else();
                 ...   # do 100 other things.
              },
              args => [ @the_args ], timeout => 15
    };

    # Crikey, I'm not typing all that in again.
    $job2 = $job->reuse { args => [ @new_args ], timeout => 30 };

dispose

$job->dispose()
Forks::Super::Job::dispose( @jobs )

Called on one or more job objects to free up any resources used by a job object. You may call this method on any job where you have finished extracting all of the information that you need from the job. Or to put it another way, you should not call this method on a job if you still wish to access any information about the job. After this method is invoked on a job, any information such as run times, status, and unread input from interprocess communication handles will be lost.

This method will

* close any open filehandles
* attempt to remove temporary files used for interprocess communication ]with the job
* erase all information about the job
* remove the job object from the @ALL_JOBS and %ALL_JOBS variables.

VARIABLES

Top

@ALL_JOBS, %ALL_JOBS

Any job object created by this module will be added to the list @Forks::Super::Job::ALL_JOBS and to the lookup table %Forks::Super::Job::ALL_JOBS. Within %ALL_JOBS, a specific job object can be accessed by its job id (the numerical value returned from Forks::Super::fork()), its real process id (once the job has started), or its name attribute, if one was passed to the Forks::Super::fork() call. This may be helpful for iterating through all of the jobs your program has created.

    my ($longest_job, $longest_time) = (-1, -1);
    foreach $job (@Forks::Super::ALL_JOBS) {
        if ($job->is_complete) {
            $job_time = $job->{end} - $job->{start};
            if ($job_time > $longest_time) {
                ($longest_job, $longest_time) = ($job, $job_time);
            }
        }
    }
    print STDERR "The job that took the longest was $job: ${job_time}s\n";

Jobs that have been passed to the "dispose" method are removed from @ALL_JOBS and %ALL_JOBS.

OVERLOADING

Top

An available feature in the Forks::Super module is to make it more convenient to access the functionality of Forks::Super::Job. When this feature is enabled, the return value from a call to Forks::Super::fork() is an overloaded Forks::Super::Job object.

    $job_or_pid = fork { %options };

In a numerical context, this value looks and behaves like a process ID (or job ID). The value can be passed to functions like kill and waitpid that expect a process ID.

    if ($job_or_pid != $another_pid) { ... }
    kill 'TERM', $job_or_pid;

But you can also access the attributes and methods of the Forks::Super::Job object.

    $job_or_pid->{real_pid}
    $job_or_pid->suspend

Since v0.51, the <> iteration operator has been overloaded for the Forks::Super::Job package. It can be used to read one line of output from a background job's standard output, and to allow you to treat the background job object syntactically like a readable filehandle.

    my $job = fork { cmd => $command };
    while (<$job>) {
        print "Output from $job: $_\n";
    }

Since v0.41, this feature is enabled by default.

Whether the overloading is enabled by default or not, you can override the default behavior in three ways:

1. Set the environment variable FORKS_SUPER_JOB_OVERLOAD to a true or false value
    $ FORKS_SUPER_JOB_OVERLOAD=0 perl a_script_that_uses_Forks_Super.pl ...

2. Pass the parameter overload to the module import function with a 0 or 1 value
    use Forks::Super overload => 1;  # always enable overload feature

3. At runtime, call
    Forks::Super::Job::enable_overload();
    Forks::Super::Job::disable_overload();

In principle you may call these methods at any time and as often as you wish.

Even when overloading is enabled, Forks::Super::fork() still returns a simple scalar value of 0 to the child process (when a value is returned).

SEE ALSO

Top

Forks::Super.

AUTHOR

Top

Marty O'Brien, <mob@cpan.org>

LICENSE AND COPYRIGHT

Top


Forks-Super documentation Contained in the Forks-Super distribution.
#
# Forks::Super::Job - object representing a task to perform in
#                     a background process
# See the subpackages for some implementation details
#

package Forks::Super::Job;
use Forks::Super::Debug qw(debug);
use Forks::Super::Util qw(is_number qualify_sub_name IS_WIN32 is_pipe);
use Forks::Super::Config qw(:all);
use Forks::Super::Job::Ipc;   # does windows prefer to load Ipc before Timeout?
use Forks::Super::Job::Timeout;
use Forks::Super::Queue qw(queue_job);
use Forks::Super::Job::OS;
use Forks::Super::Job::Callback qw(run_callback);
use Signals::XSIG;
use Exporter;
use POSIX ':sys_wait_h';
use Carp;
use IO::Handle;
use strict;
use warnings;

our @ISA = qw(Exporter);
our @EXPORT = qw(@ALL_JOBS %ALL_JOBS);
our $VERSION = '0.52';

our (@ALL_JOBS, %ALL_JOBS, $WIN32_PROC, $WIN32_PROC_PID);
our $OVERLOAD_ENABLED = 0;
our $INSIDE_END_QUEUE = 0;
our $_RETRY_PAUSE;

my $use_overload = $ENV{FORKS_SUPER_JOB_OVERLOAD};
if (!defined($use_overload)) {
  $use_overload = 1;
}
if ($use_overload) {
  enable_overload();
} else {
  enable_overload();
  disable_overload();
}

#############################################################################
# Object methods (meant to be called as $job->xxx(@args))

sub new {
  my ($class, $opts) = @_;
  my $this = {};
  if (ref $opts eq 'HASH') {
    $this->{$_} = $opts->{$_} foreach keys %$opts;
  }

  $this->{__opts__} = $opts;

  $this->{created} = Time::HiRes::time();
  $this->{state} = 'NEW';
  $this->{ppid} = $$;
  if (!defined $this->{_is_bg}) {
    $this->{_is_bg} = 0;
  }
  if (!defined $this->{debug}) {
    $this->{debug} = $Forks::Super::Debug::DEBUG;
  }
  # 0.41: fix overload bug here by putting  bless  before  push @ALL_JOBS
  bless $this, 'Forks::Super::Job';
  push @ALL_JOBS, $this;
  if ($this->{debug}) {
    debug("New job created: ", $this->toString());
  }
  return $this;
}

sub reuse {
  my ($job, $opts) = @_;
  if (ref $opts ne 'HASH') {
    $opts = { @_[1..$#_] };
  }
  my %opts;
  if (defined $job->{__opts__}) {
    %opts = %{$job->{__opts__}};
  }
  for (keys %$opts) {
    $opts{$_} = $opts->{$_};
  }

  return Forks::Super::fork( \%opts ) ;
}

sub is_complete {
  my $job = shift;
  return defined($job->{state}) &&
    ($job->{state} eq 'COMPLETE' || $job->{state} eq 'REAPED');
}

sub is_started {
  my $job = shift;
  return $job->is_complete || $job->is_active || 
    (defined($job->{state}) && $job->{state} eq 'SUSPENDED');
}

sub is_active {
  my $job = shift;
  return defined($job->{state}) && $job->{state} eq 'ACTIVE';
}

sub is_suspended {
  my $job = shift;
  return defined($job->{state}) && $job->{state} =~ /SUSPENDED/;
}

sub is_deferred {
  my $job = shift;
  return defined($job->{state}) && $job->{state} =~ /DEFERRED/;
}

sub waitpid {
  my ($job, $flags, $timeout) = @_;
  return Forks::Super::Wait::waitpid($job->{pid}, $flags, $timeout || 0);
}

sub wait {
  my ($job, $timeout) = @_;
  if (defined($timeout) && $timeout == 0) { # ZZZ
    return Forks::Super::Wait::waitpid($job->{pid}, &WNOHANG);
  }
  return Forks::Super::Wait::waitpid($job->{pid}, 0, $timeout || 0);
}

sub kill {
  my ($job, $signal) = @_;
  if (!defined($signal) || $signal eq '') {
    $signal = Forks::Super::Util::signal_number('INT') || 1;
  }
  return Forks::Super::kill($signal, $job);
}

sub state {
  my $job = shift;
  return $job->{state};
}

sub status {
  my $job = shift;
  return $job->{status};  # may be undefined
}

#
# Produces string representation of a Forks::Super::Job object.
#
sub toString {
  my $job = shift;
  my @to_display = qw(pid state create);
  foreach my $attr (qw(real_pid style cmd exec sub args dir start end reaped
		       status closure pgid child_fh queue_priority
		       timeout expiration)) {
    push @to_display, $attr if defined $job->{$attr};
  }
  my @output = ();
  foreach my $attr (@to_display) {
    next unless defined $job->{$attr};
    if (ref $job->{$attr} eq 'ARRAY') {
      push @output, "$attr=[" . join(q{,},@{$job->{$attr}}) . ']';
    } else {
      push @output, "$attr=" . $job->{$attr};
    }
  }
  return '{' . join ( ';' , @output), '}';
}

sub toFullString {
  my $job = shift;
  my @output = ();
  foreach my $attr (sort keys %$job) {
    next unless defined $job->{$attr};
    if (ref $job->{$attr} eq 'ARRAY') {
      push @output, "$attr=[" . join(',', @{$job->{$attr}}) . ']';
    } elsif (ref $job->{$attr} eq 'HASH') {
      push @output, "$attr={", 
	join(',', map {"$_=>$job->{$attr}{$_}"
		     } sort keys %{$job->{$attr}}), '}';
    } else {
      push @output, "$attr=$job->{$attr}";
    }
  }
  return '{' . join(';', @output), '}';
}

sub toShortString {
  my $job = shift;
  if (defined $job->{short_string}) {
    return $job->{short_string};
  }
  my @to_display = ();
  foreach my $attr (qw(pid state cmd exec sub args closure real_pid)) {
    push @to_display, $attr if defined $job->{$attr};
  }
  my @output;
  foreach my $attr (@to_display) {
    if (ref $job->{$attr} eq 'ARRAY') {
      push @output, "$attr=[" . join(",", @{$job->{$attr}}) . "]";
    } else {
      push @output, "$attr=" . $job->{$attr};
    }
  }
  return $job->{short_string} = "{" . join(";",@output) . "}";
}

sub _mark_complete {
  my $job = shift;
  $job->{end} = Time::HiRes::time();
  $job->{state} = 'COMPLETE';

  $job->run_callback('collect');
  $job->run_callback('finish');
  return;
}

sub _mark_reaped {
  my $job = shift;
  $job->{state} = 'REAPED';
  $job->{reaped} = Time::HiRes::time();
  $? = $job->{status};
  debug("Job $job->{pid} reaped") if $job->{debug};
  return;
}

#
# determine whether a job is eligible to start
#
sub can_launch {
  no strict 'refs';

  my $job = shift;
  $job->{last_check} = Time::HiRes::time();
  if (defined $job->{can_launch}) {
    if (ref $job->{can_launch} eq 'CODE') {
      return $job->{can_launch}->($job);
    } elsif (ref $job->{can_launch} eq '') {
      my $can_launch_sub = $job->{can_launch};
      return $can_launch_sub->($job);
    }
  } else {
    return $job->_can_launch;
  }
}

sub _can_launch_delayed_start_check {
  my $job = shift;
  return 1 if !defined($job->{start_after}) ||
    Time::HiRes::time() >= $job->{start_after};

  debug('Forks::Super::Job::_can_launch(): ',
	'start delay requested. launch fail') if $job->{debug};

  # delay option should normally be associated with queue on busy behavior.
  # any reason not to make this the default ?
  #  delay + fail   is pretty dumb
  #  delay + block  is like sleep + fork

  $job->{_on_busy} = 'QUEUE' if not defined $job->{on_busy};
  return 0;
}

sub _can_launch_dependency_check {
  my $job = shift;
  my @dep_on = defined($job->{depend_on}) ? @{$job->{depend_on}} : ();
  my @dep_start = defined($job->{depend_start}) ? @{$job->{depend_start}} : ();

  foreach my $dj (@dep_on) {
    my $j = $ALL_JOBS{$dj};
    if (not defined $j) {
      carp "Forks::Super::Job: ",
	"dependency $dj for job $job->{pid} is invalid. Ignoring.\n";
      next;
    }
    unless ($j->is_complete) {
      debug('Forks::Super::Job::_can_launch(): ',
	"job waiting for job $j->{pid} to finish. launch fail.")
	if $j->{debug};
      return 0;
    }
  }

  foreach my $dj (@dep_start) {
    my $j = $ALL_JOBS{$dj};
    if (not defined $j) {
      carp "Forks::Super::Job ",
	"start dependency $dj for job $job->{pid} is invalid. Ignoring.\n";
      next;
    }
    unless ($j->is_started) {
      debug('Forks::Super::Job::_can_launch(): ',
	"job waiting for job $j->{pid} to start. launch fail.")
	if $j->{debug};
      return 0;
    }
  }
  return 1;
}

#
# default function for determining whether the system
# is too busy to create a new child process or not
#
sub _can_launch {
  no warnings qw(once);

  my $job = shift;
  my $max_proc = defined($job->{max_proc})
    ? $job->{max_proc} : $Forks::Super::MAX_PROC;
  my $max_load = defined($job->{max_load})
    ? $job->{max_load} : $Forks::Super::MAX_LOAD;
  my $force = defined($job->{max_load}) && $job->{force};

  if ($force) {
    debug('Forks::Super::Job::_can_launch(): force attr set. launch ok')
      if $job->{debug};
    return 1;
  }

  return 0 if not $job->_can_launch_delayed_start_check;
  return 0 if not $job->_can_launch_dependency_check;

  if ($max_proc > 0) {
    my $num_active = count_active_processes();
    if ($num_active >= $max_proc) {
      debug('Forks::Super::Job::_can_launch(): ',
	"active jobs $num_active exceeds limit $max_proc. ",
	    'launch fail.') if $job->{debug};
      return 0;
    }
  }

  if ($max_load > 0) {
    my $load = get_cpu_load();
    if ($load > $max_load) {
      debug('Forks::Super::Job::_can_launch(): ',
	"cpu load $load exceeds limit $max_load. launch fail.")
	if $job->{debug};
      return 0;
    }
  }

  debug('Forks::Super::Job::_can_launch(): system not busy. launch ok.')
    if $job->{debug};
  return 1;
}

# Perl system fork() call. Encapsulated here so it can be overridden 
# and mocked for testing. See t/17-retries.t
sub _CORE_fork { return CORE::fork }

#
# make a system fork call and configure the job object
# in the parent and the child processes
#
sub launch {
  my $job = shift;
  if ($job->is_started) {
    Carp::confess "Forks::Super::Job::launch() ",
	"called on a job in state $job->{state}!\n";
  }

  if ($$ != $Forks::Super::MAIN_PID && $Forks::Super::CHILD_FORK_OK > 0) {
    $Forks::Super::MAIN_PID = $$;
    $Forks::Super::CHILD_FORK_OK--;
  }

  if ($$ != $Forks::Super::MAIN_PID && $Forks::Super::CHILD_FORK_OK < 1) {
    return _launch_from_child($job);
  }
  $job->_preconfig_fh;
  $job->_preconfig2;





  my $retries = $job->{retries} || 0;



  my $pid = _CORE_fork();
  while (!defined($pid) && $retries-- > 0) {
    warn "Forks::Super::launch: ",
      "system fork call returned undef. Retrying ...\n";
    $_RETRY_PAUSE ||= 1.0;
    my $delay = 1.0 + $_RETRY_PAUSE * (($job->{retries} || 1) - $retries);
    Forks::Super::Util::pause($delay);
    $pid = _CORE_fork();
  }







  if (!defined $pid) {
    debug('Forks::Super::Job::launch(): CORE::fork() returned undefined!')
      if $job->{debug};
    return;
  }


  if (Forks::Super::Util::isValidPid($pid)) {

    # parent
    return _postlaunch_parent($pid, $job);

  } elsif ($pid == 0) {

    _postlaunch_child($job);
    return 0;

  } else {

    Carp::confess "Forks::Super::launch(): ",
	"Somehow we got invalid pid=$pid from fork call.";
    return;

  }
}

sub _postlaunch_parent {
  my ($pid, $job) = @_;
  $ALL_JOBS{$pid} = $job;
  if (defined($job->{state}) &&
      $job->{state} ne 'NEW' &&
      $job->{state} ne 'LAUNCHING' &&
      $job->{state} ne 'DEFERRED') {
    warn "Forks::Super::Job::launch(): ",
      "job $pid already has state: $job->{state}\n";
  } else {
    $job->{state} = 'ACTIVE';

    #
    # it is possible that this child exited quickly and has already
    # been reaped in the SIGCHLD handler. In that case, the signal
    # handler should have made an entry in %Forks::Super::Sigchld::BASTARD_DATA
    # for this process.
    #
    Forks::Super::Sigchld::handle_bastards($pid);
  }
  $job->{real_pid} = $pid;
  $job->{pid} = $pid unless defined $job->{pid};
  $job->{start} = Time::HiRes::time();

  $job->_config_parent;
  $job->run_callback('start');
  Forks::Super::Sigchld::handle_CHLD(-1);
  if ($$ != $Forks::Super::MAIN_PID) {
    $XSIG{CHLD}[-1] = \&Forks::Super::Sigchld::handle_CHLD;
  }

  return $OVERLOAD_ENABLED ? $job : $pid;
}

sub _postlaunch_child {
  my $job = shift;
  Forks::Super::init_child() if defined &Forks::Super::init_child;
  $job->_config_child;

  local $ENV{_FORK_PPID} = $$;
  local $ENV{_FORK_PID} = $$;

  if ($job->{style} eq 'cmd' || $job->{style} eq 'exec') {

      if (defined($job->{fh_config}->{stdin})
	  && defined($job->{fh_config}->{sockets})) {

	  $job->_postlaunch_child_to_proc;

      } elsif ($job->{style} eq 'cmd') {

	  $job->_postlaunch_child_to_cmd;

      } else {

	  debug("Exec'ing [ @{$job->{exec}} ]") if $job->{debug};
	  exec( @{$job->{exec}} );

      }

  } elsif ($job->{style} eq 'sub') {

      $job->_postlaunch_child_to_sub;

  }
  return 0;
}

sub _postlaunch_child_to_proc {
    my $job = shift;
    my $proch = Forks::Super::Job::Ipc::_gensym();
    $job->{cmd} ||= $job->{exec};
    my $p1 = open $proch, '|-', @{$job->{cmd}};
    print $proch $job->{fh_config}->{stdin};
    close $proch;
    my $c1 = $?;
    debug("Exit code of $$ was $c1 ", $c1>>8) if $job->{debug};
    deinit_child();
    exit $c1 >> 8;
}

sub _postlaunch_child_to_cmd {
    my $job = shift;
    debug("Executing [ @{$job->{cmd}} ]") if $job->{debug};

    my $c1;
    if (&IS_WIN32) {
      # There are lots of ways to spawn a process in Windows
      if (Forks::Super::Config::CONFIG('Win32::Process')) {
	$c1 = Forks::Super::Job::OS::Win32::open_win32_process($job);
      } else {
	$c1 = Forks::Super::Job::OS::Win32::open3_win32_process($job);
      }
    } else {

      $c1 = system( @{$job->{cmd}} );
    }
    debug("Exit code of $$ was $c1 ", $c1>>8) if $job->{debug};
    deinit_child();
    exit $c1 >> 8;
}

sub _postlaunch_child_to_sub {
    my $job = shift;
    my $sub = $job->{sub};
    my @args = @{$job->{args} || []};

    my $error;
    eval {
      no strict 'refs';
      $job->{_cleanup_code} = \&deinit_child;
      $sub->(@args);
      delete $job->{_cleanup_code};
      1;
    } or do {
      $error = $@;
    };
 
    if ($job->{debug}) {
      if ($error) {
	debug("JOB $$ SUBROUTINE CALL HAD AN ERROR: $error");
      }
      debug("Job $$ subroutine call has completed");
    }
    deinit_child();
    if ($error) {
	die $error,"\n";
    }
    exit 0;
}

sub _launch_from_child {
  my $job = shift;
  if ($Forks::Super::CHILD_FORK_OK == 0) {
    carp 'Forks::Super::fork() not allowed\n',
      "in child process $$ while \$Forks::Super::CHILD_FORK_OK ",
	"is not set!\n";

    return;
  } elsif ($Forks::Super::CHILD_FORK_OK == -1) {
    carp "Forks::Super::fork() call not allowed\n",
	"in child process $$ while \$Forks::Super::CHILD_FORK_OK <= 0.\n",
	  "Will create child of child with CORE::fork()\n";

    my $pid = _CORE_fork();
    if (defined($pid) && $pid == 0) {
      # child of child
      if (defined &Forks::Super::init_child) {
	Forks::Super::init_child();
      } else {
	init_child();
      }
      return $pid;
    }
    return $pid;
  }
  return;
}

sub suspend {
  my $j = shift;
  $j = Forks::Super::Job::get($j) if ref $j ne 'Forks::Super::Job';
  my $pid = $j->{real_pid};
  if ($j->{state} eq 'ACTIVE') {
    local $! = 0;
    my $kill_result = Forks::Super::kill('STOP', $j);
    if ($kill_result > 0) {
      $j->{state} = 'SUSPENDED';
      return 1;
    }
    carp "'STOP' signal not received by $pid, job ", $j->toString(), "\n";
    return;
  }
  if ($j->{state} eq 'DEFERRED') {
    $j->{state} = 'SUSPENDED-DEFERRED';
    return -1;
  }
  if ($j->is_complete) {
    carp "Forks::Super::Job::suspend(): called on completed job ", 
      $j->{pid}, "\n";
    return;
  }
  if ($j->{state} eq 'SUSPENDED') {
    carp "Forks::Super::Job: suspend called on suspended job ", $j->{pid};
    return;
  }
  carp "Forks::Super::Job: suspend called on job ", $j->toString(), "\n";
  return;
}

sub resume {
  my $j = shift;
  $j = Forks::Super::Job::get($j) if ref $j ne 'Forks::Super::Job';
  my $pid = $j->{real_pid};
  if ($j->{state} eq 'SUSPENDED') {
    local $! = 0;
    my $kill_result = Forks::Super::kill('CONT', $j);
    if ($kill_result > 0) {
      $j->{state} = 'ACTIVE';
      return 1;
    }
    carp "'CONT' signal not received by $pid, job ", $j->toString(), "\n";
    return;
  }
  if ($j->{state} eq 'SUSPENDED-DEFERRED') {
    $j->{state} = 'DEFERRED';
    return -1;
  }
  if ($j->is_complete) {
    carp "Forks::Super::Job::resume(): called on a completed job ", 
      $j->{pid}, "\n";
    return;
  }
  carp "Forks::Super::Job::resume(): called on job in state ", 
    $j->{state}, "\n";
  return;
}

#
# do further initialization of a Forks::Super::Job object,
# mainly setting derived fields
#
sub _preconfig {
  my $job = shift;

  $job->_preconfig_style;
  $job->_preconfig_busy_action;
  $job->_preconfig_start_time;
  $job->_preconfig_dependencies;
  Forks::Super::Job::Callback::_preconfig_callbacks($job);
  Forks::Super::Job::OS::_preconfig_os($job);
  return;
}

# some final initialization just before launch
sub _preconfig2 {
  my $job = shift;
  if (!defined $job->{debug}) {
    $job->{debug} = $Forks::Super::Debug::DEBUG;
  }
  return;
}

sub _preconfig_style {
  my $job = shift;

  ###################
  # set up style.
  #

  if (0 && defined $job->{run}) {   # not enabled
    $job->_preconfig_style_run;
  }

  if (defined $job->{cmd}) {
    if (ref $job->{cmd} ne 'ARRAY') {
      $job->{cmd} = [ $job->{cmd} ];
    }
    $job->{style} = 'cmd';
  } elsif (defined $job->{exec}) {
    if (ref $job->{exec} ne 'ARRAY') {
      $job->{exec} = [ $job->{exec} ];
    }
    $job->{style} = 'exec';
  } elsif (defined $job->{sub}) {
    $job->{style} = 'sub';
    $job->{sub} = qualify_sub_name $job->{sub};
    if (defined $job->{args}) {
      if (ref $job->{args} ne 'ARRAY') {
	$job->{args} = [ $job->{args} ];
      }
    } else {
      $job->{args} = [];
    }
  } else {
    $job->{style} = 'natural';
  }
  return;
}

sub _preconfig_style_run {    ### for future use
  my $job = shift;
  if (ref $job->{run} ne 'ARRAY') {
    $job->{run} = [ $job->{run} ];
  }

  return;

  # How will we use or emulate the rich functionality
  # of IPC::Run?
  #
  # inputs are a "harness specification"
  # build a harness
  # on "launch", call $harness->start
  # when the job is reaped, call $harness->finish

  # one feature of IPC::Run harnesses is that they
  # may be reused!

}

sub _preconfig_busy_action {
  my $job = shift;

  ######################
  # what will we do if the job cannot launch?
  #
  if (defined $job->{on_busy}) {
    $job->{_on_busy} = $job->{on_busy};
  } else {
    no warnings 'once';
    $job->{_on_busy} = $Forks::Super::ON_BUSY || 'block';
  }
  $job->{_on_busy} = uc $job->{_on_busy};

  ########################
  # make a queue priority available if needed
  #
  if (not defined $job->{queue_priority}) {
    $job->{queue_priority} = Forks::Super::Queue::get_default_priority();
  }
  return;
}

sub _preconfig_start_time {
  my $job = shift;

  ###########################
  # configure a future start time
  my $start_after = 0;
  if (defined $job->{delay}) {
    $start_after
      = Time::HiRes::time() 
	+ Forks::Super::Job::Timeout::_time_from_natural_language(
		$job->{delay}, 1);
  }
  if (defined $job->{start_after}) {
    my $start_after2 
      = Forks::Super::Job::Timeout::_time_from_natural_language(
		$job->{start_after}, 0);
    if ($start_after < $start_after2) {
      $start_after = $start_after2 
    }
  }
  if ($start_after) {
    $job->{start_after} = $start_after;
    delete $job->{delay};
    debug('Forks::Super::Job::_can_launch(): start delay requested.')
      if $job->{debug};
  }
  return;
}

sub _preconfig_dependencies {
  my $job = shift;

  ##########################
  # assert dependencies are expressed as array refs
  # expand job names to pids
  #
  if (defined $job->{depend_on}) {
    if (ref $job->{depend_on} ne 'ARRAY') {
      $job->{depend_on} = [ $job->{depend_on} ];
    }
    $job->{depend_on} = _resolve_names($job, $job->{depend_on});
  }
  if (defined $job->{depend_start}) {
    if (ref $job->{depend_start} ne 'ARRAY') {
      $job->{depend_start} = [ $job->{depend_start} ];
    }
    $job->{depend_start} = _resolve_names($job, $job->{depend_start});
  }
  return;
}

# convert job names in an array to job ids, if necessary
sub _resolve_names {
  my $job = shift;
  my @in = @{$_[0]};
  my @out = ();
  foreach my $id (@in) {
    if (ref $id eq 'Forks::Super::Job') {
      push @out, $id;
    } elsif (is_number($id) && defined($ALL_JOBS{$id})) {
      push @out, $id;
    } else {
      my @j = Forks::Super::Job::getByName($id);
      if (@j > 0) {
	foreach my $j (@j) {
	  next if \$j eq \$job; 
	  # $j eq $job was not sufficient when $job is overloaded
	  # and $job->{pid} has not been set.

	  push @out, $j->{pid};
	}
      } else {
	carp "Forks::Super: Job ",
	  "dependency identifier \"$id\" is invaild. Ignoring\n";
      }
    }
  }
  return [ @out ];
}

#
# set some additional attributes of a Forks::Super::Job after the
# child is successfully launched.
#
sub _config_parent {
  my $job = shift;
  $job->_config_fh_parent;
  $job->_config_timeout_parent;
  if ($Forks::Super::SysInfo::CONFIG{'getpgrp'}) {
    # when  timeout =>   or   expiration =>  is used,
    # PGID of child will be set to child PID
    if (defined($job->{timeout}) || defined($job->{expiration})) {
      $job->{pgid} = $job->{real_pid};
    } else {
      if (not eval { $job->{pgid} = getpgrp($job->{real_pid}) }) {
	$Forks::Super::SysInfo::CONFIG{'getpgrp'} = 0;
	$job->{pgid} = $job->{real_pid};
      }
    }
  }
  return;
}

sub _config_child {
  my $job = shift;
  $Forks::Super::Job::self = $job;
  $job->_config_callback_child;
  $job->_config_debug_child;
  $job->_config_timeout_child;
  $job->_config_os_child;
  $job->_config_fh_child;
  $job->_config_dir;
  return;
}

sub _config_debug_child {
  my $job = shift;
  if ($job->{debug} && $job->{undebug}) {
    if ($Forks::Super::Config::IS_TEST) {
      debug("Disabling debugging in child $$");
    }
    $Forks::Super::Debug::DEBUG = 0;
    $job->{debug} = 0;
  }
  return;
}

sub _config_dir {
  my $job = shift;
  $job->{dir} ||= $job->{chdir};
  if (defined $job->{dir}) {
    if (!chdir $job->{dir}) {
      croak "Forks::Super::Job::launch(): ",
	"Invalid \"dir\" option: \"$job->{dir}\" $!\n";
    }
  }
  return;
}

END {
  no warnings 'internal';
  $INSIDE_END_QUEUE = 1;
  if ($$ == ($Forks::Super::MAIN_PID ||= $$)) {

    # disable SIGCHLD handler during cleanup. Hopefully this will fix
    # intermittent test failures where all subtests pass but the
    # test exits with non-zero exit status (e.g., t/42d-filehandles.t)

    untie %SIG;
    if ($] >= 5.007003) {
      delete $SIG{CHLD};
    } else {
      $SIG{CHLD} = 'IGNORE';
    }

    Forks::Super::Queue::_cleanup();
    Forks::Super::Job::Ipc::_cleanup();
  } else {
    if (defined($Forks::Super::Job::self)
       && defined($Forks::Super::Job::self->{_cleanup_code})) {
      no strict 'refs';
      $Forks::Super::Job::self->{_cleanup_code}->();
    }
    Forks::Super::Job::Timeout::_cleanup_child();
  }
}

#############################################################################
# Package methods (meant to be called as Forks::Super::Job::xxx(@args))

sub enable_overload {
  if (!$OVERLOAD_ENABLED) {
    $OVERLOAD_ENABLED = 1;

    if (!eval {
      use overload
	'""' => sub { $_[0]->{pid} },
	'+' => sub { $_[0]->{pid} + $_[1] },
        '*' => sub { $_[0]->{pid} * $_[1] },
        '&' => sub { $_[0]->{pid} & $_[1] },
        '|' => sub { $_[0]->{pid} | $_[1] },
        '^' => sub { $_[0]->{pid} ^ $_[1] },
        '~' => sub { ~$_[0]->{pid} },         # since 0.37
        '<=>' => sub { $_[2] ? $_[1] <=> $_[0]->{pid} 
			     : $_[0]->{pid} <=> $_[1] },
        'cmp' => sub { $_[2] ? $_[1] cmp $_[0]->{pid} 
			     : $_[0]->{pid} cmp $_[1] },
        '-'   => sub { $_[2] ? $_[1]  -  $_[0]->{pid} 
			     : $_[0]->{pid}  -  $_[1] },
        '/'   => sub { $_[2] ? $_[1]  /  $_[0]->{pid} 
			     : $_[0]->{pid}  /  $_[1] },
        '%'   => sub { $_[2] ? $_[1]  %  $_[0]->{pid} 
			     : $_[0]->{pid}  %  $_[1] },
        '**'  => sub { $_[2] ? $_[1]  ** $_[0]->{pid} 
			     : $_[0]->{pid}  ** $_[1] },
        '<<'  => sub { $_[2] ? $_[1]  << $_[0]->{pid} 
			     : $_[0]->{pid}  << $_[1] },
        '>>'  => sub { $_[2] ? $_[1]  >> $_[0]->{pid} 
			     : $_[0]->{pid}  >> $_[1] },
        'x'   => sub { $_[2] ? $_[1]  x  $_[0]->{pid} 
			     : $_[0]->{pid}  x  $_[1] },
        'cos'  => sub { cos $_[0]->{pid} },
        'sin'  => sub { sin $_[0]->{pid} },
        'exp'  => sub { exp $_[0]->{pid} },
        'log'  => sub { log $_[0]->{pid} },
        'sqrt' => sub { sqrt $_[0]->{pid} },
        'int'  => sub { int $_[0]->{pid} },
        'abs'  => sub { abs $_[0]->{pid} },
        'atan2' => sub { $_[2] ? atan2($_[1],$_[0]->{pid}) 
			       : atan2($_[0]->{pid},$_[1]) };

      # XXX - why doesn't it work when I include
      #       '<>' => sub { ... }
      #    in the  use overload  block?
      no strict 'refs';
      *{'Forks::Super::Job::(<>'} = sub {
	return $_[0]->read_stdout();
      };
      1 }            # end eval { use overload ... }
	) {
      carp "Error enabling overloading on Forks::Super::Job objects: $@\n";
    } elsif ($Forks::Super::Debug::DEBUG) {
        debug("Enabled overloading on Forks::Super::Job objects");
    }
  }
  return;
}

sub disable_overload {
  if ($OVERLOAD_ENABLED) {
    $OVERLOAD_ENABLED = 0;
    eval { no overload values %overload::ops; 1 }
        or Forks::Super::Debug::carp_once "Forks::Super::Job ",
    		"disable overload failed: $@";
  }
  return;
}

# returns a Forks::Super::Job object with the given identifier
sub get {
  my $id = shift;
  if (!defined $id) {
    Carp::cluck "undef value passed to Forks::Super::Job::get()";
  }
  if (ref $id eq 'Forks::Super::Job') {
    return $id;
  }
  if (defined $ALL_JOBS{$id}) {
    return $ALL_JOBS{$id};
  }
  return getByPid($id) || getByName($id);
}

sub getByPid {
  my $id = shift;
  if (is_number($id)) {
    my @j = grep { (defined($_->{pid}) && $_->{pid} == $id) ||
		   (defined($_->{real_pid}) && $_->{real_pid} == $id)
		 } @ALL_JOBS;
    return $j[0] if @j > 0;
  }
  return;
}

sub getByName {
  my $id = shift;
  my @j = grep { defined($_->{name}) && $_->{name} eq $id } @ALL_JOBS;
  if (@j > 0) {
    return wantarray ? @j : $j[0];
  }
  return;
}

# retrieve a job object for a pid or job name, if necessary
sub _resolve {
  if (ref $_[0] ne 'Forks::Super::Job') {
    my $job = get($_[0]);
    if (defined $job) {
      return $_[0] = $job;
    }
    return $job;
  }
  return $_[0];
}

#
# count the number of active processes
#
sub count_active_processes {
  my $optional_pgid = shift;
  if (defined $optional_pgid) {
    return scalar grep {
      $_->{state} eq 'ACTIVE'
	and $_->{pgid} == $optional_pgid } @ALL_JOBS;
  }
  return scalar grep { defined($_->{state})
			 && $_->{state} eq 'ACTIVE' } @ALL_JOBS;
}

sub count_alive_processes {
  my ($count_bg, $optional_pgid) = @_;
  my @alive = grep { $_->{state} eq 'ACTIVE' ||
		     $_->{state} eq 'COMPLETE' ||
		     $_->{state} eq 'DEFERRED' ||
		     $_->{state} eq 'LAUNCHING' || # rare
		     $_->{state} eq 'SUSPENDED' ||
		     $_->{state} eq 'SUSPENDED-DEFERRED' 
		   } @ALL_JOBS;
  if (!$count_bg) {
    @alive = grep { $_->{_is_bg} == 0 } @alive;
  }
  if (defined $optional_pgid) {
    @alive = grep { $_->{pgid} == $optional_pgid } @alive;
  }
  return scalar @alive;
}

#
# _reap should distinguish:
#
#    all alive jobs (ACTIVE+COMPLETE+SUSPENDED+DEFERRED+SUSPENDED-DEFERRED)
#    all active jobs (ACTIVE + COMPLETE + DEFERRED)
#    filtered alive jobs (by optional pgid)
#    filtered ACTIVE + COMPLETE + DEFERRED jobs
#
#    if  all_active==0  and  all_alive>0,  
#    then see Wait::WAIT_ACTION_ON_SUSPENDED_JOBS
#
sub count_processes {
  my ($count_bg, $optional_pgid) = @_;
  my @alive = grep { $_->{state} ne 'REAPED' && $_->{state} ne 'NEW' 
		   } @ALL_JOBS;
  if (!$count_bg) {
    @alive = grep { $_->{_is_bg} == 0 } @alive;
  }
  my @active = grep { $_->{state} !~ /SUSPENDED/ } @alive;
  my @filtered_active = @active;
  if (defined $optional_pgid) {
    @filtered_active = grep { $_->{pgid} == $optional_pgid } @filtered_active;
  }

  my @n = (scalar(@filtered_active), scalar(@alive), scalar(@active));

  if ($Forks::Super::Debug::DEBUG) {
    debug("count_processes(): @n");
    debug("count_processes(): Filtered active: ",
	  $filtered_active[0]->toString()) if $n[0];
    debug("count_processes(): Alive: ", $alive[0]->toShortString()) if $n[1];
    debug("count_processes(): Active: @active") if $n[2];
  }

  return @n;
}

sub init_child {
  Forks::Super::Job::Ipc::init_child();
  return;
}

sub deinit_child {
  Forks::Super::Job::Ipc::deinit_child();
  return;
}

#
# get the current CPU load. May not be possible
# to do on all operating systems.
#
sub get_cpu_load {
  return Forks::Super::Job::OS::get_cpu_load();
}

sub dispose {
  foreach my $job (@_) {

    my $pid = $job->{pid};
    my $real_pid = $job->{real_pid} || $pid;

    $job->close_fh('all');
    delete $Forks::Super::CHILD_STDIN{$pid};
    delete $Forks::Super::CHILD_STDIN{$real_pid};
    delete $Forks::Super::CHILD_STDOUT{$pid};
    delete $Forks::Super::CHILD_STDOUT{$real_pid};
    delete $Forks::Super::CHILD_STDERR{$pid};
    delete $Forks::Super::CHILD_STDERR{$real_pid};

    foreach my $attr ('f_in','f_out','f_err') {
      my $file = $job->{fh_config} && $job->{fh_config}->{$attr};
      if (defined($file) && -f $file) {
	$! = 0;
	if (unlink $file) {
	  delete $Forks::Super::Job::Ipc::IPC_FILES{$file};
	} elsif ($INSIDE_END_QUEUE) {
	  warn "unlink failed for \"$file\": $! $^E\n";
	  warn "@{$Forks::Super::Job::Ipc::IPC_FILES{$file}}\n";
	}
      }
    }

    # XXX - disposed jobs should go to %ARCHIVED_JOBS, @ARCHIVED_JOBS
    my @k = grep { $ALL_JOBS{$_} eq $job } keys %ALL_JOBS;
    delete $ALL_JOBS{$_} for @k;

    delete $job->{$_} for keys %$job;
    $job->{disposed} ||= time;
  }
  @ALL_JOBS = grep { !$_->{disposed} } @ALL_JOBS;
  return;
}

#
# Print information about all known jobs.
#
sub printAll {
  print "ALL JOBS\n";
  print "--------\n";
  foreach my $job
    (sort {$a->{pid} <=> $b->{pid} ||
	     $a->{created} <=> $b->{created}} @ALL_JOBS) {

      print $job->toString(), "\n";
      print "----------------------------\n";
    }
  return;
}

sub get_win32_proc { return $WIN32_PROC; }
sub get_win32_proc_pid { return $WIN32_PROC_PID; }

1;

__END__