/usr/local/CPAN/Combine/Saa.pm


#!/usr/bin/perl

use strict;

package Saa;
use Tana;

use Data::Dumper;
use Sys::Hostname;
use IO::Socket;
use IO::Select;
use Fcntl;

my $LOCALADDR_PREFIX = "/var/tmp/searchrpc_localsoc_";
my $debug = 0;

sub new
{
    my ($this) = @_;
    my $class = ref($this) || $this;

    my $my_addr = gethostbyname(hostname());
    if(!defined($my_addr))
    {
	return undef;
    }

    $this = {
	'servs' => {},
	'serv_sel' => IO::Select->new(),
	'conns' => {},
	'conn_sel' => IO::Select->new(),
	'ip_clis' => {},
	'my_addr' => $my_addr,
	'err' => '',
	'queue' => [],
    };

    bless $this, $class;

    $SIG{'PIPE'} = 'IGNORE';

    return $this;
}

sub err
{
    my $this = shift;

    return $this->{'err'};
}

# 'auto_arb' => bool    # Autoread arb messages?
# 'callback' => [func, params]
sub listen
{
    my $this = shift;
    my $port = shift;

    my %par = @_;

    if(exists($this->{'servs'}->{$port}))
    {
	$this->{'err'} = "Already listening";
	return 0;
    }
    
    my $serv = 
    {
	'port' => $port,
	'auto_arb' => 0,
    };

    if(exists($par{'callback'}))
    {
	$serv->{'callback'} = $par{'callback'};
    }

    if(exists($par{'auto_arb'}))
    {
	$serv->{'auto_arb'} = $par{'auto_arb'};
    }

    my $inet_sock = IO::Socket::INET->new(LocalPort => $port,
					  Type => SOCK_STREAM,
					  Reuse => 1,
					  Listen => 10);
    if(!defined($inet_sock))
    {
	$this->{'err'} = "$@";
	return 0;
    }

#    print STDERR "Soketti on $LOCALADDR_PREFIX$port\n";
    unlink "$LOCALADDR_PREFIX$port";
    my $unix_sock = IO::Socket::UNIX->new(Local => "$LOCALADDR_PREFIX$port",
					  Type => SOCK_STREAM,
					  Listen => 10);
    if(!defined($unix_sock))
    {
	$this->{'err'} = "$@";
	close($inet_sock);
	return 0;
    }

    binmode($inet_sock, ":raw");
    binmode($unix_sock, ":raw");

    $serv->{'inet_sock'} = $inet_sock;
    $serv->{'unix_sock'} = $unix_sock;

    $this->{'servs'}->{$port} = $serv;
    $this->{'serv_sel'}->add($inet_sock);
    $this->{'serv_sel'}->add($unix_sock);

    return 1;
}

sub connected
{
    my $this = shift;
    my $host = shift;
    my $port = shift;

    return(exists($this->{'conns'}->{"${host}_$port"}));
}

sub disconnect_all
{
    my $this = shift;

    foreach (keys(%{$this->{'conns'}}))
    {
	my $conn = $this->{'conns'}->{$_}->{'conn'};
	$this->{'conn_sel'}->remove($conn);
	delete($this->{'conns'}->{"$_"});

	shutdown($conn, 2);
	close($conn);
    }

    return 1;
}

sub disconnect
{
    my $this = shift;
    my $host = shift;
    my $port = shift;

    if(!exists($this->{'conns'}->{"${host}_$port"}))
    {
	$this->{'err'} = "Not connected";
	return 0;
    }

    my $conn = $this->{'conns'}->{"${host}_$port"}->{'conn'};
    $this->{'conn_sel'}->remove($conn);
    delete($this->{'conns'}->{"${host}_$port"});

    shutdown($conn, 2);
    close($conn);

    return 1;
}


