/usr/local/CPAN/Parallel-Depend/Parallel/Depend/Queue.pm


########################################################################
# housekeeping
########################################################################

package Parallel::Depend::Queue;

use v5.10;
use strict;
use vars qw( $job_id_sep $nspace_sep );

use File::Basename;

use Parallel::Depend::Util  qw( log_fatal log_message );
use Scalar::Util            qw( blessed );
use Symbol                  qw( qualify_to_ref );

########################################################################
# package variables
########################################################################

# module version, written as a v-string literal.

our $VERSION    = v1.2.0;

# shared empty hashref: merge_attrib falls back to this when a
# job or alias has no attribute entry stored for it.

my $empty       = {};

# alias the package scalars declared with "use vars" to fixed
# values.  $job_id_sep gets whatever $; holds when this line
# runs ($; is also what perl uses to join $hash{ $a, $b } keys);
# $nspace_sep is a literal '.'.

*job_id_sep     = \qq{$;};
*nspace_sep     = \q{.};

# attribute names that the generated attrib accessor deletes
# from the copy it makes when it first populates a namespace.

my @no_inherit_attrz
= qw
(
    ad_hoc
    alias
);

########################################################################
# public interface
########################################################################

sub new
{
    # constructor: the first argument (class name or object)
    # supplies the empty object via construct; everything
    # after it is handed to init untouched.
    #
    # returns the new queue object, regardless of what init
    # itself returns.

    my $que = shift->construct;

    $que->init( @_ );

    return $que;
}

sub construct
{
    # bless a new, empty hash.  called with an object the
    # result lands in that object's class; called with a
    # plain string the string is used as the class name.

    my ( $proto ) = @_;

    my $class = blessed( $proto ) || $proto;

    return bless {}, $class;
}

sub init
{
    # reset the queue's contents and install the caller's
    # arguments as the attributes of the global namespace.
    #
    # arguments: either a single hashref (stored as-is, not
    # copied) or a flat key => value list.  the "base" entry
    # is removed from them and becomes the file prefix; if it
    # is missing or false the prefix is the basename of $0
    # with any .pl, .pm or .t suffix stripped.
    #
    # returns the queue object.

    my ( $que, @argz ) = @_;

    my $attrz   = ref $argz[0] ? $argz[0] : +{ @argz };

    my $prefix  = delete $attrz->{ base };

    $prefix   ||= basename $0, qw( .pl .pm .t );

    # 'alias' and 'attrib' are deliberately not part of the
    # initial structure.  the accessors read them from
    # $que->{ alias } and $que->{ attrib }, which are meant
    # to be localized during dispatch, e.g.:
    #
    #   local $que->{ attrib } = $que->attrib;
    #   local $que->{ attrib } = $que->attrib( $job_id );

    %$que =
    (
        # filled in from the schedule (that code is not in
        # this file):
        #
        #   { before }{ $job }  jobs blocked from running.
        #   { after  }{ $job }  jobs with other jobs following.

        before  => {},
        after   => {},

        # job_id => $reason for jobs that are skipped; 0 means
        # the job is clean.  assembled while preparing a restart
        # or when jobs fail during execution.

        skip    => {},

        # prefix for the basenames of all generated files.

        prefix  => $prefix,

        # $job_id => [ run, out, err ] paths.

        files   => {},

        # the global namespace is '': it starts out with the
        # caller's attributes and an empty set of aliases.

        _attrib => { '' => $attrz },
        _alias  => { '' => {}     },
    );

    return $que;
}

########################################################################
# default is a shallow copy of the current 
# context's data.  this allows prepare and 
# ad_hoc calls to use local $que->{ foo } = 
# $que->foo( $new_context ) when preparing or 
# executing the schedule.

for my $spec
(
    [ alias   => []                 ],
    [ attrib  => \@no_inherit_attrz ],
)
{
    # each spec is [ accessor name, keys to drop from the copy ].
    # the per-namespace cache lives in $que->{ "_$type" }, the
    # current context's data in $que->{ $type }.

    my ( $type, $purge ) = @$spec;

    my $cache_key = "_$type";

    # install the closure as a method named $type in this package.

    *{ qualify_to_ref $type }
    = sub
    {
        # with no namespace argument use the queue's current
        # namespace, or the global '' one.

        my ( $que, $nspace ) = @_;

        $nspace //= $que->{ namespace } || '';

        my $cache = $que->{ $cache_key };

        # already populated: hand back the cached hashref.

        return $cache->{ $nspace } if $cache->{ $nspace };

        # first use: shallow-copy the current context's data,
        # drop the non-inherited keys, cache and return the copy.

        my %copy = %{ $que->{ $type } };

        delete @copy{ @$purge };

        return $cache->{ $nspace } = \%copy;
    };
}

sub purge_group
{
    # drop the cached attribute and alias entries kept for
    # a group: both the entry keyed by the group's job id and
    # the one keyed by the group's namespace are removed from
    # $que->{ _attrib } and $que->{ _alias }.
    #
    # returns nothing.

    my ( $que, $group ) = @_;

    my $job_key = $que->job_id( $group );
    my $grp_key = $que->namespace( $group );

    for my $table ( @$que{ qw( _attrib _alias ) } )
    {
        delete @$table{ $grp_key, $job_key };
    }

    return;
}

########################################################################
# these keep $job_id_sep and $nspace_sep private.

