Event::ExecFlow::Job - Abstract base class for all job classes


Event-ExecFlow documentation Contained in the Event-ExecFlow distribution.

Index


Code Index:

NAME

Top

Event::ExecFlow::Job - Abstract base class for all job classes

SYNOPSIS

Top

  Event::ExecFlow::Job->new (
    title                => Descriptive title,
    name                 => Internal short name,
    depends_on           => Names of jobs, this job depends on,
    progress_max         => Maximum expected progress value,
    progress_ips         => String to show as "items per second",
    no_progress          => Job has no progress state at all,
    pre_callbacks        => Callbacks executed before job starts,
    post_callbacks       => Callbacks executed after job finished,
    error_callbacks      => Callbacks executed if job had errors,
    warning_callbacks    => Callbacks executed if job had warnings,
    stash                => A custom data hash stored with the job,
  );

DESCRIPTION

Top

This is an abstract base class and usually not used directly from the application. For daily programming the attributes defined in this class are most important, since they are common to all Jobs of the Event::ExecFlow framework.

OBJECT HIERARCHY

Top

  Event::ExecFlow

  Event::ExecFlow::Job
  +--- Event::ExecFlow::Job::Group
  +--- Event::ExecFlow::Job::Command
  +--- Event::ExecFlow::Job::Code

  Event::ExecFlow::Frontend
  Event::ExecFlow::Callbacks
  Event::ExecFlow::Scheduler
  +--- Event::ExecFlow::Scheduler::SimpleMax

ATTRIBUTES

Top

Attributes may be set with the new() constructor passed as a hash and accessed at runtime using the common get_ATTR(), set_ATTR() style accessors.

[ FIXME: describe all attributes in detail ]

METHODS

Top

[ FIXME: describe all methods in detail ]

AUTHORS

Top

 Jörn Reder <joern at zyn dot de>

COPYRIGHT AND LICENSE

Top


Event-ExecFlow documentation Contained in the Event-ExecFlow distribution.

package Event::ExecFlow::Job;

use strict;
use Carp;

use Locale::TextDomain $Event::ExecFlow::locale_textdomain;

sub get_id                      { shift->{id}                           }
sub get_title                   { shift->{title}                        }
sub get_name                    { shift->{name}                         }
sub get_depends_on              { shift->{depends_on}                   }
sub get_state                   { shift->{state}                        }
sub get_cancelled               { shift->{cancelled}                    }
sub get_error_message           { shift->{error_message}                }
sub get_warning_message         { shift->{warning_message}              }
sub get_progress_max            { shift->{progress_max}                 }
sub get_progress_cnt            { shift->{progress_cnt}                 }
sub get_progress_start_time     { shift->{progress_start_time}          }
sub get_progress_end_time       { shift->{progress_end_time}            }
sub get_progress_ips            { shift->{progress_ips}                 }
sub get_no_progress             { shift->{no_progress}                  }
sub get_last_progress           { shift->{last_progress}                }
sub get_last_percent_logged     { shift->{last_percent_logged}          }
sub get_pre_callbacks           { shift->{pre_callbacks}                }
sub get_post_callbacks          { shift->{post_callbacks}               }
sub get_error_callbacks         { shift->{error_callbacks}              }
sub get_warning_callbacks       { shift->{warning_callbacks}            }
sub get_frontend                { shift->{frontend}                     }
sub get_group                   { shift->{group}                        }
sub get_diskspace_consumed      { shift->{diskspace_consumed}           }
sub get_diskspace_freed         { shift->{diskspace_freed}              }
sub get_stash                   { shift->{stash}                        }
sub get_paused                  { shift->{paused}                       }
sub get_paused_seconds          { shift->{paused_seconds}               }
sub get_paused_start_time       { shift->{paused_start_time}            }
sub get_skipped                 { shift->{skipped}                      }

