| IO-Events documentation | Contained in the IO-Events distribution. |
$SIG{PIPE} = 'IGNORE', usually a sound idea.$|=1 first in your script if you use ::Fork or ::Process
classes.on_fork, never terminate a child process by die or exit, because
otherwise everything Perl allocated in parent process will be de-allocated
twice, both in the parent and in the child process. If you must, use
POSIX::exit instead.
IO::Events - Non-blocking IO using events
Example 1, run 'bc' as a co-process:
use IO::Events;
my $loop = IO::Events::Loop-> new();
my $stdin_alive = 1;
my $calculator = IO::Events::Process::ReadWrite-> new(
owner => $loop,
process => 'bc -l',
on_read => sub {
while ( my $line = $_[0]-> readline) {
print "bc says: $line";
}
},
on_close => sub {
exit 1 if $stdin_alive; # fork/exec error
}
);
my $stdin = IO::Events::stdin-> new(
owner => $loop,
on_read => sub {
$calculator-> write( $_[0]-> read );
},
on_close => sub {
$stdin_alive = 0;
exit;
},
);
$loop-> yield while 1;
Example 2, connect to/listen on a TCP port within a single process:
use IO::Events;
my $loop = IO::Events::Loop-> new();
IO::Events::Socket::TCP-> new(
owner => $loop,
listen => 1,
port => 10000,
on_read => sub {
my $new = shift-> accept(
read => 1,
on_read => sub {
while ( my $line = $_[0]-> readline) {
print "client says: $line\n";
exit;
}
}
);
print "connect from $new->{remote_addr}:$new->{remote_port}\n";
},
);
IO::Events::Socket::TCP-> new(
owner => $loop,
connect => 'localhost',
port => 10000,
)-> write("hello, tcp socket!\n");
$loop->yield while 1;
Example 3, connect to/listen on a UDP port within a single process:
use Socket;
use IO::Events;
my $loop = IO::Events::Loop-> new();
IO::Events::Socket::UDP-> new(
owner => $loop,
port => 10000,
on_read => sub {
my $self = $_[0];
my $data = $self-> recv;
print "$self->{remote_host}:$self->{remote_port} says: $data";
exit;
},
);
IO::Events::Socket::UDP-> new(
owner => $loop,
)-> send( 'localhost', 10000, "hello, udp socket!\n");
$loop->yield while 1;
The module implements object-oriented approach to select-driven events and contains set of convenience objects for inter-process communication.
The main part of the module is the 'loop' instance of IO::Events::Loop class,
which knows about all IO handles subject to select(). The handles are inherited
from IO::Events::Handle class, which property handle holds reference to a
IO handle object, - a file scalar or IO::Handle instance.
IO::Events::Handle object propagates select-based events - on_read,
on_write, on_exception, as well as generic on_create, on_close and
on_error. It is a requirement that handles are non-blocking.
All instances are created by calling new with arbitrary number of named parameters.
The unrecognized parameters are stored in the object and cause no conflict:
my $a = IO::Events::Handle-> new( my_var => 1 );
die $a->{my_var};
The module is not meant to serve as replacement of IO::Select and
IPC::Open, and can perfectly live together with the first and counteract
with the handles managed by the second. The example in "SYNOPSIS" section
displays how to harness the non-blocking IO between stdin and a co-process.
$SIG{PIPE} = 'IGNORE', usually a sound idea.$|=1 first in your script if you use ::Fork or ::Process
classes.on_fork, never terminate a child process by die or exit, because
otherwise everything Perl allocated in parent process will be de-allocated
twice, both in the parent and in the child process. If you must, use
POSIX::exit instead.new() parametersNumber of seconds passed to select() as the fourth parameter.
Default value: 50
In addition to select(), IO::Events::Loop also waits for
finished processes, automatically getting rid of handles associated with
them, if set to 1.
Default value: 1
Returns number of handles owner by loop object. When a program automatically disposes of handles, not needed anymore, it may choose to stop when there are no more handles to serve.
Enters select() loop and dispatches pending IO if data are available to read
or write. Hash values of 'block_read', 'block_write', and 'block_exc'
can be set to 1 if read, write, or exception events are not to be used.
Practically,
$loop-> yield( block_read => 1, block_exc => 1, timeout => 0 )
call effectively ( but still in the non-blocking fashion ) flushes write buffers.
Returns result of select() call, the number of streams handled.
Flushes write sockets, if possible.
All handles are assigned an unique id, and are stored in
internal {id} hash. This hash is read-only, and can be used
to look-up a handle object by its id string.
Hash of file numbers, read-only.
Hash of PIDs associated with handles, read-only.
Used for IPC and waitpid results processing.
Root of IO handle object hierarchy, dispatches IO events and auto-destructs when
handle is closed or an IO error occurs. The explicit destruction is invoked by
calling destroy, which is reentrant-safe.
new()If set to 1, IO handle is explicitly closed as the object instance is destroyed. Doesn't affect anything otherwise.
Default value: 1
Flushes not yet written data. If DISCARD is 1, does not return until all data are written or error is signalled. If 0, discards all data.
IO::Handle object or a file scalar. If not specified,
a new IO::Handle instance is created automatically.
The handle is set to non-blocking mode. If this is already
done, nonblock optional boolean parameter can be set to 1
to prevent extra fcntl calls.
Set to 1 if handle is to be read from.
Default value: 0
Defined in IO::Events::Fork::ReadWrite and IO::Event::IPC::Process::ReadWrite
namespaces. @WHO can contain string 'read' and 'write', to tell what
part of bidirectional IPC is to be closed.
Set to 1 if handle is to be written from.
Default value: 0
If a handle is associated with a process, IO::Events::Loop uses
this field to waitpid() and auto-destruct the handle.
Default value: undef
Selects whether the handle is readable.
Selects whether the handle is writable.
Selects whether the handle accepts exception events.
Returns newline-ended read data, if available.
Return contents of the read buffer.
Appends DATA to the write buffer
Destructs handle instance
Dispatches EVENT, passing PARAMETERS to each callback.
A single event can cause several callback routines to be called. This is useful
when a class declares its own, for example, cleanup code in on_close sub,
whereas the class instance user can add another listener to the same on_close
notification:
package MyHandle;
...
sub on_close { ... }
...
MyHandle-> new( on_close => sub { ... });
The class declares static ( per-class ) instance of hash %events, which
contains declaration of events and their execution flow. SINGLE-declared
events call only single callback, be it static or dynamic. MULTIPLE-declared
events call all callbacks, but execution flow can be stopped by setting
{event_flag} to 1. This is useful, for example, to dynamically override
default behavior of IO::Events::Handle::on_error which emits a warning message
to stderr and destroys the handle.
Called before object instance is destroyed.
In a special case for ReadWrite objects, on_close is called twice,
when read and write handles are closed. To distinuish between the cases,
the second parameter is set to 1 when on_close is called due to
the writer handle destruction.
Declared as MULTIPLE.
Called after object instance is created.
Declared as MULTIPLE.
Called when read or write calls encounter error.
Declared as MULTIPLE.
Called when exception is arrived. Consult your system select manpage
to see what events and on what socket types can be expected.
Special event, called by IO::Events::Fork objects when
a new process is instantiated. Although the syntax for specifying
on_fork is the same as for the other events, on_fork does
not interact with these, and is not called from within yield.
When on_fork returned, process is terminated. If you wish to
terminate process yourself, do not call perl's exit but rather
POSIX::_exit, since otherwise perl stuctures created before fork
will be destroyed twice.
Called after data is read.
Called when handle is writable and the write buffer is empty. If the event
doesn't fill the write buffer, the handle write flag is cleared and further
on_write notifications are suppressed until the write buffer is filled.
Runs a process with its stdout tied to a newly created handle.
The process name is passed to process parameter to the new() contructor.
Runs a process with its stdin tied to a newly created handle.
The process name is passed to process parameter to the new() contructor.
Runs a process with its stdin and stdout tied to two newly created handles. The both handles are transparently mapped to a single handle object.
Note: check IPC::Open2 and IPC::Open3 also.
Forks a child with its stdout tied to a newly created handle.
Forks a child with its stdin tied to a newly created handle.
Forks a child with its stdin and stdout tied to two newly created handles. The both handles are transparently mapped to a single handle object.
Shortcut class for STDIN handle.
Shortcut class for STDOUT handle.
Shortcut class for STDERR handle.
Shortcut class for TCP socket. Parameters accepted:
If set, connect() call is issued on the socket to
HOSTNAME and port set in port
parameter.
If set, socket listens on port.
Number of a port to bind to.
If set, socket listens on addr IP address, otherwise INADDR_ANY.
Creates a new IO handle and accepts a connection into it.
Returns the newly created IO::Events::Handle object
with %PROFILE fields.
Shortcut class for UDP socket. Parameters accepted:
Number of a port to bind to.
If set, socket listens on addr IP address, otherwise INADDR_ANY.
Issues send(2) call, returns number of bytes sent or an error. See send in perldoc for more details.
Options accepted: oob, dontroute, eor, eof.
Issues recv(2) call, returns data block or under if error. See recv in perldoc for more details.
Options accepted: oob, peek, waitall, nonblock, maxlen.
A handle class, used to connect to and listen on UNIX sockets.
Creates a new IO handle and accepts a connection into it.
Returns the newly created IO::Events::Handle object
with %PROFILE fields.
Connects to a socket over a given PATHNAME
Listens on a socket over a given PATHNAME
A tiny hackish hask to add time-based events. The class is not inherited
from IO::Event::Handle, and the only property it shared with the other
handle classes is owner.
Invokes on_tick callback in SECONDS, which can be float.
If on, timer is started immediately, otherwise is stopped.
If on, timer fires off event each SECONDS interval, otherwise goes off
after the first on_tick.
Starts the timer
Stops the timer
Callback invoked each time SECONDS interval is expired.
perlipc, POE, IO::Handle, IO::Select, IPC::Open2.
Copyright (c) 2004 catpipe Systems ApS. All rights reserved.
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
Dmitry Karasik <dmitry@karasik.eu.org>
| IO-Events documentation | Contained in the IO-Events distribution. |
# # Copyright (c) 2004 catpipe Systems ApS # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $Id: Events.pm,v 1.32 2007/03/28 08:01:59 dk Exp $ use strict; package IO::Events; use vars qw($VERSION $FORK_MODE @loops); $VERSION=0.6; # Master loop object package IO::Events::Loop; use vars qw(@ISA); use IO::Handle; use Errno qw(EAGAIN); use POSIX qw(sys_wait_h exit); use Time::HiRes qw(time); sub new { my $class = shift; my $obj = bless { # options debug => 0, timeout => 50, # seconds waitpid => 1, @_, # private fields read => '', write => '', exc => '', processes => {}, filenos => {}, ids => {}, timers => [], }, $class; push @IO::Events::loops, $obj; return $obj; } sub yield { my ( $self, %profile) = @_; my ( $ir, $iw, $ie) = ( $self->{read}, $profile{block_write} ? '' : $self-> {write}, $profile{block_exc} ? '' : $self-> {exc} ); my $timeout = exists $profile{timeout} ? $profile{timeout} : $self->{timeout}; if ( @{$self->{timers}}) { my $time = time; for my $timer (@{$self->{timers}}) { next unless $timer->{active}; my $sleep = $timer->{alert} - $time; $timeout = $sleep if $timeout > $sleep; } $timeout = 0 if $timeout < 0; } my $n = select( $ir, $iw, $ie, $timeout); if ( @{$self->{timers}}) { my $time = time; for my $timer (@{$self->{timers}}) { next if not $timer->{active} or $time < $timer->{alert}; $timer-> notify; } } unless ( $n > 0) { if ( $self->{debug}) { print STDERR "IO::Events: empty select"; if ( $n < 0) { print STDERR " error:$!"; } print STDERR "\n"; } goto WAITPID; } my $i; my $lnx = (sort { $a <=> $b } map { length } ( $ir, $iw, $ie))[-1] * 8; for ( $i = 0; $i < $lnx; $i++) { my ( $r, $w, $e) = ( vec( $ir, $i, 1), vec( $iw, $i, 1), vec( $ie, $i, 1)); next unless $r || $w || $e; my $task; if ( exists $self-> {filenos}-> {$i} && exists $self->{ids}->{$self-> {filenos}-> {$i}}) { $task = $self->{ids}->{$self-> {filenos}-> {$i}}; } else { print STDERR "IO::Events: runaway handle $i/$self->{filenos}->{$i}\n" if $self->{debug}; $self-> error( undef, 'select'); next; } if ( $task-> {callback} and not $task-> {dead} ) { $task-> {callback}-> ( $task, $r, $w, $e); } if ( $r and not $task-> {dead}) { my $nbytes; if ( $task-> {read} > -1) { $nbytes = sysread( $task->{handle}, $task->{read_buffer}, $profile{block_read} ? 0 : 65536, length ($task->{read_buffer})); } if ( $task->{read} > 0) { print STDERR "IO::Events: # $i read $nbytes bytes\n" if $self->{debug}; unless ( defined $nbytes) { $self-> error( $task, 'read') unless $! == EAGAIN; next; } } else { $nbytes = 1 unless defined $nbytes; # simulate read print STDERR "IO::Events: # $i simulated read $nbytes\n" if $self->{debug}; } next if $profile{block_read}; if ( $nbytes > 0) { $task-> notify('on_read'); next; } $task-> destroy unless $task-> {pid}; } if ( $w and not $task-> {dead}) { unless ( length $task->{write_buffer}) { vec( $self->{write}, $task-> {fileno}, 1) = 0; $task-> notify('on_write'); next; } my $nbytes = syswrite( $task->{handle}, $task->{write_buffer}); print STDERR "IO::Events: # $i wrote $nbytes bytes\n" if $self->{debug}; unless ( defined $nbytes) { $self-> error( $task, 'write') unless $! == EAGAIN; next; } if ( $nbytes > 0) { substr( $task->{write_buffer}, 0, $nbytes) = ''; unless ( length $task->{write_buffer}) { vec( $self->{write}, $task-> {fileno}, 1) = 0; $task-> notify('on_write'); } next; } } if ( $e and not $task-> {dead}) { print STDERR "IO::Events: exception $i\n" if $self->{debug}; $task-> notify('on_exception'); } } # close processes WAITPID: if ( $self-> {waitpid}) { while (($_ = waitpid(-1,WNOHANG)) > 0) { next unless $self->{processes}->{$_}; my @tasks = map { $self-> {ids}-> {$_}} @{$self->{processes}-> {$_}}; # read leftovers for my $task ( @tasks) { if ( $task-> can_read && $task-> {read} > 0) { my $notify; while ( 1) { my $nbytes = sysread( $task->{handle}, $task->{read_buffer}, 65536, length ($task->{read_buffer})); unless ( defined $nbytes) { $self-> error( $task, 'read') unless $! == EAGAIN; last; } $notify += $nbytes; last unless $nbytes; } $task-> notify('on_read') if $notify; } # XXX if $task-> can_exception ... read URG bytes? $task->{exitcode} = $?; $task->{finished} = 1; } for my $task ( @tasks) { $task-> destroy; } } } return $n; } sub handles { return scalar(keys %{$_[0]->{ids}}); } sub flush { shift-> yield( block_read => 1, block_exc => 1, timeout => 0); } sub error { my ( $self, $task, $condition) = @_; $task-> notify('on_error', $condition, $!) if $task; } sub on_fork { $IO::Events::FORK_MODE = 1; shift-> DESTROY; $IO::Events::FORK_MODE = undef; } sub DESTROY { my $self = $_[0]; return if $self->{dead}; for ( values %{$self->{ids}}) { next unless $_; $_->{dead} = 1 if $IO::Events::FORK_MODE; $_-> destroy; } for ( @{$self->{timers}}) { next unless $_; $_->{dead} = 1; } @IO::Events::loops = grep { $self != $_ } @IO::Event::IPC::loops; $self-> {dead} = 1; } END { for ( @IO::Events::loops) { eval { $_->DESTROY }; warn "$@" if $@; } @IO::Events::loops = (); } # Single task package IO::Events::Handle; use vars qw(@ISA %events); use Errno qw(EAGAIN); use constant SINGLE => 1; use constant MULTIPLE => 2; %events = ( on_read => SINGLE, on_write => SINGLE, on_exception => SINGLE, on_close => MULTIPLE, on_create => MULTIPLE, on_error => MULTIPLE, ); use Fcntl; sub new { my $class = shift; my $self = bless { auto_close => 1, finished => 0, exitcode => 0, read_buffer => '', write_buffer => '', write => 0, read => 0, exception => 0, pid => undef, @_, }, $class; $self->{handle} = IO::Handle-> new() unless defined $self->{handle}; for ( qw(owner)) { die "No `$_' field" unless defined $self-> {$_}; } $self-> {id} = "$self" unless defined $self-> {id}; my $owner = $self->{owner}; die "Id `$self->{id}` already present" if exists $owner->{ids}->{$self->{id}}; my $fno = fileno( $self->{handle}); die "Cannot read fileno() from handle" unless defined $fno; $self-> {fileno} = $fno; unless ( $self-> {nonblock}) { my $fl; $fl = fcntl( $self->{handle}, F_GETFL, 0); die "$!" unless defined $fl; fcntl( $self->{handle}, F_SETFL, $fl|O_NONBLOCK) or die "$!"; } if ($self-> {write}) { vec( $owner-> {write}, $fno, 1) = 1; #print "write\n"; } if ($self-> {read}) { vec( $owner-> {read}, $fno, 1) = 1; #print "read\n"; } if ($self-> {exception}) { vec( $owner-> {exc}, $fno, 1) = 1; } $owner-> {filenos}-> {$fno} = $self-> {id}; push @{$owner-> {processes}-> {$self->{pid}}}, $self-> {id} if defined $self->{pid}; $owner-> {ids}-> {$self->{id}} = $self; $self-> notify('on_create'); return $self; } sub can_read { return vec( $_[0]->{owner}->{read}, $_[0]-> {fileno}, 1) unless $#_; vec( $_[0]->{owner}->{read}, $_[0]-> {fileno}, 1) = $_[1]; $_[0]-> {read} = $_[1]; } sub can_write { return vec( $_[0]->{owner}->{write}, $_[0]-> {fileno}, 1) unless $#_; vec( $_[0]->{owner}->{write}, $_[0]-> {fileno}, 1) = $_[1]; $_[0]-> {write} = $_[1]; } sub can_exception { return vec( $_[0]->{owner}->{exc}, $_[0]-> {fileno}, 1) unless $#_; vec( $_[0]->{owner}->{exc}, $_[0]-> {fileno}, 1) = $_[1]; $_[0]-> {exception} = $_[1]; } sub DESTROY { my $self = $_[0]; return if $self->{dead}; $self->{dead} = 1; $self-> flush; $self-> notify('on_close'); $self-> {handle}-> close if defined $self->{handle} && $self->{auto_close}; if ( defined $self->{owner}) { if ( defined $self->{fileno}) { vec( $self-> {owner}-> {exc}, $self->{fileno}, 1) = 0; vec( $self-> {owner}-> {write}, $self->{fileno}, 1) = 0; vec( $self-> {owner}-> {read}, $self->{fileno}, 1) = 0; delete $self-> {owner}-> {filenos}-> {$self->{fileno}}; } if (defined $self->{pid}) { my $p = $self-> {owner}-> {processes}-> {$self->{pid}}; @$p = grep { $_ ne $self->{id}} @$p; delete $self-> {owner}-> {processes}-> {$self->{pid}} unless @$p; } delete $self-> {owner}-> {ids}-> {$self->{id}}; } delete $self->{fileno}; delete $self->{id}; } sub readline { return $1 if $_[0]-> {read_buffer} =~ s/^([^\n]*\n)//; return undef; } sub read { my $c = $_[0]-> {read_buffer}; substr( $_[0]-> {read_buffer}, 0) = ''; return $c; } sub write { my ( $self, $data) = @_; $self-> {write_buffer} .= $data; vec( $self->{owner}->{write}, $self-> {fileno}, 1) = 1 if $self->{owner} and defined $self->{fileno}; my $nbytes = syswrite( $self->{handle}, $self->{write_buffer}); unless ( defined $nbytes) { $self-> {owner}-> error( $self, 'write') if $self->{owner} && $! != EAGAIN; $nbytes = 0 if $! == EAGAIN; } elsif ( $nbytes > 0) { substr( $self->{write_buffer}, 0, $nbytes) = ''; } $nbytes; } sub flush { my ( $self, $discard) = @_; if ( $discard) { $self-> {write_buffer} = ''; } else { while ( length $self-> {write_buffer}) { return undef unless defined $self-> write(''); } } return 1; } sub destroy { shift-> DESTROY } sub notify { my ( $self, $event, @params) = @_; die( "Unexistent event `$event'") unless $events{$event}; $self-> {event_flag} = 0; if ( exists $self->{$event}) { $self->{$event}->($self,@params); return if $events{$event} == SINGLE || $self->{event_flag}; } $self-> $event(@params) if $self-> can($event); } sub on_error { my ( $self, $condition, $errno) = @_; if ( $self) { $condition .= ' '.ref($self); $condition .= ",#$self->{fileno}" if defined $self->{fileno}; $condition .= " pid $self->{pid}" if defined $self->{pid}; $condition .= " ($self->{process})" if defined $self->{process}; } warn "Error on $condition: $errno\n"; $_[0]-> destroy; } # external writer process package IO::Events::Process::Write; use vars qw(@ISA); @ISA = qw(IO::Events::Handle); sub new { my ( $self, %profile) = @_; die "No `process'" unless defined $profile{process}; my $handle = IO::Handle-> new(); $handle-> autoflush(1); my $pid = open( $handle, "|$profile{process}"); die("Cannot fork:$!") unless defined $pid; $self = $self-> SUPER::new( write => 1, %profile, handle => $handle, pid => $pid, ); return $self; } # external reader process package IO::Events::Process::Read; use vars qw(@ISA); @ISA = qw(IO::Events::Handle); sub new { my ( $self, %profile) = @_; die "No `process'" unless defined $profile{process}; my $handle = IO::Handle-> new(); $handle-> autoflush(1); my $pid = open( $handle, "$profile{process}|"); die("Cannot fork:$!") unless defined $pid; $self = $self-> SUPER::new( read => 1, %profile, handle => $handle, pid => $pid, ); return $self; } # internal reader process package IO::Events::Fork::Read; use vars qw(@ISA); @ISA = qw(IO::Events::Handle); sub new { my ( $self, %profile) = @_; my $handle = IO::Handle-> new(); $handle-> autoflush(1); my $pid = open( $handle, "-|"); die("Cannot fork:$!") unless defined $pid; unless ( $pid) { # $profile{owner}->on_fork(); $|=1; my $on_fork = $profile{on_fork} || $self->can('on_fork'); $on_fork->(\%profile) if $on_fork; POSIX::_exit(0); } $self = $self-> SUPER::new( read => 1, %profile, handle => $handle, pid => $pid, ); return $self; } # internal writer process package IO::Events::Fork::Write; use vars qw(@ISA); @ISA = qw(IO::Events::Handle); sub new { my ( $self, %profile) = @_; my $handle = IO::Handle-> new(); $handle-> autoflush(1); my $pid = open( $handle, "|-"); die("Cannot fork:$!") unless defined $pid; unless ( $pid) { # $profile{owner}->on_fork(); my $on_fork = $profile{on_fork} || $self->can('on_fork'); $on_fork->(\%profile) if $on_fork; POSIX::_exit(0); } $self = $self-> SUPER::new( write => 1, %profile, handle => $handle, pid => $pid, ); return $self; } package IO::Events::internal::Shadow; use vars qw(@ISA); @ISA = qw(IO::Events::Handle); sub new { my ( $self, %profile) = @_; $profile{shadow_task} = $profile{owner}->{ids}->{$profile{id}}; $profile{id} = "shadow:$profile{id}"; my $ret = $self-> SUPER::new(%profile); return $ret; } sub on_close { undef $_[0]->{shadow_task}-> {shadow}; } sub on_error { my ( $self, $condition, $errno) = @_; $self-> {shadow_task}-> notify('on_error', $condition, $errno); } # internal bidirectional process package IO::Events::Fork::ReadWrite; use vars qw(@ISA); @ISA = qw(IO::Events::Handle); sub new { my ( $self, %profile) = @_; # reader my $handle1 = IO::Handle-> new(); $handle1-> autoflush(1); # writer my $handle2 = IO::Handle-> new(); $handle2-> autoflush(1); # fork & pipes pipe(READER, $handle2); pipe($handle1, WRITER); WRITER->autoflush(1); my $pid = fork(); die("Cannot fork:$!") unless defined $pid; unless ( $pid) { close $handle1; close $handle2; open STDOUT, ">&WRITER"; open STDIN, "<&READER"; # $profile{owner}->on_fork(); $|=1; my $on_fork = $profile{on_fork} || $self->can('on_fork'); $on_fork->(\%profile) if $on_fork; POSIX::_exit(0); } close WRITER; close READER; # create objects $self = $self-> SUPER::new( read => 1, %profile, handle => $handle1, pid => $pid, ); $self-> {shadow} = IO::Events::internal::Shadow-> new( write => 1, %profile, id => $self-> {id}, handle => $handle2, pid => $pid, on_write => \&shadow_write, on_close => \&shadow_close, ); return $self; } sub shadow_write { shift-> {shadow_task}-> notify('on_write'); } sub shadow_close { my $shadow = shift; $shadow-> {shadow_task}->{finished} = $shadow-> {finished}; $shadow-> {shadow_task}->{exitcode} = $shadow-> {exitcode}; $shadow-> {shadow_task}-> notify('on_close', 1); } sub shutdown { my ( $self, @cmd) = @_; for ( @cmd) { if ( $_ eq 'read') { $self-> SUPER::DESTROY; } elsif ( $_ eq 'write') { $self-> {shadow}-> DESTROY if $self-> {shadow}; } } } sub DESTROY { return if $_[0]->{dead}; $_[0]->{shadow}->DESTROY if $_[0]->{shadow}; $_[0]->SUPER::DESTROY; $_[0]->{dead} = 1; } sub write { my $self = shift; return unless $self->{shadow}; $self-> {shadow}-> write( @_) } # external bidirectional process package IO::Events::Process::ReadWrite; use vars qw(@ISA); @ISA = qw(IO::Events::Fork::ReadWrite); sub on_fork { exec( $_[0]->{process}) or POSIX::_exit(127); } package IO::Events::stdin; use vars qw(@ISA); @ISA=qw(IO::Events::Handle); sub new { my ( $self, %profile) = @_; $profile{id} = "stdin"; $profile{handle} = \*STDIN; $profile{read} = 1; $profile{auto_close} = 0; return $self-> SUPER::new(%profile); } package IO::Events::stdout; use vars qw(@ISA); @ISA=qw(IO::Events::Handle); sub new { my ( $self, %profile) = @_; $profile{id} = "stdout"; $profile{handle} = \*STDOUT; $profile{write} = 1; $profile{auto_close} = 0; return $self-> SUPER::new(%profile); } package IO::Events::stderr; use vars qw(@ISA); @ISA=qw(IO::Events::Handle); sub new { my ( $self, %profile) = @_; $profile{id} = "stderr"; $profile{handle} = \*STDERR; $profile{write} = 1; $profile{auto_close} = 0; return $self-> SUPER::new(%profile); } package IO::Events::Socket; use vars qw(@ISA); @ISA=qw(IO::Events::Handle); use Socket; sub accept { my ( $self, %profile) = @_; my $handle = IO::Handle-> new; accept( $handle, $self-> {handle}) or die "accept() error:$!"; return IO::Events::Handle-> new( owner => $self-> {owner}, handle => $handle, %profile, ); } sub connect { $_[0]-> {callback} = \&socket_connect_error_checker; } sub socket_connect_error_checker { my ( $self, $r, $w, $e) = @_; delete $self-> {callback}; local $! = unpack('i', getsockopt($self-> {handle}, SOL_SOCKET, SO_ERROR)); if ( $!) { $self-> {owner}-> error( $self, 'connect') if $self->{owner}; } } package IO::Events::Socket::TCP; use vars qw(@ISA); @ISA=qw(IO::Events::Socket); use strict; use Socket; use Fcntl; use Errno qw(EWOULDBLOCK EINPROGRESS); sub new { my ( $self, %profile) = @_; $profile{handle} = IO::Handle-> new unless $profile{handle}; die "Cannot create socket: $!" unless socket( $profile{handle}, PF_INET, SOCK_STREAM, getprotobyname('tcp')); unless ( $profile{nonblock}) { my $fl; $fl = fcntl( $profile{handle}, F_GETFL, 0); die "$!" unless defined $fl; fcntl( $profile{handle}, F_SETFL, $fl|O_NONBLOCK) or die "$!"; } if ( defined $profile{connect}) { my $iaddr; die "Cannot resolve host '$profile{connect}'" unless $iaddr = inet_aton( $profile{connect}); my $ok = connect( $profile{handle}, sockaddr_in( $profile{port}, $iaddr)); $ok = 1 if !$ok and ( $! == EWOULDBLOCK || $! == EINPROGRESS); die "Connect error: $!" unless $ok; } elsif ( exists $profile{listen}) { setsockopt( $profile{handle}, SOL_SOCKET, SO_REUSEADDR, pack("l", 1)) or die "Error in setsockopt(SOL_SOCKET,SO_REUSEADDR,1):$!"; my $addr = $profile{addr} || '0.0.0.0'; my $inet = inet_aton( $addr); die "Cannot resolve host '$addr'" unless defined $inet; bind( $profile{handle}, sockaddr_in( $profile{port}, $inet)) or die "Error in bind($profile{port}, $addr):$!"; listen( $profile{handle}, SOMAXCONN); $profile{read} = -1; } my $this = $self-> SUPER::new(%profile); $this-> SUPER::connect() if $profile{connect}; return $this; } sub accept { my ( $self, %profile) = @_; my $client = $self-> SUPER::accept( %profile); my ($port, $ipaddr) = unpack_sockaddr_in( getpeername( $client->{handle})); $client-> {remote_addr} = inet_ntoa($ipaddr); $client-> {remote_port} = $port; return $client; } package IO::Events::Socket::UDP; use vars qw(@ISA); @ISA=qw(IO::Events::Socket); use strict; use Socket; use Fcntl; sub new { my ( $self, %profile) = @_; $profile{handle} = IO::Handle-> new unless $profile{handle}; die "Cannot create socket: $!" unless socket( $profile{handle}, PF_INET, SOCK_DGRAM, getprotobyname('udp')); unless ( $profile{nonblock}) { my $fl; $fl = fcntl( $profile{handle}, F_GETFL, 0); die "$!" unless defined $fl; fcntl( $profile{handle}, F_SETFL, $fl|O_NONBLOCK) or die "$!"; } my $addr = $profile{addr} || '0.0.0.0'; my $inet = inet_aton( $addr); die "Cannot resolve host '$addr'" unless defined $inet; if ( $profile{broadcast}) { setsockopt( $profile{handle}, SOL_SOCKET, SO_BROADCAST, pack("l", 1)) or die "Error in setsockopt(SOL_SOCKET,SO_BROADCAST,1):$!"; } $profile{read} = -2; bind( $profile{handle}, sockaddr_in( $profile{port} || 0, $inet)) or die "Error in bind($profile{port}, $addr):$!"; return $self-> SUPER::new(%profile); } sub recv { my ( $self, %profile) = @_; $profile{maxlen} = 32768 unless defined $profile{maxlen}; my $flags = MSG_DONTWAIT; $flags |= MSG_OOB if $profile{oob}; $flags |= MSG_PEEK if $profile{peek}; $flags |= MSG_WAITALL if $profile{waitall}; $flags &= ~MSG_DONTWAIT if defined($profile{nonblock}) and $profile{nonblock} == 0; my ( $port, $host); my $data = ''; $host = recv( $self-> {handle}, $data, $profile{maxlen}, $flags); unless ( defined $host) { $self-> error( 'recv'); return undef; } ( $port, $host) = sockaddr_in( $host); $self-> {remote_port} = $port; $self-> {remote_host} = gethostbyaddr( $host, AF_INET); return $data; } sub send { my ( $self, $addr, $port, $data, %profile) = @_; my $flags = 0; $flags |= MSG_OOB if $profile{oob}; $flags |= MSG_DONTROUTE if $profile{dontroute}; $flags |= MSG_EOR if $profile{eor}; $flags |= MSG_EOF if $profile{eof}; my $inet = inet_aton($addr) || die "unknown host '$addr'\n"; $inet = sockaddr_in( $port, $inet); my $ret = send( $self-> {handle}, $data, $flags, $inet); unless ( defined $ret) { $self-> error( 'recv'); return undef; } return $ret; } package IO::Events::Socket::UNIX; use vars qw(@ISA); @ISA=qw(IO::Events::Socket); use Socket; use Fcntl; sub new { my ( $self, %profile) = @_; $profile{handle} = IO::Handle-> new unless $profile{handle}; die "Cannot create socket: $!" unless socket( $profile{handle}, PF_UNIX, SOCK_STREAM, 0); unless ( $profile{nonblock}) { my $fl; $fl = fcntl( $profile{handle}, F_GETFL, 0); die "$!" unless defined $fl; fcntl( $profile{handle}, F_SETFL, $fl|O_NONBLOCK) or die "$!"; } if ( defined $profile{connect}) { connect( $profile{handle}, pack_sockaddr_un($profile{connect})) or die "connect($profile{connect}) error: $!"; } elsif ( exists $profile{listen}) { bind( $profile{handle}, pack_sockaddr_un($profile{listen})) or die "Error in bind($profile{listen}):$!"; listen( $profile{handle}, SOMAXCONN); $profile{read} = -1; } my $this = $self-> SUPER::new(%profile); $this-> SUPER::connect() if $profile{connect}; return $this; } package IO::Events::Timer; use Time::HiRes qw(time); sub new { my $class = shift; my $self = bless { timeout => 10000, repetitive => 0, active => 0, @_, }, $class; for ( qw(owner)) { die "No `$_' field" unless defined $self-> {$_}; } push @{$self-> {owner}-> {timers}}, $self; $self-> start if $self-> {active}; return $self; } sub DESTROY { my $self = $_[0]; return if $self-> {dead}; @{$self-> {owner}-> {timers}} = grep { $_ != $self } @{$self-> {owner}-> {timers}}; } sub start { my $self = $_[0]; $self-> {alert} = time + $self-> {timeout}; $self-> {active} = 1; } sub stop { $_[0]-> {active} = 0 } sub active { my ( $self, $active) = @_; return if $active == $self-> {active}; # to avoid restarts $active ? $self-> start : $self-> stop; } sub notify { my $self = $_[0]; if ( $self-> {repetitive}) { my $time = time; # eat up late events $self-> {alert} += $self-> {timeout} while $self-> {alert} < $time; } else { $self-> {active} = 0; } $self-> {event_flag} = 0; if ( defined $self->{on_tick}) { $self->{on_tick}->($self); return if $self->{event_flag}; } $self-> on_tick() if $self-> can('on_tick'); } 1; __DATA__