/usr/local/CPAN/dvdrip/Video/DVDRip/Cluster/Node.pm


# $Id: Node.pm 2369 2009-02-22 18:27:33Z joern $

#-----------------------------------------------------------------------
# Copyright (C) 2001-2006 Jörn Reder <joern AT zyn.de>.
# All Rights Reserved. See file COPYRIGHT for details.
#
# This program is part of Video::DVDRip, which is free software; you can
# redistribute it and/or modify it under the same terms as Perl itself.
#-----------------------------------------------------------------------

package Video::DVDRip::Cluster::Node;
use Locale::TextDomain qw (video.dvdrip);

use base Video::DVDRip::Base;

use Carp;
use strict;

use FileHandle;
use Data::Dumper;
use Scalar::Util;

sub state                       { shift->{state}                        }
sub filename                    { shift->{filename}                     }
sub name                        { shift->{name}                         }
sub hostname                    { my $s = shift; $s->{hostname} || $s->{name} }
sub data_base_dir               { shift->{data_base_dir}                }
sub is_master                   { shift->{is_master}                    }
sub data_is_local               { shift->{data_is_local}                }
sub username                    { shift->{username}                     }
sub ssh_cmd                     { shift->{ssh_cmd}                      }
sub speed                       { shift->{speed}                        }
sub tc_options                  { shift->{tc_options}                   }
sub answered_last_ping          { shift->{answered_last_ping}           }
sub speed_index                 { shift->{speed_index}                  }

sub progress_cnt                { shift->{progress_cnt}                 }
sub progress_max                { shift->{progress_max}                 }
sub progress_merge              { shift->{progress_merge}               }
sub progress_start_time         { shift->{progress_start_time}          }
sub assigned_job                { shift->{assigned_job}                 }
sub assigned_chunk              { shift->{assigned_chunk}               }

sub set_state {
    my $self = shift, my ($state) = @_;

    $self->{state} = $state;

    Video::DVDRip::Cluster::Master->get_master->emit_event( "NODE_UPDATE",
        $self->name );

    return $state;
}

sub set_filename                { shift->{filename}             = $_[1] }
sub set_name                    { shift->{name}                 = $_[1] }
sub set_hostname                { shift->{hostname}             = $_[1] }
sub set_data_base_dir           { shift->{data_base_dir}        = $_[1] }
sub set_is_master               { shift->{is_master}            = $_[1] }
sub set_data_is_local           { shift->{data_is_local}        = $_[1] }
sub set_username                { shift->{username}             = $_[1] }
sub set_ssh_cmd                 { shift->{ssh_cmd}              = $_[1] }
sub set_speed                   { shift->{speed}                = $_[1] }
sub set_tc_options              { shift->{tc_options}           = $_[1] }
sub set_answered_last_ping      { shift->{answered_last_ping}   = $_[1] }
sub set_speed_index             { shift->{speed_index}          = $_[1] }

sub set_progress_cnt            { shift->{progress_cnt}         = $_[1] }
sub set_progress_max            { shift->{progress_max}         = $_[1] }
sub set_progress_merge          { shift->{progress_merge}       = $_[1] }
sub set_progress_start_time     { shift->{progress_start_time}  = $_[1] }
sub set_assigned_job            {
    my $self = shift;
    my ($job) = @_;
    $self->{assigned_job} = $job;
    Scalar::Util::weaken($self->{assigned_job});
    return $job;
}
sub set_assigned_chunk          { shift->{assigned_chunk}       = $_[1] }

sub test_finished               { shift->{test_finished}                }
sub test_result                 { shift->{test_result}                  }

sub set_test_finished           { shift->{test_finished}        = $_[1] }
sub set_test_result             { shift->{test_result}          = $_[1] }

sub alive { shift->{alive} }

sub set_alive {
    my $self = shift;
    my ($alive) = @_;

    my $was_alive = $self->alive;

    $self->{alive} = $alive;

    if ( not $alive ) {
        $self->stop if $was_alive;
        $self->set_state("offline");
        $self->save;

    }
    elsif ($self->state eq "offline"
        or $self->state eq "unknown" ) {
        $self->set_state("idle");
        $self->save;
    }

    1;
}

