/usr/local/CPAN/dvdrip/Video/DVDRip/Cluster/Master.pm


# $Id: Master.pm 2322 2007-08-05 16:56:51Z joern $
#-----------------------------------------------------------------------
# Copyright (C) 2001-2006 Jörn Reder <joern AT zyn.de>.
# All Rights Reserved. See file COPYRIGHT for details.
#
# This program is part of Video::DVDRip, which is free software; you can
# redistribute it and/or modify it under the same terms as Perl itself.
#-----------------------------------------------------------------------

package Video::DVDRip::Cluster::Master;
use Locale::TextDomain qw (video.dvdrip);

use base Video::DVDRip::Base;

use Event;
use constant NICE => -1;

use Video::DVDRip::Cluster::Node;
use Video::DVDRip::Cluster::Project;
use Video::DVDRip::Cluster::Pipe;
use Video::DVDRip::Cluster::Scheduler;

use Carp;
use strict;

use FileHandle;
use Data::Dumper;

sub config_filename             { shift->{config_filename}              }
sub data_dir                    { shift->{data_dir}                     }
sub node_dir                    { shift->{node_dir}                     }
sub project_dir                 { shift->{project_dir}                  }
sub nodes                       { shift->{nodes}                        }
sub projects                    { shift->{projects}                     }
sub job_id                      { shift->{job_id}                       }
sub project_id                  { shift->{project_id}                   }
sub in_job_control              { shift->{in_job_control}               }
sub node_check_watcher          { shift->{node_check_watcher}           }
sub rpc_server                  { shift->{rpc_server}                   }
sub scheduler                   { shift->{scheduler}                    }

sub set_config_filename         { shift->{config_filename}      = $_[1] }
sub set_data_dir                { shift->{data_dir}             = $_[1] }
sub set_node_dir                { shift->{node_dir}             = $_[1] }
sub set_project_dir             { shift->{project_dir}          = $_[1] }
sub set_nodes                   { shift->{nodes}                = $_[1] }
sub set_projects                { shift->{projects}             = $_[1] }
sub set_job_id                  { shift->{job_id}               = $_[1] }
sub set_project_id              { shift->{project_id}           = $_[1] }
sub set_in_job_control          { shift->{in_job_control}       = $_[1] }
sub set_node_check_watcher      { shift->{node_check_watcher}   = $_[1] }
sub set_rpc_server              { shift->{rpc_server}           = $_[1] }
sub set_scheduler               { shift->{scheduler}            = $_[1] }

my $MASTER_OBJECT;
sub get_master { $MASTER_OBJECT }

sub new {
    my $class = shift;
    my %par   = @_;
    my ( $logger, $rpc_server ) = @par{ 'logger', 'rpc_server' };

    my $self = bless {
        data_dir        => $ENV{HOME} . "/.dvdrip-master",
        node_dir        => $ENV{HOME} . "/.dvdrip-master/nodes",
        project_dir     => $ENV{HOME} . "/.dvdrip-master/projects",
        config_filename => $ENV{HOME} . "/.dvdrip-master/master.conf",
        nodes           => [],
        projects        => [],
        job_id          => 0,
        logger          => $logger,
        rpc_server      => $rpc_server,
        scheduler       => Video::DVDRip::Cluster::Scheduler->new(),
    }, $class;

    $MASTER_OBJECT = $self;

    $self->set_logger($logger);

    if ( not -d $self->data_dir ) {
        mkdir( $self->data_dir, 0755 )
            or croak "can't create directory '" . $self->data_dir . "'";

    }

    if ( not -d $self->node_dir ) {
        mkdir( $self->node_dir, 0755 )
            or croak "can't create directory '" . $self->node_dir . "'";

    }

    if ( not -d $self->project_dir ) {
        mkdir( $self->project_dir, 0755 )
            or croak "can't create directory '" . $self->project_dir . "'";

    }

    $self->log( __ "Master daemon activated" );

    $self->load;

    $self->enable_node_check
        unless $self->node_check_unnecessary;

    foreach my $signal ( "INT", "HUP", "TERM" ) {
	    Event->signal (
		    signal => $signal,
		    cb     => sub {
                        $self->log(
                            __x("Got signal {signal}.",
                                signal => $signal)
                        );
                        $self->shutdown("now");
                    },
	    );
    }

    return $self;
}

