/usr/local/CPAN/Net-Gnutella/Net/Gnutella.pm


package Net::Gnutella;
use Net::Gnutella::Client;
use Net::Gnutella::Server;
use Net::Gnutella::Event;
use IO::Socket;
use IO::Select;
use Carp;
use strict;
use vars qw/@ISA @EXPORT $VERSION $AUTOLOAD/;

$VERSION = $VERSION = "0.1";

use constant GNUTELLA_CONNECT => 1;
use constant GNUTELLA_REQUEST => 2;

require Exporter;
@ISA = qw(Exporter);
@EXPORT = qw(GNUTELLA_CONNECT GNUTELLA_REQUEST);

# Use AUTOHANDLER to supply generic attribute methods
#
sub AUTOLOAD {
	my $self = shift;
	my $attr = $AUTOLOAD;
	$attr =~ s/.*:://;
	return unless $attr =~ /[^A-Z]/; # skip DESTROY and all-cap methods
	croak sprintf "invalid attribute method: %s->%s()", ref($self), $attr unless exists $self->{_attr}->{lc $attr};
	$self->{_attr}->{lc $attr} = shift if @_;
	return $self->{_attr}->{lc $attr};
}

sub add_handler {
	my ($self, $event, $coderef, $replace, @args) = @_;

	return $self->_add_handler($event, $coderef, $replace, $self->{_handler}, @args);
}

sub dequeue {
	my ($self, $qid) = @_;

	return delete $self->{_queue}->{$qid};
}

