Parallel::Pvm::Scheduler - Perl extension for distributing jobs through PVM


Parallel-Pvm-Scheduler documentation Contained in the Parallel-Pvm-Scheduler distribution.

Index


Code Index:

NAME

Top

Parallel::Pvm::Scheduler - Perl extension for distributing jobs through PVM

SYNOPSIS

Top

  use Parallel::Pvm::Scheduler;
  my $prm = new Parallel::Pvm::Scheduler();

  # Create an array of tasks:
  $tasks[0][0] = "/myhome/program1";
  $tasks[0][1] = "program1 parameter 1";
  $tasks[0][2] = "program1 parameter 2";

  $tasks[1][0] = "/myhome/program2_noparameters";

  $tasks[2][0] = "/myhome/program1";
  $tasks[2][1] = "program1 parameter 1";

  # Submit the tasks
  for ($i = 0; $i < $taskcount; $i++)
  {
        $prm->submit("user-defined task description", $args[$i]);
  }

  # Wait for the tasks to complete
  $prm->recaptureHosts(1);




DESCRIPTION

Top

Parallel-Pvm-Scheduler is a module designed to allow one to distribute a large number of independent jobs across a cluster using PVM. It first queries the number of available machines in the cluster, then submits exactly 1 jobs per available machine. When a job is finished, it recaptures the machine and submits the next job. It repeats this until all the jobs are complete.

If all the available machines are used, it will wait until 1 becomes free before submitting the next job, hence it will sleep until a task is complete.

EXPORT

None by default.

SEE ALSO

Top

Parallel::Pvm perl module

AUTHOR

Top

Ryan Golhar, <golharam@umdnj.edu<gt>

COPYRIGHT AND LICENSE

Top

new

 Title   : new
 Usage   : my $prm = new Parallel::Pvm::Scheduler();
 Function: Creates and initialized a PVM resource manager
 Returns : Parallel::Pvm::Scheduler

getHostCount

 Title   : getHostCount
 Usage   : $hostcount = $prm->getHostCount();
 Function: Returns the number of hosts within the PVM
 Returns : integer

getFreeHostCount

 Title   : getFreeHostCount
 Usage   : $freehostcount = $prm->getFreeHostCount();
 Function: Returns the number of free hosts within the PVM
 Returns : integer

submit

 Title   : submit
 Usage   : $prm->getFreeHostCount($taskdesc, @argv);
 Function: Submits a task contained with an argument vector
 Args    : $taskdesc is a taskdescription.  It can be anything set by the user
 	   @argv is an argument vector.  The first element is the program to execute
	   the remaining elements are the parameters to the program.  
 Notes   : To execute multiple programs in serial for a single task, make a perl script
 	   and submit the perl script as the task.

recaptureHosts

 Title   : recaptureHosts
 Usage   : $prm->recaptureHosts($block);
 Function: Checks tasks for completion and prints their output
 Args    : $block 
		= 0, then iterate through all the tasks and query for their completion.
			if done, free the machine
		= 1, then iterate through all tasks waiting for them to complete.  Does not
			return until the tasks are complete.
=cut




_allocateHost

 Title   : _allocateHost
 Function: Internal Function: Used to allocate which host runs the next task
=cut




_deallocateHost

 Title   : _deallocateHost
 Function: Internal Function: Frees a host making it available for another task
=cut


Parallel-Pvm-Scheduler documentation Contained in the Parallel-Pvm-Scheduler distribution.
package Parallel::Pvm::Scheduler;

use 5.008;
use strict;
use warnings;

use Parallel::Pvm 1.40;
;
require Exporter;

our @ISA = qw(Exporter);

# Items to export into callers namespace by default. Note: do not export
# names by default without a very good reason. Use EXPORT_OK instead.
# Do not simply export all your public functions/methods/constants.

# This allows declaration	use Parallel::Pvm::Scheduler ':all';
# If you do not need this, moving things directly into @EXPORT or @EXPORT_OK
# will save memory.
our %EXPORT_TAGS = ( 'all' => [ qw(
	
) ] );

our @EXPORT_OK = ( @{ $EXPORT_TAGS{'all'} } );

our @EXPORT = qw(
	
);

our $VERSION = '0.01';


# Preloaded methods go here.


my $MSGTAG = 1;
my $MSGTASKEXIT = 999;

sub new {
	my ($class, @args) = @_;
	my ($info,@CONF) = Parallel::Pvm::config;
	my $FREEHOSTS = scalar(@CONF);
	my %HOSTS;
	my @TID;
	my %TIDcmd;
	
	# Map host id to hostname and set busy flag to zero
	foreach my $node (@CONF) {
		my $hostid = $node->{'hi_tid'};

		$HOSTS{$hostid}{'name'} = $node->{'hi_name'};
		$HOSTS{$hostid}{'busy'} = 0;
	}
	
	my $self = {CONF => \@CONF, FREEHOSTS => $FREEHOSTS, HOSTS => \%HOSTS, TID => \@TID, TIDCMD => \%TIDcmd};
	bless $self;
	return $self;
}

sub DESTROY {
	Parallel::Pvm::exit;
}