my $FPING;
sub check_prerequisites {
    my $class = shift;

    foreach my $path ( "/usr/bin", "/usr/sbin" ) {
        if ( -f "$path/fping" and -x "$path/fping" ) {
            $FPING = "$path/fping";
            last;
        }
    }

    if ( !$FPING ) {
        croak "can't find a executable fping in /usr/bin and /usr/sbin";
    }

    my ( $mode, $uid ) = ( stat($FPING) )[ 2, 4 ];
    my $suid = $mode & 04000;

    croak "$FPING is not suid root"
        if not $suid or $uid != 0;

    1;
}

sub node_check_unnecessary {
    my $self = shift;
    return;
    return if $self->rpc_server->get_clients_connected;
    return if @{ $self->job_get_unfinished_projects };
    return 1;
}

sub enable_node_check {
    my $self = shift;

    return if $self->node_check_watcher;

    $self->node_check;

    my $watcher = Event->timer(
        interval => 5,
        cb       => sub { $self->node_check },
        desc     => "node check timer"
    );

    $self->log("Node check watcher enabled");

    $self->set_node_check_watcher($watcher);

    1;
}

sub disable_node_check {
    my $self = shift;

    return if not $self->node_check_watcher;

    $self->node_check_watcher->cancel;
    $self->set_node_check_watcher(undef);

    $self->log( __ "Node check watcher disabled" );

    1;
}

sub node_check {
    my $self = shift;

    my $nodes_list;
    foreach my $node ( @{ $self->nodes } ) {
        $nodes_list .= $node->hostname . " "
            if $node->state ne 'stopped'
            and not $node->is_master;
    }

    return 1 if not $nodes_list;

    my $command = "$FPING $nodes_list";

    my $buffer;

    Video::DVDRip::Cluster::Pipe->new(
        timeout      => 8,
        command      => $command,
        no_log       => 1,
        cb_line_read => sub {
            $self->log(4, "fping: $_[0]");
            $buffer .= $_[0] . "\n";
            1;
        },
        cb_finished => sub {
            if ( $buffer =~ /^\s*$/ ) {
                $self->log( __ "Warning: node check fping reported nothing" );
                return;
            }
            my $node_name;
            my $idle_nodes;
            foreach my $node ( @{ $self->nodes } ) {
                next if $node->state eq 'stopped';
                if ( $node->is_master ) {
                    ++$idle_nodes if $node->state eq 'idle';
                    next;
                }

                $node_name = $node->hostname;
                if ( $buffer =~ /^$node_name\s+is\s+alive/m ) {
                    if ( not $node->alive and $node->answered_last_ping == 2 )
                    {
                        $self->log(
                            __x("Node '{node_name}' is now online.",
                                node_name => $node_name
                            )
                        );
                        $node->set_alive(1);
                    }
                    if ( not $node->alive and $node->answered_last_ping == 1 )
                    {
                        $self->log(
                            __x("Node '{node_name}' is still reachable. Will be online in 5 seconds.",
                                node_name => $node_name
                            )
                        );
                        $node->set_answered_last_ping(2);
                    }
                    if ( not $node->alive and not $node->answered_last_ping )
                    {
                        $self->log(
                            __x("Node '{node_name}' is now reachable. Will be online in 10 seconds.",
                                node_name => $node_name
                            )
                        );
                        $node->set_answered_last_ping(1);
                    }
                    if ( $node->alive == 0.5 ) {
                        $self->log(
                            __x("Node '{node_name}' is Ok again",
                                node_name => $node_name
                            )
                        );
                        $node->set_alive(1);
                    }
                }
                else {
                    $node->set_answered_last_ping(0);
                    if ( $node->alive == 0.5 ) {
                        $self->log(
                            __x("Warning: Node '{node_name}' is unreachable",
                                node_name => $node_name
                            )
                            )
                            if $node->alive
                            or $node->state eq 'unknown';
                        $node->set_alive(0);
                    }
                    elsif ( $node->alive ) {
                        $self->log(
                            __x("Warning: Node '{node_name}' possibly offline",
                                node_name => $node_name
                            )
                        );
                        $node->set_alive(0.5);
                    }
                    else {
                        $node->set_alive(0);
                    }
                }
                ++$idle_nodes if $node->state eq 'idle';
            }

            $self->disable_node_check
                if $self->node_check_unnecessary;

            $self->job_control
                if $idle_nodes
                and not $self->in_job_control;
        },
    )->open;

    1;
}

