| Event-IO documentation | Contained in the Event-IO distribution. |
Event::IO::Record - buffered asynchronous I/O, timeouts
If true (default), generate an init_event immediately (otherwise you must call init_event later).
Default timeout; see Timeout method.
Input/output record separators; default irs => "\r?[\0\n]", ors => "\n".
Handle for connection, should be an IO::Socket object (::INET or ::UNIX).
Time is the time in seconds; 0 disables; undef reinitializes the current value. We generates a timeout_event when the timer expires.
Initialization event, called before anything else happens.
Data is available for reading. We buffer it up and emit lines to derived
classes as line_events.
Override in derived class to process incoming data.
Buffered write.
Write event - handle buffered writes.
Disable or restart inactivity timer.
Inactivity timeout event.
Remove event handlers, this will close the connection (as long as no other outstanding references exist).
Return true iff socket is closed.
Log error, subclasses may do more.
Get/set input record separator.
Get/set output record separator.
David B. Robins <dbrobins@davidrobins.net>
| Event-IO documentation | Contained in the Event-IO distribution. |
package Event::IO::Record; use strict; our $VERSION = '0.01'; use Event; use Fcntl; use Errno qw(:POSIX); use constant READ_SIZE => 1024; # bytes per read
sub new { my ($class,%param) = @_; my ($init,$timeout,$irs,$ors,$handle) = delete @param{qw[init timeout irs ors handle]}; die 'unknown parameter(s): '.(join ', ',keys %param) if keys %param; # defaults $init = 1 if not defined $init; $timeout ||= 0; $irs ||= "\r?[\0\n]"; $ors ||= "\n"; # create object my $self = bless { handle => $handle, in => '', out => '', timeout => $timeout, irs => $irs, ors => $ors }, ref $class || $class; $self->init_event() if $init; return $self }
sub timeout { my ($self,$time) = @_; $time = $self->{timeout} unless defined $time; if($self->{timer}) { $self->{timer}->cancel(); delete $self->{timer}; } $self->{timeout} = $time; $self->{timer} = Event->timer(after => $time, cb => [$self,'timeout_event']) if $time and $self->{init}; }
sub init_event { my $self = shift; warn "@{[ref $self]} initialized twice!" if $self->{init}++; # set non-blocking if(my $flags = $self->{handle}->fcntl(F_GETFL,pack '') >= 0) { $self->{handle}->fcntl(F_SETFL,$flags | O_NONBLOCK); } # set up read/write event watchers and inactivity timeout $self->{read} = Event->io(fd => $self->{handle}, poll => 'r', cb => [$self,'read_event']); $self->{write} = Event->io(fd => $self->{handle}, poll => 'w', cb => [$self,'write_event'], repeat => 0, parked => 1); $self->timeout(); }
sub read_event { my $self = shift; $self->timer(0); # buffer up input until we can't read any more my ($data,$frag,$count) = ($self->{in},'',0); my $close; $self->{in} = ''; do {{ # undef means we have an error so log it and close unless(defined $self->{handle}->recv($frag,READ_SIZE)) { last if EAGAIN == $! or EWOULDBLOCK == $!; # no data available next if EINTR == $!; # interrupted by signal # queue up the read error until we've processed what we've read warn "@{[ref $self]} socket read error: $!"; $close = "read error: $!"; last; } # assume if we got 0 bytes and no error that it's time to bail # if not, we get an infinite sequence of read_events.... # don't bail until we've sent the lines that we have, however unless(length $frag) { $close = 'remote closed socket'; last; } # otherwise append to the existing block and read until we run out of data $data .= $frag; $count .= length $frag; }} while length $frag == READ_SIZE; # send each line as an event my $irs = $self->{irs}; while(length $data and $data =~ s/^(.*?)$irs//s) { $self->line_event($1); $irs = $self->{irs}; # refresh in case line_event changes it } $self->{in} = $data; $self->timer(1); # if the socket was closed, we can now send the close event $self->close($close) if $close; }
sub line_event { }
sub write { my ($self,$data) = @_; $self->{out} .= $data.$self->{ors}; $self->write_event(); }
sub write_event { my $self = shift; my $data = $self->{out}; # send as much as we can from the buffer while(length $data) { my $count = $self->{handle}->send($data); unless(defined $count) { if(EAGAIN == $! or EWOULDBLOCK == $!) { # writing would block $self->{write}->start(); last; } next if EINTR == $!; # interrupted by signal warn "@{[ref $self]} socket write error: $!"; $self->{out} = $data; return $self->close('write error'); } $data = substr($data,$count); $self->timer(1) if $count; # reinitialize the inactivity timer } $self->{out} = $data; # send an event if we've written everything in the buffer $self->sent_event() if not length $data and $self->can('sent_event'); }
sub timer { my ($self,$enable) = @_; $enable ? $self->{timer}->again() : $self->{timer}->stop() if $self->{timer}; }
sub timeout_event { my $self = shift; $self->error('closing inactive connection after '. "@{[$self->{timeout}]} s"); $self->close('timed out'); }
sub close { my $self = shift; if($self->{read}) { for my $ev(qw[read write timer]) { (delete $self->{$ev})->cancel() if $self->{$ev}; } } (delete $self->{handle})->close() if $self->{handle}; # close the socket }
sub closed { my $self = shift; return not $self->{read} }
sub error { my ($self,$err) = @_; warn "@{[ref $self]} error: $err"; }
sub IRS { my $self = shift; $self->{irs} = shift if @_; $self->{irs} }
sub ORS { my $self = shift; $self->{ors} = shift if @_; $self->{ors} }
1;