/usr/local/CPAN/Sprocket/Sprocket/Base.pm


package Sprocket::Base;

use strict;
use warnings;

use Carp qw( croak );
use Sprocket qw( Common Connection Session AIO );
use POE;

use Class::Accessor::Fast;
use base qw(Class::Accessor::Fast);

our $VERSION = $Sprocket::VERSION;

our $sprocket_aio;
our $sprocket;
our $basic_logger = 'Sprocket::Logger::Basic';

__PACKAGE__->mk_accessors( qw(
    name
    uuid
    _uuid
    shutting_down
    opts
    is_forked
    is_child
    connections
    _logger
    session_id
) );

BEGIN {
    eval "use BSD::Resource";
    eval 'sub HAS_BSD_RESOURCE() { '.( $@ ? 0 : 1 ).' }';

    # can't use $basic_logger here
    eval "use Sprocket::Logger::Basic";
    eval 'sub HAS_BASIC_LOGGER() { '.( $@ ? 0 : 1 ).' }';
    
    $sprocket->register_hook( [qw(
        sprocket.connection.create
        sprocket.connection.destroy
        sprocket.plugin.add
        sprocket.plugin.remove
    )] );
}


# events sent to process_plugins
sub EVENT_NAME() { 0 }
sub SERVER()     { 1 }
sub CONNECTION() { 2 }

our @base_states = qw(
    _start
    _default
    signals
    shutdown
    begin_soft_shutdown
    _log
    events_received
    events_ready
    exception
    process_plugins
    sig_child
    time_out_check
    cleanup
    call_in_ses_context
);

sub spawn {
    my ( $class, $self, @states ) = @_;
    
    # a special session that uses a connection hash
    Sprocket::Session->create(
#       options => { trace => 1 },
        object_states => [
            $self => [ @base_states, @states ]
        ],
    );

    return $self;
}

sub new {
    my $class = shift;
    croak "$class requires an even number of parameters" if @_ % 2;
    my %opts = &adjust_params;
        
    my $uuid = new_uuid();
    
    $opts{alias} = "sprocket/$uuid" unless( defined( $opts{alias} ) and length( $opts{alias} ) );
    $opts{name} = "sprocket/$uuid" unless( defined( $opts{name} ) );
    $opts{time_out} = defined( $opts{time_out} ) ? $opts{time_out} : 30;
    $opts{log_level} = 4 unless( defined( $opts{log_level} ) );
    
    my $logger = delete $opts{logger};
    if ( defined( $logger ) && not UNIVERSAL::can( $logger, 'put' ) ) {
        warn "invalid logger: $logger (no put method), falling back to $basic_logger";
        undef $logger;
    }
    
    unless ( defined $logger ) {
        if ( !HAS_BASIC_LOGGER ) {
            warn "$basic_logger is unavailable.  Logging disabled!";
            undef $logger;
        } else {
            $logger = "$basic_logger"->new(
                parent_alias => $opts{alias},
                log_level => $opts{log_level},
            );
        }
    }
    
    my $self = bless( {
        name => $opts{name},
        opts => \%opts, 
        heaps => {},
        connections => 0,
        plugins => {},
        plugin_pri => [],
        time_out_check => 10, # time_out checker
        type => delete $opts{_type},
        uuid => $uuid,
        is_forked => 0,
        _logger => $logger,
    }, ref $class || $class );

    $self->{_uuid} = gen_uuid( $self );

    $self->check_params if ( $self->can( 'check_params' ) );

    if ( $opts{max_connections} ) {
        if ( HAS_BSD_RESOURCE ) {
            my $ret = setrlimit( RLIMIT_NOFILE, $opts{max_connections}, $opts{max_connections} );
            unless ( defined $ret && $ret ) {
                if ( $> == 0 ) {
                    $self->_log(v => 1, msg => 'Unable to set max connections limit');
                } else {
                    $self->_log(v => 1, msg => 'Need to be root to increase max connections');
                }
            }
        } else {
            $self->_log(v => 1, msg => 'Need BSD::Resource installed to increase max connections');
        }
    }

    $sprocket->add_component( $self );
    
    return $self;
}