sub hello {
    my $self = shift;

    $self->enable_node_check;

    1;
}

sub load {
    my $self = shift;

    my $filename = $self->config_filename;
    if ( not -f $filename ) {
        $self->save;
    }

    my $fh = FileHandle->new;
    open( $fh, $filename )
        or croak "can't read master config file '$filename'";
    my $data_blob = join( '', <$fh> );
    close $fh;

    my $data;
    $data = eval $data_blob;
    croak "Error loading master config file '$filename': $@" if $@;

    $self->set_job_id( $data->{job_id} );
    $self->set_project_id( $data->{project_id} );

    $self->load_nodes;
    $self->load_projects( project_order => $data->{project_order} );

    1;
}

sub save {
    my $self = shift;

    my $filename = $self->config_filename;

    my @project_order = map { $_->filename } @{ $self->projects };

    my $data = {
        job_id        => $self->job_id,
        project_id    => $self->project_id,
        project_order => \@project_order,
    };

    my $dd = Data::Dumper->new( [$data], ['data'] );
    $dd->Indent(1);
    my $data_blob = $dd->Dump;

    my $fh = FileHandle->new;
    open( $fh, "> $filename" )
        or croak "can't write master config file '$filename'";
    print $fh $data_blob;
    close $fh;

    1;
}

