Schedule::Load::Reporter - Distributed load reporting daemon


Schedule-Load documentation Contained in the Schedule-Load distribution.

Index


Code Index:

NAME

Top

Schedule::Load::Reporter - Distributed load reporting daemon

SYNOPSIS

Top

  use Schedule::Load::Reporter;

  Schedule::Load::Reporter->start(dhost=>['host1', 'host2'],
				  port=>1234,);

DESCRIPTION

Top

Schedule::Load::Reporter on startup connects to the requested server host and port. The server connected to can then poll this host for information about system configuration and current loading conditions.

start ([parameter=>value ...]);

Starts the reporter. Does not return.

PARAMETERS

Top

dhost

List of daemon hosts that may be running the slchoosed server. The second host is only used if the first is down, and so on down the list.

port

The port number of slchoosed. Defaults to 'slchoosed' looked up via /etc/services, else 1752.

fake

Specifies that load management should not be used; this is for reporting the status of a "fake" host, or for scheduling a non-host related resource, like a license.

min_pctcpu

The minimum percentage of the CPU that a job must have to be included in the list of top processes sent to the client. Defaults to 3. Setting to 0 will consume a lot of bandwidth.

stored_filename

The filename to store persistent items in, such as if this host is reserved. Must be either local-per-machine, or have the hostname in it. Defaults to /usr/local/lib/rschedule/slreportd_{hostname}_store. Set to undef to disable persistence (thus if the machine reboots the reservation is lost.) The path must be **ABSOLUTE** as the daemons do a chdir.

DISTRIBUTION

Top

The latest version is available from CPAN and from http://www.veripool.org/.

Copyright 1998-2011 by Wilson Snyder. This package is free software; you can redistribute it and/or modify it under the terms of either the GNU Lesser General Public License Version 3 or the Perl Artistic License Version 2.0.

AUTHORS

Top

Wilson Snyder <wsnyder@wsnyder.org>

SEE ALSO

Top

Schedule::Load, slreportd


Schedule-Load documentation Contained in the Schedule-Load distribution.

# Schedule::Load::Reporter.pm -- distributed load reporting daemon
# See copyright, etc in below POD section.
######################################################################

package Schedule::Load::Reporter;
require 5.004;
require Exporter;
@ISA = qw(Exporter);

use Socket;
use IO::Socket;
use IO::Select;  # IO::Select is ok instead of IO::Poll as we only have at max 2 handles
use POSIX;

use Proc::ProcessTable;
use Unix::Processors;
use Storable qw();
use Schedule::Load qw (:_utils);
use Schedule::Load::FakeReporter;

use Sys::Hostname;
use Time::HiRes qw (gettimeofday);
use IPC::PidStat;
use Config;

use strict;
use vars qw($VERSION $RSCHLIB $Debug %User_Names %Pid_Inherit
	    @Pid_Time_Base @Pid_Time $Os_Linux
	    $Distrust_Pctcpu $Divide_Pctcpu_By_Cpu $ProcTimeToSec
	    $Exister
	    );
use Carp;

######################################################################
#### Configuration Section

# Other configurable settings.
# Debug flag is copied from Schedule::Load when this module is loaded.
$Debug = $Schedule::Load::Debug;

$VERSION = '3.064';

# Base directory used by start() to build the default stored_filename.
$RSCHLIB = '/usr/local/lib';	# Edited by Makefile

# Platform quirks, decided once from the perl build configuration.
$Os_Linux = $Config{osname} =~ /linux/i;
# When true, _fill_dynamic recomputes pctcpu from utime+stime deltas
# instead of trusting the process table's own pctcpu.
$Distrust_Pctcpu = $Config{osname} !~ /solaris/i;	# Only solaris has instantaneous reporting
$Divide_Pctcpu_By_Cpu = 0;   # Older linuxes may require this
# Multiplier converting process table time values into seconds.
$ProcTimeToSec = ($Config{osname} =~ /linux/i) ? 1e-6 : 1e-3;  # Fix in Proc::ProcessTable 0.40

