POE::Component::AIO - Asynchronous Input/Output for POE


POE-Component-AIO documentation Contained in the POE-Component-AIO distribution.

Index


Code Index:

NAME

Top

POE::Component::AIO - Asynchronous Input/Output for POE

SYNOPSIS

Top

 use POE qw( Component::AIO );

 ...

 aio_read( $fh, 0, 1024, $buffer, 0, $poco_aio->callback( 'open_done' ) );

 aio_read( $fh, 0, 1024, $buffer, 0, sub {
   ...
 } );

DESCRIPTION

Top

 This component adds support for L<IO::AIO> use in POE

EXAMPLE

  use POE;

  Foo->new();

  $poe_kernel->run();

  package Foo;

  use POE qw( Component::AIO );
  use Fcntl;

  use strict;
  use warnings;

  sub new {
      my $class = shift;

      my $self = bless( {}, $class );

      POE::Session->create(
          object_states => [
              $self => [qw(
                  _start
                  _stop

                  open_done
                  read_done
              )]
          ]
      );

      return $self;
  }

  sub _start {
      my $file = '/etc/passwd';

      aio_open( $file, O_RDONLY, 0, $poco_aio->callback( 'open_done', $file ) );
  }

  sub open_done {
      my ( $self, $session, $file, $fh ) = @_[ OBJECT, SESSION, ARG0, ARG1 ];

      unless ( defined $fh ) {
          die "aio open failed on $file: $!";
      }

      my $buffer = '';
      # read 1024 bytes from $fh
      aio_read( $fh, 0, 1024, $buffer, 0, $poco_aio->postback( 'read_done', \$buffer ) );
  }

  sub read_done {
      my ( $self, $buffer, $bytes ) = @_[ OBJECT, ARG0, ARG1 ];

      unless( $bytes > 0 ) {
          die "aio read failed: $!";
      }

      print $$buffer;
  }

  sub _stop {
      $poco_aio->shutdown();
  }

NOTES

Top

This module automaticly bootstraps itself on use(). $poco_aio is imported into your namespace for easy use. Just like $poe_kernel when using POE. There are two import options available: no_auto_bootstrap and no_auto_export.

Example:

  use POE::Component::AIO { no_auto_bootstrap => 1, no_auto_export => 1 };

Also, use of this modules' callback and postback methods are completely optional. They are included for convenience, but note that they don't work the same as the postback and callback methods from POE::Session.

METHODS

Top

new()

Call this to get the singleton object, which is the same as $poco_aio. See the notes above. You do not need to call this unless you have disabled auto bootstrapping.

shutdown()

Stop the session used by this module.

callback( $event [, $params ] )

Returns a callback. Params are optional and are stacked before params passed to the callback at call time. This differs from POE::Session's callback because the params are not wrapped in array references. It uses the current session to latch the callback to. If you want to use another session, you can pass an array ref of the session id and event name as the event param.

Examples:

  $cb = $poco_aio->callback( 'foo' );
  $cb = $poco_aio->callback( 'foo', $bar );
  $cb = $poco_aio->callback( [ $session->ID(), 'foo' ] );
  $cb = $poco_aio->callback( [ $session->ID(), 'foo' ], $bar );

postback( $event [, $params ] );

See the callback method. The only difference is that it uses a post instead of call

SEE ALSO

Top

IO::AIO, POE

AUTHOR

Top

David Davis <xantus@cpan.org> http://xantus.org/

LICENSE

Top

Artistic License

COPYRIGHT AND LICENSE

Top


POE-Component-AIO documentation Contained in the POE-Component-AIO distribution.

package POE::Component::AIO;

use IO::AIO qw( poll_fileno poll_cb );
use POE;

use strict;
use warnings;
use Carp qw( croak );

our %callback_ids;
our $VERSION = '1.00';

use vars qw( $poco_aio );