sub unlisten
{
    my $this = shift;
    my $port = shift;

    if(!exists($this->{'servs'}->{$port}))
    {
	$this->{'err'} = "Not connected";
	return 0;
    }

    my $serv = $this->{'servs'}->{$port};
    $this->{'serv_sel'}->remove($serv->{'unix_sock'});
    $this->{'serv_sel'}->remove($serv->{'inet_sock'});
    shutdown($serv->{'unix_sock'}, 2);
    shutdown($serv->{'inet_sock'}, 2);
    close($serv->{'unix_sock'});
    close($serv->{'inet_sock'});
    unlink("$LOCALADDR_PREFIX$port");
    delete($this->{'servs'}->{$port});

    return 1;
}

sub connect
{
    my $this = shift;
    my $host = shift;
    my $port = shift;

    if(exists($this->{'conns'}->{"${host}_$port"}))
    {
	$this->{'err'} = "Already connected";
	return 0;
    }

    my $cn = 
    {
	'host' => $host,
	'port' => $port,
	'auto_arb' => 1,
    };

    my $addr = gethostbyname($host);
    my $conn = undef;
# local socket handling is fundamentally broken, a saa-redesign is needed
#    if($this->{'my_addr'} eq $addr) # try domain socket first
#    {
#	$conn = IO::Socket::UNIX->new(Peer => "$LOCALADDR_PREFIX$port",
#				      Type => SOCK_STREAM,
#				      Timeout => 10);
#    }
    if(!defined($conn))
    {
#	$debug && print STDERR "Saa::connect(): domain socket $LOCALADDR_PREFIX$port failed with $!, trying inet\n";
	if(!($conn = IO::Socket::INET->new(PeerAddr => $host,
					   PeerPort => $port,
					   Proto => "tcp",
					   Type => SOCK_STREAM)))
	{
	    $debug && print STDERR "Saa::connect(): tcp connect failed with $@\n";
	    $this->{'err'} = "$@";
	    return 0;
	}
    }
    else
    {
	$debug && print STDERR "Saa::connect(): Successfully opened localsoc!\n";
    }

    binmode($conn, ":raw");

    $cn->{'conn'} = $conn;
    $this->{'conn_sel'}->add($conn);
    $this->{'conns'}->{"${host}_$port"} = $cn;

    return 1;
}

# 'auto_arb' => bool
sub conn_set
{
    my $this = shift;
    my $host = shift;
    my $port = shift;

    my %par = @_;

    my $c = "${host}_$port";
    if(!exists($this->{'conns'}->{$c}))
    {
	$this->{'err'} = "No such connection.";
	return 0;
    }

    for(keys(%par))
    {
	$this->{'conns'}->{$c}->{$_} = $par{$_};
    }
    
    return 1;
}


# 'tag' => client name for the msg
# 'arb' => scalar data or func(tag) that returs scalar or undef on end-of-data
# 'arb_name' => scalar
sub queue
{
    my $this = shift;
    my $host = shift;
    my $port = shift;
    my $msg = shift;

    my %par = @_;

    my $q_elem = {
	'host' => $host,
	'port' => $port,
	'msg'  => $msg
    };

    if(exists($par{'arb'}))
    {
	$q_elem->{'arb'} = $par{'arb'};
	$q_elem->{'arb_name'} = $par{'arb_name'};
    }

    if(exists($par{'tag'}))
    {
	$q_elem->{'tag'} = $par{'tag'};
    }

#    print STDERR "scheduled req: " . Dumper($q_elem);
    push(@{$this->{'queue'}}, $q_elem);

    return 1;
}