######################################################################
#### Globals

# These are the $self elements sent over the socket (see _send_hash):
# $self->{const}{config_element_name} = value		# Things known at startup
# $self->{stored}{stored_element_name} = value		# Persistent items, such as reserved
# $self->{dynamic}{load_element} = value		# Overall loading info
# $self->{dynamic}{proc}{pid}{proc_element} = value	# Per process info

# Cache of user name based on UID
%User_Names = ();

# Cache of fixed loads and command comments based on PID.
# Filled by _fixed_load and _comment; pruned by _fill_dynamic and _exist_traffic.
%Pid_Inherit = ();

######################################################################
#### Creator

sub start {
    # Establish the reporter and run its service loop.  Does not return;
    # the process only leaves via exit(0) when a "report_restart" request
    # arrives (or by dying).
    #
    # Arguments: the class (or an object), followed by a flat list of
    # parameter=>value pairs.  They are layered over
    # %Schedule::Load::_Default_Params and the local defaults below, so a
    # later key overrides an earlier one.
    #
    # Croaks if called with no arguments or if no dhost is available.
    @_ >= 1 or croak 'usage: Schedule::Load::Reporter->start ({options})';
    my $proto = shift;
    my $class = ref($proto) || $proto;

    # The documentation promises that stored_filename=>undef disables
    # persistence.  Note the explicit request here, as the ||= default
    # below could not otherwise tell "undef passed" from "not passed".
    my %given = @_;
    my $no_persist = (exists $given{stored_filename}
                      && !defined $given{stored_filename});

    my $self = {
        %Schedule::Load::_Default_Params,
        #Documented
        #Undocumented
        timeout=>$Debug?2:30,           # Sec before host socket connect times out
        alive_time=>$Debug?10:30,       # Sec to send alive message (must be sooner than Chooser's ping_dead_time)
        stats_interval=>$Debug?2:60,    # Sec between polling of interval based plugin statistics
        const_changed=>0,               # const or stored has changed, update in chooser
        plugins => [],                  # Plugin objects
        @_};
    bless $self, $class;

    # More defaults (can't be above due to needing other elements)
    $self->{const}{hostname} ||= hostname();
    $self->{const}{slreportd_hostname} ||= hostname();
    $self->{const}{slreportd_version} ||= $VERSION;
    if (!$no_persist) {
        $self->{stored_filename} ||= ($RSCHLIB."/rschedule/slreportd_".$self->{const}{hostname}."_store");
    }

    (defined $self->{dhost}) or croak 'Require a host parameter';
    # Accept a single host name as well as a reference to a list of them;
    # the connect loop below dereferences dhost as an array.
    $self->{dhost} = [$self->{dhost}] if !ref $self->{dhost};
    #foreach (@{$self->{dhost}}) { print "Host $_\n"; }

    my $select = IO::Select->new();

    # UDP handle used to learn when requesting processes have died
    $Exister = IPC::PidStat->new();
    $select->add($Exister->fh);

    $self->pt();        # Create process table
    $self->fake_pt();   # Create fake process table

    # Load constants
    $self->_fill_const;
    $self->_fill_stored;
    $self->_fill_dynamic;

    my $inbuffer = '';

    # Wake at least as often as the more frequent of the two timers
    my $poll_interval = $self->{alive_time};  # How often to wake up while loop, at maximum
    $poll_interval = $self->{stats_interval} if $poll_interval > $self->{stats_interval};
    $poll_interval ||= 1;  # as 0 would busy-wait!
    my $last_alive_sec = 0;
    my $last_stats_sec = 0;

    foreach my $plugin (@{$self->{plugins}}) {
        # Call twice, as some stats are interval based
        $plugin->poll();  # Initialize plugin stats
        $plugin->poll();  # Initialize plugin stats
    }

  service_loop:
    while (1) {
        my ($now_sec, $now_usec) = gettimeofday();

        # See if chooser is alive
        if ($self->{socket}
            && (($now_sec - $last_alive_sec) >= $self->{alive_time})) {
            _alive_check ($self);
            $last_alive_sec = $now_sec;
        }

        # See if stats need polling
        if (($now_sec - $last_stats_sec) >= $self->{stats_interval}) {
            foreach my $plugin (@{$self->{plugins}}) {
                $plugin->poll($now_sec, $now_usec);
            }
            $last_stats_sec = $now_sec;
        }

        if (! $self->{socket}) {
            # Open the socket to first found host
            foreach my $host (@{$self->{dhost}}) {
                last if ($self->_open_host($host));
            }
            # Rebuild the select set: the old socket (if any) is gone
            $select->remove($select->handles);
            $select->add($Exister->fh);
            $select->add($self->{socket}) if $self->{socket};
            $inbuffer = '';
        }

        # Wait for someone to become active, waking at least every
        # $poll_interval seconds so the alive message and the stats
        # polling above get their turn (in case slchoosed goes down & up)
        sleep($poll_interval) if ($select->count() == 0); # select won't block if no fd's

        foreach my $fh ($select->can_read ($poll_interval)) {
            print "Servicing input\n" if $Debug;
            if ($fh == $Exister->fh) {
                _exist_traffic();
            }
            else {
                # Snarf input, unless a complete line is already buffered
                if ($inbuffer !~ /\n/) {
                    my $data='';
                    my $rv = $fh->sysread($data, POSIX::BUFSIZ);
                    if (!defined $rv || (length $data == 0)) {
                        # May have disconnected; force an alive check
                        $last_alive_sec = 0;
                        next service_loop;
                    }
                    $inbuffer .= $data;
                }

                # Process every complete line; a trailing partial line
                # stays in $inbuffer for the next read
                while ($inbuffer =~ s/(.*?)\n//) {
                    my $line = $1;
                    chomp $line;
                    print "REQ $line\n" if $Debug;
                    my ($cmd, $params) = _pthaw($line, $Debug);
                    # Commands
                    if ($cmd eq "report_get_dynamic") {
                        $self->_fill_and_send;
                    } elsif ($cmd eq "report_fwd_set") {
                        $self->_set_stored($params);
                    } elsif ($cmd eq "report_fwd_comment") {
                        $self->_comment($params);
                    } elsif ($cmd eq "report_fwd_fixed_load") {
                        $self->_fixed_load($params);
                    } elsif ($cmd eq "report_restart") {
                        # Overall fork loop will deal with it.
                        warn "-Info: report_restart\n" if $Debug;
                        exit(0);
                    } else {
                        warn "%Error: Bad request from server: $line\n" if $Debug;
                    }
                }
            }
        }
    }
}

