/usr/local/CPAN/RWDE/RWDE/Scheduler/Scheduler.pm
# Object to handle scheduler records and run pending actions (Scheduler.pm)
# Multiple processes may be run locally or on multiple hosts.
# Upon startup, cleans up the records (workers and reserved pending
# actions) left behind by a previous run with the same scheduler name.
# On SIGTERM or SIGINT, stops claiming new jobs and exits once all of its
# own children have finished. A status signal (see the handlers installed
# in setup) logs the status of the active jobs via debug_info.
package RWDE::Scheduler::Scheduler;
use strict;
use warnings;
use Error qw(:try);
use POSIX qw(:sys_wait_h setsid ceil);
use RWDE::DB::DbRegistry;
use RWDE::DB::Record;
use RWDE::Exceptions;
use RWDE::Time;
use RWDE::Scheduler::SchedulerWorker;
use RWDE::Scheduler::Pending_action;
use base qw(RWDE::DB::DefaultDB RWDE::DB::Deletable RWDE::DB::Record RWDE::Runnable);
# Package globals describing the 'schedulers' table; presumably read by the
# RWDE::DB base classes listed in "use base" -- confirm there.
our ($db, $table, $index, $id, @fieldnames, $ccrcontext, %fields, %static_fields, %modifiable_fields, @static_fieldnames, @modifiable_fieldnames);
our $VERSION = sprintf "%d", q$Revision: 535 $ =~ /(\d+)/;

