POE::TIKC - Transparent Inter-Kernel Communication (IKC)


POE-TIKC documentation Contained in the POE-TIKC distribution.

Index


Code Index:

NAME

Top

POE::TIKC - Transparent Inter-Kernel Communication (IKC)

SYNOPSIS

Top

	use POE qw(TIKC);

	POE::TIKC->create_server({
		address => '127.0.0.1',	# default
		port => '2021',			# default
	});

# You can now assume all connected clients' aliases are available # AFTER...you check if $POE::TIKC::connected == 1 before starting.

DESCRIPTION

Top

This module connects many clients to one server and creates proxy sessions in the client and the server for all aliases. When you post to a proxied session, it is sent to the right client and reposted there.

BUGS

Top

Probably, its not fully tested. So use this at your own risk of life and data.

FEATURES

Top

You can't post globs

You can't use session ids

You must check $POE::TIKC::connected == 1 before posting events until I can fix this by queueing the posts until a connection is made

You must make sure Storable is the same version on all machines using TIKC

Using $_[SENDER] for anything would be bad

Using call() instead of post will not return usefull info from the remote kernel

AUTHOR

Top

David Davis <xantus@cpan.org>

COPYRIGHT AND LICENSE

Top


POE-TIKC documentation Contained in the POE-TIKC distribution.

package POE::TIKC;
# (c) Copyright 2004, David Davis

use POE qw( Filter::Reference Component::Server::TCP Component::Client::TCP );

use strict;
#use warnings FATAL => "all";

#$|++;

our $VERSION = '0.02';

sub DEBUG { 0 }

# the client and server use this hash
our %clients;
#{
#	tcp session id => {
#		alias => mock session id,
#		alias2 => another mock session id,
#	}
#}

our $connected = 0;

POE::Session->create(
	heap => {
		alias =>'_tikc_manager',
	},
	package_states => [
		'POE::TIKC' => [qw(
			_start
			shutdown
			create_session
			alias_list
		)],
	],
);

sub _start {
	my ($kernel, $heap) = @_[KERNEL, HEAP];

	$kernel->alias_set($heap->{alias});
#	$kernel->delay_set(alias_list => 5);
}

sub shutdown {
	return if ($_[HEAP]->{shutdown});
	$_[HEAP]->{shutdown} = 1;
	foreach my $c (keys %clients) {
		$_[KERNEL]->call($c => 'shutdown');
	}
	$_[KERNEL]->call('_tikc_server' => 'shutdown');
	$_[KERNEL]->call('_tikc_client' => 'shutdown');
}

sub alias_list {
	my ($kernel, $heap) = @_[KERNEL, HEAP];
			
	my @aliases;
	my $kr_sessions = $POE::Kernel::poe_kernel->[POE::Kernel::KR_SESSIONS];
	foreach my $key ( keys %$kr_sessions ) {
		next if $key =~ /POE::Kernel/;
		foreach my $a ($kernel->alias_list($kr_sessions->{$key}->[0])) {
			#next if ($a =~ m/^_tikc/);
			push(@aliases,$a);
		}
	}
	DEBUG && print "(s) aliases: ".join(',',@aliases)."\n";
	$kernel->delay_set(alias_list => 5);
}

sub create_session {
	my ($sid, $alias) = @_[ARG0,ARG1];					
	return if ($_[HEAP]->{shutdown});
	return POE::Session->create(
		heap => {
			_tikc_proxy_session => 1,
			client => $sid,
			alias => $alias,
		},
		inline_states => {
			_start => sub {
				DEBUG && print "proxy session ".$_[SESSION]->ID." startup as ".$_[HEAP]->{alias}."\n";
				$_[KERNEL]->alias_set($_[HEAP]->{alias});
			},
			_i_k_c_shutdown => sub {
				$_[HEAP]->{shutdown} = 1;
				DEBUG && print "proxy session ".$_[SESSION]->ID." shutdown called\n";
				$_[KERNEL]->alias_remove($_[HEAP]->{alias});
			},
			_stop => sub {
				DEBUG && print "proxy session ".$_[SESSION]->ID." stopped\n";
			},
			_default => sub {
				return undef if ($_[ARG0] =~ /^_signal/);
				return if ($_[HEAP]->{shutdown});
				DEBUG && print "(sp) calling $_[ARG0] in remote alias: $_[HEAP]->{alias} through proxy\n";
				$_[KERNEL]->call($_[HEAP]->{client} => _tikc_send => {
					action => 'post',
					event => $_[ARG0],
					alias => $_[HEAP]->{alias},
					args => splice(@_,ARG1),
				});
			},
		},
	)->ID;
}