######################################################################
######################################################################
#### Accessors

sub pt {
    # Accessor for the Proc::ProcessTable object; it is built lazily on
    # the first call and the same object is handed back afterwards.
    my ($self) = @_;
    $self->{pt} ||= Proc::ProcessTable->new('cache_ttys' => 1);
    return $self->{pt};
}

sub fake_pt {
    # Accessor for the fake process table; it is built lazily on the
    # first call and is handed a reference back to this reporter.
    my ($self) = @_;
    $self->{fake_pt} ||= Schedule::Load::FakeReporter::ProcessTable
        ->new (reportref=>$self);
    return $self->{fake_pt};
}

######################################################################
######################################################################
#### Sending

sub _open_host {
    # Try to connect to the chooser on $host at $self->{port}.
    # On success the handle is kept in $self->{socket} and the constants
    # are sent at once; on failure $self->{socket} is undef.
    # Returns $self->{socket}, so the result is true only when connected.
    my ($self, $host) = @_;

    print "Trying host $host $self->{port}\n" if $Debug;
    my $sock = Schedule::Load::Socket->new(
        PeerAddr  => $host,
        PeerPort  => $self->{port},
        Timeout   => $self->{timeout},
        );
    # Keep the handle only if it exists and reports itself connected
    $self->{socket} = ($sock && $sock->connected()) ? $sock : undef;

    if ($self->{socket}) {
        # Send constants to the host, that will tell it we live
        $self->{stored_read}   = 0;   # Reread stored info in case redundant reporters
        $self->{const_changed} = 1;
        $self->{const}{_update} = 0;
        $self->_fill_and_send;
        $self->{const}{_update} = 1;   # So chooser can skip calling start function
    }

    # The send above may have dropped the socket again, so look afresh
    my $status = $self->{socket} ? "up" : "down";
    print "   Host $host $self->{port} is $status!\n" if $Debug;
    return $self->{socket};
}