sub getHostCount
{
	my ($self) = @_;
	my $CONF = $self->{CONF};
	
	return scalar(@$CONF);
}

sub getFreeHostCount
{
	my ($self) = @_;
	my $FREEHOSTS = $self->{FREEHOSTS};

	return $FREEHOSTS;
}

sub submit {
	my ($self, $text, @pvmargv) = @_;
	
	my $TID_ref = $self->{TID};
	my $TIDcmd_ref = $self->{TIDCMD};
	
	# If there are no free hosts, iterate through the taskids and 
	# determine which ones finished and free those hosts 
	while ($self->{FREEHOSTS} == 0) {
		recaptureHosts($self);

		# After checking and there still aren't any free hosts, 
		# sleep for 3 seconds, then try again
		if ($self->{FREEHOSTS} == 0) {
			sleep(3);
		}
	}
	
	# Find a free host and spawn on that host
	my $host = _allocateHost($self);
	my $info = Parallel::Pvm::spawn('pvmit', 1, PvmTaskHost, $host, \@pvmargv);
	if ($info <= 0) {
		print STDERR "Error: Could not spawn child: $info, @pvmargv\n";
		_deallocateHost($host);
	}
	push @$TID_ref, $info;			

	my $cmds = join(' ', @pvmargv);
	$TIDcmd_ref->{$info} = "$text\n$cmds";	

	print STDERR "Started $info on $host\n";			
}

sub recaptureHosts {
	# If block == 1, then recapture all hosts and block when waiting
	# else recapture whatever is available and continue
	my ($self, $block) = @_;

	my $HOSTS_ref = $self->{HOSTS};
	my $TID_ref = $self->{TID};
	my $TIDcmd_ref = $self->{TIDCMD};
	my $CONF_ref = $self->{CONF};
	
	$block = 0 if (!defined($block));
		
	# Iterate through the taskid and check to see if its done
	# If its done, process and remove it
	my $arrayindex = 0;
	my $childtid;
	
	while ($arrayindex < scalar(@$TID_ref)) {
		$childtid = @$TID_ref[$arrayindex];
		
		if ($childtid < 0) {
			print STDERR "Error with PVM Task: $childtid\n";
		} else {
			my $bufid = Parallel::Pvm::probe($childtid, $MSGTAG);
			if ($bufid < 0) {
				print STDERR "There was an error probing on $childtid: $bufid!\n";
			} elsif ($bufid > 0) {
				my $hostid = Parallel::Pvm::tidtohost($childtid);
				print STDERR "Recieved message from $childtid on ".$HOSTS_ref->{$hostid}{'name'}."\n";			
			
				$bufid = Parallel::Pvm::recv($childtid, $MSGTAG);
				my $output = Parallel::Pvm::unpack;
				print "($childtid) ", $TIDcmd_ref->{$childtid}, "\n";
				print "$output\n" if (defined($output));

				_deallocateHost($self, $hostid);
				
				# remove this child taskid
				splice(@$TID_ref, $arrayindex, 1);
				delete($TIDcmd_ref->{$childtid});
				
				# we don't want to increment the arrayindex or else 
				# we'll skip over the next task id.
				next;	
			} else {
				# $bufid == 0
				# wait until this tasks completes
				if ($block == 1) {
					sleep(3);
					next;
				}
			}
		}
		$arrayindex++;
	}
	
	if ($block == 1) {
		if ($self->{FREEHOSTS} != scalar(@$CONF_ref)) {
			print STDERR "There was a problem reconciling freehosts with pvm configuration!!!\n";
			
			foreach my $node (@$CONF_ref) {
				my $hostid = $node->{'hi_tid'};
				my $hostname = $node->{'hi_name'};

				if ($HOSTS_ref->{$hostid}{'busy'} != 0) {
					print STDERR "$hostname is not free\n";
				}
			}
		}
	}
}

sub _allocateHost {
	my ($self) = @_;
	my $HOSTS_ref = $self->{HOSTS};
	
	# Locate a free host, allocate it, return it.
	# First available algorithm
	foreach my $hostid (keys %$HOSTS_ref) {
	
		if ($HOSTS_ref->{$hostid}{'busy'} == 0) {
			my $hostname = $HOSTS_ref->{$hostid}{'name'};
			
			print STDERR "Allocating host $hostname\n";
			
			$HOSTS_ref->{$hostid}{'busy'} = 1;
			$self->{FREEHOSTS}--;			
		
			return $hostname;
		}
	}
	
	# if we get here, no hosts were free.
	die "Unable to locate free host!\n";
}

sub _deallocateHost {
	my ($self, $hostid) = @_;
	my $HOSTS_ref = $self->{HOSTS};

	if ($HOSTS_ref->{$hostid}{'busy'} == 0) {
		die "Host not allocated: ". $HOSTS_ref->{$hostid}{'name'} ."!\n";
	}
	
	print STDERR "Deallocating host ". $HOSTS_ref->{$hostid}{'name'} ."\n";
	$HOSTS_ref->{$hostid}{'busy'} = 0;
	$self->{FREEHOSTS}++;
}

1;
__END__