App::Context::Server - a runtime environment with a single master server and its subprocesses


App-Context documentation Contained in the App-Context distribution.

Index


Code Index:

NAME

Top

App::Context::Server - a runtime environment with a single master server and its subprocesses

SYNOPSIS

Top

   # ... official way to get a Context object ...
   use App;
   $context = App->context();
   $config = $context->config();   # get the configuration
   $config->dispatch_events();     # dispatch events

   # ... alternative way (used internally) ...
   use App::Context::Server;
   $context = App::Context::Server->new();

wait_for_event()

    * Signature: $self->wait_for_event($event_token)
    * Param:     $event_token     string
    * Return:    void
    * Throws:    App::Exception
    * Since:     0.01

    Sample Usage: 

    $self->wait_for_event($event_token);

The wait_for_event() method is called when an asynchronous event has been sent and no more processing can be completed before it is done.

user()

The user() method returns the username of the authenticated user. The special name, "guest", refers to the unauthenticated (anonymous) user.

    * Signature: $username = $context->user();
    * Param:  void
    * Return: string
    * Throws: <none>
    * Since:  0.01

    Sample Usage: 

    $username = $context->user();


App-Context documentation Contained in the App-Context distribution.
#############################################################################
## $Id: Server.pm 9819 2007-08-03 19:34:40Z spadkins $
#############################################################################

package App::Context::Server;
$VERSION = (q$Revision: 9819 $ =~ /(\d[\d\.]*)/)[0];  # VERSION numbers generated by svn

use App;
use App::Context;

@ISA = ( "App::Context" );

use Sys::Hostname;
use Socket;
use IO::Socket;
use IO::Socket::INET;
use POSIX ":sys_wait_h";
use Date::Format;
use Date::Parse;

use strict;

sub _init {
    &App::sub_entry if ($App::trace);
    my ($self, $options) = @_;
    $options = {} if (!defined $options);

    $self->SUPER::_init($options);

    App->mkdir($options->{prefix}, "data", "app", "Context");

    $| = 1;  # autoflush STDOUT (not sure this is required)
    open(STDERR, ">&STDOUT") || die "Unable to redirect STDERR to STDOUT";

    my $host = hostname;
    $self->{hostname} = $host;
    $host =~ s/\..*//;   # get rid of fully qualified domain name
    $self->{host} = $host;
    $self->{port} = $options->{port} || 8080;

    $self->{num_procs} = 0;
    $self->{max_procs} = $self->{options}{"app.context.max_procs"} || 10;
    $self->{max_async_events} = $self->{options}{"app.context.max_async_events"}
        if (defined $self->{options}{"app.context.max_async_events"});
    $self->{async_event_count} = 0;
    $self->{pending_async_events} = [];
    $self->{running_async_event} = {};

    $self->{verbose} = $options->{verbose};

    $self->_init2a($options);

    my $listen_socket = IO::Socket::INET->new(
        Proto     => "tcp",
        # LocalAddr => $self->{host},  # allow both the "hostname" and "localhost" to be used
        LocalPort => $self->{port},
        Listen    => SOMAXCONN,
        ReuseAddr => 1,
    ) || die "Unable to listen on $self->{host}:$self->{port} - $!";

    $self->{listen_socket} = $listen_socket;
    my $listen_fd = fileno($listen_socket);
    my $listen_vec;
    vec($listen_vec, $listen_fd, 1) = 1;
    $self->{listen_vec} = $listen_vec;

    $self->{rpc_serializer} = $self->serializer("server_rpc", class => "App::Serializer::Perl", indent => 0);

    if ($self->{options}{log_rotate}) {
        my $rotate_sec = $self->{options}{log_rotate};
        $rotate_sec = $rotate_sec*(24*3600) if ($rotate_sec <= 31);
        my $time = time();
        my $base_time = str2time(time2str("%Y-%m-%d 00:00:00", $time));  # I need a base which is midnight local time
        my $next_rotate_time = ((int(($time - $base_time)/$rotate_sec)+1)*$rotate_sec) + $base_time;
        $self->schedule_event(
            tag => "context-log-rotation",
            method => "log_file_open",
            args => [0], # don't overwrite
            time => $next_rotate_time,
            interval => $rotate_sec,  # and every X seconds hereafter
        );
    }

    $self->_init2b($options);

    &App::sub_exit() if ($App::trace);
}

