App::Context::ClusterController - a runtime environment of a Cluster Controller served by many Cluster Nodes
Index
Code Index:
NAME

App::Context::ClusterController - a runtime environment of a Cluster Controller served by many Cluster Nodes
SYNOPSIS

# ... official way to get a Context object ...
use App;
$context = App->context();
$config = $context->config(); # get the configuration
$context->dispatch_events(); # dispatch events (a method of the context, not of the config)
# ... alternative way (used internally) ...
use App::Context::ClusterController;
$context = App::Context::ClusterController->new();
#############################################################################
## $Id: ClusterController.pm 6785 2006-08-11 23:13:19Z spadkins $
#############################################################################
package App::Context::ClusterController;
$VERSION = (q$Revision: 6785 $ =~ /(\d[\d\.]*)/)[0]; # VERSION numbers generated by svn
use App;
use App::Context::Server;
@ISA = ( "App::Context::Server" );
use Date::Format;
use strict;
# Initialisation step "a" for the Cluster Controller.
# Checks that a port is configured and resets the async-event bookkeeping.
#   $self    - the context object; reads {port} and {options}
#   $options - accepted for interface compatibility, not used in this step
# Dies when $self->{port} is false.
sub _init2a {
    &App::sub_entry if ($App::trace);
    my $self = shift;

    if (!$self->{port}) {
        die "Controller must have a port defined (\$context->{options}{port})";
    }

    # The per-node limit comes from the options and falls back to 10.
    my $per_node_limit = $self->{options}{"app.context.max_async_events_per_node"} || 10;

    $self->{num_async_events}          = 0;
    $self->{max_async_events_per_node} = $per_node_limit;
    $self->{max_async_events}          = 0;    # no nodes are up yet, so there is no capacity

    &App::sub_exit() if ($App::trace);
}
# Initialisation step "b" for the Cluster Controller.
# When $options->{startup} is true, startup_nodes() is called with the same
# options hash; otherwise nothing happens.
sub _init2b {
    &App::sub_entry if ($App::trace);
    my ($self, $options) = @_;
    if ($options->{startup}) {
        $self->startup_nodes($options);
    }
    &App::sub_exit() if ($App::trace);
}
# Logs (at level 2) the host and port the Cluster Controller is starting on.
# Presumably invoked by the parent class when event dispatching begins.
sub dispatch_events_begin {
    my $self = shift;
    my $address = "$self->{host}:$self->{port}";
    $self->log({level=>2}, "Starting Cluster Controller on $address\n");
}
# Logs (at level 2) that the Cluster Controller is stopping.
# No other shutdown work is done here.
sub dispatch_events_end {
    my $self = shift;
    $self->log({level=>2}, "Stopping Cluster Controller\n");
}
# Sends an asynchronous event to the destination named in the event.
#   $event          - hashref; reads {destination}, {args}, {service_type},
#                     {name} and {method}
#   $callback_event - stored alongside the event while it is running
#                     (process_msg() sends it once the results arrive)
# Behaviour by $event->{destination}:
#   undefined    - an error is logged and nothing is sent
#   "host:port"  - the args are serialized and an ASYNC-EVENT message is sent
#                  to that node; an ASYNC-EVENT-TOKEN reply registers the
#                  event as running, a reply containing ERROR marks the node
#                  down, anything else is logged as a warning
#   other        - handed to the parent class implementation
sub send_async_event_now {
    &App::sub_entry if ($App::trace);
    my ($self, $event, $callback_event) = @_;
    my $destination = $event->{destination};
    if (! defined $destination) {
        $self->log("ERROR: send_async_event_now(): node not assigned\n");
    }
    elsif ($destination =~ /^([^:]+):([0-9]+)$/) {
        my ($node_host, $node_port) = ($1, $2);
        my $args = "";
        if ($event->{args}) {
            $args = $self->{rpc_serializer}->serialize($event->{args});
        }
        my $response = $self->send_message($node_host, $node_port, "ASYNC-EVENT:$event->{service_type}:$event->{name}:$event->{method}:$args", 1, undef, 1);
        # Normalise a missing reply so the matches below never see undef.
        $response = "" if (! defined $response);
        if ($response =~ /^ASYNC-EVENT-TOKEN:(.+)/) {
            my $runtime_event_token = $1;
            # Count the event globally and against the node it runs on.
            $self->{num_async_events}++;
            $self->{node}{$destination}{num_async_events}++;
            $self->{running_async_event}{$runtime_event_token} = [ $event, $callback_event ];
        }
        elsif ($response =~ /ERROR/) {
            $self->set_node_down("$node_host:$node_port");
        }
        else {
            # Neither a token nor an error: the event was not registered as
            # running, so leave a trace instead of dropping it silently.
            $self->log("WARNING: send_async_event_now(): unexpected response [$response] from $destination\n");
        }
    }
    else {
        $self->SUPER::send_async_event_now($event, $callback_event);
    }
    &App::sub_exit() if ($App::trace);
}
# $runtime_event_tokens take the following forms:
# $runtime_event_token = $pid; -- App::Context::Server::send_async_event_now() and ::finish_pid()
# $runtime_event_token = "$host-$port-$serial"; -- i.e. a plain event token on the node
# Aborts a running async event identified by its runtime event token.
#   $runtime_event_token - either a process id (all digits) or a node token
#                          of the form "$host-$port-$serial"
#   $event, $callback_event - accepted for interface compatibility; not used
# A numeric token is killed with signal 9. A node token causes an
# ABORT-ASYNC-EVENT message to be sent to the owning node. Anything else is
# logged as not abortable.
sub _abort_running_async_event {
    &App::sub_entry if ($App::trace);
    my ($self, $runtime_event_token, $event, $callback_event) = @_;
    if ($runtime_event_token =~ /^[0-9]+$/) {
        kill(9, $runtime_event_token);
    }
    else {
        my ($node_host, $node_port);

        # Host names may contain hyphens, so a token cannot be split reliably
        # on "-" alone. Prefer the known node whose "host-port-" prefix starts
        # the token (longest prefix wins).
        my $best_len = 0;
        foreach my $node (keys %{ $self->{node} || {} }) {
            next if ($node !~ /^([^:]+):([0-9]+)$/);
            my ($host, $port) = ($1, $2);
            my $prefix = "$host-$port-";
            if (length($prefix) > $best_len && index($runtime_event_token, $prefix) == 0) {
                ($node_host, $node_port, $best_len) = ($host, $port, length($prefix));
            }
        }

        # Fall back to the plain pattern for nodes that are not (or no
        # longer) registered.
        if (!defined $node_host && $runtime_event_token =~ /^([^-]+)-([0-9]+)-/) {
            ($node_host, $node_port) = ($1, $2);
        }

        if (defined $node_host) {
            $self->send_async_message($node_host, $node_port, "ABORT-ASYNC-EVENT:$runtime_event_token");
        }
        else {
            $self->log("Unable to abort running async event [$runtime_event_token] (controller)\n");
        }
    }
    &App::sub_exit() if ($App::trace);
}
# Chooses a destination for an event, provided the cluster has spare capacity.
#   $event - the event hashref handed to the chosen assignment routine
# Returns whatever the assignment routine returns, or undef when the number
# of running async events has reached {max_async_events}.
# If {main_service} implements assign_event_destination() it makes the
# decision (it receives the event, the list of nodes and the node table);
# otherwise nodes are used in round-robin order.
sub assign_event_destination {
    &App::sub_entry if ($App::trace);
    my ($self, $event) = @_;
    my $assigned;
    if ($self->{num_async_events} < $self->{max_async_events}) {
        # Note: the controller's own host is deliberately not assigned here
        # (that assignment was removed on 2006-07-01).
        my $main_service = $self->{main_service};
        my $delegate = $main_service && $main_service->can("assign_event_destination");
        $assigned = $delegate
            ? $main_service->assign_event_destination($event, $self->{nodes}, $self->{node})
            : $self->assign_event_destination_by_round_robin($event);
    }
    &App::sub_exit($assigned) if ($App::trace);
    return($assigned);
}
# Assigns the next node, in round-robin order, as the event's destination.
#   $event - hashref; {destination} is set to the chosen node
# The position of the last node used is remembered in
# $self->{node}{ALL}{last_node_idx}; it wraps to 0 after the last node (or
# when the list has shrunk below the remembered position).
# Returns 1 when a destination was assigned, undef when no nodes are listed.
sub assign_event_destination_by_round_robin {
    &App::sub_entry if ($App::trace);
    my ($self, $event) = @_;
    my $assigned;
    my $nodes = $self->{nodes};
    my $last_valid_idx = $#$nodes;
    if ($last_valid_idx > -1) {
        my $previous_idx = $self->{node}{ALL}{last_node_idx};
        my $node_idx = (!defined $previous_idx || $previous_idx + 1 > $last_valid_idx)
            ? 0
            : $previous_idx + 1;
        $self->{node}{ALL}{last_node_idx} = $node_idx;
        $event->{destination} = $nodes->[$node_idx];
        $assigned = 1;
    }
    &App::sub_exit($assigned) if ($App::trace);
    return($assigned);
}
# Handles one message received by the controller and returns the reply.
#   $msg - the raw message text
# The message is first offered to process_custom_msg(); a true return value
# from that hook is used as the reply. Otherwise:
#   NODE-UP:<info>                     - reply is the result of set_node_up()
#   NODE-DOWN:<node>                   - set_node_down() is called, reply "OK"
#   ASYNC-EVENT-RESULTS:<token>:<data> - the running event for <token> is
#                                        retired and its callback event (if
#                                        any) is sent; reply "OK"
#   anything else                      - logged, reply "unknown [<msg>]"
sub process_msg {
    &App::sub_entry if ($App::trace);
    my ($self, $msg) = @_;
    $self->log({level=>3},"process_msg: [$msg]\n");

    my $reply = $self->process_custom_msg($msg);

    if ($reply) {
        # The custom handler took care of the message.
    }
    elsif ($msg =~ /^NODE-UP:(.*)/) {
        $reply = $self->set_node_up($1);
    }
    elsif ($msg =~ /^NODE-DOWN:(.*)/) {
        $self->set_node_down($1);
        $reply = "OK";
    }
    elsif ($msg =~ /^ASYNC-EVENT-RESULTS:([^:]+):(.*)$/) {
        my ($token, $results) = ($1, $2);

        # A non-empty payload is deserialized; a one-element array is
        # unwrapped to its single value.
        if ($results ne "") {
            $results = $self->{rpc_serializer}->deserialize($results);
            if ($results && ref($results) eq "ARRAY" && @$results == 1) {
                $results = $results->[0];
            }
        }

        my $running = $self->{running_async_event}{$token};
        if (!$running) {
            $self->log("WARNING: Unexpected Async Event Results: [$msg]\n");
        }
        else {
            delete $self->{running_async_event}{$token};
            my ($event, $callback_event) = @$running;
            my $destination = $event->{destination} || "local";

            # The event is finished: release its slot globally and per node.
            $self->{num_async_events}--;
            $self->{node}{$destination}{num_async_events}--;

            if ($callback_event) {
                $callback_event->{args} ||= [];
                my %outcome = (
                    event_token => $callback_event->{event_token},
                    returnval   => $results,
                    errnum      => 0,
                    errmsg      => "",
                );
                push(@{$callback_event->{args}}, \%outcome);
                $self->send_event($callback_event);
            }
        }
        $reply = "OK";
    }
    else {
        $self->log("ERROR: unknown [$msg]\n");
        $reply = "unknown [$msg]";
    }

    &App::sub_exit($reply) if ($App::trace);
    return($reply);
}
# Hook for customized message processing; can be overridden.
# process_msg() calls this first and only applies its own handling when the
# value returned here is false. This default implementation handles nothing
# and returns the empty string.
sub process_custom_msg {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    my $msg  = shift;
    my $handled = "";    # false value: the message was not handled here
    &App::sub_exit($handled) if ($App::trace);
    return($handled);
}
# Builds the textual status report of the Cluster Controller.
# The report starts with a header line (host:port, process counts and
# async-event counts), followed by the current date/time, a blank line and
# the detail text produced by _state().
# Returns the report as a single string.
sub state {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    my $timestamp = time2str("%Y-%m-%d %H:%M:%S", time());
    my $procs  = "procs[$self->{num_procs}/$self->{max_procs}:max]";
    my $events = "async_events[$self->{num_async_events}/$self->{max_async_events}:max/$self->{max_async_events_per_node}:per]";
    my $report = "Cluster Controller: $self->{host}:$self->{port} $procs $events\n"
               . "[$timestamp]\n"
               . "\n";
    my $details = $self->_state();
    $report .= $details;
    &App::sub_exit($report) if ($App::trace);
    return($report);
}
# Builds the node section of the status report and appends the parent
# class's _state() output.
# The first line lists the nodes that are up and the index last used for
# dispatching. It is followed by one line per entry in $self->{node} (the
# "ALL" bookkeeping entry is skipped) showing up/down, running and maximum
# async events, load, memory and swap usage in percent, and the time stored
# for the node.
# Returns the text as a single string.
sub _state {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    my @up_nodes = $self->{nodes} ? @{$self->{nodes}} : ();
    my $state = "Nodes: up [@up_nodes] last dispatched [$self->{node}{ALL}{last_node_idx}]\n";
    my $row_format = " %-16s %4s : %3d/%3d max : [Load:%4.1f][Mem:%5.1f%%/%7d][Swap:%5.1f%%/%7d] : [%19s]\n";
    foreach my $node (sort keys %{$self->{node}}) {
        next if ($node eq "ALL");
        my $info = $self->{node}{$node};

        # Usage percentages; a missing or zero total counts as 0% used.
        my $mem_used_pct = $info->{memtotal}
            ? 100*($info->{memtotal} - $info->{memfree})/$info->{memtotal}
            : 0;
        my $swap_used_pct = $info->{swaptotal}
            ? 100*($info->{swaptotal} - $info->{swapfree})/$info->{swaptotal}
            : 0;

        $state .= sprintf($row_format,
            $node,
            $info->{up} ? "UP" : "down",
            $info->{num_async_events} || 0,
            $info->{max_async_events} || 0,
            $info->{load} || 0,
            $mem_used_pct,
            $info->{memtotal} || 0,
            $swap_used_pct,
            $info->{swaptotal} || 0,
            $info->{datetime});
    }
    $state .= $self->SUPER::_state();
    &App::sub_exit($state) if ($App::trace);
    return($state);
}
# Marks a node as down.
#   $node - node name in "host:port" form
# The node name with its first ":" replaced by "-" is passed to
# reset_running_async_events(), the node's {up} flag is cleared, and the
# list of available nodes is rebuilt with set_nodes().
sub set_node_down {
    &App::sub_entry if ($App::trace);
    my ($self, $node) = @_;
    (my $token_prefix = $node) =~ s/:/-/;
    $self->reset_running_async_events($token_prefix);
    $self->{node}{$node}{up} = 0;
    $self->set_nodes();
    &App::sub_exit() if ($App::trace);
}
# Records that a node has reported in.
#   $node - either "host:port" or "host:port:key=value,key=value,..."
# Any key=value pairs are stored in the node's entry, and the entry's
# {datetime} is set to the current time.
# Returns "ok" if the node was already marked up. Otherwise the node is
# marked up, the list of available nodes is rebuilt with set_nodes(), and
# "new" is returned.
sub set_node_up {
    &App::sub_entry if ($App::trace);
    my ($self, $node) = @_;

    # Split off optional attributes that follow the "host:port" part.
    if (my ($name, $attributes) = $node =~ /^([^:]+:\d+):(.*)/) {
        $node = $name;
        if ($attributes) {
            foreach my $pair (split(/,/, $attributes)) {
                next unless ($pair =~ /^([^=]+)=(.*)/);
                $self->{node}{$node}{$1} = $2;
            }
        }
    }

    $self->{node}{$node}{datetime} = time2str("%Y-%m-%d %H:%M:%S", time());

    my $retval = "ok";
    if (!$self->{node}{$node}{up}) {
        $self->{node}{$node}{up} = 1;
        $self->set_nodes();
        $retval = "new";
    }

    &App::sub_exit($retval) if ($App::trace);
    return($retval);
}
# Rebuilds the list of available nodes and the total async-event capacity.
# $self->{nodes} becomes the sorted list of node names whose {up} flag is
# true, and $self->{max_async_events} becomes the per-node limit multiplied
# by the number of such nodes. If {main_service} implements
# capacity_change(), it is told the new capacity, the node list and the
# node table.
sub set_nodes {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    my @up_nodes = grep { $self->{node}{$_}{up} } sort keys %{$self->{node}};
    $self->{nodes} = \@up_nodes;
    $self->{max_async_events} = $self->{max_async_events_per_node} * scalar(@up_nodes);
    my $main_service = $self->{main_service};
    if ($main_service && $main_service->can("capacity_change")) {
        $main_service->capacity_change($self->{max_async_events}, \@up_nodes, $self->{node});
    }
    &App::sub_exit() if ($App::trace);
}
# Shuts the Cluster Controller down.
# In order: the nodes currently listed are told to quit (shutdown_nodes),
# the node list is written to the node file (write_node_file), and finally
# the parent class's shutdown() is run.
sub shutdown {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;

    # Controller-specific steps first ...
    $self->shutdown_nodes();
    $self->write_node_file();

    # ... then the shutdown inherited from the parent class.
    $self->SUPER::shutdown();

    &App::sub_exit() if ($App::trace);
}
# Sends a "QUIT" message to every node in $self->{nodes}.
# Each node name is passed to send_message() as the first argument with the
# port argument left undefined.
sub shutdown_nodes {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    foreach my $node_address (@{$self->{nodes}}) {
        $self->send_message($node_address, undef, "QUIT", 0, undef, 1);
    }
    &App::sub_exit() if ($App::trace);
}
# Contacts every known node and starts the ones that do not answer.
#   $options - hashref; {startup} is either "1" (take the node names from the
#              node file via read_node_file) or a comma-separated list of
#              "host:port" names, each registered with an empty entry
# Each node is sent "CONTROLLER-UP:". When the reply contains "ERROR:" and
# the node name has the form host:port, the start command is run in the
# background. The command is taken from the "app.context.node_start_cmd"
# option (default "ssh -f {host} mvnode --port={port}") with {host} and
# {port} replaced.
# NOTE(review): the command is executed through the shell with the host and
# port interpolated, so node names must come from a trusted source.
sub startup_nodes {
    &App::sub_entry if ($App::trace);
    my ($self, $options) = @_;
    my $startup = $options->{startup};
    if ($startup eq "1") {
        $self->read_node_file();
    }
    else {
        foreach my $node (split(/,/,$startup)) {
            $self->{node}{$node} = {};
        }
    }
    my $cmd_fmt = $self->{options}{"app.context.node_start_cmd"} || "ssh -f {host} mvnode --port={port}";
    foreach my $node (keys %{$self->{node}}) {
        # "ALL" holds the round-robin bookkeeping; it is not a real node.
        next if ($node eq "ALL");
        my $msg = $self->send_message($node, undef, "CONTROLLER-UP:", 0, undef, 1);
        # Only a reply that reports an error means the node needs starting.
        next if (!defined $msg || $msg !~ /ERROR:/);
        next if ($node !~ /^([^:]+):([0-9]+)$/);
        my ($host, $port) = ($1, $2);
        my $cmd = $cmd_fmt;
        # The braces are escaped so they are always taken literally.
        $cmd =~ s/\{host\}/$host/g;
        $cmd =~ s/\{port\}/$port/g;
        $self->log("Starting Node [$node]: [$cmd]\n");
        system("$cmd < /dev/null &");
    }
    &App::sub_exit() if ($App::trace);
}
# Writes the names of the nodes in $self->{nodes} to the node file, one per
# line.
# The file is "<prefix>/log/<app>-<host>:<port>.nodes", built from
# $self->{options}{prefix}, $self->{options}{app}, $self->{host} and
# $self->{port}. A failure to open or to finish writing the file is logged
# as a warning; it is not fatal.
sub write_node_file {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    my $prefix = $self->{options}{prefix};
    my $node_file = "$prefix/log/$self->{options}{app}-$self->{host}:$self->{port}.nodes";
    # Three-argument open with a lexical handle: the file name is never
    # interpreted as a mode and the handle is independent of the package.
    if (open(my $fh, ">", $node_file)) {
        foreach my $node (@{$self->{nodes}}) {
            print $fh "$node\n";
        }
        # Buffered write errors only show up when the handle is closed.
        if (!close($fh)) {
            $self->log("WARNING: Can't write node file [$node_file]: $!\n");
        }
    }
    else {
        $self->log("WARNING: Can't write node file [$node_file]: $!\n");
    }
    &App::sub_exit() if ($App::trace);
}
# Reads node names from the node file and registers them in $self->{node}.
# The file is "<prefix>/log/<app>-<host>:<port>.nodes", built from
# $self->{options}{prefix}, $self->{options}{app}, $self->{host} and
# $self->{port}. Only lines of the form host:port are used. A node that is
# not yet known gets an empty entry; existing entries are left untouched.
# A missing or unreadable file is silently ignored.
sub read_node_file {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    my $prefix = $self->{options}{prefix};
    my $node_file = "$prefix/log/$self->{options}{app}-$self->{host}:$self->{port}.nodes";
    # Three-argument open with a lexical handle, and a lexical line variable
    # so the caller's $_ is not overwritten.
    if (open(my $fh, "<", $node_file)) {
        while (my $line = <$fh>) {
            chomp($line);
            if ($line =~ /^[^:]+:[0-9]+$/) {
                # just take note of its existence. we don't know yet if it is up.
                $self->{node}{$line} = {} if (!defined $self->{node}{$line});
            }
        }
        close($fh);
    }
    else {
        # This is not really a problem.
        # $self->log("WARNING: Can't read node file [$node_file]: $!\n");
    }
    &App::sub_exit() if ($App::trace);
}
1;