sub project_name {
    my $self = shift;
    my $job  = $self->assigned_job;
    return "" if not $job;
    return $job->project->label;
}

sub new {
    my $class = shift;
    my %par   = @_;
    my ( $name, $hostname, $data_base_dir )
        = @par{ 'name', 'hostname', 'data_base_dir' };

    my ( $is_master, $username, $tc_options )
        = @par{ 'is_master', 'username', 'tc_options' };

    my $self = {
        name          => $name,
        hostname      => $hostname,
        data_base_dir => $data_base_dir,
        is_master     => $is_master,
        username      => $username,
        tc_options    => $tc_options,
        alive         => $is_master,
    };

    bless $self, $class;

    if ($is_master) {
        $self->set_state("idle");
    }
    else {
        $self->set_state("unknown");
    }

    return $self;
}

sub new_from_file {
    my $class      = shift;
    my %par        = @_;
    my ($filename) = @par{'filename'};

    confess "missing filename" if not $filename;

    my $self = bless { filename => $filename, }, $class;

    $self->load;

    $self->set_filename($filename);

    return $self;
}

sub load {
    my $self = shift;

    my $filename = $self->filename;
    croak "no filename set"      if not $filename;
    croak "can't read $filename" if not -r $filename;

    my $fh = FileHandle->new;
    open( $fh, $filename ) or croak "can't read $filename";
    my $data_blob = join( '', <$fh> );
    close $fh;

    my $data;
    $data = eval($data_blob);
    croak "can't load $filename. Perl error: $@" if $@;

    %{$self} = %{$data};

    1;
}

sub save {
    my $self = shift;

    my $filename = $self->filename;
    confess "not filename set" if not $filename;

    my $assigned_job = $self->assigned_job;
    my $test_result  = $self->test_result;
    $self->set_assigned_job(undef);
    $self->set_test_result(undef);

    my $dd = Data::Dumper->new( [$self], ['data'] );
    $dd->Indent(1);
    my $data = $dd->Dump;

    $self->set_assigned_job($assigned_job);
    $self->set_test_result($test_result);

    my $fh = FileHandle->new;

    open( $fh, "> $filename" ) or confess "can't write $filename";
    print $fh $data;
    close $fh;

    Video::DVDRip::Cluster::Master->get_master->emit_event( "NODE_UPDATE",
        $self->name );

    1;
}

sub prepare_command {
    my $self = shift;
    my ($command, $job) = @_;

    if ( $job ) {
        my %params = (
            DVDRIP_NODE_DATA_BASE_DIR   => $self->data_base_dir,
            DVDRIP_NODE_NAME            => $self->name,
            DVDRIP_JOB_PSU              => sprintf("%02d",$job->get_stash->{psu}+0),
            DVDRIP_JOB_ADD_ONE_PSU      => sprintf("%02d",$job->get_stash->{psu}+1),
            DVDRIP_JOB_CHUNK            => sprintf("%05d",$job->get_stash->{chunk}),
            DVDRIP_JOB_AVI_NR           => sprintf("%02d",$job->get_stash->{avi_nr}),
            DVDRIP_JOB_CHUNK_CNT        => $job->get_stash->{chunk_cnt},
        );
        $command =~ s/(DVDRIP_[A-Z_]+)/$params{$1}/eg;
    }

    if ( $self->is_master ) {
        $command = "execflow $command" unless $command =~ /execflow/;
        $command = "umask 0002; $command";
        return $command;
    }

    $command = "umask 0002; $command";

    warn "Node's transcode options currently ignored" if $self->tc_options;

    #-- Set LD_ASSUME_KERNEL on the nodes as well,
    #-- if set on the master.
    $command = "export LD_ASSUME_KERNEL=$ENV{LD_ASSUME_KERNEL}; $command"
        if $ENV{LD_ASSUME_KERNEL};

    my $username = $self->username;
    my $name     = $self->hostname;
    my $ssh_cmd  = $self->ssh_cmd || 'ssh';

    $ssh_cmd .= " -o PreferredAuthentications=publickey";

    $command =~ s/execflow/`which nice`/g;
    $command =~ s/"/\\"/g;
    $command = qq{execflow $ssh_cmd $username\@$name "$command"};

    return $command;
}