# Populated in BEGIN so the table metadata is in place at compile time,
# before any code in this package runs.
BEGIN {
    $table = 'schedulers';
    $id    = 'scheduler_id';
    $index = 'schedulers_scheduler_id_seq';

    # All of the static fields present in the scheduler table.
    %static_fields = (
        # Field => [Type, Descr]
        scheduler_id      => [ 'int',       'Scheduler ID' ],
        scheduler_created => [ 'timestamp', 'Scheduler created on' ],    # datetime
        lastmod           => [ 'timestamp', 'last seen' ],
    );

    # All of the fields allowed to be modified in the scheduler table.
    %modifiable_fields = (
        # Field => [Type, Descr]
        scheduler_terminated => [ 'timestamp', 'Scheduler terminated on' ],    # datetime
        scheduler_name       => [ 'char',      'Scheduler name' ],
        scheduler_pid        => [ 'char',      'Scheduler process id' ],
        scheduler_status     => [ 'char',      'Scheduler status' ],
    );

    %fields                = (%static_fields, %modifiable_fields);
    @static_fieldnames     = sort keys %static_fields;
    @modifiable_fieldnames = sort keys %modifiable_fields;
    @fieldnames            = sort keys %fields;
}
# Look up the scheduler record whose scheduler_name equals
# $params->{scheduler_name}.
#
# Returns the single value produced by fetch_one for that query.
# Throws RWDE::DevelException when the scheduler_name key is absent.
# Exceptions from fetch_one (e.g. the RWDE::DataNotFoundException that
# setup catches when no such scheduler exists) propagate to the caller.
sub fetch_by_name {
    my ($self, $params) = @_;

    RWDE::DevelException->throw({ info => 'Scheduler::Inappropriate parameters passed to fetch_by_name' })
        unless exists $params->{scheduler_name};

    # scalar(): fetch_one is evaluated in scalar context regardless of
    # how this method itself was called.
    return scalar $self->fetch_one(
        {
            query_params => [ $params->{scheduler_name} ],
            query        => 'scheduler_name = ?',
        }
    );
}
# Fetch the RWDE::Scheduler::SchedulerWorker records whose scheduler_id
# matches this scheduler. $params is accepted but not used.
# Returns whatever SchedulerWorker->fetch returns (cleanup dereferences it
# as an array reference).
sub get_workers {
    my ($self, $params) = @_;
    my $criteria = {
        query_params => [ $self->scheduler_id ],
        query        => 'scheduler_id = ?',
    };
    return RWDE::Scheduler::SchedulerWorker->fetch($criteria);
}
# Return the RWDE::Scheduler::Pending_action records whose scheduler_id
# matches this scheduler. The fetch result is memoised in
# $self->{Pending_actions}; while that cached value is true, later calls
# return it without querying again. $params is accepted but not used.
sub get_pending_actions {
    my ($self, $params) = @_;
    $self->{Pending_actions} ||= RWDE::Scheduler::Pending_action->fetch(
        { query => 'scheduler_id = ?', query_params => [ $self->scheduler_id ] }
    );
    return $self->{Pending_actions};
}
# Claim the next due pending action for this scheduler, if there is one.
#
# Inside a `transaction` block (presumably committed on success and rolled
# back if the block dies -- confirm), selects a due action that no scheduler
# has claimed yet (scheduler_id IS NULL) and that is either unrestricted or
# restricted to this scheduler's name, ordered by action_priority then
# action_due. It then stores our scheduler_id on that action, so the
# election query of any later caller skips it.
#
# Returns the claimed RWDE::Scheduler::Pending_action, or undef when an
# RWDE::DataNotFoundException is raised (no work found). Other exceptions
# propagate to the caller.
sub get_next_job {
    my ($self, $params) = @_;
    $self->debug_info('devel', 'Scheduler::get_next_job');
    my $action;
    try {
        transaction {
            # Election query.
            # NOTE(review): this is a plain SELECT followed by an UPDATE; unless
            # fetch_one/transaction lock the row (e.g. SELECT ... FOR UPDATE),
            # two schedulers could elect the same action concurrently -- confirm.
            $action = RWDE::Scheduler::Pending_action->fetch_one(
                {
                    query => "action_due < NOW() AND scheduler_id IS NULL AND (action_scheduler_name IS NULL or action_scheduler_name = ?) ORDER BY action_priority,action_due",
                    query_params => [ $self->scheduler_name ],
                }
            );
            $action->scheduler_id($self->scheduler_id);
            $action->update_record();
        };
    }
    catch RWDE::DataNotFoundException with {
        # No work found. Also drop any action fetched before the exception
        # (e.g. if update_record raised it), so the caller never forks a job
        # whose claim was not stored.
        $action = undef;
    };
    return $action;
}
# Tear down this scheduler's database state.
# First calls cleanup on every worker record returned by get_workers. Then
# calls cleanup on every pending action returned by get_pending_actions
# (actions may be reserved without having a worker yet), logging each
# action_id. Finally deletes this scheduler's own record.
# Returns an empty list.
sub cleanup {
    my ($self, $params) = @_;

    for my $stale_worker (@{ $self->get_workers }) {
        $stale_worker->cleanup;
    }

    for my $reserved (@{ $self->get_pending_actions }) {
        my $label = 'Cleaning_up for job: ' . $reserved->action_id;
        $self->debug_info('devel', $label);
        $reserved->cleanup;
    }

    $self->delete_record;
    return;
}
# Log the shutdown and persist this scheduler's terminated state:
# status 'terminated', termination time RWDE::Time->now(), pid 0.
# Returns nothing.
sub terminate_scheduler {
    my ($self, $params) = @_;
    $self->syslog_msg('info', 'Terminating scheduler');

    # Each value is produced lazily so the setters and the clock lookup run
    # in the same order as a straight-line sequence would run them.
    my @field_updates = (
        [ scheduler_status     => sub { 'terminated' } ],
        [ scheduler_terminated => sub { RWDE::Time->now() } ],
        [ scheduler_pid        => sub { 0 } ],
    );
    for my $update (@field_updates) {
        my ($accessor, $value_of) = @{$update};
        $self->$accessor($value_of->());
    }

    $self->update_record;
    return;
}
# Run $params->{job} in a forked child process.
#
# Parent: records the child pid with add_job (which takes one worker slot)
# and returns an empty list.
# Child: runs the job through RWDE::Scheduler::SchedulerWorker->run and
# exits with its result; it never returns.
# If fork fails, the failure is logged as a warning and nothing is recorded,
# so no worker slot is consumed.
sub fork_off {
    my ($self, $params) = @_;
    my $job = $$params{job};

    # fork a child to run the job
    my $cpid = fork;

    if (not defined $cpid) {
        # An undefined pid must never reach add_job. Such an entry could never
        # be reaped, so workers_free would stay decremented and a terminating
        # scheduler would never see zero active jobs.
        # NOTE(review): the job was already claimed (scheduler_id set) by
        # get_next_job and stays claimed by this scheduler -- confirm whether
        # it should be released here instead.
        $self->syslog_msg('warning', 'fork failed for job ' . $job->action_id . ": $!");
        return ();
    }

    if ($cpid) {
        # parent
        $self->add_job($cpid, $job);
        return ();
    }

    # child
    my $result = RWDE::Scheduler::SchedulerWorker->run({ job => $job, scheduler_id => $self->scheduler_id });
    exit($result);
}
# Housekeeping after a child process has terminated: log its pid and exit
# status ($status is the raw wait status; start passes $? from waitpid),
# then release the child's job entry via remove_job. Returns nothing.
sub child_cleanup {
    my ($self, $cpid, $status) = @_;
    my $note = "Cleanup for child $cpid, exited with $status";
    $self->debug_info('devel', $note);
    $self->remove_job($cpid);
    return;
}
# Log a table of the active jobs at 'info' level: one header row, then one
# row per child pid with the job's action_id and whether the process still
# exists (probed with signal 0). Returns nothing.
sub print_status {
    my ($self) = @_;
    my $header = sprintf("%-10s %-10s %10s\n", 'PID', 'MSGID', 'RUNNING');
    $self->debug_info('info', $header);

    my $jobs_by_pid = $self->{active_jobs};
    for my $pid (keys %{$jobs_by_pid}) {
        my $running = kill(0, $pid) ? 'yes' : 'no';
        my $row = sprintf("%-10d %-10d %10s\n", $pid, $jobs_by_pid->{$pid}->action_id, $running);
        $self->debug_info('info', $row);
    }
    return;
}
# Record $job as running in child process $cpid and take one worker slot
# (decrements workers_free). Returns nothing.
sub add_job {
    my ($self, $cpid, $job) = @_;
    my $jobs_by_pid = $self->{active_jobs};
    $jobs_by_pid->{$cpid} = $job;
    $self->{workers_free} -= 1;
    return;
}
# Forget the job run by child $cpid and give its worker slot back
# (increments workers_free). A pid that is not being tracked only produces
# a syslog warning and leaves the counters unchanged. Returns nothing.
sub remove_job {
    my ($self, $cpid) = @_;
    my $jobs_by_pid = $self->{active_jobs};

    unless (exists $jobs_by_pid->{$cpid}) {
        $self->syslog_msg('warning', 'waitpid got unknown pid: ' . $cpid);
        return;
    }

    delete $jobs_by_pid->{$cpid};
    $self->{workers_free} += 1;
    return;
}
# Return the pids of the children currently running jobs. In scalar
# context (e.g. the "== 0" termination check in start) this is their count.
sub get_active_jobs {
    my ($self) = @_;
    my $jobs_by_pid = $self->{active_jobs};
    my @pids = keys %{$jobs_by_pid};
    return @pids;
}
# Prepare this scheduler object to run.
#
# $params is a hashref keyed like the command-line switches:
#   s => seconds to wait between scans   (default 30)
#   m => maximum number of workers       (default 5)
#   n => name for the scheduler          (default "temp")
# Any false value (including 0 or '') selects the default.
#
# Steps:
#   1. Cleans up any leftover scheduler record with the same name (via its
#      cleanup method). A RWDE::DataNotFoundException from fetch_by_name
#      means there is nothing to clean up.
#   2. Initialises the in-memory job bookkeeping.
#   3. Installs signal handlers that only set flags, which start acts on.
#   4. Creates this scheduler's own record.
# Returns an empty list.
sub setup {
    my ($self, $params) = @_;
    my $sleeptime = $$params{s} || 30;
    my $workers   = $$params{m} || 5;
    my $name      = $$params{n} || "temp";

    # Before we start, make sure we tidy up the remnants of my previous run(s)
    try {
        my $previous_run = $self->fetch_by_name({ scheduler_name => $name });
        $previous_run->cleanup();
    }
    catch RWDE::DataNotFoundException with {
        # there are no schedulers with the same name
    };

    # non-persistent fields
    $self->{workers_free} = $self->{workers} = $workers;
    $self->{sleeptime} = $sleeptime;
    $self->{cleanup} = $self->{terminating} = $self->{printstatus} = 0;    # initialize for sig handler safety.
    $self->{active_jobs} = {};

    # Signal handlers: each only sets a flag that the loop in start checks.
    $SIG{'TERM'} = $SIG{'INT'} = sub { $self->{terminating} = 1; };
    my $status_handler = sub { $self->{printstatus} = 1; };
    # SIGINFO only exists on some platforms (e.g. BSD/macOS), and assigning
    # an unknown signal only produces a warning. SIGUSR1 is handled as well,
    # as the portable status signal.
    $SIG{'INFO'} = $status_handler if exists $SIG{'INFO'};
    $SIG{'USR1'} = $status_handler;
    $SIG{'CHLD'} = sub { $self->{cleanup} = 1; };

    # persistent fields
    $self->scheduler_name($name);
    $self->scheduler_status('running');
    $self->scheduler_pid($$);

    # Register ourselves, so other schedulers know we are running
    $self->create_record;
    return ();
}
# Main scheduling loop. It only leaves the loop by exiting the process.
#
# Registers 'LISTEN pending' in the settings for this scheduler's database.
# Each iteration then does the following:
#   - reaps finished children (when SIGCHLD was seen);
#   - logs job status (when the status signal was seen);
#   - once termination was requested and no jobs remain, marks the
#     scheduler terminated and exits(0);
#   - otherwise, while not terminating and a worker slot is free, claims at
#     most one job and forks it.
# Between iterations it calls get_db_notifications with the configured
# sleeptime, presumably waiting up to that long for a NOTIFY -- confirm.
sub start {
    my ($self, $params) = @_;

    # set a listener for DB notifications
    RWDE::DB::DbRegistry->add_db_settings({ db => $self->get_db, db_settings => ['LISTEN pending'], });
    $self->syslog_msg('info', "Starting Scheduler:: name: " . $self->scheduler_name . " sleeptime: " . $self->{sleeptime} . " workers: " . $self->{workers});

    while (1) {
        # check for any child processes that have terminated
        if ($self->{cleanup}) {    # set by SIGCHLD handler
            $self->debug_info('devel', 'reaping old children');
            # Clear the flag BEFORE reaping. A child that exits after the last
            # waitpid() below sets it again and is reaped on the next pass.
            # Clearing after the loop could lose that signal and leave the
            # child unreaped, with its worker slot held.
            $self->{cleanup} = 0;
            while ((my $cpid = waitpid(-1, WNOHANG)) > 0) {
                $self->child_cleanup($cpid, $?);
            }
        }

        if ($self->{printstatus}) {    # set by the status signal handler installed in setup
            $self->print_status();
            $self->{printstatus} = 0;
        }

        if ($self->{terminating} and ($self->get_active_jobs == 0)) {
            $self->terminate_scheduler();
            exit(0);
        }

        if ($self->{workers_free} > 0 && !($self->{terminating})) {
            $self->debug_info('devel', 'Number of workers: ' . $self->{workers_free});
            my $action = $self->get_next_job();
            if (defined $action) {
                $self->fork_off({ job => $action });
            }
        }
        else {
            $self->debug_info('devel', 'No free workers');
        }
    }
    continue {
        # The returned notifications are not inspected; the call is made for
        # its wait between iterations.
        RWDE::DB::DbRegistry->get_db_notifications({ sleeptime => $self->{sleeptime}, db => $self->get_db() });
    }
    return;
}
# A module loaded with use/require must end by returning a true value.
1;