sub set_title                   { shift->{title}                = $_[1] }
sub set_name                    { shift->{name}                 = $_[1] }
sub set_state                   { shift->{state}                = $_[1] }
sub set_error_message           { shift->{error_message}        = $_[1] }
sub set_warning_message         { shift->{warning_message}      = $_[1] }
sub set_progress_max            { shift->{progress_max}         = $_[1] }
sub set_progress_cnt            { shift->{progress_cnt}         = $_[1] }
sub set_progress_start_time     { shift->{progress_start_time}  = $_[1] }
sub set_progress_end_time       { shift->{progress_end_time}    = $_[1] }
sub set_progress_ips            { shift->{progress_ips}         = $_[1] }
sub set_no_progress             { shift->{no_progress}          = $_[1] }
sub set_last_progress           { shift->{last_progress}        = $_[1] }
sub set_last_percent_logged     { shift->{last_percent_logged}  = $_[1] }
sub set_pre_callbacks           { shift->{pre_callbacks}        = $_[1] }
sub set_post_callbacks          { shift->{post_callbacks}       = $_[1] }
sub set_error_callbacks         { shift->{error_callbacks}      = $_[1] }
sub set_warning_callbacks       { shift->{warning_callbacks}    = $_[1] }
sub set_frontend                { shift->{frontend}             = $_[1] }
sub set_group                   { shift->{group}                = $_[1] }
sub set_diskspace_consumed      { shift->{diskspace_consumed}   = $_[1] }
sub set_diskspace_freed         { shift->{diskspace_freed}      = $_[1] }
sub set_stash                   { shift->{stash}                = $_[1] }
sub set_paused                  { shift->{paused}               = $_[1] }
sub set_paused_seconds          { shift->{paused_seconds}       = $_[1] }
sub set_paused_start_time       { shift->{paused_start_time}    = $_[1] }
sub set_skipped                 { shift->{skipped}              = $_[1] }

sub set_depends_on {
    my $self = shift;
    my ($jobs_lref) = @_;
    
    my @job_names = map { ref $_ ? $_->get_name : $_ } @{$jobs_lref};
    $self->{depends_on} = \@job_names;
    
    return \@job_names;
}

sub set_cancelled {
    my $self = shift;
    my ($cancelled) = @_;
    $self->{cancelled} = $cancelled;
    $self->set_state($cancelled ? "cancelled":"waiting");
    return $cancelled;
}

sub finished_ok {
    my $self = shift;
    return !$self->get_cancelled &&
           !$self->get_error_message;
}

my $JOB_ID = (time - 1140691085) * 1_000_000;

sub new {
    my $class = shift;
    my %par = @_;
    my  ($title, $name, $depends_on, $pre_callbacks) =
    @par{'title','name','depends_on','pre_callbacks'};
    my  ($post_callbacks, $error_callbacks, $warning_callbacks) =
    @par{'post_callbacks','error_callbacks','warning_callbacks'};
    my  ($progress_cnt, $progress_max, $progress_ips, $no_progress) =
    @par{'progress_cnt','progress_max','progress_ips','no_progress'};
    my  ($diskspace_consumed, $diskspace_freed, $stash, $frontend) =
    @par{'diskspace_consumed','diskspace_freed','stash','frontend'};

    my $id = ++$JOB_ID;

    $depends_on ||= [];
    $stash      ||= {};
    $name       ||= '~'.$id;

    croak "Job '$name' depends on itself"
        if grep { $_ eq $name } @{$depends_on};

    for my $cb ( $pre_callbacks,   $post_callbacks,
                 $error_callbacks, $warning_callbacks ) {
        $cb ||= Event::ExecFlow::Callbacks->new;
        $cb   = Event::ExecFlow::Callbacks->new($cb) if ref $cb eq 'CODE';
    }

    my $self = bless {
        id                      => $id,
        title                   => $title,
        name                    => $name,
        depends_on              => $depends_on,
        state                   => 'waiting',
        diskspace_consumed      => $diskspace_consumed,
        diskspace_freed         => $diskspace_freed,
        progress_cnt            => $progress_cnt,
        progress_max            => $progress_max,
        progress_ips            => $progress_ips,
        no_progress             => $no_progress,
        pre_callbacks           => $pre_callbacks,
        post_callbacks          => $post_callbacks,
        error_callbacks         => $error_callbacks,
        warning_callbacks       => $warning_callbacks,
        stash                   => $stash,
        frontend                => $frontend,
        paused_seconds          => 0,
        last_percent_logged     => 0,
        group                   => undef,
    }, $class;
    
    $self->set_depends_on($depends_on);
    
    return $self;
}

sub init {
    my $self = shift;
    
    return if $self->get_state ne 'waiting' &&
              $self->get_state ne 'running';
    
    $self->set_state("waiting");
    $self->set_progress_start_time(time);
    $self->set_progress_end_time();
    $self->set_cancelled();
    $self->set_error_message();
    $self->set_last_percent_logged(0);
    $self->set_last_progress();
    $self->set_progress_cnt(0);

    1;
}