sub process_accept
{
    my $this = shift;
    my $timeout = shift;

    $timeout=10;

    my @servs = keys(%{$this->{'servs'}});
    my @reads = $this->{'serv_sel'}->can_read($timeout);
    
#    print "Riidit: " . Dumper(\@reads) . "\n";
    my $conn;
    foreach $conn (@reads)
    {
	my $serv;
	my $found = 0;
	for(@servs)
	{
	    if($this->{'servs'}->{$_}->{'inet_sock'} == $conn ||
	       $this->{'servs'}->{$_}->{'unix_sock'} == $conn)
	    {
		$serv = $this->{'servs'}->{$_};
		$found = 1;
		last;
	    }
	}

	my $client = $conn->accept();
#	print "Conn " . Dumper($conn);
	if(!defined($client))
	{
#	    print STDERR "PRKL: $!\n";
	    next;
	}

	my $str_ip;
	my $port;
#for some reason sockdomain returns undef
#	if(AF_INET == $client->sockdomain)
#	{
	    my $sockaddr = $client->peername();
	    my $iaddr;
	    ($port, $iaddr) = sockaddr_in($sockaddr);
	    $str_ip = inet_ntoa($iaddr);
#	    print STDERR "Saa: accept found port $port and ip $str_ip\n";
#	}
#	else # AF_UNIX
#	{
#	    my $sn = $client->sockname();
#	    $sn =~ /$LOCALADDR_PREFIX([0-9]+)/;
#	    $port = $1;
#	    $str_ip = inet_ntoa($this->{'my_addr'});
#	    $debug && print STDERR "Saa::process_accept(): AF_UNIX connection with ip $str_ip port $port\n";
#	}

	my $cn = 
	{
	    'host' => $str_ip,
	    'port' => $port,
	    'conn' => $client,
	    'lport' => $serv->{'port'},
	};

	$serv->{'auto_arb'} && ($cn->{'auto_arb'} = $serv->{'auto_arb'});
	$serv->{'callback'} && ($cn->{'callback'} = $serv->{'callback'});

	$this->{'conns'}->{"${str_ip}_$port"} = $cn;
	$this->{'conn_sel'}->add($client);
    }

    return (1, 0);
}

sub process_write
{
    my $this = shift;
    my $sent = shift;

    my $q = $this->{'queue'};
    my %banned = (); # makes sure the order of messages for the same connection is kept

    my $offset = 0;
    while($offset < scalar(@$q))
    {
	my $qe = $q->[$offset];

	# ensure connection
	if(! $this->connected($qe->{'host'}, $qe->{'port'}))
	{
#	    print STDERR "Write connects to $qe->{'host'} $qe->{'port'}\n";
	    if(!$this->connect($qe->{'host'}, $qe->{'port'}))
	    {
#		print STDERR "Write is not connected to $qe->{'host'} $qe->{'port'}: $@\n";
		$this->{'err'} = $@;
		$qe->{'status'} = "failed";
		shift(@$q);
		push(@$sent, $qe);

		return (0, scalar(@$q));
	    }
	}
	
	my $connstr = $qe->{'host'} . "_" . $qe->{'port'};
	my $conn = $this->{'conns'}->{$connstr}->{'conn'};

	#see writability if not known
	if($banned{$connstr}) 
	{
	    $offset++;
	    next; # earlier message in q already unsent to this connstr
	}
	
	if(scalar(IO::Select->new($conn)->can_write(0)))
	{
	    my $ok;
	    if(defined($qe->{'arb_name'}))
	    {
#		print STDERR "Saa: writing arb " . $qe->{'arb_name'} . " " . Dumper($qe->{'msg'});
		$ok = Tana::write($conn, $qe->{'msg'}, $qe->{'arb_name'});
	    }
	    else
	    {
#		print STDERR "Saa: writing fix " . Dumper($qe->{'msg'});
		$ok = Tana::write($conn, $qe->{'msg'}, undef);
	    }
	    if(!$ok)
	    {
		$qe->{'status'} = "failed";
		$this->{'err'} = Tana::error($conn);
		$this->disconnect($qe->{'host'}, $qe->{'port'});
		shift(@$q);
		push(@$sent, $qe);
		
		return (0, $offset < scalar(@$q));
	    }
	    
	    if(defined($qe->{'arb_name'}))
	    {
		$ok = 1;
		my $func = undef;
		my @param;
		if(!ref($qe->{'arb'})) #scalar
		{
#		    print STDERR "S: writing scalar arb " . $qe->{'arb'} . "\n";
#		    print STDERR Dumper($qe->{'arb'});
		    $ok = Tana::write_arb($conn, $qe->{'arb'}, 1);
		}
		elsif(ref($qe->{'arb'}) eq 'ARRAY')
		{
#		    print STDERR "S: arb-callback with params\n";
		    @param = @{$qe->{'arb'}};
		    $func = shift(@param);
		}
		else
		{
#		    print STDERR "S: arb-callback without params\n";
		    $func = $qe->{'arb'};
		    @param = ();
		}
		
		if(defined($func))
		{
#		    print STDERR "S: writing arb from func...\n";
		    my $end = 0;
		    my $arb;
		    while($ok && !$end)
		    {
			($arb, $end) = $func->($this, $qe->{'tag'}, $qe->{'host'}, $qe->{'port'}, @param);
			$ok = Tana::write_arb($conn, $arb, $end);
		    }
		    
		    if(!$ok)
		    {
			$qe->{'status'} = "failed";
			$this->{'err'} = Tana::error($conn);
			$this->disconnect($qe->{'host'}, $qe->{'port'});
			shift(@$q);
			push(@$sent, $qe);
			
			return (0, $offset < scalar(@$q));
		    }
		}
	    }
	    $qe->{'status'} = "ok";
	    shift(@$q);
	    push(@$sent, $qe);
	}
    }	

    return(1, $offset < scalar(@$q));
}