sub _alive_check {
    # Send a "report_ping" line down the chooser socket to see if all is
    # well.  This also keeps at least part of the reporter paged-in.
    # If the send is refused or the socket no longer reports itself
    # connected, $self->{socket} is cleared so the caller reconnects.
    # With no socket at all there is nothing to check and we just return.
    my ($self) = @_;

    my $fh = $self->{socket};
    # Guard before the handle is used; testing it after the method call
    # (as was done previously) could never catch a missing socket.
    return if !$fh;

    # Below may die if slchoosed goes down:
    # Our fork() loop will catch it and restart
    my $ok = $fh->send_and_check("report_ping\n");
    if (!$ok || !$fh->connected()) {
        print "Disconnect\n" if $Debug;
        $self->{socket} = undef;
    }
    return;
}

######################################################################
######################################################################
######################################################################
#### Send_Hash loading

sub _fill_and_send {
    # Refresh the stored and dynamic sections, then push them to the
    # chooser.  The const and stored sections are only sent when flagged
    # as changed; the flag is cleared before they go out.
    my ($self) = @_;

    $self->_fill_stored;
    $self->_fill_dynamic;

    if ($self->{const_changed}) {
        $self->{const_changed} = 0;
        foreach my $section (qw(const stored)) {
            $self->_send_hash($section);
        }
    }

    # Dynamic must be last, it triggers sending info back to user
    $self->_send_hash('dynamic');
}

sub _fill_const {
    # Fill the constant section of self: values that don't change with
    # loading and are known at startup.  Anything the caller already
    # supplied (and which is true) is left alone.
    my ($self) = @_;
    $self->{const_changed} = 1;

    my $const = ($self->{const} ||= {});

    # Processor information
    $const->{cpus}          ||= Unix::Processors->max_online();
    $const->{physical_cpus} ||= Unix::Processors->max_physical();
    $const->{max_clock}     ||= Unix::Processors->max_clock();

    # Build information of this perl
    $const->{$_} ||= $Config{$_} foreach qw(osname osvers archname);

    # Keys that must at least be defined
    $const->{reservable} = 0 unless defined $const->{reservable};

    # Look for some special processes (assume init makes them)
    my @nicers = grep { $_->fname eq "nicercizerd" } @{$self->pt->table};
    $const->{nicercizerd} = 1 if @nicers;
}

sub _fill_dynamic_pid {
    # Fill a single PID into $self->{dynamic}{proc}{pid}.
    # Arguments: the process table entry and the pctcpu to report for it.
    my ($self, $p, $pctcpu) = @_;

    # Create hash
    my $pid = $p->pid;
    my $entry = ($self->{dynamic}{proc}{$pid} ||= {});
    $entry->{pid} = $pid;

    # Copy the process table
    # We look inside the private hash, I've requested a new
    # version of ProcessTable to get around this intrusion.
    @{$entry}{keys %{$p}} = values %{$p};
    $entry->{pctcpu} = $pctcpu;

    # Elements that require special work
    # Something funky is going on with linux: take nice from priority
    $entry->{nice}  = $p->priority / 1 if $Os_Linux;
    $entry->{nice0} = $Os_Linux ? $entry->{nice} : $entry->{nice} - 20;

    $entry->{time} = $p->time * $ProcTimeToSec;

    my $state = $p->state;
    $entry->{state} = ($state eq "onprocessor") ? "cpu".$p->onpro : $state;

    # Fall back to the effective uid when the uid is false and one exists
    my $uid = $p->uid;
    if (!$uid && exists ($p->{euid})) {
        $uid = $p->euid;
    }

    # Cache user names
    if (!defined $User_Names{$uid}) {
        $User_Names{$uid} = getpwuid($uid) || $uid;
    }
    $entry->{uname} = $User_Names{$uid};
}