sub start {
    my $self = shift;
    
    $Event::ExecFlow::DEBUG && print "Job->start(".$self->get_info.")\n";

    if ( !$self->get_frontend ) {
        require Event::ExecFlow::Frontend;
        $self->set_frontend(Event::ExecFlow::Frontend->new);
    }
    
    $self->init;
    $self->set_state("running");

    $self->get_frontend->report_job_start($self);
    
    $self->get_pre_callbacks->execute($self);
    
    if ( $self->get_error_message ) {
        $self->execution_finished;
        return 0;
    }
    
    if ( $self->get_warning_message ) {
        $self->get_warning_callbacks->execute($self);
        $self->get_frontend->report_job_warning($self);
    }

    if ( $self->get_skipped ) { # may be set by pre_callbacks
        $self->execution_finished;
        return 0;
    }

    $self->execute;
    
    1;
}

sub reset {
    my $self = shift;
    
    return if $self->get_state eq 'running' or
              $self->get_state eq 'waiting';
    
    $self->set_state("waiting");
    $self->set_progress_start_time();
    $self->set_progress_end_time();
    $self->set_cancelled();
    $self->set_error_message();
    $self->set_last_percent_logged(0);
    $self->set_last_progress();
    $self->set_progress_cnt(0);
    
    $self->get_frontend->report_job_progress($self);

    1;
}

sub cancel {
    die "Missing implementation for method cancel() of object ".shift;
}

sub execute {
    die "Missing implementation for method execute() of object ".shift;
}

sub pause {
    my $self = shift;
    
    $self->set_paused(!$self->get_paused);
    $self->pause_job;

    if ( $self->get_paused ) {
        $self->set_paused_start_time(time);
    }
    else {
        my $start_time = $self->get_paused_start_time;
        my $duration   = time - $start_time;
        $self->set_paused_seconds($duration + $self->get_paused_seconds);
        $self->set_paused_start_time();
    }

    1;    
}

sub execution_finished {
    my $self = shift;

    $Event::ExecFlow::DEBUG && print "Job->execution_finished(".$self->get_info.")\n";

    $self->set_progress_end_time(time);
    $self->get_frontend->report_job_progress($self);

    if ( !$self->get_cancelled ) {
        if ( $self->get_error_message ) {
            $self->set_state("error");
        }
        else {
            $self->set_state("finished");
        }
    }

    $self->get_post_callbacks->execute($self);

    $self->set_state("error") if $self->get_error_message;

    $self->get_frontend->report_job_finished($self);

    if ( !$self->get_cancelled ) {
        if ( $self->get_error_message ) {
            $self->get_error_callbacks->execute($self);
            $self->get_frontend->report_job_error($self);
        }        

        if ( $self->get_warning_message ) {
            $self->get_warning_callbacks->execute($self);
            $self->get_frontend->report_job_warning($self);
        }
    }

    if ( $self->get_type ne 'group' and $self->get_state eq 'finished' ) {
        my $parent = $self;
        while ( $parent = $parent->get_group ) {
            $parent->set_progress_cnt($parent->get_progress_cnt+1);
            $self->get_frontend->report_job_progress($parent);
        }
    }

    1;
}

sub emit_warning_message {
    my $self = shift;
    my ($warning) = @_;
    
    $self->get_frontend->report_job_warning($self, $warning);
    
    1;
}

sub get_job_cnt { 1 }

sub get_info {
    my $self = shift;
    return $self->get_title || $self->get_name || "Unnamed";
}

sub get_progress_fraction {
    my $self = shift;
    my $max = $self->get_progress_max || 0;
    my $cnt = $self->get_progress_cnt || 0;
    return $max == 0 ? 0 : $cnt / $max;
}

sub get_progress_percent {
    my $self = shift;
    return sprintf("%.2f", 100 * $self->get_progress_fraction);
}

sub get_progress_text {
    my $self = shift;
    return $self->get_info.": ".$self->get_progress_stats;
}