sub create_server {
	my $class = shift;
	my $opt = shift || {};
	
	POE::Component::Server::TCP->new(
		Alias			=> "_tikc_server",
		Address			=> $opt->{address} || "127.0.0.1",
		Port			=> $opt->{port} || 2021,
		ClientFilter	=> "POE::Filter::Reference",
		ClientDisconnected => sub {
			my ($kernel,$sid) = ($_[KERNEL],$_[SESSION]->ID);
			if (ref($clients{$sid}) eq 'HASH') {
				my @aliases;
				foreach my $a (keys %{$clients{$sid}}) {
					# $a is an alias
					# value is a session id
					$kernel->call($clients{$sid}->{$a} => '_i_k_c_shutdown');
					push(@aliases, $a);
				}
				if (@aliases) {
					foreach my $c (keys %clients) {
						# skip the exiting client
						next if ($c == $sid);
						$kernel->call($c => _tikc_send => { type => 'server', action => 'remove', aliases => \@aliases });
					}
				}
			}
			delete $clients{$sid};
			$connected = 0;
			DEBUG && print "Client Disconnected!\n";
		},
		ClientShutdownOnError => 1,
		ClientError => sub {
			my ($kernel,$sid) = ($_[KERNEL],$_[SESSION]->ID);
			# shouldn't client error ALWAYS call client disconnected?
			if (ref($clients{$sid}) eq 'HASH') {
				my @aliases;
				foreach my $a (keys %{$clients{$sid}}) {
					# $a is an alias
					# value is a session id
					$kernel->call($clients{$sid}->{$a} => '_i_k_c_shutdown');
					push(@aliases, $a);
				}
				if (@aliases) {
					foreach my $c (keys %clients) {
						# skip the exiting client
						next if ($c == $sid);
						$kernel->call($c => _tikc_send => { type => 'server', action => 'remove', aliases => \@aliases });
					}
				}
			}
			delete $clients{$sid};
			$connected = 0;
			DEBUG && print "Client Error, disconnected: $_[ARG2]\n";
		},
		ClientConnected => sub {
			my ($kernel,$heap) = @_[KERNEL, HEAP];
			
			DEBUG && print "Client Connected!\n";
			# TODO put a timer here to disconnect clients that
			# don't auth before 15 seconds
			
			# tell the client what aliases the server already knows
			my %aliases;
			my $kr_sessions = $POE::Kernel::poe_kernel->[POE::Kernel::KR_SESSIONS];
			foreach my $key ( keys %$kr_sessions ) {
				next if $key =~ /POE::Kernel/;
				foreach my $a ($kernel->alias_list($kr_sessions->{$key}->[0])) {
					# skip over internal sessions
					next if ($a =~ m/^_tikc/);
					my $h = $kr_sessions->{$key}->[0]->get_heap();
					next if (ref($h) eq 'HASH' && $h->{_tikc_proxy_session});
					$aliases{$a}++;
				}
			}
			# tell this client about sessions that we know about from other clients
			foreach my $a (keys %clients) {
				foreach my $k (keys %{$clients{$a}}) {
					$aliases{$k}++;
				}
			}
			DEBUG && print "(s) sending  aliases: ".join(',',keys %aliases)."\n";
			$kernel->call($_[SESSION] => _tikc_send => { type => 'client', action => 'setup', aliases => [keys %aliases] });
		},
		ClientInput => \&input,
		InlineStates => {
			_tikc_send => sub {
				my ($kernel, $heap, $data) = @_[KERNEL, HEAP, ARG0];

				if ($heap->{client}) {
					DEBUG && print "(s) sending data to ".$_[SESSION]->ID."\n";
					$heap->{client}->put($data);
				} else {
					DEBUG && print "(s) data sent to client ".$_[SESSION]->ID." ignored\n";
				}
			},
		},
	);
}