sub _fill_dynamic {
    my $self = shift;
    # Fill process and system loading values into self.
    #
    # $self->{dynamic} is rebuilt from scratch on every call:
    #   1. plugin statistics are copied in,
    #   2. the process list is gathered (the real table unless
    #      $self->{fake} is set, plus the fake table),
    #   3. a first pass records each pid's parent,
    #   4. a second pass works out pctcpu and which pids to log,
    #   5. a third pass applies inherited fixed loads/comments, fills
    #      the logged pids and adds up the totals,
    #   6. finally %Pid_Inherit is pruned and fixed loads are summed.

    $self->{dynamic} = {total_load => 0,
		        fixed_load => 0,
			report_load => 0,
			total_pctcpu => 0,
			total_size => 0,
			total_rss => 0,
		    };

    # Seconds elapsed since the previous call; 0 on the very first call,
    # as the base is then initialized to the current time.
    my ($sec, $usec) = gettimeofday();
    @Pid_Time_Base = ($sec,$usec) if !defined $Pid_Time_Base[0];
    my $deltastamp = ($sec-$Pid_Time_Base[0]) + 1e-6*($usec-$Pid_Time_Base[1]);
    @Pid_Time_Base = ($sec,$usec);

    # Fill in plugin statistics
    # (each key a plugin reports becomes a top level key of dynamic)
    foreach my $plugin (@{$self->{plugins}}) {
	my $stats = $plugin->stats;
	foreach my $key (keys %{$stats}) {
	    $self->{dynamic}{$key} = $stats->{$key};
	}
    }

    # Note the $p refs cannot be cached, they change when a new table call occurs
    my @pidlist;
    if (!$self->{fake}) {
	push @pidlist, @{$self->pt->table};
    }
    push @pidlist, @{$self->fake_pt->table};

    # Per-pid scratch data for this call: parent, pctcpu, logit, logit_somechild
    my %pidinfo = ();

    # Find all parental references (should cache this at some point)
    foreach my $p (@pidlist) {
	$pidinfo{$p->pid}{parent} = $p->ppid;
    }

    # Push all logit's down towards parents
    foreach my $p (@pidlist) {
	# See which PIDs we will log
	my $pctcpu = $p->pctcpu || 0;
	$pctcpu = 0 if ($pctcpu eq "inf");	# Linux
	if ($Distrust_Pctcpu) {
	    # Recompute pctcpu from the change in user+system time since
	    # the sample taken on the previous call, when there is one.
	    my $ustime = ($p->utime+$p->stime);
	    if (!$ustime
		|| !defined $Pid_Time[$p->pid]
		|| $p->start != $Pid_Time[$p->pid][0]) {
		# No time used, no earlier sample, or the start time differs
		# from the sample's (so it is not the same process).
		# Can't calculate, as p->start is wrong (on linux).  We'll assume the
		# pctcpu is ok.
		#$pctcpu = $ustime / (1000*($sec-$p->start));
		printf "PIDSTART %d SINCESTART %d-%d=%d UTIME %d LOAD %f\n"
		    ,$p->pid, $sec, $p->start, $sec-$p->start, $ustime, $pctcpu
		    if 0;
	    } else {
		# NOTE(review): $deltastamp is not checked for zero before
		# dividing -- confirm two calls can never share a timestamp.
		$pctcpu = 100*(( ($ustime-$Pid_Time[$p->pid][1])
				 * $ProcTimeToSec)
			       / $deltastamp  # Seconds
			       );
		$pctcpu /= $self->{const}{cpus} if $Divide_Pctcpu_By_Cpu;
		printf "PIDCONT %d PCT %s CLOCK %d UTIME %d-%d=%d LOAD %f\n"
		    ,$p->pid, $p->pctcpu||0, $deltastamp,
		    ,$ustime, $Pid_Time[$p->pid][1], $ustime-$Pid_Time[$p->pid][1],
		    ,$pctcpu if $Debug;
	    }
	    # Remember this sample (start time, cpu time used) for the next call
	    $Pid_Time[$p->pid] = [$p->start, $ustime];
	}
	$pidinfo{$p->pid}{pctcpu} = $pctcpu;

	my $logit = ($pctcpu >= $self->{min_pctcpu}
		     && $p->pid != $$);	# Ignore ourself (hopefully not TOO much cpu time!)
	$pidinfo{$p->pid}{logit} = $logit;

	if ($p->uid) { # not root (speed things up)
	    # Mark this pid and every ancestor as having a logged descendant
	    my $searchpid = $p->pid;
	    #my $indent = 0;
	    while ($searchpid) {
		#printf " %s %s\n", $p->pid, " "x($indent++). $searchpid;
		$pidinfo{$searchpid}{logit_somechild} = 1 if $logit;
		$searchpid = $pidinfo{$searchpid}{parent};
	    }
	}
    }

    foreach my $p (@pidlist) {
	my $fixed_load = undef;
	my $cmndcomment = undef;
	my $logit = $pidinfo{$p->pid}{logit};
	if ($p->uid) { # not root
	    # Walk from this pid up through its ancestors; the first
	    # fixed_load and the first cmndcomment found are the ones used.
	    my $searchpid = $p->pid;
	    while ($searchpid) {
		if (defined $Pid_Inherit{$searchpid}) {
		    if ((!defined $fixed_load)
			&& defined $Pid_Inherit{$searchpid}{fixed_load}) {
			# NOTE(review): this stores the whole %Pid_Inherit entry,
			# not its fixed_load value; only definedness is used below.
			$fixed_load = $Pid_Inherit{$searchpid};
			if ($searchpid == $p->pid
			    && !$pidinfo{$searchpid}{logit_somechild}
			    ) {
			    $logit = 1;  # Show this fixed_load process, he has no children to show
			}
			printf "Found fixed_load %s\n", $p->pid if $Debug;
		    }
		    if ((!defined $cmndcomment)
			&& defined $Pid_Inherit{$searchpid}{cmndcomment}) {
			$cmndcomment = $Pid_Inherit{$searchpid}{cmndcomment};
		    }
		}
		$searchpid = $pidinfo{$searchpid}{parent};
	    }
	}

	# Load any processes with lots of time, or with fixed_loading
	# that isn't otherwise accounted for
	my $pctcpu = $pidinfo{$p->pid}{pctcpu};
	$pctcpu = 0 if $pctcpu eq 'nan';
	if ($logit) {
	    _fill_dynamic_pid ($self, $p, $pctcpu);
	    $self->{dynamic}{proc}{$p->pid}{cmndcomment} = $cmndcomment if $cmndcomment;
	}

	# Count total loading
	$self->{dynamic}{total_pctcpu} += $pctcpu;
	if (($p->pid != $$)) {	# Exclude ourself
	    # With const load_pctcpu set the load is the cpu fraction used,
	    # otherwise each running process counts as a load of one.
	    my $load = ($self->{const}{load_pctcpu}
			? ($pctcpu/100.0)
			: (($p->state eq "run" || $p->state eq "onprocessor") ? 1:0));
	    $load = 1 if ($load > 0.90 && $load < 1.10);  # 90% of a CPU really is close to full CPU, as slreportd takes some time itself
	    if ($load) {
		$self->{dynamic}{total_load}  += $load;
		# Processes covered by a fixed load are added in bulk at the end
		$self->{dynamic}{report_load} += $load if !defined $fixed_load;
		#print "PID ",$p->pid," ADD LOAD $load PCT $pctcpu\n" if $Debug;
	    }
	}

	# Count memory
	$self->{dynamic}{total_size} += _fix_overflow($p->size||0);  # Float, so doesn't overflow
	$self->{dynamic}{total_rss}  += _fix_overflow($p->rss||0);  # Float, so doesn't overflow
    }

    # Look for any fixed loads that died
    # Also add up fixed loading across all fixed_loads
    # An entry is only dropped when its pid was not seen above AND its
    # req_hostname equals this host's name; an entry with no
    # req_hostname never passes that test and so is kept.
    foreach my $pid (keys %Pid_Inherit) {
	if (!defined $pidinfo{$pid}
	    && $Pid_Inherit{$pid}{req_hostname} eq hostname()) {  # Not a fake load on a remote host
	    delete $Pid_Inherit{$pid};
	} else {
	    my $fixed_load = $Pid_Inherit{$pid}{fixed_load};
	    if (defined $fixed_load) {
		printf "Added fixed load for %s\n", $pid if $Debug;
		$self->{dynamic}{fixed_load} += $fixed_load;
	    }
	}
    }

    # The reported load is the measured load of processes without a
    # fixed load, plus all the fixed loads.
    $self->{dynamic}{report_load} += $self->{dynamic}{fixed_load};
}

