/usr/local/CPAN/Sprocket/Sprocket/Spread.pm


package Sprocket::Spread;

use strict;
use warnings;

use Sprocket qw( ChannelManager );
use POE qw( Driver::SysRW  Wheel::ReadWrite );
use Spread;
use Carp qw( croak );
use Symbol qw( gensym );

# XXX
use Data::Dumper;

our $sprocket_spread;


sub import {
    my ( $class, $args ) = @_;
    my $package = caller();

    croak "Sprocket::Spread expects its arguments in a hash ref"
        if ( $args && ref( $args ) ne 'HASH' );

    unless ( delete $args->{no_auto_export} ) {
        {
            no strict 'refs';
            *{ $package . '::sprocket_spread' } = \$sprocket_spread;
            # XXX push Spread consts into their namespace?
        }
    }

    return if ( delete $args->{no_auto_bootstrap} );

    # bootstrap
    __PACKAGE__->new( %$args );
    
    return;
}


sub new {
    my $class = shift;
    return $sprocket_spread if ( $sprocket_spread );

    my %args = &adjust_params;
    
    my $host = delete $args{host};
    my $port = delete $args{port};

    warn 'Unknown params passed to Sprocket::Spread: '.join(',', keys %args)."\n"
        if ( keys %args );

    $sprocket_spread = bless( {
        host    => $host,
        port    => $port,
        cm      => Sprocket::ChannelManager->new(),
    }, ref $class || $class);
    
    $sprocket_spread->{session_id} = 
    POE::Session->create(
        object_states => [
            $sprocket_spread => [qw(
                _start
                _stop

                input
                error

                disconnect
                connect
            )]
        ]
    )->ID();

    return $sprocket_spread;
}


