# /usr/local/CPAN/App-FQStat/App/FQStat/Scanner.pm
package App::FQStat::Scanner;
# App::FQStat is (c) 2007-2009 Steffen Mueller
#
# This program is free software; you can redistribute it and/or
# modify it under the same terms as Perl itself.
use strict;
use warnings;
use Time::HiRes qw/sleep/;
use String::Trigram ();
use DateTime ();
use Time::Zone ();
use App::FQStat::Debug;
# Drives the background qstat scanner thread.
#
# Takes one optional argument, $forced. While holding the lock on
# $::ScannerStartRun, exactly one of the following happens:
#   - no scanner thread exists yet: a new one is started;
#   - the scanner thread is joinable: it is joined, its result is stored in
#     $::Records and $::NoActiveNodes, $::Summary is emptied, $::Initialized
#     and $::RecordsChanged are set, and a new thread is started;
#   - the scanner thread is neither joinable nor running: it is dropped and
#     a new one is started;
#   - the scanner thread is still running and $forced is true:
#     $::ScannerStartRun is set to 1.
#
# Dies if the joined thread did not hand back an array reference.
sub run_qstat {
  warnenter if ::DEBUG;
  my $forced = shift;

  lock($::ScannerStartRun);

  if (not defined $::ScannerThread) {
    warnline "Creating new (initial?) scanner thread" if ::DEBUG;
    $::ScannerThread = threads->new(\&App::FQStat::Scanner::scanner_thread);
  }
  elsif ($::ScannerThread->is_joinable()) {
    warnline "Joining scanner thread" if ::DEBUG;
    my $return = $::ScannerThread->join();

    # scanner_thread() returns [\@records, $noActiveNodes] on success. If the
    # thread died (e.g. because running qstat failed) there is no such
    # reference; fail with an explicit message rather than with a strict-refs
    # error from dereferencing an undefined value.
    if (ref($return) ne 'ARRAY') {
      die "Scanner thread finished without returning results (did 'qstat' fail?)";
    }

    ($::Records, $::NoActiveNodes) = @$return;
    $::Summary = [];
    $::Initialized = 1;
    { lock($::RecordsChanged); $::RecordsChanged = 1; }
    warnline "Joined scanner thread. Creating new scanner thread" if ::DEBUG;
    $::ScannerThread = threads->new(\&App::FQStat::Scanner::scanner_thread);
  }
  elsif (!$::ScannerThread->is_running()) {
    warnline "scanner thread not running. Creating new scanner thread" if ::DEBUG;
    undef $::ScannerThread;
    $::ScannerThread = threads->new(\&App::FQStat::Scanner::scanner_thread);
  }
  elsif ($forced) {
    warnline "scanner thread running. Force in effect, setting StartRun" if ::DEBUG;
    $::ScannerStartRun = 1;
  }
}
# Body of the scanner thread: runs qstat once and turns its output into job
# records.
#
# Steps:
#   1. reset $::ScannerStartRun to 0;
#   2. run the configured "qstatcmd" with "-u <user>" ("*" in summary mode or
#      when $::User is unset/empty);
#   3. adapt $::Interval to how long the command took;
#   4. parse every output line after the first two into a record
#      [id, prio, name, user, status, date, time, queue], with the date
#      rewritten from "a/b/c" to "b.a.c";
#   5. reverse (if $::RecordsReversed) and sort the records;
#   6. pull $::DisplayOffset back if it points past the new record list.
#
# Returns [\@records, $noActiveNodes], where $noActiveNodes is the number of
# records whose status field is exactly "r".
# Dies with "Running 'qstat' failed!" if run_capture() returns undef.
sub scanner_thread {
  warnenter if ::DEBUG;
  {
    lock($::ScannerStartRun);
    $::ScannerStartRun = 0;
  }

  # Whose jobs to list: everybody's in summary mode, otherwise the selected
  # user (falling back to everybody if none is set).
  my @args;
  {
    lock($::SummaryMode);
    if ($::SummaryMode) {
      push @args, '-u', '*';
    }
    else {
      lock($::User);
      push @args, '-u', ( (defined($::User) && $::User ne '') ? $::User : '*');
    }
  }

  my $timebefore = time();
  my $qstat = App::FQStat::Config::get("qstatcmd");
  my $output = App::FQStat::System::run_capture($qstat, @args);
  if (not defined $output) {
    die "Running 'qstat' failed!";
  }
  my $duration = time()-$timebefore;

  # Update the update interval according to the time it takes
  {
    lock($::Interval);
    if ($duration >= $::Interval) {
      # qstat took at least a full interval: back off.
      $::Interval = ($duration > $::Interval*1.8 ? $duration+1.0 : $::Interval*1.8);
    }
    # NOTE(review): the interval only shrinks again while qstat takes longer
    # than $::UserInterval; once qstat is faster than that, a previously
    # enlarged $::Interval is never reduced here. Confirm this is intended.
    elsif ($duration < $::Interval and $duration > $::UserInterval) {
      $::Interval = ($::Interval/1.1 > $::UserInterval ? $::Interval/1.1 : $::UserInterval);
    }
  }

  my @lines = split /\n/, $output;
  # The first two lines are not job lines (presumably the column header and
  # its separator -- TODO confirm against the qstat output format).
  shift @lines;
  shift @lines;

  my $noActiveNodes = 0;
  my @records;
  foreach my $line (@lines) {
    $line =~ s/^\s+//;
    my @fields = split /\s+/, $line;

    # A blank line carries no job; do not turn it into a record of undefs.
    next if not @fields;

    # The queue column is only kept if it contains a non-digit character;
    # a missing or purely numeric value is treated as "no queue".
    my $queue = $fields[7];
    $queue = '' if !defined($queue) || $queue !~ /\D/;

    # Swap the first two date components and join with dots.
    my @date = split /\//, $fields[5];
    @date = @date[1, 0, 2];

    $noActiveNodes++ if $fields[4] =~ /^\s*r\s*$/;

    push @records, [
      $fields[0],       # F_id
      $fields[1],       # F_prio
      $fields[2],       # F_name
      $fields[3],       # F_user
      $fields[4],       # F_status
      join('.', @date), # F_date
      $fields[6],       # F_time
      $queue,           # F_queue
    ];
  }

  reverse_records(\@records) if $::RecordsReversed; # retain state of reversal
  sort_current(\@records);

  lock($::DisplayOffset);
  lock(@::Termsize);
  my $limit = @records - $::Termsize[1]+4;
  # With fewer records than terminal rows the limit would be negative; never
  # move the display offset below zero.
  $limit = 0 if $limit < 0;
  if ($::DisplayOffset and $::DisplayOffset > $limit) {
    $::DisplayOffset = $limit;
  }

  sleep 0.1; # Note to self: fractional sleep without HiRes => CPU=100%
  warnline "End of scanner_thread" if ::DEBUG;
  return [\@records, $noActiveNodes];
}
# Sorts job records in place by the column named in $::SortField.
#
# Argument: $lines, a reference to an array of records (array references
# indexed via the F_* constants / ::RECORD_KEY_CONSTANT).
#
# Does nothing if $::SortField is undefined, empty, or not a key of
# %::Columns, or if the column has no "order" entry. The order is taken from
# $::Columns{field}{order}, except for the 'status' field, which always uses
# the 'status' order:
#   status      - ascending rank: E or d => 0, r => 1, t => 2, w => 3, else 4
#   time        - ascending by the first three ':'-separated numbers
#   date        - descending by year, then month, then day of a
#                 "day.month.year" value
#   num         - ascending numerically
#   num_highlow - descending numerically
#   (other)     - ascending by string comparison
#
# Afterwards, if $::RecordsReversed is set, the sorted array is reversed and
# $::RecordsChanged is set to 1.
sub sort_current {
  warnenter if ::DEBUG;
  my $lines = shift;

  my $sortfield;
  {
    lock($::SortField);
    if (not defined $::SortField or $::SortField eq '' or not exists $::Columns{$::SortField}) {
      warnline "Nothing to sort" if ::DEBUG;
      return;
    }
    $sortfield = $::SortField;
  }

  my $key = $sortfield;
  my $key_index = ::RECORD_KEY_CONSTANT()->{$key};

  my $order = $sortfield eq 'status' ? 'status' : $::Columns{$sortfield}{order};

  # $order may be undefined here; avoid interpolating undef into the message.
  warnline "Sorting: key=$key order=" . (defined $order ? $order : '') if ::DEBUG;
  return if not defined $order;

  my $time = time(); # for debugging / profiling

  if ($order eq 'status') {
    @$lines =
      map { $_->[0] }
      sort { $a->[1] <=> $b->[1] }
      map {
        my $s = $_->[::F_status];
        if ($s =~ /[Ed]/) { $s = 0 }
        elsif ($s =~ /r/) { $s = 1 }
        elsif ($s =~ /t/) { $s = 2 }
        elsif ($s =~ /w/) { $s = 3 }
        else { $s = 4 }
        [$_, $s]
      }
      @$lines;
  }
  elsif ($order eq 'time') {
    ::debug "Sorting by time";
    @$lines =
      map { $_->[0] }
      sort { $a->[1] <=> $b->[1] or $a->[2] <=> $b->[2] or $a->[3] <=> $b->[3] }
      map { [$_, split(/:/, $_->[$key_index])] }
      @$lines;
  }
  elsif ($order eq 'date') {
    ::debug "Sorting by date";
    # scanner_thread stores dates as "day.month.year", so after the split
    # [1] is the day, [2] the month and [3] the year. Compare the most
    # significant component (year) first; comparing the day first gives a
    # wrong order as soon as the records span more than one month.
    @$lines =
      map { $_->[0] }
      sort { $b->[3] <=> $a->[3] or $b->[2] <=> $a->[2] or $b->[1] <=> $a->[1] }
      map { [$_, split(/\./, $_->[$key_index])] }
      @$lines;
  }
  elsif ($order eq 'num') {
    ::debug "Sorting numerically";
    @$lines =
      sort { $a->[$key_index] <=> $b->[$key_index] }
      @$lines;
  }
  elsif ($order eq 'num_highlow') {
    ::debug "Sorting numerically high to low";
    @$lines =
      sort { $b->[$key_index] <=> $a->[$key_index] }
      @$lines;
  }
  else { # default to alpha
    ::debug "Sorting alphabetically";
    @$lines =
      sort { $a->[$key_index] cmp $b->[$key_index] }
      @$lines;
  }

  {
    lock($::RecordsReversed);
    lock($::RecordsChanged);
    # Re-apply the reversal to the array that was just sorted. This is the
    # same array as $::Records when the caller passed $::Records, but not
    # when scanner_thread passes its freshly parsed records.
    reverse_records($lines) if $::RecordsReversed;
    $::RecordsChanged = 1 if $::RecordsReversed;
  }

  if (::DEBUG()) {
    my $diff = time()-$time;
    ::debug "Sorting took $diff seconds.";
  }
}
# Reverses, in place, the order of the records in the array that the single
# argument refers to.
sub reverse_records {
  warnenter if ::DEBUG;
  my ($records) = @_;
  return @{$records} = reverse @{$records};
}
# Rebuilds $::Summary from the job records in $::Records.
#
# Jobs are grouped per user. If the "summary_clustering" option is set, each
# user's jobs are further grouped by job name (digits removed) using
# String::Trigram best-match similarity with the configured
# "summary_clustering_similarity" threshold; such groups are keyed
# "user;jobname".
#
# One line is produced per group:
#   [ user, jobname, #running, #error, #hold, #other,
#     mean priority, mean runtime, #started jobs, max runtime ]
# where jobname is undef without clustering, the mean priority only counts
# priorities above 0.01, and the runtimes are "HH:MM:SS" strings (empty if no
# job of the group has status r/t/hr/ht).
#
# The lines are sorted by descending error count, then descending number of
# started jobs, then descending number of running jobs.
# Always returns 1.
sub calculate_summary {
  warnenter if ::DEBUG;
  $::Summary = [];

  # Job start times are fed to DateTime without a time zone, so compare them
  # against "now" shifted by the local time zone offset.
  my $offset = Time::Zone::tz_local_offset();
  my $curtime = time() + $offset;

  # cluster by user name
  my %user_clusters;
  foreach my $job (@$::Records) {
    my $user = $job->[::F_user];
    push @{$user_clusters{$user}}, $job;
  }

  if (App::FQStat::Config::get("summary_clustering")) {
    my $trigram = String::Trigram->new(
      minSim  => App::FQStat::Config::get("summary_clustering_similarity"),
      warp    => 1.2,
      cmpBase => [],
    );
    my %user_name_clusters;

    # cluster by similarity
    foreach my $user (keys %user_clusters) {
      $trigram->reInit([]);
      my %jname_clusters;
      my $jobs = $user_clusters{$user};

      foreach my $job (@$jobs) {
        my $jname = $job->[::F_name];
        # ignore numbers
        $jname =~ s/\d+//g;

        if (keys %jname_clusters) {
          my @bestmatch;
          my $sim = $trigram->getBestMatch($jname, \@bestmatch);
          if (@bestmatch and $sim) {
            # similar enough to a known name: join that cluster
            push @{ $jname_clusters{$bestmatch[0]} }, $job;
          }
          else {
            # no match: this name becomes a new comparison base
            push @{ $jname_clusters{$jname} }, $job;
            $trigram->extendBase([$jname]);
          }
        }
        else {
          # first job of this user seeds the comparison base
          $jname_clusters{$jname} = [$job];
          $trigram->extendBase([$jname]);
        }
      }

      foreach my $jname (keys %jname_clusters) {
        my $clustername = "$user;$jname";
        push @{$user_name_clusters{$clustername}}, @{$jname_clusters{$jname}};
        delete $jname_clusters{$jname};
      }
    } # end foreach user cluster

    %user_clusters = %user_name_clusters;
  }

  # actually calculate the summaries for each cluster
  foreach my $cluster_key (keys %user_clusters) {
    my $jobs = $user_clusters{$cluster_key};
    my %n_status = (r => 0, E => 0, h => 0, 'qw' => 0);
    my $prio_sum = 0;
    my $nprio = 0;
    my $runtime_sum = 0;
    my $njobs_started = 0;
    my $max_runtime = 0;

    foreach my $job (@$jobs) {
      my $prio = $job->[::F_prio];
      if ($prio > 1.e-2) {
        $nprio++;
        $prio_sum += $prio;
      }

      # find job status
      for ($job->[::F_status]) {
        if (/^[rt]$/) { $n_status{r}++; }
        elsif (/(?:^d|E)/) { $n_status{E}++; }
        elsif (/h(?:qw|r|t)/) { $n_status{h}++; }
        else { $n_status{qw}++; }
      }

      # Jobs with status r, t, hr or ht count as started and contribute
      # a runtime.
      if ($job->[::F_status] =~ /^h?[rt]$/) {
        my ($day, $month, $year) = split /\./, $job->[::F_date];
        my ($hour, $minute, $second) = split /:/, $job->[::F_time];
        my $dt = DateTime->new(year => $year, month => $month, day => $day,
                               hour => $hour, minute => $minute, second => $second);
        my $runtime = $curtime - $dt->epoch();
        $runtime_sum += $runtime;
        $max_runtime = $runtime if $runtime > $max_runtime;
        $njobs_started++;
      }
    }

    # Cluster keys are "user" or "user;jobname". Split at the first ';' only,
    # so that a job name which itself contains ';' is kept whole.
    my ($user, $jobname) = split /;/, $cluster_key, 2;

    my $runtime = '';
    if ($njobs_started) {
      $runtime     = _format_hms($runtime_sum/$njobs_started);
      $max_runtime = _format_hms($max_runtime);
    }
    else {
      $max_runtime = '';
    }

    my $line = [ $user, $jobname, @n_status{'r', 'E', 'h', 'qw'}, ($nprio?$prio_sum/$nprio:0), $runtime, $njobs_started, $max_runtime ];
    push @$::Summary, $line;
  } # end for each user

  @$::Summary =
    sort {
      $b->[3] <=> $a->[3]    #errors
      or $b->[8] <=> $a->[8] #nodes-used
      or $b->[2] <=> $a->[2] #running
    }
    @$::Summary;

  return(1);
}

# Formats a duration given in seconds as "HH:MM:SS" (hours are not wrapped at
# 24). Negative durations are shown as 00:00:00, because the unsigned %u
# conversions below would otherwise print them as huge numbers.
sub _format_hms {
  my ($seconds) = @_;
  $seconds = 0 if $seconds < 0;
  my $hours   = int($seconds / 3600);
  my $minutes = int($seconds / 60 - $hours*60);
  my $secs    = int($seconds) % 60;
  return sprintf('%02u:%02u:%02u', $hours, $minutes, $secs);
}
1;