sub _fixed_load {
    # Record (or clear) a fixed load for a PID in %Pid_Inherit.
    # A negative load asks for all of this host's CPUs; a load of zero
    # removes whatever is recorded for the PID.
    my ($self, $params) = @_;

    my ($load, $pid) = @{$params}{qw(load pid)};
    print "Fixed load of $load PID $pid\n" if $Debug;
    $load = $self->{const}{cpus} if $load<0;   # Allow -1 for all CPUs

    @{$Pid_Inherit{$pid}}{qw(fixed_load pid uid req_pid req_hostname)} = (
        $load,
        $params->{pid},
        $params->{uid},
        $params->{req_pid},
        $params->{req_hostname} || $params->{host} || hostname(),
        );

    delete $Pid_Inherit{$pid} if $load==0;
}

sub _comment {
    # Record a command comment for a PID in %Pid_Inherit; _fill_dynamic
    # attaches it to that process and to its descendants.
    #
    # $params keys read: comment, pid, uid, and optionally req_hostname
    # or host.
    my ($self, $params) = @_;

    my $cmndcomment = $params->{comment};
    my $pid = $params->{pid};
    print "Command Commentary '$cmndcomment' PID $pid\n" if $Debug;
    $Pid_Inherit{$pid}{pid} = $pid;
    $Pid_Inherit{$pid}{uid} = $params->{uid};
    $Pid_Inherit{$pid}{cmndcomment} = $cmndcomment;

    # _fill_dynamic only drops the entry of a vanished PID when its
    # req_hostname equals this host's name.  Without a req_hostname a
    # comment-only entry would therefore stay forever, and could later be
    # applied to an unrelated process that reuses the PID.  Use the same
    # fallback chain as _fixed_load, but never override a hostname that
    # an earlier _fixed_load recorded for this PID.
    $Pid_Inherit{$pid}{req_hostname}
        ||= $params->{req_hostname} || $params->{host} || hostname();
    return;
}