sub process_read
{
    my $this = shift;
    my $received = shift;
    my $timeout = shift;

    my $pending = 0;

#    if(rand(100000) < 10)
#    {
#	my $ch = $this->{'conns'};
#	print STDERR "Saa::process_read() does can_read for " . scalar(keys(%$ch)) . " sockets\n";
#    }
    my @conns = $this->{'conn_sel'}->can_read(0);
    my @cns = keys(%{$this->{'conns'}});

#    print STDERR "read conns: " . scalar(@conns) . Dumper(@conns);
    
    my $conn;
    foreach $conn (@conns)
    {
	my $cn;
	my $cnkey;
	my $found = 0;
	for (@cns)
	{
	    if($this->{'conns'}->{$_}->{'conn'} == $conn)
	    {
		$cn = $this->{'conns'}->{$_};
		$cnkey = $_;
		$found = 1;
		last;
	    }
	}

	my $arb_type = 0;
#		print STDERR "saa: reading msg from " . $conns[$i] . " / " . fileno($conns[$i]) . "\n";
	my $msg = Tana::read($conn, \$arb_type);

#	warn "Saa process_read(): Tana::read() gave msg",Dumper($msg);

	if(!defined($msg))
	{
	    $this->{'err'} = Tana::error($conn);
	    if(scalar(@conns) > scalar(@$received))
	    {
		$pending = 1;
	    }
	    
	    $this->disconnect($cnkey);
	    next;
	}
	my $entry = 
	{
	    'msg' => $msg,
	    'type' => 'fix',
	    'host' => $cn->{'host'},
	    'port' => $cn->{'port'},
	    'conn' => $conn,
	};
	if(defined($arb_type))
	{
	    $entry->{'type'} = 'arb';
	    $entry->{'arb_name'} = $arb_type;
	    if(exists($cn->{'auto_arb'}) && ($cn->{'auto_arb'}))
	    {
		my $eom = 0;
		my $arb = '';
		while(!$eom)
		{
		    my $ext = Tana::read_arb($entry->{'conn'}, 1024000, \$eom);
		    if(!defined($ext))
		    {
			$this->{'err'} = "Error auto-reading arb: " . Tana::error($entry->{'conn'});
			if(scalar(@conns) > scalar(@$received))
			{
			    $pending = 1;
			}
			return (0, $pending);
		    }
		    
		    $arb .= $ext;
		}
		
		$entry->{'arb'} = $arb;
	    }
	}
	
	if($cn->{'callback'})
	{
	    my $cb = $cn->{'callback'};
	    my $func = undef;
	    my @param = ();
	    
	    $debug && print STDERR "Callback = ", ref($cb), "\n";
	    if(ref($cb) eq 'CODE')
	    {
		$func = $cb;
	    }
	    else
	    {
		@param = @$cb;
		$func = shift(@param);
	    }
	    $debug && print STDERR "Func cb ref = ", ref($func), "\n";
	    $func->($this, $entry, @param);
	}
	else
	{
	    push(@$received, $entry);
	}
    }

    return (1, 0);
}


