/usr/local/CPAN/GRID-Machine/GRID/Machine.pm
# You may distribute under the terms of either the GNU General Public License
# or the Artistic License (the same terms as Perl itself)
#
# Based on the idea of IPC::PerlSSH by Paul Evans, 2006,2007 -- leonerd@leonerd.org.uk
# (C) Casiano Rodriguez-Leon 2007 -- casiano@ull.es
package GRID::Machine;
use strict;
use Scalar::Util qw(blessed reftype);
use List::Util qw(first);
use Module::Which;
use IPC::Open2();
use IPC::Open3();
use Carp;
use File::Spec;
use File::Temp;
use IO::File;
use base qw(Exporter);
use GRID::Machine::IOHandle;
use GRID::Machine::Process;
require POSIX;
require Cwd;
no Cwd;
our @EXPORT_OK = qw(is_operative read_modules qc slurp_file);
# We need to include the common shared perl library
use GRID::Machine::MakeAccessors; # Order is important. This must be the first!
use GRID::Machine::Message;
use GRID::Machine::Result;
our $VERSION = '0.127';
my %_taken_id;
{
my $logic_id = 0;
sub new_logic_id {
$logic_id++ while $_taken_id{$logic_id};
return $logic_id++;
}
}
####################################################################
# Usage : my $REMOTE_LIBRARY = read_modules(@Remote_modules);
# Purpose : Concatenates the contents of the files associated with
# the file descriptors
# Returns : The string with the contents of all those files
# Throws : exception if a module can not be found
sub read_modules {
my $m = "";
for my $descriptor (@_) {
my %modules = %{which($descriptor)};
for my $module (keys(%modules)) {
my $path = which($module)->{$module}{path};
unless (defined($path) and -r $path) {
die "Can't find module $module\n";
}
$m .= "# source from: #line 1 \"$path\"\n";
local $/ = undef;
open my $FILE, "< $path";
$m .= <$FILE>;
close($FILE);
}
}
return $m;
}
# ssh [-1246AaCfgKkMNnqsTtVvXxY] [-b bind_address] [-c cipher_spec]
# [-D [bind_address:]port] [-e escape_char]
# [-F configfile] [-i identity_file] [-L [bind_address:]port:host:hostport]
# [-l login_name] [-m mac_spec]
# [-O ctl_cmd] [-o option] [-p port] [-R [bind_address:]port:host:hostport] i
# [-S ctl_path] [-w tunnel:tunnel]
# [user@]hostname [command]
#
sub find_host {
my $command = shift;
my %option;
die "Error in GRID::Machine findhost. No command provided\n" unless $command;
$command =~ s{^\s*
(\S+ # ssh
(?:\s+-[1246AaCfgKkMNnqsTtVvXxYy])* # -6 -A -f ... options without arg
)
\s*
}{}x;
$option{ssh} = $1;
while ($command =~ s{^\s*(-\w)\s+(\S*)}{}g) {
$option{$1} = $2;
}
$command =~ s{^\s*([\w+.\@]+)}{};
$option{host} = $1;
return \%option;
}
# Inheritance: not considered
{ # closure for attributes
my @legal = qw(
cleanup
command
debug
err
host
includes
log
logic_id
perl
perloptions
prefix
pushinc unshiftinc
readfunc
readpipe
remotelibs
report
scp
sendstdout
ssh
sshpipe
sshoptions
startdir startenv
survive
tmpdir
uses
wait
writefunc
writepipe
);
my %legal = map { $_ => 1 } @legal;
GRID::Machine::MakeAccessors::make_accessors(@legal);
########################################################
sub RemoteProgram {
my ($USES,
$REMOTE_LIBRARY,
$class,
$host,
$log,
$err,
$logic_id,
$startdir,
$startenv,
$pushinc,
$unshiftinc,
$sendstdout,
$cleanup,
$prefix,
$portdebug,
$report,
$tmpdir,
)
= @_;
return << "EOREMOTE";
#line 1 "$prefix/REMOTE.pm"
package GRID::Machine;
use strict;
use warnings;
$USES
$REMOTE_LIBRARY
my \$rperl = $class->new(
host => '$host',
log => '$log',
err => '$err',
logic_id => '$logic_id',
clientpid => $$,
startdir => '$startdir',
startenv => $startenv,
pushinc => [ qw{ @$pushinc } ],
unshiftinc => [ qw{ @$unshiftinc } ],
sendstdout => $sendstdout,
cleanup => $cleanup,
prefix => '$prefix', # Where to install modules
debug => $portdebug,
report => q{$report},
tmpdir => q{$tmpdir},
);
\$rperl->main();
__END__
EOREMOTE
} # end of sub RemoteProgram
sub new {
my $class = shift;
my %opts = @_;
my $a = "";
die __PACKAGE__."::new: Illegal arg <$a>\n" if $a = first { !exists $legal{$_} } keys(%opts);
my $portdebug = $opts{debug} || 0;
my $sendstdout = 1;
$sendstdout = $opts{sendstdout} if exists($opts{sendstdout});
###########################################################################
# We have a "shared library" of common functions between this end and the
# remote end
# The user can specify some libs that will be loaded at boot time
my $remotelibs = $opts{remotelibs} || [];
my @Remote_modules = qw(
GRID::Machine::MakeAccessors
GRID::Machine::Message
GRID::Machine::Result
GRID::Machine::REMOTE
);
push @Remote_modules, $_ for @$remotelibs;
my $REMOTE_LIBRARY = read_modules(@Remote_modules);
my $host = "";
my ( $readfunc, $writefunc ) = ( $opts{readfunc}, $opts{writefunc} );
# THIS IS NEW --> LOGIC ID FOR MACHINE
my $logic_id;
if ($opts{logic_id}) {
$logic_id = $opts{logic_id};
$_taken_id{$logic_id} = 1;
}
else {
$logic_id = new_logic_id();
}
my $log = $opts{log} || '';
my $err = $opts{err} || '';
my $report = $opts{report} || '';
my $tmpdir = $opts{tmpdir} || '';
my $wait = $opts{wait} || 15;
my $cleanup = $opts{cleanup};
my $ssh = $opts{ssh} || 'ssh';
my $sshoptions = $opts{sshoptions} || '';
my $perloptions = $opts{perloptions} || '';
my $scp = $opts{scp} || 'scp -q -p';
my $sshpipe = $ssh;
my $prefix = $opts{prefix} || 'perl5lib/';
$cleanup = 1 unless defined($cleanup);
my $pid;
my $port = 22;
my $identity = '';
my $user = '';
my $options;
my ( $readpipe, $writepipe ); # pipes to communicate with the remote machine
if( !defined $readfunc || !defined $writefunc ) {
my @command;
if( exists $opts{command} ) {
my $c = $opts{command};
$c = "@$c" if (reftype($c) && (reftype($c) eq "ARRAY"));
my $options = find_host($c);
$host = $options->{host};
$port = $options->{-p};
$identity = $options->{-i};
$user = $options->{-l};
if ($identity) {
$scp .= " -i $identity";
$sshpipe .= " -i $identity";
}
if ($port) {
$scp .= " -P $port";
$sshpipe .= " -p $port";
}
$host = $user.'@'.$host if $user && $host !~ /\@/;
#$c .= ' perl' unless $c =~ /perl/;
@command = ( $c );
}
elsif ($opts{host}) {
$host = $opts{host} or
die __PACKAGE__."->new() requires a host, a command or a Readfunc/Writefunc pair";
my @sshoptions;
if ($host =~ s/:(\d+$)//) {
$port = $1;
@sshoptions = ('-p', $port);
}
if (reftype($sshoptions) && (reftype($sshoptions) eq 'ARRAY')) {
push @sshoptions, @$sshoptions;
}
elsif ($sshoptions) {
push @sshoptions, split /\s+/, $sshoptions;
}
# Test remote ssh operation. Thanks to Alex White
{
# surround each options with quotes in case option contains a space
my @test_ssh_options = map { qq{'$_'} } @sshoptions;
my $errmessg = "Can't execute perl in machine '$host' via '$ssh' ".
(@test_ssh_options? "with options '@test_ssh_options' " : '').
"using automatic authentication in less than $wait seconds";
unless (is_operative("$ssh @test_ssh_options", $host, "perl -v", $wait)) {
warn $errmessg;
die unless $opts{survive};
return;
}
}
my %sshoptions = map { $sshoptions[$_] =~ /^-[pli]$/? @sshoptions[$_, $_+1] : () } 0..$#sshoptions;
if ($sshoptions{-p}) {
$scp .= " -P $sshoptions{'-p'}";
$sshpipe .= " -p $sshoptions{'-p'}";
}
if ($sshoptions{-i}) {
$scp .= " -i $sshoptions{'-i'}";
$sshpipe .= " -i $sshoptions{'-i'}";
}
$host = $sshoptions{'-l'}.'@'.$host if $sshoptions{'-l'} && $host !~ /\@/;
my @perloptions;
if (reftype($perloptions) && (reftype($perloptions) eq 'ARRAY')) {
push @perloptions, @$perloptions;
}
elsif ($perloptions) {
push @perloptions, split /\s+/, $perloptions;
}
if ($portdebug && $portdebug =~ /^\d+$/) {
#my $purehost = $host;
#$purehost =~ s/^[\w.]*\@//;
#my $perl = qq{PERLDB_OPTS="RemotePort=$purehost:$portdebug" }.($opts{perl} || 'perl -d');
my $perl = qq{PERLDB_OPTS="RemotePort=localhost:$portdebug" }.($opts{perl} || 'perl -d');
@command = ( "$ssh @sshoptions $host $perl @perloptions" );
print <<"HELPMSG";
Debugging with '@command'
Remember to run in a separate terminal
gmdb $host
or connect in another terminal via ssh to $host and run in $host netcat:
netcat -v -l -p $portdebug
or, better, if you have 'socat' installed in $host:
socat -d READLINE,history=\$HOME/.perldbhistory TCP4-LISTEN:$portdebug,reuseaddr
HELPMSG
}
else {
@command = ( $ssh, @sshoptions, $host, $opts{perl} || "perl", @perloptions );
}
}
else { # not host not command: no ssh. IS a local open2!!!
$host = '';
$ssh = '';
$sshpipe = '';
$port = '';
$scp = $opts{scp} || "cp"; # unix
my @perloptions;
if (reftype($perloptions) && (reftype($perloptions) eq 'ARRAY')) {
push @perloptions, @$perloptions;
}
elsif ($perloptions) {
push @perloptions, split /\s+/, $perloptions;
}
if ($portdebug && $portdebug =~ /^\d+$/) {
my $perl = qq{PERLDB_OPTS="RemotePort=localhost:$portdebug" }.($opts{perl} || 'perl -d');
@command = ( "$perl @perloptions" );
print <<"HELPMSG";
Debugging with '@command', run netcat:
netcat -v -l -p $portdebug
or, better, if you have 'socat' installed:
socat -d READLINE,history=\$HOME/.perldbhistory TCP4-LISTEN:$portdebug,reuseaddr
HELPMSG
}
else {
@command = ( $opts{perl} || "perl", @perloptions );
}
}
open my $saverr, ">& STDERR";
open STDERR, "> /dev/null";
$pid = IPC::Open2::open2( $readpipe, $writepipe, @command );
close STDERR;
open STDERR, ">&", $saverr; # restore
$readfunc = sub {
if( defined $_[1] ) {
read( $readpipe, $_[0], $_[1] );
}
else {
$_[0] = <$readpipe>;
die "Premature EOF received" unless defined($_[0]);
length( $_[0] );
}
};
$writefunc = sub {
syswrite $writepipe, $_[0];
};
}
my $startdir = $opts{startdir} || '';
my $startenv = $opts{startenv} || {};
my @startenv = map { "'$_' => '$startenv->{$_}', "} keys(%$startenv);
$startenv = "{ @startenv }";
my $pushinc = $opts{pushinc} || [];
die "Arg 'pushinc' of new must be an ARRAY ref\n" unless reftype($pushinc) eq 'ARRAY';
my $unshiftinc = $opts{unshiftinc} || [];
die "Arg 'unshiftinc' of new must be an ARRAY ref\n" unless reftype($unshiftinc) eq 'ARRAY';
my $uses = $opts{uses} || [];
die "Arg 'uses' of new must be an ARRAY ref\n" unless reftype($uses) eq 'ARRAY';
my $USES = '';
$USES .= "use $_;\n" for @$uses;
# Now stream it the "firmware"
my $remoteprogram = RemoteProgram( # Watch the order!!!. TODO: use named parameters
$USES,
$REMOTE_LIBRARY,
$class,
$host,
$log,
$err,
$logic_id,
$startdir,
$startenv,
$pushinc,
$unshiftinc,
$sendstdout,
$cleanup,
$prefix,
$portdebug,
$report,
$tmpdir,
);
my $self = {
debug => $portdebug,
host => $host,
identity => $identity,
logic_id => $logic_id,
pid => $pid,
port => $port,
prefix => $prefix,
PROCESSPIDS => [],
readfunc => $readfunc,
readpipe => $readpipe,
scp => $scp,
sendstdout => $sendstdout,
ssh => $ssh,
sshpipe => $sshpipe,
wait => $wait,
writepipe => $writepipe,
writefunc => $writefunc,
};
my $machineclass = "$class"."::".(0+$self);
bless $self, $machineclass;
my $misa;
{
no strict 'refs';
$misa = \@{"${machineclass}::ISA"};
}
unshift @{$misa}, 'GRID::Machine'
unless first { $_ eq 'GRID::Machine' } @{$misa};
$self->putstringcode($remoteprogram, 'REMOTE.pm') if $portdebug;
$writefunc->( $remoteprogram );
# Allow the user to include their own
$self->include('GRID::Machine::Core');
$self->include('GRID::Machine::RIOHandle');
$self->makemethods(
[ 'fork', filter=>'result',
around => sub {
my $self = shift;
my $r = $self->call( 'fork', @_ );
$r->{machine} = $self;
$r
},
],
[ 'async', filter=>'result',
around => sub {
my $self = shift;
my $r = $self->call( 'async', @_ );
$r->{machine} = $self;
$r
},
],
[ 'waitpid', filter=>'result', ],
[ 'waitall', filter=>'result', ],
[ 'kill', filter=>'result', ],
[ 'poll', filter=>'result', ],
);
my $includes = $opts{includes} || [];
die "Arg 'includes' of new must be an ARRAY ref\n" unless reftype($includes) eq 'ARRAY';
$self->include($_) for @$includes;
$self->send_operation("GRID::Machine::DEBUG_LOAD_FINISHED") if $portdebug;
return $self;
}
} # end of closure
sub _get_result {
my $self = shift;
my ($type, @result);
{
($type, @result) = $self->read_operation();
if ($type eq 'GRID::Machine::GPRINT') {
print @result;
redo;
}
elsif ($type eq 'GRID::Machine::GPRINTF') {;
printf @result;
redo;
}
}
my $result = shift @result;
$result->type($type) if blessed($result) and $result->isa('GRID::Machine::Result');
return $result; # void context
}
sub eval {
my $self = shift;
my ( $code, @args ) = @_;
my ($package, $filename, $line) = caller;
$code = <<"EOCODE";
#package $package;
#line $line "$filename"
$code
EOCODE
$self->send_operation( "GRID::Machine::EVAL", $code, \@args );
return $self->_get_result();
}
sub compile {
my $self = shift;
my $name = shift;
die "Illegal name. Full names aren't allowed\n" unless $name =~ m{^[a-zA-Z_]\w*$};
$self->send_operation( "GRID::Machine::STORE", $name, @_ );
return $self->_get_result( );
}
sub exists {
my $self = shift;
my $name = shift;
$self->send_operation( "GRID::Machine::EXISTS", $name );
my ($type, $result) = $self->read_operation();
return $result if $type eq "RETURNED";
return;
}
sub sub {
my $self = shift;
my $name = shift;
my $code = shift;
my %args = @_;
if ($code !~ /^#line \d+/m) {
my ($package, $filename, $line) = caller;
$code = <<"EOCODE";
#package $package; sub $name
#line $line "$filename"
$code
EOCODE
}
my $ok = $self->compile( $name, $code, @_);
return $ok if (blessed($ok) && $ok->type eq 'DIED');
# Don't overwrite existing methods
my $class = ref($self);
if ($class->can($name)) {
warn "Machine "
.$self->host
." already has a method $name.";
return $ok;
};
# Install it as a singleton method of the GRID::Machine object
my $sub;
if ($args{around}) {
$sub = $args{around};
}
else {
$sub = sub { my $self = shift; $self->call( $name, @_ ) };
}
no strict 'refs';
*{$class."::$name"} = $sub;
return $ok;
}
sub makemethod {
my $self = shift;
my $name = shift;
my %args = @_;
my ($rpackage, $rname) = $name =~ m{(.*)\b(\w+)$};
# Don't overwrite existing methods
my $class = ref($self);
warn "Machine ".$self->host ." already has a method $rname." if $class->can($rname);
$self->send_operation( "GRID::Machine::MAKEMETHOD", $name, @_ );
my $ok = $self->_get_result( );
return $ok if (blessed($ok) && $ok->type eq 'DIED');
# Install it as a singleton method of the GRID::Machine object
my $sub;
if ($args{around}) {
$sub = $args{around};
}
else {
$sub = sub { my $self = shift; $self->call( $name, @_ ) };
}
no strict 'refs';
*{$class."::$rname"} = $sub;
}
sub makemethods {
my $self = shift;
$self->makemethod(@$_) for @_;
}
# -dk- modified by Casiano
# $m->callback( 'tutu' );
# $m->callback( tutu => sub { ... } );
# $m->callback( sub { ... } );
sub callback {
my $self = shift;
my $name = shift;
my $cref = shift;
if (UNIVERSAL::isa($name, 'CODE')) {
my $id = 0+$name;
$self->{callbacks}->{$id} = $name;
return bless { id => $id }, 'GRID::Machine::_RemoteStub';
}
die "Error: Illegal name for callback: $name\n" unless $name =~ m{^[a-zA-Z_:][\w:]*$};
if (UNIVERSAL::isa($cref, 'CODE')) {
$self->{callbacks}->{$name} = $cref;
}
else {
my $fullname;
if ($name =~ /^.*::(\w+)$/) {
$fullname = $name;
$name = $1;
}
else {
$fullname = caller()."::$name";
}
{
no strict 'refs';
$self->{callbacks}->{$name} = *{$fullname}{CODE};
}
die "Error building callback $fullname: Not a CODE ref\n"
unless UNIVERSAL::isa($self->{callbacks}->{$name}, 'CODE');
}
$self->send_operation( "GRID::Machine::CALLBACK", $name);
return $self->_get_result( );
}
##############################################################################
# Support for reading and sending modules
# May be I have to send this code to a separated module
sub _slurp_perl_code {
my ($input, $lineno) = @_;
my($level,$from,$code);
$from=pos($$input);
$level=1;
while($$input=~/([{}])/gc) {
substr($$input,pos($$input)-1,1) eq '\\' #Quoted
and next;
$level += ($1 eq '{' ? 1 : -1)
or last;
}
$level
and die "Unmatched { opened at line $lineno";
$code = substr($$input,$from,pos($$input)-$from-1);
return $code;
}
####################################################################
# Usage : $input = slurp_file($filename, 'trg');
# Purpose : opens "$filename.trg" and sets the scalar
# Parameters : file name and extension (not icluding the dot)
# Comments : Is this O.S dependent?
sub slurp_file {
my ($filename, $ext) = @_;
croak "Error in slurp_file opening file. Provide a filename!\n"
unless defined($filename) and length($filename) > 0;
$ext = "" unless defined($ext);
$filename .= ".$ext" unless (-r $filename) or ($filename =~ m{[.]$ext$});
local $/ = undef;
open my $FILE, $filename or croak "Can't open file $filename";
my $input = <$FILE>;
close($FILE);
return $input;
}
# Reads a module and install all the subroutines in such module
# as methods of the GRID::machine object
# TODO: linenumbers
{
my $self;
sub SERVER {
return $self;
}
sub include {
$self = shift;
my $desc = shift;
my %args = @_;
$self->modput($desc) if $self->{debug};
my $exclude = $args{exclude} || [];
my %exclude;
if (reftype($exclude) eq 'ARRAY') {
%exclude = map { $_ => 1 } @$exclude;
}
elsif (defined($exclude)) {
die "Error: the 'exclude' parameter must be an ARRAY ref\n";
}
my $alias = $args{alias} || {};
die "Error: the 'alias' parameter must be a HASH ref\n" unless UNIVERSAL::isa($alias, 'HASH');
my %alias = %$alias;
my %modules = %{which($desc)};
for my $m (keys(%modules)) {
my $file = which($m)->{$m}{path};
unless (defined($file) and -r $file) {
die "Can't find module $m\n";
}
my $input = slurp_file($file, 'pm');
while ($input=~ m(
# sub x filter y { ... }
(?:\bsub\s+([a-zA-Z_]\w*)((?:\s+\#gm\s+.*)*)\s*{) # 1 False } (for vi)
|(__DATA__) # 2
|(__END__) # 3
|(\n=(?:head[1-4]|pod|over|begin|for)) # 4 pod
|(\#.*) # 5
|("(?:\\.|[^"])*") # 6 "double quoted string" #"
|('(?:\\.|[^'])*') # 7 'single quoted string' #'
# to be done: <<"HERE DOCS"
# q, qq, etc.
|(?:use\s+(.*)) # 8 use Something qw(chuchu chim);
|(LOCAL\s+{) # 9 False } # Execute code in the local side
)gx)
{
my ($name, $filter, $data, $end, $pod, $comment, $dq, $sq, $use, $local)
= ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);
# Finish if found __DATA__ or __END__
last if defined($data) or defined($end);
if (defined($pod)) { # POD found: skip it
next if ($input=~ m{\n\n=cut\n}gc);
last; # Not explicit '=cut' therefore everything is documentation
}
next if defined($comment) or defined($dq) or defined($sq);
if (defined($use)) {
$self->eval("use $use")->ok or die "Can't use lib '$use' in ".$self->host."\n";
next;
}
if (defined($local)) {
# execute this code on the local side
my $code = _slurp_perl_code(\$input, 0);
eval($code);
die "Error executing LOCAL: $@\n" if $@;
next;
}
# sub found: install it
my $code = _slurp_perl_code(\$input, 0);
my $alias = $name;
$alias = $alias{$name} if defined($alias{$name});
unless ($exclude{$name}) {
my @args;
if ($filter) {
$filter =~s/^\s*#gm //gm;
@args = eval $filter;
}
my $r = $self->sub($alias, $code, @args);
$r->ok or die "Can't compile sub '$alias' in ".$self->host.":\n$r\n";
}
}
}
}
} # closure
#######################################################################3
sub call
{
my $self = shift;
#my ( $name, @args ) = @_;
my $name = shift;
# id-list of anonymous inline callback stubs (-dk-)
my @ids;
foreach my $a (@_) {
push @ids, $a->{id} if UNIVERSAL::isa($a, 'GRID::Machine::_RemoteStub')
}
$self->send_operation( "GRID::Machine::CALL", $name, \@_ );
my $result = $self->_get_result_or_callback(@ids); # -dk-
# cleanup (-dk-) See examples/anonymouscallback2.pl
#foreach my $id (@ids) {
# delete $self->{callbacks}->{$id}
#}
return $result;
}
# -dk-
sub _get_result_or_callback {
my $self = shift;
my ($type, @list);
{
@list = $self->read_operation();
$type = shift @list;
if ($type eq 'GRID::Machine::GPRINT') {
print @list;
redo;
}
if ($type eq 'GRID::Machine::GPRINTF') {;
printf @list;
redo;
}
if ($type eq 'CALLBACK') {
my $name = shift @list;
# FIXME: eval callback to catch and propagate exceptions
$self->send_operation('RESULT', $self->{callbacks}->{$name}->(@list));
redo;
}
}
my $result = shift @list;
$result->type($type) if blessed($result) and $result->isa('GRID::Machine::Result');
return $result; # void context
}
# True if machine accepts automatic ssh connections
# Eric version. Thanks Eric!
sub is_operative {
my $ssh = shift;
my $host = (shift || '');
$ssh = '' if $host eq '';
my $command = shift || 'perl -v';
my $seconds = shift || 15;
my $operative;
my $devnull = File::Spec->devnull();
my ( $savestdout, $savestdin, $savestderr);
eval {
local $SIG{ALRM} = sub { die "Can't connect to $host via ssh in less than $seconds seconds $@$!"; };
alarm($seconds);
open($savestdout, ">& STDOUT"); # factorize!
open($savestderr, ">& STDERR"); # factorize!
open(STDOUT,">", $devnull);
open(STDERR,">", $devnull);
open($savestdin, "<& STDIN");
open(STDIN,">", $devnull);
$operative = !system("$ssh $host $command");
open(STDOUT, ">&", $savestdout);
open(STDERR, ">&", $savestderr);
open(STDIN, "<&", $savestdin);
alarm(0);
};
if($@) {
open(STDOUT, ">&", $savestdout);
open(STDERR, ">&", $savestderr);
open(STDIN, "<&", $savestdin);
return 0;
}
return $operative;
}
sub putstringcode {
my $self = shift;
my $code = shift;
my $target = shift;
my $fh = File::Temp->new(UNLINK => 1);
my $fname = $fh->filename;
print $fh $code;
close($fh);
my $dest = "$self->{prefix}/$target";
my $host = $self->host;
die "put error: host is not defined\n" unless defined($host);
my $scp = $self->{scp};
die "put error: scp is not defined\n" unless defined($scp);
system("$scp $fname $host:$dest") and die "GRID::Machine::put Error: Copying file $fname to $host:$dest\n";
unlink $fname;
}
sub put {
my $self = shift;
my $files = shift;
die "Error in put: provide source files\n" unless UNIVERSAL::isa($files, "ARRAY") && @$files;
my @files = @{$files};
my $dest = shift || $self->getcwd()->result;
# Check if $dest is a relative path
unless (File::Spec->file_name_is_absolute($dest)) {
$dest = File::Spec->catpath('', $self->getcwd()->result, $dest);
}
# Warning: bug. "host" may be is not defined!!!!!!!!!!
my $host = $self->host;
die "put error: host is not defined\n" unless defined($host);
my $scp = $self->{scp};
die "put error: scp is not defined\n" unless defined($scp);
# Check if @files exist in the local system
# Check if they exist in the remote system. If so what permits they have?
# host is local?
$host = ($host eq '')? '' : "$host:";
system("$scp @files $host$dest") and die "GRID::Machine::put Error: Copying files @files to $host:$dest\n";
return 1;
}
sub get {
my $self = shift;
my $files = shift;
die "Error in get: provide source files\n" unless UNIVERSAL::isa($files, "ARRAY") && @$files;
my @files = @{$files};
my $dest = shift || Cwd::getcwd();
#Warning: bug. host may be is not defined!!!!!!!!!!
my $host = $self->host;
die "put error: host is not defined\n" unless defined($host);
my $scp = $self->{scp};
die "put error: scp is not defined\n" unless defined($scp);
my $from = shift || $self->getcwd()->result;
for (@files) {
# Check if $from is a relative path
unless (File::Spec->file_name_is_absolute($_)) {
$_ = File::Spec->catpath('', $from, $_);
}
$_ = "$host:$_";
}
# Check if @files exist
system("$scp @files $dest") and die "get Error: copying files @files\n";
return 1;
}
sub run {
my $m = shift;
my $command = shift;
my $r = $m-> system($command);
print "$r";
return !$r->stderr;
}
# Install a module (.pm) or a family of modules (Parse::) on the remote machine
# does not deal with dependences
sub modput {
my $self = shift;
my @args;
for my $descriptor (@_) {
my %modules = %{which($descriptor)};
for my $module (keys(%modules)) {
# TODO: Check if that module already exists
#
# Obtains the relative path of the module
my $path = which($module)->{$module}{path};
unless (defined($path) and -r $path) {
die "Can't find module $module\n";
}
my $base = which($module)->{$module}{base};
my $relpath = File::Spec->abs2rel($path, $base);
# Sends the file with .pm extension
my $m = "";
open my $FILE, "< $path";
binmode $FILE;
my $size = -s $path;
read($FILE, ,$m, $size);
close($FILE);
push @args, $relpath, $m;
# Directory "auto"
(my $end = which($module)->{$module}{pm}) =~ s/::/\//g;
my $rel_auto_path = "auto/" . $end;
my $abs_auto_path = $base . "/" . $rel_auto_path;
if (-e $abs_auto_path) {
chdir($abs_auto_path);
my @auto_files = glob('*');
foreach my $auto_file (@auto_files) {
my $m = "";
my $rel_auto_file_path = $rel_auto_path . "/" . $auto_file;
open my $FILE, "< $auto_file";
binmode $FILE;
my $size = -s $auto_file;
read($FILE, ,$m, $size);
close($FILE);
push @args, $rel_auto_file_path, $m;
}
}
}
}
$self->send_operation("GRID::Machine::MODPUT", @args);
return $self->_get_result();
}
# Not finished
sub module_transfer {
my $self = shift;
my $olddir = $self->getcwd->result;
my $dir = $self->prefix;
$self->chdir($dir);
for my $dist (@_) {
my %modules = %{which($dist)};
for my $m (keys(%modules)) {
my $path = which($m)->{$m}{path};
unless (defined($path) and -r $path) {
die "Can't find module $m\n";
}
$self->put([$path]);
} # for
} # for
$self->chdir($olddir);
}
# Warning! needs more exception control
# Must be based on rsync instead of put
# Include tar.gz case: expand automatically
# and use the corresponding directory
# perhaps a hook callback after the fileswere transferred?
# Control de args: check!!
sub copyandmake {
my $m = shift;
my %arg = @_;
my $dir = $arg{dir} || die "copyandmake error: Provide a directory\n";
my $target = $arg{target} || '';
my $files = $arg{files} || [];
my $existsmakefile = first { /\b[mM]akefile$/ } @$files;
#my $existsmakefile = first { /.*Makefile/ } @$files;
my $make = $existsmakefile ? 'make' : '';
$make = $arg{make} if defined($arg{make});
my $makeargs = $arg{makeargs} || '';
my $cleanfiles = $arg{cleanfiles} || 0;
my $cleandirs = $arg{cleandirs} || 0;
my $keepdir = $arg{keepdir} || 0;
$m->mkdir($dir) unless $m->_x($dir)->result;
$m->mark_as_clean(dirs=> [ $dir ]) if $cleandirs;
my $olddir = $m->getcwd();
$m->chdir($dir);
# Must be done after changing directory ...
$m->mark_as_clean(files=>$files) if $cleanfiles;
unless ($m->_x($target)->result) {
if (@$files) {
$m->put($files);
}
if ($make) {
my $r = $m->system("$make $makeargs");
die "copyandmake error while executing $make $makeargs $!" unless $r->ok;
}
}
$m->chdir($olddir) if $keepdir;
}
sub copytarmake {
my $m = shift;
my %arg = @_;
my $dir = $arg{dir} || die "copytarmake error: Provide a directory\n";
my $file = $arg{file} || die "copytarmake error: Provide a tar file\n";
die "copytarmake error: file $file does not follow standard name convention\n" unless $file =~ m{([\w.-]+)\.tar(\.gz)?$};
my $make = $arg{make} || 'make';
my $makeargs = $arg{makeargs} || '';
# Shall I change dir at the end?
my $keepdir = $arg{keepdir} || 0;
my $olddir = $m->getcwd()->result;
my $host = $m->host;
# Create if it does not exists?
$m->chdir($dir);
$m->put([$file]) or die "Can't copy distribution to $host\n";
my $r = $m->eval(q{
my $dist = shift;
eval('use Archive::Tar');
if (Archive::Tar->can('new')) {
# Archive::Tar is installed, use it
my $tar = Archive::Tar->new;
$tar->read($dist, 1) or die "Archive::Tar error: Can't read distribution $dist\n";
$tar->extract() or die "Archive::Tar error: Can't extract distribution $dist\n";
}
else {
system('gunzip', $dist) and die "Can't gunzip $dist\n";
my $tar = $dist =~ s/\.gz$//;
system('tar', '-xf', $tar) or die "Can't untar $tar\n";
}
},
$file # arg for eval
);
die "$r" unless $r->ok;
$r = $m->system("$make $makeargs");
die "$r" unless $r->ok;
$m->chdir($olddir) if $keepdir;
}
# Add a SIGPIPE handler
sub openpipe {
my $self = shift;
my $exec = shift;
my $mode = shift;
my $host = $self->host;
my $ssh = $self->sshpipe;
my $r = $self->wrapexec($exec);
die $r unless $r->ok;
my $scriptname = $r->result;
my $perl = ($host eq '')? $^X : 'perl';
my $command = "$ssh $host $perl $scriptname";
$command = $mode? "$command |" : "| $command";
my $proc = IO::File->new;
my $pid = open($proc, $command) || die "Can't open <$ssh $host $perl $scriptname>\n";
push @{$self->{PROCESSPIDS}}, $pid;
return (wantarray ? ($proc, $pid) : $proc);
}
# Add a SIGPIPE handler
sub open {
my ($self, $descriptor) = @_;
# Output pipe
return $self->openpipe($descriptor, 1) if ($descriptor =~ s{\|\s*$}{});
# Input pipe
return $self->openpipe($descriptor, 0) if ($descriptor =~ s{^\s*\|}{});
$self->send_operation( "GRID::Machine::OPEN", $descriptor );
my $index = $self->_get_result()->result;
return bless { index => $index, server => $self }, 'GRID::Machine::IOHandle';
}
sub open2 {
my ($self, $from_child, $to_child, $command) = splice @_, 0, 4;
die "GRID::Machine::open2 error: wrong arguments\n"
unless defined($command) && UNIVERSAL::isa($self, 'GRID::Machine');
my $host = $self->host;
my $ssh = $self->sshpipe;
$command = "@$command" if reftype($command) && (reftype($command) eq 'ARRAY');
my $r = $self->wrapexec($command);
die $r unless $r->ok;
my $scriptname = $r->result;
my $c = "$ssh $host perl $scriptname";
#($from_child, $to_child) = (IO::File->new, IO::File->new);
my $pid = IPC::Open2::open2($from_child, $to_child, $c) || die "Can't open2 <$c>\n";
push @{$self->{PROCESSPIDS}}, $pid;
@_[1..2] = ($from_child, $to_child);
return $pid;
}
sub open3 {
my ($self, $to_child, $from_child, $err_child, $command) = @_;
die "GRID::Machine::open3 error: wrong arguments\n"
unless defined($command) && UNIVERSAL::isa($self, 'GRID::Machine');
my $host = $self->host;
my $ssh = $self->sshpipe;
$command = "@$command" if reftype($command) && (reftype($command) eq 'ARRAY');
my $r = $self->wrapexec($command);
die $r unless $r->ok;
my $scriptname = $r->result;
my $c = "$ssh $host perl $scriptname";
#($from_child, $to_child) = (IO::File->new, IO::File->new);
my $pid = IPC::Open3::open3($to_child, $from_child, $err_child, $c) || die "Can't open3 <$c>\n";
push @{$self->{PROCESSPIDS}}, $pid;
@_[1..3] = ($to_child, $from_child, $err_child);
return $pid;
}
sub DESTROY {
my $self = shift;
local $?;
$self->send_operation( "GRID::Machine::QUIT" );
my $ret = $self->_get_result( );
warn "Remote host ".$self->host
." threw an exception while quitting"
.$ret->errmsg
if blessed($ret) && ( $ret->type eq "DIED" );
waitpid $self->{pid}, 0 if defined $self->{pid};
}
sub qc {
my ($package, $filename, $line) = caller;
return <<"EOI";
#line $line $filename
@_
EOI
}
sub qx {
my $self = shift;
my $wantarray = wantarray();
my $r = $self->qqx($wantarray, $/, @_);
$wantarray? $r->Results : $r->result;
}
1;