Event::IO::Record - buffered asynchronous I/O, timeouts


Event-IO documentation Contained in the Event-IO distribution.

Index


Code Index:

NAME

Top

Event::IO::Record - buffered asynchronous I/O, timeouts

METHODS

Top

new ( named parameters... )

init

If true (default), generate an init_event immediately (otherwise you must call init_event later).

timeout

Default timeout; see Timeout method.

irs, ors

Input/output record separators; default irs => "\r?[\0\n]", ors => "\n".

handle

Handle for connection, should be an IO::Socket object (::INET or ::UNIX).

timeout ( time )

Time is the time in seconds; 0 disables; undef reinitializes the current value. We generates a timeout_event when the timer expires.

init_event

Initialization event, called before anything else happens.

read_event

Data is available for reading. We buffer it up and emit lines to derived classes as line_events.

line_event ( line )

Override in derived class to process incoming data.

write( data )

Buffered write.

write_event

Write event - handle buffered writes.

timer ( enable flag )

Disable or restart inactivity timer.

timeout_event

Inactivity timeout event.

close

Remove event handlers, this will close the connection (as long as no other outstanding references exist).

closed

Return true iff socket is closed.

error( message )

Log error, subclasses may do more.

IRS( [ input record separator ] )

Get/set input record separator.

ORS( [ output record separator ] )

Get/set output record separator.

AUTHOR

Top

David B. Robins <dbrobins@davidrobins.net>


Event-IO documentation Contained in the Event-IO distribution.
package Event::IO::Record;

use strict;
our $VERSION = '0.01';

use Event;
use Fcntl;
use Errno qw(:POSIX);

use constant READ_SIZE          => 1024;  # bytes per read


sub new {
  my ($class,%param) = @_;
  my ($init,$timeout,$irs,$ors,$handle) =
   delete @param{qw[init timeout irs ors handle]};
  die 'unknown parameter(s): '.(join ', ',keys %param) if keys %param;

  # defaults
  $init      = 1 if not defined $init;
  $timeout ||= 0;
  $irs     ||= "\r?[\0\n]";
  $ors     ||= "\n";

  # create object
  my $self = bless { handle => $handle, in => '', out => '',
   timeout => $timeout, irs => $irs, ors => $ors }, ref $class || $class;
  $self->init_event() if $init;

  return $self
}


sub timeout {
  my ($self,$time) = @_;
  $time = $self->{timeout} unless defined $time;

  if($self->{timer}) {
    $self->{timer}->cancel();
    delete $self->{timer};
  }

  $self->{timeout} = $time;

  $self->{timer} =
   Event->timer(after => $time, cb => [$self,'timeout_event'])
   if $time and $self->{init};
}


sub init_event {
  my $self = shift;
  warn "@{[ref $self]} initialized twice!" if $self->{init}++;

  # set non-blocking
  if(my $flags = $self->{handle}->fcntl(F_GETFL,pack '') >= 0) {
    $self->{handle}->fcntl(F_SETFL,$flags | O_NONBLOCK);
  }

  # set up read/write event watchers and inactivity timeout
  $self->{read} =
   Event->io(fd => $self->{handle}, poll => 'r', cb => [$self,'read_event']);
  $self->{write} =
   Event->io(fd => $self->{handle}, poll => 'w', cb => [$self,'write_event'],
   repeat => 0, parked => 1);
  $self->timeout();
}


sub read_event {
  my $self = shift;
  $self->timer(0);

  # buffer up input until we can't read any more
  my ($data,$frag,$count) = ($self->{in},'',0);
  my $close;
  $self->{in} = '';

  do {{
    # undef means we have an error so log it and close
    unless(defined $self->{handle}->recv($frag,READ_SIZE)) {
      last if EAGAIN == $! or EWOULDBLOCK == $!;   # no data available
      next if EINTR == $!;                         # interrupted by signal

      # queue up the read error until we've processed what we've read
      warn "@{[ref $self]} socket read error: $!";
      $close = "read error: $!";
      last;
    }

    # assume if we got 0 bytes and no error that it's time to bail
    # if not, we get an infinite sequence of read_events....
    # don't bail until we've sent the lines that we have, however
    unless(length $frag) {
      $close = 'remote closed socket';
      last;
    }

    # otherwise append to the existing block and read until we run out of data
    $data .= $frag;
    $count .= length $frag;
  }} while length $frag == READ_SIZE;

  # send each line as an event
  my $irs = $self->{irs};
  while(length $data and $data =~ s/^(.*?)$irs//s) {
    $self->line_event($1);
    $irs = $self->{irs};  # refresh in case line_event changes it
  }
  $self->{in} = $data;

  $self->timer(1);

  # if the socket was closed, we can now send the close event
  $self->close($close) if $close;
}


sub line_event {
}


sub write {
  my ($self,$data) = @_;
  $self->{out} .= $data.$self->{ors};
  $self->write_event();
}


sub write_event {
  my $self = shift;
  my $data = $self->{out};

  # send as much as we can from the buffer
  while(length $data) {
    my $count = $self->{handle}->send($data);
    unless(defined $count) {
      if(EAGAIN == $! or EWOULDBLOCK == $!) {   # writing would block
        $self->{write}->start();
        last;
      }
      next if EINTR == $!;                      # interrupted by signal
      warn "@{[ref $self]} socket write error: $!";
      $self->{out} = $data;
      return $self->close('write error');
    }
    $data = substr($data,$count);
    $self->timer(1) if $count;  # reinitialize the inactivity timer
  }
  $self->{out} = $data;

  # send an event if we've written everything in the buffer
  $self->sent_event() if not length $data and $self->can('sent_event');
}


sub timer {
  my ($self,$enable) = @_;
  $enable ?  $self->{timer}->again() : $self->{timer}->stop()
   if $self->{timer};
}


sub timeout_event {
  my $self = shift;
  $self->error('closing inactive connection after '.
   "@{[$self->{timeout}]} s");
  $self->close('timed out');
}


sub close {
  my $self = shift;
  if($self->{read}) {
    for my $ev(qw[read write timer]) {
      (delete $self->{$ev})->cancel() if $self->{$ev};
    }
  }
  (delete $self->{handle})->close() if $self->{handle};  # close the socket
}


sub closed {
  my $self = shift;
  return not $self->{read}
}


sub error {
  my ($self,$err) = @_;
  warn "@{[ref $self]} error: $err";
}


sub IRS {
  my $self = shift;
  $self->{irs} = shift if @_;
  $self->{irs}
}


sub ORS {
  my $self = shift;
  $self->{ors} = shift if @_;
  $self->{ors}
}



1;