Parallel::MapReduce::Worker::SSH - MapReduce, remote worker via SSH


Parallel-MapReduce documentation Contained in the Parallel-MapReduce distribution.

Index


Code Index:

NAME

Top

Parallel::MapReduce::Worker::SSH - MapReduce, remote worker via SSH

SYNOPSIS

Top

  use Parallel::MapReduce::Worker::SSH;
  my $w = new Parallel::MapReduce::Worker::SSH (host => '10.0.10.2');

  # otherwise same interface as parent class Parallel::MapReduce::Worker

DESCRIPTION

Top

This subclass of Parallel::MapReduce::Worker implements a remote worker using SSH for launching and the resulting SSH tunnel for communicating.

By default, the package uses the SSH client /usr/bin/ssh and assumes that the Perl binary on the remote machine is /usr/bin/perl. Tweak the package variables $SSH and $PERL if these assumptions are wrong.

INTERFACE

Top

Constructor

The constructor expects the following fields:

host (default: none)

At constructor time an SSH connection to the named host is attempted. Then a remote Perl program to implement the worker there is started. For this, obviously Parallel::MapReduce must be installed on the remote machine.

NOTE: Do not forget to call shutdown on an SSH worker, otherwise you will have a lot of lingering SSH connections.

SEE ALSO

Top

Parallel::MapReduce::Worker

COPYRIGHT AND LICENSE

Top


Parallel-MapReduce documentation Contained in the Parallel-MapReduce distribution.
package Parallel::MapReduce::Worker::SSH;

use strict;
use warnings;

use base 'Parallel::MapReduce::Worker';

use Data::Dumper;
use IPC::Run qw(start pump finish timeout);

# Logger used by all methods of this package, obtained from Parallel::MapReduce.
our $log = Parallel::MapReduce::_log();

# Path of the local SSH client used to launch the remote worker.
our $SSH  = '/usr/bin/ssh';
# Path of the Perl interpreter on the remote machine.
our $PERL = '/usr/bin/perl';

# Constructor: opens an SSH connection to the given host and starts the
# remote worker loop (Parallel::MapReduce::Worker::SSHRemote::worker) there.
#
# Options:
#   host - mandatory; handed to the SSH client as the destination.
#
# Returns the blessed worker object. Its in/out/err fields are the buffers
# handed to IPC::Run for the remote process' STDIN, STDOUT and STDERR; the
# IPC::Run harness itself is kept in the harness field.
#
# Croaks when no host is given. Exceptions raised by IPC::Run's start()
# propagate to the caller.
sub new {
    my $class = shift;
    my %opts  = @_;

    # The command line below is assembled as one string and split on
    # whitespace. Without a host the words would shift and ssh would treat
    # the remote Perl path as the destination, so refuse early instead.
    unless (defined $opts{host} && $opts{host} =~ /\S/) {
        require Carp;
        Carp::croak (__PACKAGE__ . ": option 'host' is mandatory");
    }

    my $self = bless { host => $opts{host},
                       in   => '',
                       out  => '',
                       err  => '',
                   }, $class;
    $log->debug ("SSH starting up ".$self->{host});

    # The single quotes around the -e program stay part of the argument on
    # purpose: ssh hands the joined command to the remote shell, which is
    # what interprets them.
    # NOTE(review): the -I path below looks specific to one development
    # machine; it is kept unchanged here so existing setups keep working.
    my $command = join ' ',
        $SSH,
        $self->{host},
        $PERL,
        '-I/home/rho/projects/mapreduce/lib',
        '-MParallel::MapReduce',
        '-MParallel::MapReduce::Worker::SSHRemote',
        "-e 'Parallel::MapReduce::Worker::SSHRemote::worker()'";

    # split ' ' (awk mode) ignores leading whitespace, so a padded $SSH
    # cannot produce an empty program name.
    $self->{harness} = start [ split ' ', $command ],
                             \ $self->{in}, \ $self->{out}, \ $self->{err},
                             timeout( 20 ) ;
    $log->info ("SSH started up at ".$self->{host});
    return $self;
}

# Asks the remote worker to terminate and reaps the SSH process.
#
# Sends the "exit" command over the worker's STDIN, pumps the harness so
# the command is delivered, then finishes the harness.
#
# Returns whatever IPC::Run's finish() returns.
#
# If pumping or finishing throws (for instance because the worker is
# already gone or a timeout fired), the child processes are killed via
# kill_kill() so that no SSH connection lingers, and the original error is
# rethrown.
sub shutdown {
    my $self = shift;

    my $harness = $self->{harness};
    my $result;

    $self->{in} .= "exit\n";
    my $ok = eval {
        pump ($harness);                 # make sure the worker gets exit
        $result = $harness->finish;      # make sure the worker is dead
        1;
    };
    unless ($ok) {
        my $error = $@ || 'unknown error while shutting down SSH worker';
        # Best effort: never let a failing cleanup mask the real error.
        eval { $harness->kill_kill; 1 };
        die $error;
    }
    return $result;
}


# Sends a "mapper" request to the remote worker and waits for its answer.
#
# Arguments (positional): $cs - array ref of chunks, $sl - a single value
# sent on its own line, $ss - array ref sent comma-separated on one line,
# $jj - a value sent right after the command word.
#
# The request is terminated by an empty line; the reply is considered
# complete once the worker's output contains an empty line.
#
# Returns an array ref with the lines of the worker's reply.
sub map {
    my ($self, $cs, $sl, $ss, $jj) = @_;

    # Start from clean buffers; they are the very scalars tied to the harness.
    $self->{$_} = '' for qw(err out in);

    my $request = "mapper\n"
                . "$jj\n"
                . "$sl\n"
                . join (",",  @$ss ) . "\n"
                . join ("\n", @$cs ) . "\n\n";
    $self->{in} .= $request;

    if ($log->is_debug) {
        $log->debug ("SSH map sent chunks: ".Dumper $cs);
    }

    # Keep pumping until the reply terminator (an empty line) has arrived.
    until ($self->{out} =~ /\n\n/g) {
        pump $self->{harness};
    }
    $log->debug ("SSH worker (map) sent back err".$self->{err});
    $log->debug ("SSH worker (map) sent back out".$self->{out});

    my @reply = split /\n/, $self->{out};
    return \@reply;
}

# Sends a "reducer" request to the remote worker and waits for its answer.
#
# Arguments (positional): $ks - array ref of keys, $ss - array ref sent
# comma-separated on one line, $jj - a value sent right after the command
# word.
#
# The request is terminated by an empty line; the reply is considered
# complete once the worker's output contains an empty line.
#
# Returns an array ref with the lines of the worker's reply.
sub reduce {
    my ($self, $ks, $ss, $jj) = @_;

    # Start from clean buffers; they are the very scalars tied to the harness.
    $self->{$_} = '' for qw(err out in);

    my $request = "reducer\n"
                . "$jj\n"
                . join (",",  @$ss ) . "\n"
                . join ("\n", @$ks ) . "\n\n";
    $self->{in} .= $request;

    if ($log->is_debug) {
        $log->debug ("SSH reduce sent ".scalar @$ks." keys");
    }

    # Keep pumping until the reply terminator (an empty line) has arrived.
    until ($self->{out} =~ /\n\n/g) {
        pump $self->{harness};
    }
    $log->debug ("SSH worker (reduce) sent back err".$self->{err});
    $log->debug ("SSH worker (reduce) sent back out".$self->{out});

    my @reply = split /\n/, $self->{out};
    return \@reply;
}


# Version of this module.
our $VERSION = 0.05;

1;

__END__