| App-Context documentation | Contained in the App-Context distribution. |
* Signature: $self->wait_for_event($event_token)
* Param: $event_token string
* Return: void
* Throws: App::Exception
* Since: 0.01
Sample Usage:
$self->wait_for_event($event_token);
The wait_for_event() method is called when an asynchronous event has been sent and no more processing can be completed before it is done.
The user() method returns the username of the authenticated user. The special name, "guest", refers to the unauthenticated (anonymous) user.
* Signature: $username = $context->user();
* Param: void
* Return: string
* Throws: <none>
* Since: 0.01
Sample Usage:
$username = $context->user();
| App-Context documentation | Contained in the App-Context distribution. |
############################################################################# ## $Id: Server.pm 6786 2006-08-11 23:22:48Z zroberts $ ############################################################################# package App::Context::POE::Server; $VERSION = (q$Revision: 6786 $ =~ /(\d[\d\.]*)/)[0]; # VERSION numbers generated by svn use strict; use vars qw(@ISA); use warnings; use App::Context::POE; @ISA = ( "App::Context::POE" ); use POSIX ":sys_wait_h"; use Sys::Hostname; use Date::Format; use Date::Parse; use Time::HiRes qw(gettimeofday tv_interval); #sub POE::Kernel::TRACE_STATISTICS () { 1 } #sub POE::Kernel::TRACE_PROFILE () { 1 } use POE; use POE::Component::Server::SimpleHTTP; use POE::Component::IKC::Server; use HTTP::Status qw/RC_OK/; use Socket qw(INADDR_ANY); use Storable qw(lock_store lock_retrieve); sub _init { &App::sub_entry if ($App::trace); my ($self, $options) = @_; $options = {} if (!defined $options); $self->SUPER::_init($options); App->mkdir($options->{prefix}, "data", "app", "Context"); $| = 1; # autoflush STDOUT (not sure this is required) ### Configuration stuff my $host = hostname; $self->{hostname} = $host; $host =~ s/\..*//; # get rid of fully qualified domain name $self->{host} = $host; $options->{port} ||= 8080; $self->{port} = $options->{port}; $options->{http_port} ||= $options->{port}+1; $self->{poe_kernel_name} = "poe_$self->{host}_$self->{port}"; $self->{poe_kernel_http_name} = $self->{poe_kernel_name} . "_httpd"; $self->{poe_session_name} = "poe_session"; $self->{poe_kernel} = $poe_kernel; $self->{num_procs} = 0; $self->{max_procs} = $self->{options}{"app.context.max_procs"} || 10; $self->{max_async_events} = $self->{options}{"app.context.max_async_events"} if (defined $self->{options}{"app.context.max_async_events"}); $self->{max_async_events} ||= 10; $self->{num_async_events} = 0; $self->{async_event_count} = 0; $self->{pending_async_events} = []; $self->{running_async_event} = {}; $self->{poe_profile} = $options->{poe_profile}; $self->{poe_profile} = 60 if ($self->{poe_profile} && $self->{poe_profile} == 1); $self->{poe_trace} = $options->{poe_trace}; $self->{verbose} = $options->{verbose}; $self->{poe_states} = [qw( _start _stop _default poe_sigchld poe_sigterm poe_sigignore poe_shutdown poe_alarm poe_profile ikc_register ikc_unregister ikc_shutdown poe_run_event poe_event_loop_extension poe_dispatch_pending_async_events poe_server_state poe_http_server_state poe_debug poe_http_debug poe_http_test_run poe_enqueue_async_event poe_enqueue_async_event_finished poe_remote_async_event_finished )]; $self->{poe_ikc_published_states} = [qw( poe_server_state poe_enqueue_async_event poe_remote_async_event_finished )]; ### Does nothing by default, used by ClusterController, maybe other subclasses? $self->_init2a($options); ### Do log rotation ### TODO: this should be refactored out if ($self->{options}{log_rotate}) { my $rotate_sec = $self->{options}{log_rotate}; $rotate_sec = $rotate_sec*(24*3600) if ($rotate_sec <= 31); # interpret as days my $time = time(); my $base_time = str2time(time2str("%Y-%m-%d 00:00:00", $time)); # I need a base which is midnight local time my $next_rotate_time = ((int(($time - $base_time)/$rotate_sec)+1)*$rotate_sec) + $base_time; $self->schedule_event( tag => "context-log-rotation", method => "log_file_open", args => [0], # don't overwrite time => $next_rotate_time, interval => $rotate_sec, # and every X seconds hereafter ); } ### Does nothing by default $self->_init2b($options); &App::sub_exit() if ($App::trace); } ### Used by subclasses sub _init2a { &App::sub_entry if ($App::trace); my ($self, $options) = @_; $self->_init_poe($options); &App::sub_exit() if ($App::trace); } sub _init_poe { &App::sub_entry if ($App::trace); my ($self, $options) = @_; ### Set up a server POE::Component::IKC::Server->spawn( port => $self->{port}, name => $self->{poe_kernel_name}, ); $self->log({level=>3},"Listening for Inter-Kernel Communications on $self->{host}:$self->{port}\n") if $self->{options}{poe_ikc_debug}; POE::Component::IKC::Responder->spawn(); my $session_name = $self->{poe_session_name}; POE::Component::Server::SimpleHTTP->new( 'ALIAS' => $self->{poe_kernel_http_name}, 'ADDRESS' => INADDR_ANY, 'PORT' => $self->{options}{http_port}, 'HANDLERS' => [ { 'DIR' => '/debug', 'SESSION' => $session_name, 'EVENT' => 'poe_http_debug', }, { 'DIR' => '/testrun', 'SESSION' => $session_name, 'EVENT' => 'poe_http_test_run', }, { 'DIR' => '.*', 'SESSION' => $session_name, 'EVENT' => 'poe_http_server_state', }, ], ); $self->log({level=>3},"Listening for HTTP Requests on $self->{host}:$self->{options}{http_port}\n") if $self->{poe_trace}; &App::sub_exit() if ($App::trace); } ### Used by subclasses sub _init2b { &App::sub_entry if ($App::trace); my ($self, $options) = @_; &App::sub_exit() if ($App::trace); } sub shutdown { &App::sub_entry if ($App::trace); my $self = shift; ### Shut down servers ### TODO ### Shut down children $self->shutdown_child_processes(); ### Call SUPER shutdown $self->SUPER::shutdown(); &App::sub_exit() if ($App::trace); } sub shutdown_child_processes { &App::sub_entry if ($App::trace); my $self = shift; if ($self->{proc}) { foreach my $pid (keys %{$self->{proc}}) { kill(15, $pid); } } &App::sub_exit() if ($App::trace); } sub dispatch_events { &App::sub_entry if ($App::trace); my ($self, $max_events_occurred) = @_; my $verbose = $self->{verbose}; my $options = $self->{options}; my ($role, $port, $startup, $shutdown); $self->dispatch_events_begin(); ### Set up init_objects, untouched and snagged from App::Context::POE::Server my $objects = $options->{init_objects}; my ($service_type, $name, $service); foreach my $object (split(/ *[;,]+ */, $objects)) { if ($object) { if ($object =~ /^([A-Z][A-Za-z0-9]+)\.([A-Za-z0-9_-]+)$/) { $service_type = $1; $name = $2; } else { $service_type = "SessionObject"; $name = $object; } $service = $self->service($service_type, $name); # instantiate it. that's all. $self->log({level=>3},"dispatch_events: $service_type $name instantiated [$service]\n"); $self->{main_service} = $service if (!$self->{main_service}); } } eval { ### POE Server begins here POE::Session->create( object_states => [ $self => $self->{poe_states} ] ); $poe_kernel->run(); }; if ($@) { $self->log({level=>2},$@); } $self->dispatch_events_end(); $self->shutdown(); &App::sub_exit() if ($App::trace); } sub dispatch_events_begin { &App::sub_entry if ($App::trace); my ($self) = @_; my $verbose = $self->{verbose}; $self->log({level=>3},"Starting Dispatching Events on Server on $self->{host}:$self->{port}\n"); &App::sub_exit() if ($App::trace); } sub dispatch_events_end { my ($self) = @_; my $verbose = $self->{verbose}; $self->log({level=>3},"Stopping Dispatching Events on Server.\n"); } sub state { &App::sub_entry if ($App::trace); my ($self) = @_; my $datetime = time2str("%Y-%m-%d %H:%M:%S", time()); my $state = "Server: $self->{host}:$self->{port} procs[$self->{num_procs}/$self->{max_procs}:max] async_events[$self->{num_async_events}/$self->{max_async_events}:max]\n[$datetime]\n"; $state .= "\n"; $state .= $self->_state(); &App::sub_exit($state) if ($App::trace); return($state); } sub _state { &App::sub_entry if ($App::trace); my ($self) = @_; my $state = ""; my $options = $self->{options}; my $objects = $options->{init_objects}; my ($service_type, $name, $service); foreach my $object (split(/ *[;,]+ */, $objects)) { if ($object) { if ($object =~ /^([A-Z][A-Za-z0-9]+)\.([A-Za-z0-9_-]+)$/) { $service_type = $1; $name = $2; } else { $service_type = "SessionObject"; $name = $object; } $service = $self->service($service_type, $name); # instantiate it. that's all. if ($service->can("state")) { $state .= "\n"; $state .= $service->state(); } } } my $main_service = $self->{main_service}; $state .= "\n"; $state .= "Running Async Events:\n"; my ($async_event, $event, $callback_event, @args, $args_str, $event_token, $runtime_event_token, $str); foreach $runtime_event_token (sort keys %{$self->{running_async_event}}) { $async_event = $self->{running_async_event}{$runtime_event_token}; ($event, $callback_event) = @$async_event; $str = ""; if ($main_service && $main_service->can("format_async_event")) { $str = $main_service->format_async_event($event, $callback_event, $runtime_event_token); } if ($str) { $state .= " "; $state .= $main_service->format_async_event($event, $callback_event, $runtime_event_token); $state .= "\n"; } else { @args = (); @args = @{$event->{args}} if ($event->{args}); $args_str = join(",",@args); $state .= sprintf(" %-20s %-20s %-24s", $event->{event_token}, $runtime_event_token, "$event->{name}.$event->{method}($args_str)"); if ($callback_event) { @args = (); @args = @{$callback_event->{args}} if ($callback_event->{args}); $args_str = join(",",@args); $state .= "$callback_event->{name}.$callback_event->{method}($args_str)"; } $state .= "\n"; } } $state .= "\n"; $state .= "Pending Async Events: count [$self->{async_event_count}]\n"; foreach $async_event (@{$self->{pending_async_events}}) { ($event, $callback_event) = @$async_event; $str = ""; if ($main_service && $main_service->can("format_async_event")) { $str = $main_service->format_async_event($event, $callback_event); } if ($str) { $state .= " "; $state .= $main_service->format_async_event($event, $callback_event); $state .= "\n"; } else { @args = (); @args = @{$event->{args}} if ($event->{args}); $args_str = join(",",@args); $state .= sprintf(" %-20s %-40s", $event->{event_token}, "$event->{name}.$event->{method}($args_str)"); if ($callback_event) { @args = (); @args = @{$callback_event->{args}} if ($callback_event->{args}); $args_str = join(",",@args); $state .= " => $callback_event->{name}.$callback_event->{method}($args_str)"; } $state .= "\n"; } } $state .= "\n"; ### Only enable this in development, requires a library uncomment as well $state .= $self->_state_poe(); ### THIS DOESN'T WORK YET #$state .= $self->_state_q(); $state .= $self->SUPER::_state(); &App::sub_exit($state) if ($App::trace); return($state); } sub _state_poe { my ($self) = @_; my $state = ""; ### POE state dumping - Currently commented out because it doesn't gain us much ### in the way of visibility, and POE::API::Peek is a CPAN pain ### UNCOMMENT THIS IF YOU NEED IT, DEPENDS ON A PAINFUL LIBRARY if ($self->{poe_peek}) { App->use("POE::API::Peek"); my $api = POE::API::Peek->new(); my @queue = $api->event_queue_dump(); $state .= "POE event_queue_dump\n"; my $first = 1; my $poe_stuff = [qw(ID index priority event type source destination)]; for my $item (@queue) { if ($first) { $state .= sprintf("%7s %6s %20s %30s %15s %30s %30s\n", @$poe_stuff); $first = 0; } $state .= sprintf("%7d %6d %20f %30s %15s %30s %30s\n", @{$item}{@$poe_stuff}); } $state .= "\n"; } return $state; } ### THIS DOESN'T WORK YET, THROWS AN EXCEPTION sub _state_q { my $self = @_; my $HOTEL_SITE_QNAME = "q-hotel_site"; my $HOTEL_COMPUTE_QNAME = "q-hotel_compute"; my $state = ""; for my $qname ("q-hotel_site", "q-hotel_compute") { $state .= "$qname\n"; ### EXCEPTION IS THROWN HERE my $q = $self->work_queue($qname); my $entries = $q->{data}; foreach my $entry (@$entries) { $state .= sprintf(" {", join("|",%$entry), "}\n"); } $state .= "\n"; } $state .= "\n"; return $state; } sub debug { &App::sub_entry if ($App::trace); my ($self) = @_; my $datetime = time2str("%Y-%m-%d %H:%M:%S", time()); my $debug = "DEBUG --- Server: $self->{host}:$self->{port} procs[$self->{num_procs}/$self->{max_procs}:max] async_events[$self->{num_async_events}/$self->{max_async_events}:max]\n[$datetime]\n"; $debug .= "\n"; my $service = $self->{main_service}; $debug .= $service->debug(); &App::sub_exit($debug) if ($App::trace); return($debug); } # TODO: Implement this as a fork() or a context-level message to a node to fork(). # i.e. messages such as "EVENT:" and "EVENT-OK:" # Save the callback_event according to an event_token. # Then implement cleanup_pid to send the callback_event. sub send_async_event { &App::sub_entry if ($App::trace); my ($self, $event, $callback_event, $unique_event_name, $unique_event_method, $count) = @_; my $event_token = $self->new_event_token(); $event->{event_token} = $event_token; $callback_event->{event_token} = $event_token if ($callback_event); if ( ( !$unique_event_name && !$unique_event_method) || ( $unique_event_name && $unique_event_method && ($self->count_in_pending_async_events($unique_event_name, $unique_event_method) <= $count)) ) { push(@{$self->{pending_async_events}}, [ $event, $callback_event ]); $self->poe_yield($self->{poe_kernel}, "poe_dispatch_pending_async_events"); } &App::sub_exit($event_token) if ($App::trace); return($event_token); } sub count_in_pending_async_events { &App::sub_entry if ($App::trace); my ($self, $event_name, $event_method) = @_; my $count = 0; for my $pending_event (@{$self->{pending_async_events}}) { if ( ($pending_event->[0]->{name} eq $event_name) && ($pending_event->[0]->{method} eq $event_method) ) { $count++; } } &App::sub_exit($count) if ($App::trace); return($count); } sub new_event_token { &App::sub_entry if ($App::trace); my ($self) = @_; $self->{async_event_count} ++; my $event_token = "$self->{host}-$self->{port}-$self->{async_event_count}"; &App::sub_exit($event_token) if ($App::trace); return($event_token); } sub dispatch_pending_async_events { &App::sub_entry if ($App::trace); my ($self, $max_events) = @_; my $t0 = [gettimeofday]; $self->log({level=>3},"dispatch_pending_async_events enter: max_events=[$max_events]\n") if $self->{poe_trace}; $max_events ||= 9999; my $pending_async_events = $self->{pending_async_events}; my ($async_event, $assigned, $event, $in_process); my $events_occurred = 0; my $i = 0; my $event_capacity_exists = 1; my $max_i = $#$pending_async_events; while ($i <= $max_i && $events_occurred < $max_events) { $async_event = $pending_async_events->[$i]; $event = $async_event->[0]; if ($event->{destination}) { $self->send_async_event_now(@$async_event); $events_occurred ++; splice(@$pending_async_events, $i, 1); # remove $pending_async_events->[$i] $max_i--; } elsif ($event_capacity_exists) { $assigned = $self->assign_event_destination($event); if ($assigned) { $self->send_async_event_now(@$async_event); $events_occurred ++; # keep $i the same splice(@$pending_async_events, $i, 1); # remove $pending_async_events->[$i] $max_i--; } else { # [undef] no servers are eligible for assignment $event_capacity_exists = 0; # there's no sense looking at the other pending async events $i++; # look at the next one } } else { # [0] this async_event is not eligible to run $i++; # look at the next one } } $self->log({level=>3},"dispatch_pending_async_events exit: events_occurred=[$events_occurred] time=[" . sprintf("%.4f", tv_interval($t0, [gettimeofday])) . "]\n") if $self->{poe_trace}; &App::sub_exit($events_occurred) if ($App::trace); return($events_occurred); } sub assign_event_destination { &App::sub_entry if ($App::trace); my ($self, $event) = @_; my $assigned = undef; if ($self->{num_procs} < $self->{max_procs} && (!defined $self->{max_async_events} || $self->{num_async_events} < $self->{max_async_events})) { $event->{destination} = $self->{host}; $assigned = 1; } &App::sub_exit($assigned) if ($App::trace); return($assigned); } ### This is used by the node, but overridden by the controller sub send_async_event_now { &App::sub_entry if ($App::trace); my ($self, $event, $callback_event) = @_; $self->profile_start("send_async_event_now") if $self->{poe_profile}; if ($event->{destination} eq "in_process") { my $event_token = $self->send_async_event_in_process($event, $callback_event); } else { ### TODO: potentially use POE child processes instead my $pid = $self->fork(); if (!$pid) { # running in child my $exitval = 0; my (@results); eval { @results = $self->send_event($event); }; if ($@) { @results = ($@); } if ($#results > -1 && defined $results[0] && $results[0] ne "") { my $ipc_file = $self->{options}{prefix} . "/data/app/Context/$$"; ### Use Storable as IPC if ($self->{options}{poe_storable_ipc}) { my $results = (@results == 1) ? $results[0] : \@results; my $success = lock_store($results, $ipc_file); } ### Use a string value as IPC else { if (open(FILE, "> $ipc_file")) { print App::Context::POE::Server::FILE @results; close(App::Context::POE::Server::FILE); } else { $exitval = 1; } } } $self->shutdown(); $self->exit($exitval); } my $destination = $event->{destination} || "local"; $self->{num_async_events}++; $self->{node}{$destination}{num_async_events}++; my $runtime_event_token = $pid; $self->{running_async_event}{$runtime_event_token} = [ $event, $callback_event ]; } $self->profile_stop("send_async_event_now") if $self->{poe_profile}; &App::sub_exit() if ($App::trace); }
sub wait_for_event { &App::sub_entry if ($App::trace); my ($self, $event_token) = @_; &App::sub_exit() if ($App::trace); } sub fork { &App::sub_entry if ($App::trace); my ($self) = @_; my $pid = $self->SUPER::fork(); if ($pid) { # the parent process has a new child process $self->{num_procs}++; $self->{proc}{$pid} = {}; } else { # the new child process has no sub-processes $self->{num_procs} = 0; $self->{proc} = {}; $SIG{INT} = sub { $self->log({level=>3},"fork: Caught Signal: @_ (quitting)\n"); $self->exit(102); }; # SIG 2 $SIG{QUIT} = sub { $self->log({level=>3},"fork: Caught Signal: @_ (quitting)\n"); $self->exit(103); }; # SIG 3 $SIG{TERM} = sub { $self->log({level=>3},"fork: Caught Signal: @_ (quitting)\n"); $self->shutdown(); $self->exit(115); }; # SIG 15 } &App::sub_exit($pid) if ($App::trace); return($pid); } sub finish_pid { &App::sub_entry if ($App::trace); my ($self, $pid, $exitval, $sig) = @_; $self->{num_procs}--; delete $self->{proc}{$pid}; my $runtime_event_token = $pid; my $async_event = $self->{running_async_event}{$runtime_event_token}; if ($async_event) { my ($event, $callback_event) = @$async_event; my $returnval = ""; my $ipc_file = $self->{options}{prefix} . "/data/app/Context/$pid"; ### Use Storable as IPC if ($self->{options}{poe_storable_ipc}) { $returnval = lock_retrieve($ipc_file); unlink($ipc_file); } ### Use a string value as IPC else { if (open(FILE, $ipc_file)) { if ($callback_event) { $returnval = join("",<App::Context::POE::Server::FILE>); } close(App::Context::POE::Server::FILE); unlink($ipc_file); } } my $destination = $event->{destination} || "local"; $self->{num_async_events}--; $self->{node}{$destination}{num_async_events}--; delete $self->{running_async_event}{$runtime_event_token}; if ($callback_event) { $callback_event->{args} = [] if (! $callback_event->{args}); my $errmsg = ($exitval || $sig) ? "Exit $exitval on $pid [sig=$sig]" : ""; push(@{$callback_event->{args}[2]{args}}, { event_token => $callback_event->{event_token}, returnval => $returnval, errnum => $exitval, errmsg => $errmsg }); $self->send_event($callback_event); } elsif ($sig == 9) { # killed without a chance to finish its work $self->finish_killed_async_event($event); } } &App::sub_exit() if ($App::trace); } sub finish_killed_async_event { &App::sub_entry if ($App::trace); my ($self, $event) = @_; &App::sub_exit() if ($App::trace); } sub find_runtime_event_token { &App::sub_entry if ($App::trace); my ($self, $event_token) = @_; my $running_async_event = $self->{running_async_event}; my ($runtime_event_token_found, $async_event); foreach my $runtime_event_token (keys %$running_async_event) { $async_event = $running_async_event->{$runtime_event_token}; ### async_event = [ event, callback_event ] if ($async_event->[1]{event_token} eq $event_token) { $runtime_event_token_found = $runtime_event_token; last; } } &App::sub_exit($runtime_event_token_found) if ($App::trace); return($runtime_event_token_found); } sub reset_running_async_events { &App::sub_entry if ($App::trace); my ($self, $runtime_event_token_prefix) = @_; $runtime_event_token_prefix =~ s/:/-/; # in case they send "localhost:8080" instead of "localhost-8080" my $running_async_event = $self->{running_async_event}; my ($runtime_event_token, $async_event); foreach $runtime_event_token (keys %$running_async_event) { $async_event = $running_async_event->{$runtime_event_token}; if ($async_event && $runtime_event_token =~ /^$runtime_event_token_prefix\b/) { $self->reset_running_async_event($runtime_event_token); } } &App::sub_exit() if ($App::trace); } sub reset_running_async_event { &App::sub_entry if ($App::trace); my ($self, $runtime_event_token) = @_; my $async_event = $self->abort_running_async_event($runtime_event_token); if ($async_event) { my $pending_async_events = $self->{pending_async_events}; unshift(@$pending_async_events, $async_event); } &App::sub_exit($async_event) if ($App::trace); return($async_event); } sub abort_async_event { &App::sub_entry if ($App::trace); my ($self, $event_token) = @_; my $pending_async_events = $self->{pending_async_events}; my ($async_event); my $aborted = 0; # first look for it in the pending list for (my $i = 0; $i <= $#$pending_async_events; $i++) { $async_event = $pending_async_events->[$i]; if ($async_event->[0]{event_token} eq $event_token) { splice(@$pending_async_events, $i, 1); $aborted = 1; last; } } # then look for it in the running list if (!$aborted) { my $runtime_event_token = $self->find_runtime_event_token($event_token); if ($runtime_event_token) { $async_event = $self->abort_running_async_event($runtime_event_token); } } &App::sub_exit($async_event) if ($App::trace); return($async_event); } sub abort_running_async_event { &App::sub_entry if ($App::trace); my ($self, $runtime_event_token) = @_; my $running_async_event = $self->{running_async_event}; my $pending_async_events = $self->{pending_async_events}; my $async_event = $running_async_event->{$runtime_event_token}; if ($async_event) { $self->{num_async_events}--; delete $self->{running_async_event}{$runtime_event_token}; $self->_abort_running_async_event($runtime_event_token, @$async_event); } &App::sub_exit($async_event) if ($App::trace); return($async_event); } # $runtime_event_tokens take the following forms: # $runtime_event_token = $pid; -- App::Context::POE::Server::send_async_event_now() and ::finish_pid() sub _abort_running_async_event { &App::sub_entry if ($App::trace); my ($self, $runtime_event_token, $event, $callback_event) = @_; if ($runtime_event_token =~ /^[0-9]+$/) { kill(15, $runtime_event_token); } else { $self->log({level=>3},"Unable to abort running async event [$runtime_event_token]\n"); } &App::sub_exit() if ($App::trace); } ############################################################################# # user() #############################################################################
sub user { &App::sub_entry if ($App::trace); my $self = shift; my $user = $self->{user} || getlogin || (getpwuid($<))[0] || "guest"; &App::sub_exit($user) if ($App::trace); $user; } ############################################################################# ### POE state routines ############################################################################# sub _default { &App::sub_entry if ($App::trace); my ( $self, $kernel, $heap, $state, $args ) = @_[ OBJECT, KERNEL, HEAP, ARG0, ARG1 ]; my (@args); @args = @$args if (ref($args) eq "ARRAY"); @args = ($args) if (!ref($args)); $self->log({level=>3},"POE: _default: Entered an unhandled state ($state) with args (@args)\n") if $self->{poe_trace}; &App::sub_exit() if ($App::trace); } sub _start { &App::sub_entry if ($App::trace); my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ]; my $name = $self->{poe_session_name}; $kernel->alias_set($name); $kernel->sig(CHLD => "poe_sigchld"); $kernel->sig(HUP => "poe_sigignore"); $kernel->sig(INT => "poe_sigterm"); $kernel->sig(QUIT => "poe_sigterm"); $kernel->sig(USR1 => "poe_sigignore"); $kernel->sig(USR2 => "poe_sigignore"); $kernel->sig(TERM => "poe_sigterm"); $kernel->call( IKC => publish => $name, $self->{poe_ikc_published_states} ); $kernel->post("IKC", "monitor", "*", {register => "ikc_register", unregister => "ikc_unregister", shutdown => "ikc_shutdown"}); # don't start kicking off async events until we give the nodes a chance to register themselves $kernel->delay_set("poe_event_loop_extension", 5) if (!$self->{disable_event_loop_extensions}); $kernel->delay_set("poe_alarm", 5); $kernel->delay_set("poe_profile", $self->{poe_profile}) if ($self->{poe_profile}); &App::sub_exit() if ($App::trace); } sub _stop { &App::sub_entry if ($App::trace); my ( $self, $kernel, $heap, $state, $args ) = @_[ OBJECT, KERNEL, HEAP, ARG0, ARG1 ]; #sleep(1); # take a second to let child processes to die (perhaps not necessary, perhaps necessary when using POE::Wheel::Run) &App::sub_exit() if ($App::trace); } sub ikc_register { &App::sub_entry if ($App::trace); my ($self, $kernel, $session_name) = @_[OBJECT, KERNEL, ARG1]; $self->log({level=>3},"ikc_register: ($session_name)\n") if $self->{options}{poe_ikc_debug}; my ($retval); &App::sub_exit($retval) if ($App::trace); return($retval); } sub ikc_unregister { &App::sub_entry if ($App::trace); my ($self, $kernel, $session_name) = @_[OBJECT, KERNEL, ARG1]; $self->log({level=>3},"ikc_unregister: ($session_name)\n") if $self->{options}{poe_ikc_debug}; &App::sub_exit() if ($App::trace); } sub ikc_shutdown { &App::sub_entry if ($App::trace); my ($self, $kernel, $arg0, $arg1, $arg2, $arg3) = @_[OBJECT, KERNEL, ARG0, ARG1, ARG2, ARG3]; $self->log({level=>3},"ikc_shutdown: args=($arg0, $arg1, $arg2, $arg3)\n") if $self->{options}{poe_ikc_debug}; &App::sub_exit() if ($App::trace); return; } sub poe_yield { &App::sub_entry if ($App::trace); my ($self, $kernel, $state, $max_count) = @_; my $count = $self->{poe_count}{$state} || 0; $max_count ||= 1; if ($count < $max_count) { $count++; $self->{poe_count}{$state} = $count; $kernel->yield($state); } &App::sub_exit() if ($App::trace); return; } sub poe_yield_acknowledged { &App::sub_entry if ($App::trace); my ($self, $state) = @_; if ($self->{poe_count}{$state}) { $self->{poe_count}{$state}--; } else { $self->{poe_count}{$state} = 0; } &App::sub_exit() if ($App::trace); return; } sub poe_sigterm { &App::sub_entry if ($App::trace); my ( $self, $kernel, $heap, $signame ) = @_[ OBJECT, KERNEL, HEAP, ARG0 ]; # How do I shut down the POE kernel now and exit? # I think I need to shut down the last session and the kernel will exit. # As per http://poe.perl.org/?POE_FAQ/How_do_I_force_a_session_to_shut_down # or http://www.mail-archive.com/poe@perl.org/msg03488.html #$kernel->sig_handled(); #$kernel->yield("poe_shutdown"); # However the signals which bring me here seem to do the shutdown for me, so it's unnecessary &App::sub_exit() if ($App::trace); } sub poe_sigignore { &App::sub_entry if ($App::trace); my ( $self, $kernel, $heap, $signame ) = @_[ OBJECT, KERNEL, HEAP, ARG0 ]; &App::sub_exit() if ($App::trace); } sub poe_sigchld { &App::sub_entry if ($App::trace); my ( $self, $kernel, $heap, $pid, $status ) = @_[ OBJECT, KERNEL, HEAP, ARG1, ARG2 ]; my $exitval = $status >> 8; my $sig = $status & 255; $self->finish_pid($pid, $exitval, $sig); &App::sub_exit() if ($App::trace); } sub poe_alarm { &App::sub_entry if ($App::trace); my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ]; $self->profile_start("poe_alarm") if $self->{poe_profile}; $self->log({level=>3},"poe_alarm enter\n") if $self->{poe_trace}; my $main_service = $self->{main_service}; ### This is mostly for the node, which needs this to spawn queued execute subrequest events ### without it, subrequests get acquired by the node never spawns children to shop it $self->poe_yield($kernel, "poe_dispatch_pending_async_events"); $self->poe_yield($kernel, "poe_event_loop_extension"); my $time = time(); my (@events); my $events_occurred = 0; my $time_of_next_event = 0; while ($time_of_next_event <= $time) { $time_of_next_event = $self->get_current_events(\@events, $time); if ($#events > -1) { foreach my $event (@events) { $kernel->yield("poe_run_event", $event); # put on the POE run queue $events_occurred++; } $time = time(); } } # every time we process an alarm, we need to set the next one my $sec_until_next_event = $time_of_next_event - $time; $self->{alarm_id} = $kernel->delay_set("poe_alarm", $sec_until_next_event); ### call some POE profile dump functions, only happens when ENV variables ### POE_TRACE_PROFILE=1 POE_TRACE_STATISTICS=1 if ($self->{poe_trace}) { my %data = $kernel->stat_getdata(); for my $key (sort keys %data) { $self->log({level=>3},"poe_alarm: poe_statistics [" . sprintf("%20s : %s", $key, $data{$key}) . "]\n"); } $kernel->stat_show_profile(); } $self->log({level=>3},"poe_alarm exit: events_occurred[$events_occurred] sec_until_next_event[$sec_until_next_event]\n") if $self->{poe_trace}; $self->profile_stop("poe_alarm") if $self->{poe_profile}; &App::sub_exit() if ($App::trace); } sub poe_profile { &App::sub_entry if ($App::trace); my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ]; my $poe_profile = $self->{poe_profile}; if ($poe_profile) { $self->{poe_profile_id} = $kernel->delay_set("poe_profile", $poe_profile); my $profile_stats = $self->profile_stats(); $self->log("PROFILE: cumultime count avgtime mintime maxtime key\n"); my ($stats); foreach my $key (sort { $profile_stats->{$b}{cumul_time} <=> $profile_stats->{$a}{cumul_time} } keys %$profile_stats) { $stats = $profile_stats->{$key}; if ($stats->{count}) { $self->log("PROFILE: %10.4f %10d %8.4f %8.4f %8.4f %s\n", $stats->{cumul_time}, $stats->{count}, $stats->{cumul_time}/$stats->{count}, $stats->{min_time}, $stats->{max_time}, $key); } } $self->profile_clear(); } &App::sub_exit() if ($App::trace); } # NOTE: see http://poe.perl.org/?POE_FAQ/How_do_I_force_a_session_to_shut_down sub poe_shutdown { &App::sub_entry if ($App::trace); my ( $self, $kernel, $session, $heap ) = @_[ OBJECT, KERNEL, SESSION, HEAP ]; $self->profile_start("poe_shutdown") if $self->{poe_profile}; ### Abort all running async events for my $event_token (keys %{$self->{running_async_event}}) { ### We can't use the normal abort_async_event ### because POE and IKC are shutting down shortly $self->_abort_running_async_event($event_token); $self->log({level=>3},"poe_shutdown: abort running events : event_token=[$event_token]\n") if $self->{poe_trace}; } ### Clear your alias $kernel->alias_remove( $self->{poe_session_name} ); ### Clear all alarms you might have set $kernel->alarm_remove_all(); ### Get rid of external ref count $kernel->refcount_decrement( $session, $self->{poe_session_name} ); ### Shut down the HTTP server $kernel->post( $self->{poe_kernel_http_name}, 'SHUTDOWN'); ### Shut down POE IKC $kernel->post('IKC', 'shutdown'); $self->profile_stop("poe_shutdown") if $self->{poe_profile}; &App::sub_exit() if ($App::trace); return; } sub poe_dispatch_pending_async_events { &App::sub_entry if ($App::trace); my ( $self, $kernel, $heap, $arg ) = @_[ OBJECT, KERNEL, HEAP, ARG0 ]; $self->profile_start("poe_dispatch_pending_async_events") if $self->{poe_profile}; $self->log({level=>3},"POE: poe_dispatch_pending_async_events enter\n") if $self->{poe_trace}; $self->poe_yield_acknowledged("poe_dispatch_pending_async_events"); my $events_occurred = $self->dispatch_pending_async_events(); $self->log({level=>3},"POE: poe_dispatch_pending_async_events exit: events_occurred[$events_occurred]\n") if $self->{poe_trace}; $self->profile_stop("poe_dispatch_pending_async_events") if $self->{poe_profile}; &App::sub_exit() if ($App::trace); } sub poe_event_loop_extension { &App::sub_entry if ($App::trace); my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ]; $self->profile_start("poe_event_loop_extension") if $self->{poe_profile}; $self->log({level=>3},"POE: poe_event_loop_extension enter\n") if $self->{poe_trace}; $self->poe_yield_acknowledged("poe_event_loop_extension"); my $event_loop_extensions = $self->{event_loop_extensions}; my $async_events_added = 0; if ($event_loop_extensions && $#$event_loop_extensions > -1) { my ($extension, $obj, $method, $args, $events_executed); for (my $i = 0; $i <= $#$event_loop_extensions; $i++) { $extension = $event_loop_extensions->[$i]; ($obj, $method, $args) = @$extension; $events_executed = $obj->$method(@$args); # execute extension $async_events_added += $events_executed; } } if ($async_events_added) { $self->poe_yield($kernel, "poe_dispatch_pending_async_events"); $self->poe_yield($kernel, "poe_event_loop_extension"); } $self->log({level=>3},"POE: poe_event_loop_extension exit: event_loop_extensions[" . @$event_loop_extensions . "] async_events_added[$async_events_added]\n") if $self->{poe_trace}; $self->profile_stop("poe_event_loop_extension") if $self->{poe_profile}; &App::sub_exit() if ($App::trace); } sub trigger_event_loop_extension { &App::sub_entry if ($App::trace); my ($self) = @_; $self->poe_yield($self->{poe_kernel}, "poe_event_loop_extension"); &App::sub_exit() if ($App::trace); } sub poe_run_event { &App::sub_entry if ($App::trace); my ( $self, $kernel, $heap, $event ) = @_[ OBJECT, KERNEL, HEAP, ARG0 ]; $self->log({level=>3},"POE: poe_run_event enter\n") if $self->{poe_trace}; my ($event_str); my $args = $event->{args} || []; my $args_str = join(",", @$args); if ($event->{name}) { my $service_type = $event->{service_type} || "SessionObject"; $event_str = "$service_type($event->{name}).$event->{method}"; } else { $event_str = "$event->{method}"; } $self->profile_start("poe_run_event: $event_str") if $self->{poe_profile}; $self->send_event($event); $self->log({level=>3},"POE: poe_run_event exit: event[$event_str($args_str)]\n") if $self->{poe_trace}; $self->profile_stop("poe_run_event: $event_str") if $self->{poe_profile}; &App::sub_exit() if ($App::trace); } # State on Node sub poe_enqueue_async_event { &App::sub_entry if ($App::trace); my ($self, $kernel, $args) = @_[OBJECT, KERNEL, ARG0]; $self->profile_start("poe_enqueue_async_event") if $self->{poe_profile}; $self->log({level=>3},"POE: poe_enqueue_async_event enter\n") if $self->{poe_trace}; my ($sender, $event, $callback_event) = @$args; my $runtime_event_token = $self->send_async_event($event, { method => "async_event_finished", args => [ $sender, $event, $callback_event ], }); $event->{event_token} = $runtime_event_token; $self->log({level=>3},"POE: poe_enqueue_async_event exit: event[$event->{name}.$event->{method} token=$event->{event_token}] runtime_event_token[$runtime_event_token]\n") if $self->{poe_trace}; $self->profile_stop("poe_enqueue_async_event") if $self->{poe_profile}; &App::sub_exit([$runtime_event_token, [$event, $callback_event]]) if ($App::trace); return([$runtime_event_token, [$event, $callback_event]]); } # State on Controller sub poe_enqueue_async_event_finished { &App::sub_entry if ($App::trace); my ($self, $kernel, $return_values) = @_[OBJECT, KERNEL, ARG0]; $self->profile_start("poe_enqueue_async_event_finished") if $self->{poe_profile}; $self->log({level=>3},"POE: poe_enqueue_async_event_finished enter\n") if $self->{poe_trace}; my ($runtime_event_token, $async_event) = @$return_values; $self->{running_async_event}{$runtime_event_token} = $async_event; $self->log({level=>3},"POE: poe_enqueue_async_event_finished exit: event[$async_event->[0]{name}.$async_event->[0]{method} => $runtime_event_token]\n") if $self->{poe_trace}; $self->profile_stop("poe_enqueue_async_event_finished") if $self->{poe_profile}; &App::sub_exit() if ($App::trace); } # Method on Node sub async_event_finished { &App::sub_entry if ($App::trace); my ($self, $sender, $event, $callback_event) = @_; $self->profile_start("async_event_finished") if $self->{poe_profile}; my $runtime_event_token = $event->{event_token}; my $remote_server_name = "poe_${sender}"; $remote_server_name =~ s/:/_/; my $remote_session_alias = $self->{poe_session_name}; # remote is same as local my $remote_session_state = "poe_remote_async_event_finished"; my $kernel = $self->{poe_kernel}; $kernel->post("IKC", "post", "poe://$remote_server_name/$remote_session_alias/$remote_session_state", [ $runtime_event_token, $callback_event->{args} ]); $self->profile_stop("async_event_finished") if $self->{poe_profile}; &App::sub_exit() if ($App::trace); } # State on Controller sub poe_remote_async_event_finished { &App::sub_entry if ($App::trace); my ($self, $kernel, $args) = @_[OBJECT, KERNEL, ARG0]; $self->profile_start("poe_remote_async_event_finished") if $self->{poe_profile}; $self->log({level=>3},"POE: poe_remote_async_event_finished enter\n") if $self->{poe_trace}; my ($runtime_event_token, $callback_args) = @$args; my $async_event = $self->{running_async_event}{$runtime_event_token}; if ($async_event) { my ($event, $callback_event) = @$async_event; $self->log({level=>3},"POE: poe_remote_async_event_finished : ($event->{name}.$event->{method} => $runtime_event_token)\n") if $self->{poe_trace}; delete $self->{running_async_event}{$runtime_event_token}; my $destination = $event->{destination} || "local"; ### Decrease the node's load value a fraction of its last system load my $system_load = $self->{node}{$destination}{system_load}; my $load = $self->{node}{$destination}{load}; my $number_node_events = $self->{node}{$destination}{num_async_events}; my $load_decrease_amount = 0; if ($number_node_events > 0) { $load_decrease_amount = $load / $number_node_events; } $self->{node}{$destination}{load} = sprintf("%.1f", $load - $load_decrease_amount); ### Keep track of our event counts $self->{num_async_events}--; $self->{node}{$destination}{num_async_events}--; if ($callback_event) { $callback_event->{args} = $callback_args; $self->send_event($callback_event); } else { $self->log({level=>3},"Server: WARNING : poe_remote_async_event_finished called without callback_event : runtime_event_token[$runtime_event_token]\n"); } } $self->log({level=>3},"POE: poe_remote_async_event_finished exit\n") if $self->{poe_trace}; $self->profile_stop("poe_remote_async_event_finished") if $self->{poe_profile}; &App::sub_exit() if ($App::trace); } sub poe_server_state { &App::sub_entry if ($App::trace); my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ]; $self->profile_start("poe_server_state") if $self->{poe_profile}; $self->log({level=>3},"POE: poe_server_state enter\n") if $self->{poe_trace}; my $server_state = $self->state(); $self->log({level=>3},"POE: poe_server_state exit\n") if $self->{poe_trace}; $self->profile_stop("poe_server_state") if $self->{poe_profile}; &App::sub_exit($server_state) if ($App::trace); return $server_state; } sub poe_http_server_state { &App::sub_entry if ($App::trace); my ( $self, $kernel, $heap, $request, $response ) = @_[ OBJECT, KERNEL, HEAP, ARG0, ARG1 ]; $self->profile_start("poe_http_server_state") if $self->{poe_profile}; $self->log({level=>3},"POE: poe_http_server_state enter\n") if $self->{poe_trace}; my $server_state = $kernel->call( $self->{poe_session_name}, 'poe_server_state' ); ### Build the response. $response->code(RC_OK); $response->push_header( "Content-Type", "text/plain" ); $response->content($server_state); ### Signal that the request was handled okay. $kernel->post( $self->{poe_kernel_http_name}, 'DONE', $response ); $self->log({level=>3},"POE: poe_http_server_state exit\n") if $self->{poe_trace}; $self->profile_stop("poe_http_server_state") if $self->{poe_profile}; &App::sub_exit(RC_OK) if ($App::trace); return RC_OK; } sub poe_debug { &App::sub_entry if ($App::trace); my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ]; $self->profile_start("poe_debug") if $self->{poe_profile}; $self->log({level=>3},"POE: poe_debug enter\n") if $self->{poe_trace}; my $debug = $self->debug(); $self->log({level=>3},"POE: poe_debug exit\n") if $self->{poe_trace}; $self->profile_stop("poe_debug") if $self->{poe_profile}; &App::sub_exit($debug) if ($App::trace); return $debug; } sub poe_http_debug { &App::sub_entry if ($App::trace); my ( $self, $kernel, $heap, $request, $response ) = @_[ OBJECT, KERNEL, HEAP, ARG0, ARG1 ]; $self->profile_start("poe_http_debug") if $self->{poe_profile}; $self->log({level=>3},"POE: poe_http_debug enter\n"); my $debug = $kernel->call( $self->{poe_session_name}, 'poe_debug' ); ### Build the response. $response->code(RC_OK); $response->push_header( "Content-Type", "text/plain" ); $response->content($debug); ### Signal that the request was handled okay. $kernel->post( $self->{poe_kernel_http_name}, 'DONE', $response ); $self->log({level=>3},"POE: poe_http_debug exit\n") if $self->{poe_trace}; $self->profile_stop("poe_http_debug") if $self->{poe_profile}; &App::sub_exit(RC_OK) if ($App::trace); return RC_OK; } sub poe_http_test_run { &App::sub_entry if ($App::trace); my ( $self, $kernel, $heap, $request, $response ) = @_[ OBJECT, KERNEL, HEAP, ARG0, ARG1 ]; $self->profile_start("poe_http_test_run") if $self->{poe_profile}; $self->log({level=>3},"POE: poe_http_test_run enter\n") if $self->{poe_trace}; my $event = { service_type => "SessionObject", name => "mvworkd", method => "sleep2", args => [ 3 ], }; $self->send_async_event_now($event); # Build the response. $response->code(RC_OK); $response->push_header( "Content-Type", "text/plain" ); $response->content("SessionObject(mvworkd).sleep(30)"); # Signal that the request was handled okay. $kernel->post( $self->{poe_kernel_http_name}, 'DONE', $response ); $self->log({level=>3},"POE: poe_http_test_run exit\n") if $self->{poe_trace}; $self->profile_stop("poe_http_test_run") if $self->{poe_profile}; &App::sub_exit(RC_OK) if ($App::trace); return RC_OK; } 1;