/usr/local/CPAN/P2P-pDonkey/ServBase.pm
#
# Copyright (c) 2003-2004 Alexey klimkin <klimkin at cpan.org>.
# All rights reserved.
# This program is free software; you can redistribute it and/or
# modify it under the same terms as Perl itself.
#
# looking forward Net::Server...
package ServBase;
use strict;
use vars qw($VERSION);
use Data::Hexdumper;
use POSIX;
use IO::Select;
use IO::Socket;
use P2P::pDonkey::Packet ':all';
use P2P::pDonkey::Util ':all';
use constant CS_CONNECTING => 1;
use constant CS_ACTIVE => 2;
use constant CS_CLOSED => 3;
use constant CS_PROXY => 4;
sub new {
my $class = shift;
my %opt = @_;
my %connections;
tie %connections, "Tie::RefHash";
# my $selRead = new IO::Select;
# my $selWrite = new IO::Select;
$opt{SelRead} = new IO::Select;
$opt{SelWrite} = new IO::Select;
$opt{CONN} = \%connections;
$opt{Log} || ($opt{Log} = \&Log);
my $self = \%opt;
bless($self, $class);
return $self;
}
sub connections {
my $self = shift;
return $self->{CONN};
}
sub stop {
my $self = shift;
$self->{STOP} = 1;
}
sub watch {
my $self = shift;
my ($sock) = @_;
$self->{SelRead}->add($sock);
}
sub unwatch {
my $self = shift;
my ($sock) = @_;
$self->{SelRead}->remove($sock);
}
sub ProcessPacket {
my $self = shift;
my ($conn) = @_;
my ($data, $up, $pp);
$data = \$conn->{RBuffer};
my $pt = unpack('C', $$data);
my $pname = PacketTagName($pt);
$pname = "Unknown" if !$pname;
$self->{Log}->($conn, sprintf("-> %s(%x) [%d]\n", $pname, $pt, $conn->{PLength} + SZ_TCP_HEADER));
print hexdump(data => $conn->{Header} . $$data) if $self->{Dump};
my @d;
my $off = 0;
$pp = $self->{ProcTable}->[$pt];
if (!($pp && (@d = unpackBody(\$pt, $$data, $off)))) {
if ($pp) {
$self->{Log}->($conn, "\tdropped: incorrect packet format\n");
} else {
$self->{Log}->($conn, "\tdropped: no processing function\n");
}
return;
}
if ($off != length($$data)) {
$self->{Log}->($conn, ": there are left ", length($$data)-$off,
" unpacked bytes in packet\n");
}
&$pp($conn, @d);
}
sub AddSocket {
my $self = shift;
my ($sock, $addr, $port) = @_;
my %rec = (
Socket => $sock,
IP => addr2ip($addr), Addr => $addr, Port => $port,
# read buffer
RBuffer => '', RLength => 0,
PLength => undef, Header => '', Protocol => undef,
# write buffer
WBuffer => '', WLength => 0
);
$self->{CONN}->{$sock} = \%rec;
return \%rec;
}
# incoming connection, we will wait for hello
sub Connected {
my $self = shift;
my ($sock) = @_;
my ($other_end, $port, $iaddr, $addr);
$other_end = getpeername($sock)
|| warn "Couldn't identify other end: $!\n" && return;
($port, $addr) = unpack_sockaddr_in($other_end);
$addr = inet_ntoa($addr);
my $conn = $self->AddSocket($sock, $addr, $port);
$self->{SelRead}->add($sock);
$conn->{State} = CS_ACTIVE;
$self->{Log}->($conn, "=> CONNECTED client at $self->{LocalPort}\n");
$self->{OnClientConnect} && $self->{OnClientConnect}->($conn);
return $conn;
}
# outgoing connection, should send hello
sub Connect {
my $self = shift;
my ($addr, $port) = @_;
my ($sock, $err, $state);
$self->{Log}->(undef, "connecting to $addr:$port\n");
if ($self->{ProxyAddr}) {
$sock = new IO::Socket::INET(PeerAddr => $self->{ProxyAddr},
PeerPort => $self->{ProxyPort},
Proto => 'tcp',
Blocking => 0);
if (!$sock) {
warn "Failed connect to proxy!\n";
return;
}
$state = CS_PROXY;
} else {
$sock = new IO::Socket::INET(PeerAddr => $addr,
PeerPort => $port,
Proto => 'tcp',
Blocking => 0)
|| return;
$state = CS_CONNECTING;
}
my $conn = $self->AddSocket($sock, $addr, $port);
$self->{SelRead}->add($sock);
$self->{SelWrite}->add($sock);
$conn->{State} = $state;
return $conn;
}
sub Disconnect {
my $self = shift;
my ($sock) = @_;
$self->{SelRead}->remove($sock);
$self->{SelWrite}->remove($sock);
$sock->shutdown(2);
my $conn = $self->{CONN}->{$sock};
$self->{Log}->($conn, "== DISCONNECTED\n");
$conn->{State} = CS_CLOSED;
delete $self->{CONN}->{$sock};
$self->{OnDisconnect} && $self->{OnDisconnect}->($conn);
}
sub Queue {
my $self = shift;
my ($conn, $pt) = (shift, shift);
my ($body, $data, $dlen);
$body = packBody($pt, @_);
$data = packTCPHeader($dlen = length $body) . $body;
$dlen += SZ_TCP_HEADER;
my $class;
if ($conn && ($conn eq 'Client' || $conn eq 'Server' || $conn eq 'Admin')) {
$class = $conn;
$conn = undef;
}
my @whom = $conn ? ($conn) : values %{$self->{CONN}};
my $is_dest = 0;
foreach $conn (@whom) {
next if $conn->{Socket}->sockopt(SOL_SOCKET, SO_ERROR);
next if $class && !$conn->{$class};
$conn->{WBuffer} .= $data;
$conn->{WLength} += $dlen;
$self->{SelWrite}->add($conn->{Socket});
my $pname = PacketTagName($pt) || "Unknown";
$self->{Log}->($conn, sprintf("<- %s(%x) [%d]\n", $pname, $pt, $dlen));
$is_dest = 1;
}
print hexdump(data => $data) if $is_dest && $self->{Dump};
}
sub MainLoop {
my $self = shift;
my $selRead = $self->{SelRead};
my $selWrite = $self->{SelWrite};
my ($server, $admin);
if ($self->{LocalPort}) {
$server = new IO::Socket::INET(LocalPort => $self->{LocalPort},
Listen => $self->{MaxClients} || 5,
Reuse => 1,
Blocking => 0)
or return;
$selRead->add($server);
}
if ($self->{AdminPort}) {
$admin = new IO::Socket::INET(LocalPort => $self->{AdminPort},
Listen => 1,
Reuse => 1,
Blocking => 0)
or return;
$selRead->add($admin);
}
$self->{Log}->(undef, "Ready\n");
my ($rready, $wready, $h, $conn, $err);
my ($data, $dlen, $plen, $len);
while (!$self->{STOP}) {
# print "Select\n";
($rready, $wready) = IO::Select->select($selRead, $selWrite, undef);
foreach $h (@$wready) {
# print "Write\n";
$self->{CanWriteHook} && $self->{CanWriteHook}->($h) && next;
$conn = $self->{CONN}->{$h};
next if $conn->{State} == CS_CLOSED;
next if !$conn;
$err = $h->sockopt(SOL_SOCKET, SO_ERROR);
if ($err) {
$self->Disconnect($h) unless $err == EINPROGRESS;
next;
}
if ($conn->{State} == CS_PROXY) {
my $proxy_req = "CONNECT $conn->{Addr}:$conn->{Port} HTTP/1.1\n"
. "Pragma: no-cache\n"
. "Cache-Control: no-cache\n"
. "Connection: Keep-Alive\n"
. "Proxy-Connection: Keep-Alive\n"
. "User-Agent: Mozilla/4.0 (compatible; MSIE 5.01; Windows NT; Hotbar 2.0)\n"
. "\n";
$len = syswrite($h, $proxy_req, length $proxy_req);
if (!$len || $len != length $proxy_req) {
$self->{Log}->($conn, "proxy traversal failed\n");
$self->Disconnect($h);
next;
}
$selWrite->remove($h);
}
if ($conn->{State} == CS_CONNECTING) {
$conn->{State} = CS_ACTIVE;
$self->{Log}->($conn, "<= CONNECTED\n");
$self->{OnConnect} && $self->{OnConnect}->($conn);
next;
}
($data, $dlen) = (\$conn->{WBuffer}, \$conn->{WLength});
if ($$dlen) {
$len = syswrite($h, $$data, $$dlen);
if (!$len) {
$self->Disconnect($h);
next;
}
if ($len > 0) {
$$data = unpack("x$len a*", $$data);
$$dlen -= $len;
}
}
$$dlen || $selWrite->remove($h);
}
foreach $h (@$rready) {
# print "Read\n";
if ($server && $h == $server) {
$h = $server->accept();
$conn = $self->Connected($h) or next;
$conn->{Client} = 1;
}
if ($admin && $h == $admin) {
$h = $admin->accept();
$conn = $self->Connected($h) or next;
$conn->{Admin} = 1;
}
$self->{CanReadHook} && $self->{CanReadHook}->($h) && next;
$conn = $self->{CONN}->{$h};
next if $conn->{State} == CS_CLOSED;
next if !$conn;
$err = $h->sockopt(SOL_SOCKET, SO_ERROR);
if ($err) {
$self->Disconnect($h) unless $err == EINPROGRESS;
next;
}
if ($conn->{State} == CS_PROXY) {
my $proxy_ans;
$len = sysread($h, $proxy_ans, 1024);
# print "$proxy_ans";
if (!$len) {
$self->{Log}->($conn, "proxy traversal failed (can't read answer)\n");
$self->Disconnect($h);
next;
}
if ($proxy_ans =~ /HTTP\/\S+ (\d+)/) {
if (int($1 / 100) != 2) {
$self->{Log}->($conn, "proxy traversal failed (code $1)\n");
$self->Disconnect($h);
next;
}
$conn->{State} = CS_CONNECTING;
} else {
$self->{Log}->($conn, "proxy traversal failed (can't parse answer)\n");
$self->Disconnect($h);
next;
}
}
if ($conn->{State} == CS_CONNECTING) {
$conn->{State} = CS_ACTIVE;
$self->{Log}->($conn, "<= CONNECTED\n");
$self->{OnConnect} && $self->{OnConnect}->($conn);
next;
}
($data, $dlen, $plen) = (\$conn->{RBuffer}, \$conn->{RLength}, \$conn->{PLength});
if (!$$plen) {
# try to read header
$len = sysread($h, $$data, SZ_TCP_HEADER-$$dlen, $$dlen);
if (!$len) {
$self->Disconnect($h);
next;
}
$$dlen += $len;
if ($$dlen == SZ_TCP_HEADER)
{
my ($prot, $npl) = unpack('CL', $$data);
if ($prot == PT_HEADER)
{
$conn->{Header} = $$data;
$conn->{Protocol} = $prot;
$$plen = $npl;
}
($$data, $$dlen) = ('', 0);
}
}
if ($$plen) {
# try to read packet
$len = sysread($h, $$data, $$plen-$$dlen, $$dlen);
if (!$len) {
$self->Disconnect($h);
next;
}
$$dlen += $len;
if ($$dlen == $$plen) {
$self->ProcessPacket($conn);
($$data, $$dlen, $$plen) = ('', 0, undef);
}
}
}
}
}
sub Log {
my $conn = shift;
# print strftime "%b %e %H:%M:%S ", gmtime;
print "$conn->{Addr}:$conn->{Port} " if $conn;
print @_;
}
1;