sub _init2a {
    &App::sub_entry if ($App::trace);
    my ($self, $options) = @_;
    &App::sub_exit() if ($App::trace);
}

sub _init2b {
    &App::sub_entry if ($App::trace);
    my ($self, $options) = @_;
    &App::sub_exit() if ($App::trace);
}

sub close_listen_socket {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    if ($self->{listen_socket}) {
        my $listen_socket = $self->{listen_socket};
        my $listen_fd = fileno($listen_socket);
        $self->log({level=>4},"Closed listen socket($listen_fd)\n");
        $listen_socket->close();
        $listen_socket = undef;
        delete $self->{listen_socket};
        delete $self->{listen_vec};
    }
    &App::sub_exit() if ($App::trace);
}

sub shutdown_unshareable_resources {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    $self->close_listen_socket();
    $self->SUPER::shutdown_unshareable_resources();
    &App::sub_exit() if ($App::trace);
}

sub shutdown {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    $self->close_listen_socket();
    $self->shutdown_child_processes();
    $self->SUPER::shutdown();
    &App::sub_exit() if ($App::trace);
}

sub shutdown_child_processes {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    if ($self->{proc}) {
        foreach my $pid (keys %{$self->{proc}}) {
            kill(15, $pid);
        }
    }
    &App::sub_exit() if ($App::trace);
}

sub DESTROY {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    $self->close_listen_socket();
    &App::sub_exit() if ($App::trace);
}

