perfSONAR_PS::Services::MP::Scheduler - A module that will implements a very


perfSONAR_PS-Services-PingER documentation Contained in the perfSONAR_PS-Services-PingER distribution.

Index


Code Index:

NAME

Top

perfSONAR_PS::Services::MP::Scheduler - A module that will implements a very simple scheduler from which MP's can inherit to run tests etc.

DESCRIPTION

Top

This module allows a more intelligent method of running tests in a non-periodic fashion. It does this with the configuration of a perfSONAR_PS::Services::MP::Config module that provides a list and configuration of the tests to be run.

This module keeps two data structures of interest: METADATA which contains all the tests indexed by the metadataId, and SCHEDULE which maintains a hash of epoch time of the test with the metadataId (we do not point to the METADATA datastructure directly as we need the id's sometimes.)

SYNOPSIS

Top

  # create a new MP that inherits this scheduler
  my $mp = perfSONAR_PS::Schedule::MP::PingER->new( $config );

  # set up the schedule with the list of tests and fork off a manager class
  # that will spawn off tests according to a schedule that should be also
  # defined here.
  $mp->init();

API

Top

This module exposes the following methods.

new

Create a new MP Scheduler class $conf is ref to hash of configuration settings

schedule

accessor/mutator for the schedule

config

accessor/mutator for the config storing the mp::config::schedule package

init( $handler )

Initiate the mp. This should involve: - setting the configuration defaults to be used - setting up the schedule for the tests (by using a perfSONAR_PS::Services::MP::Config::Schedule - or inherited object) - attaching message handlers for the daemon architecture from $handler - forking off a scheduler to manage the starting of new measurements.

prepareMetadata

prepare the metadata; in this case, we add the relevant tests into the schedule in preparation for run()

parseMetadata

Populates the schedule for tests to be run

needLS( boolean )

is the mp service setup to register with a LS?

registerLS()

actually register the MP service with a LS, ie send some xml of the metadata available form the MP service (ie what tests it can run)

addTestSchedule( $config )

Adds the schedule list of tests from the perfSONAR_PS::MP::Config::Schedule object

getNowTime

Returns the current time in epoch seconds

shiftNextTest()

removes the next test from the schedule

Returns ( $time, $testid ) for the time in epoch seconds $time when test with id $testid should be started. If there is no test defined, returns (undef,undef)

peekNextTest()

Gives the next testid without removing it from the schedule

Returns ( $time, $testid ); where $start is the epoch seconds when the test $testid should start. If there is no next test, then will return (undef, undef);

addNextTest( $time, $testid )

Places the test with id $testid into the schedule to run at time $time.

maxChildren( $number )

accessor/mutator for the number of max child threads/processes

run()

Forks off a new instance that will act as a manager/boss class for the scheduling and forking off of new measurements.

__run( )

Starts an endless loop scheduling and running tests as defined in $self->{'STORE'} until the program is terminated.

blockInterrupts

block signal interruption

unblockInterrupts

allow interrupts to do as they did

waitForNextTest( )

Blocking function that sleeps until the next test. Problem is that ipc can cause the sleep to exit for any signal. Therefore, some cleverness in determine the actual slept time is required.

doTest( $pid, $testid )

Spawns off a forked instance in order to run the test with id $testid. It will create a perfSONAR_PS::MP::Agent class that will actuall deal with the running of the test and collation of the results.

The forked instance will also deal with the storage defintions by calling the $self->storeData() method which should be overridden by inheriting classes to store the output of the $agent into an MA.

Forked instance will exit at the end of the test. Success and or failure is not propogated back up the stack.

getAgent( $test )

Returns the relevant perfSONAR_PS::MP::Agent class to use for the test. As this class should be inherieted, this method should be overridden to return the appropraote agent to use for the hash $test that contains the parameters for the test.

storeData( $agent, $testid )

Does the relevant storage of data collected from the $agent for the test id $testid.

SEE ALSO

Top

perfSONAR_PS::Services::Config::Schedule, perfSONAR_PS::Services::MP::Agent::Base, perfSONAR_PS::Services::Base,

To join the 'perfSONAR-PS' mailing list, please visit:

  https://mail.internet2.edu/wws/info/i2-perfsonar

The perfSONAR-PS subversion repository is located at:

  https://svn.internet2.edu/svn/perfSONAR-PS 

Questions and comments can be directed to the author, or the mailing list.

VERSION

Top

$Id: Base.pm 524 2007-09-05 17:35:50Z aaron $

AUTHOR

Top

Yee-Ting Li, ytl@slac.stanford.edu

LICENSE

Top

You should have received a copy of the Internet2 Intellectual Property Framework along with this software. If not, see <http://www.internet2.edu/membership/ip.html>

COPYRIGHT

Top


perfSONAR_PS-Services-PingER documentation Contained in the perfSONAR_PS-Services-PingER distribution.
package perfSONAR_PS::Services::MP::Scheduler;

# inherit from the base, so that the scheduling methods are available as change of 
# parent class

use perfSONAR_PS::Services::Base;
use base 'perfSONAR_PS::Services::Base';

use version; our $VERSION = 0.09; 

use fields qw( SCHEDULE METADATA MAXCHILDREN );

use Time::HiRes qw ( &gettimeofday );
use POSIX;

use Log::Log4perl qw(get_logger);
our $logger = get_logger( CLASS );

use constant CLASS => 'perfSONAR_PS::Services::MP::Scheduler';

use strict;

# basename for configuration key
our $basename = undef;

my $MANAGER_PID = undef;
my $CHILDREN_OCCUPIED = 0;
my %CHILD = ();
my $i = 0;


sub new {
	my $package = shift;
	
	my $self = $package->SUPER::new( @_ );
	$self->{SCHEDULE} = {};
	$self->{METADATA} = {};
	$self->{MAXCHILDREN} = 2;

	return $self;
}


sub schedule
{
	my $self = shift;
	if ( @_ ) {
		$self->{SCHEDULE} = shift;
	}
	return $self->{SCHEDULE};
}


sub config
{
	my $self = shift;
	if ( @_ ) {
		$self->{METADATA} = shift;
	}
	return $self->{METADATA};
}


sub init
{
	my $self = shift;
	my $handler = shift;

	# this should be inherited

	# check handler type
	$logger->logdie( "Handler is of incorrect type: $handler")
		unless ( UNIVERSAL::can( $handler, 'isa') 
			&& $handler->isa( 'perfSONAR_PS::RequestHandler' ) );

	# set up defaults in the config

	# create a config object and set it to the file 'file'
	my $schedule = perfSONAR_PS::Services::MP::Config::Schedule->new();
	$schedule->load( 'file' );
	
    # set up the schedule with the list of tests
	$self->addTestSchedule( $schedule );	

	# add the appropiate 
	#$handler->addMessageHandler("SetupDataRequest", "", $self);
	#$handler->addMessageHandler("MetadataKeyRequest", "", $self);

	return 0;
}

sub prepareMetadata
{
	my $self = shift;
	
	my @testids = $self->config()->getAllTestIds();
	
	$logger->debug( "TESTS: \n" . join "\n", @testids );
	$logger->logdie( "Scheduler could not determine any tests to run.")
		if ( scalar @testids < 1 );
	
	# popule schedule
	my $now = &getNowTime();
	foreach my $id ( @testids ) {
		my $delta = $self->config()->getTestNextTimeFromNowById( $id );
		next if ! defined $delta;
		my $time = $now + $delta;
		$logger->debug( "Add testid '$id' to run at '$time' delta=" . $delta );
		$self->addNextTest( $time, $id );
	}	
	return 0;
}


sub parseMetadata
{
	my $self = shift;
	return 0;
}


sub needLS($) {
	my ($self) = @_;
	return 0;
}

sub registerLS($) {
	my $self = shift;
	return 0;
}



sub addTestSchedule
{
	my $self = shift;
	my $schedule = shift;

	$logger->logdie( 'missing argument schedule')
		unless defined $schedule;

	$self->config( $schedule );
	
	$logger->logdie( "argument must be of object type perfSONAR_PS::Services::MP::Config::Schedule")
		unless UNIVERSAL::can( $schedule, 'isa') 
				&& $schedule->isa( 'perfSONAR_PS::Services::MP::Config::Schedule' );

	$logger->info( "Initiating scheduler with " . scalar $self->config()->getAllTestIds() . " tests" );

	$self->parseMetadata();
	$self->prepareMetadata();
	
	return 0;
}




sub getNowTime
{
  	my ( $nowTime, $nowMSec ) = &gettimeofday;
	return $nowTime . '.' . $nowMSec;
}


sub shiftNextTest
{
	my $self = shift;
	my ( $time, $testid ) = $self->peekNextTest();

	if ( defined $time ) {
		# may be an array	
		my $array = $self->schedule()->{$time};
		my $test = shift @$array;

		# put it back if we still have entries for same time
		if ( scalar @$array ) {
			$self->schedule()->{$time} = $array;
		}
		# or clear it if empty
		else {
			delete $self->schedule()->{$time};
		}	
		return ( $time, $testid );
	}
	return ( undef, undef );
}


sub peekNextTest
{
	my $self = shift;

	my @times = sort {$a<=>$b} keys %{$self->schedule()};
	if ( scalar @times ) {

		my $time = $times[0];
		my $test = $self->schedule()->{$time}->[0];
		my ( $testid, @tmp ) = keys %$test; #FIXME
	
		return ( $time, $testid );

	}
	
	return ( undef, undef );
}


sub addNextTest
{
	my $self = shift;
	my $time = shift;
	my $testid = shift;

	# if tehre is already at test at this time, append it
	push @{$self->schedule()->{$time}}, { $testid => 1 };
	
	return;
}


sub maxChildren
{
	my $self = shift;
	return $self->{MAXCHILDREN};
}


# make sure we kill the forking manager also when a signal is sent
sub KILL
{
	my $logger = get_logger( CLASS );
	kill( "TERM", $MANAGER_PID);
	my $pid = undef;
	exit;
}


# takes care of dead children
sub REAPER {                
	my $logger = get_logger( CLASS );        
    $SIG{CHLD} = \&REAPER;
    my $pid = undef;
    while( ( $pid = waitpid( -1, &WNOHANG) ) > 0 ) 
    {
		my %map = reverse %CHILD;
   	 	my $child = $map{$pid};
			
		delete $CHILD{$child} if defined $child && $CHILD{$child};
		
		$CHILDREN_OCCUPIED = keys %CHILD;
		$logger->debug( "Child thread $pid died (left " . $CHILDREN_OCCUPIED . ")" );

    }
    return;
}


sub run
{
	my $self = shift;
	my $processName = shift;
	
	# don't bother if there are no tests
	if ( scalar keys %{$self->schedule()} < 1 ) {
		$logger->logdie( "No tests are scheduled - please check " 
				. $self->config()->configFile() );
	}
	
	my $sigset = POSIX::SigSet->new(SIGINT);
    sigprocmask( SIG_BLOCK, $sigset)
        or $logger->logdie( "Can't block SIGINT for fork: $!\n" );
    
	my $pid = undef;
	$SIG{INT} = \&KILL;
	$SIG{CHLD} = \&REAPER;

    # fork!
    $logger->logdie( "fork failed: $!" ) 
		unless defined ($pid = fork);

    if ($pid) 
    {
        # Parent records the child's birth and returns.
        sigprocmask(SIG_UNBLOCK, $sigset)
            or $logger->logdie( "Can't unblock SIGINT for fork: $!\n" );
				
		$MANAGER_PID = $pid;
		
	} elsif ($pid == 0) {

		# setup handler to exit children
		$SIG{INT} = sub { 
						  my $logger = get_logger( CLASS );
						  foreach my $pid ( keys %CHILD ){
							#$logger->fatal( "killing $pid ");
							kill( "TERM", -$$ );
						  } 
						};

		$0 = $processName;
		$self->__run();
		
		exit(0);
	}
	
	return 0;
}

sub __run
{
	my $self = shift;

	while( 1 )
	{
    
		my $badExit = 0;
		
		# wait for a signal from a dead child or something
		# if all children are occupised

		if ( $CHILDREN_OCCUPIED >= $self->maxChildren() ) {
			$logger->warn("Scheduler at max forks (" . $self->maxChildren() . "); waiting...");
			sleep;
		}
	
		if (! exists $CHILD{$i} )
		{
			# block until next scheduled test is up
			my ( $testTime, $testid ) = $self->waitForNextTest();
	
			if ( defined $testid )
			{
				my $sigset = $self->blockInterrupts('while');
				
				# actually do the test!
				my $now = &getNowTime();
				( $testTime, $testid ) = $self->shiftNextTest();
				
				# determine when to run the next iteration of this test
				my $delta = $self->config()->getTestNextTimeFromNowById( $testid );
				$logger->debug( "testid '" . $testid . "' will run again in $delta seconds");
				$self->addNextTest( $now + $delta, $testid );
				
				$self->unblockInterrupts( $sigset, 'while');
				
				# determine the test details and run it
				$self->doTest( $i, $testid );
				
			} 
			else {
				$badExit = 1;
			}
		}
	
		my $sigset = $self->blockInterrupts('while-check');
		
		if ( ! $badExit ) {
			$i++;
			$i -= $self->maxChildren() if( $i >= $self->maxChildren() );
		}

		$self->unblockInterrupts($sigset, 'while-check');

	}
	return;
}


sub blockInterrupts
{
	my $self = shift;
        my $str = shift;
        # need to block interrupts in case we are still loading etc.
        my $sigset   = POSIX::SigSet->new; 
        my $blockset = POSIX::SigSet->new( SIGCHLD, SIGHUP, SIGUSR1, SIGUSR2, SIGINT );

        #$logger->debug( "BLOCKING INTERRUPTS for $str");

        sigprocmask(SIG_BLOCK, $blockset, $sigset) 
            or $logger->logdie( "Could not block interrupt signals: $!" );      
        
        return $sigset;
}
sub unblockInterrupts
{
	my $self = shift;
        my $sigset = shift;
        my $str = shift;
        return 1 if ! defined $sigset;
        
        #$logger->debug( "DONE BLOCKING INTERRUPTS for $str");
        
        sigprocmask(SIG_SETMASK, $sigset)
            or $logger->logdie( "Could not restore interrupt signals: $!" );
        return 1;
}



sub waitForNextTest
{
	my $self = shift;

	my $sigset = $self->blockInterrupts('wait');

	my ( $time, $testid ) = $self->peekNextTest();
	my $now = &getNowTime();
	my $wait = $time - $now;
	$logger->debug( "Waiting $wait seconds for the next test at " . $time );

	$self->unblockInterrupts($sigset, 'wait');

	# wait some time for a signal
	if ( $wait > 0.0 ) {
		
		select( undef, undef, undef, $wait );

		# if we are before the next test time, do not do anything!
		if ( $now < $time )
		{
			return ( undef, undef );
		}

	# we are behind schedule
	} else {
		# don't do anytthing
	}

	return ( $time, $testid );
}

sub doTest
{
	my $self = shift;
	my $forkedProcessNumber = shift;
	my $testid = shift;
	
	$logger->logdie( "Missing argument testid")
		unless defined $testid;
		
   # block signal for fork
    my $sigset = POSIX::SigSet->new(SIGINT);
    sigprocmask( SIG_BLOCK, $sigset)
        or $logger->logdie( "Can't block SIGINT for fork: $!\n" );
 
	my $pid = undef;

    # fork!
    $logger->logdie( "fork failed: $!" ) 
		unless defined ($pid = fork);
    
    if ($pid) 
    {
        # Parent records the child's birth and returns.
        sigprocmask(SIG_UNBLOCK, $sigset)
            or $logger->logdie( "Can't unblock SIGINT for fork: $!\n" );

        # keep state for the parent to keep count of children etc.
		my $sigset = $self->blockInterrupts('fork');
		
        $CHILD{$forkedProcessNumber} = $pid;
        $CHILDREN_OCCUPIED++;
        
		$self->unblockInterrupts($sigset,'fork');

        return;
    }
    else 
    {
   
        $SIG{INT} = 'DEFAULT';      # make SIGINT kill us as it did before
		$SIG{USR1} = 'DEFAULT';
		$SIG{USR2} = 'DEFAULT';
	    $SIG{HUP} = 'DEFAULT';
		
		# run the test
		my $test = $self->config()->getTestById( $testid );
		$logger->debug( "RUN TEST: " . Data::Dumper::Dumper $test );
		my $agent = $self->getAgent( $test );
		
		# collector will return -1 if error occurs
		if ( $agent->collectMeasurements() < 0 ) {
			
			# error!
			$logger->fatal( "Could not collect measurement for '$testid'" );
			
		} else {

			# get the results out
			$logger->info( "Collecting measurements for '$testid'");	
			# write to teh stores
			$self->storeData( $agent, $testid );

		}			
		exit;
    }

}


sub getAgent
{
	my $self = shift;
	$logger->logdie( "getAgent should be inherited");
	# don't forget to init() the agent!
	return undef;
}

sub storeData
{
	my $self = shift;
	my $agent = shift;
	my $testid = shift;

	$logger->logdie( "storeData should be overridden");
	
	return -1;
}

1;