sub get_progress_stats {
    my $self = shift;

    my $cancelled = $self->get_cancelled ? "[".__("Cancelled")."]" : "";
    $cancelled  ||= $self->get_error_message ? "[".__("Error")."]" : "";
    $cancelled  ||= $self->get_skipped ? "[".__("Skipped")."]" : "";

    return __("Waiting")." ".$cancelled if $self->get_state eq 'waiting';

    my $cnt       = $self->get_progress_cnt;
    my $max       = $self->get_progress_max || 1;
    my $time      = ( time - $self->get_progress_start_time
                           - $self->get_paused_seconds );
    my $ips_label = $self->get_progress_ips;
    my $ips       = "";

    if ( $self->get_progress_end_time ) {
        $time = $self->get_progress_end_time
              - $self->get_progress_start_time
              - $self->get_paused_seconds;
        my $text = __x( "Duration: {time}", time => $self->format_time($time) );
        if ( $ips_label ) {
            $time ||= 1;
            $text .= ", $ips_label: ".sprintf( "%2.1f", $cnt / $time );
        }
        return $text." ".$cancelled;
    }

    return $cancelled if $self->get_no_progress;
    return __("Initializing")." ".$cancelled if ! defined $cnt;

    $ips = sprintf( ", %2.1f $ips_label", $cnt / $time )
        if $ips_label && $time;

    my $elapsed = "";
    $elapsed = ", "
        . __x( "elapsed {time}", time => $self->format_time($time) )
            if $self->get_type ne 'group';

    my $percent = $self->get_progress_percent.'%';
    $percent .= __" finished" if $self->get_type eq 'group';

    my $eta = "";
    $eta = ", ETA: "
        . $self->format_time( int( $time * $max / $cnt ) - $time + 1 )
        if $time > 5 && $cnt != 0 && $self->get_type ne 'group';

    my $int_percent = int( $cnt / $max * 100 );

    if ( $int_percent > $self->get_last_percent_logged + 10 ) {
        $int_percent = int( $int_percent / 10 ) * 10;
        $self->set_last_percent_logged($int_percent);
        my $line = $self->get_info . ": "
                . __x( "{percent}PERCENT done.",
                percent => $int_percent );
        $line =~ s/PERCENT/%/;
        $self->log($line);
    }

    $cancelled = " ".$cancelled if $cancelled;

    return "$percent$ips$elapsed$eta$cancelled";
}

sub format_time {
    my $self = shift;
    my ($time) = @_;

    my ($h, $m, $s);
    $h = int($time/3600);
    $m = int(($time-$h*3600)/60);
    $s = $time % 60;

    return sprintf ("%02d:%02d", $m, $s) if $h == 0;
    return sprintf ("%02d:%02d:%02d", $h, $m, $s);
}

sub log {
    my $self = shift;
    $self->get_frontend->log(@_);
    1;
}

sub progress_has_changed {
    my $self = shift;

    my $last_progress = $self->get_last_progress||"";
    my $curr_progress = $self->get_progress_cnt."/".$self->get_progress_max;

    if ( $last_progress ne $curr_progress ) {
        $self->set_last_progress($curr_progress);
        return 1;
    }
    else {
        return 0;
    }

}

sub frontend_signal {
    my $self = shift;
    my ($signal, @args) = @_;
    
    my $method = "signal_$signal";
    $self->get_frontend->$method(@args);
    
    1;
}

sub get_max_diskspace_consumed {
    my $self = shift;
    my ($currently_consumed, $max_consumed) = @_;

    $currently_consumed += $self->get_diskspace_consumed;

    if ( $currently_consumed > $max_consumed ) {
        $max_consumed = $currently_consumed;
    }

    $currently_consumed -= $self->get_diskspace_freed;
    
    return ($currently_consumed, $max_consumed);
}

sub backup_state {
    my $self = shift;
    
    my %data = %{$self};
    
    delete @data{
        qw(
            pre_callbacks
            post_callbacks
            error_callbacks
            warning_callbacks
            frontend
            group
            _post_callbacks_added
        )
    };

    $data{type} = $self->get_type;

    return \%data;
}

sub restore_state {
    my $self = shift;
    my ($data_href) = @_;
    
    if ( $data_href->{type} ne $self->get_type ) {
        die "Can't restore job state due to data type mismatch: ".
            "Job type=".$self->get_type.", ".
            "Data type=".$data_href->{type};
    }

    foreach my $key ( keys %{$data_href} ) {
        $self->{$key} = $data_href->{$key};
    }

    delete $self->{type};

    $self->set_state("waiting")
        if $self->get_state eq 'running';
    
    1;
}

sub add_stash {
    my $self = shift;
    my ($add_stash) = @_;
    
    my $stash = $self->get_stash;
    
    while ( my ($k, $v) = each %{$add_stash} ) {
        $stash->{$k} = $v;
    }
    
    1;
}

sub get_job_with_id {
    my $self = shift;
    my ($job_id) = @_;
    return $self if $job_id eq $self->get_id;
    return;
}

1;

__END__