IO::Events - Non-blocking IO using events


IO-Events documentation Contained in the IO-Events distribution.

NAME

IO::Events - Non-blocking IO using events

SYNOPSIS

Example 1, run 'bc' as a co-process:

	use IO::Events;

	my $loop = IO::Events::Loop-> new();

	my $stdin_alive = 1;
	my $calculator = IO::Events::Process::ReadWrite-> new(
		owner    => $loop,
		process  => 'bc -l', 
		on_read  => sub {
			while ( my $line = $_[0]-> readline) {
				print "bc says: $line";
			}
		},
		on_close => sub {
			exit 1 if $stdin_alive; # fork/exec error
		}
	);

	my $stdin = IO::Events::stdin-> new(
		owner => $loop,
		on_read => sub { 
			$calculator-> write( $_[0]-> read );
		},
		on_close => sub {
			$stdin_alive = 0;
			exit;
		},
	);

	$loop-> yield while 1;




Example 2, connect to/listen on a TCP port within a single process:

	use IO::Events;

	my $loop = IO::Events::Loop-> new();
	IO::Events::Socket::TCP-> new(
		owner    => $loop,
		listen   => 1,
		port     => 10000,
		on_read => sub {
			my $new = shift-> accept( 
				read   => 1,
				on_read => sub {
					while ( my $line = $_[0]-> readline) {
						print "client says: $line\n";
						exit;
					}
				}
			);
			print "connect from $new->{remote_addr}:$new->{remote_port}\n";
		},
	);

	IO::Events::Socket::TCP-> new(
		owner   => $loop,
		connect => 'localhost',
		port 	=> 10000,
	)-> write("hello, tcp socket!\n");

	$loop->yield while 1;

Example 3, connect to/listen on a UDP port within a single process:

	use Socket;
	use IO::Events;

	my $loop = IO::Events::Loop-> new();
	IO::Events::Socket::UDP-> new(
		owner    => $loop,
		port     => 10000,
		on_read => sub {
			my $self = $_[0];
			my $data = $self-> recv;
			print "$self->{remote_host}:$self->{remote_port} says: $data";
			exit;
		},
	);

	IO::Events::Socket::UDP-> new(
		owner   => $loop,
	)-> send( 'localhost', 10000, "hello, udp socket!\n");

	$loop->yield while 1;

DESCRIPTION

The module implements an object-oriented approach to select-driven events and contains a set of convenience objects for inter-process communication.

The main part of the module is the 'loop' instance of the IO::Events::Loop class, which knows about all IO handles subject to select(). The handles inherit from the IO::Events::Handle class, whose handle property holds a reference to an IO handle object, either a file scalar or an IO::Handle instance. An IO::Events::Handle object propagates the select-based events on_read, on_write and on_exception, as well as the generic on_create, on_close and on_error. Handles are required to be non-blocking.

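All instances are created by calling new with an arbitrary number of named parameters. Unrecognized parameters are stored in the object and cause no conflict. Handle classes require the owner parameter and a handle that has a file number:

	my $h = IO::Events::Handle-> new( owner => $loop, handle => \*STDOUT, my_var => 1 );
	die $h->{my_var};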

The module is not meant to replace IO::Select or IPC::Open2 and IPC::Open3; it can coexist with the former and interact with handles managed by the latter. The example in the "SYNOPSIS" section shows how to set up non-blocking IO between stdin and a co-process.

Advice

* Set $SIG{PIPE} = 'IGNORE'; this is usually a sound idea.
* Set $|=1 first in your script if you use the ::Fork or ::Process classes.
* If you use on_fork, never terminate a child process with die or exit; otherwise everything Perl allocated in the parent process is de-allocated twice, once in the parent and once in the child. If you must terminate the child yourself, use POSIX::_exit instead.
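The advice above can be tried out with core Perl alone. The following is a minimal sketch, not code taken from the module: it forks a child by hand and leaves it through POSIX::_exit, so that no END blocks or destructors inherited from the parent run in the child. The exit status 3 is an arbitrary value chosen for illustration.

```perl
use strict;
use warnings;
use POSIX ();

$SIG{PIPE} = 'IGNORE';  # writing to a closed pipe now fails with an error instead of killing the process
$| = 1;                 # avoid buffered output being duplicated across fork

my $pid = fork();
die "fork: $!" unless defined $pid;
unless ($pid) {
	# Child: terminate without running END blocks or object destructors.
	POSIX::_exit(3);
}

# Parent: collect the child and decode its exit status.
waitpid( $pid, 0);
my $status = $? >> 8;
print "child exit status: $status\n";
```

Inside an on_fork callback the same POSIX::_exit call would replace a plain exit.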

IO::Events::Loop

new() parameters

timeout INTEGER

Number of seconds passed to select() as the fourth parameter.

Default value: 50

waitpid BOOLEAN

If set to 1, IO::Events::Loop waits for finished processes in addition to calling select(), and automatically destroys the handles associated with them.

Default value: 1

Methods

handles

Returns the number of handles owned by the loop object. A program that automatically disposes of handles it no longer needs may choose to stop when there are no more handles to serve.

yield %HASH

Enters the select() loop and dispatches pending IO if data are available to read or write. The hash values 'block_read', 'block_write', and 'block_exc' can be set to 1 if read, write, or exception events are not to be used. A 'timeout' value overrides the loop timeout for this call. In practice, the call

	$loop-> yield( block_read => 1, block_exc => 1, timeout => 0 )

effectively (but still in a non-blocking fashion) flushes the write buffers.

Returns the result of the select() call, the number of handles that are ready.

flush

Flushes write sockets, if possible.
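The yield method is built on the four-argument select() and bit vectors indexed by file number. The following is a minimal core-Perl sketch of that mechanism, assuming a plain pipe in place of a managed handle. The variable names are illustrative and not part of the module.

```perl
use strict;
use warnings;

# A pipe with one line already waiting on the read end.
pipe( my $reader, my $writer) or die "pipe: $!";
syswrite( $writer, "ping\n");

# Register the read end in a bit vector, one bit per file number.
my $read_mask = '';
vec( $read_mask, fileno($reader), 1) = 1;

# select() overwrites its arguments, so it is given a copy of the mask.
my ( $ready, $timeout) = ( $read_mask, 0.5);
my $n = select( $ready, undef, undef, $timeout);

# A set bit in the result means the handle can be read without blocking.
my $data = '';
if ( $n > 0 && vec( $ready, fileno($reader), 1)) {
	sysread( $reader, $data, 65536);
}
print "ready handles: $n, data: $data";
```

The loop keeps one such mask each for read, write and exception events, which is why block_write and block_exc can simply leave a mask out of the call.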

Fields

%ids

All handles are assigned a unique id and are stored in the internal {ids} hash. This hash is read-only and can be used to look up a handle object by its id string.

%filenos

Hash of file numbers, read-only.

%processes

Hash of PIDs associated with handles, read-only. Used for IPC and waitpid results processing.

IO::Events::Handle

Root of the IO handle object hierarchy. It dispatches IO events and auto-destructs when the handle is closed or an IO error occurs. Explicit destruction is invoked by calling destroy, which is reentrant-safe.

Parameters to new()

auto_close BOOLEAN

If set to 1, the IO handle is explicitly closed when the object instance is destroyed. It has no other effect.

Default value: 1

flush [ DISCARD = 0]

Flushes data not yet written. If DISCARD is 0, does not return until all data are written or an error is signalled. If 1, discards all data.

handle IOHANDLE

IO::Handle object or a file scalar. If not specified, a new IO::Handle instance is created automatically.

The handle is set to non-blocking mode. If this is already done, the optional boolean parameter nonblock can be set to 1 to prevent extra fcntl calls.

read BOOLEAN

Set to 1 if handle is to be read from.

Default value: 0

shutdown @WHO

Defined in the IO::Events::Fork::ReadWrite and IO::Events::Process::ReadWrite namespaces. @WHO can contain the strings 'read' and 'write', which tell what part of the bidirectional IPC is to be closed.

write BOOLEAN

Set to 1 if handle is to be written to.

Default value: 0

pid INTEGER

If a handle is associated with a process, IO::Events::Loop uses this field to waitpid() and auto-destruct the handle.

Default value: undef
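The non-blocking mode that the handle parameter description mentions can be observed in isolation. This is a minimal core-Perl sketch, assuming an empty pipe as the handle: once O_NONBLOCK is set through fcntl, a read with nothing to deliver returns at once with EAGAIN instead of waiting.

```perl
use strict;
use warnings;
use Fcntl;
use Errno qw(EAGAIN EWOULDBLOCK);

pipe( my $reader, my $writer) or die "pipe: $!";

# Add O_NONBLOCK to whatever flags the handle already has.
my $flags = fcntl( $reader, F_GETFL, 0);
die "fcntl(F_GETFL): $!" unless defined $flags;
fcntl( $reader, F_SETFL, $flags | O_NONBLOCK) or die "fcntl(F_SETFL): $!";

# Nothing has been written, so the read fails immediately rather than blocking.
my $nbytes = sysread( $reader, my $buf, 65536);
my $would_block = !defined($nbytes) && ( $! == EAGAIN || $! == EWOULDBLOCK);
print "read would block: ", ( $would_block ? 'yes' : 'no'), "\n";
```

A caller that has already prepared its handle this way can pass nonblock => 1 to skip the two fcntl calls.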

Methods

can_read BOOLEAN

Selects whether the handle is readable.

can_write BOOLEAN

Selects whether the handle is writable.

can_exception BOOLEAN

Selects whether the handle accepts exception events.

readline

Returns one newline-terminated line from the read buffer, if available.

read

Returns the contents of the read buffer and empties it.

write DATA

Appends DATA to the write buffer.

destroy

Destroys the handle instance.

notify $EVENT, @PARAMETERS

Dispatches EVENT, passing PARAMETERS to each callback.
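The difference between readline and read comes from both working on an in-memory buffer rather than on the file handle. The following is a minimal sketch of the line-splitting idea, assuming a hypothetical $read_buffer variable and next_line helper that stand in for the object's buffer and its readline method.

```perl
use strict;
use warnings;

# Hypothetical buffer contents after a read that ended in mid-line.
my $read_buffer = "first\nsecond\npart";

# Cut one complete line off the front of the buffer, if there is one.
sub next_line {
	return $1 if $read_buffer =~ s/^([^\n]*\n)//;
	return undef;
}

my @lines;
while ( defined( my $line = next_line())) {
	push @lines, $line;
}
print "lines: ", scalar(@lines), ", left in buffer: '$read_buffer'\n";
```

The incomplete tail stays in the buffer until more data arrive, which is why an on_read callback usually calls readline in a loop.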

Events

A single event can cause several callback routines to be called. This is useful when a class declares its own handler, for example cleanup code in an on_close sub, while the user of a class instance adds another listener to the same on_close notification:

	package MyHandle;
	...
	sub on_close { ... }
	...

	MyHandle-> new( on_close => sub { ... });

The class declares a static (per-class) hash %events, which contains the declaration of events and their execution flow. SINGLE-declared events call only a single callback, be it static or dynamic. MULTIPLE-declared events call all callbacks, but the execution flow can be stopped by setting {event_flag} to 1. This is useful, for example, to dynamically override the default behavior of IO::Events::Handle::on_error, which emits a warning message to stderr and destroys the handle.
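The SINGLE and MULTIPLE rule can be sketched without the module. The example below re-implements the dispatch in a hypothetical Demo class; the class name, its @log array and its two events are assumptions made for illustration, while the order of the checks follows the description above.

```perl
use strict;
use warnings;

package Demo;

use constant SINGLE   => 1;
use constant MULTIPLE => 2;

our %events = ( on_read => SINGLE, on_close => MULTIPLE);
our @log;

sub new {
	my $class = shift;
	my $self  = { @_ };
	return bless $self, $class;
}

# Static (per-class) callbacks.
sub on_read  { push @log, 'static on_read' }
sub on_close { push @log, 'static on_close' }

sub notify {
	my ( $self, $event, @params) = @_;
	die "Nonexistent event `$event'" unless $events{$event};
	$self->{event_flag} = 0;
	if ( exists $self->{$event}) {
		# Dynamic (per-instance) callback runs first.
		$self->{$event}->( $self, @params);
		# SINGLE events stop here; MULTIPLE ones stop only on request.
		return if $events{$event} == SINGLE || $self->{event_flag};
	}
	$self->$event(@params) if $self->can($event);
}

package main;

my $h = Demo->new(
	on_read  => sub { push @Demo::log, 'dynamic on_read' },
	on_close => sub { push @Demo::log, 'dynamic on_close' },
);
$h->notify('on_read');
$h->notify('on_close');
print join( ', ', @Demo::log), "\n";
```

Only the dynamic on_read runs, whereas on_close reaches both callbacks. Setting $_[0]->{event_flag} = 1 inside the dynamic on_close would suppress the static one.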

on_close

Called before object instance is destroyed.

As a special case, for ReadWrite objects on_close is called twice, once when the read handle is closed and once when the write handle is closed. To distinguish between the cases, the second parameter is set to 1 when on_close is called due to destruction of the writer handle.

Declared as MULTIPLE.

on_create

Called after object instance is created.

Declared as MULTIPLE.

on_error

Called when read or write calls encounter error.

Declared as MULTIPLE.

on_exception

Called when an exception arrives. Consult your system's select manpage to see which events can be expected on which socket types.

on_fork

Special event, called by IO::Events::Fork objects when a new process is instantiated. Although the syntax for specifying on_fork is the same as for the other events, on_fork does not interact with them and is not called from within yield.

When on_fork returns, the process is terminated. If you wish to terminate the process yourself, do not call Perl's exit but rather POSIX::_exit, since otherwise Perl structures created before the fork will be destroyed twice.

on_read

Called after data is read.

on_write

Called when handle is writable and the write buffer is empty. If the event doesn't fill the write buffer, the handle write flag is cleared and further on_write notifications are suppressed until the write buffer is filled.

IO::Events::Process::Read

Runs a process with its stdout tied to a newly created handle. The command to run is passed in the process parameter of the new() constructor.

IO::Events::Process::Write

Runs a process with its stdin tied to a newly created handle. The command to run is passed in the process parameter of the new() constructor.

IO::Events::Process::ReadWrite

Runs a process with its stdin and stdout tied to two newly created handles. Both handles are transparently mapped to a single handle object.

Note: see also IPC::Open2 and IPC::Open3.

IO::Events::Fork::Read

Forks a child with its stdout tied to a newly created handle.

IO::Events::Fork::Write

Forks a child with its stdin tied to a newly created handle.

IO::Events::Fork::ReadWrite

Forks a child with its stdin and stdout tied to two newly created handles. Both handles are transparently mapped to a single handle object.

IO::Events::stdin

Shortcut class for STDIN handle.

IO::Events::stdout

Shortcut class for STDOUT handle.

IO::Events::stderr

Shortcut class for STDERR handle.

IO::Events::Socket::TCP

Shortcut class for TCP socket. Parameters accepted:

connect HOSTNAME

If set, a connect() call is issued on the socket to HOSTNAME and the port given in the port parameter.

listen

If set, socket listens on port.

port INTEGER

Number of the port to bind to when listening, or to connect to when connect is set.

addr STRING

If set, socket listens on addr IP address, otherwise INADDR_ANY.

accept %PROFILE

Creates a new IO handle and accepts a connection into it. Returns the newly created IO::Events::Handle object with %PROFILE fields.

IO::Events::Socket::UDP

Shortcut class for UDP socket. Parameters accepted:

port INTEGER

Number of a port to bind to.

addr STRING

If set, socket listens on addr IP address, otherwise INADDR_ANY.

send HOSTNAME, PORT, DATA, %OPTIONS

Issues send(2) call, returns number of bytes sent or an error. See send in perldoc for more details.

Options accepted: oob, dontroute, eor, eof.

recv %OPTIONS

Issues a recv(2) call, returns a data block or undef on error. See recv in perldoc for more details.

Options accepted: oob, peek, waitall, nonblock, maxlen.

IO::Events::Socket::UNIX

A handle class, used to connect to and listen on UNIX sockets.

accept %PROFILE

Creates a new IO handle and accepts a connection into it. Returns the newly created IO::Events::Handle object with %PROFILE fields.

connect PATHNAME

Connects to a socket at the given PATHNAME.

listen PATHNAME

Listens on a socket at the given PATHNAME.

IO::Events::Timer

A tiny hackish hack to add time-based events. The class does not inherit from IO::Events::Handle, and the only property it shares with the other handle classes is owner.

timeout SECONDS

Invokes the on_tick callback after SECONDS, which can be a floating-point number.

active BOOLEAN

If on, the timer is started immediately; otherwise it is stopped.

repetitive BOOLEAN

If on, the timer fires an event every SECONDS interval; otherwise it stops after the first on_tick.

start

Starts the timer.

stop

Stops the timer.

on_tick

Callback invoked each time the SECONDS interval expires.
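Timers need no file handle because the loop can simply shorten the select() timeout to the nearest pending alert. The following is a minimal sketch of that calculation, assuming hypothetical timer records with only the active and alert fields and a loop timeout of 50 seconds.

```perl
use strict;
use warnings;
use Time::HiRes qw(time);

# Hypothetical timer records; alert is the absolute time of the next tick.
my $now    = time;
my @timers = (
	{ active => 1, alert => $now + 2.5 },
	{ active => 0, alert => $now + 0.1 },   # stopped, so it is ignored
	{ active => 1, alert => $now + 0.75 },
);

# Start from the loop timeout and shrink it to the nearest active alert.
my $timeout = 50;
for my $timer (@timers) {
	next unless $timer->{active};
	my $sleep = $timer->{alert} - $now;
	$timeout = $sleep if $timeout > $sleep;
}
$timeout = 0 if $timeout < 0;   # an overdue timer means no waiting at all

printf "select() would wait at most %.2f seconds\n", $timeout;
```

After select() returns, every active timer whose alert time has passed gets its on_tick callback. Timer resolution is therefore bounded by how often yield is called.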

SEE ALSO

perlipc, POE, IO::Handle, IO::Select, IPC::Open2.

COPYRIGHT

AUTHOR

Dmitry Karasik <dmitry@karasik.eu.org>


#
#  Copyright (c) 2004 catpipe Systems ApS
#  All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions
#  are met:
#  1. Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#
#  THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
#  ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#  ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
#  FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
#  DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
#  OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
#  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
#  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
#  OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
#  SUCH DAMAGE.
#

# $Id: Events.pm,v 1.32 2007/03/28 08:01:59 dk Exp $
use strict;

package IO::Events;
use vars qw($VERSION $FORK_MODE @loops);
$VERSION=0.6;

# Master loop object
package IO::Events::Loop;
use vars qw(@ISA);

use IO::Handle;
use Errno qw(EAGAIN);
use POSIX qw(:sys_wait_h _exit); # :sys_wait_h provides WNOHANG for waitpid()
use Time::HiRes qw(time);

sub new
{
	my $class = shift;
	my $obj = bless {
		# options
		debug     => 0,
		timeout   => 50, # seconds
		waitpid   => 1,
		@_,

		# private fields
		read      => '',
		write     => '',
		exc       => '',
		processes => {},
		filenos   => {},
		ids       => {},
		timers    => [],
	}, $class;
	push @IO::Events::loops, $obj;
	return $obj;
}

sub yield
{
	my ( $self, %profile) = @_;

	my ( $ir, $iw, $ie) = ( 
		$self->{read}, 
		$profile{block_write} ? '' : $self-> {write}, 
		$profile{block_exc} ? '' : $self-> {exc}
	);

	my $timeout = exists $profile{timeout} ? $profile{timeout} : $self->{timeout};
	if ( @{$self->{timers}}) {
		my $time = time;
		for my $timer (@{$self->{timers}}) {
			next unless $timer->{active};
			my $sleep = $timer->{alert} - $time;
			$timeout = $sleep if $timeout > $sleep;
		}
		$timeout = 0 if $timeout < 0;
	}
	my $n = select( $ir, $iw, $ie, $timeout);

	if ( @{$self->{timers}}) {
		my $time = time;
		for my $timer (@{$self->{timers}}) {
			next if not $timer->{active} or $time < $timer->{alert};
			$timer-> notify;
		}
	}
	
	unless ( $n > 0) {
		if ( $self->{debug}) {
			print STDERR "IO::Events: empty select";
			if ( $n < 0) {
				print STDERR " error:$!";
			}
			print STDERR "\n";
		}
		goto WAITPID;
	}

	my $i;
	my $lnx = (sort { $a <=> $b } map { length } ( $ir, $iw, $ie))[-1] * 8;
	for ( $i = 0; $i < $lnx; $i++) {
		my ( $r, $w, $e) = ( vec( $ir, $i, 1), vec( $iw, $i, 1), vec( $ie, $i, 1));
		next unless $r || $w || $e;
		my $task;
		if ( exists $self-> {filenos}-> {$i} && 
			exists $self->{ids}->{$self-> {filenos}-> {$i}}) {
			$task = $self->{ids}->{$self-> {filenos}-> {$i}};
		} else {
			print STDERR "IO::Events: runaway handle $i/$self->{filenos}->{$i}\n" 
				if $self->{debug};
			$self-> error( undef, 'select');
			next;
		}
		if ( $task-> {callback} and not $task-> {dead} ) {
			$task-> {callback}-> ( $task, $r, $w, $e);
		}

		if ( $r and not $task-> {dead}) {
			my $nbytes;
			if ( $task-> {read} > -1) {
				$nbytes = sysread( $task->{handle}, $task->{read_buffer}, 
					$profile{block_read} ? 0 : 65536, length ($task->{read_buffer}));
			}
			if ( $task->{read} > 0) {
				print STDERR "IO::Events: # $i read $nbytes bytes\n" 
					if $self->{debug};
				unless ( defined $nbytes) {
					$self-> error( $task, 'read') unless $! == EAGAIN;
					next;
				}
			} else {
				$nbytes = 1 unless defined $nbytes; # simulate read
				print STDERR "IO::Events: # $i simulated read $nbytes\n" 
					if $self->{debug};
			}
			next if $profile{block_read};
			if ( $nbytes > 0) {
				$task-> notify('on_read');
				next;
			}
			$task-> destroy unless $task-> {pid};
		} 

		if ( $w and not $task-> {dead}) {
			unless ( length $task->{write_buffer}) {
				vec( $self->{write}, $task-> {fileno}, 1) = 0;
				$task-> notify('on_write');
				next;
			}
			my $nbytes = syswrite( $task->{handle}, $task->{write_buffer});
			print STDERR "IO::Events: # $i wrote $nbytes bytes\n" 
				if $self->{debug};
			unless ( defined $nbytes) {
				$self-> error( $task, 'write') unless $! == EAGAIN;
				next;
			}
			if ( $nbytes > 0) {
				substr( $task->{write_buffer}, 0, $nbytes) = '';
				unless ( length $task->{write_buffer}) {
					vec( $self->{write}, $task-> {fileno}, 1) = 0;
					$task-> notify('on_write');
				}
				next;
			}
		}
		
		if ( $e and not $task-> {dead}) {
			print STDERR "IO::Events: exception $i\n" if $self->{debug};
			$task-> notify('on_exception');
		} 
	}

	# close processes
WAITPID:   
	if ( $self-> {waitpid}) {
		while (($_ = waitpid(-1,WNOHANG)) > 0) {
			next unless $self->{processes}->{$_};
			my @tasks = map { $self-> {ids}-> {$_}} @{$self->{processes}-> {$_}};
			# read leftovers
			for my $task ( @tasks) {
				if ( $task-> can_read && $task-> {read} > 0) {
					my $notify;
					while ( 1) {
						my $nbytes = sysread( $task->{handle}, 
							$task->{read_buffer}, 65536, 
							length ($task->{read_buffer}));
						unless ( defined $nbytes) {
							$self-> error( $task, 'read') 
								unless $! == EAGAIN;
							last;
						}
						$notify += $nbytes;
						last unless $nbytes;
					}
					$task-> notify('on_read') if $notify;
				}
				# XXX if $task-> can_exception ... read URG bytes?
				$task->{exitcode} = $?;
				$task->{finished} = 1;
			}
			for my $task ( @tasks) {
				$task-> destroy;
			}
		}
	}

	return $n;
}

sub handles
{
	return scalar(keys %{$_[0]->{ids}});
}

sub flush
{
	shift-> yield( block_read => 1, block_exc => 1, timeout => 0);
}

sub error 
{ 
	my ( $self, $task, $condition) = @_;
	$task-> notify('on_error', $condition, $!) if $task;
}

sub on_fork
{
	$IO::Events::FORK_MODE = 1;
	shift-> DESTROY;
	$IO::Events::FORK_MODE = undef;
}

sub DESTROY
{
	my $self = $_[0];
	return if $self->{dead};
	for ( values %{$self->{ids}}) {
		next unless $_;
		$_->{dead} = 1 if $IO::Events::FORK_MODE;
		$_-> destroy;
	}
	for ( @{$self->{timers}}) {
		next unless $_;
		$_->{dead} = 1;
	}
	@IO::Events::loops = grep { $self != $_ } @IO::Events::loops;
	$self-> {dead} = 1;
}

END
{
	for ( @IO::Events::loops) {
		eval { $_->DESTROY };
		warn "$@" if $@;
	}
	@IO::Events::loops = ();
}

# Single task
package IO::Events::Handle;
use vars qw(@ISA %events);

use Errno qw(EAGAIN);

use constant SINGLE   => 1;
use constant MULTIPLE => 2;

%events = (
	on_read      => SINGLE,
	on_write     => SINGLE,
	on_exception => SINGLE,
	on_close     => MULTIPLE,
	on_create    => MULTIPLE,
	on_error     => MULTIPLE,
);

use Fcntl;

sub new
{
	my $class = shift;
	my $self = bless {
		auto_close   => 1,
		finished     => 0,
		exitcode     => 0,
		read_buffer  => '',
		write_buffer => '',
		write        => 0,
		read         => 0,
		exception    => 0,
		pid          => undef,
		@_,
	}, $class;
	$self->{handle} = IO::Handle-> new() unless defined $self->{handle};
	for ( qw(owner)) {
		die "No `$_' field" unless defined $self-> {$_};
	}
	$self-> {id} = "$self" unless defined $self-> {id};
	my $owner = $self->{owner};
	die "Id `$self->{id}` already present" if exists $owner->{ids}->{$self->{id}};
	my $fno = fileno( $self->{handle});
	die "Cannot read fileno() from handle" unless defined $fno;
	$self-> {fileno} = $fno;
	unless ( $self-> {nonblock}) {
		my $fl;
		$fl = fcntl( $self->{handle}, F_GETFL, 0);
		die "$!" unless defined $fl;
		fcntl( $self->{handle}, F_SETFL, $fl|O_NONBLOCK) or die "$!";
	}
	if ($self-> {write}) {
		vec( $owner-> {write}, $fno, 1) = 1;
		#print "write\n";
	}
	if ($self-> {read}) {
		vec( $owner-> {read}, $fno, 1) = 1;
		#print "read\n";
	}
	if ($self-> {exception}) {
		vec( $owner-> {exc}, $fno, 1) = 1;
	}
	$owner-> {filenos}-> {$fno} = $self-> {id};
	push @{$owner-> {processes}-> {$self->{pid}}}, $self-> {id} if defined $self->{pid};
	$owner-> {ids}-> {$self->{id}} = $self;
	$self-> notify('on_create');
	return $self;
}

sub can_read
{
	return vec( $_[0]->{owner}->{read}, $_[0]-> {fileno}, 1) unless $#_;
	vec( $_[0]->{owner}->{read}, $_[0]-> {fileno}, 1) = $_[1];
	$_[0]-> {read} = $_[1];
}

sub can_write
{
	return vec( $_[0]->{owner}->{write}, $_[0]-> {fileno}, 1) unless $#_;
	vec( $_[0]->{owner}->{write}, $_[0]-> {fileno}, 1) = $_[1];
	$_[0]-> {write} = $_[1];
}

sub can_exception
{
	return vec( $_[0]->{owner}->{exc}, $_[0]-> {fileno}, 1) unless $#_;
	vec( $_[0]->{owner}->{exc}, $_[0]-> {fileno}, 1) = $_[1];
	$_[0]-> {exception} = $_[1];
}

sub DESTROY
{
	my $self = $_[0];
	return if $self->{dead};
	$self->{dead} = 1;
	$self-> flush;
	$self-> notify('on_close');
	$self-> {handle}-> close
		if defined $self->{handle} && $self->{auto_close};
	if ( defined $self->{owner}) {
		if ( defined $self->{fileno}) {
			vec( $self-> {owner}-> {exc},   $self->{fileno}, 1) = 0;
			vec( $self-> {owner}-> {write}, $self->{fileno}, 1) = 0;
			vec( $self-> {owner}-> {read}, $self->{fileno}, 1) = 0;
			delete $self-> {owner}-> {filenos}-> {$self->{fileno}};
		}
		if (defined $self->{pid}) {
			my $p = $self-> {owner}-> {processes}-> {$self->{pid}};
			@$p = grep { $_ ne $self->{id}} @$p;
			delete $self-> {owner}-> {processes}-> {$self->{pid}} unless @$p;
		}
		delete $self-> {owner}-> {ids}-> {$self->{id}};
	}
	delete $self->{fileno};
	delete $self->{id};
}

sub readline
{
	return $1 if $_[0]-> {read_buffer} =~ s/^([^\n]*\n)//;
	return undef;
}

sub read
{
	my $c = $_[0]-> {read_buffer};
	substr( $_[0]-> {read_buffer}, 0) = '';
	return $c;
}

sub write
{
	my ( $self, $data) = @_;
	$self-> {write_buffer} .= $data;
	vec( $self->{owner}->{write}, $self-> {fileno}, 1) = 1 if $self->{owner} and defined $self->{fileno};

	my $nbytes = syswrite( $self->{handle}, $self->{write_buffer});
	unless ( defined $nbytes) {
		$self-> {owner}-> error( $self, 'write') if $self->{owner} && $! != EAGAIN;
		$nbytes = 0 if $! == EAGAIN;
	} elsif ( $nbytes > 0) {
		substr( $self->{write_buffer}, 0, $nbytes) = '';
	}
	$nbytes;
}

sub flush
{
	my ( $self, $discard) = @_;
	if ( $discard) {
		$self-> {write_buffer} = '';
	} else {
		while ( length $self-> {write_buffer}) {
			return undef unless defined $self-> write('');
		}	 
	}
	return 1;
}

sub destroy { shift-> DESTROY }

sub notify
{
	my ( $self, $event, @params) = @_;
	die( "Unexistent event `$event'") unless $events{$event};
	
	$self-> {event_flag} = 0;
	if ( exists $self->{$event}) {
		$self->{$event}->($self,@params);
		return if $events{$event} == SINGLE || $self->{event_flag};
	}
	$self-> $event(@params) if $self-> can($event);
}

sub on_error
{
	my ( $self, $condition, $errno) = @_;
	if ( $self) {
		$condition .= ' '.ref($self);
		$condition .= ",#$self->{fileno}" if defined $self->{fileno};
		$condition .= " pid $self->{pid}" if defined $self->{pid};
		$condition .= " ($self->{process})" if defined $self->{process};
	}
	warn "Error on $condition: $errno\n";
	$_[0]-> destroy;
}


# external writer process
package IO::Events::Process::Write;
use vars qw(@ISA);
@ISA = qw(IO::Events::Handle);

sub new
{
	my ( $self, %profile) = @_;
	die "No `process'" unless defined $profile{process};
	my $handle = IO::Handle-> new();
	$handle-> autoflush(1);
	my $pid = open( $handle, "|$profile{process}");
	die("Cannot fork:$!") unless defined $pid;

	$self = $self-> SUPER::new( 
		write => 1, 
		%profile, 
		handle => $handle,
		pid    => $pid,
	);
	return $self;
}

# external reader process
package IO::Events::Process::Read;
use vars qw(@ISA);
@ISA = qw(IO::Events::Handle);

sub new
{
	my ( $self, %profile) = @_;
	die "No `process'" unless defined $profile{process};
	my $handle = IO::Handle-> new();
	$handle-> autoflush(1);
	my $pid = open( $handle, "$profile{process}|");
	die("Cannot fork:$!") unless defined $pid;

	$self = $self-> SUPER::new( 
		read => 1, 
		%profile, 
		handle => $handle,
		pid    => $pid,
	);
	return $self;
}

# internal reader process
package IO::Events::Fork::Read;
use vars qw(@ISA);
@ISA = qw(IO::Events::Handle);

sub new
{
	my ( $self, %profile) = @_;
	my $handle = IO::Handle-> new();
	$handle-> autoflush(1);
	my $pid = open( $handle, "-|");
	die("Cannot fork:$!") unless defined $pid;
	unless ( $pid) {
		# $profile{owner}->on_fork();
		$|=1;
		my $on_fork = $profile{on_fork} || $self->can('on_fork');
		$on_fork->(\%profile) if $on_fork;
		POSIX::_exit(0);
	}

	$self = $self-> SUPER::new( 
		read => 1, 
		%profile, 
		handle => $handle,
		pid    => $pid,
	);
	return $self;
}

# internal writer process
package IO::Events::Fork::Write;
use vars qw(@ISA);
@ISA = qw(IO::Events::Handle);

sub new
{
	my ( $self, %profile) = @_;
	my $handle = IO::Handle-> new();
	$handle-> autoflush(1);
	my $pid = open( $handle, "|-");
	die("Cannot fork:$!") unless defined $pid;
	unless ( $pid) {
		# $profile{owner}->on_fork();
		my $on_fork = $profile{on_fork} || $self->can('on_fork');
		$on_fork->(\%profile) if $on_fork;
		POSIX::_exit(0);
	}

	$self = $self-> SUPER::new( 
		write => 1, 
		%profile, 
		handle => $handle,
		pid    => $pid,
	);
	return $self;
}

package IO::Events::internal::Shadow;
use vars qw(@ISA);
@ISA = qw(IO::Events::Handle);

sub new
{
	my ( $self, %profile) = @_;
	$profile{shadow_task} = $profile{owner}->{ids}->{$profile{id}};
	$profile{id} = "shadow:$profile{id}";
	my $ret = $self-> SUPER::new(%profile);
	return $ret;
}

sub on_close
{
	undef $_[0]->{shadow_task}-> {shadow};
}

sub on_error
{
	my ( $self, $condition, $errno) = @_;
	$self-> {shadow_task}-> notify('on_error', $condition, $errno);
}


# internal bidirectional process
package IO::Events::Fork::ReadWrite;
use vars qw(@ISA);
@ISA = qw(IO::Events::Handle);

sub new
{
	my ( $self, %profile) = @_;

	# reader
	my $handle1 = IO::Handle-> new();
	$handle1-> autoflush(1);
		
	# writer
	my $handle2 = IO::Handle-> new();
	$handle2-> autoflush(1);

	# fork & pipes
	pipe(READER, $handle2);
	pipe($handle1, WRITER);
	WRITER->autoflush(1);
		
	my $pid = fork();
	die("Cannot fork:$!") unless defined $pid;
	
	unless ( $pid) {
		close $handle1;
		close $handle2;
		open STDOUT, ">&WRITER";
		open STDIN,  "<&READER";

		# $profile{owner}->on_fork();
		$|=1;
		my $on_fork = $profile{on_fork} || $self->can('on_fork');
		$on_fork->(\%profile) if $on_fork;
		POSIX::_exit(0);
	} 

	close WRITER;   
	close READER;   

	# create objects
	$self = $self-> SUPER::new( 
		read => 1, 
		%profile, 
		handle => $handle1,
		pid    => $pid,
	);

	$self-> {shadow} = IO::Events::internal::Shadow-> new( 
		write => 1, 
		%profile, 
		id     => $self-> {id},
		handle => $handle2,
		pid    => $pid,
		on_write => \&shadow_write,
		on_close => \&shadow_close,
	);

	return $self;
}

sub shadow_write
{
	shift-> {shadow_task}-> notify('on_write');
}

sub shadow_close
{
	my $shadow = shift;
	$shadow-> {shadow_task}->{finished} = $shadow-> {finished};
	$shadow-> {shadow_task}->{exitcode} = $shadow-> {exitcode};
	$shadow-> {shadow_task}-> notify('on_close', 1);
}

sub shutdown
{
	my ( $self, @cmd) = @_;
	for ( @cmd) {
		if ( $_ eq 'read') {
			$self-> SUPER::DESTROY;
		} elsif ( $_ eq 'write') {
			$self-> {shadow}-> DESTROY if $self-> {shadow};
		}
	}
}

sub DESTROY
{
	return if $_[0]->{dead};
	$_[0]->{shadow}->DESTROY if $_[0]->{shadow};
	$_[0]->SUPER::DESTROY;
	$_[0]->{dead} = 1;
}

sub write { 
	my $self = shift;
	return unless $self->{shadow};
	$self-> {shadow}-> write( @_) 
}

# external bidirectional process
package IO::Events::Process::ReadWrite;
use vars qw(@ISA);
@ISA = qw(IO::Events::Fork::ReadWrite);

sub on_fork
{
	exec( $_[0]->{process}) or POSIX::_exit(127);
}

package IO::Events::stdin;
use vars qw(@ISA);
@ISA=qw(IO::Events::Handle);

sub new
{
	my ( $self, %profile) = @_;
	$profile{id}     = "stdin";
	$profile{handle} = \*STDIN;
	$profile{read}   = 1;
	$profile{auto_close} = 0;
	return $self-> SUPER::new(%profile);
}

package IO::Events::stdout;
use vars qw(@ISA);
@ISA=qw(IO::Events::Handle);

sub new
{
	my ( $self, %profile) = @_;
	$profile{id}     = "stdout";
	$profile{handle} = \*STDOUT;
	$profile{write}  = 1;
	$profile{auto_close} = 0;
	return $self-> SUPER::new(%profile);
}

package IO::Events::stderr;
use vars qw(@ISA);
@ISA=qw(IO::Events::Handle);

sub new
{
	my ( $self, %profile) = @_;
	$profile{id}     = "stderr";
	$profile{handle} = \*STDERR;
	$profile{write}  = 1;
	$profile{auto_close} = 0;
	return $self-> SUPER::new(%profile);
}

package IO::Events::Socket;
use vars qw(@ISA);
@ISA=qw(IO::Events::Handle);

use Socket;

sub accept
{
	my ( $self, %profile) = @_;
	my $handle = IO::Handle-> new;
	accept( $handle, $self-> {handle}) or die "accept() error:$!";
	return IO::Events::Handle-> new(
		owner  => $self-> {owner},
		handle => $handle,
		%profile,
	);
}

sub connect
{
	$_[0]-> {callback} = \&socket_connect_error_checker;
}

sub socket_connect_error_checker
{
	my ( $self, $r, $w, $e) = @_;
	delete $self-> {callback};

	local $! = unpack('i', getsockopt($self-> {handle}, SOL_SOCKET, SO_ERROR)); 
	if ( $!) {
		$self-> {owner}-> error( $self, 'connect') if $self->{owner};
	}
}

package IO::Events::Socket::TCP;
use vars qw(@ISA);
@ISA=qw(IO::Events::Socket);

use strict;
use Socket;
use Fcntl;
use Errno qw(EWOULDBLOCK EINPROGRESS);

sub new
{
	my ( $self, %profile) = @_;

	$profile{handle} = IO::Handle-> new unless $profile{handle};
	die "Cannot create socket: $!" unless 
		socket( $profile{handle}, PF_INET, SOCK_STREAM, getprotobyname('tcp'));

	unless ( $profile{nonblock}) {
		my $fl;
		$fl = fcntl( $profile{handle}, F_GETFL, 0);
		die "$!" unless defined $fl;
		fcntl( $profile{handle}, F_SETFL, $fl|O_NONBLOCK) or die "$!";
	}

	if ( defined $profile{connect}) {
		my $iaddr;
		die "Cannot resolve host '$profile{connect}'" unless
			$iaddr = inet_aton( $profile{connect});
		my $ok = connect( $profile{handle}, sockaddr_in( $profile{port}, $iaddr));
		$ok = 1 if !$ok and ( $! == EWOULDBLOCK || $! == EINPROGRESS);
		die "Connect error: $!" unless $ok;
	} elsif ( exists $profile{listen}) {
		setsockopt( $profile{handle}, SOL_SOCKET, SO_REUSEADDR, pack("l", 1)) or 
			die "Error in setsockopt(SOL_SOCKET,SO_REUSEADDR,1):$!";
		my $addr = $profile{addr} || '0.0.0.0';
		my $inet = inet_aton( $addr);
		die "Cannot resolve host '$addr'" unless defined $inet;
		bind( $profile{handle}, sockaddr_in( $profile{port}, $inet)) or
			die "Error in bind($profile{port}, $addr):$!";
		listen( $profile{handle}, SOMAXCONN);
		$profile{read} = -1;
	}

	my $this = $self-> SUPER::new(%profile);
	$this-> SUPER::connect() if $profile{connect};
	return $this;
}

sub accept
{
	my ( $self, %profile) = @_;
	my $client = $self-> SUPER::accept( %profile);
	my ($port, $ipaddr) = unpack_sockaddr_in( getpeername( $client->{handle}));
	$client-> {remote_addr} = inet_ntoa($ipaddr);
	$client-> {remote_port} = $port;
	return $client;
}

package IO::Events::Socket::UDP;
use vars qw(@ISA);
@ISA=qw(IO::Events::Socket);

use strict;
use Socket;
use Fcntl;

sub new
{
	my ( $self, %profile) = @_;

	$profile{handle} = IO::Handle-> new unless $profile{handle};
	die "Cannot create socket: $!" unless 
		socket( $profile{handle}, PF_INET, SOCK_DGRAM, getprotobyname('udp'));

	unless ( $profile{nonblock}) {
		my $fl;
		$fl = fcntl( $profile{handle}, F_GETFL, 0);
		die "$!" unless defined $fl;
		fcntl( $profile{handle}, F_SETFL, $fl|O_NONBLOCK) or die "$!";
	}
		
	my $addr = $profile{addr} || '0.0.0.0';
	my $inet = inet_aton( $addr);
	die "Cannot resolve host '$addr'" unless defined $inet;
		
	if ( $profile{broadcast}) {
		setsockopt( $profile{handle}, SOL_SOCKET, SO_BROADCAST, pack("l", 1)) or 
			die "Error in setsockopt(SOL_SOCKET,SO_BROADCAST,1):$!";
	}
	$profile{read} = -2;
	
	bind( $profile{handle}, sockaddr_in( $profile{port} || 0, $inet)) or
		die "Error in bind($profile{port}, $addr):$!";

	return $self-> SUPER::new(%profile);
}

sub recv
{
	my ( $self, %profile) = @_;

	$profile{maxlen} = 32768 unless defined $profile{maxlen};

	my $flags = MSG_DONTWAIT;
	$flags |= MSG_OOB if $profile{oob};
	$flags |= MSG_PEEK if $profile{peek};
	$flags |= MSG_WAITALL if $profile{waitall};
	$flags &= ~MSG_DONTWAIT if defined($profile{nonblock}) and $profile{nonblock} == 0;
	
	my ( $port, $host);
	my $data = '';
	$host = recv( $self-> {handle}, $data, $profile{maxlen}, $flags);
	unless ( defined $host) {
		$self-> {owner}-> error( $self, 'recv'); # error() is a loop method
		return undef;
	}
	( $port, $host) = sockaddr_in( $host);
	$self-> {remote_port} = $port;
	$self-> {remote_host} = gethostbyaddr( $host, AF_INET);

	return $data;
}

sub send
{
	my ( $self, $addr, $port, $data, %profile) = @_;

	my $flags = 0;
	$flags |= MSG_OOB if $profile{oob};
	$flags |= MSG_DONTROUTE if $profile{dontroute};
	$flags |= MSG_EOR if $profile{eor};
	$flags |= MSG_EOF if $profile{eof};
	
	my $inet = inet_aton($addr) || die "unknown host '$addr'\n";
	$inet = sockaddr_in( $port, $inet);

	my $ret = send( $self-> {handle}, $data, $flags, $inet);
	unless ( defined $ret) {
		$self-> {owner}-> error( $self, 'send'); # error() is a loop method
		return undef;
	}
	return $ret;
}

package IO::Events::Socket::UNIX;
use vars qw(@ISA);
@ISA=qw(IO::Events::Socket);

use Socket;
use Fcntl;

sub new
{
	my ( $self, %profile) = @_;

	$profile{handle} = IO::Handle-> new unless $profile{handle};
	die "Cannot create socket: $!" unless 
		socket( $profile{handle}, PF_UNIX, SOCK_STREAM, 0);

	unless ( $profile{nonblock}) {
		my $fl;
		$fl = fcntl( $profile{handle}, F_GETFL, 0);
		die "$!" unless defined $fl;
		fcntl( $profile{handle}, F_SETFL, $fl|O_NONBLOCK) or die "$!";
	}

	if ( defined $profile{connect}) {
		connect( $profile{handle}, pack_sockaddr_un($profile{connect})) or
			die "connect($profile{connect}) error: $!";
	} elsif ( exists $profile{listen}) {
		bind( $profile{handle}, pack_sockaddr_un($profile{listen})) or 
			die "Error in bind($profile{listen}):$!";
		listen( $profile{handle}, SOMAXCONN);
		$profile{read} = -1;
	}

	my $this = $self-> SUPER::new(%profile);
	$this-> SUPER::connect() if $profile{connect};
	return $this;
}

package IO::Events::Timer;
use Time::HiRes qw(time);

sub new
{
	my $class = shift;

	my $self = bless {
		timeout		=> 10000,
		repetitive	=> 0,
		active		=> 0,
		@_,
	}, $class;
	
	for ( qw(owner)) {
		die "No `$_' field" unless defined $self-> {$_};
	}

	push @{$self-> {owner}-> {timers}}, $self;

	$self-> start if $self-> {active};

	return $self;
}

sub DESTROY
{
	my $self = $_[0];
	return if $self-> {dead};
	@{$self-> {owner}-> {timers}} = grep { $_ != $self } @{$self-> {owner}-> {timers}};
}

sub start
{
	my $self = $_[0];
	$self-> {alert} = time + $self-> {timeout};
	$self-> {active} = 1;
}

sub stop { $_[0]-> {active} = 0 }

sub active 
{
	my ( $self, $active) = @_;
	return if $active == $self-> {active}; # to avoid restarts
	$active ? $self-> start : $self-> stop;
}

sub notify
{
	my $self = $_[0];

	if ( $self-> {repetitive}) {
		my $time = time;
		# eat up late events
		$self-> {alert} += $self-> {timeout} while $self-> {alert} < $time;
	} else {
		$self-> {active} = 0;
	}
	
	$self-> {event_flag} = 0;
	if ( defined $self->{on_tick}) {
		$self->{on_tick}->($self);
		return if $self->{event_flag};
	}
	$self-> on_tick() if $self-> can('on_tick');
}


1;

__DATA__