sub dispatch_events {
    &App::sub_entry if ($App::trace);
    my ($self, $max_events_occurred) = @_;

    my ($role, $port, $startup, $shutdown);
    $self->dispatch_events_begin();

    my $verbose = $self->{verbose};

    my $options = $self->{options};
    my $objects = $options->{init_objects};
    my ($service_type, $name, $service);
    foreach my $object (split(/ *[;,]+ */, $objects)) {
        if ($object) {
            if ($object =~ /^([A-Z][A-Za-z0-9]+)\.([A-Za-z0-9_-]+)$/) {
                $service_type = $1;
                $name = $2;
            }
            else {
                $service_type = "SessionObject";
                $name = $object;
            }
            $service = $self->service($service_type, $name);  # instantiate it. that's all.
            $self->log({level=>3},"$service_type $name instantiated [$service]\n");
            $self->{main_service} = $service if (!$self->{main_service});
        }
    }

    my $quit = 0;

    $SIG{HUP}  = sub { $self->log({level=>2},"Caught Signal: @_\n"); };                         # SIG  1
    $SIG{INT}  = sub { $self->log({level=>2},"Caught Signal: @_ (quitting)\n"); $quit = 1; };   # SIG  2
    $SIG{QUIT} = sub { $self->log({level=>2},"Caught Signal: @_ (quitting)\n"); $quit = 1; };   # SIG  3
    $SIG{USR1} = sub { $self->log({level=>2},"Caught Signal: @_\n"); };                         # SIG 10
    $SIG{USR2} = sub { $self->log({level=>2},"Caught Signal: @_\n"); };                         # SIG 12
    $SIG{TERM} = sub { $self->log({level=>2},"Caught Signal: @_ (quitting)\n"); $quit = 1; };   # SIG 15
    $SIG{CHLD} = "DEFAULT";                                                                     # SIG 17

    my $default_sleep_interval = 15*60;

    my $listen_socket = $self->{listen_socket};
    my $listen_vec    = $self->{listen_vec};
    my $listen_fd     = fileno($listen_socket);
    my ($connection_socket, $connection_fd, $msg, $accept_worthwhile);
    my ($event, @events);
    my $event_loop_extensions = $self->{event_loop_extensions};
    my ($extension, $obj, $method, $args, $extension_idx, $extension_events_occurred);
    my $last_extension_idx = -1;
    my ($time, $time_of_next_event, $sleep_interval);
    my $start_time = time();
    my $total_events_occurred = 0;
    my ($events_occurred);
    my ($pid, $exitval, $sig);
    my ($await_return_value, $server_close, $return_value);
    while (!$quit) {
        eval {
            $time = time();
            $events_occurred = 0;
	        # Don't start dispatching these requests until a brief wait after starting.
	        # We want all of the nodes to get a chance to register themselves.
            if (($time-$start_time >= 4) && $#{$self->{pending_async_events}} > -1) {
                $events_occurred += $self->dispatch_pending_async_events(1);
            }
            $events_occurred += $self->dispatch_finished_processes();

            # Scheduled events: Every time through the loop, we check to see
            # if it is time for a scheduled event to occur.  If so, we send
            # each of those events out.
            $self->log({level=>4},"Checking for scheduled events.\n");
            $time = time();
            $time_of_next_event = $self->get_current_events(\@events, $time);
            if ($#events > -1) {
                foreach $event (@events) {
                    $self->send_event($event);
                    $events_occurred++;
                }
                $time = time();
            }

            # Registered Extensions to the Event Loop: These are lower priority.
            # We only allow the extensions to be run in any given iteration through
            # the event loop if we have no other core event that has occurred.
            # Even then, we only allow one extension (that returns true) to run
            # in each iteration, and we check them in round-robin fashion so that
            # one extension does not get more attention than the others.
            if (!$events_occurred) {
                $extension_idx = $last_extension_idx;  # start with last executed extension
                for (my $i = 0; $i <= $#$event_loop_extensions; $i++) {
                    $extension_idx ++;   # increment it in round-robin fashion
                    $extension_idx = 0 if ($extension_idx > $#$event_loop_extensions);
                    $extension = $event_loop_extensions->[$extension_idx];
                    ($obj, $method, $args) = @$extension;
                    $extension_events_occurred = $obj->$method(@$args);  # execute extension and ...
                    if ($extension_events_occurred) {     # check return value for true
                        $last_extension_idx = $extension_idx;
                        $events_occurred += $extension_events_occurred;
                        last;
                    }
                }
            }

            if (!$events_occurred) {
                # Sleep Interval: Based on when the next event is scheduled and the current
                # time, we determine the sleep interval.
                $time = time();
                if ($time_of_next_event > 0) {
                    $sleep_interval = $time_of_next_event - $time;
                    $sleep_interval = 0 if ($sleep_interval < 0);
                }
                else {
                    $sleep_interval = $default_sleep_interval;
                }
            }
            else {
                $sleep_interval = 0;
            }

            # TODO: if (sleep_interval == 0), use select() to see if anyone is waiting, else ...
            $self->log({level=>4},"Listening on socket($listen_fd): timeout($sleep_interval)\n");
            $accept_worthwhile = 1;
            # NOTE: to understand why I do this section of code, read the 3rd paragraph under the
            # accept() method of IO::Socket (i.e. "man IO::Socket") or read it here.
            # http://perldoc.perl.org/IO/Socket.html
            if ($sleep_interval == 0) {
                if (select($listen_vec, undef, $listen_vec, 0) == 0) {  # nothing happening on the socket
                    $accept_worthwhile = 0;  # don't bother to call accept() on it
                }
            }

            # Here is the truth table for $await_return_value, $server_close
            #   $await_return_value  $server_close =         client         +        server     
            #   -------------------  -------------   ----------------------   ---------------------
            #             0                0              write/close              read/close
            #             0                1            write/read/close           read/close
            #             1                0         write/read/write/close   read/write/read/close
            #             1                1            write/read/close         read/write/close
            # See: http://hea-www.harvard.edu/~fine/Tech/addrinuse.html
            if ($accept_worthwhile) {
                $listen_socket->timeout($sleep_interval);
                #$SIG{CHLD}  = sub { $self->log({level=>4},"Caught Signal: @_\n"); };
                $SIG{CHLD}  = sub { };  # the point is to interrupt the accept() system call, not to do anything.
                $connection_socket = $listen_socket->accept();
                $SIG{CHLD}  = "DEFAULT";
                if ($connection_socket) {
                    $connection_fd = fileno($connection_socket);
                    $msg = $connection_socket->getline();
                    $self->log({level=>4},"Message on socket($connection_fd) [$msg]\n");
                    if ($msg) {
                        $await_return_value = ($msg =~ s/^RV-//);
                        $server_close       = ($msg =~ s/^SC-//);
                        $msg =~ s/[\015\012]+$//;
                        if ($msg =~ /^EXIT/i || $msg =~ /^QUIT/i) {
                            $quit = 1;
                        }
                        elsif ($msg =~ /^GET/) {
                            $await_return_value = 1;
                            my $content = $self->state();
                            my $content_length = length($content);
                            $return_value = <<EOF;
HTTP/1.1 200 OK
Content-type: text/plain
Content-length: $content_length
Connection: close

$content
EOF
                        }
                        else {
                            $return_value = $self->process_msg($msg);
                            $return_value .= "\n" if ($return_value !~ /\n$/);
                        }
                        if ($await_return_value) {
                            $self->log({level=>4},"Returned on socket($connection_fd) [$return_value]\n") if ($msg !~ /^GET/);
                            $connection_socket->autoflush(1);
                            $connection_socket->print($return_value);
                            $connection_socket->getline() if (!$server_close);
                        }
                        $connection_socket->close();
                    }
                    else {
                        $connection_socket->close();
                    }
                }
            }
        };
        if ($@) {
            $self->log($@);
        }
        $total_events_occurred += $events_occurred;
        $quit = 1 if ($max_events_occurred && $total_events_occurred >= $max_events_occurred);
    }

    $self->close_listen_socket();
    $self->dispatch_events_end();
    $self->shutdown();
    &App::sub_exit() if ($App::trace);
}

sub dispatch_network_events {
    &App::sub_entry if ($App::trace);
    my ($self, $sleep_interval) = @_;

    $sleep_interval ||= 0;
    my $verbose = $self->{verbose};
    my $events_occurred = 0;

    my ($connection_socket, $msg, $accept_worthwhile);
    $self->log({level=>4},"Listening on socket: timeout($sleep_interval)\n");
    my $listen_socket = $self->{listen_socket};
    my $listen_vec    = $self->{listen_vec};

    $accept_worthwhile = 1;
    if ($sleep_interval == 0) {
        # NOTE: to understand why I do this section of code, read the 3rd paragraph under the
        # accept() method of IO::Socket (i.e. "man IO::Socket") or read it here.
        # http://perldoc.perl.org/IO/Socket.html
        if (select($listen_vec, undef, $listen_vec, 0) == 0) {  # nothing happening on the socket
            $accept_worthwhile = 0;  # don't bother to call accept() on it
        }
    }

    if ($accept_worthwhile) {
        $listen_socket->timeout($sleep_interval);
        $SIG{CHLD}  = sub { };
        $connection_socket = $listen_socket->accept();
        $SIG{CHLD}  = "DEFAULT";
        if ($connection_socket) {
            $connection_socket->autoflush(1);
            $msg = <$connection_socket>;
            $msg =~ s/[\015\012]+$//;
            if ($msg =~ /^EXIT/i || $msg =~ /^QUIT/i) {
                # $quit = 1;
            }
            else {
                $self->process_msg($msg);
            }
            $connection_socket->close();
            $events_occurred ++;
        }
    }

    &App::sub_exit($events_occurred) if ($App::trace);
    return($events_occurred);
}

sub dispatch_finished_processes {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    my ($pid, $exitval, $sig);
    my $events_occurred = 0;
    while (($pid = waitpid(-1,WNOHANG)) > 0) {
        $events_occurred ++;
        $exitval = $? >> 8;
        $sig     = $? & 255;
        $self->log({level=>4},"Child $pid finished [exitval=$exitval,sig=$sig]\n");
        $self->finish_pid($pid, $exitval, $sig);
    }
    &App::sub_exit($events_occurred) if ($App::trace);
    return($events_occurred);
}

sub dispatch_events_begin {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    my $verbose = $self->{verbose};
    $self->log({level=>2},"Starting Server on $self->{host}:$self->{port}\n");
    &App::sub_exit() if ($App::trace);
}

sub dispatch_events_end {
    my ($self) = @_;
    my $verbose = $self->{verbose};
    $self->log({level=>2},"Stopping Server.\n");
}

sub process_msg {
    my ($self, $msg) = @_;
    $self->log({level=>3},"process_msg: [$msg]\n");
    my $verbose = $self->{verbose};
    my $return_value = $self->process_custom_msg($msg);
    if (!$return_value) {
        $return_value = "unknown [$msg]\n";
    }
    &App::sub_exit($return_value) if ($App::trace);
    return($return_value);
}

sub process_custom_msg {
    &App::sub_entry if ($App::trace);
    my ($self, $msg) = @_;
    my $return_value = "";
    &App::sub_exit($return_value) if ($App::trace);
    return($return_value);
}

sub state {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;

    my $datetime = time2str("%Y-%m-%d %H:%M:%S", time());
    my $state = "Server: $self->{host}:$self->{port}  procs[$self->{num_procs}/$self->{max_procs}:max]  async_events[$self->{num_async_events}/$self->{max_async_events}:max]\n[$datetime]\n";
    $state .= "\n";
    $state .= $self->_state();

    &App::sub_exit($state) if ($App::trace);
    return($state);
}

sub _state {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;

    my $state = "";

    my $options = $self->{options};
    my $objects = $options->{init_objects};
    my ($service_type, $name, $service);
    foreach my $object (split(/ *[;,]+ */, $objects)) {
        if ($object) {
            if ($object =~ /^([A-Z][A-Za-z0-9]+)\.([A-Za-z0-9_-]+)$/) {
                $service_type = $1;
                $name = $2;
            }
            else {
                $service_type = "SessionObject";
                $name = $object;
            }
            $service = $self->service($service_type, $name);  # instantiate it. that's all.
            if ($service->can("state")) {
                $state .= "\n";
                $state .= $service->state();
            }
        }
    }

    my $main_service = $self->{main_service};

    $state .= "\n";
    $state .= "Running Async Events:\n";
    my ($async_event, $event, $callback_event, @args, $args_str, $event_token, $runtime_event_token, $str);
    foreach $runtime_event_token (sort keys %{$self->{running_async_event}}) {
        $async_event = $self->{running_async_event}{$runtime_event_token};
        ($event, $callback_event) = @$async_event;
        $str = "";
        if ($main_service && $main_service->can("format_async_event")) {
            $str = $main_service->format_async_event($event, $callback_event, $runtime_event_token);
        }
        if ($str) {
            $state .= "   ";
            $state .= $main_service->format_async_event($event, $callback_event, $runtime_event_token);
            $state .= "\n";
        }
        else {
            @args = ();
            @args = @{$event->{args}} if ($event->{args});
            $args_str = join(",",@args);
            $state .= sprintf("   %-20s %-20s %-24s", $event->{event_token}, $runtime_event_token, "$event->{name}.$event->{method}($args_str)");
            if ($callback_event) {
                @args = ();
                @args = @{$callback_event->{args}} if ($callback_event->{args});
                $args_str = join(",",@args);
                $state .= "$callback_event->{name}.$callback_event->{method}($args_str)";
            }
            $state .= "\n";
        }
    }

    $state .= "\n";
    $state .= "Pending Async Events: count [$self->{async_event_count}]\n";
    foreach $async_event (@{$self->{pending_async_events}}) {
        ($event, $callback_event) = @$async_event;
        $str = "";
        if ($main_service && $main_service->can("format_async_event")) {
            $str = $main_service->format_async_event($event, $callback_event);
        }
        if ($str) {
            $state .= "   ";
            $state .= $main_service->format_async_event($event, $callback_event);
            $state .= "\n";
        }
        else {
            @args = ();
            @args = @{$event->{args}} if ($event->{args});
            $args_str = join(",",@args);
            $state .= sprintf("   %-20s %-40s", $event->{event_token}, "$event->{name}.$event->{method}($args_str)");
            if ($callback_event) {
                @args = ();
                @args = @{$callback_event->{args}} if ($callback_event->{args});
                $args_str = join(",",@args);
                $state .= " => $callback_event->{name}.$callback_event->{method}($args_str)";
            }
            $state .= "\n";
        }
    }

    $state .= "\n";

    $state .= $self->SUPER::_state();

    &App::sub_exit($state) if ($App::trace);
    return($state);
}

# TODO: Implement this as a fork() or a context-level message to a node to fork().
#       i.e. messages such as "EVENT:" and "EVENT-OK:"
#       Save the callback_event according to an event_token.
#       Then implement cleanup_pid to send the callback_event.

sub send_async_event {
    &App::sub_entry if ($App::trace);
    my ($self, $event, $callback_event) = @_;
    my $event_token = $self->new_event_token();
    $event->{event_token} = $event_token;
    $callback_event->{event_token} = $event_token if ($callback_event);
    push(@{$self->{pending_async_events}}, [ $event, $callback_event ]);
    &App::sub_exit($event_token) if ($App::trace);
    return($event_token);
}

sub new_event_token {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    $self->{async_event_count} ++;
    my $event_token = "$self->{host}-$self->{port}-$self->{async_event_count}";
    &App::sub_exit($event_token) if ($App::trace);
    return($event_token);
}

sub dispatch_pending_async_events {
    &App::sub_entry if ($App::trace);
    my ($self, $max_events) = @_;
    $max_events ||= 9999;
    my $pending_async_events = $self->{pending_async_events};
    my ($async_event, $assigned, $event, $in_process);
    my $events_occurred = 0;
    my $i = 0;
    my $event_capacity_exists = 1;
    my $max_i = $#$pending_async_events;
    while ($i <= $max_i && $events_occurred < $max_events) {
        $async_event = $pending_async_events->[$i];
        $event = $async_event->[0];
        if ($event->{destination}) {
            $self->send_async_event_now(@$async_event);
            $events_occurred ++;
            splice(@$pending_async_events, $i, 1);  # remove $pending_async_events->[$i]
            $max_i--;
        }
        elsif ($event_capacity_exists) {
            $assigned = $self->assign_event_destination($event);
            if ($assigned) {
                $self->send_async_event_now(@$async_event);
                $events_occurred ++;
                # keep $i the same
                splice(@$pending_async_events, $i, 1);  # remove $pending_async_events->[$i]
                $max_i--;
            }
            else {   # [undef] no servers are eligible for assignment
                $event_capacity_exists = 0;   # there's no sense looking at the other pending async events
                $i++;   # look at the next one
            }
        }
        else {      # [0] this async_event is not eligible to run
            $i++;   # look at the next one
        }
    }
    &App::sub_exit($events_occurred) if ($App::trace);
    return($events_occurred);
}

sub assign_event_destination {
    &App::sub_entry if ($App::trace);
    my ($self, $event) = @_;
    my $assigned = undef;
    if ($self->{num_procs} < $self->{max_procs} &&
        (!defined $self->{max_async_events} || $self->{num_async_events} < $self->{max_async_events})) {
        $event->{destination} = $self->{host};
        $assigned = 1;
    }
    &App::sub_exit($assigned) if ($App::trace);
    return($assigned);
}

sub send_async_event_now {
    &App::sub_entry if ($App::trace);
    my ($self, $event, $callback_event) = @_;
    if ($event->{destination} eq "in_process") {
        my $event_token = $self->send_async_event_in_process($event, $callback_event);
    }
    else {
        my $pid = $self->fork();
        if (!$pid) {   # running in child
            my $exitval = 0;
            my (@results);
            eval {
                @results = $self->send_event($event);
            };
            if ($@) {
                @results = ($@);
            }
            if ($#results > -1 && defined $results[0] && $results[0] ne "") {
                my $textfile = $self->{options}{prefix} . "/data/app/Context/$$";
                if (open(FILE, "> $textfile")) {
                    print App::Context::Server::FILE @results;
                    close(App::Context::Server::FILE);
                }
                else {
                    $exitval = 1;
                }
            }
            $self->shutdown();
            $self->exit($exitval);
        }
        my $destination = $event->{destination} || "local";
        $self->{num_async_events}++;
        $self->{node}{$destination}{num_async_events}++;
        my $runtime_event_token = $pid;
        $self->{running_async_event}{$runtime_event_token} = [ $event, $callback_event ];
    }
    &App::sub_exit() if ($App::trace);
}

sub wait_for_event {
    &App::sub_entry if ($App::trace);
    my ($self, $event_token) = @_;
    &App::sub_exit() if ($App::trace);
}

sub fork {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    my $pid = $self->SUPER::fork();
    if ($pid) {  # the parent process has a new child process
        $self->{num_procs}++;
        $self->{proc}{$pid} = {};
    }
    else {  # the new child process has no sub-processes
        $self->{num_procs} = 0;
        $self->{proc} = {};
        $SIG{INT}  = sub { $self->log({level=>2},"Caught Signal: @_ (quitting)\n"); $self->exit(102); };   # SIG  2
        $SIG{QUIT} = sub { $self->log({level=>2},"Caught Signal: @_ (quitting)\n"); $self->exit(103); };   # SIG  3
        $SIG{TERM} = sub { $self->log({level=>2},"Caught Signal: @_ (quitting)\n"); $self->exit(115); };   # SIG 15
    }
    &App::sub_exit($pid) if ($App::trace);
    return($pid);
}

sub finish_pid {
    &App::sub_entry if ($App::trace);
    my ($self, $pid, $exitval, $sig) = @_;

    $self->{num_procs}--;
    delete $self->{proc}{$pid};

    my $runtime_event_token = $pid;
    my $async_event = $self->{running_async_event}{$runtime_event_token};
    if ($async_event) {
        my ($event, $callback_event) = @$async_event;
        my $returnval = "";
        my $returnvalfile = $self->{options}{prefix} . "/data/app/Context/$pid";
        if (open(FILE, $returnvalfile)) {
            if ($callback_event) {
                $returnval = join("",<App::Context::Server::FILE>);
            }
            close(App::Context::Server::FILE);
            unlink($returnvalfile);
        }

        my $destination = $event->{destination} || "local";
        $self->{num_async_events}--;
        $self->{node}{$destination}{num_async_events}--;
        delete $self->{running_async_event}{$runtime_event_token};

        if ($callback_event) {
            $callback_event->{args} = [] if (! $callback_event->{args});
            my $errmsg = ($exitval || $sig) ? "Exit $exitval on $pid [sig=$sig]" : "";
            push(@{$callback_event->{args}},
                {event_token => $callback_event->{event_token}, returnval => $returnval, errnum => $exitval, errmsg => $errmsg});
            $self->send_event($callback_event);
        }
        elsif ($sig == 9) {  # killed without a chance to finish its work
            $self->finish_killed_async_event($event);
        }
    }

    &App::sub_exit() if ($App::trace);
}

sub finish_killed_async_event {
    &App::sub_entry if ($App::trace);
    my ($self, $event) = @_;
    &App::sub_exit() if ($App::trace);
}

sub find_runtime_event_token {
    &App::sub_entry if ($App::trace);
    my ($self, $event_token) = @_;
    my $running_async_event = $self->{running_async_event};
    my ($runtime_event_token_found, $async_event);
    foreach my $runtime_event_token (keys %$running_async_event) {
        $async_event = $running_async_event->{$runtime_event_token};
        if ($async_event->[0]{event_token} eq $event_token) {
            $runtime_event_token_found = $runtime_event_token;
            last;
        }
    }
    &App::sub_exit($runtime_event_token_found) if ($App::trace);
    return($runtime_event_token_found);
}

sub reset_running_async_events {
    &App::sub_entry if ($App::trace);
    my ($self, $runtime_event_token_prefix) = @_;
    $runtime_event_token_prefix =~ s/:/-/;  # in case they send "localhost:8080" instead of "localhost-8080"
    my $running_async_event = $self->{running_async_event};
    my ($runtime_event_token, $async_event);
    foreach $runtime_event_token (keys %$running_async_event) {
        $async_event = $running_async_event->{$runtime_event_token};
        if ($async_event && $runtime_event_token =~ /^$runtime_event_token_prefix\b/) {
            $self->reset_running_async_event($runtime_event_token);
        }
    }
    &App::sub_exit() if ($App::trace);
}

sub reset_running_async_event {
    &App::sub_entry if ($App::trace);
    my ($self, $runtime_event_token) = @_;
    my $async_event = $self->abort_running_async_event($runtime_event_token);
    if ($async_event) {
        my $pending_async_events = $self->{pending_async_events};
        unshift(@$pending_async_events, $async_event);
    }
    &App::sub_exit($async_event) if ($App::trace);
    return($async_event);
}

sub abort_async_event {
    &App::sub_entry if ($App::trace);
    my ($self, $event_token) = @_;
    my $pending_async_events = $self->{pending_async_events};
    my ($async_event);
    my $aborted = 0;
    # first look for it in the pending list
    for (my $i = 0; $i <= $#$pending_async_events; $i++) {
        $async_event = $pending_async_events->[$i];
        if ($async_event->[0]{event_token} eq $event_token) {
            splice(@$pending_async_events, $i, 1);
            $aborted = 1;
            last;
        }
    }
    # then look for it in the running list
    if (!$aborted) {
        my $runtime_event_token = $self->find_runtime_event_token($event_token);
        if ($runtime_event_token) {
            $async_event = $self->abort_running_async_event($runtime_event_token);
        }
    }
    &App::sub_exit($async_event) if ($App::trace);
    return($async_event);
}

sub abort_running_async_event {
    &App::sub_entry if ($App::trace);
    my ($self, $runtime_event_token) = @_;
    my $running_async_event  = $self->{running_async_event};
    my $pending_async_events = $self->{pending_async_events};
    my $async_event = $running_async_event->{$runtime_event_token};
    if ($async_event) {
        $self->{num_async_events}--;
        delete $self->{running_async_event}{$runtime_event_token};
        unshift(@$pending_async_events, $async_event);
        $self->_abort_running_async_event($runtime_event_token, @$async_event);
    }
    &App::sub_exit($async_event) if ($App::trace);
    return($async_event);
}

# $runtime_event_tokens take the following forms:
#    $runtime_event_token = $pid; -- App::Context::Server::send_async_event_now() and ::finish_pid()
sub _abort_running_async_event {
    &App::sub_entry if ($App::trace);
    my ($self, $runtime_event_token, $event, $callback_event) = @_;
    if ($runtime_event_token =~ /^[0-9]+$/) {
        kill(15, $runtime_event_token);
    }
    else {
        $self->log("Unable to abort running async event [$runtime_event_token]\n");
    }
    &App::sub_exit() if ($App::trace);
}

#############################################################################
# user()
#############################################################################

sub user {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    my $user = $self->{user} || getlogin || (getpwuid($<))[0] || "guest";
    &App::sub_exit($user) if ($App::trace);
    $user;
}

1;