sub load_nodes {
    my $self = shift;

    my $dir = $self->node_dir;

    my @nodes;
    foreach my $file (<$dir/*>) {
        $self->log( __x( "Loading node file '{file}'", file => $file ) );
        my $node
            = Video::DVDRip::Cluster::Node->new_from_file( filename => $file,
            );
        $node->reset;
        push @nodes, $node;
    }

    $self->set_nodes( \@nodes );

    1;
}

sub load_projects {
    my $self = shift;
    my %par = @_;
    my ($project_order) = @par{'project_order'};

    my $dir = $self->project_dir;

    my @projects;
    foreach my $filename ( @{$project_order} ) {
        next if not -r $filename;
        $self->log(
            __x( "Loading project file '{filename}'", filename => $filename )
        );
        my $project = Video::DVDRip::Cluster::Project->new_from_file(
            filename => $filename, );
        push @projects, $project;
    }

    $self->set_projects( \@projects );

    $self->scheduler->init;

    1;
}

sub emit_event {
    my $self = shift;
    my ($event, @args) = @_;

    my $rpc_server  = $self->rpc_server;
    my $log_clients = $rpc_server->get_logging_clients;

    my $sock;
    foreach my $client ( values %{$log_clients} ) {
        $sock = $client->get_sock;
        print $sock "EVENT\t$event\t" . join( "\t", @args ) . "\n";
    }

    1;
}

sub add_node {
    my $self = shift;
    my %par = @_;
    my ($node) = @par{'node'};

    my $filename = $self->node_dir . '/' . $node->name . '.node';

    croak "msg: " . __ "Node must have a name" if $node->name eq '';
    croak "msg: " . __ "Node with this name already exists" if -f $filename;

    $node->set_state("idle") if $node->is_master;

    push @{ $self->nodes }, $node;
    $node->set_filename($filename);
    $node->save;

    $self->log(
        __x("Node '{node_name}' saved to '{filename}'",
            node_name => $node->name,
            filename  => $filename
        )
    );

    1;
}

sub remove_node {
    my $self = shift;
    my %par = @_;
    my ($node) = @par{'node'};

    my $i = 0;
    foreach my $n ( @{ $self->nodes } ) {
        last if $n == $node;
        ++$i;
    }

    croak "Unknown node $node" if $i == @{ $self->nodes };

    unlink $node->filename;

    splice @{ $self->nodes }, $i, 1;

    $self->emit_event( "NODE_DELETED", $node->name );

    1;
}

sub get_project_index {
    my $self = shift;
    my %par = @_;
    my ($project) = @par{'project'};

    my $projects = $self->projects;

    my $i = 0;
    foreach my $p ( @{$projects} ) {
        last if $p == $project;
        ++$i;
    }

    croak "Unknown project $project" if $i == @{$projects};

    return $i;
}

sub project_by_id {
    my $self = shift;
    my %par = @_;
    my ($id) = @par{'id'};

    my $p;
    foreach $p ( @{ $self->projects } ) {
        return $p if $p->id == $id;
    }

    croak "Unknown project id $id";
}

sub add_project {
    my $self = shift;
    my %par = @_;
    my ($project) = @par{'project'};

    push @{ $self->projects }, $project;

    my $job_id   = $self->set_project_id( 1 + $self->project_id );
    my $filename = sprintf( "%s/%08d-%s.rip",
        $self->project_dir, $job_id, $project->name );

    $project->set_filename($filename);
    $project->set_state('not scheduled');
    $project->set_id($job_id);

    # save changes to project
    $project->save;

    $self->log(
        __x("Project with filename '{filename}' added",
            filename => $filename
        )
    );

    # save new state
    $self->save;

    $self->emit_event( "PROJECT_UPDATE", $project->id );

    1;
}

sub move_up_project {
    my $self = shift;
    my %par = @_;
    my ($project) = @par{'project'};

    my $i = $self->get_project_index( project => $project );

    # already on top?
    return if $i == 0;

    # move project up
    my $projects = $self->projects;
    @{$projects}[ $i, $i - 1 ] = @{$projects}[ $i - 1, $i ];

    # save new state
    $self->save;

    $self->emit_event("PROJECT_LIST_UPDATE");

    1;
}

sub move_down_project {
    my $self = shift;
    my %par = @_;
    my ($project) = @par{'project'};

    my $i = $self->get_project_index( project => $project );

    # already on bottom?
    my $projects = $self->projects;
    return if $i == @{$projects} - 1;

    # move project up
    @{$projects}[ $i, $i + 1 ] = @{$projects}[ $i + 1, $i ];

    # save new state
    $self->save;

    $self->emit_event("PROJECT_LIST_UPDATE");

    1;
}

sub schedule_project {
    my $self = shift;
    my %par = @_;
    my ($project) = @par{'project'};

    # check for existence
    $self->get_project_index( project => $project );

    # create job plan
    $project->create_job_plan;

    # change project state
    $project->set_state('waiting');
    $self->scheduler->add_project($project);

    # save project's state
    $project->save;

    # emit update events to connected GUI clients
    $self->emit_event("PROJECT_LIST_UPDATE");

    # maybe the job controller can dispose some work now...
    $self->job_control;

    1;
}

sub cancel_project {
    my $self = shift;
    my %par = @_;
    my ($project) = @par{'project'};

    return if $project->state ne 'running';
    
    $self->scheduler->cancel_project($project);

    # emit update events to connected GUI clients
    $self->emit_event("PROJECT_LIST_UPDATE");
    1;
}

sub restart_project {
    my $self = shift;
    my %par = @_;
    my ($project) = @par{'project'};

    return if $project->state ne 'cancelled' &&
              $project->state ne 'error';
    
    $self->scheduler->restart_project($project);

    $self->emit_event("PROJECT_LIST_UPDATE");

    $self->scheduler->run;

    1;
}

sub remove_project {
    my $self = shift;
    my %par = @_;
    my ($project) = @par{'project'};

    # check for existence
    my $i = $self->get_project_index( project => $project );

    # check project state
    return if $project->state eq 'running';

    unlink $project->filename;
    splice @{ $self->projects }, $i, 1;

    $self->scheduler->remove_project($project)
        if $project->state ne 'not scheduled';

    $self->log(
        __x( "Project {project} removed", project => $project->label ) );

    $self->emit_event( "PROJECT_DELETED", $project->id );

    1;
}

sub projects_list {
    my $self = shift;

    my $nr;
    my @projects;
    foreach my $project ( @{ $self->projects } ) {
        push @projects,
            [
            $project->id,    ++$nr, $project->label,
            $project->state, $project->progress,
            ];
    }

    return \@projects;
}

sub jobs_list {
    my $self = shift;
    my %par = @_;
    my ($project_id) = @par{'project_id'};

    my $project = $self->project_by_id( id => $project_id );

    return $project->jobs_list;
}

sub nodes_list {
    my $self = shift;

    my $nr;
    my @nodes;
    foreach my $node ( @{ $self->nodes } ) {
        push @nodes,
            [
            $node->name,
            ++$nr,
            $node->name,
            ( $node->assigned_job ? $node->assigned_job->get_info : undef ),
            (     $node->assigned_job
                ? $node->assigned_job->get_progress_stats
                : $node->state
            ),
            ];
    }

    return \@nodes;
}

sub job_control {
    my $self = shift;

    $self->scheduler->run;

    1;
}

sub job_get_unfinished_projects {
    my $self = shift;

    my @projects;

    foreach my $project ( @{ $self->projects } ) {
        push @projects, $project
            if $project->state eq 'waiting'
            or $project->state eq 'running';
    }

    $self->enable_node_check if @projects;

    return \@projects;
}

sub shutdown {
    my $self = shift;
    my ($now) = @_;

    my $exec_flow_group = $self->scheduler->get_exec_flow_group;
    
    if ( $exec_flow_group->get_state eq 'running' ) {
        $self->log( __"Master shutdown. Stopping active jobs." );
        $exec_flow_group->cancel;
    }
    else {
        $self->log( __"Master shutdown" );
        Event::unloop_all();
        return;
    }

    Event->timer(
        interval => 2,
        cb       => sub { Event::unloop_all() },
        desc     => "dvd::rip shutdown timer"
    );

    $self->log( __ "Cluster control daemon will shutdown in 2 seconds..." );

    1;

}

sub get_online_nodes_cnt {
    my $self = shift;

    my $cnt = 0;
    foreach my $node ( @{ $self->nodes } ) {
        ++$cnt
            if $node->state  ne 'unknown'
            and $node->state ne 'offline';
    }

    return $cnt;
}

sub get_node_by_name {
    my $self = shift;
    my ($name) = @_;
    foreach my $node ( @{ $self->nodes } ) {
        return $node if $node->name eq $name;
    }
    return;
}

sub get_project_by_id {
    my $self = shift;
    my ($id) = @_;
    foreach my $project ( @{ $self->projects } ) {
        return $project if $project->id eq $id;
    }
    return;
}

sub get_master_node {
    my $self = shift;

    foreach my $node ( @{ $self->nodes } ) {
        return $node if $node->is_master;
    }

    return;
}

sub node_test {
    my $self   = shift;
    my %par    = @_;
    my ($node) = @par{'node'};

    my $master_node = $self->get_master_node;

    if ( $master_node ) {
print "RUN MASTER TESTS\n";
        $master_node->run_tests(
            cb_finished => sub {
print "RUN NODE TESTS\n";
                $node->run_tests(
                    cb_finished => sub {
                        $self->emit_event( "NODE_TEST_FINISHED", $node->name );
                    },
                );
            },
        );
    }
    else {
        $node->run_tests(
            cb_finished => sub {
                $self->emit_event( "NODE_TEST_FINISHED", $node->name );
            },
        );
    }
    
    1;
}

sub exec_flow_job {
    my $self = shift;
    
    return $self->scheduler->get_exec_flow_group;
}

sub get_job_from_id {
    my $self = shift;
    my ($job_id) = @_;
    return $self->scheduler->get_jobs_by_id->{$job_id};
}

sub reset_job {
    my $self = shift;
    my ($job_id) = @_;

    my $job = $self->get_job_from_id($job_id);

    $job->reset;

    my $project = $self->scheduler->get_projects_by_job_id->{$job_id};
    $project->set_state("not scheduled");
    $project->save;

    1;
}

1;