/usr/local/CPAN/Thread-Apartment/Thread/Apartment/Client.pm


#/**
# Client proxy for apartment threaded objects.
# <p>
# Licensed under the Academic Free License version 2.1, as specified in the
# License.txt file included in this software package, or at
# <a href="http://www.opensource.org/licenses/afl-2.1.php">OpenSource.org</a>.
#
# @author D. Arnold
# @since 2005-12-01
# @self	$self
#*/
package Thread::Apartment::Client;

use Carp;
use threads;
use threads::shared;
use Thread::Queue::Queueable;
use Thread::Queue::TQDContainer;
use Thread::Apartment;
use Thread::Apartment::Common;

use Thread::Apartment::Common qw(:ta_method_flags);

use base qw(Thread::Queue::Queueable Thread::Queue::TQDContainer Thread::Apartment::Common);

our $AUTOLOAD;

use strict;
use warnings;

our $VERSION = '0.51';

use constant TAC_CLASS_LEN => 27;
use constant TAC_REENT_LEN => 13;
use constant TAC_ASYNC_LEN => 9;

our $async_method;	# set by T::A::start()

sub CLONE { $async_method = undef; }

#/**
# Constructor. Creates a threads::shared hash to contain the proxy
# information, so it can be readily passed between threads.
#
# @param $proxied_class		the class of the object to be proxied
# @param $tqd				TQD communications channel to proxied object
# @param $id				unique object ID for proxied object
# @param $isa				arrayref of object's class hierarchy
# @param $methods			hashref mapping exported method names to behavior flags
# @param $timeout			TQD timeout seconds
# @param $tid				thread ID of the apartment thread for the proxied object
#
# @return		Thread::Apartment::Client object
#*/
sub new {
	my ($class, $proxied_class, $tqd, $id, $isa, $methods, $timeout, $tid)= @_;
#
#	create isa, can as shared so we can curse/redeem them easily
#
	my @isa : shared = ( @$isa, 'ta_invoke_closure' );
	my %can : shared = ( %$methods );

	my %self : shared = (
		_class	=> $proxied_class,	# class we're proxying
		_id 	=> $id,				# object unique ID (for object hierarchies)
		_tqd	=> $tqd,			# our comm. channel
		_isa	=> \@isa,			# classes in hierarchy of proxied object
		_can	=> \%can,			# exported methods of proxied object
		_timeout => $timeout,		# TQD timeout
		_server_tid => $tid,		# tid of apartment thread
	);
	bless \%self, $class;
#
#	if we have the $method, then we should proceed to
#	install all the exported methods into our object
#
	return \%self;
}
#/**
# Overload UNIVERSAL::isa() to test the class hierarchy of the proxied object.
#
# @param $class		class to check if implemented by the proxied object
#
# @return		1 if the proxied object implements $class; undef otherwise
#*/
sub isa {
	my ($self, $class) = @_;
#
#	catch stuff we need to expose for queueing purposes
#
	return 1
		if (($class eq 'Thread::Queue::Queueable') ||
			($class eq 'Thread::Queue::TQDContainer') ||
			($class eq 'Thread::Apartment::Client'));
	foreach (@{$self->{_isa}}) {
		return 1 if ($_ eq $class);
	}
	return undef;
}

#/**
# Overload UNIVERSAL::can() to test the available methods of the proxied object.
#
# @param $method	method to check if implemented by the proxied object
#
# @return		if the proxied object exports $method (or exports AUTOLOAD),
#				a closure forcing an AUTOLOAD of the specified $method; undef otherwise
#*/
sub can {
	my ($self, $method) = @_;
#
#	we may need to trap the methods for TQQ here...
#	NOTE!!! Need to return a coderef here!!!
#
	return ((exists $self->{_can}{$method}) ||
		(exists $self->{_can}{'*'}) ||
		(exists $self->{_can}{AUTOLOAD})) ?
		sub { $AUTOLOAD = $method; return $self->AUTOLOAD(@_); } :
		undef;
}
#/**
# Test if the specified method is exported as simplex
#
# @param $method method to test for simplex behavior
#
# @return		1 if $method is exported and is simplex; undef otherwise
#*/
sub ta_is_simplex {
	return (exists $_[0]->{_can}{$_[1]} ?
		($_[0]->{_can}{$_[1]} & TA_SIMPLEX) : undef);
}