sub _start {
    my ( $self, $kernel, $session ) = @_[ OBJECT, KERNEL, SESSION ];

    $self->session_id( $session->ID );
    
    $session->option( @{$self->{opts}->{session_options}} )
        if ( $self->{opts}->{session_options} );
    
    $kernel->alias_set( $self->{opts}->{alias} )
        if ( $self->{opts}->{alias} );


    if ( $self->{opts}->{plugins} ) {
        foreach my $t ( @{ $self->opts->{plugins} } ) {
            # convert CamelCase to camel_case
            $t = adjust_params($t);
            $self->add_plugin(
                $t->{plugin},
                $t->{priority} || 0
            );
        }
    }
    
    if ( my $ev = delete $self->opts->{event_manager} ) {
        eval "use $ev->{module}";
        if ( $@ ) {
            $self->_log(v => 1, msg => "Error loading $ev->{module} : $@");
            $self->shutdown_all;
            return;
        }
        $ev->{options} = []
            unless ( $ev->{options} && ref( $ev->{options} ) eq 'ARRAY' );
        
        $self->{event_manager} = "$ev->{module}"->new(
            @{$ev->{options}},
            parent_id => $self->session_id
        );
    }

    $self->{aio} = defined( $sprocket_aio ) ? 1 : 0;

    $self->{time_out_id} = $kernel->alarm_set( time_out_check => time() + $self->{time_out_check} )
        if ( $self->{time_out_check} );

    # TODO recheck and document
    $kernel->sig( DIE => 'exception' )
        if ( $self->{opts}->{use_exception_handler} );

    $kernel->sig( TSTP => 'signals' )
        unless( $self->opts->{no_tstp} );
    $kernel->sig( INT => 'signals' );

    $kernel->call( $session => '_startup' );
    
    return;
}

