/usr/local/CPAN/Net-Gnutella/Net/Gnutella.pm
package Net::Gnutella;
use Net::Gnutella::Client;
use Net::Gnutella::Server;
use Net::Gnutella::Event;
use IO::Socket;
use IO::Select;
use Carp;
use strict;
use vars qw/@ISA @EXPORT $VERSION $AUTOLOAD/;
$VERSION = $VERSION = "0.1";
use constant GNUTELLA_CONNECT => 1;
use constant GNUTELLA_REQUEST => 2;
require Exporter;
@ISA = qw(Exporter);
@EXPORT = qw(GNUTELLA_CONNECT GNUTELLA_REQUEST);
# Use AUTOHANDLER to supply generic attribute methods
#
sub AUTOLOAD {
my $self = shift;
my $attr = $AUTOLOAD;
$attr =~ s/.*:://;
return unless $attr =~ /[^A-Z]/; # skip DESTROY and all-cap methods
croak sprintf "invalid attribute method: %s->%s()", ref($self), $attr unless exists $self->{_attr}->{lc $attr};
$self->{_attr}->{lc $attr} = shift if @_;
return $self->{_attr}->{lc $attr};
}
sub add_handler {
my ($self, $event, $coderef, $replace, @args) = @_;
return $self->_add_handler($event, $coderef, $replace, $self->{_handler}, @args);
}
sub dequeue {
my ($self, $qid) = @_;
return delete $self->{_queue}->{$qid};
}
sub do_one_loop {
my $self = shift;
my $timeout = $self->timeout;
my $time = time();
foreach my $key ($self->queue) {
my $event = $self->queue($key);
if ($event->[0] <= $time) {
$event->[1]->( @{$event}[2..$#{$event}] );
$self->dequeue($key);
} else {
my $nexttimeout = $event->[0] - $time;
$timeout = $nexttimeout if $nexttimeout < $timeout or not $timeout;
}
}
my ($rr, $wr, $er) = IO::Select->select(@{$self}{'_read', '_write', '_error'}, $timeout);
foreach my $sock (@$rr) {
my $conn = $self->{_connhash}->{read}->{$sock} or next;
$conn->[0]->($conn->[1] ? ($conn->[1], $sock) : $sock, @{$conn}[2..$#{$conn}]);
}
foreach my $sock (@$wr) {
my $conn = $self->{_connhash}->{write}->{$sock} or next;
$conn->[0]->($conn->[1] ? ($conn->[1], $sock) : $sock, @{$conn}[2..$#{$conn}]);
}
}
# Cache the latest 500 PONG hosts (host:port combinations)
#
sub _host_cache {
my $self = shift;
if (@_) {
my $time = time();
my $count = 500;
my $cache = $self->{_host_cache};
my $new = {};
my $i = 0;
# Add the specified entries
#
foreach (@_) {
$cache->{$_} = $time;
}
# Build a new list containing the most recent n elements
#
foreach (grep { $i++ < $count } sort { $cache->{$b} <=> $cache->{$a} } keys %{$cache}) {
$new->{$_} = $cache->{$_};
}
$self->{_host_cache} = $new;
}
return keys %{ $self->{_host_cache} };
}
sub connections {
my $self = shift;
my @ret;
foreach my $key (keys %{ $self->{_connhash}->{all} }) {
my $conn = $self->{_connhash}->{all}->{$key};
next unless ref $conn eq "Net::Gnutella::Connection";
next unless $conn->connected;
push @ret, $conn;
}
return @ret;
}
sub new {
my $class = shift;
my %args = @_;
my $self = {
_connhash => {
read => {},
write => {},
all => {},
},
_read => new IO::Select,
_write => new IO::Select,
_attr => {
timeout => 10,
debug => 0,
id => [ map { rand(65535**2) } 0..4 ],
},
_handler => {},
_host_cache => {},
_msgid_source => {},
_qid => 'a',
_queue => {},
};
bless $self, $class;
foreach my $key (keys %args) {
my $lkey = lc $key;
$self->$lkey($args{$key});
}
return $self;
}
sub new_client {
my $self = shift;
my $conn = Net::Gnutella::Client->new($self, @_);
return if $conn->error;
return $conn;
}
sub new_server {
my $self = shift;
my $conn = Net::Gnutella::Server->new($self, @_);
return if $conn->error;
return $conn;
}
sub queue {
my $self = shift;
if (@_) {
return $self->{_queue}->{$_[0]};
} else {
return keys %{ $self->{_queue} };
}
}
sub schedule {
my ($self, $when, $coderef, @args) = @_;
unless ($when =~ /^\d+[dhmst]$/i) {
croak "First argument must be a number";
}
unless (defined $coderef && ref $coderef eq 'CODE') {
croak "Second argument must be a coderef!";
}
my $time = time();
$when *= 24*60*60 if $when =~ s/d$//i;
$when *= 60*60 if $when =~ s/h$//i;
$when *= 60 if $when =~ s/m$//i;
$when =~ s/s$//i;
if ($when =~ s/t$//i) {
$time = $when;
} else {
$time += $when;
}
$self->{_qid} = 'a' if $self->{_qid} eq 'zzzzzzzz';
my $id = $self->{_qid}++;
$self->{_queue}->{$id} = [ $time, $coderef, @args ];
return $id;
}
# Returns the connection a msgid originated from if it
# has been seen previously.
#
sub _msgid_source {
my ($self, $msgid, $conn) = @_;
unless ($msgid && ref($msgid) eq 'ARRAY') {
carp "Invalid message ID: $msgid";
}
if ($conn) {
my $i = 0;
my $count = 5000;
my $source = $self->{_msgid_source};
$source->{join(":", @$msgid)} = [ $conn, time() ];
foreach (grep { $i++ > $count } sort { $source->{$b}->[1] <=> $source->{$a}->[1] } keys %{$source}) {
delete $source->{$_};
}
}
return unless $self->{_msgid_source}->{join(":", @$msgid)};
return $self->{_msgid_source}->{join(":", @$msgid)}->[0];
}
sub start {
my $self = shift;
$self->do_one_loop while 1;
}
sub _add_fh {
my ($self, $fh, $coderef, $flags, $obj, @args) = @_;
unless (ref $coderef eq "CODE") {
croak "Second argument to ->_add_fh not a coderef";
}
$flags ||= 'r';
if ($flags =~ /r/i) {
$self->{_read}->add($fh);
$self->{_connhash}->{read}->{$fh} = [ $coderef, $obj, @args ];
}
if ($flags =~ /w/i) {
$self->{_write}->add($fh);
$self->{_connhash}->{write}->{$fh} = [ $coderef, $obj, @args ];
}
$self->{_connhash}->{all}->{$fh} = $obj;
}
sub _add_handler {
my ($self, $event, $coderef, $replace, $hashref, @args) = @_;
unless (ref $coderef eq "CODE") {
croak "Second argument to ->_add_handler not a coderef";
}
my %define = ( replace=>0, before=>1, after=>2 );
if (not defined $replace) {
$replace = 2;
} elsif ($replace =~ /^\D/) {
$replace = $define{lc $replace} || 2;
}
foreach my $ev (ref $event eq "ARRAY" ? @{$event} : $event) {
if ($ev =~ /^\d/) {
$ev = Net::Gnutella::Event->trans($ev);
unless ($ev) {
carp "Unknown event type in ->add_handler";
return;
}
}
$hashref->{lc $ev} = [ $coderef, $replace, @args ];
}
}
sub _handler {
my ($self, $event) = @_;
my $handler;
unless ($event) {
confess "I messed up";
}
my $type = $event->type;
my $conn = $event->from;
my $default = $conn->can('_default') if $conn;
if ($conn && exists $conn->{_handler}->{$type}) {
printf STDERR " - Connection wide handler exists\n" if $self->debug >= 2;
$handler = $conn->{_handler}->{$type};
} elsif (exists $self->{_handler}->{$type}) {
printf STDERR " - Global handler exists\n" if $self->debug >= 2;
$handler = $self->{_handler}->{$type};
} elsif ($default) {
printf STDERR " - Calling default handler on connection\n" if $self->debug >= 2;
return $conn->_default($event);
} else {
printf STDERR " - Calling default global handler\n" if $self->debug >= 2;
return $self->_default($event);
}
my ($coderef, $replace, @args) = @$handler;
if ($replace == 0) { # REPLACE
$coderef->($conn, $event, @args);
} elsif ($replace == 1) { # BEFORE
$coderef->($conn, $event, @args) or return;
if ($default) {
$conn->_default($event, @args);
} else {
$self->_default($event, @args);
}
} elsif ($replace == 2) { # AFTER
if ($default) {
$conn->_default($event, @args) or return;
} else {
$self->_default($event, @args) or return;
}
$coderef->($conn, $event, @args);
}
}
sub _remove_fh {
my ($self, $fh, $flags) = @_;
$flags ||= 'r';
if ($flags =~ /r/i) {
$self->{_read}->remove($fh);
delete $self->{_connhash}->{read}->{$fh};
}
if ($flags =~ /w/i) {
$self->{_write}->remove($fh);
delete $self->{_connhash}->{write}->{$fh};
}
}
1;