/usr/local/CPAN/dvdrip/Video/DVDRip/Cluster/Scheduler.pm
# $Id: Scheduler.pm 2301 2007-04-13 11:20:43Z joern $
#-----------------------------------------------------------------------
# Copyright (C) 2001-2006 Jörn Reder <joern AT zyn.de>.
# All Rights Reserved. See file COPYRIGHT for details.
#
# This program is part of Video::DVDRip, which is free software; you can
# redistribute it and/or modify it under the same terms as Perl itself.
#-----------------------------------------------------------------------
package Video::DVDRip::Cluster::Scheduler;
use strict;
use base qw(Event::ExecFlow::Scheduler::SimpleMax);
use Locale::TextDomain qw (video.dvdrip);
use Video::DVDRip::Cluster::ExecFlowFrontend;
my $DEBUG = 0;
sub is_exclusive { 1 }
sub get_exec_flow_group { shift->{exec_flow_group} }
sub get_max { shift->{max} }
sub get_cnt { shift->{cnt} }
sub get_jobs_by_id { shift->{jobs_by_id} }
sub get_jobs_by_project_id { shift->{jobs_by_project_id} }
sub get_projects_by_job_id { shift->{projects_by_job_id} }
sub set_exec_flow_group { shift->{exec_flow_group} = $_[1] }
sub set_max { shift->{max} = $_[1] }
sub set_cnt { shift->{cnt} = $_[1] }
sub set_jobs_by_id { shift->{jobs_by_id} = $_[1] }
sub set_jobs_by_project_id { shift->{jobs_by_project_id} = $_[1] }
sub set_projects_by_job_id { shift->{projects_by_job_id} = $_[1] }
sub get_master { Video::DVDRip::Cluster::Master->get_master }
sub new {
my $class = shift;
my %par = @_;
my ($max) = @par{'max'};
$max ||= 3;
my $self = bless {
max => $max,
cnt => 0,
jobs_by_id => {},
jobs_by_project_id => {},
projects_by_job_id => {},
}, $class;
my $exec_flow_group = Event::ExecFlow::Job::Group->new (
name => "all_projects",
title => __"All running cluster projects",
parallel => 1,
scheduler => $self,
frontend => Video::DVDRip::Cluster::ExecFlowFrontend->new(),
fail_with_members => 0,
stop_on_failure => 0,
);
$self->set_exec_flow_group($exec_flow_group);
$self->set_jobs_by_id ({
$exec_flow_group->get_id => $exec_flow_group,
});
return $self;
}
sub add_project {
my $self = shift;
my ($project) = @_;
my $job = $project->job;
$self->get_exec_flow_group->add_job($job);
$job->set_group_in_all_childs;
$job->traverse_all_jobs(sub {
$_[0]->set_fail_with_members(0)
if $_[0]->get_type eq 'group';
});
$self->register_all_jobs($project);
$self->get_jobs_by_project_id->{$project->id} = $job;
$job->get_member_finished_callbacks->add(sub {
$self->get_master->emit_event(
"PROJECT_UPDATE", $project->id
);
});
$job->get_pre_callbacks->add(sub {
$project->set_state("running");
$self->get_master->emit_event(
"PROJECT_UPDATE", $project->id
);
});
$job->get_post_callbacks->add(sub {
$project->set_state($job->get_state);
$self->get_master->emit_event(
"PROJECT_UPDATE", $project->id
);
$project->save;
});
$self->get_master->emit_event(
"JOB_UPDATE", $self->get_exec_flow_group->get_id
);
1;
}
sub cancel_project {
my $self = shift;
my ($project) = @_;
return unless $self->get_jobs_by_project_id->{$project->id};
$project->set_cancel_in_progress(1);
$project->job->cancel;
$project->set_state("cancelled");
$project->save;
$project->set_cancel_in_progress(0);
1;
}
sub restart_project {
my $self = shift;
my ($project) = @_;
return unless $self->get_jobs_by_project_id->{$project->id};
$project->job->reset_non_finished_jobs;
$project->set_state("waiting");
$project->save;
my $exec_flow_group = $self->get_exec_flow_group;
$exec_flow_group->set_state("waiting")
unless $exec_flow_group->get_state eq 'running';
$exec_flow_group->init_progress_state();
1;
}
sub remove_project {
my $self = shift;
my ($project) = @_;
return unless $self->get_jobs_by_project_id->{$project->id};
my $job = $project->job;
my $exec_flow_group = $self->get_exec_flow_group;
my $cnt = $job->get_type eq 'group' ? $job->get_progress_cnt : 1;
$exec_flow_group->decrease_progress_cnt($cnt);
$exec_flow_group->remove_job($job);
$self->deregister_all_jobs($job);
delete $self->get_jobs_by_project_id->{$project->id};
$self->get_master->emit_event(
"JOB_UPDATE", $self->get_exec_flow_group->get_id
);
1;
}
sub register_all_jobs {
my $self = shift;
my ($project, $job) = @_;
$job ||= $project->job;
$self->get_jobs_by_id->{$job->get_id} = $job;
$self->get_projects_by_job_id->{$job->get_id} = $project;
if ( $job->get_type eq 'group' ) {
foreach my $child ( @{$job->get_jobs} ) {
$self->register_all_jobs($project, $child);
}
}
1;
}
sub deregister_all_jobs {
my $self = shift;
my ($job) = @_;
delete $self->get_jobs_by_id->{$job->get_id};
delete $self->get_projects_by_job_id->{$job->get_id};
if ( $job->get_type eq 'group' ) {
foreach my $child ( @{$job->get_jobs} ) {
$self->deregister_all_jobs($child);
}
}
1;
}
sub init {
my $self = shift;
$self->get_exec_flow_group->set_group_in_all_childs;
$self->get_exec_flow_group->init_progress_state();
1;
}
sub job_finished {
my $self = shift;
my ($job) = @_;
return if $job->get_type ne 'command';
my $node = $job->get_node;
$node->set_assigned_job(undef);
$job->set_node(undef);
$node->set_state("idle")
if $node->state ne 'stopped';
$job->reset if $job->get_cancelled;
$self->run;
1;
}
#---------------------------------------------------------------------
sub run {
my $self = shift;
#-- get idle nodes
my ($local_nodes_lref, $remote_nodes_lref) = $self->get_idle_nodes;
#-- nothing to do if no node is idle
return 1 if @{$local_nodes_lref} + @{$remote_nodes_lref} == 0;
#-- collect jobs, separate local from remote capable jobs
my (@local_jobs, @remote_jobs);
$self->traverse_job_tree(
$self->get_exec_flow_group,
\@local_jobs,
\@remote_jobs,
scalar(@{$local_nodes_lref}),
scalar(@{$remote_nodes_lref}),
);
if ( @local_jobs == 0 && @remote_jobs == 0 ) {
#-- no jobs found. check if a project has jobs with errors
#-- obviously have jobs executed with errors
foreach my $project_job ( @{$self->get_exec_flow_group->get_jobs} ) {
if ( $project_job->get_state eq 'running' &&
$project_job->get_error_message eq '' ) {
my $has_errors;
$project_job->traverse_all_jobs (sub {
my ($job) = @_;
if ( $job->get_error_message ) {
$has_errors = 1;
$project_job->add_job_error_message($job);
}
});
$project_job->execution_finished if $has_errors;
}
}
return;
}
#-- start local jobs on local nodes first
$DEBUG && print "-"x80,"\n";
$DEBUG && print "local jobs = ".join("\n ", map {$_->get_info} @local_jobs)."\n";
$DEBUG && print "local nodes = ".join("\n ", map {$_->name} @{$local_nodes_lref})."\n";
if ( @local_jobs && @{$local_nodes_lref} ) {
$self->start_jobs_on_nodes(\@local_jobs, $local_nodes_lref);
}
#-- start remote jobs on remote nodes next
$DEBUG && print "-"x80,"\n";
$DEBUG && print "remote jobs = ".join("\n ", map {$_->get_info} @remote_jobs)."\n";
$DEBUG && print "remote nodes = ".join("\n ", map {$_->name} @{$remote_nodes_lref})."\n";
if ( @remote_jobs && @{$remote_nodes_lref} ) {
$self->start_jobs_on_nodes(\@remote_jobs, $remote_nodes_lref);
}
#-- do we have remote jobs left and local nodes?
$DEBUG && print "-"x80,"\n";
$DEBUG && print "remote jobs = ".join("\n ", map {$_->get_info} @remote_jobs)."\n";
$DEBUG && print "local nodes = ".join("\n ", map {$_->name} @{$local_nodes_lref})."\n";
if ( @remote_jobs && @{$local_nodes_lref} ) {
$self->start_jobs_on_nodes(\@remote_jobs, $local_nodes_lref);
}
#-- do we have local jobs left and remote nodes?
$DEBUG && print "-"x80,"\n";
$DEBUG && print "local jobs = ".join("\n ", map {$_->get_info} @local_jobs)."\n";
$DEBUG && print "remote nodes = ".join("\n ", map {$_->name} @{$remote_nodes_lref})."\n";
if ( @local_jobs && @{$remote_nodes_lref} ) {
$self->start_jobs_on_nodes(\@local_jobs, $remote_nodes_lref);
}
$DEBUG && print "-"x80,"\n";
1;
}
sub get_idle_nodes {
my $self = shift;
my $master = $self->get_master;
my (@local_nodes, @remote_nodes) = @_;
foreach my $node ( sort { $b->speed_index <=> $a->speed_index }
@{$master->nodes} ) {
next if $node->state ne 'idle';
if ( $node->data_is_local ) {
push @local_nodes, $node;
}
else {
push @remote_nodes, $node
}
}
return (\@local_nodes, \@remote_nodes);
}
sub traverse_job_tree {
my $self = shift;
my ($job, $local_jobs_lref, $remote_jobs_lref, $local_cnt, $remote_cnt) = @_;
return if @{$local_jobs_lref} >= $local_cnt &&
@{$remote_jobs_lref} >= $remote_cnt;
if ( $job->get_type eq 'group' ) {
if ( $job->get_state =~ /^(?:waiting|running)$/ &&
( !$job->get_group ||
$job->get_group->dependencies_ok($job)
) ) {
foreach my $child ( @{$job->get_jobs} ) {
$self->traverse_job_tree(
$child, $local_jobs_lref, $remote_jobs_lref,
$local_cnt, $remote_cnt
);
return if @{$local_jobs_lref} >= $local_cnt &&
@{$remote_jobs_lref} >= $remote_cnt;
}
}
}
else {
if ( $job->get_state eq 'waiting' &&
$job->get_group->dependencies_ok($job) ) {
if ( $job->get_stash->{prefer_local_access} ) {
push @{$local_jobs_lref}, $job;
}
else {
push @{$remote_jobs_lref}, $job;
}
}
}
}
sub start_jobs_on_nodes {
my $self = shift;
my ($jobs, $nodes) = @_;
while ( my $job = shift @{$jobs} ) {
my $node = shift @{$nodes};
last if !$node;
$job->set_node($node);
$self->start_job($job);
}
1;
}
sub start_job {
my $self = shift;
my ($job) = @_;
#-- start all parent groups if not yet started
my $group = $job->get_group;
while ( $group ) {
if ( $group->get_state eq 'waiting' ) {
$group->start;
}
$group = $group->get_group;
}
#-- start job via group
$job->get_node->set_state("running");
$job->get_node->set_assigned_job($job);
$job->get_group->start_child_job($job);
1;
}
1;