sub connect {
    my ( $self, %args );
    if ( ref $_[ KERNEL ] ) {
        ( $self, %args ) = @_[ OBJECT, ARG0 .. $#_ ];
    } else {
        $self = shift;
        return $poe_kernel->call( $self->{session_id} => 'connect' => @_ );
    }
    
    return 1 if ( $self->{connected} );
    
    $args{host} = $self->{host}
        if ( $self->{host} && !$args{host} );
   
    $args{port} = $self->{port}
        if ( $self->{port} && !$args{port} );
        
    $self->{spread_name} = ( $args{port} || '4803' ) . '@' . ( $args{host} || 'localhost' );
    $self->{private_name} = 'sp-' . $$;
    # names can't be too long, because it chops it and finds that the first part is not unique
    # ARGGG!
    #$self->{private_name} = $poe_kernel->ID();

    @{$self}{qw( mbox private_group )} = Spread::connect( {
        spread_name => $self->{spread_name},
        private_name => $self->{private_name},
    } );

    if ( $@ || !defined( $self->{mbox} ) || !defined( $self->{private_group} ) ) {
        warn "Spread connect failed: $@\n";

        $self->{connected} = 0;
        return 0;
    } else {
        # XXX retry?
        my $fh = $self->{fh} = gensym();
        open( $fh, "<&=$self->{mbox}" ) or die $!;

        $self->{wheel} = POE::Wheel::ReadWrite->new(
            Handle      => $fh,
            Driver      => Sprocket::Spread::Driver->new( mbox => $self->{mbox} ),
            Filter      => Sprocket::Spread::Filter->new(),
            InputEvent  => 'input',
            ErrorEvent  => 'error',
        );
        warn "spread connected with mbox: $self->{mbox} priv: $self->{private_name} and spread_name $self->{spread_name}";

        $self->{connected} = 1;
        return 1;
    }

}

sub _start {
    warn "Spread started\n";
    $_[KERNEL]->yield( 'connect' );
}

sub _stop {
    warn "Spread stopped\n";
}

sub error {
    warn "Spread error\n";
    $_[OBJECT]->disconnect();
}

sub input {
    my ( $self, $input ) = @_[ OBJECT, ARG0 ];
    my ( $type, $sender, $groups, $mess_type, $endian, $message ) = @{$input};

    return $self->disconnect()
        unless( defined( $type ) );

    if ( $type & REGULAR_MESS ) {

        if ( defined( $endian ) && $endian ) {
            warn "Spread: endian mismatch!";
        }

        $self->deliver( 'message', $self->{private_name}, {
            type    => 'message',
            message => $message,
            group   => $sender,
            members => $groups,
            index   => $mess_type,
        } );

        return;
    }

    if ( $type & TRANSITION_MESS ) {

        $self->deliver( 'admin', $self->{private_name}, {
            'type' => 'transitional',
            'group' => $sender 
        } );

    } elsif ( $type & CAUSED_BY_LEAVE && !( $type & REG_MEMB_MESS ) ) {

        $self->deliver( 'admin', $self->{private_name}, {
            'type' => 'self_leave',
            'group' => $sender
        } );

    } elsif ( $type & REG_MEMB_MESS ) {

        my ( @gids, $nummem, $member );
        eval {
            @gids = unpack( 'IIIIa*', $message );
            ( $nummem, $member ) = delete @gids[ 3, 4 ];
        };

        if ( $@ ) {
            $self->deliver( 'error', 'receive', $@ );
            return;
        }

        if ( $type & CAUSED_BY_DISCONNECT ) {
            
            $self->deliver( 'admin', $self->{private_name}, {
                type    => 'disconnect',
                who     => $member,
                group   => $sender,
                members => $groups,
                index   => $mess_type,
                gid     => \@gids,
            } );

        } elsif ( $type & CAUSED_BY_NETWORK ) {
            
            $self->deliver( 'admin', $self->{private_name}, {
                type    => 'network',
                message => $message,
                group   => $sender,
                members => $groups,
                index   => $mess_type,
                gid     => \@gids,
            } );

        } elsif ( $type & CAUSED_BY_JOIN ) {
            
            $self->deliver( 'admin', $self->{private_name}, {
                type    => 'join',
                who     => $member,
                group   => $sender,
                members => $groups,
                index   => $mess_type,
                gid     => \@gids,
            } );

        } elsif ( $type & CAUSED_BY_LEAVE ) {

            $self->deliver( 'admin', $self->{private_name}, {
                type    => 'leave',
                who     => $member,
                group   => $sender,
                members => $groups,
                index   => $mess_type,
                gid     => \@gids,
            } );

        } else {
           $self->deliver( 'error', 'receive', 'unknown packet type' );
        }

    } else {
        $self->deliver( 'error', 'receive', 'unknown packet type' );
    }

    return;
}

sub deliver {
    my ( $self, $type ) = ( shift, shift );
    my ( $errtype, $privname, $msg );

    if ( $type eq 'error' ) {
        ( $errtype, $msg ) = @_;
        warn "error: $errtype $msg\n";
    } else {
        ( $privname, $msg ) = @_;
        $self->{cm}->deliver( $type, $privname, $msg );
    }
    
    # XXX
    print STDERR 'msg:'.$privname.' '.Data::Dumper->Dump([$msg]);
    
    return;
}

sub disconnect {
    my ( $self );
    if ( ref $_[ KERNEL ] ) {
        $self = $_[ OBJECT ];
    } else {
        $self = shift;
        return $poe_kernel->call( $self->{session_id} => 'disconnect' => @_ );
    }
    # TODO
    warn "Spread disconnect\n";
    $self->{connected} = 0;
}

sub publish {
    my ( $self, $groups, $message, $mess_type, $flag ) = @_;

    unless ( $self->{mbox} ) {
        warn "not connected when trying to publish to spread";
        return;
    }

    $flag = SAFE_MESS unless( defined( $flag ) );
    
    $mess_type = 0 unless( defined( $mess_type ) );

    $groups = $groups->[0]
        if ( ref( $groups ) && ref( $groups ) eq 'ARRAY' && $#{$groups} == 0 );
    
    require Data::Dumper;
    warn "groups:".Data::Dumper->Dump([$groups]);
    
    my $ret;
    eval {
        $ret = Spread::multicast( $self->{mbox}, $flag, $groups, $mess_type, $message );
    };

    if ( $@ || !defined( $ret ) || $ret < 0 ) {
        $self->disconnect()
            if ( defined $sperrno && $sperrno == CONNECTION_CLOSED );
        return 0;
    }

    return 1;
}

sub subscribe {
    my ( $self, $groups ) = @_;

    unless ( $self->{connected} ) {
        unless ( $self->connect() ) {
            return 0;
        }
    }

    $groups = $groups->[0]
        if ( ref( $groups ) && ref( $groups ) eq 'ARRAY' && $#{$groups} == 0 );

    my $ret;
    eval {
        $ret = Spread::join( $self->{mbox}, $groups );
    };
    if ( $@ && !$ret ) {
        $self->disconnect()
            if ( defined $sperrno && $sperrno == CONNECTION_CLOSED );
        return 0;
    }
    
    return 1;
}

sub unsubscribe {
    my ( $self, $groups ) = @_;

    unless ( $self->{connected} ) {
        unless ( $self->connect() ) {
            return 0;
        }
    }

    $groups = $groups->[0]
        if ( ref( $groups ) && ref( $groups ) eq 'ARRAY' && $#{$groups} == 0 );

    my $ret;
    eval {
        $ret = Spread::leave( $self->{mbox}, $groups );
    };
    if ( $@ && !$ret ) {
        $self->disconnect()
            if ( defined $sperrno && $sperrno == CONNECTION_CLOSED );
        return 0;
    }

    return 1;
}

sub plugin_subscribe {
    my ( $self, $plugin, $groups ) = @_;
    
    return $self->subscribe( $self->{cm}->subscribe( $plugin, $groups ) );
}

sub plugin_unsubscribe {
    my ( $self, $plugin, $groups ) = @_;
    
    return $self->unsubscribe( $self->{cm}->unsubscribe( $plugin, $groups ) );
}

sub plugin_publish {
    my ( $self, $plugin, $groups ) = ( shift, shift, shift );
    
    my $pub = $self->{cm}->grouplist( $plugin, $groups );
    require Data::Dumper;
    warn "plugin publish: $pub ".Data::Dumper->Dump([$pub]);
    return $self->publish( $pub, @_ );
}


1;


package Sprocket::Spread::Driver;

use strict;
use warnings;

sub new {
    my $class = shift;
    my %args = @_;

    my $mbox = delete $args{mbox};

    warn "bad Spread param: ".join( ',', keys %args ) if ( keys %args );

    bless( [ $mbox ], ref $class || $class );
}


sub get {
    my ( $self, $fh ) = @_;

    my ( $type, $sender, $groups, $messt, $endian, $message ) = Spread::receive( $self->[ 0 ] );

    if ( !defined( $type ) ) {
        warn "Spread: Unknown error";
        return [];
    }

    return [ $type, $sender, $groups, $messt, $endian, $message ];
}

1;

package Sprocket::Spread::Filter;

use strict;
use warnings;

sub new {
    my $class = shift;

    bless( [], ref $class || $class );
}

sub get {
    shift;
    return [ @_ ];
}

1;