######################################################################
#### Math

sub _fix_overflow {
    # Return $value as a floating point number, repairing a 32 bit wrap.
    #
    # Bug in Proc::ProcessTable before version 0.40 causes 32 bit overflow,
    # so a size of 2GB or more shows up as a negative number.  A value
    # that wrapped once is short by exactly 2**32, so the real size is
    # the reported value PLUS 4GB (subtracting it from 4GB, as was done
    # before, mirrors the value instead of restoring it).
    #
    # The 0.1 is added to every value to make the result a float, as
    # the callers sum many of these.
    my $value = shift;
    my $float = 0.1 + $value;
    $float += 4.0*1024*1024*1024 if $float<0;
    return $float;
}

######################################################################
#### Sending the hash to slchoosed

sub _send_hash {
    # Freeze $self->{$field} as a "report_$field" message and send it
    # over the chooser socket.  Does nothing when there is no socket;
    # drops the socket when the send fails or the peer has gone.
    my ($self, $field) = @_;

    my $sock = $self->{socket};
    return if !$sock;

    my $delivered = $sock->send_and_check(_pfreeze("report_$field", $self->{$field}, $Debug));
    unless ($delivered && $sock && $sock->connected()) {
        $self->{socket} = undef;
    }
}

######################################################################
######################################################################
#### Existance

