# /usr/local/CPAN/dvdrip/Video/DVDRip/Cluster/JobPlanner.pm
# $Id: JobPlanner.pm 2368 2009-02-22 18:26:44Z joern $
#-----------------------------------------------------------------------
# Copyright (C) 2001-2006 Jörn Reder <joern AT zyn.de>.
# All Rights Reserved. See file COPYRIGHT for details.
#
# This program is part of Video::DVDRip, which is free software; you can
# redistribute it and/or modify it under the same terms as Perl itself.
#-----------------------------------------------------------------------
package Video::DVDRip::Cluster::JobPlanner;
use base qw(Video::DVDRip::JobPlanner);
use Carp;
use strict;
use Locale::TextDomain qw (video.dvdrip);
use Event::ExecFlow qw (video.dvdrip);
#-- Builds the job tree for cluster transcoding of the project's title.
#--
#-- All jobs are put into groups flagged as parallel; the execution
#-- order is expressed through depends_on chains:
#--
#--   1. group with the jobs of all selected PSUs
#--   2. merge of all PSU files (more than one PSU group), otherwise
#--      a plain "mv" of the PSU file to the target AVI file
#--   3. split job, if $title->tc_split is set
#--   4. vobsub job, if the title has vobsub subtitles
#--
#-- Returns an Event::ExecFlow::Job::Group containing these jobs.
sub build_cluster_transcode_job {
    my $self = shift;

    my $project = $self->get_project;
    my $title   = $project->title;

    my @title_jobs;
    my ( $job, $last_job );

    #-- work has to be done per PSU. Jobs for all PSU's
    #-- are collected here
    my @all_psu_jobs;
    foreach my $psu ( @{ $title->program_stream_units } ) {
        next if not $psu->selected;
        push @all_psu_jobs, $self->build_cluster_psu_jobs( $title, $psu );
    }

    #-- build a group for all PSU jobs
    $job = Event::ExecFlow::Job::Group->new(
        title    => __x( "Process all PSU's - title #{nr}", nr => $title->nr ),
        jobs     => \@all_psu_jobs,
        parallel => 1,
    );
    push @title_jobs, $job;
    $last_job = $job;

    if ( @all_psu_jobs > 1 ) {
        #-- merge all PSU's
        $job = $self->build_cluster_merge_psu_files_job($title);
        $job->set_depends_on( [$last_job] );
    }
    else {
        #-- move PSU 0 file to final destination
        #-- (the command is built lazily, when the job asks for it)
        $job = Event::ExecFlow::Job::Command->new(
            title   => __ "Move PSU #0 file to final destination",
            command => sub {
                "mv "
                    . $title->audio_video_psu_file . " "
                    . $title->target_avi_file;
            },
            stash      => { psu => 0 },
            depends_on => [$last_job],
        );
    }
    push @title_jobs, $job;
    $last_job = $job;

    #-- split video file
    if ( $title->tc_split ) {
        $job = $self->build_split_job($title);
        push @title_jobs, $job;
        $job->set_depends_on( [$last_job] );
        $job->set_stash( { prefer_local_access => 1 } );
        $last_job = $job;
    }

    #-- vobsub generation?
    if ( $title->has_vobsub_subtitles ) {
        $job = $self->build_vobsub_job($title);
        push @title_jobs, $job;
        $job->set_depends_on( [$last_job] );

        #-- a group job passes the stash on to all of its jobs
        my $stash_method
            = $job->get_type eq 'group'
            ? "add_stash_to_all_jobs"
            : "add_stash";
        $job->$stash_method( { prefer_local_access => 1 } );
        $last_job = $job;
    }

    #-- build a group for all jobs of this title
    return Event::ExecFlow::Job::Group->new(
        title => __x(
            "Project '{project}' - title #{nr}",
            project => $project->name,
            nr      => $title->nr
        ),
        jobs     => \@title_jobs,
        parallel => 1,
    );
}
#-- Builds all jobs needed to process a single PSU of a title.
#--
#-- Parameters:
#--   $title  title object the PSU belongs to
#--   $psu    program stream unit to process
#--
#-- The number of chunks is computed from the PSU's frame count and
#-- stored in the PSU via set_chunk_cnt() before the sub jobs are
#-- built.
#--
#-- Returns an Event::ExecFlow::Job::Group with the audio, video chunk
#-- and merge jobs of this PSU.
sub build_cluster_psu_jobs {
    my $self = shift;
    my ( $title, $psu ) = @_;

    #-- calculate number of chunks for this PSU
    my $frames_per_chunk = $title->frames_per_chunk || 10000;
    my $psu_frames       = $psu->frames;
    my $chunk_cnt        = int( $psu_frames / $frames_per_chunk );

    #-- use at least one chunk per online node plus one more,
    #-- and never less than two chunks
    my $nodes_cnt
        = 1 + Video::DVDRip::Cluster::Master->get_master->get_online_nodes_cnt;
    $chunk_cnt = $nodes_cnt if $chunk_cnt < $nodes_cnt;
    $chunk_cnt = 2          if $chunk_cnt < 2;

    $psu->set_chunk_cnt($chunk_cnt);

    #-- jobs for this PSU are collected here
    my @psu_jobs;

    #-- first all audio transcoding jobs
    my $tc_audio_job = $self->build_cluster_audio_jobs( $title, $psu );
    push @psu_jobs, $tc_audio_job if $tc_audio_job;

    #-- then all video chunk jobs
    my $tc_video_job = $self->build_cluster_video_chunk_jobs( $title, $psu );
    push @psu_jobs, $tc_video_job;

    #-- add jobs for video chunk merging
    my $tc_merge_video_job
        = $self->build_cluster_merge_video_chunks_job( $title, $psu );
    $tc_merge_video_job->set_depends_on( [$tc_video_job] );
    push @psu_jobs, $tc_merge_video_job;

    #-- add jobs for audio tracks merging
    if ($tc_audio_job) {
        my $tc_merge_audio_job
            = $self->build_cluster_merge_audio_tracks_job( $title, $psu );

        if ($tc_merge_audio_job) {
            $tc_merge_audio_job->set_depends_on(
                [ $tc_merge_video_job, $tc_audio_job ] );
            push @psu_jobs, $tc_merge_audio_job;
        }
        else {
            #-- no separate audio merge job (e.g. just one audio
            #-- track): the video merge job itself needs to depend
            #-- on the audio job.
            #-- NOTE(review): this appends to the job's depends_on
            #-- attribute directly and assumes it holds job names;
            #-- confirm against Event::ExecFlow::Job.
            push @{ $tc_merge_video_job->{depends_on} },
                $tc_audio_job->get_name;
        }
    }

    #-- build a group for all jobs of this PSU
    return Event::ExecFlow::Job::Group->new(
        title    => __x( "Process PSU #{nr}", nr => $psu->nr ),
        jobs     => \@psu_jobs,
        parallel => 1,
    );
}
#-- Creates one audio transcoding job for every active audio track
#-- of the title and wraps them into a job group for the given PSU.
#--
#-- Returns the group, or nothing if there is no active audio track.
sub build_cluster_audio_jobs {
    my ( $self, $title, $psu ) = @_;

    my @track_jobs;

    TRACK:
    for my $track ( @{ $title->audio_tracks } ) {

        #-- a target track of -1 marks a deactivated track
        next TRACK if $track->tc_target_track == -1;

        my ( $vob_nr, $avi_nr ) = ( $track->tc_nr, $track->tc_target_track );

        my $tc_job
            = $self->build_transcode_audio_job( $title, $vob_nr, $avi_nr );

        my %stash = (
            prefer_local_access => 1,
            psu                 => $psu->nr,
            chunk_cnt           => $psu->chunk_cnt,
            vob_nr              => $vob_nr,
            avi_nr              => $avi_nr,
        );
        $tc_job->set_stash( \%stash );

        push @track_jobs, $tc_job;
    }

    return if !@track_jobs;

    my $group_title = __x(
        "Process all audio tracks - title #{nr}, PSU #{psu}",
        nr  => $title->nr,
        psu => $psu->nr
    );

    return Event::ExecFlow::Job::Group->new(
        title    => $group_title,
        jobs     => \@track_jobs,
        parallel => 1,
    );
}
#-- Builds the chain of jobs merging the audio tracks of the given
#-- PSU. Tracks are handled in ascending target track order and each
#-- merge job depends on the one created before it.
#--
#-- For OGG titles every active track gets a merge job, otherwise the
#-- first active track is left out.
#--
#-- Returns a job group, or nothing if no merge job was created.
sub build_cluster_merge_audio_tracks_job {
    my ( $self, $title, $psu ) = @_;

    #-- non-OGG titles leave out the first active track
    my $skip_pending = $title->is_ogg ? 0 : 1;

    my @tracks_by_target
        = sort { $a->tc_target_track <=> $b->tc_target_track }
        @{ $title->audio_tracks };

    my ( @merge_jobs, $previous_job );

    for my $track (@tracks_by_target) {

        #-- deactivated tracks carry target track -1
        next if $track->tc_target_track == -1;

        if ($skip_pending) {
            $skip_pending = 0;
            next;
        }

        my ( $vob_nr, $avi_nr ) = ( $track->tc_nr, $track->tc_target_track );

        my $merge_job
            = $self->build_merge_audio_job( $title, $vob_nr, $avi_nr );

        #-- chain this job behind the previous merge job
        if ($previous_job) {
            $merge_job->set_depends_on( [$previous_job] );
        }

        my %stash = (
            prefer_local_access => 1,
            psu                 => $psu->nr,
            chunk_cnt           => $psu->chunk_cnt,
            vob_nr              => $vob_nr,
            avi_nr              => $avi_nr,
        );
        $merge_job->set_stash( \%stash );

        push @merge_jobs, $merge_job;
        $previous_job = $merge_job;
    }

    return if !@merge_jobs;

    my $group_title = __x(
        "Merge all audio tracks - title #{nr}, PSU #{psu}",
        nr  => $title->nr,
        psu => $psu->nr
    );

    return Event::ExecFlow::Job::Group->new(
        title    => $group_title,
        jobs     => \@merge_jobs,
        parallel => 1,
    );
}
#-- Builds the video transcoding jobs for all chunks of a PSU.
#--
#-- Parameters:
#--   $title  title object the PSU belongs to
#--   $psu    program stream unit; its chunk_cnt determines the
#--           number of chunk jobs
#--
#-- With multipass transcoding each chunk gets a group of two jobs
#-- (second pass depends on the first pass), otherwise a single
#-- transcoding job.
#--
#-- Returns an Event::ExecFlow::Job::Group containing all chunk jobs.
sub build_cluster_video_chunk_jobs {
    my $self = shift;
    my ( $title, $psu ) = @_;

    my $chunk_cnt = $psu->chunk_cnt;
    my $multipass = $title->tc_multipass;

    #-- builds and configures the transcoding job of one pass for
    #-- one chunk; the stash carries the zero based chunk index,
    #-- while the job builder receives $chunk + 1
    my $build_pass_job = sub {
        my ( $pass, $chunk ) = @_;

        my $pass_job = $self->build_transcode_video_pass_job(
            $title, $pass, undef, $chunk + 1, $psu->nr
        );
        $pass_job->set_progress_max( $psu->frames );
        $pass_job->set_stash(
            {   chunk     => $chunk,
                chunk_cnt => $psu->chunk_cnt,
                psu       => $psu->nr,
            }
        );

        return $pass_job;
    };

    my @all_video_chunk_jobs;

    for my $chunk ( 0 .. $chunk_cnt - 1 ) {
        if ( !$multipass ) {
            #-- Single pass transcoding
            push @all_video_chunk_jobs, $build_pass_job->( 0, $chunk );
            next;
        }

        #-- First and second pass
        my $pass1_job = $build_pass_job->( 1, $chunk );
        my $pass2_job = $build_pass_job->( 2, $chunk );
        $pass2_job->set_depends_on( [$pass1_job] );

        #-- Build group for both passes
        push @all_video_chunk_jobs, Event::ExecFlow::Job::Group->new(
            title => __x(
                "Multipass video transcoding - title #{nr}, PSU #{psu}, "
                    . "chunk #{chunk}",
                nr    => $title->nr,
                psu   => $psu->nr,
                chunk => $chunk + 1
            ),
            jobs     => [ $pass1_job, $pass2_job ],
            parallel => 1,
        );
    }

    return Event::ExecFlow::Job::Group->new(
        title => __x(
            "Process all video chunks - title #{nr}, PSU #{psu}",
            nr  => $title->nr,
            psu => $psu->nr
        ),
        jobs     => \@all_video_chunk_jobs,
        parallel => 1,
    );
}
#-- Builds the command job merging the video chunks of the given PSU.
#--
#-- The command is obtained from $title->get_merge_video_audio_command
#-- at the time the job asks for it. Progress is tracked in frames,
#-- up to the frame count of the PSU.
#--
#-- Returns an Event::ExecFlow::Job::Command.
sub build_cluster_merge_video_chunks_job {
    my ( $self, $title, $psu ) = @_;

    my $job_title = __x(
        "Merge video chunks - title #{nr}, PSU #{psu}",
        nr  => $title->nr,
        psu => $psu->nr
    );

    my $merge_command = sub { $title->get_merge_video_audio_command };

    #-- avimerge output: take the number after the dash in "(n-m)",
    #-- ogmmerge output: take the first number found in the buffer
    my $parse_progress = sub {
        my ( $job, $buffer ) = @_;
        if ( $buffer =~ /\(\d+-(\d+)\)/ || $buffer =~ /(\d+)/ ) {
            $job->set_progress_cnt($1);
        }
    };

    my %stash = (
        prefer_local_access => 1,
        psu                 => $psu->nr,
    );

    return Event::ExecFlow::Job::Command->new(
        title              => $job_title,
        command            => $merge_command,
        diskspace_consumed => 0,
        progress_ips       => __ "fps",
        progress_max       => $psu->frames,
        progress_parser    => $parse_progress,
        stash              => \%stash,
    );
}
#-- Builds the command job merging the files of all PSUs of a title.
#--
#-- The command is obtained from $title->get_merge_psu_command at the
#-- time the job asks for it. Progress is tracked in frames, up to
#-- the frame count of the title.
#--
#-- Returns an Event::ExecFlow::Job::Command.
sub build_cluster_merge_psu_files_job {
    my ( $self, $title ) = @_;

    my $job_title
        = __x( "Merge all PSU files - title #{nr}", nr => $title->nr );

    my $merge_command = sub { $title->get_merge_psu_command };

    #-- avimerge output: take the number after the dash in "(n-m)",
    #-- ogmmerge output: take the first number found in the buffer
    my $parse_progress = sub {
        my ( $job, $buffer ) = @_;
        if ( $buffer =~ /\(\d+-(\d+)\)/ || $buffer =~ /(\d+)/ ) {
            $job->set_progress_cnt($1);
        }
    };

    my %stash = ( prefer_local_access => 1 );

    return Event::ExecFlow::Job::Command->new(
        title              => $job_title,
        command            => $merge_command,
        diskspace_consumed => 0,
        progress_ips       => __ "fps",
        progress_max       => $title->frames,
        progress_parser    => $parse_progress,
        stash              => \%stash,
    );
}
1;