/usr/local/CPAN/Net-PSYC/Net/PSYC/Datagram.pm


package Net::PSYC::Datagram;

our $VERSION = '0.5';

use strict;
use IO::Socket::INET;

import Net::PSYC qw( watch add W sendmsg same_host send_mmp parse_uniform BLOCKING makeMSG make_psyc parse_psyc parse_mmp PSYC_PORT PSYCS_PORT register_host register_route make_mmp UNL);

sub TRUST {
    return 1;
}

sub new {
    my $class = shift;

    my $addr = shift || undef;		# NOT 127.1
    my $port = int(shift||0) || undef;	# also, NOT 4404

    my %a = (LocalPort => $port, Proto => 'udp');
    $a{LocalAddr} = $addr if $addr;
    my $socket = IO::Socket::INET->new(%a)
	or return $!;
    my $self = {
	'SOCKET' => $socket,
	'IP' => $socket->sockhost,
	'PORT' => $port || $socket->sockport,
	'TYPE' => 'd',
	'I_BUFFER' => '',
	'O_BUFFER' => [],
	'O_COUNT'  => 0,
	'LF' => '',
    };
    W1('UDP bind to %s:%s successful', $self->{'IP'}, $self->{'PORT'});
    bless $self, $class; 

    watch($self) unless (BLOCKING() & 2);
    add($self->{'SOCKET'}, 'w', sub {$self->write()}, 0) 
	unless (BLOCKING() & 1);
    
    return $self;
}

#   send ( target, mc, data, vars ) 
sub send {
    my $self = shift;
    my ( $target, $data, $vars ) = @_;
    W2('send(%s, %s, %s)', $target, $data, $vars);
    
    push(@{$self->{'O_BUFFER'}}, [ [$vars, $data, $target, 0 ] ]);

    if (BLOCKING() || $Net::PSYC::ANACHRONISM) { # send the packet instantly
        return !$self->write(); 
    } else {
        Net::PSYC::Event::revoke($self->{'SOCKET'});
    }
    return 0;
}

sub write () {
    my $self = shift;

    return 1 if (!${$self->{'O_BUFFER'}}[$self->{'O_COUNT'}]);
    
    # get a packet from the buffer
    my $packet = shift(@{${$self->{'O_BUFFER'}}[$self->{'O_COUNT'}]});
    my $target = $packet->[2];
    my ($user, $host, $port, $type, $object) = parse_uniform($target);
    
    $port ||= PSYC_PORT();
    
    $packet->[0]->{'_target'} ||= $target;

# funny, but not what we want.. returns 0.0.0.0 for INADDR_ANY and even
# when the ip is useful, the port may not - the other side should better
# use its own peer info. or the perl app provides _source.
#
#   $vars->{'_source'} |= "psyc://$self->{'IP'}:$self->{'PORT'}/";

    my $m = ".\n"; # empty packet!
    $m .= make_mmp($packet->[0], $packet->[1]);
    
    unless ($host) {
	W0('This target (%s) needs a host. Dropping message.', $target);
	return 1;
    }

    my $taddr = gethostbyname($host); # hm.. strange thing!
    my $tin = sockaddr_in($port, $taddr);
    
    if (!defined($self->{'SOCKET'}->send($m, 0, $tin))) {
	if (++$packet->[3] >= 3) {
	    W0('Delivery of a udp packet to %s failed for the third time. Dropping message.', $target);
	    return 1;
	}
        unshift(@{${$self->{'O_BUFFER'}}[$self->{'O_COUNT'}]}, $packet);
        return 1;
    }
    W1('UDP[%s:%s] <= %s', $host, $port, 
	$packet->[0]->{'_source'} || UNL());
    if (!scalar(@{${$self->{'O_BUFFER'}}[$self->{'O_COUNT'}]})) {
        # all fragments of this packet sent
        splice(@{$self->{'O_BUFFER'}}, $self->{'O_COUNT'}, 1);
        $self->{'O_COUNT'} = 0 if (!${$self->{'O_BUFFER'}}[$self->{'O_COUNT'}]);
    } else {
        # fragments of this packet left
        $self->{'O_COUNT'} = 0 if (!${$self->{'O_BUFFER'}}[++$self->{'O_COUNT'}]);
    }
    if(scalar(@{$self->{'O_BUFFER'}})) {
	if (BLOCKING() || $Net::PSYC::ANACHRONISM) {
	    $self->write();
	} else {
	    Net::PSYC::Event::revoke($self->{'SOCKET'});
	}
    }
    return 1;
}

sub read () {
    my $self = shift;
    my ($data, $last);
    
    $self->{'LAST_RECV'} = $self->{'SOCKET'}->recv($data, 8192); # READ socket

    return if (!$data); # connection lost !?
    # gibt es nen 'richtigen' weg herauszufinden, ob der socket noch lebt?

    $self->{'I_BUFFER'} .= $data;
    delete $self->{'LF'};
    return 1;
}

sub negotiate { 1 }

#   returns _one_ mmp-packet .. or undef if the buffer is empty
sub recv () {    
    my $self = shift;
    if (length($self->{'I_BUFFER'}) > 2) {
	if ( $self->{'LF'} || $self->{'I_BUFFER'} =~ s/^\.(\r?\n)//g ) {
	    
	    $self->{'LF'} ||= $1;
	    my ($vars, $data) = parse_mmp(\$$self{'I_BUFFER'}, $self->{'LF'});
	    return if (!defined $vars);
	    unless (exists $vars->{'_source'}) {
		my ($port, $ip) = sockaddr_in($self->{'LAST_RECV'});
		$vars->{'_source'} = "psyc://$ip:$port";
	    }
	    return ($vars, $data);
	}
	# TODO : we need to provide a proper algorithm to clean up the
	# in-buffer if we got corrupted packets in it. and we need to
	# detect corrupted packets.. udp sucks noodles! ,-)
    }
    return;
}



1;