sub process
{
    my $this = shift;
    my $timeout = shift;

    $timeout=10;

    my $received = [];
    my $sent = [];

    #cleanup
    for (keys(%{$this->{'conns'}}))
    {
	my $c = $this->{'conns'}->{$_}->{'conn'};
	if((!$c->connected()))
	{
	    print STDERR "Reaping $_\n";
	    my($host, $port) = split("_", $_);
	    $this->disconnect($host, $port);
	}
    }

    # read from conns
#    print STDERR "*saa read\n";
    my ($ok, $pending) = $this->process_read($received, $timeout);
    if(!$ok)
    {
	print STDERR "read sanoi nok\n";
	return (0, $sent, $received, $pending);
    }

#    print STDERR "*saa write\n";
    # write queue
    if($pending)
    {
	($ok, undef) = $this->process_write($sent, $timeout);
    }
    else
    {
	($ok, $pending) = $this->process_write($sent, $timeout);
    }
    if(!$ok)
    {
#	print STDERR "write sanoi nok\n";
	return (0, $sent, $received, $pending);
    }

#    print STDERR "*saa accept\n";
    # accept
    if($pending)
    {
	($ok, undef) = $this->process_accept($timeout);
    }
    else
    {
	($ok, $pending) = $this->process_accept($timeout);
    }

    return ($ok, $sent, $received, $pending);
}

sub tana_msg_reply
{
    my ($saa, $msg, $host, $port, $wait) = @_;

    my $giveup_time = time() + $wait;
    my $done = 0;
    my $reply = undef;
    my $ok = 1;
    do
    {
	if(!$saa->queue($host, $port, $msg))
	{
	    return (0, "Saa::queue() failed: " . $saa->{'err'}); 
	}

	my $received = [];
	$ok = 1;
	while(scalar(@$received) < 1 && (time() < $giveup_time) && $ok)
	{
	    ($ok, undef, $received, undef) = $saa->process(0.1);
	    if(!$ok)
	    {
		return (0, "Saa::process() failed: " . $saa->{'err'});
	    }
	}
	if(scalar(@$received) > 0)
	{
	    $reply = $received->[0]->{'msg'};
	    $done = 1;
	}
    } while((!$done) && (time() < $giveup_time));

    if(!$done)
    {
	return (0, "Timeout.");
    }

    return ($ok, $reply);
}

sub tana_msg_send
{
    my ($saa, $msg, $host, $port, $wait) = @_;

    my $giveup_time = time() + $wait;
    my $done = 0;
    my $stat = undef;
    my $sent = [];
    my $ok = 1;
    do
    {
	if(!$saa->queue($host, $port, $msg))
	{
	    return (0, "Saa::queue() failed: " . $saa->{'err'}); 
	}
	$ok = 1;
	$sent = [];
	while(scalar(@$sent) < 1 && (time() < $giveup_time) && $ok)
	{
	    ($ok, $sent, undef, undef) = $saa->process(0.1);
	    if(!$ok)
	    {
		return (0, "Saa::process() failed: " . $saa->{'err'});
	    }
	}
	if(scalar(@$sent) > 0)
	{
	    $done = 1;
	}
    } while((!$done) && (time() < $giveup_time));

    if(!$done)
    {
	return (0, "Timeout.");
    }

    return ($ok, $sent->[0]->{'status'});
}



1;