sub reset {
    my $self = shift;

    my $startup_state = $self->state;

    $self->set_alive(0);
    $self->set_state($startup_state);
    $self->set_answered_last_ping(0);
    $self->set_state( $self->is_master ? 'idle' : 'unknown' )
        if $startup_state  ne 'stopped'
        and $startup_state ne 'aborted';
    $self->save;

    1;
}

sub progress {
    my $self = shift;
    return "" if not $self->assigned_job;
    return $self->assigned_job->get_progress_text;
}

sub stop {
    my $self = shift;

    my $job = $self->assigned_job;

    $self->set_state('stopped');
    $self->log( __x( "Node '{node}' stopped", node => $self->name ) );
    $self->save;

    $job->cancel if $job;

    1;
}

sub start {
    my $self = shift;

    croak "Can't start a non stopped node"
        if $self->state  ne 'stopped'
        and $self->state ne 'aborted';

    $self->log( __x( "Node '{node}' started", node => $self->name ) );

    $self->set_alive(0);
    $self->set_state( $self->is_master ? 'idle' : 'unknown' );
    $self->set_answered_last_ping(1);
    $self->save;

    Video::DVDRip::Cluster::Master->get_master->node_check;
    Video::DVDRip::Cluster::Master->get_master->job_control;

    1;
}

sub job_info {
    my $self = shift;

    my $job = $self->assigned_job;

    my $info;
    if ( not $job ) {
        $info = $self->state;
    }
    else {
        $info = $job->nr . ": " . $job->project->label . ": " . $job->info;
    }

    return $info;
}

sub run_tests {
    my $self          = shift;
    my %par           = @_;
    my ($cb_finished) = @par{'cb_finished'};

    # First reset the finished flag
    $self->set_test_finished(0);

    # get test command for this node
    my $command = $self->get_test_command;

    my $popen_command = $self->prepare_command($command);

    my $output = "";
    Video::DVDRip::Cluster::Pipe->new(
        command      => $popen_command,
        timeout      => 5,
        cb_line_read => sub { $output .= $_[0] . "\n" },
        cb_finished => sub {
            $self->log(
                __x( "Node {name} finished tests", name => $self->name ) );
            $self->set_test_result(
                $self->parse_test_output( output => $output ) );
            $self->set_test_finished(1);
            &$cb_finished() if $cb_finished;
        }
    )->open;

    1;
}

sub get_test_command {
    my $self = shift;

    my $data_base_dir = $self->data_base_dir;

    my $command          = "sh -c 'export LC_ALL=C; ";
    my $create_test_file =
        $self->is_master
        ? "touch $data_base_dir/00DVDRIP-CLUSTER; "
        : "";

    # 1. confirm ssh connection
    $command
        .= "echo --ssh_connect--; " . "echo Ok; " . "echo --ssh_connect--;";

    # 2. get content of data_base_dir
    $command .= "echo --data_base_dir_content--; "
        . $create_test_file
        . "cd $data_base_dir && echo * | perl -pe \"s/ /chr(10)/eg\" | sort;"
        . "echo --data_base_dir_content--; ";

    # 3. try writing in the data_base_dir
    my $test_file = "$data_base_dir/" . $self->name . "-file-write-test";
    $command .= "echo --write_test--; "
        . "echo node write test > $test_file && echo SUCCESS; "
        . "rm -f $test_file; "
        . "echo --write_test--; ";

    # 4. program versions
    my $depend = $self->depend_object;
    
    $command .= "echo --program_versions--; ";
    foreach my $tool ( sort keys %{$depend->tools} ) {
        my $def = $depend->tools->{$tool};
        next if not $def->{cluster};
        $command .= $def->{version_cmd}." 2>&1;";
    }
    $command .= "echo --program_versions--; ";
    $command .= "' 2>&1";

    return $command;
}

sub parse_test_output {
    my $self     = shift;
    my %par      = @_;
    my ($output) = @par{'output'};

    # parse output
    my %result;
    $result{output} = $output;
    foreach my $case (
        qw ( ssh_connect data_base_dir_content
             write_test program_versions ) ) {
        $output =~ s/--$case--\n(.*?)--$case--//s;
        $result{$case} = $1;
    }

    $result{output_rest} = $output;

    return \%result;
}

1;