| perfSONAR_PS-Services-PingER documentation | Contained in the perfSONAR_PS-Services-PingER distribution. |
perfSONAR_PS::Services::MP::Scheduler - A module that will implements a very simple scheduler from which MP's can inherit to run tests etc.
This module allows a more intelligent method of running tests in a non-periodic fashion. It does this with the configuration of a perfSONAR_PS::Services::MP::Config module that provides a list and configuration of the tests to be run.
This module keeps two data structures of interest: METADATA which contains all the tests indexed by the metadataId, and SCHEDULE which maintains a hash of epoch time of the test with the metadataId (we do not point to the METADATA datastructure directly as we need the id's sometimes.)
# create a new MP that inherits this scheduler my $mp = perfSONAR_PS::Schedule::MP::PingER->new( $config ); # set up the schedule with the list of tests and fork off a manager class # that will spawn off tests according to a schedule that should be also # defined here. $mp->init();
This module exposes the following methods.
Create a new MP Scheduler class $conf is ref to hash of configuration settings
accessor/mutator for the schedule
accessor/mutator for the config storing the mp::config::schedule package
Initiate the mp. This should involve: - setting the configuration defaults to be used - setting up the schedule for the tests (by using a perfSONAR_PS::Services::MP::Config::Schedule - or inherited object) - attaching message handlers for the daemon architecture from $handler - forking off a scheduler to manage the starting of new measurements.
prepare the metadata; in this case, we add the relevant tests into the schedule in preparation for run()
Populates the schedule for tests to be run
is the mp service setup to register with a LS?
actually register the MP service with a LS, ie send some xml of the metadata available form the MP service (ie what tests it can run)
Adds the schedule list of tests from the perfSONAR_PS::MP::Config::Schedule object
Returns the current time in epoch seconds
removes the next test from the schedule
Returns ( $time, $testid ) for the time in epoch seconds $time when test with id $testid should be started. If there is no test defined, returns (undef,undef)
Gives the next testid without removing it from the schedule
Returns ( $time, $testid ); where $start is the epoch seconds when the test $testid should start. If there is no next test, then will return (undef, undef);
Places the test with id $testid into the schedule to run at time $time.
accessor/mutator for the number of max child threads/processes
Forks off a new instance that will act as a manager/boss class for the scheduling and forking off of new measurements.
Starts an endless loop scheduling and running tests as defined in $self->{'STORE'} until the program is terminated.
block signal interruption
allow interrupts to do as they did
Blocking function that sleeps until the next test. Problem is that ipc can cause the sleep to exit for any signal. Therefore, some cleverness in determine the actual slept time is required.
Spawns off a forked instance in order to run the test with id $testid. It will create a perfSONAR_PS::MP::Agent class that will actuall deal with the running of the test and collation of the results.
The forked instance will also deal with the storage defintions by calling the $self->storeData() method which should be overridden by inheriting classes to store the output of the $agent into an MA.
Forked instance will exit at the end of the test. Success and or failure is not propogated back up the stack.
Returns the relevant perfSONAR_PS::MP::Agent class to use for the test. As this class should be inherieted, this method should be overridden to return the appropraote agent to use for the hash $test that contains the parameters for the test.
Does the relevant storage of data collected from the $agent for the test id $testid.
perfSONAR_PS::Services::Config::Schedule, perfSONAR_PS::Services::MP::Agent::Base, perfSONAR_PS::Services::Base,
To join the 'perfSONAR-PS' mailing list, please visit:
https://mail.internet2.edu/wws/info/i2-perfsonar
The perfSONAR-PS subversion repository is located at:
https://svn.internet2.edu/svn/perfSONAR-PS
Questions and comments can be directed to the author, or the mailing list.
$Id: Base.pm 524 2007-09-05 17:35:50Z aaron $
Yee-Ting Li, ytl@slac.stanford.edu
You should have received a copy of the Internet2 Intellectual Property Framework along with this software. If not, see <http://www.internet2.edu/membership/ip.html>
Copyright (c) 2004-2007, Internet2 and the University of Delaware
All rights reserved.
| perfSONAR_PS-Services-PingER documentation | Contained in the perfSONAR_PS-Services-PingER distribution. |
package perfSONAR_PS::Services::MP::Scheduler;
# inherit from the base, so that the scheduling methods are available as change of # parent class use perfSONAR_PS::Services::Base; use base 'perfSONAR_PS::Services::Base'; use version; our $VERSION = 0.09; use fields qw( SCHEDULE METADATA MAXCHILDREN ); use Time::HiRes qw ( &gettimeofday ); use POSIX; use Log::Log4perl qw(get_logger); our $logger = get_logger( CLASS ); use constant CLASS => 'perfSONAR_PS::Services::MP::Scheduler'; use strict; # basename for configuration key our $basename = undef; my $MANAGER_PID = undef; my $CHILDREN_OCCUPIED = 0; my %CHILD = (); my $i = 0;
sub new { my $package = shift; my $self = $package->SUPER::new( @_ ); $self->{SCHEDULE} = {}; $self->{METADATA} = {}; $self->{MAXCHILDREN} = 2; return $self; }
sub schedule { my $self = shift; if ( @_ ) { $self->{SCHEDULE} = shift; } return $self->{SCHEDULE}; }
sub config { my $self = shift; if ( @_ ) { $self->{METADATA} = shift; } return $self->{METADATA}; }
sub init { my $self = shift; my $handler = shift; # this should be inherited # check handler type $logger->logdie( "Handler is of incorrect type: $handler") unless ( UNIVERSAL::can( $handler, 'isa') && $handler->isa( 'perfSONAR_PS::RequestHandler' ) ); # set up defaults in the config # create a config object and set it to the file 'file' my $schedule = perfSONAR_PS::Services::MP::Config::Schedule->new(); $schedule->load( 'file' ); # set up the schedule with the list of tests $self->addTestSchedule( $schedule ); # add the appropiate #$handler->addMessageHandler("SetupDataRequest", "", $self); #$handler->addMessageHandler("MetadataKeyRequest", "", $self); return 0; }
sub prepareMetadata { my $self = shift; my @testids = $self->config()->getAllTestIds(); $logger->debug( "TESTS: \n" . join "\n", @testids ); $logger->logdie( "Scheduler could not determine any tests to run.") if ( scalar @testids < 1 ); # popule schedule my $now = &getNowTime(); foreach my $id ( @testids ) { my $delta = $self->config()->getTestNextTimeFromNowById( $id ); next if ! defined $delta; my $time = $now + $delta; $logger->debug( "Add testid '$id' to run at '$time' delta=" . $delta ); $self->addNextTest( $time, $id ); } return 0; }
sub parseMetadata { my $self = shift; return 0; }
sub needLS($) { my ($self) = @_; return 0; }
sub registerLS($) { my $self = shift; return 0; }
sub addTestSchedule { my $self = shift; my $schedule = shift; $logger->logdie( 'missing argument schedule') unless defined $schedule; $self->config( $schedule ); $logger->logdie( "argument must be of object type perfSONAR_PS::Services::MP::Config::Schedule") unless UNIVERSAL::can( $schedule, 'isa') && $schedule->isa( 'perfSONAR_PS::Services::MP::Config::Schedule' ); $logger->info( "Initiating scheduler with " . scalar $self->config()->getAllTestIds() . " tests" ); $self->parseMetadata(); $self->prepareMetadata(); return 0; }
sub getNowTime { my ( $nowTime, $nowMSec ) = &gettimeofday; return $nowTime . '.' . $nowMSec; }
sub shiftNextTest { my $self = shift; my ( $time, $testid ) = $self->peekNextTest(); if ( defined $time ) { # may be an array my $array = $self->schedule()->{$time}; my $test = shift @$array; # put it back if we still have entries for same time if ( scalar @$array ) { $self->schedule()->{$time} = $array; } # or clear it if empty else { delete $self->schedule()->{$time}; } return ( $time, $testid ); } return ( undef, undef ); }
sub peekNextTest { my $self = shift; my @times = sort {$a<=>$b} keys %{$self->schedule()}; if ( scalar @times ) { my $time = $times[0]; my $test = $self->schedule()->{$time}->[0]; my ( $testid, @tmp ) = keys %$test; #FIXME return ( $time, $testid ); } return ( undef, undef ); }
sub addNextTest { my $self = shift; my $time = shift; my $testid = shift; # if tehre is already at test at this time, append it push @{$self->schedule()->{$time}}, { $testid => 1 }; return; }
sub maxChildren { my $self = shift; return $self->{MAXCHILDREN}; } # make sure we kill the forking manager also when a signal is sent sub KILL { my $logger = get_logger( CLASS ); kill( "TERM", $MANAGER_PID); my $pid = undef; exit; } # takes care of dead children sub REAPER { my $logger = get_logger( CLASS ); $SIG{CHLD} = \&REAPER; my $pid = undef; while( ( $pid = waitpid( -1, &WNOHANG) ) > 0 ) { my %map = reverse %CHILD; my $child = $map{$pid}; delete $CHILD{$child} if defined $child && $CHILD{$child}; $CHILDREN_OCCUPIED = keys %CHILD; $logger->debug( "Child thread $pid died (left " . $CHILDREN_OCCUPIED . ")" ); } return; }
sub run { my $self = shift; my $processName = shift; # don't bother if there are no tests if ( scalar keys %{$self->schedule()} < 1 ) { $logger->logdie( "No tests are scheduled - please check " . $self->config()->configFile() ); } my $sigset = POSIX::SigSet->new(SIGINT); sigprocmask( SIG_BLOCK, $sigset) or $logger->logdie( "Can't block SIGINT for fork: $!\n" ); my $pid = undef; $SIG{INT} = \&KILL; $SIG{CHLD} = \&REAPER; # fork! $logger->logdie( "fork failed: $!" ) unless defined ($pid = fork); if ($pid) { # Parent records the child's birth and returns. sigprocmask(SIG_UNBLOCK, $sigset) or $logger->logdie( "Can't unblock SIGINT for fork: $!\n" ); $MANAGER_PID = $pid; } elsif ($pid == 0) { # setup handler to exit children $SIG{INT} = sub { my $logger = get_logger( CLASS ); foreach my $pid ( keys %CHILD ){ #$logger->fatal( "killing $pid "); kill( "TERM", -$$ ); } }; $0 = $processName; $self->__run(); exit(0); } return 0; }
sub __run { my $self = shift; while( 1 ) { my $badExit = 0; # wait for a signal from a dead child or something # if all children are occupised if ( $CHILDREN_OCCUPIED >= $self->maxChildren() ) { $logger->warn("Scheduler at max forks (" . $self->maxChildren() . "); waiting..."); sleep; } if (! exists $CHILD{$i} ) { # block until next scheduled test is up my ( $testTime, $testid ) = $self->waitForNextTest(); if ( defined $testid ) { my $sigset = $self->blockInterrupts('while'); # actually do the test! my $now = &getNowTime(); ( $testTime, $testid ) = $self->shiftNextTest(); # determine when to run the next iteration of this test my $delta = $self->config()->getTestNextTimeFromNowById( $testid ); $logger->debug( "testid '" . $testid . "' will run again in $delta seconds"); $self->addNextTest( $now + $delta, $testid ); $self->unblockInterrupts( $sigset, 'while'); # determine the test details and run it $self->doTest( $i, $testid ); } else { $badExit = 1; } } my $sigset = $self->blockInterrupts('while-check'); if ( ! $badExit ) { $i++; $i -= $self->maxChildren() if( $i >= $self->maxChildren() ); } $self->unblockInterrupts($sigset, 'while-check'); } return; }
sub blockInterrupts { my $self = shift; my $str = shift; # need to block interrupts in case we are still loading etc. my $sigset = POSIX::SigSet->new; my $blockset = POSIX::SigSet->new( SIGCHLD, SIGHUP, SIGUSR1, SIGUSR2, SIGINT ); #$logger->debug( "BLOCKING INTERRUPTS for $str"); sigprocmask(SIG_BLOCK, $blockset, $sigset) or $logger->logdie( "Could not block interrupt signals: $!" ); return $sigset; }
sub unblockInterrupts { my $self = shift; my $sigset = shift; my $str = shift; return 1 if ! defined $sigset; #$logger->debug( "DONE BLOCKING INTERRUPTS for $str"); sigprocmask(SIG_SETMASK, $sigset) or $logger->logdie( "Could not restore interrupt signals: $!" ); return 1; }
sub waitForNextTest { my $self = shift; my $sigset = $self->blockInterrupts('wait'); my ( $time, $testid ) = $self->peekNextTest(); my $now = &getNowTime(); my $wait = $time - $now; $logger->debug( "Waiting $wait seconds for the next test at " . $time ); $self->unblockInterrupts($sigset, 'wait'); # wait some time for a signal if ( $wait > 0.0 ) { select( undef, undef, undef, $wait ); # if we are before the next test time, do not do anything! if ( $now < $time ) { return ( undef, undef ); } # we are behind schedule } else { # don't do anytthing } return ( $time, $testid ); }
sub doTest { my $self = shift; my $forkedProcessNumber = shift; my $testid = shift; $logger->logdie( "Missing argument testid") unless defined $testid; # block signal for fork my $sigset = POSIX::SigSet->new(SIGINT); sigprocmask( SIG_BLOCK, $sigset) or $logger->logdie( "Can't block SIGINT for fork: $!\n" ); my $pid = undef; # fork! $logger->logdie( "fork failed: $!" ) unless defined ($pid = fork); if ($pid) { # Parent records the child's birth and returns. sigprocmask(SIG_UNBLOCK, $sigset) or $logger->logdie( "Can't unblock SIGINT for fork: $!\n" ); # keep state for the parent to keep count of children etc. my $sigset = $self->blockInterrupts('fork'); $CHILD{$forkedProcessNumber} = $pid; $CHILDREN_OCCUPIED++; $self->unblockInterrupts($sigset,'fork'); return; } else { $SIG{INT} = 'DEFAULT'; # make SIGINT kill us as it did before $SIG{USR1} = 'DEFAULT'; $SIG{USR2} = 'DEFAULT'; $SIG{HUP} = 'DEFAULT'; # run the test my $test = $self->config()->getTestById( $testid ); $logger->debug( "RUN TEST: " . Data::Dumper::Dumper $test ); my $agent = $self->getAgent( $test ); # collector will return -1 if error occurs if ( $agent->collectMeasurements() < 0 ) { # error! $logger->fatal( "Could not collect measurement for '$testid'" ); } else { # get the results out $logger->info( "Collecting measurements for '$testid'"); # write to teh stores $self->storeData( $agent, $testid ); } exit; } }
sub getAgent { my $self = shift; $logger->logdie( "getAgent should be inherited"); # don't forget to init() the agent! return undef; }
sub storeData { my $self = shift; my $agent = shift; my $testid = shift; $logger->logdie( "storeData should be overridden"); return -1; } 1;