#/**
# Test if the specified method is exported as urgent
#
# @param $method method to test for urgent behavior
#
# @return		1 if $method is exported and is urgent; undef otherwise
#*/
sub ta_is_urgent {
	return (exists $_[0]->{_can}{$_[1]} ?
		($_[0]->{_can}{$_[1]} & TA_URGENT) : undef);
}

#/**
# Set debug level. When set to a "true" value, causes the TAC to emit
# diagnostic information.
#
# @param $level	debug level. zero or undef turns off debugging; all other values enable debugging
#
# @return		the new level
#*/
sub tac_debug { $_[0]->{_tac_debug} = $_[1]; }

sub AUTOLOAD {
#
#	called in client stub
#	passes method name
#	if starts w/ ta_async_, then return immediately
#	if starts w/ ta_reentrant_, or local thread's T::A::is_reentrant
#		is true, interleave local thread inbound calls
#		while waiting for method results
#	NOTE: use explicit substr() instead of regex for performance
#
	my $self = shift;

	my $method = $AUTOLOAD;

	print STDERR "TAC::AUTOLOAD: Method is $method\n"
		if (substr($method, -9) ne '::DESTROY') && $self->{_tac_debug};

	$async_method = undef,
	return 1
		if (substr($method, -9) eq '::DESTROY');
#
#	get rid of leading stuff
#
#warn "requested method $method\n";
	$method = substr($method, TAC_CLASS_LEN)
		if (substr($method, 0, TAC_CLASS_LEN) eq 'Thread::Apartment::Client::');

	my $tid = threads->self()->tid();

	my $async;
	my $reentrant = Thread::Apartment::get_reentrancy();
	if (substr($method, 0, TAC_ASYNC_LEN) eq 'ta_async_') {
		$method = substr($method, TAC_ASYNC_LEN);
		$@ = "No response closure supplied for async method $method.",
		$async_method = undef,
		return undef
			unless $_[0] && (ref $_[0]) && (ref $_[0] eq 'CODE');

		$async = 1;
#		$method = defined($1) ? "$1$2" : $2;
	}
	elsif (substr($method, 0, TAC_REENT_LEN) eq 'ta_reentrant_') {
		$reentrant = 1;
		$method = substr($method, TAC_REENT_LEN);
#		print STDERR "Got re-entrant call to $method\n";
	}

	unless (($method eq 'ta_invoke_closure') ||
		(exists $self->{_can}{$method}) ||
		(exists $self->{_can}{'AUTOLOAD'})) {
		$@ = "Can't locate method \"$method\" via package \"$self->{_class}\"";
		print STDERR $@, "\n"
			if $self->{_tac_debug};
		$async_method = undef;
		return undef;
	}
#	print STDERR "Client objid is $self->{_id}\n"
#		if exists $self->{_can}{'AUTOLOAD'};
#
#	support simplex/urgent specification
#
	my $flag = $self->{_can}{$method} || 0;
#
#	including for closures
#	check for default closure call behaviors;
#	note that these are cumulative
#
#print "Simplex is ", TA_SIMPLEX, " urgent is ", TA_URGENT, "\n";
	$_[1] |= Thread::Apartment::get_closure_behavior(),
	$flag = ($_[1] & (TA_SIMPLEX | TA_URGENT))
		if ($method eq 'ta_invoke_closure');

	my @params = ($async && (!$async_method)) ?
		('ta_async', $self->{_id}, wantarray, $method) :
		($method, $self->{_id}, wantarray);
#
#	marshal params
#	(assume the TAS implementation has a complementary unmarshal)
#
#	print join(', ', @_), "\n"
#		if (scalar @_) && ($method eq 'ta_invoke_closure');
	push @params, $self->marshal(@_)
		if scalar @_;

	my $tqd = $self->{_tqd};			# perf opt.
	my $timeout = $self->{_timeout};	# perf opt.
#
#	don't support start()/rendezvous() for simplex
#
	$async_method = undef,
	return (($flag & TA_URGENT) ?
		$tqd->enqueue_simplex_urgent(@params) :
		$tqd->enqueue_simplex(@params))
		if ($flag & TA_SIMPLEX);

#print STDERR "calling getCase in $tid\n" if ($method eq 'getCase');

#print STDERR "calling $method with ", join(', ', @params), "\n"
#	if $async;
	my $id = ($flag & TA_URGENT) ?
		$tqd->enqueue_urgent(@params) :
		$tqd->enqueue(@params);
#
#	NOTE: we don't support ta_async_ w/ start()/rendezvous()
#
	$async_method = undef,
#	print STDERR "Called async method $method\n" and
	return $id
		if $async;

	Thread::Apartment->map_async_request_id($async_method, $self, $id),
	$async_method = undef,
	return $id
		if $async_method;

#print STDERR "called getCase in $tid\n" if ($method eq 'getCase');
#
#	if reentrant, attempt to service inbound calls to the caller
#	while we wait for the response...
#	note that the return value doesn't matter, since the subsequent
#	wait()'s will retrieve any pending response if the caller is
#	a TAS, or will just do the usual wait() thing if the caller
#	isn't TAS (i.e., run_wait returns undef)
#
	if ($reentrant) {
#		print STDERR "Calling T::A::run_wait in $tid for id $id at ", time(), "\n";

#		print STDERR "Returned from T::A::run_wait in $tid for timed out\n" and
		return undef
			unless Thread::Apartment::run_wait($tqd, $id, $timeout);
#		print STDERR "Returned from T::A::run_wait in $tid for id $id at ", time(), "\n";
	}

#print STDERR "waiting for getCase in $tid\n" if ($method eq 'getCase');

	my $resp = $timeout ?
		$tqd->wait_until($id, $timeout) :
		$tqd->wait($id);

#print STDERR "getCase returned in $tid\n" if ($method eq 'getCase');

#	warn "\nwait failed: $@\n" and
	return undef
		unless $resp;

	unless (defined($resp->[0])) {
		$@ = $resp->[1];
		print STDERR $@, "\n"
			if $self->{_tac_debug};
		return undef;
	}

#warn "got response: $$results[0]\n";
#	shift @$results;
#
#	unmarshal results
#
	my $results = $self->unmarshal($resp->[0]);

	return wantarray ? @$results : defined(wantarray) ? $results->[0] : 1;
}

