| IO-Lambda documentation | Contained in the IO-Lambda distribution. |
IO::Lambda::Loop::POE - POE event loop for IO::Lambda
This is the implementation of event loop for IO::Lambda based on POE event
loop. The module is not intended for direct use.
Note that POE is also a proxy event loop itself, and depending on the
actual event loop module it uses, functionality of IO::Lambda might be
limited.
use POE; use IO::Lambda::Loop::POE; # explicitly select the event loop module use IO::Lambda;
or
setenv IO_LAMBDA_DEBUG=loop=POE
Threads and forks seem to be not playing nicely together with POE.
| IO-Lambda documentation | Contained in the IO-Lambda distribution. |
# $Id: POE.pm,v 1.6 2010/03/26 20:35:03 dk Exp $ package IO::Lambda::Loop::POE; use strict; use warnings; use POE; use IO::Lambda qw(:constants :dev); use Time::HiRes qw(time); use vars qw( @timers $deadline $alarm_id $kr_active_session $kr_last_session $session $alarm_id %filenos $event $DEBUG @mask @modes $global_destruction ); END { $global_destruction = 1 } $DEBUG = $IO::Lambda::DEBUG{poe} || 0; $mask[IO_READ] = 'select_read'; $mask[IO_WRITE] = 'select_write'; $mask[IO_EXCEPTION] = 'select_expedite'; $modes[POE::Kernel::MODE_RD()] = IO_READ; $modes[POE::Kernel::MODE_WR()] = IO_WRITE; $modes[POE::Kernel::MODE_EX()] = IO_EXCEPTION; IO::Lambda::Loop::default('POE'); $kr_active_session = $poe_kernel-> [POE::Kernel::KR_ACTIVE_SESSION()]; # this shuts up warnings - we don't rely on run() anyway ${$poe_kernel-> [POE::Kernel::KR_RUN()]} |= POE::Kernel::KR_RUN_CALLED(); sub new { bless {} , shift } sub empty { ( @timers + scalar keys %filenos) ? 0 : 1 } # creates new session when needed (or when about to be needed), and shuts one down as well sub reset_session { my $expect_to_have_a_new_session = shift; if ( not @timers and not keys %filenos and not $expect_to_have_a_new_session ) { if ( $session ) { warn "session "._o($session)." abandoned\n" if $DEBUG; undef $session; } } elsif ( not defined $session) { return if $global_destruction; $session = POE::Session-> create( inline_states => { _start => sub {}, timeout => \&on_tick, io => \&on_io, _stop => sub { warn "session "._o($_[SESSION])." stopped\n" if $DEBUG; if (not $session and (@timers or keys %filenos)) { require Data::Dumper; die "session stopped, but we have some unhandled data:\n", Data::Dumper->Dump(\@timers, \%filenos) } }, } ); warn "session "._o($session). " started\n" if $DEBUG; } } # returns new deadline - we queue timeouts internally, without bothering POE sub deadline { return undef unless @timers; my $new_deadline = $timers[0]-> [WATCH_DEADLINE]; for ( map { $_-> [WATCH_DEADLINE] } @timers) { next if $_ >= $new_deadline; $new_deadline = $_; } return $new_deadline; } # we use a single session, single alarm here for all IO::Lambda timeouts sub reset_timer { my $old = $deadline; if ( $global_destruction) { undef $alarm_id; return; } $deadline = deadline(); unless ( defined $deadline) { if (defined $alarm_id) { warn "stop timer[$alarm_id]\n" if $DEBUG; $poe_kernel-> alarm_remove($alarm_id); undef $alarm_id; } return; } return if defined $old and $old == $deadline; if ( defined $old and defined $alarm_id) { my $ok = $poe_kernel-> alarm_adjust($alarm_id, $deadline - $old); warn "alarm_adjust($alarm_id, ", $deadline-$old, "):$!" unless $ok; warn "reset timer[$alarm_id] $old -> $deadline\n" if $DEBUG; } else { warn "something wrong, existing alarm ID with no old timeout" if defined $alarm_id; $alarm_id = $poe_kernel-> alarm_set(timeout => $deadline); warn "alarm_set($deadline):$!" unless defined $alarm_id; warn "start timer[$alarm_id] $deadline\n" if $DEBUG; } } sub reset_mask { return if $global_destruction; my $f = shift; my $mask = 0; for my $flags ( map { $_-> [WATCH_IO_FLAGS]} @{ $f-> {rec}} ) { $mask |= $_ for grep { $flags & $_ } IO_READ, IO_WRITE, IO_EXCEPTION; } return if $mask == $f-> {mask}; for ( IO_READ, IO_WRITE, IO_EXCEPTION) { my $meth = $mask[$_]; if ( $mask & $_ ) { next if $f-> {mask} & $_; $poe_kernel-> $meth( $f-> {handle}, 'io', $f); warn "$meth charged for ", fileno($f-> {handle}), "\n" if $DEBUG; } elsif ( $f-> {mask} & $_) { $poe_kernel-> $meth( $f-> {handle} ); warn "$meth cleared for ", fileno($f-> {handle}), "\n" if $DEBUG; } } $f-> {mask} = $mask; } sub purge_filenos(&) { my $sub = shift; my @kill; while ( my ( $fileno, $r) = each %filenos) { my @xr = grep &$sub, @{$r->{rec}}; next if @xr == @{$r->{rec}}; $r-> {rec} = \@xr; push @kill, $fileno unless @xr; reset_mask( $r); } warn "delete objects for @kill\n" if $DEBUG and @kill; delete @filenos{@kill}; } sub on_io { $event++; my ($handle, $mode, $obj) = @_[ARG0,ARG1,ARG2]; my $mask = $modes[$mode]; my $fileno = fileno($handle); warn "event $fileno/$mode=$mask\n" if $DEBUG; unless ( $obj-> {mask} & $mask) { warn "handled already, don't propagate\n" if $DEBUG; return; } my (@ev, @rx); # remove records that were listening for that event for ( @{ $obj-> {rec} } ) { if ( $_-> [WATCH_IO_FLAGS] & $mask) { push @ev, $_; } else { push @rx, $_; } } $obj-> {rec} = \@rx; # next see if there are records waiting for more than one mode, - # build a vector and check them using select() if ( $obj-> {mask} & ~$mask) { my @vec = ('', '', ''); vec( $vec[0], $fileno, 1) = 1 if $obj-> {mask} & IO_READ; vec( $vec[1], $fileno, 1) = 1 if $obj-> {mask} & IO_WRITE; vec( $vec[2], $fileno, 1) = 1 if $obj-> {mask} & IO_EXCEPTION; select( $vec[0], $vec[1], $vec[2], 0 ); $mask |= IO_READ if vec($vec[0], $fileno, 1); $mask |= IO_WRITE if vec($vec[1], $fileno, 1); $mask |= IO_EXCEPTION if vec($vec[2], $fileno, 1); } reset_mask($obj); unless ($obj-> {mask}) { delete $filenos{$fileno}; warn "object deleted for $fileno\n" if $DEBUG; } # kill associated timers my %timer = (map { $_ => undef } grep { defined $_-> [WATCH_DEADLINE] } @ev); if ( scalar keys %timer) { my $n = @timers; @timers = grep { not exists $timer{"$_"}} @timers; reset_timer if @timers != $n; warn "some timers killed too\n" if $DEBUG; } # prepare and call the handlers if ( $DEBUG) { warn "io dispatch ", join(' ', map { defined($_) ? $_ : 'undef' } @$_), "\n" for @ev; } for ( @ev) { $$_[WATCH_IO_FLAGS] &= $mask; $$_[WATCH_OBJ]-> io_handler( $_); } reset_session; } # timeout handler sub on_tick { warn "event timer[$alarm_id]\n" if $DEBUG; $event++; my @ev; # timers my $t = time; push @ev, grep { $_-> [WATCH_DEADLINE] <= $t } @timers; @timers = grep { $_-> [WATCH_DEADLINE] > $t } @timers; $alarm_id = undef; reset_timer; purge_filenos { return not defined($_-> [WATCH_DEADLINE]) or ($_-> [WATCH_DEADLINE] > $t) }; $$_[WATCH_IO_FLAGS] = 0 for @ev; if ( $DEBUG) { warn "timer dispatch ", join(' ', map { defined($_) ? $_ : 'undef' } @$_), "\n" for @ev } $$_[WATCH_OBJ]-> io_handler( $_) for @ev; reset_session; } sub watch { reset_session(1); $kr_last_session = $$kr_active_session; $$kr_active_session = $session; my ( $self, $rec) = @_; my $fileno = fileno $rec->[WATCH_IO_HANDLE]; die "Invalid filehandle" unless defined $fileno; my $flags = $rec->[WATCH_IO_FLAGS]; unless ( $filenos{$fileno}) { $filenos{$fileno} = { mask => 0, rec => [], handle => $rec->[WATCH_IO_HANDLE], }; warn "object created for $fileno\n" if $DEBUG; } my $f = $filenos{$fileno}; push @{$f-> {rec}}, $rec; reset_mask($f); $self-> after( $rec) if $rec-> [WATCH_DEADLINE]; $$kr_active_session = $kr_last_session; } sub after { reset_session(1); $kr_last_session = $$kr_active_session; $$kr_active_session = $session; my ( $self, $rec) = @_; push @timers, $rec; reset_timer; $$kr_active_session = $kr_last_session; } sub remove { $kr_last_session = $$kr_active_session; $$kr_active_session = $session; my ($self, $obj) = @_; my $t = @timers; @timers = grep { defined($_-> [WATCH_OBJ]) and $_-> [WATCH_OBJ] != $obj } @timers; reset_timer if $t != @timers; purge_filenos { defined($_-> [WATCH_OBJ]) and $_-> [WATCH_OBJ] != $obj }; reset_session(0); $$kr_active_session = $kr_last_session; } sub remove_event { $kr_last_session = $$kr_active_session; $$kr_active_session = $session; my ($self, $rec) = @_; my $t = @timers; @timers = grep { $_ != $rec } @timers; reset_timer if $t != @timers; purge_filenos { $_ != $rec }; reset_session(0); $$kr_active_session = $kr_last_session; } sub yield { warn "yield\n" if $DEBUG; my ( $self, $nonblocking ) = @_; local $event = 0; $poe_kernel-> run_one_timeslice; # that ssession should be the kernel itself $poe_kernel-> run if $poe_kernel-> _data_ses_count == 1; return if $nonblocking; $poe_kernel-> run_one_timeslice while $event == 0; $poe_kernel-> run if $poe_kernel-> _data_ses_count == 1; } sub signal { $event++ } 1; __DATA__