| IO-Multiplex-KQueue documentation | Contained in the IO-Multiplex-KQueue distribution. |
IO::Multiplex::KQueue - IO::Multiplex by kqueue(2)
use IO::Multiplex::KQueue;
my $mux = new IO::Multiplex::KQueue;
$mux->add($fh1);
$mux->add(\*FH2);
$mux->set_callback_object(...);
$mux->listen($server_socket);
$mux->loop;
sub mux_input {
...
}
IO::Multiplex::KQueue is kqueue(2) IO::Multiplex implementation with
compatible interface to IO::Multiplex (version 1.08).
Please refer IO::Multiplex for details.
Just install IO::KQueue and replace IO::Multiplex with
IO::Multiplex::KQueue in your source code.
use IO::Socket;
use IO::Multiplex;
# Create a multiplex object
my $mux = new IO::Multiplex;
use IO::Socket;
use IO::Multiplex::KQueue;
# Create a multiplex object
my $mux = new IO::Multiplex::KQueue;
# done! no futher modification!
heartbeat is not supported in IO::Multiplex::KQueue.
You may get several "read error: Operation timed out" warings.
IO::KQueue 0.29 has a bug to handle timeout. please install http://www.in2home.org/download/IO-KQueue-0.30.tar.gz or you will fail t/110_ntest.t.
IO::Multiplex, IO::KQueue, kqueue(2).
Copyright 1999 Bruce J Keeler <bruce@gridpoint.com>
Copyright 2001-2003 Rob Brown <bbb@cpan.org>
Copyright 2005 Kai-Hsiang Chuang <in2@in2home.org>
Released under the terms of the Artistic License.
| IO-Multiplex-KQueue documentation | Contained in the IO-Multiplex-KQueue distribution. |
# $Id: IOMultiplexKQueue.pm 116 2005-12-22 04:25:28Z in2 $ package IO::Multiplex::KQueue;
use strict; use POSIX qw(errno_h BUFSIZ); use vars qw($VERSION); use Socket; use FileHandle qw(autoflush); use IO::Handle; use Fcntl; use Carp qw(carp); use IO::KQueue; $VERSION = '0.02'; BEGIN { eval { # Can optionally use Hi Res timers if available require Time::HiRes; Time::HiRes->import ('time'); } }; # This is what you want. Trust me. $SIG{PIPE} = 'IGNORE'; sub new { my $package = shift; my $self = bless { _kq => IO::KQueue->new(), _fhs => {}, _handles => {}, _timerkeys => {}, _timers => [], _listen => {} } => $package; return $self; } sub listen { my $self = shift; my $fh = shift; $self->add($fh); $self->{_fhs}{"$fh"}{listen} = 1; } sub add { my $self = shift; my $fh = shift; return if $self->{_fhs}{"$fh"}; nonblock($fh); autoflush($fh, 1); $self->{_fhs}{"$fh"}{udp_true} = (SOCK_DGRAM == unpack("i", scalar getsockopt($fh,Socket::SOL_SOCKET(),Socket::SO_TYPE()))); $self->{_fhs}{"$fh"}{inbuffer} = ''; $self->{_fhs}{"$fh"}{outbuffer} = ''; $self->{_fhs}{"$fh"}{fileno} = fileno($fh); $self->{_handles}{"$fh"} = $fh; #fd_set($self->{_readers}, $fh, 1); $self->EV_SET($fh, EVFILT_READ, EV_ADD, 0, 5, [$fh, EVFILT_READ]); tie *$fh, "IO::Multiplex::KQueue::Handle", $self, $fh; return $fh; } sub remove { my $self = shift; my $fh = shift; #fd_set($self->{_writers}, $fh, 0); #fd_set($self->{_readers}, $fh, 0); eval { # according to kqueue(2): Events which are attached to file # descriptors are automatically deleted on the last close of # the descriptor. (different behavior between select(2) and kqueue(2)) # removing a fd which is already closed will cause "set kevent # failed: No such file or directory" $self->EV_SET($fh, EVFILT_READ, EV_DELETE); $self->EV_SET($fh, EVFILT_WRITE, EV_DELETE); }; delete $self->{_fhs}{"$fh"}; delete $self->{_handles}{"$fh"}; $self->_removeTimer($fh); untie *$fh; } sub set_callback_object { my $self = shift; my $obj = shift; my $fh = shift; return if $fh && !exists($self->{_fhs}{"$fh"}); my $old = $fh ? $self->{_fhs}{"$fh"}{object} : $self->{_object}; $fh ? $self->{_fhs}{"$fh"}{object} : $self->{_object} = $obj; return $old; } sub kill_output { my $self = shift; my $fh = shift; return unless $fh && exists($self->{_fhs}{"$fh"}); $self->{_fhs}{"$fh"}{outbuffer} = ''; $self->EV_SET($fh, EVFILT_WRITE, EV_DELETE); } sub outbuffer { my $self = shift; my $fh = shift; return unless $fh && exists($self->{_fhs}{"$fh"}); if (@_) { $self->{_fhs}{"$fh"}{outbuffer} = $_[0] if @_; #fd_set($self->{_writers}, $fh, 0) if !$_[0]; $self->EV_SET($fh, EVFILT_WRITE, EV_ADD, 0, 0, [$fh, EVFILT_WRITE]); } return $self->{_fhs}{"$fh"}{outbuffer}; } sub inbuffer { my $self = shift; my $fh = shift; return unless $fh && exists($self->{_fhs}{"$fh"}); if (@_) { $self->{_fhs}{"$fh"}{inbuffer} = $_[0] if @_; } return $self->{_fhs}{"$fh"}{inbuffer}; } sub set_timeout { my $self = shift; my $fh = shift; my $timeout = shift; return unless $fh && exists($self->{_fhs}{"$fh"}); if (defined $timeout) { $self->_addTimer($fh, $timeout + time); } else { $self->_removeTimer($fh); } } sub handles { my $self = shift; return grep(!$self->{_fhs}{"$_"}{listen}, values %{$self->{_handles}}); } sub _addTimer { my $self = shift; my $fh = shift; my $time = shift; # Set a key so that we can quickly tell if a given $fh has # a timer set $self->{_timerkeys}{"$fh"} = 1; # Store the timeout in an array, and resort it @{$self->{_timers}} = sort { $a->[1] <=> $b->[1] } (@{$self->{_timers}}, [ $fh, $time ] ); } sub _removeTimer { my $self = shift; my $fh = shift; # Return quickly if no timer is set return unless exists $self->{_timerkeys}{"$fh"}; # Remove the timeout from the sorted array @{$self->{_timers}} = grep { $_->[0] ne $fh } @{$self->{_timers}}; # Get rid of the key delete $self->{_timerkeys}{"$fh"}; } sub loop { my $self = shift; my $heartbeat = shift; $self->{_endloop} = 0; warn "heartbeat is not supported in IO::Multiplex::KQueue" if( $heartbeat ); while (!$self->{_endloop} && keys %{$self->{_fhs}}) { my $rv; my $data; my $rdready; my $wrready; my $timeout = undef; my @results = undef; if (@{$self->{_timers}}) { $timeout = $self->{_timers}[0][1] - time; } @results = $self->{_kq}->kevent($timeout); unless(@results) { if ($! == EINTR || $! == EAGAIN) { next; } elsif( @{$self->{_timers}} ){ $self->_checkTimeouts(); next; } else { warn "kevent exists without event!?"; next; } } foreach my $kevent (@results) { my($fh, $action) = @{$kevent->[KQ_UDATA]}; # Avoid creating a permanent empty hash ref for "$fh" # by attempting to access its {object} element # if it has already been closed. next unless exists $self->{_fhs}{"$fh"}; # Get the callback object. my $obj = $self->{_fhs}{"$fh"}{object} || $self->{_object}; # Is this descriptor ready for reading? if( $action == EVFILT_READ ){ if ($self->{_fhs}{"$fh"}{listen}) { # It's a server socket, so a new connection is # waiting to be accepted my $client = $fh->accept; next unless ($client); $self->add($client); $obj->mux_connection($self, $client) if $obj && $obj->can("mux_connection"); } else { if ($self->is_udp($fh)) { $rv = recv($fh, $data, BUFSIZ, 0); if (defined $rv) { # Remember where the last UDP packet came from $self->{_fhs}{"$fh"}{udp_peer} = $rv; } } else { $rv = &POSIX::read(fileno($fh), $data, BUFSIZ); } if (defined($rv) && length($data)) { # Append the data to the client's receive buffer, # and call process_input to see if anything needs to # be done. $self->{_fhs}{"$fh"}{inbuffer} .= $data; $obj->mux_input($self, $fh, \$self->{_fhs}{"$fh"}{inbuffer}) if $obj && $obj->can("mux_input"); } else { unless (defined $rv) { next if $! == EINTR || $! == EAGAIN || $! == EWOULDBLOCK; warn "IO::Multiplex::KQueue read error: $!" if $! != ECONNRESET; } # There's an error, or we received EOF. If # there's pending data to be written, we leave # the connection open so it can be sent. If # the other end is closed for writing, the # send will error and we close down there. # Either way, we remove it from _readers as # we're no longer interested in reading from # it. #fd_set($self->{_readers}, $fh, 0); $self->EV_SET($fh, EVFILT_READ, EV_DELETE); $obj->mux_eof($self, $fh, \$self->{_fhs}{"$fh"}{inbuffer}) if $obj && $obj->can("mux_eof"); if (exists $self->{_fhs}{"$fh"}) { delete $self->{_fhs}{"$fh"}{inbuffer}; # The mux_eof handler could have responded # with a shutdown for writing. $self->close($fh) unless exists $self->{_fhs}{"$fh"} && exists $self->{_fhs}{"$fh"}{outbuffer}; } next; } } } # end if readable next unless exists $self->{_fhs}{"$fh"}; if( $action == EVFILT_WRITE ){ unless ($self->{_fhs}{"$fh"}{outbuffer}) { #fd_set($self->{_writers}, $fh, 0); $self->EV_SET($fh, EVFILT_WRITE, EV_DELETE); $obj->mux_outbuffer_empty($self, $fh) if ($obj && $obj->can("mux_outbuffer_empty")); next; } $rv = &POSIX::write(fileno($fh), $self->{_fhs}{"$fh"}{outbuffer}, length($self->{_fhs}{"$fh"}{outbuffer})); unless (defined($rv)) { # We got an error writing to it. If it's # EWOULDBLOCK (shouldn't happen if select told us # we can write) or EAGAIN, or EINTR we don't worry # about it. otherwise, close it down. unless ($! == EWOULDBLOCK || $! == EINTR || $! == EAGAIN) { if ($! == EPIPE) { $obj->mux_epipe($self, $fh) if $obj && $obj->can("mux_epipe"); } else { warn "IO::Multiplex: write error: $!\n"; } $self->close($fh); } next; } substr($self->{_fhs}{"$fh"}{outbuffer}, 0, $rv) = ''; unless ($self->{_fhs}{"$fh"}{outbuffer}) { # Mark us as not writable if there's nothing more to # write #fd_set($self->{_writers}, $fh, 0); $self->EV_SET($fh, EVFILT_WRITE, EV_DELETE); $obj->mux_outbuffer_empty($self, $fh) if ($obj && $obj->can("mux_outbuffer_empty")); if ($self->{_fhs}{"$fh"}{shutdown}) { # If we've been marked for shutdown after write # do it. shutdown($fh, 1); delete $self->{_fhs}{"$fh"}{outbuffer}; unless (exists $self->{_fhs}{"$fh"}{inbuffer}) { # We'd previously been shutdown for reading # also, so close out completely $self->close($fh); next; } } } } # End if writeable next unless exists $self->{_fhs}{"$fh"}; } # End foreach $fh (...) $self->_checkTimeouts() if @{$self->{_timers}}; } # End while(loop) } sub _checkTimeouts { my $self = shift; # Get the current time my $time = time; # Copy all of the timers that should go off into # a temporary array. This allows us to modify the # real array as we process the timers, without # interfering with the loop. my @timers = (); foreach my $timer (@{$self->{_timers}}) { # If the timer is in the future, we can stop last if $timer->[1] > $time; push @timers, $timer; } foreach my $timer (@timers) { my $fh = $timer->[0]; $self->_removeTimer($fh); next unless exists $self->{_fhs}{"$fh"}; my $obj = $self->{_fhs}{"$fh"}{object} || $self->{_object}; $obj->mux_timeout($self, $fh) if $obj && $obj->can("mux_timeout"); } } sub endloop { my $self = shift; $self->{_endloop} = 1; } sub udp_peer { my $self = shift; my $fh = shift; return $self->{_fhs}{"$fh"}{udp_peer}; } sub is_udp { my $self = shift; my $fh = shift; return $self->{_fhs}{"$fh"}{udp_true}; } sub write { my $self = shift; my $fh = shift; my $data = shift; return unless $fh && exists($self->{_fhs}{"$fh"}); if ($self->{_fhs}{"$fh"}{shutdown}) { $! = EPIPE; return undef; } if ($self->is_udp($fh)) { if (my $udp_peer = $self->udp_peer($fh)) { # Send the packet back to the last peer that said something return send($fh, $data, 0, $udp_peer); } else { # No udp_peer yet? # This better be a connect()ed UDP socket # or else this will fail with ENOTCONN return send($fh, $data, 0); } } $self->{_fhs}{"$fh"}{outbuffer} .= $data; #fd_set($self->{_writers}, $fh, 1); $self->EV_SET($fh, EVFILT_WRITE, EV_ADD, 0, 0, [$fh, EVFILT_WRITE]); return length($data); } sub shutdown { my $self = shift; my $fh = shift; my $which = shift; return unless $fh && exists($self->{_fhs}{"$fh"}); if ($which == 0 || $which == 2) { # Shutdown for reading. We can do this now. shutdown($fh, 0); # The mux_eof hook must be run from the main loop to consume # the rest of the inbuffer if there is anything left. # It will also remove $fh from _readers. } if ($which == 1 || $which == 2) { # Shutdown for writing. Only do this now if there is no pending # data. if ($self->{_fhs}{"$fh"}{outbuffer}) { $self->{_fhs}{"$fh"}{shutdown} = 1; } else { shutdown($fh, 1); delete $self->{_fhs}{"$fh"}{outbuffer}; } } # Delete the descriptor if it's totally gone. unless (exists $self->{_fhs}{"$fh"}{inbuffer} || exists $self->{_fhs}{"$fh"}{outbuffer}) { $self->close($fh); } } sub close { my $self = shift; my $fh = shift; return unless exists $self->{_fhs}{"$fh"}; my $obj = $self->{_fhs}{"$fh"}{object} || $self->{_object}; warn "closeing with read buffer" if $self->{_fhs}{"$fh"}{inbuffer}; warn "closeing with write buffer" if $self->{_fhs}{"$fh"}{outbuffer}; #fd_set($self->{_readers}, $fh, 0); #fd_set($self->{_writers}, $fh, 0); #in kqueue(2): # Events which are attached to file descriptors are # automatically deleted on the last close of the descriptor. delete $self->{_fhs}{"$fh"}; delete $self->{_handles}{"$fh"}; untie *$fh; close $fh; $obj->mux_close($self, $fh) if $obj && $obj->can("mux_close"); } # We set non-blocking mode on all descriptors. If we don't, then send # might block if the data is larger than the kernel can accept all at once, # even though select told us we can write. With non-blocking mode, we # get a partial write in those circumstances, which is what we want. sub nonblock { my $fh = shift; my $flags = fcntl($fh, F_GETFL, 0) or die "fcntl F_GETFL: $!\n"; fcntl($fh, F_SETFL, $flags | O_NONBLOCK) or die "fcntl F_SETFL $!\n"; } sub EV_SET { my $self = shift; my $fh = shift; $self->{_kq}->EV_SET($self->{_fhs}{"$fh"}{fileno}, @_); } # We tie handles into this package to handle write buffering. package IO::Multiplex::KQueue::Handle; use strict; use Tie::Handle; use Carp; use vars qw(@ISA); @ISA = qw(Tie::Handle); sub FILENO { my $self = shift; return ($self->{_mux}->{_fhs}->{"$self->{_fh}"}->{fileno}); } sub TIEHANDLE { my $package = shift; my $mux = shift; my $fh = shift; my $self = bless { _mux => $mux, _fh => $fh } => $package; return $self; } sub WRITE { my $self = shift; my ($msg, $len, $offset) = @_; $offset ||= 0; return $self->{_mux}->write($self->{_fh}, substr($msg, $offset, $len)); } sub CLOSE { my $self = shift; return $self->{_mux}->shutdown($self->{_fh}, 2); } sub READ { carp "Do not read from a muxed file handle"; } sub READLINE { carp "Do not read from a muxed file handle"; } sub FETCH { return "Fnord"; } 1; __END__