sub do_one_loop {
	my $self = shift;

	my $timeout = $self->timeout;
	my $time = time();

	foreach my $key ($self->queue) {
		my $event = $self->queue($key);

		if ($event->[0] <= $time) {
			$event->[1]->( @{$event}[2..$#{$event}] );

			$self->dequeue($key);
		} else {
			my $nexttimeout = $event->[0] - $time;

			$timeout = $nexttimeout if $nexttimeout < $timeout or not $timeout;
		}
	}

	my ($rr, $wr, $er) = IO::Select->select(@{$self}{'_read', '_write', '_error'}, $timeout);

	foreach my $sock (@$rr) {
		my $conn = $self->{_connhash}->{read}->{$sock} or next;

		$conn->[0]->($conn->[1] ? ($conn->[1], $sock) : $sock, @{$conn}[2..$#{$conn}]);
	}

	foreach my $sock (@$wr) {
		my $conn = $self->{_connhash}->{write}->{$sock} or next;

		$conn->[0]->($conn->[1] ? ($conn->[1], $sock) : $sock, @{$conn}[2..$#{$conn}]);
	}
}

# Cache the latest 500 PONG hosts (host:port combinations)
#
sub _host_cache {
	my $self = shift;

	if (@_) {
		my $time = time();
		my $count = 500;
		my $cache = $self->{_host_cache};
		my $new = {};
		my $i = 0;

		# Add the specified entries
		#
		foreach (@_) {
			$cache->{$_} = $time;
		}

		# Build a new list containing the most recent n elements
		#
		foreach (grep { $i++ < $count } sort { $cache->{$b} <=> $cache->{$a} } keys %{$cache}) {
			$new->{$_} = $cache->{$_};
		}

		$self->{_host_cache} = $new;
	}

	return keys %{ $self->{_host_cache} };
}

sub connections {
	my $self = shift;
	my @ret;

	foreach my $key (keys %{ $self->{_connhash}->{all} }) {
		my $conn = $self->{_connhash}->{all}->{$key};

		next unless ref $conn eq "Net::Gnutella::Connection";
		next unless $conn->connected;

		push @ret, $conn;
	}

	return @ret;
}

sub new {
	my $class = shift;
	my %args = @_;

	my $self = {
		_connhash => {
			read  => {},
			write => {},
			all   => {},
		},
		_read  => new IO::Select,
		_write => new IO::Select,
		_attr  => {
			timeout => 10,
			debug   => 0,
			id      => [ map { rand(65535**2) } 0..4 ],
		},
		_handler => {},
		_host_cache => {},
		_msgid_source => {},
		_qid => 'a',
		_queue => {},
	};

	bless $self, $class;

	foreach my $key (keys %args) {
		my $lkey = lc $key;

		$self->$lkey($args{$key});
	}

	return $self;
}

sub new_client {
	my $self = shift;
	my $conn = Net::Gnutella::Client->new($self, @_);

	return if $conn->error;
	return $conn;
}

sub new_server {
	my $self = shift;
	my $conn = Net::Gnutella::Server->new($self, @_);

	return if $conn->error;
	return $conn;
}

sub queue {
	my $self = shift;

	if (@_) {
		return $self->{_queue}->{$_[0]};
	} else {
		return keys %{ $self->{_queue} };
	}
}

sub schedule {
	my ($self, $when, $coderef, @args) = @_;

	unless ($when =~ /^\d+[dhmst]$/i) {
		croak "First argument must be a number";
	}

	unless (defined $coderef && ref $coderef eq 'CODE') {
		croak "Second argument must be a coderef!";
	}

	my $time = time();

	$when *= 24*60*60 if $when =~ s/d$//i;
	$when *= 60*60    if $when =~ s/h$//i;
	$when *= 60       if $when =~ s/m$//i;
	                     $when =~ s/s$//i;

	if ($when =~ s/t$//i) {
		$time = $when;
	} else {
		$time += $when;
	}

	$self->{_qid} = 'a' if $self->{_qid} eq 'zzzzzzzz';

	my $id = $self->{_qid}++;
	$self->{_queue}->{$id} = [ $time, $coderef, @args ];
	return $id;
}

# Returns the connection a msgid originated from if it
# has been seen previously.
#
sub _msgid_source {
	my ($self, $msgid, $conn) = @_;

	unless ($msgid && ref($msgid) eq 'ARRAY') {
		carp "Invalid message ID: $msgid";
	}

	if ($conn) {
		my $i = 0;
		my $count = 5000;
		my $source = $self->{_msgid_source};

		$source->{join(":", @$msgid)} = [ $conn, time() ];

		foreach (grep { $i++ > $count } sort { $source->{$b}->[1] <=> $source->{$a}->[1] } keys %{$source}) {
			delete $source->{$_};
		}
	}

	return unless $self->{_msgid_source}->{join(":", @$msgid)};
	return $self->{_msgid_source}->{join(":", @$msgid)}->[0];
}

sub start {
	my $self = shift;

	$self->do_one_loop while 1;
}

sub _add_fh {
	my ($self, $fh, $coderef, $flags, $obj, @args) = @_;

	unless (ref $coderef eq "CODE") {
		croak "Second argument to ->_add_fh not a coderef";
	}

	$flags ||= 'r';

	if ($flags =~ /r/i) {
		$self->{_read}->add($fh);
		$self->{_connhash}->{read}->{$fh} = [ $coderef, $obj, @args ];
	}

	if ($flags =~ /w/i) {
		$self->{_write}->add($fh);
		$self->{_connhash}->{write}->{$fh} = [ $coderef, $obj, @args ];
	}

	$self->{_connhash}->{all}->{$fh} = $obj;
}

sub _add_handler {
	my ($self, $event, $coderef, $replace, $hashref, @args) = @_;

	unless (ref $coderef eq "CODE") {
		croak "Second argument to ->_add_handler not a coderef";
	}

	my %define = ( replace=>0, before=>1, after=>2 );

	if (not defined $replace) {
		$replace = 2;
	} elsif ($replace =~ /^\D/) {
		$replace = $define{lc $replace} || 2;
	}

	foreach my $ev (ref $event eq "ARRAY" ? @{$event} : $event) {
		if ($ev =~ /^\d/) {
			$ev = Net::Gnutella::Event->trans($ev);

			unless ($ev) {
				carp "Unknown event type in ->add_handler";
				return;
			}
		}

		$hashref->{lc $ev} = [ $coderef, $replace, @args ];
	}
}

sub _handler {
	my ($self, $event) = @_;
	my $handler;

	unless ($event) {
		confess "I messed up";
	}

	my $type = $event->type;
	my $conn = $event->from;
	my $default = $conn->can('_default') if $conn;

	if ($conn && exists $conn->{_handler}->{$type}) {
		printf STDERR " - Connection wide handler exists\n" if $self->debug >= 2;
		$handler = $conn->{_handler}->{$type};
	} elsif (exists $self->{_handler}->{$type}) {
		printf STDERR " - Global handler exists\n" if $self->debug >= 2;
		$handler = $self->{_handler}->{$type};
	} elsif ($default) {
		printf STDERR " - Calling default handler on connection\n" if $self->debug >= 2;
		return $conn->_default($event);
	} else {
		printf STDERR " - Calling default global handler\n" if $self->debug >= 2;
		return $self->_default($event);
	}

	my ($coderef, $replace, @args) = @$handler;

	if ($replace == 0) {      # REPLACE
		$coderef->($conn, $event, @args);
	} elsif ($replace == 1) { # BEFORE
		$coderef->($conn, $event, @args) or return;

		if ($default) {
			$conn->_default($event, @args);
		} else {
			$self->_default($event, @args);
		}
	} elsif ($replace == 2) { # AFTER
		if ($default) {
			$conn->_default($event, @args) or return;
		} else {
			$self->_default($event, @args) or return;
		}

		$coderef->($conn, $event, @args);
	}
}

sub _remove_fh {
	my ($self, $fh, $flags) = @_;

	$flags ||= 'r';

	if ($flags =~ /r/i) {
		$self->{_read}->remove($fh);
		delete $self->{_connhash}->{read}->{$fh};
	}

	if ($flags =~ /w/i) {
		$self->{_write}->remove($fh);
		delete $self->{_connhash}->{write}->{$fh};
	}
}

1;