sub _exist_traffic {
    # Handle UDP responses from our $Exister->pid_request calls.
    # When a requesting process is reported as gone, every %Pid_Inherit
    # entry that was made on behalf of that host:pid is removed.
    print "UDP PidStat in...\n" if $Debug;
    my ($pid, $exists, $onhost) = $Exister->recv_stat();
    return unless defined $pid;
    # We only care about known-missing processes
    return unless (defined $exists && !$exists);
    print "  UDP PidStat PID $onhost:$pid no longer with us.  RIP.\n" if $Debug;

    # Collect the doomed entries first, then remove them in one go
    my @orphaned = map { $_->{pid} }
                   grep { $_ && $_->{req_pid}==$pid && $_->{req_hostname} eq $onhost }
                   values %Pid_Inherit;
    delete @Pid_Inherit{@orphaned};
}

######################################################################
######################################################################
######################################################################
######################################################################
#### Stored configuration

sub _fill_stored {
    # Get stored fields: load $self->{stored} from $self->{stored_filename}
    # unless that was already done (as noted by $self->{stored_read}).
    #
    # The defaults ({reserved=>0}) are used when persistence is disabled
    # (stored_filename undefined), when the file is not readable, or when
    # the file cannot be turned back into a hash.  A damaged store file
    # thus costs the stored settings, with a warning, instead of killing
    # the daemon or leaving $self->{stored} undefined.
    #
    # SHOULD: If already cached, check the file date and reread if needed
    # BUT: Currently only this program changes it, so we don't care!
    my ($self) = @_;
    return if $self->{stored_read};

    $self->{stored} = {
        reserved=>0,
    };

    my $filename = $self->{stored_filename};
    if (defined $filename && -r $filename) {
        print "Retrieve $filename\n" if $Debug;
        # Storable::retrieve may die (bad file contents) or return undef
        # (I/O trouble); treat both the same way.
        my ($loaded, $problem);
        {
            local $@;
            $loaded = eval { Storable::retrieve($filename) };
            $problem = $@;
        }
        if (ref($loaded) eq 'HASH') {
            $self->{stored} = $loaded;
        } else {
            warn "%Warning: Ignoring unusable store file $filename"
                . ($problem ? ": $problem" : "\n");
        }
    }

    $self->{const_changed} = 1;
    $self->{stored_read} = 1;
    return;
}

sub _set_stored {
    # Set fields to the given values.  Every key of $params except "host"
    # is copied; it goes into the const section when $params->{set_const}
    # is true and into the stored section otherwise.  Stored changes are
    # written back to the store file when persistence is enabled.
    my ($self, $params) = @_;

    $self->_fill_stored();      # Make sure up-to-date
    $self->{const_changed} = 1;

    my $section = $params->{set_const} ? 'const' : 'stored';
    foreach my $var (grep { $_ ne "host" } keys %{$params}) {
        my $value = $params->{$var};
        print "_set_const($var = $value)\n" if $Debug;
        $self->{$section}{$var} = $value;
    }

    # Only the stored section is persistent
    return if $params->{set_const};
    return if !defined $self->{stored_filename};

    print "Store $self->{stored_filename}\n" if $Debug;
    Storable::nstore($self->{stored}, $self->{stored_filename});
    # NOTE(review): this leaves the store file writable by every user, and
    # _fill_stored later feeds it to Storable::retrieve -- confirm intended.
    chmod 0666, $self->{stored_filename};
}

######################################################################
#### Package return
1;

######################################################################
__END__