/usr/local/CPAN/Sprocket/Sprocket/Base.pm
package Sprocket::Base;
use strict;
use warnings;
use Carp qw( croak );
use Sprocket qw( Common Connection Session AIO );
use POE;
use Class::Accessor::Fast;
use base qw(Class::Accessor::Fast);
our $VERSION = $Sprocket::VERSION;
our $sprocket_aio;
our $sprocket;
our $basic_logger = 'Sprocket::Logger::Basic';
__PACKAGE__->mk_accessors( qw(
name
uuid
_uuid
shutting_down
opts
is_forked
is_child
connections
_logger
session_id
) );
BEGIN {
eval "use BSD::Resource";
eval 'sub HAS_BSD_RESOURCE() { '.( $@ ? 0 : 1 ).' }';
# can't use $basic_logger here
eval "use Sprocket::Logger::Basic";
eval 'sub HAS_BASIC_LOGGER() { '.( $@ ? 0 : 1 ).' }';
$sprocket->register_hook( [qw(
sprocket.connection.create
sprocket.connection.destroy
sprocket.plugin.add
sprocket.plugin.remove
)] );
}
# events sent to process_plugins
sub EVENT_NAME() { 0 }
sub SERVER() { 1 }
sub CONNECTION() { 2 }
our @base_states = qw(
_start
_default
signals
shutdown
begin_soft_shutdown
_log
events_received
events_ready
exception
process_plugins
sig_child
time_out_check
cleanup
call_in_ses_context
);
sub spawn {
my ( $class, $self, @states ) = @_;
# a special session that uses a connection hash
Sprocket::Session->create(
# options => { trace => 1 },
object_states => [
$self => [ @base_states, @states ]
],
);
return $self;
}
sub new {
my $class = shift;
croak "$class requires an even number of parameters" if @_ % 2;
my %opts = &adjust_params;
my $uuid = new_uuid();
$opts{alias} = "sprocket/$uuid" unless( defined( $opts{alias} ) and length( $opts{alias} ) );
$opts{name} = "sprocket/$uuid" unless( defined( $opts{name} ) );
$opts{time_out} = defined( $opts{time_out} ) ? $opts{time_out} : 30;
$opts{log_level} = 4 unless( defined( $opts{log_level} ) );
my $logger = delete $opts{logger};
if ( defined( $logger ) && not UNIVERSAL::can( $logger, 'put' ) ) {
warn "invalid logger: $logger (no put method), falling back to $basic_logger";
undef $logger;
}
unless ( defined $logger ) {
if ( !HAS_BASIC_LOGGER ) {
warn "$basic_logger is unavailable. Logging disabled!";
undef $logger;
} else {
$logger = "$basic_logger"->new(
parent_alias => $opts{alias},
log_level => $opts{log_level},
);
}
}
my $self = bless( {
name => $opts{name},
opts => \%opts,
heaps => {},
connections => 0,
plugins => {},
plugin_pri => [],
time_out_check => 10, # time_out checker
type => delete $opts{_type},
uuid => $uuid,
is_forked => 0,
_logger => $logger,
}, ref $class || $class );
$self->{_uuid} = gen_uuid( $self );
$self->check_params if ( $self->can( 'check_params' ) );
if ( $opts{max_connections} ) {
if ( HAS_BSD_RESOURCE ) {
my $ret = setrlimit( RLIMIT_NOFILE, $opts{max_connections}, $opts{max_connections} );
unless ( defined $ret && $ret ) {
if ( $> == 0 ) {
$self->_log(v => 1, msg => 'Unable to set max connections limit');
} else {
$self->_log(v => 1, msg => 'Need to be root to increase max connections');
}
}
} else {
$self->_log(v => 1, msg => 'Need BSD::Resource installed to increase max connections');
}
}
$sprocket->add_component( $self );
return $self;
}
sub _start {
my ( $self, $kernel, $session ) = @_[ OBJECT, KERNEL, SESSION ];
$self->session_id( $session->ID );
$session->option( @{$self->{opts}->{session_options}} )
if ( $self->{opts}->{session_options} );
$kernel->alias_set( $self->{opts}->{alias} )
if ( $self->{opts}->{alias} );
if ( $self->{opts}->{plugins} ) {
foreach my $t ( @{ $self->opts->{plugins} } ) {
# convert CamelCase to camel_case
$t = adjust_params($t);
$self->add_plugin(
$t->{plugin},
$t->{priority} || 0
);
}
}
if ( my $ev = delete $self->opts->{event_manager} ) {
eval "use $ev->{module}";
if ( $@ ) {
$self->_log(v => 1, msg => "Error loading $ev->{module} : $@");
$self->shutdown_all;
return;
}
$ev->{options} = []
unless ( $ev->{options} && ref( $ev->{options} ) eq 'ARRAY' );
$self->{event_manager} = "$ev->{module}"->new(
@{$ev->{options}},
parent_id => $self->session_id
);
}
$self->{aio} = defined( $sprocket_aio ) ? 1 : 0;
$self->{time_out_id} = $kernel->alarm_set( time_out_check => time() + $self->{time_out_check} )
if ( $self->{time_out_check} );
# TODO recheck and document
$kernel->sig( DIE => 'exception' )
if ( $self->{opts}->{use_exception_handler} );
$kernel->sig( TSTP => 'signals' )
unless( $self->opts->{no_tstp} );
$kernel->sig( INT => 'signals' );
$kernel->call( $session => '_startup' );
return;
}
sub _default {
my ( $self, $con, $cmd ) = @_[ OBJECT, HEAP, ARG0 ];
return if ( $cmd =~ m/^_(child|parent)/ );
return $self->process_plugins( [ $cmd, $self, $con, @_[ ARG1 .. $#_ ] ] )
if ( UNIVERSAL::can( $con, 'ID' ) );
$self->_log(v => 1, msg => "_default called, no handler for event $cmd"
." [$con] (the connection for this event may be gone)");
return;
}
sub signals {
my ( $self, $signal_name ) = @_[ OBJECT, ARG0 ];
$self->_log(v => 1, msg => "Client caught SIG$signal_name");
if ( $signal_name eq 'INT' ) {
# TODO do something here
# to stop ctrl-c / INT
#$_[KERNEL]->sig_handled();
} elsif ( $signal_name eq 'TSTP' ) {
local $SIG{TSTP} = 'DEFAULT';
kill( TSTP => $$ );
$_[ KERNEL ]->sig_handled();
}
return 0;
}
sub sig_child {
$_[KERNEL]->sig_handled();
}
sub new_connection {
my $self = shift;
my $con = Sprocket::Connection->new(
parent_id => $self->session_id,
@_
);
# TODO ugh, move this stuff out of here
$con->event_manager( $self->{event_manager}->{alias} )
if ( $self->{event_manager} );
$self->{heaps}->{ $con->ID } = $con;
my $len = $self->connections( scalar( keys %{$self->{heaps}} ) );
$sprocket->broadcast( 'sprocket.connection.create', {
source => $self,
target => $con,
} );
return $con;
}
# gets a connection obj from any component
sub get_connection {
my ( $self, $id, $norec ) = @_;
if ( my $con = $self->{heaps}->{ $id } ) {
return $con;
}
return undef if ( $norec );
return $sprocket->get_connection( $id );
}
sub _log {
my ( $self, %o ) = ref $_[ KERNEL ] ? @_[ OBJECT, ARG0 .. $#_ ] : @_;
return unless defined $self->_logger;
$self->_logger->put( $self, \%o );
}
sub cleanup {
my ( $self, $con_id ) = @_[ OBJECT, ARG0 ];
if ( my $con = $self->{heaps}->{ $con_id } ) {
$self->process_plugins( [ $self->{type}.'_disconnected', $self, $con, 0 ] )
unless ( defined $con->error );
$self->cleanup_connection( $con );
}
}
sub cleanup_connection {
my ( $self, $con ) = @_;
return unless( $con );
$sprocket->broadcast( 'sprocket.connection.destroy', {
source => $self,
target => $con,
} );
delete $self->{heaps}->{ $con->ID };
$self->connections( scalar( keys %{$self->{heaps}} ) );
$self->shutdown()
if ( $self->shutting_down && $self->connections <= 0 );
return;
}
sub shutdown_all {
shift;
$sprocket->shutdown_all( @_ );
}
sub shutdown {
unless ( $_[KERNEL] && ref $_[KERNEL] ) {
return $poe_kernel->call( shift->session_id => shutdown => @_ );
}
my ( $self, $kernel, $type ) = @_[ OBJECT, KERNEL, ARG0 ];
if ( lc( $type ) eq 'soft' ) {
$self->shutting_down( $type );
$kernel->call( $_[SESSION] => 'begin_soft_shutdown' );
return;
}
foreach ( values %{$self->{heaps}} ) {
$_->close( 1 ); # force
$self->cleanup_connection( $_ );
}
$self->{heaps} = {};
# XXX proper?
$kernel->sig( INT => undef );
$kernel->sig( TSTP => undef );
$kernel->alarm_remove_all();
$kernel->alias_remove( $self->{opts}->{alias} )
if ( $self->{opts}->{alias} );
# XXX remove plugins one by one?
delete @{$self}{qw( wheel sf )};
# if this is the last component, sprocket will shutdown aio
$sprocket->remove_component( $self );
return;
}
sub begin_soft_shutdown {
my ( $kernel, $self ) = @_[ KERNEL, OBJECT ];
$self->_log(v => 1, msg => $self->{name}." subclass didn't define a begin_soft_shutdown event. shutting down hard!");
$self->shutdown();
return;
}
sub events_received {
my $self = $_[ OBJECT ];
$self->process_plugins( [ 'events_received', $self, @_[ HEAP, ARG0 .. $#_ ] ] );
}
sub events_ready {
my $self = $_[ OBJECT ];
$self->process_plugins( [ 'events_ready', $self, @_[ HEAP, ARG0 .. $#_ ] ] );
}
sub exception {
my ( $kernel, $self, $con, $sig, $error ) = @_[ KERNEL, OBJECT, HEAP, ARG0, ARG1 ];
# TODO check exceptions with new POE
$self->_log(v => 1, l => 1, msg => "plugin exception handled: ($sig) : "
.join(' | ',map { $_.':'.$error->{$_} } keys %$error ) );
$con->close( 1 ) if ( UNIVERSAL::can( $con, 'close' ) );
$kernel->sig_handled();
}
sub time_out_check {
my ( $kernel, $self ) = @_[ KERNEL, OBJECT ];
my $time = time();
$self->{time_out_id} = $kernel->alarm_set( time_out_check => $time + $self->{time_out_check} );
foreach my $con ( values %{$self->{heaps}} ) {
next unless ( $con );
if ( my $timeout = $con->time_out ) {
$self->process_plugins( [ $self->{type}.'_time_out', $self, $con, $time ] )
if ( ( $con->active_time + $timeout ) < $time );
}
}
}
sub add_plugin {
my $self = shift;
my $t = $self->{plugins};
my ( $plugin, $pri ) = @_;
my $uuid;
if ( $plugin->can( 'uuid' ) ) {
$uuid = $plugin->uuid;
} else {
warn "WARNING, plugin $plugin doesn't have a uuid,"
."contact the author and have them read the Sprocket::Plugin docs";
$uuid = "bad-plugin-$plugin";
}
warn "WARNING : Overwriting existing plugin '$uuid' (You have two plugins with the same id!!)"
if ( exists( $t->{ $uuid } ) );
$pri ||= 0;
my $found = 0;
foreach ( values %$t ) {
$found++ if ( $_->{priority} == $pri );
}
warn "WARNING: You have defined more than one plugin with the same"
." priority, was this intended? plugin: $plugin uuid: $uuid pri: $pri"
if ( $found );
$t->{ $uuid } = {
plugin => $plugin,
priority => $pri,
};
$plugin->parent_id( $self->session_id );
$sprocket->broadcast( 'sprocket.plugin.add', {
source => $self,
target => $plugin,
} );
$plugin->handle_event( plugin_start_aio => $self => $pri );
$plugin->handle_event( add_plugin => $self => $pri );
# recalc plugin order
@{ $self->{plugin_pri} } = sort {
$t->{ $a }->{priority} <=> $t->{ $b }->{priority}
} keys %$t;
return 1;
}
sub remove_plugin {
my $self = shift;
my $uuid = shift;
# TODO remove by name or obj
my $t = $self->{plugins};
my $plugin = delete $t->{ $uuid };
return 0 unless ( $plugin );
$sprocket->broadcast( 'sprocket.plugin.remove', {
source => $self,
target => $plugin,
} );
$plugin->{plugin}->handle_event( remove_plugin => $plugin->{priority} );
# recalc plugin_pri
@{ $self->{plugin_pri} } = sort {
$t->{ $a }->{priority} <=> $t->{ $b }->{priority}
} keys %$t;
return 1;
}
sub process_plugins {
my ( $self, $args, $i ) = $_[ KERNEL ] ? @_[ OBJECT, ARG0, ARG1 ] : @_;
return unless ( @{ $self->{plugin_pri} } );
my $con = $args->[ CONNECTION ];
$con->state( $args->[ EVENT_NAME ] )
if ( UNIVERSAL::can( $con, 'state' ) );
if ( UNIVERSAL::can( $con, 'plugin' ) && ( my $t = $con->plugin ) ) {
return $self->{plugins}->{ $t }->{plugin}->handle_event( @$args );
} else {
$i ||= 0;
if ( $#{ $self->{plugin_pri} } >= $i ) {
return if ( $self->{plugins}->{
$self->{plugin_pri}->[ $i ]
}->{plugin}->handle_event( @$args ) );
}
$i++;
# avoid a post
return if ( $#{ $self->{plugin_pri} } < $i );
}
# XXX call?
#$poe_kernel->call( $self->session_id => process_plugins => $args => $i );
$poe_kernel->yield( process_plugins => $args => $i );
}
sub get_plugin {
my ( $self, $uuid ) = @_;
return $self->{plugins}->{ $uuid }->{plugin}
if ( exists( $self->{plugins}->{ $uuid } ) );
# fall back to finding the plugin globally
return $sprocket->get_plugin( $uuid );
}
sub resolve_plugin_uuid {
my ( $self, $name ) = @_;
my $plugin = grep { $name eq $_->{plugin}->name } values %{ $self->{plugins} };
return $plugin ? $plugin->uuid : undef;
}
sub forward_plugin_by_uuid {
my $self = shift;
my $uuid = shift;
unless( exists ( $self->{plugins}->{ $uuid } ) ) {
$self->_log( v => 4, msg => 'plugin not loaded! plugin uuid: '.$uuid );
return 0;
}
# XXX
my $con = $self->{heap};
$con->plugin( $uuid );
return $self->process_plugins( [ $con->state, $self, $con, @_ ] );
}
sub forward_plugin {
my $self = shift;
my $name = shift;
my ($plugin) = grep { $name eq $_->{plugin}->name } values %{ $self->{plugins} };
unless( $plugin ) {
$self->_log( v => 4, msg => 'plugin not loaded! plugin: '.$name );
return 0;
}
# XXX
my $con = $self->{heap};
$con->plugin( $plugin->{plugin}->uuid );
return $self->process_plugins( [ $con->state, $self, $con, @_ ] );
}
# helper used by Sprocket::Connection
sub call_in_ses_context {
# must call in this in our session's context
unless ( $_[KERNEL] && ref $_[KERNEL] ) {
return $poe_kernel->call( shift->session_id => @_ );
}
my $event = $_[ ARG0 ];
return $_[ KERNEL ]->$event( @_[ ARG1 .. $#_ ] );
}
1;