sub import {
    my ( $class, $args ) = @_;
    my $package = caller();

    croak "PoCo::AIO expects its arguments in a hash ref"
        if ( $args && ref( $args ) ne 'HASH' );

    unless ( delete $args->{no_auto_export} ) {
        {
            no strict 'refs';
            *{ $package . '::poco_aio' } = \$poco_aio;
        }

        eval( "package $package; use IO::AIO qw( 2 );" );
        if ( $@ ) {
            croak "could not export IO::AIO into $package (is it installed?)";
        }
    }

    return if ( $args->{no_auto_bootstrap} );

    # bootstrap
    POE::Component::AIO->new( %$args );
    
    return;
}

sub new {
    my $class = shift;
    return $poco_aio if ( $poco_aio );

    my $self = $poco_aio = bless({
        session_id => undef,
        postback => undef,
        @_
    }, ref $class || $class );

    POE::Session->create(
        object_states =>  [
            $self => [qw(
                _start
                _stop
                poll_cb
                do_postback
                _shutdown
            )]
        ],
    );

    return $self;
}

sub _start {
    my ( $self, $kernel ) = @_[ OBJECT, KERNEL ];
    
    $self->{session_id} = $_[ SESSION ]->ID();
    
    $kernel->alias_set( "$self" );
    
    open( my $fh, "<&=".poll_fileno() ) or die "$!";
    $kernel->select_read( $fh, 'poll_cb' );
    $self->{_fh} = $fh;
   
    return unless( $self->{postback} );
  
    # XXX undocumented
    if ( ref( $self->{postback} ) eq 'ARRAY' ) {
        $kernel->post( @{$self->{postback}}, $self->{session_id}, "$self" );
    } elsif ( ref( $self->{postback} ) eq 'CODE' ) {
        $kernel->yield( 'do_postback' );
    }
  
    return;
}

sub do_postback {
    my $self = $_[ OBJECT ];
    
    $self->{postback}->( $self->{session_id}, "$self" );
    
    return;
}

sub _stop { }

sub callback {
    my ($self, $event, @etc) = @_;

    my $id;
    if ( ref( $event ) eq 'ARRAY' ) {
        ( $id, $event ) = @$event;
    } else {
        my $ses = $poe_kernel->get_active_session();
        if ( $ses ) {
            $id = $ses->ID();
        } else {
            warn 'no active session in call to PoCo::AIO::callback';
            return undef;
        }
    }

    my $callback = POE::Component::AIO::AnonCallback->new(sub {
        $poe_kernel->call( $id => $event => @etc => @_ );
    });

    $callback_ids{$callback} = $id;

    $poe_kernel->refcount_increment( $id, 'anon_event' );

    return $callback;
}

sub postback {
    my ($self, $event, @etc) = @_;

    my $id;
    if ( ref( $event ) eq 'ARRAY' ) {
        ( $id, $event ) = @$event;
    } else {
        my $ses = $poe_kernel->get_active_session();
        if ( $ses ) {
            $id = $ses->ID();
        } else {
            warn 'no active session in call to PoCo::AIO::callback';
            return undef;
        }
    }
    
    my $postback = POE::Component::AIO::AnonCallback->new(sub {
        $poe_kernel->post( $id => $event => @etc => @_ );
    });

    $callback_ids{$postback} = $id;

    $poe_kernel->refcount_increment( $id, 'anon_event' );

    return $postback;
}

sub shutdown {
    $poe_kernel->call( shift->{session_id} => '_shutdown' );
}

sub _shutdown {
    my ( $self, $kernel ) = @_[ OBJECT, KERNEL ];

    $kernel->alias_remove( "$self" );
    $kernel->select_read( delete $self->{_fh} );

    $poco_aio = undef;

    return;
}

1;

package POE::Component::AIO::AnonCallback;

use POE;

sub new {
    my $class = shift;

    bless( shift, ref $class || $class );
}

sub DESTROY {
    my $self = shift;
    my $session_id = delete $POE::Component::AIO::callback_ids{"$self"};

    if ( defined $session_id ) {
        $poe_kernel->refcount_decrement( $session_id, 'anon_event' );
    } else {
        warn "connection callback DESTROY without session_id to refcount_decrement";
    }

    return;
}


1;