POE::Component::Supervisor - Erlang inspired babysitting


POE-Component-Supervisor documentation, contained in the POE-Component-Supervisor distribution.


NAME


POE::Component::Supervisor - Erlang inspired babysitting

SYNOPSIS


    use POE;

    use POE::Component::Supervisor;

    POE::Component::Supervisor->new(
        children => [
            POE::Component::Supervisor::Supervised::Proc->new( ... ),    # monitor UNIX procs
            POE::Component::Supervisor::Supervised::Session->new( ... ), # monitor POE sessions
        ],
    );

DESCRIPTION


This is a port of the Erlang process supervisor (http://www.erlang.org/doc/design_principles/sup_princ.html).

This will monitor instances of children, restarting them as necessary should they exit.

Restart throttling is not yet implemented but planned for a future version.

OBJECT HIERARCHY


A supervisor has any number of supervised child descriptors, which in turn instantiate handles for each spawned instance of the child.

Supervised children are essentially object factories for handles. They spawn new instances of the child they describe by instantiating handles.

A handle will do the actual management of the child, sending events to the supervisor when the child is terminated, and also facilitate explicit termination of the child's instance.
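A minimal sketch of the descriptor/handle split, in plain Perl with no POE. ToyDescriptor, ToyHandle and ToySupervisor are hypothetical stand-ins, not classes from this distribution; only the notify_stopped name is borrowed from the supervisor source below. The descriptor is reused across restarts, while each call to spawn produces a fresh handle that reports back when its instance stops.

```perl
use strict;
use warnings;

# Hypothetical child descriptor: a factory for handles.
package ToyDescriptor;

sub new {
    my ( $class, %args ) = @_;
    return bless { name => $args{name}, spawned => 0 }, $class;
}

# Every spawn creates a new handle; the descriptor itself is reused.
sub spawn {
    my ( $self, %args ) = @_;
    $self->{spawned}++;
    return ToyHandle->new(
        child      => $self,
        supervisor => $args{supervisor},
        instance   => $self->{spawned},
    );
}

# Hypothetical handle: manages one running instance of a child.
package ToyHandle;

sub new {
    my ( $class, %args ) = @_;
    return bless +{ %args, running => 1 }, $class;
}

sub stop {
    my ($self) = @_;
    return unless $self->{running};    # stopping twice is a no-op
    $self->{running} = 0;
    # Report which descriptor's instance went away.
    $self->{supervisor}->notify_stopped( $self->{child} );
}

# Hypothetical supervisor stand-in that only records notifications.
package ToySupervisor;

sub new { return bless { stopped => [] }, shift }

sub notify_stopped {
    my ( $self, $child ) = @_;
    push @{ $self->{stopped} }, $child->{name};
}

package main;
```

In this sketch the descriptor holds no per-instance state beyond a counter, so respawning is just another call to spawn on the same descriptor.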

Based on its restart_policy, the supervisor may order other handles to also stop, and ask various child descriptors to respawn certain children.
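The restart sequence can be sketched as a barrier: the children picked for restart are stopped first, and only once the last of them has exited are they respawned, in the order they appear in children. This is a plain-Perl model loosely following the _stopping_for_restart and _pending_restart hashes in the source below; the function names and the use of strings as children are illustrative assumptions.

```perl
use strict;
use warnings;

# Hypothetical restart barrier state; children are plain strings here.
sub new_barrier {
    my @children = @_;
    return {
        children  => [@children],    # supervision order
        stopping  => {},             # still waiting for these to exit
        pending   => {},             # exited, waiting to be respawned
        respawned => [],             # respawn log, in the order performed
    };
}

# Ask a set of children to stop so they can be restarted.
sub stop_for_restart {
    my ( $st, @kids ) = @_;
    $st->{stopping}{$_} = 1 for @kids;
}

# Called when a child reports that it has stopped.
sub child_stopped {
    my ( $st, $kid ) = @_;
    delete $st->{stopping}{$kid};
    $st->{pending}{$kid} = 1;
    return if %{ $st->{stopping} };    # others still running: postpone

    # Everyone is down: respawn pending children in supervision order.
    for my $c ( @{ $st->{children} } ) {
        push @{ $st->{respawned} }, $c if delete $st->{pending}{$c};
    }
}
```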

POE REFERENCE COUNTING


When no more children are being supervised, the POE reference count for the supervisor's session will go down to zero. If no alias is set up, the session will close. If an alias is set and no other sessions are doing anything, the session will also close. See POE and MooseX::POE::Aliased.
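As a rough model of that rule: the supervisor source below increments a "handles" reference count on its session when a child spawns and decrements it when the child stops. The sketch tracks tagged counts in a plain hash and decides whether the session would close. inc_ref, dec_ref and session_should_close are hypothetical helpers, not POE::Kernel methods, and the alias branch only restates the behaviour described above.

```perl
use strict;
use warnings;

# Hypothetical model of tagged reference counts on one session.
sub inc_ref {
    my ( $rc, $tag ) = @_;
    return ++$rc->{$tag};
}

sub dec_ref {
    my ( $rc, $tag ) = @_;
    my $n = --$rc->{$tag};
    delete $rc->{$tag} if $n <= 0;    # drop exhausted tags
    return $n;
}

# True if the session would be allowed to close under the rules above.
sub session_should_close {
    my ( $rc, %opt ) = @_;
    my $refs = 0;
    $refs += $_ for values %$rc;
    return 0 if $refs > 0;                         # a child handle still holds it open
    return 1 unless $opt{alias};                   # no alias: zero refs means done
    return $opt{other_sessions_active} ? 0 : 1;    # alias: close once others are idle
}
```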

ATTRIBUTES


alias

See MooseX::POE::Aliased.

This defaults to undef, unlike the role, so that a supervisor session will close automatically once it has no more children to supervise.

use_logger_singleton

See MooseX::LogDispatch.

Changes the default to true, to allow usage of an already configured Log::Dispatch::Config setup.

restart_policy

One of one, all or rest. Defaults to one.

If the POE::Component::Supervisor::Supervised object describing the child deems the child should be restarted, then the value of this attribute controls which other children to also restart.

one denotes that only the child which died will be restarted.

rest will cause all the children appearing after the child which died in the children array to be restarted, but not the children preceding it.

all will restart all the children.
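The three policies reduce to a selection over the ordered children list. This is a plain-Perl sketch rather than the module's code: children_to_restart is a hypothetical helper, and children are compared by reference, as in the supervisor's own grep. For rest the dead child itself is included, matching restart_rest in the source below; the supervisor then stops the selected children in reverse order before respawning them in order.

```perl
use strict;
use warnings;

# Return the children to restart for a given policy and dead child.
sub children_to_restart {
    my ( $policy, $dead, @children ) = @_;

    return ($dead)    if $policy eq 'one';
    return @children  if $policy eq 'all';

    if ( $policy eq 'rest' ) {
        my @rest = @children;
        # Drop everything preceding the dead child; stop if it is absent.
        shift @rest while @rest && $rest[0] != $dead;
        return @rest;
    }

    die "unknown restart policy '$policy'";
}
```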

children

This is the array of children being supervised.

It is a required argument.

Note that the array reference will be modified when new children are introduced and when children are removed (even during normal shutdown), so pass in a copy of the array if this is a problem for you.

The order of the children matters, see restart_policy.
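The hazard behind the note above is ordinary reference aliasing: a component that keeps the array reference it was given edits the caller's array in place. The helpers below are hypothetical and only illustrate the difference between passing \@children and a copy made with [ @children ].

```perl
use strict;
use warnings;

# Hypothetical component that stores the array reference it is given.
sub make_registry {
    my ($list) = @_;
    return { children => $list };
}

# Removes a child by editing the stored array in place.
sub remove_child {
    my ( $reg, $child ) = @_;
    @{ $reg->{children} } = grep { $_ ne $child } @{ $reg->{children} };
}

my @shared = qw(a b c);
remove_child( make_registry( \@shared ), 'b' );     # @shared becomes (a, c)

my @private = qw(a b c);
remove_child( make_registry( [@private] ), 'b' );   # @private keeps (a, b, c)
```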

METHODS


start @children

Spawn and supervise the children described by the descriptors in @children. Children that are already being supervised are skipped.

stop @children

Stop the specified children.

If no arguments are provided, all the children are stopped, in the reverse of the order in which they appear in children.
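The bookkeeping behind start and stop can be sketched with a plain registry. ToyRegistry is hypothetical and uses string names instead of descriptor objects; it mirrors two behaviours visible in the source below: start skips children that are already registered and hands out increasing ids, and stop with no arguments walks the children in reverse order, so the most recently added child is stopped first.

```perl
use strict;
use warnings;

# Hypothetical registry mirroring the supervisor's start/stop bookkeeping.
package ToyRegistry;

sub new { return bless { order => [], ids => {}, last_id => 0, stopped => [] }, shift }

sub start {
    my ( $self, @children ) = @_;
    for my $c (@children) {
        next if exists $self->{ids}{$c};    # already supervised
        $self->{ids}{$c} = ++$self->{last_id};
        push @{ $self->{order} }, $c;
    }
}

sub stop {
    my ( $self, @children ) = @_;
    @children = reverse @{ $self->{order} } unless @children;
    for my $c (@children) {
        next unless delete $self->{ids}{$c};    # unknown child: ignore
        push @{ $self->{stopped} }, $c;
        @{ $self->{order} } = grep { $_ ne $c } @{ $self->{order} };
    }
}

package main;
```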

should_restart_child $child, @args

Delegates to $child by calling should_restart in POE::Component::Supervisor::Supervised.

child_exit_is_fatal $child, @args

Currently always returns false. In the future, restart throttling will be implemented using this method.

If true is returned, an error will be thrown by the supervisor.
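Restart throttling is not implemented yet, but a comment in child_exit_is_fatal in the source below mentions a per-child token_bucket. A minimal token bucket sketch, assuming a fixed capacity of restarts that refills at a constant rate per second: each unexpected exit takes a token, and an exit with no token left would be treated as fatal. new_bucket, take_token and exit_is_fatal are hypothetical names, and the current time is passed in explicitly instead of being read from time().

```perl
use strict;
use warnings;

# Hypothetical per-child token bucket for restart throttling.
sub new_bucket {
    my ( $capacity, $rate, $now ) = @_;
    return { capacity => $capacity, rate => $rate, tokens => $capacity, last => $now };
}

# Refill according to elapsed time, then try to spend one token.
sub take_token {
    my ( $bk, $now ) = @_;
    $bk->{tokens} += ( $now - $bk->{last} ) * $bk->{rate};
    $bk->{last} = $now;
    $bk->{tokens} = $bk->{capacity} if $bk->{tokens} > $bk->{capacity};
    return 0 if $bk->{tokens} < 1;
    $bk->{tokens} -= 1;
    return 1;
}

# An exit is fatal once the child has used up its restart allowance.
sub exit_is_fatal {
    my ( $bk, $now ) = @_;
    return take_token( $bk, $now ) ? 0 : 1;
}
```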

EVENTS


The POE event API is currently internal. All manipulation of the supervisor object should be done using the API described in METHODS.

SEE ALSO


http://www.erlang.org/doc/design_principles/sup_princ.html

AUTHOR


Stevan Little <stevan@iinteractive.com>

Yuval Kogman <yuval.kogman@iinteractive.com>

COPYRIGHT




#!/usr/bin/perl

package POE::Component::Supervisor;
use MooseX::POE;

use Moose::Util::TypeConstraints;

use POE::Component::Supervisor::Supervised;
use POE::Component::Supervisor::Handle;

use Devel::PartialDump;

use Hash::Util::FieldHash::Compat qw(idhash);

use namespace::clean -except => 'meta';

our $VERSION = "0.07";

with qw(
    POE::Component::Supervisor::Interface
    MooseX::POE::Aliased
    POE::Component::Supervisor::LogDispatch
);

sub run {
    my $self = shift->new(@_);
    $poe_kernel->run;
}

# by default when all the children die we exit as well
sub _build_alias { undef }

has restart_policy => (
    isa => enum(__PACKAGE__ . "::RestartPolicy" => qw(one all rest)),
    is  => "rw",
    default => "one",
);

has children => (
    isa => "ArrayRef",
    init_arg => undef,
    is  => "ro",
    auto_deref => 1,
    default => sub { [] },
);

has _children_tmp => (
    isa => "ArrayRef",
    is  => "rw",
    init_arg => undef,
    clearer => "_clear_children_tmp",
);

has _last_child_id => (
    isa => "Int",
    is  => "rw",
    default => 0,
);

sub _next_child_id {
    my $self = shift;
    $self->_last_child_id( $self->_last_child_id + 1 );
}

has _children_hash => (
    isa => "HashRef",
    is  => "ro",
    init_arg => undef,
    default  => sub { idhash my %h },
);

sub _child_id {
    my ( $self, $child ) = @_;

    if ( defined ( my $id = $self->_children_hash->{$child}{id} ) ) {
        return $id;
    } else {
        confess "unknown child $child";
    }
}

sub _child_handle {
    my ( $self, $child ) = @_;
    $self->_children_hash->{$child}{handle};
}

# used to track which children are currently being stopped for the purpose of
# restarting, because we first have to stop everything and then we start them again
has _stopping_for_restart => (
    isa => "HashRef",
    is  => "ro",
    init_arg => undef,
    default  => sub { idhash my %h },
);

# when children that are being restarted have stopped they are tracked here
# when the last child is stopped this collection of children will be started based on the order of 'children'
has _pending_restart => (
    isa => "HashRef",
    is  => "ro",
    init_arg => undef,
    default  => sub { idhash my %h },
);

sub START {
    my ( $self, $kernel ) = @_[OBJECT, KERNEL];

    $kernel->sig( DIE => "exception" );

    $self->logger->info("starting supervisor $self in process $$");

    if ( my $children = $self->_children_tmp ) {
        $self->_clear_children_tmp;
        $self->start(@$children);
    }
}

sub STOP {
    my $self = $_[OBJECT];

    $self->logger->info("stopping supervisor $self in process $$");
}

event exception => sub {
    my ( $self, $error_info ) = @_[OBJECT, ARG1];

    $self->logger->error("Error in supervisor child session, event $error_info->{event} of $error_info->{dest_session}: $error_info->{error_str}, sent from $error_info->{source_session} state $error_info->{from_state} at $error_info->{file} line $error_info->{line}");
};

sub _register_child {
    my ( $self, $new_child ) = @_;

    $self->logger->debug("registering child $new_child");

    $self->_children_hash->{$new_child} ||= do {
        push @{ $self->children }, $new_child;
        $self->_new_child_registration($new_child);
    }
}

sub _new_child_registration {
    my ( $self, $new_child ) = @_;
    return { id => $self->_next_child_id };
}

sub _unregister_child {
    my ( $self, $child ) = @_;

    $self->logger->debug("unregistering child $child");

    if ( delete $self->_children_hash->{$child} ) {
        @{ $self->children } = grep { $_ != $child } @{ $self->children };
    }

}

sub BUILD {
    my ( $self, $params ) = @_;

    if ( my $children = $params->{children} ) {
        $self->_children_tmp($children);
    }
}

sub start {
    my ( $self, @children ) = @_;

    foreach my $child ( @children ) {
        next if $self->_children_hash->{$child};
        $self->_register_child($child);
        $self->yield( spawn => $child );
    }
}

sub stop {
    my ( $self, @children ) = @_;

    @children = reverse $self->children unless @children;

    foreach my $child ( @children ) {
        if ( my $entry = $self->_children_hash->{$child} ) {
            if ( my $handle = $entry->{handle} ) {
                $entry->{stopping} = 1;
                $handle->stop;

                # remove it from the children list, so that it isn't restarted due to a
                # 'rest' or 'all' policy because of some other child's exit
                # _unregister_child will eventually try to do this too, but that's OK
                # because it *should* do it if the child has been stopped unexpectedly
                # and is temporary/transient
                @{ $self->children } = grep { $_ != $child } @{ $self->children };
            } else {
                # it's already dead, just delete it
                $self->_unregister_child($child);
            }
        }
    }
}

sub notify_spawned {
    my ( $self, @args ) = @_;
    $self->yield( spawned => @args );
}

sub notify_stopped {
    my ( $self, @args ) = @_;
    $self->yield( stopped => @args );
}

event spawned => sub {
    my ( $self, $kernel, $child, @args ) = @_[OBJECT, KERNEL, ARG0 .. $#_];

    $kernel->refcount_increment( $self->get_session_id(), "handles" );

    $self->logger->info("child " . $self->_child_id($child) . " spawned " . Devel::PartialDump::dump(@args));
};

event spawn => sub {
    my ( $self, $child ) = @_[OBJECT, ARG0 ];

    $self->logger->debug("instructing child " . $self->_child_id($child) . " to spawn");

    $self->_children_hash->{$child}{handle} = $child->spawn( supervisor => $self );
};

event respawn => sub {
    my ( $self, $child ) = @_[OBJECT, ARG0];

    $self->_pending_restart->{$child} = $child;

    if ( scalar keys %{ $self->_stopping_for_restart } ) {
        # if we're waiting on more children to exit, just mark this child as ready to restart
        $self->logger->debug("child " . $self->_child_id($child) . " respawn postponed, other children still not stopped");
    } else {
        # otherwise we can now restart all the children which are ready to be restarted
        $self->logger->debug("no more unstopped children, ready to respawn");
        my @children_to_restart = grep { defined } delete @{ $self->_pending_restart }{ $self->children };

        foreach my $child ( @children_to_restart ) {
            $self->yield( _respawn => $child );
        }
    }
};

event _respawn => sub {
    my ( $self, $child ) = @_[OBJECT, ARG0];

    $self->logger->info("respawning child " . $self->_child_id($child));
    $self->_children_hash->{$child}{handle} = $child->respawn( supervisor => $self );
};

event stopped => sub {
    my ( $self, $kernel, $child, @args ) = @_[OBJECT, KERNEL, ARG0 .. $#_];

    $kernel->refcount_decrement( $self->get_session_id(), "handles" );

    delete $self->_children_hash->{$child}{handle};

    if ( $self->_children_hash->{$child}{stopping} ) {
        $self->call( stopped_per_request => $child, @args );
    } elsif ( my $restarting = delete $self->_stopping_for_restart->{$child} ) {
        $self->call( stopped_for_restart => $child, @args );
    } else {
        $self->call( stopped_unexpectedly => $child, @args );
    }
};

event stopped_per_request => sub {
    my ( $self, $kernel, $child, @args ) = @_[OBJECT, KERNEL, ARG0 .. $#_];

    $self->logger->info("child " . $self->_child_id($child) . " exited as requested");

    $self->_unregister_child($child);
    $kernel->refcount_decrement( $self->get_session_id(), "children" );
};

event stopped_unexpectedly => sub {
    my ( $self, $kernel, $child, @args ) = @_[OBJECT, KERNEL, ARG0 .. $#_];

    my $id = $self->_child_id($child);

    $self->logger->notice("child $id exited on its own");

    if ( $self->should_restart_child($child, @args) ) {
        if ( $self->child_exit_is_fatal($child, @args) ) {
            $self->logger->error("child $id exit is fatal, raising error");
            $self->yield( fatal_exit => $child, @args );
        } else {
            my $policy = $self->restart_policy;
            $self->logger->info("child $id will be restarted, restart policy is $policy");
            $self->yield( "restart_$policy" => $child, @args );
        }
    } else {
        $self->logger->info("child $id won't be restarted");
        $self->_unregister_child($child);
        $kernel->refcount_decrement( $self->get_session_id(), "children" );
    }
};

event stopped_for_restart => sub {
    my ( $self, $child, %args ) = @_[OBJECT, ARG0 .. $#_];

    $self->logger->info("child " . $self->_child_id($child) . " exited for restart as requested");

    $self->yield( respawn => $child );
};

event restart_one => sub {
    my ( $self, $child, %args ) = @_[OBJECT, ARG0 .. $#_];

    if ( my $handle = $self->_child_handle($child) ) {
        $self->logger->info("stopping child " . $self->_child_id($child) . " for restart");
        $self->_stopping_for_restart->{$child} = 1;
        $handle->stop_for_restart();
    } else {
        $self->logger->debug("child " . $self->_child_id($child) . " is already dead, marking for respawn");
        $self->yield( respawn => $child );
    }
};

event restart_all => sub {
    my ( $self, $child, %args ) = @_[OBJECT, ARG0 .. $#_];

    foreach my $child ( reverse $self->children ) {
        $self->yield( restart_one => $child );
    }
};

event restart_rest => sub {
    my ( $self, $child, %args ) = @_[OBJECT, ARG0 .. $#_];

    my @children = $self->children;

    shift @children while @children and $children[0] != $child;

    foreach my $child ( reverse @children ) {
        $self->yield( restart_one => $child );
    }
};

event fatal_exit => sub {
    # stop everything
    # exit with error
    # FIXME how do we exit abstractly? yield to some callback?

    die "supervisor seppuku";
};

sub child_exit_is_fatal {
    my ( $self, $child, @args ) = @_;

    # check if the child exceeded the maximal number of restarts by looking at
    # $self->_children_hash->{$child}{token_bucket}, the child descriptor's
    # restart policy (if it's transient or not, etc.), $args{exit_code} being an
    # error, etc

    return 0;
}

sub should_restart_child {
    my ( $self, $child, @args ) = @_;

    $child->should_restart(@args);
}

event _child => sub {
    my ( $self, $verb, $child ) = @_[OBJECT, ARG0, ARG1];

    $self->logger->debug("supervisor $self had child event for $child: $verb");
};

__PACKAGE__

__END__