#/**
# Return current TQD timeout
#
# @return		TQD timeout in seconds
#*/
sub get_timeout {
	return $_[0]->{_timeout};
}

#/**
# Return proxied class
#
# @return		proxied class name string
#*/
sub get_proxied_class {
	return $_[0]->{_class};
}

#/**
# Set TQD timeout
#
# @param $timeout	max. number of seconds to wait for TQD responses.
#
# @return		previous timeout value
#*/
sub set_timeout {
	my $to = $_[0]->{_timeout};
	$_[0]->{_timeout} = $_[1];
	return $to;
}
#/**
# Wait for the proxied object's apartment thread to exit.
#
# @return		1
#*/
sub join {
#
#	Don't know why, but unless we use the scalar TID
#	instead of deref'ing, object() just won't work ???
#
	my $tid = $_[0]->{_server_tid};

#print STDERR "Joining $tid\n";
	my $thread = threads->object($tid);
#print STDERR "Thread $tid not found\n" and
	return 1
		unless $thread;
	$thread->join();
#print STDERR "Joined...$tid\n";
	return 1;
}

#/**
# Stop the proxied object's apartment thread.
#*/
sub stop {
	$_[0]->{_tqd}->enqueue_simplex('STOP');
}
#/**
# TQQ redeem() override. Checks if the TAC has been passed into
# the thread in which is was created, in which case it looks up
# and returns the proxied object in the T::A object map. Otherwise, just
# blesses the object back into a TAC.
#
# @param $class	our TAC class
# @param $obj   the object to be redeem()ed
#
# @return		if in the originating thread, the proxied object; else a
#				reblessed TAC.
#*/
sub redeem {
	my ($class, $obj) = @_;

	bless $obj, $class;
	return ($obj->{_server_tid} == threads->self()->tid()) ?
		Thread::Apartment::get_object_by_id($obj->{_id}) : $obj;
}
#/**
# Return results of a pending method/closure request.
# Looks up the pending request ID in the current thread's T::A,
# then waits for the completion of the request, unmarshals and returns
# the results.
#
# @return		results of the currently pending request (if any)
#*/
sub get_pending_results {
	my $self = shift;
	my $id = Thread::Apartment->get_pending_request($self);
	return undef unless $id;
	my @results = $self->{_tqd}->wait($id);
	return @results;
}
#/**
# Set async method for next call in current thread.
#
# @param $async	boolean value to set $async_method flag
#
# @return		none
#*/
sub set_async { $async_method = $_[0]; }

1;