/usr/local/CPAN/Apache-Backend-POE/Apache/Backend/POE/Connection.pm
package Apache::Backend::POE::Connection;
use warnings;
use strict;
use Apache::Backend::POE::Message;
use IO::Socket::INET;
use POSIX qw(:errno_h);
use Carp qw(croak);
use bytes;
use Storable qw(nfreeze thaw);
sub new {
my $class = shift;
return bless({
@_, # can be a hash
}, $class);
}
sub connect {
my ($obj, $poe, $host, $port) = @_;
my $self = bless({%{$obj}},ref($obj));
$self->{parent} = $poe;
$self->{service_name} = $self->{alias} || 'backend-';
if ($host && $port) {
$self->{host} = $host;
$self->{port} = $port;
}
unless ($self->{host} && $self->{port}) {
croak "Must pass host and port to connect or initial new";
}
# connect
$self->{socket} = IO::Socket::INET->new(
PeerAddr => $self->{host},
PeerPort => $self->{port},
) or do {
#croak "Couldn't connect to $self->{host} : $self->{port} - $!";
print STDERR "$$ Apache::Backend::POE:Connection Couldn't connect to $self->{host} : $self->{port} - $!\n";
return undef;
};
binmode($self->{socket});
#$self->{socket}->autoflush(1); # default
$self->{socket}->blocking(0);
$self->{buffer} = "";
$self->{read_length} = undef;
# register
$self->msg_send($self->msg({
cmd => "register_service",
svc_name => $self->{service_name}.$$,
}));
#$self->msg_read(5);
return $self;
}
sub msg {
my $self = shift;
return Apache::Backend::POE::Message->new(@_);
}
sub ping {
my $self = shift;
my $start = time();
# my $no_pong = 1;
$self->msg_send($self->msg({
cmd => 'ping',
time => time(),
}));
my $prefix = "$$ Apache::Backend::POE:Connection ";
print STDERR "$prefix going to msg_read in ping()\n" if $Apache::Backend::POE::DEBUG > 1;
my $msg = $self->msg_read(10);
print STDERR "$prefix back from msg_read in ping()\n" if $Apache::Backend::POE::DEBUG > 1;
unless (ref($msg)) {
print STDERR "$prefix received a non reference\n" if $Apache::Backend::POE::DEBUG;
return -1;
}
if ($Apache::Backend::POE::DEBUG) {
# print STDERR "$prefix got a ".ref($msg)." package with no event()\n",return -99 unless ($msg->can('event'));
}
my $function = $msg->event();
# print STDERR "$prefix function: $function\n";
if ($Apache::Backend::POE::DEBUG) {
print STDERR "$prefix no function in message\n" unless defined $function;
}
return 1 if (defined $function && $function eq 'pong');
print STDERR "$prefix WRONG function received: $function\n" if $Apache::Backend::POE::DEBUG;
return 0;
}
sub disconnect {
my $self = shift;
close($self->{socket}) if ($self->{socket});
$self->{socket} = undef;
}
sub msg_read {
my $self = shift;
my $timeout = shift || undef;
# no timeout blocks indef!
# my $prefix = "$$ Apache::Backend::POE:Connection ";
my $st = time();
$st += $timeout if defined($timeout);
# print STDERR "$prefix going into while block in msg_read()\n" if $Apache::Backend::POE::DEBUG > 1;
while (1) {
if (defined $self->{read_length}) {
# print STDERR "$prefix looking for msg in buffer...\n" if $Apache::Backend::POE::DEBUG > 1;
if (length($self->{buffer}) >= $self->{read_length}) {
my $message = thaw(substr($self->{buffer}, 0, $self->{read_length}, ""));
$self->{read_length} = undef;
$message->{recv_time} = time();
return $message;
}
} elsif ($self->{buffer} =~ s/^(\d+)\0//) {
$self->{read_length} = $1;
# print STDERR "$prefix got read length: $1\n" if $Apache::Backend::POE::DEBUG > 1;
next;
}
#print STDERR "$prefix going to sysread\n" if $Apache::Backend::POE::DEBUG > 1;
my $rv = sysread($self->{socket}, $self->{buffer}, 4096, length($self->{buffer}));
if (!defined($rv) && $! == EAGAIN) {
# print STDERR "$prefix sysread was going to block\n" if $Apache::Backend::POE::DEBUG > 1;
# was going to block
#return if (defined($timeout) && $st > time());
}
# read $rv bytes from socket
# print STDERR "$prefix read $rv bytes\n" if $Apache::Backend::POE::DEBUG > 1 && defined $rv;
if (defined($timeout)){
return if time() > $st;
}
}
}
sub msg_send {
my $self = shift;
my $message = shift;
my $prefix = "$$ Apache::Backend::POE:Connection ";
print STDERR "$prefix socket is not connected\n" if $Apache::Backend::POE::DEBUG && !defined($self->{socket});
$message->{send_time} = time();
my $streamable = nfreeze($message);
$streamable = length($streamable).chr(0).$streamable;
my $len = length($streamable);
print STDERR "$prefix sending $len bytes\n" if $Apache::Backend::POE::DEBUG > 1;
while ($len > 0) {
if (my $w = syswrite($self->{socket},$streamable,4096)) {
$len -= $w;
print STDERR "$prefix sent $w bytes\n" if $Apache::Backend::POE::DEBUG > 1;
} else {
last;
}
}
print STDERR "$prefix done sending\n" if $Apache::Backend::POE::DEBUG > 1;
#print *{$self->{socket}} $streamable;
}
sub msg_oneshot {
my ($obj, $host, $port, $message) = @_;
my $self = $obj->connect(undef, $host, $port);
my $msg = $self->msg($message);
$self->msg_send($msg);
$self->disconnect();
}
1;