sub create_client {
	my $class = shift;
	my $opt = shift || {};
	
	POE::Component::Client::TCP->new(
		Alias			=> "_tikc_client",
		RemoteAddress	=> $opt->{address} || "127.0.0.1",
		RemotePort		=> $opt->{port} || 2021,
		Filter			=> "POE::Filter::Reference",
		ConnectError => sub {
			my ($kernel,$sid) = ($_[KERNEL],$_[SESSION]->ID);
			
			$kernel->delay_set(reconnect => 2);
			
			if (ref($clients{$sid}) eq 'HASH') {
				foreach my $a (keys %{$clients{$sid}}) {
					# $a is an alias
					# value is a session id
					$kernel->call($clients{$sid}->{$a} => '_i_k_c_shutdown');
				}
			}
			
			delete $clients{$sid};
			$connected = 0;
			DEBUG && print "Connect error, $_[ARG2]\n";
		},
		Disconnected => sub {
			my ($kernel,$sid) = ($_[KERNEL],$_[SESSION]->ID);
	
			$kernel->delay_set(reconnect => 2);
			
			if (ref($clients{$sid}) eq 'HASH') {
				foreach my $a (keys %{$clients{$sid}}) {
					# $a is an alias
					# value is a session id
					$kernel->call($clients{$sid}->{$a} => '_i_k_c_shutdown');
				}
			}
			
			delete $clients{$sid};
			$connected = 0;	
			DEBUG && print "Disconnected! Reconnecting...\n";
		},
		Connected => sub {
			my ($kernel,$heap) = @_[KERNEL, HEAP];
	
			DEBUG && print "Connected!\n";
			# tell the server what aliases we have
			my @aliases;
			my $kr_sessions = $POE::Kernel::poe_kernel->[POE::Kernel::KR_SESSIONS];
			foreach my $key ( keys %$kr_sessions ) {
				next if $key =~ /POE::Kernel/;
				foreach my $a ($kernel->alias_list($kr_sessions->{$key}->[0])) {
					# skip over internal sessions
					next if ($a =~ m/^_tikc/);
					my $h = $kr_sessions->{$key}->[0]->get_heap();
					next if (ref($h) eq 'HASH' && $h->{_tikc_proxy_session});
					push(@aliases,$a);
				}
			}
			push(@aliases, map { keys %{$clients{$_}} } %clients);
			DEBUG && print "(c) sending  aliases: ".join(',',@aliases)."\n";
			$kernel->call($_[SESSION] => _tikc_send => { type => 'server', action => 'setup', aliases => \@aliases });
		},
		ServerInput => \&input,
		InlineStates => {
			_tikc_send => sub {
				my ($kernel, $heap, $data) = @_[KERNEL, HEAP, ARG0];

				if ($heap->{server}) {
					DEBUG && print "(c) sending data to ".$_[SESSION]->ID."\n";
					$heap->{server}->put($data);
				} else {
					DEBUG && print "(c) data sent to server ".$_[SESSION]->ID." ignored\n";
				}
			},
		},
	);
}

sub input {
	my ( $heap, $kernel, $data ) = @_[ HEAP, KERNEL, ARG0 ];

	if (ref($data) eq 'HASH') {
		if (exists($data->{action})) {
			if ($data->{action} eq 'post') {
				# if this session isn't a 'real' session
				# _default of our mock session forwards it to the server
				DEBUG && print "(i) searching for == $data->{alias} ==\n";
				my $sr = $kernel->alias_resolve($data->{alias});
				if (ref($sr)) {
					my $h = $sr->get_heap();
					DEBUG && print "(i) posting to $data->{alias}\n";
					if (@{$data->{args}}) {
						$kernel->call($sr => $data->{event} => @{$data->{args}});
					} else {
						$kernel->call($sr => $data->{event});
					}
				} else {
					# XXX notify client?
					DEBUG && print "(i) client posted to invalid alias $data->{alias}, ignoring\n";
				}
			} elsif ($data->{action} eq 'setup') {
				foreach my $i (@{$data->{aliases}}) {
					my $sr = $kernel->alias_resolve($i);
					if (ref($sr)) {
						warn "Session (alias $i) already exists as session_id: ".$sr->ID."\n";
						my $h = $sr->get_heap();
						if (ref($h) eq 'HASH' && $h->{_tikc_proxy_session}) {
							warn "!!!! it is a tikc proxy session, I'll use that session\n";
							$clients{$_[SESSION]->ID}->{$i} = $sr->ID;
						}
					} else {
						DEBUG && print "(i) creating proxy session for remote alias $i\n";
						my $sid = $_[SESSION]->ID;
						# create a client key with the session id

						$clients{$sid}->{$i} = $kernel->call(_tikc_manager => 'create_session' => $sid => $i);

						DEBUG && do {
							require Data::Dumper;
							print Data::Dumper->Dump([\%clients]);
						};
					}
				}
				foreach my $c (keys %clients) {
					next if ($c == $_[SESSION]->ID);
					$kernel->call($c => _tikc_send => {
						action => 'setup',
						aliases => $data->{aliases},
					});
				}
				# we're considered connected after the first setup command
				$connected = 1;
			} elsif ($data->{action} eq 'remove') {
				my $sid = $_[SESSION]->ID;
				DEBUG && print "(i) I was told me to remove aliases: ".join(',',@{$data->{aliases}})."\n";
				foreach my $a (@{$data->{aliases}}) {
					if (exists($clients{$sid}->{$a})) {
						DEBUG && print "(i) shutting down session $clients{$sid}->{$a}\n";
						$kernel->call($clients{$sid}->{$a} => '_i_k_c_shutdown');
					}
				}
			}
		} else {
			DEBUG && print "Received response from TIKC without action\n";
		}
	} else {
		DEBUG && print "Received an unknown response from TIKC type: ", ref($data)."\n";
	}
}

1;

__END__