/usr/local/CPAN/Net-ParSCP/Net/ParSCP.pm
package Net::ParSCP;
use strict;
use warnings;
use IO::Select;
use Pod::Usage;
use Net::HostLanguage;
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT = qw(
parpush
exec_cssh
help
version
usage
$VERBOSE
$DRYRUN
);
our $VERSION = '0.15';
our $DRYRUN = 0;
############################################################
sub version {
my $errmsg = shift;
print "Version: $VERSION\n";
pod2usage(
-verbose => 99,
-sections => "AUTHOR|COPYRIGHT AND LICENSE",
-exitval => 0,
);
}
############################################################
sub usage {
my $errmsg = shift;
warn "$errmsg\n";
pod2usage(
-verbose => 99,
-sections => "NAME|SYNOPSIS|OPTIONS",
-exitval => 1,
);
}
sub help {
pod2usage(
-verbose => 99,
-sections => "NAME|SYNOPSIS|OPTIONS",
-exitval => 0,
);
}
sub exec_cssh {
my @machines = @_;
my $csshcommand = 'cssh ';
$csshcommand .= "$_ " for @machines;
warn "Executing system command:\n\t$csshcommand\n" if $VERBOSE;
my $pid;
exec("$csshcommand &");
die "Can't execute cssh\n";
}
sub wait_for_answers {
my $readset = shift;
my %proc = %{shift()};
my $np = keys %proc; # number of processes
my %output;
my @ready;
my %result;
for (my $count = 0; $count < $np; ) {
push @ready, $readset->can_read unless @ready;
my $handle = shift @ready;
my $name = $proc{0+$handle};
unless (defined($name) && $name) {
warn "Error. Received message from unknown handle\n";
$name = 'unknown';
}
my $partial = '';
my $numBytesRead;
$numBytesRead = sysread($handle, $partial, 65535, length($partial));
$output{$name} .= $partial;
if (defined($numBytesRead) && !$numBytesRead) {
# eof
if ($VERBOSE) {
print "$name output:\n";
$output{$name} =~ s/^/$name:/gm if length($output{$name});
print "$output{$name}\n";
}
$readset->remove($handle);
$count ++;
if (close($handle)) {
$result{$name} = 1;
}
else {
warn $! ? "Error closing scp to $name $!\n"
: "Exit status $? from scp to $name\n";
print "$output{$name}\n" unless $VERBOSE;
$result{$name} = 0;
}
}
}
return \%result;
}
# parse_sourcefile: Find out what source machines are involved
# A hash %source is returned. Keys are the source machines.
# Values are the list of source paths
# machine => [ paths ]
# The special key '' (emtpy string) represents the local machine
{
my $nowhitenocolons = '(?:[^\s:]|\\\s)+'; # escaped spaces are allowed
sub parse_sourcefile {
my $sourcefile = shift;
my @externalmachines = $sourcefile =~ /($nowhitenocolons):($nowhitenocolons)/g;
my @localpaths = $sourcefile =~ /(?:^|\s) # begin or space
($nowhitenocolons)
(?:\s|$) # end or space
/xg;
my %source;
$source{''} = \@localpaths if @localpaths; # '' is the local machine
while (my ($clusterexp, $path) = splice(@externalmachines, 0, 2)) {
if (exists $source{$clusterexp} ) {
push @{$source{$clusterexp}}, $path;
}
else {
$source{$clusterexp} = [ $path ]
}
}
return %source;
}
}
# Gives the same value for entries $entry1 and $entry2
# in the hash referenced by $rh
sub make_synonymous {
my ($rh, $entry1, $entry2, $defaultvalue) = @_;
if (exists $rh->{$entry1}) {
$rh->{$entry2} = $rh->{$entry1}
}
elsif (exists $rh->{$entry2}) {
$rh->{$entry1} = $rh->{$entry2};
}
else {
$rh->{$entry1} = $rh->{$entry2} = $defaultvalue;
}
}
sub spawn_secure_copies {
my %arg = @_;
my $readset = $arg{readset};
my $configfile = $arg{configfile};
my $destination = $arg{destination};
my @destination = ref($destination)? @$destination : $destination;
my %cluster = %{$arg{cluster}};
my %method = %{$arg{method}};
my $scp = $arg{scp} || 'scp';
my $scpoptions = $arg{scpoptions} || '';
my $sourcefile = $arg{sourcefile};
my $name = $arg{name};
# hash source: keys: source machines. values: lists of source paths for that machine
my (%pid, %proc, %source);
my $sendfiles = sub {
my ($m, $cp) = @_;
# @= is a macro and means "the name of the target machine"
my $targetname = exists($name->{$m}) ? $name->{$m} : $m;
$cp =~ s/@=/$targetname/g;
# @# stands for source machine: decompose transfer
for my $sm (keys %source) {
my $sf = $sm? "$sm:@{$source{$sm}}" : "@{$source{$sm}}"; # $sm: source machine
my $fp = $cp; # $fp: path customized for this source machine
# what if it is $sm eq '' the localhost?
my $sn = $sm;
$sn = $name->{$sm} if (exists $name->{$sm});
$fp =~ s/@#/$sn/g;
my $target = ($m eq 'localhost')? $fp : "$m:$fp";
warn "Executing system command:\n\t$scp $scpoptions $sf $target\n" if $VERBOSE;
unless ($DRYRUN) {
my $pid = open(my $p, "$scp $scpoptions $sf $target 2>&1 |");
if (exists $pid{$m}) {
push @{$pid{$m}}, $pid;
}
else {
$pid{$m} = [ $pid ];
}
warn "Can't execute scp $scpoptions $sourcefile $target", next unless defined($pid);
$proc{0+$p} = $m;
$readset->add($p);
}
}
};
# '' and 'localhost' are synonymous
make_synonymous($name, '', 'localhost', 'localhost');
$VERBOSE++ if $DRYRUN;
# @# stands for the source machine: decompose the transfer, one per source machine
%source = parse_sourcefile($sourcefile); # if "@destination" =~ /@#/;
# expand clusters in sourcefile
for my $ce (keys %source) {
next unless $ce; # go ahead if local machine
my $set = translate($configfile, $ce, \%cluster, \%method);
# leave it as it is if is a single node
next unless $set->members > 1;
my $paths = $source{$ce};
$source{$_} = $paths for $set->members;
delete $source{$ce};
}
for (@destination) {
my ($clusterexp, $path);
unless (/^([^:]*):([^:]*)$/) {
warn "Error. Destination '$_' must have just one colon (:). Skipping transfer.\n";
next;
}
if ($1) { # There is a target cluster expression
($clusterexp, $path) = split /\s*:\s*/;
my $set = translate($configfile, $clusterexp, \%cluster, \%method);
next unless $set;
$sendfiles->($_, $path) for ($set->members);
}
else { # No target cluster: target is the local machine
$path = $2;
$scpoptions .= '-r';
$sendfiles->('localhost', $path);
}
} # for @destination
return (\%pid, \%proc);
}
sub parpush {
my %arg = @_;
my ($cluster, $method) = parse_configfile($arg{configfile});
my $readset = IO::Select->new();
# $proc is a hash ref. keys: memory address of some IO stream.
# Values the name of the assoc. machine.
# $pid is a hash ref
# keys: machine names. Values: process Ids
my ($pid, $proc) = spawn_secure_copies(
readset => $readset,
cluster => $cluster,
method => $method,
%arg,
);
my $okh = {};
$okh = wait_for_answers($readset, $proc) unless $DRYRUN;;
return wantarray? ($okh, $pid) : $okh;
}
1;
__END__