sub namespace
{
    # without an argument: return the queue's current namespace.
    # with one: return the argument qualified by the current
    # namespace, joined with $nspace_sep, or the bare argument
    # when the current namespace is false (e.g. the global '').
    #
    # a queue with a missing or undefined namespace entry is
    # reported through log_fatal.

    my ( $que, @argz ) = @_;

    log_fatal 'Bogus namespace: queue missing namespace'
    unless exists $que->{ namespace };

    my $nspace  = $que->{ namespace };

    log_fatal "Damaged queue: undefined namespace", $que
    unless defined $nspace;

    return $nspace
    unless @argz;

    return $argz[0]
    unless $nspace;

    return join $nspace_sep, $nspace, $argz[0];
}

sub job_id
{
    # build a job id: the queue's current namespace, then
    # $job_id_sep, then the job name if one was passed.
    # with no name the result is the namespace followed by
    # the bare separator.

    my ( $que, @name ) = @_;

    my $nspace  = $que->{ namespace };

    if( @name )
    {
        return join $job_id_sep, $nspace, $name[0];
    }

    return $nspace . $job_id_sep;
}

sub job_id_split
{
    # split a job id at the first $job_id_sep and return the
    # pieces: what precedes the separator and what follows it.
    #
    # the id is the first argument or, if that is undefined,
    # $que->{ job_id }; a false id is reported via log_fatal.

    my ( $que, $job_id ) = @_;

    $job_id //= $que->{ job_id }
    or log_fatal 'Bogus job_id_split: missing job_id';

    # the separator is data, not a pattern: \Q..\E keeps any
    # regex metacharacters in it literal.  no /o, so the value
    # in effect at call time is used, as it is in job_id.

    split /\Q$job_id_sep\E/, $job_id, 2
}

sub job_name
{
    # strip the namespace from a job id: return whatever
    # follows the first $job_id_sep, or the string unchanged
    # if it contains no separator.
    #
    # the job is the first argument or, if that is undefined,
    # $que->{ job }; a false job is reported via log_fatal.

    my $que = shift;

    my $job = shift // $que->{ job }
    or log_fatal 'Bogus job_name: missing job';

    my $i = index $job, $job_id_sep;

    # skip the whole separator, not just one character, so
    # that a multi-character separator does not leak into
    # the returned name.

    $i < 0
    ? $job
    : substr $job, $i + length $job_id_sep
}

sub resolve_alias
{
    # find the alias for a job.  the job name comes from
    # job_name; the alias is the first defined value out of:
    #
    #   the entry for the job in $que->{ alias },
    #   the 'alias' attribute of the current context,
    #   the job name itself.
    #
    # list context returns ( alias, job name ), otherwise
    # only the alias is returned.

    my $que     = shift;

    my $name    = $que->job_name( @_ );

    my $found   = $que->{ alias }{ $name };

    $found    //= $que->attrib->{ alias };
    $found    //= $name;

    return $found
    unless wantarray;

    return ( $found => $name );
}


########################################################################
# mainly used to merge attributes from jobs with the current
# context's attributes.

sub merge_attrib
{
    # combine three layers of attributes for a job, later
    # layers overriding earlier ones once the result is a hash:
    #
    #   the current context's attributes,
    #   attributes stored for the job's alias,
    #   attributes stored for the job itself.
    #
    # list context returns the flattened key/value list of
    # all three layers, otherwise a new hashref is returned.

    my $que     = shift;

    my ( $alias, $name ) = $que->resolve_alias( @_ );

    my $nspace  = $que->namespace;

    my $stored  = $que->{ _attrib };

    my $context = $que->attrib;

    # per-job entries are keyed by namespace and name joined
    # with $;.  missing entries fall back to the shared empty
    # hash.

    my $for_alias   = $stored->{ join( $;, $nspace, $alias ) } || $empty;
    my $for_job     = $stored->{ join( $;, $nspace, $name  ) } || $empty;

    my @merged  = map { ( %$_ ) } $context, $for_alias, $for_job;

    return wantarray ? @merged : +{ @merged };
}

sub set_job_attrib
{
    # update the attributes stored for one job in the queue's
    # current namespace.
    #
    # arguments: the job name, then either a hashref or a flat
    # name => value list.  a defined value is stored under its
    # name; an undefined value removes that name from the job's
    # attributes.  a false job name is reported via log_fatal.
    #
    # returns nothing.

    my $que     = shift;

    my $job     = shift
    or log_fatal( "Bogus set_job_attrib: missing job argument" );

    my $nspace  = $que->{ namespace };

    my $attrz   = $que->{ _attrib }{ $nspace, $job } ||= {};

    my $argz    = ref $_[0] ? shift : +{ @_ };

    # keys, not each: a caller-supplied hashref may carry a
    # half-used each iterator, which would make a while/each
    # loop silently skip the entries already passed over.

    for my $name ( keys %$argz )
    {
        my $value   = $argz->{ $name };

        if( defined $value )
        {
            $attrz->{ $name } = $value;
        }
        else
        {
            delete $attrz->{ $name };
        }
    }

    return
}

sub get_job_attrib
{
    # look up attributes stored for one job in the queue's
    # current namespace.
    #
    # arguments: the job name, then one or more attribute
    # names.  list context returns the values for all of the
    # names, in order; otherwise only the value for the first
    # name is returned.  if nothing is stored for the job the
    # result is an empty return.  a false job name is reported
    # via log_fatal.

    my $que     = shift;

    # the message names this sub, not set_job_attrib, so the
    # log points at the call that actually failed.

    my $job     = shift
    or log_fatal( "Bogus get_job_attrib: missing job argument" );

    my $nspace  = $que->{ namespace };

    my $attrz   = $que->{ _attrib }{ $nspace, $job }
    or return;

    wantarray
    ? @{ $attrz }{ @_ }
    : $attrz->{ $_[0] }
}

# keep require happy

1

__END__