sub _default {
    my ( $self, $con, $cmd ) = @_[ OBJECT, HEAP, ARG0 ];
    
    return if ( $cmd =~ m/^_(child|parent)/ );

    return $self->process_plugins( [ $cmd, $self, $con, @_[ ARG1 .. $#_ ] ] )
        if ( UNIVERSAL::can( $con, 'ID' ) );
    
    $self->_log(v => 1, msg => "_default called, no handler for event $cmd"
        ." [$con] (the connection for this event may be gone)");
    
    return;
}

sub signals {
    my ( $self, $signal_name ) = @_[ OBJECT, ARG0 ];

    $self->_log(v => 1, msg => "Client caught SIG$signal_name");

    if ( $signal_name eq 'INT' ) {
        # TODO do something here
        # to stop ctrl-c / INT
        #$_[KERNEL]->sig_handled();
    } elsif ( $signal_name eq 'TSTP' ) {
        local $SIG{TSTP} = 'DEFAULT';
        kill( TSTP => $$ );
        $_[ KERNEL ]->sig_handled();
    }

    return 0;
}

sub sig_child {
    $_[KERNEL]->sig_handled();
}

sub new_connection {
    my $self = shift;
   
    my $con = Sprocket::Connection->new(
        parent_id => $self->session_id,
        @_
    );
    
    # TODO ugh, move this stuff out of here
    $con->event_manager( $self->{event_manager}->{alias} )
        if ( $self->{event_manager} );

    $self->{heaps}->{ $con->ID } = $con;

    my $len = $self->connections( scalar( keys %{$self->{heaps}} ) );

    $sprocket->broadcast( 'sprocket.connection.create', {
        source => $self,
        target => $con,
    } );
    
    return $con;
}

# gets a connection obj from any component
sub get_connection {
    my ( $self, $id, $norec ) = @_;
    
    if ( my $con = $self->{heaps}->{ $id } ) {
        return $con;
    }
    
    return undef if ( $norec );

    return $sprocket->get_connection( $id );
}

sub _log {
    my ( $self, %o ) = ref $_[ KERNEL ] ? @_[ OBJECT, ARG0 .. $#_ ] : @_;
    return unless defined $self->_logger;
    $self->_logger->put( $self, \%o );
}

sub cleanup {
    my ( $self, $con_id ) = @_[ OBJECT, ARG0 ];

    if ( my $con = $self->{heaps}->{ $con_id } ) {
        $self->process_plugins( [ $self->{type}.'_disconnected', $self, $con, 0 ] )
            unless ( defined $con->error );
        $self->cleanup_connection( $con );
    }
}

sub cleanup_connection {
    my ( $self, $con ) = @_;

    return unless( $con );
    
    $sprocket->broadcast( 'sprocket.connection.destroy', {
        source => $self,
        target => $con,
    } );
    
    delete $self->{heaps}->{ $con->ID };
    
    $self->connections( scalar( keys %{$self->{heaps}} ) );

    $self->shutdown()
        if ( $self->shutting_down && $self->connections <= 0 );
    
    return;
}

sub shutdown_all {
    shift;
    $sprocket->shutdown_all( @_ );
}

sub shutdown {
    unless ( $_[KERNEL] && ref $_[KERNEL] ) {
        return $poe_kernel->call( shift->session_id => shutdown => @_ );
    }
    
    my ( $self, $kernel, $type ) = @_[ OBJECT, KERNEL, ARG0 ];

    if ( lc( $type ) eq 'soft' ) {
        $self->shutting_down( $type );
        $kernel->call( $_[SESSION] => 'begin_soft_shutdown' );
        return;
    }

    foreach ( values %{$self->{heaps}} ) {
        $_->close( 1 ); # force
        $self->cleanup_connection( $_ );
    }
    $self->{heaps} = {};

    # XXX proper?
    $kernel->sig( INT => undef );
    $kernel->sig( TSTP => undef );
    $kernel->alarm_remove_all();
    $kernel->alias_remove( $self->{opts}->{alias} )
        if ( $self->{opts}->{alias} );

    # XXX remove plugins one by one?
    delete @{$self}{qw( wheel sf )};

    # if this is the last component, sprocket will shutdown aio
    $sprocket->remove_component( $self );

    return;
}

sub begin_soft_shutdown {
    my ( $kernel, $self ) = @_[ KERNEL, OBJECT ];
    
    $self->_log(v => 1, msg => $self->{name}." subclass didn't define a begin_soft_shutdown event. shutting down hard!");

    $self->shutdown();

    return;
}

sub events_received {
    my $self = $_[ OBJECT ];
    $self->process_plugins( [ 'events_received', $self, @_[ HEAP, ARG0 .. $#_ ] ] );
}

sub events_ready {
    my $self = $_[ OBJECT ];
    $self->process_plugins( [ 'events_ready', $self, @_[ HEAP, ARG0 .. $#_ ] ] );
}

sub exception {
    my ( $kernel, $self, $con, $sig, $error ) = @_[ KERNEL, OBJECT, HEAP, ARG0, ARG1 ];

    # TODO check exceptions with new POE
    $self->_log(v => 1, l => 1, msg => "plugin exception handled: ($sig) : "
        .join(' | ',map { $_.':'.$error->{$_} } keys %$error ) );
    
    $con->close( 1 ) if ( UNIVERSAL::can( $con, 'close' ) );
    $kernel->sig_handled();
}

sub time_out_check {
    my ( $kernel, $self ) = @_[ KERNEL, OBJECT ];

    my $time = time();
    $self->{time_out_id} = $kernel->alarm_set( time_out_check => $time + $self->{time_out_check} );

    foreach my $con ( values %{$self->{heaps}} ) {
        next unless ( $con );
        if ( my $timeout = $con->time_out ) {
            $self->process_plugins( [ $self->{type}.'_time_out', $self, $con, $time ] )
                if ( ( $con->active_time + $timeout ) < $time );
        }
    }
}

sub add_plugin {
    my $self = shift;
    
    my $t = $self->{plugins};
   
    my ( $plugin, $pri ) = @_;
    my $uuid;
    
    if ( $plugin->can( 'uuid' ) ) {
        $uuid = $plugin->uuid;
    } else {
        warn "WARNING, plugin $plugin doesn't have a uuid,"
            ."contact the author and have them read the Sprocket::Plugin docs";
        $uuid = "bad-plugin-$plugin";
    }
    
    warn "WARNING : Overwriting existing plugin '$uuid' (You have two plugins with the same id!!)"
        if ( exists( $t->{ $uuid } ) );

    $pri ||= 0;

    my $found = 0;
    foreach ( values %$t ) {
        $found++ if ( $_->{priority} == $pri );
    }
    
    warn "WARNING: You have defined more than one plugin with the same"
        ." priority, was this intended? plugin: $plugin uuid: $uuid pri: $pri"
        if ( $found );

    $t->{ $uuid } = {
        plugin => $plugin,
        priority => $pri,
    };
    
    $plugin->parent_id( $self->session_id );

    $sprocket->broadcast( 'sprocket.plugin.add', {
        source => $self,
        target => $plugin,
    } );
    
    $plugin->handle_event( plugin_start_aio => $self => $pri );
    $plugin->handle_event( add_plugin => $self => $pri );
    
    # recalc plugin order
    @{ $self->{plugin_pri} } = sort {
        $t->{ $a }->{priority} <=> $t->{ $b }->{priority}
    } keys %$t;

    return 1;
}

sub remove_plugin {
    my $self = shift;
    my $uuid = shift;
    
    # TODO remove by name or obj
    
    my $t = $self->{plugins};
    
    my $plugin = delete $t->{ $uuid };
    return 0 unless ( $plugin );
    
    $sprocket->broadcast( 'sprocket.plugin.remove', {
        source => $self,
        target => $plugin,
    } );
    
    $plugin->{plugin}->handle_event( remove_plugin => $plugin->{priority} );
    
    # recalc plugin_pri
    @{ $self->{plugin_pri} } = sort {
        $t->{ $a }->{priority} <=> $t->{ $b }->{priority}
    } keys %$t;
    
    return 1;
}

sub process_plugins {
    my ( $self, $args, $i ) = $_[ KERNEL ] ? @_[ OBJECT, ARG0, ARG1 ] : @_;

    return unless ( @{ $self->{plugin_pri} } );
   
    my $con = $args->[ CONNECTION ];
    $con->state( $args->[ EVENT_NAME ] )
        if ( UNIVERSAL::can( $con, 'state' ) );
    
    if ( UNIVERSAL::can( $con, 'plugin' ) && ( my $t = $con->plugin ) ) {
        return $self->{plugins}->{ $t }->{plugin}->handle_event( @$args );
    } else {
        $i ||= 0;
        if ( $#{ $self->{plugin_pri} } >= $i ) {
            return if ( $self->{plugins}->{
                $self->{plugin_pri}->[ $i ]
            }->{plugin}->handle_event( @$args ) );
        }
        $i++;
        # avoid a post
        return if ( $#{ $self->{plugin_pri} } < $i );
    }
    
    # XXX call?
    #$poe_kernel->call( $self->session_id => process_plugins => $args => $i );
    $poe_kernel->yield( process_plugins => $args => $i );
}

sub get_plugin {
    my ( $self, $uuid ) = @_;

    return $self->{plugins}->{ $uuid }->{plugin}
        if ( exists( $self->{plugins}->{ $uuid } ) );

    # fall back to finding the plugin globally
    return $sprocket->get_plugin( $uuid );
}

sub resolve_plugin_uuid {
    my ( $self, $name ) = @_;

    my $plugin = grep { $name eq $_->{plugin}->name } values %{ $self->{plugins} };

    return $plugin ? $plugin->uuid : undef;
}

sub forward_plugin_by_uuid {
    my $self = shift;
    my $uuid = shift;

    unless( exists ( $self->{plugins}->{ $uuid } ) ) {
        $self->_log( v => 4, msg => 'plugin not loaded! plugin uuid: '.$uuid );
        return 0;
    }
    
    # XXX 
    my $con = $self->{heap};
    $con->plugin( $uuid );

    return $self->process_plugins( [ $con->state, $self, $con, @_ ] );
}
    
sub forward_plugin {
    my $self = shift;
    my $name = shift;

    my ($plugin) = grep { $name eq $_->{plugin}->name } values %{ $self->{plugins} };
    
    unless( $plugin ) {
        $self->_log( v => 4, msg => 'plugin not loaded! plugin: '.$name );
        return 0;
    }

    # XXX 
    my $con = $self->{heap};
    $con->plugin( $plugin->{plugin}->uuid );

    return $self->process_plugins( [ $con->state, $self, $con, @_ ] );
}

# helper used by Sprocket::Connection
sub call_in_ses_context {
    # must call in this in our session's context
    unless ( $_[KERNEL] && ref $_[KERNEL] ) {
        return $poe_kernel->call( shift->session_id => @_ );
    }
    
    my $event = $_[ ARG0 ];
    return $_[ KERNEL ]->$event( @_[ ARG1 .. $#_ ] );
}

1;