# /usr/local/CPAN/Thread-Apartment/Thread/Apartment.pm


#/**
# Provides apartment threading wrapper to encapsulate Perl objects
# in their own apartment thread.
# <p>
# Licensed under the Academic Free License version 2.1, as specified in the
# License.txt file included in this software package, or at
# <a href="http://www.opensource.org/licenses/afl-2.1.php">OpenSource.org</a>.
#
# @author D. Arnold
# @since 2005-12-01
# @self	$self
# @exports start	async method/closure request method
# @exports rendezvous method to wait for completion of async method/closure calls
# @exports rendezvous_any method to wait for completion of async method/closure calls
# @exports rendezvous_until method to wait for completion of async method/closure calls
# @exports rendezvous_any_until method to wait for completion of async method/closure calls
#*/
package Thread::Apartment;
#
#	encapsulates an object in an apartment thread
#
#		new($class, @args) :
#	my $obj = Thread::Apartment->new(
#		AptClass => 'MyClass',
#		AptTimeout => $timeout,
#		AptQueue => $tqd,
#		...args...
#
#	);
#
use threads;
use threads::shared;
use Time::HiRes qw(time sleep);
use Exporter;
use Thread::Queue::Duplex;
use Thread::Apartment::Server;
use Thread::Apartment::Closure;
use Thread::Apartment::Common;
use Thread::Apartment::Common qw(:ta_method_flags);

use base('Exporter');

@EXPORT = qw(start rendezvous rendezvous_any
	rendezvous_until rendezvous_any_until);

@EXPORT_OK = qw(
	set_single_threaded
	get_object_by_id
	get_tac_for_object
	destroy_object
	add_object_reference
	add_mapped_object
	alloc_mapped_object
	get_object_methods
	evict_objects
	register_closure
	get_closure
	get_reentrancy
	set_reentrancy
	set_ta_debug
);

#
#	class method for collecting class hierarchy and
#	available methods of an object
#
use strict;
use warnings;

our %apt_pool : shared = ();	# thread pool (maps TIDs to TQDs)
our @apt_tids : shared = ();	# TIDs of free threads
our $ta_debug : shared;	# global debug flag

our %closure_map = ();			# map ids to closures
our $next_closure_id = 0;		# closure ID generator
our $closure_signature = 0;		# closure validity test
our $closure_tac;				# a special TAC to handle closures
our $is_reentrant = undef;		# 1 => permit inbound calls during wait
our $autoload_all = undef;		# 1 => export any method
our $closure_calls = undef;		# TA_SIMPLEX | TA_URGENT => set default closure call behavior
#
#	object hierarchy maps
#
our %objmap = ();
our @objidmap = ();
our @objrefcnts = ();
#
#	start/rendezvous maps
#
our %tacmap = ();

#END {
#
#	run down any threads left in the pool
#
#warn "Thread::Apartment ENDing in thread ", threads->self()->tid(), "\n";
#	{
#	lock(%apt_pool);
#	$_->enqueue_simplex('STOP')
#		foreach (values %apt_pool);
#	sleep 1;
#	foreach (keys %apt_pool) {
#		threads->object($_)->join()
#			if threads->object($_);
#	}
#	}
#};


our $VERSION = '0.51';

our $single_threaded;		# when set, we fallback to normal behavior
#
#	clear out closures and object hierarchies when cloned
#
#/**
# ithreads CLONE() method to cleanup context when a new thread is spawned.
#*/
sub CLONE {
#
#	a newly spawned thread starts with a clean slate: none of the
#	parent's closures, proxied objects, or pending start/rendezvous
#	entries are carried over
#
#	closure bookkeeping
#
	($next_closure_id, $closure_signature, $closure_tac) = (0, 0, undef);
	%closure_map = ();
#
#	object hierarchy maps
#
	@objrefcnts = ();
	@objidmap = ();
	%objmap = ();
#
#	start/rendezvous map
#
	%tacmap = ();
}

#/**
# Factory constructor. Creates a factory object
# for Thread::Apartment (useful for apps which
# may subclass T::A in future)
#
# @static
# @return		T::A object
#*/
sub get_factory {
#
#	the factory carries no state of its own: it is just an empty
#	arrayref blessed into whatever class the method was invoked on
#
	my ($class) = @_;
	my $factory = [];
	return bless $factory, $class;
}

#/**
#
# Thread governor for installed objects
#
# @static
# @return		1
#*/
sub run {
#
#	slot 1 of the ID map holds the root (installed) object's entry;
#	without it there is nothing to govern
#
	my $root_entry = $objidmap[1];
	unless ($root_entry) {
		$@ = 'No installed object.';
		return undef;
	}
#
#	hand the root object, and the queue recorded for it in the
#	object map, to the internal governor; the second argument tells
#	_run() to service that queue directly
#
	my $root = $root_entry->[0];
	my $queue = $objmap{$root}->get_queue();
	return _run($queue, 1, $root);
}
#
#	internal thread governor
#
sub _run {
#
#	Request loop of an apartment thread: dequeues requests from the
#	thread's TQD and dispatches them to the installed object(s) until
#	a STOP request arrives or the root object goes away.
#
#	$cmdq	TQD supplied by the caller
#	$takeQ	true  => service $cmdq itself
#			false => create a private TQD and post (tid, TQD) back
#			on $cmdq via enqueue_simplex()
#	$tas	(optional) root object already installed in this thread;
#			when present, the loop returns to the caller once that
#			object is destroyed or evicted
#
#	Every return statement in this sub returns 1.
#
	my ($cmdq, $takeQ, $tas) = @_;
#
#	manufacture our TQD for return to the caller
#	unless $takeQ indicates we should take the provided
#	queue (which is the T::A->new() constructor)
#
	my $tqd = $takeQ ? $cmdq : Thread::Queue::Duplex->new(ListenerRequired => 1);
	$tqd->listen();
	$cmdq->enqueue_simplex(threads->self()->tid(), $tqd)
		unless $takeQ;
	my $tid = threads->self()->tid();
#
#	NOTE: we must explicitly init these to undef due to threads::shared
#	bugs
#
	my ($installed, $class, $req, $isa, $methods, $is_evt, $is_mux, $is_tas, $tac) =
		(undef, undef, undef, undef, undef, undef, undef, undef, undef);
	my ($id, $method, $objid, $wantary, $obj) = (undef, undef, undef, undef, undef);
	my ($simplex, $async, $sig, $cid) = (undef, undef, undef, undef);
#
#	NOTE(review): this lexical hides the package-level shared $ta_debug
#	and is never assigned anywhere in this sub, so every "if $ta_debug"
#	below is always false here - confirm whether that is intentional
#
	my $ta_debug = undef;
#
#	create closure for responses; we'll switch between this
#	and any async response closure
#
	my $tqdresp = sub { $tqd->respond(@_); };
	my $respmeth = undef;
	my @resp = ();
	my $resp = undef;
#
#	init object hierarchy map
#	(a pre-installed root object keeps the maps its installer filled in;
#	otherwise start from empty maps)
#
	if ($tas) {
		$installed = 1;
		$class = ref $tas;
		$is_tas = $tas->isa('Thread::Apartment::Server');
		$is_evt = $tas->isa('Thread::Apartment::EventServer');
		$is_mux = $tas->isa('Thread::Apartment::MuxServer');
		$tac = $tas->get_client();
	}
	else {
		%objmap = ();		# from object to ID
		@objidmap = ();		# from ID to object
		@objrefcnts = ();	# object ID to refcount map
	}

#print STDERR "Thread::Apartment::run started in thread $tid\n";
	while (1) {
#
#	type of dequeue depends on type of root object
#	!!!NOTE: need to break this down and move into TAS
#	so that e.g. Perl/Tk can interleave TQD polling with
#	MainLoop (i.e., invert control flow)!!!
#
		if ($is_evt) {
#
#	event server: non-blocking dequeue; when nothing is queued, give
#	the object a poll() call and try again
#
			print STDERR "Polling Event server\n"
				if $ta_debug;

			$req = $tqd->dequeue_nb();
			$tas->poll(),
			next
				unless $req;
		}
		elsif ($is_mux) {
#
#	mux server: the object's own run() takes over entirely; no
#	request is dequeued by this loop for it
#
			print STDERR "Starting Mux server\n"
				if $ta_debug;

			my $vacated = $tas->run();
#
#	on return, we need to re-init
#
			$is_tas = $is_evt = $is_mux = undef;
			$tas = undef;
			%objmap = ();
			@objidmap = ();
			@objrefcnts = ();
			return 1
				if $installed;

#			print STDERR "leaving apt\n" and
			last unless $vacated;

			next;
		}
		else {
			$req = $tqd->dequeue();
		}

#print STDERR "Thread::Apartment::run got request in thread $tid\n";

#
#	request layout: element 0 is the request ID, element 1 the method
#	name (or STOP)
#
		if ($req->[1] eq 'STOP') {
#
#	give object a chance to rundown
#
#print STDERR "Stopping $tid...\n";
			if ($tas) {
				$tas->evict()
					if $is_tas;
#print STDERR "Evicted $tid...\n";
				$tas = undef;
				%objmap = ();
				@objidmap = ();
				@objrefcnts = ();
			}
			last;
		}
#
#	must be function call
#
		$id = shift @$req;

		$method = shift @$req;
#
#	every request except ta_install carries the target object ID and
#	the caller's wantarray value next
#
		($objid, $wantary) = ($method eq 'ta_install') ?
			(undef, undef) : (shift @$req, shift @$req);
#
#	create and install new instance of the class
#
		($tas, $class, $is_tas, $is_evt, $is_mux) = _ta_install($tqd, $req, $id),
		next
			if ($method eq 'ta_install');
#
#	if we're DESTROY'd, post error
#	($class survives the object, so it distinguishes "evicted" from
#	"never occupied" below; no error is posted when $id is false)
#
		$tqd->respond($id, undef, "Apartment threaded $class object has been evicted."),
		next
			if (! $tas) && $class && $id;
#
#	if we're new and empty, post error
#
		$tqd->respond($id, undef, "Apartment thread is empty."),
		next
			if (! $tas) && $id;
#
#	if unknown object id, post error
#
		$tqd->respond($id, undef, "Unknown object (ID $objid)."),
		next
			unless $objidmap[$objid];
#
#	check for ref count (this is simplex)
#
		$objrefcnts[$objid]++,
		next
			if ($method eq '_add_ref');

		if ($method eq 'DESTROY') {
#
#	only destroy when refcount <= 0
#	also a simplex method
#
#print STDERR "DESTROYing\n";
			$objrefcnts[$objid]--;
			if ($objrefcnts[$objid] <= 0) {
				$objrefcnts[$objid] = 0;
				delete $objmap{$objidmap[$objid][0]};
				$objidmap[$objid] = undef;
#
#	object 1 is the root object: once it is gone the thread is
#	put back on the free list
#
				if ($objid == 1) {
					free_thread();
					$tas = undef;
					return 1
						if $installed;
				}
			}
			next;
		}
#
#	permits the object to be forced out, wo/ regard to objrefcnts
#
		if ($method eq 'evict') {
			$tas->evict()
				if $is_tas;
			%objmap = ();
			@objidmap = ();
			@objrefcnts = ();
			$tas = undef;
			return 1
				if $installed;
			next;
		}
#
#	verify method
#	(the method table is the third element of the object's ID map
#	entry; an AUTOLOAD entry lets any method name through)
#
		$respmeth = $tqdresp;
		$methods = $objidmap[$objid][2];
		$tqd->respond($id, undef, "Unknown method $method."),
		next
			unless ($method eq 'ta_async') ||
				($method eq 'ta_invoke_closure') ||
				(exists $methods->{$method}) ||
				(exists $methods->{AUTOLOAD});

#
#	ta_async wraps the real method name, which follows it in the request
#
		$async = undef;
		@resp = ();
		$async = 1,
		$method = shift @$req
			if ($method eq 'ta_async');
#
#	unmarshal args
#
		$req = scalar @$req ?
			($is_tas ?
				$tas->unmarshal($req->[0]) :
				Thread::Apartment::Server->unmarshal($req->[0])) :
				[];

		if ($async) {
#
#	async method call
#	the first unmarshalled argument must be the closure that will
#	receive the results
#	NOTE(review): $respmeth is not shifted off @$req here, so it also
#	appears to be passed to the method as its first argument - confirm
#
			$respmeth = $req->[0];
			print STDERR "T::A::run: async call on $method with respmethod $respmeth\n"
				if $ta_debug;

			$tqd->respond($id, undef, "No closure provided for async call to $method."),
			next
				unless $respmeth && (ref $respmeth) && (ref $respmeth eq 'CODE');

			$respmeth->($id, undef, "Unknown method $method."),
			next
				unless (exists $methods->{$method}) ||
					(exists $methods->{AUTOLOAD});

			print STDERR "T::A::run: valid async call on $method \n"
				if $ta_debug;
		}

#
#	simplex methods (flagged TA_SIMPLEX in the method table) get no response
#
		$simplex = (exists $methods->{$method}) ?
			($methods->{$method} & TA_SIMPLEX) : 0;

		if ($method eq 'ta_invoke_closure') {
#
#	closure call
#	verify the signature and the ID
#	then call the closure
#
			($sig, $cid) = (shift @$req, shift @$req);

#
#	the low bits of the closure ID carry the call behavior flags
#
			$simplex = $cid & TA_SIMPLEX,
			$cid &= 0xFFFFFFFC
				if $cid;	# clear behavior bits

			print STDERR "T::A::run: calling closure $cid for sig $sig\n"
				if $ta_debug;

			$respmeth->($id, undef, "Stale closure call to thread $tid."),
			next
				unless $sig && $cid && ($sig == $closure_signature) && $closure_map{$cid};

#
#	NOTE(review): the flag bits were already cleared above whenever
#	$cid is true, so this second mask looks redundant
#
			$cid &= 0xFFFFFFFC;	# remove flags values
#	print STDERR "Invoking closure w/ ", join(', ', @$req), "\n";

#
#	reproduce the caller's context: list, scalar, or void
#	(void calls report a 1 as their result)
#
			if ($wantary) {
				@resp = $closure_map{$cid}->(@$req);
			}
			elsif (defined($wantary)) {
				$resp[0] = $closure_map{$cid}->(@$req);
			}
			else {
				$closure_map{$cid}->(@$req);
				$resp[0] = 1;
			}

			print STDERR "T::A::run: returned from $method on object $objid\n"
				if $ta_debug;
		}
		else {
			print STDERR "T::A::run: calling $method on object $objid\n"
				if $ta_debug;

			$obj = $objidmap[$objid][0];

#
#	reproduce the caller's context: list, scalar, or void
#	(void calls report a 1 as their result)
#
			if ($wantary) {
				@resp = $obj->$method(@$req);
			}
			elsif (defined($wantary)) {
				$resp[0] = $obj->$method(@$req);
			}
			else {
				$obj->$method(@$req);
				$resp[0] = 1;
			}

			print STDERR "T::A::run: returned from $method on object $objid\n"
				if $ta_debug;
		}
#
#	if simplex, we're done
#
		@resp = (),	# just to GC any results
		next
			if $simplex;
#
#	now return results
#	check for errors: an undefined first result is treated as a
#	failure, and whatever is in $@ is returned as the error text
#
#		print STDERR "T::A::_run: async response has error \n"
#			unless (!$async) || defined($resp[0]);

		$resp[1] = $@,
		$respmeth->($id, @resp),
		next
			unless defined($resp[0]);
#
#	marshal the results per TAS's methods and return
#	(presumes that the TAC implements a complementary unmarshal)
#	NOTE: TAS marshal scans for new objects, and creates/registers
#	TACs for them
#
#	no marshalling of async responses, since the proxied closure will
#	marshal for us
#
		$resp = $tas->isa('Thread::Apartment::Server') ?
			$tas->marshalResults(@resp) :
			Thread::Apartment::Server->marshalResults(@resp)
			unless $async;

		$async ? $respmeth->(@resp) : $respmeth->($id, $resp);

		print STDERR ($async ?
			"T::A::_run: async response for $method on object $objid\n" :
			"T::A::run: responding for $method on object $objid\n")
			if $ta_debug;

	}	# end while not STOP

#
#	loop exited: stop listening on the TQD and drop this thread from the pool
#
	$tqd->ignore();
	remove_thread();
#print STDERR "Returning from _run $tid...\n";
	return 1;
}
#/**
# Intermediate thread governor for re-entrant method calls.
# Used when an object has made a call on another proxied object,
# but needs to be able to service external calls to itself until
# the pending call completes.
# <p>
# Relies on the threads::shared nature of the thread pool
# map to recover the TQD for the thread in which the re-entrant call
# is made, <b>and</b> the <b>non-</b>threads::shared nature of the
# proxied object map to recover the root object.
#
# @static
# @param $pendingq	TQD of the proxied object with a pending call
# @param $call_id	request ID of the pending call
# @param $timeout	(optional) max. number of seconds to wait for an event
#
# @return		undef if timeout expires before the pending call returns,
#				or if a STOP, DESTROY on the root object, or evict()
#				call is received. 1 otherwise.
#*/
#
sub run_wait {
#
#	Services inbound requests on this thread's own TQD while a call
#	made from this thread to another proxied object is still pending.
#	Returns 1 once the pending call is ready (or at once when this
#	thread has no pool entry); returns undef on STOP, on evict, on
#	DESTROY of the root object, or when the timeout runs out (in which
#	case $@ is set to 'run_wait() expired.').
#
	my ($pendingq, $call_id, $timeout) = @_;
#
#	check that we're running in a TAS
#	(i.e., this thread's TID has an entry in the shared thread pool map)
#
	my $tid = threads->self()->tid();
	my $tqd;
	{
		lock(%apt_pool);
		$tqd = $apt_pool{$tid}
			if exists $apt_pool{$tid};
	}
#
#	we must not be a TAS, so just let TAC finish the job
#
#	print STDERR "We're not in a TAS in $tid!!!\n" and
	return 1 unless $tqd;

#	print STDERR "pendingq $pendingq call id $call_id our tqd $tqd\n";
#
#	get our root object, just in case
#	NOTE(review): if slot 1 of the ID map is unset, $tas is undef and
#	the isa() call below would die - assumes a root object is always
#	installed when we get here; confirm
#
	my $tas = $objidmap[1][0];
	my $is_tas = $tas->isa('Thread::Apartment::Server');
#
#	NOTE: we must explicitly init these to undef due to threads::shared
#	bugs
#
	my ($req, $isa, $methods, $tac) = (undef, undef, undef, undef);
	my ($id, $method, $objid, $wantary, $obj) = (undef, undef, undef, undef, undef);
	my ($simplex, $async, $sig, $cid) = (undef, undef, undef, undef);
#
#	NOTE(review): this lexical hides the package-level shared $ta_debug
#	and is never assigned anywhere in this sub, so every "if $ta_debug"
#	below is always false here - confirm whether that is intentional
#
	my $ta_debug = undef;
#
#	create closure for responses; we'll switch between this
#	and any async response closure
#
	my $tqdresp = sub { $tqd->respond(@_); };
	my $respmeth = undef;
	my @resp = ();
	my $resp = undef;
#
#	an $expires of -1 means "no deadline"
#
	my $expires = $timeout ? time() + $timeout : -1;
	my @ready = ();
	my $qwait;

	print STDERR "Thread::Apartment::run_wait in thread $tid\n"
		if $ta_debug;

	while (($expires == -1) || ($expires > time())) {
#
#	unlike _run(), we always handle our own wait, and use the
#	static version
#	(waits on both our own queue and the pending call's response)
#
		@ready = $timeout ?
			Thread::Queue::Duplex->wait_any_until($expires - time(), $tqd, [ $pendingq, $call_id ]) :
			Thread::Queue::Duplex->wait_any($tqd, [ $pendingq, $call_id ]);

#print STDERR "Thread::Apartment::run_wait got response to pending call in thread $tid\n" and
	return 1 if $pendingq->ready($call_id);
#
#	something untoward happened, so just exit
#
#		return undef
#			unless @ready;
#
#	check if the pending call has returned
#
#		return 1
#			if ($ready[0] eq $pendingq) ||
#				($ready[1] && ($ready[1] eq $pendingq));
#
#	else fetch a request from our queue
#
		$req = $tqd->dequeue_nb();

		next
			unless $req;

		if ($req->[1] eq 'STOP') {
#
#	retrieve our root object from the class variable
#	and give it a chance to rundown
#	lord only knows what sort of mayhem may result...
#
			print STDERR "Stopping $tid...\n"
				if $ta_debug;
			if ($tas) {
				$tas->evict() if $is_tas;
				print STDERR "Evicted $tid...\n"
					if $ta_debug;
				%objmap = ();
				@objidmap = ();
				@objrefcnts = ();
			}
			return undef;
		}
#
#	must be function call
#	(request layout: request ID, method name, object ID, wantarray
#	value, then the marshalled arguments)
#
		$id = shift @$req;

		$method = shift @$req;
#
#	we only get here from an occupied thread, so installs are refused
#
		$tqd->respond($id, undef, "Apartment thread already occupied."),
		next
			if ($method eq 'ta_install');

		($objid, $wantary) =  (shift @$req, shift @$req);
#
#	if unknown object id, post error
#
		$tqd->respond($id, undef, "Unknown object (ID $objid)."),
		next
			unless $objidmap[$objid];
#
#	check for ref count (this is simplex)
#
		$objrefcnts[$objid]++,
		next
			if ($method eq '_add_ref');

		if ($method eq 'DESTROY') {
#
#	only destroy when refcount <= 0
#	also a simplex method
#	once again, this is likely to end badly if called on root thread
#
			print STDERR "DESTROYing\n"
				if $ta_debug;

			$objrefcnts[$objid]--;
			next if ($objrefcnts[$objid] > 0);

			$objrefcnts[$objid] = 0;
			delete $objmap{$objidmap[$objid][0]};
			$objidmap[$objid] = undef;

#
#	object 1 is the root object: put the thread back on the free
#	list and abandon the wait
#
			free_thread(),
			return undef
				if ($objid == 1);
			next;
		}
#
#	permits the object to be forced out, wo/ regard to objrefcnts
#	yet again, this is likely to end badly
#
		if ($method eq 'evict') {
			$tas->evict() if $is_tas;
			%objmap = ();
			@objidmap = ();
			@objrefcnts = ();
			return undef;
		}
#
#	verify method
#	(the method table is the third element of the object's ID map
#	entry; an AUTOLOAD entry lets any method name through)
#
		$respmeth = $tqdresp;
		$methods = $objidmap[$objid][2];
		$tqd->respond($id, undef, "Unknown method $method."),
		next
			unless ($method eq 'ta_async') ||
				($method eq 'ta_invoke_closure') ||
				(exists $methods->{$method}) ||
				(exists $methods->{AUTOLOAD});

#
#	ta_async wraps the real method name, which follows it in the request
#
		$async = undef;
		@resp = ();
		$async = 1,
		$method = shift @$req
			if ($method eq 'ta_async');
#
#	unmarshal args
#
		$req = scalar @$req ?
			($is_tas ?
				$tas->unmarshal($req->[0]) :
				Thread::Apartment::Server->unmarshal($req->[0])) :
				[];

		if ($async) {
#
#	async method call
#	the first unmarshalled argument must be the closure that will
#	receive the results
#
			$respmeth = $req->[0];
			print STDERR "T::A::run_wait: async call on $method with respmethod $respmeth\n"
				if $ta_debug;

			$tqd->respond($id, undef, "No closure provided for async call to $method."),
			next
				unless $respmeth && (ref $respmeth) && (ref $respmeth eq 'CODE');

			$respmeth->($id, undef, "Unknown method $method."),
			next
				unless (exists $methods->{$method}) ||
					(exists $methods->{AUTOLOAD});

			print STDERR "T::A::run_wait: valid async call on $method \n"
				if $ta_debug;
		}

#
#	simplex methods (flagged TA_SIMPLEX in the method table) get no response
#
		$simplex = (exists $methods->{$method}) ?
			($methods->{$method} & TA_SIMPLEX) : 0;

		if ($method eq 'ta_invoke_closure') {
#
#	closure call
#	verify the signature and the ID
#	then call the closure
#
			($sig, $cid) = (shift @$req, shift @$req);

			print STDERR "T::A::run_wait: calling closure $cid\n"
				if $ta_debug;

#
#	NOTE(review): unlike _run(), the closure map is probed here before
#	the behavior bits are masked off $cid - confirm which order matches
#	how closure IDs are registered
#
			$respmeth->($id, undef, "Stale closure call to thread $tid."),
			next
				unless $sig && $cid && ($sig == $closure_signature) && $closure_map{$cid};

			$simplex = $cid & TA_SIMPLEX;
			$cid &= 0xFFFFFFFC;

			print STDERR "Invoking closure w/ ", join(', ', @$req), "\n"
				if $ta_debug;

#
#	reproduce the caller's context: list, scalar, or void
#	(void calls report a 1 as their result)
#
			if ($wantary) {
				@resp = $closure_map{$cid}->(@$req);
			}
			elsif (defined($wantary)) {
				$resp[0] = $closure_map{$cid}->(@$req);
			}
			else {
				$closure_map{$cid}->(@$req);
				$resp[0] = 1;
			}

			print STDERR "T::A::run_wait: returned from $method on object $objid\n"
				if $ta_debug;
		}
		else {
			print STDERR "T::A::run_wait: calling $method on object $objid\n"
				if $ta_debug;

			$obj = $objidmap[$objid][0];

#
#	reproduce the caller's context: list, scalar, or void
#	(void calls report a 1 as their result)
#
			if ($wantary) {
				@resp = $obj->$method(@$req);
			}
			elsif (defined($wantary)) {
				$resp[0] = $obj->$method(@$req);
			}
			else {
				$obj->$method(@$req);
				$resp[0] = 1;
			}

			print STDERR "T::A::run_wait: returned from $method on object $objid\n"
				if $ta_debug;
		}
#
#	if simplex, we're done
#
		@resp = (),	# just to GC any results
		next
			if $simplex;
#
#	now return results
#	check for errors: an undefined first result is treated as a
#	failure, and whatever is in $@ is returned as the error text
#
#		print STDERR "T::A::run_wait: async response has error \n"
#			unless (!$async) || defined($resp[0]);

		$resp[1] = $@,
		$respmeth->($id, @resp),
		next
			unless defined($resp[0]);
#
#	marshal the results per TAS's methods and return
#	(presumes that the TAC implements a complementary unmarshal)
#	NOTE: TAS marshal scans for new objects, and creates/registers
#	TACs for them
#
#	no marshalling of async responses, since the proxied closure will
#	marshal for us
#
		$resp = $is_tas ?
			$tas->marshalResults(@resp) :
			Thread::Apartment::Server->marshalResults(@resp)
			unless $async;

		$async ? $respmeth->(@resp) : $respmeth->($id, $resp);

		print STDERR ($async ?
			"T::A::run_wait: async response for $method on object $objid\n" :
			"T::A::run_wait: responding for $method on object $objid\n")
			if $ta_debug;

	}	# end while not timeout

	print STDERR "T::A::run_wait expired in thread $tid...\n"
		if $ta_debug;

	$@ = 'run_wait() expired.';
	return undef;
}
#/**
# Stop and remove a thread
#
# @simplex
#
# @return		The object
#*/
sub stop {
#
#	post a one-way STOP request on the queue held in the object's
#	_tqd slot, then hand the object back so calls can be chained
#
	my $self = $_[0];
	my $queue = $self->{_tqd};
	$queue->enqueue_simplex('STOP');
	return $self;
}

#/**
# Class method to force single threading.
# Note that this behavior is irreversible.
#
# @static
# @return		nothing
#*/
sub set_single_threaded {
#
#	raise the module-wide flag; nothing in this sub ever lowers it again
#
	return $single_threaded = 1;
}

#/**
# Constructor. Factory method to create an instance of a class in an
# apartment thread. Produces a client proxy version as the return
# value. If <a href="#set_single_threaded">set_single_threaded()</a>
# has been called, then acts as a simple factory returning an instance
# of the class without installing in an apartment thread (useful for
# debugging purposes).
# <p>
# The caller may supply a <a href='http://search.cpan.org/perldoc?Thread::Queue::Duplex'>TQD</a>
# (<i> and hence, the apartment thread</i>)
# to be used as the communications channel between the apartment thread and client proxy instances.
# If not provided, either a thread and TQD are allocated from the existing pool
# (see <a href="#create_pool">create_pool()</a>), or, if no pooled threads
# are available, a new apartment thread and TQD are created, in which to install the created object.
# By supplying a TQD, the application can create a pool
# of threads and TQDs as early as possible with the least context necessary, and then
# allocate them to apartment threads as needed. However, the
# <a href="#create_pool">create_pool()</a> method may be simpler for most applications.
# <p>
# Some default behaviors of the object(s) created/installed in the apartment thread may be
# directed using the AptReentrant, AptAutoload, or AptClosureCalls parameters
# <i>(see below)</i>.
#
# @static
# @param AptClass	class to be instantiated into an apartment thread
# @param AptMaxPending	(optional) passed through to any TQD created for the thread
# @param AptQueue	(optional) <a href='Thread_Queue_Duplex.html'>Thread::Queue::Duplex</a>
#					to be used to communicate to the proxied object(s)
# @param AptTimeout (optional) timeout (in seconds) for responses to any non-simplex
#					proxied method call.
# @param AptParams  (optional) arrayref or hashref of parameters required for the proxied
#					class's constructor (if the object requires something other than
#					a hash for constructor parameters)
# @param AptReentrant  (optional) boolean indicating whether the objects in the apartment
#					thread should permit re-entrancy (i.e., handle inbound method calls)
#					while waiting for the results of outbound calls to other T::A objects;
#					default is undef (false).
# @param AptAutoload  (optional) boolean indicating whether the objects in the apartment
#					thread should permit <i>any</i> method call, rather than be restricted
#					to introspected, public methods. Default is undef (false).
# @param AptClosureCalls  (optional) scalar string, or arrayref of strings, indicating whether
#					proxied closures called from objects in the apartment
#					should be treated as 'Simplex', 'Urgent', or both.
#					Default is undef (duplex, non-urgent). Valid (case-insensitive) values
#					are 'Simplex', 'Urgent', or an arrayref containing either or both of
#					those values.
#
# @return		<a href='./Apartment/Client.html'>Thread::Apartment::Client</a> object
#*/
sub new {
#
#	Creates an instance of AptClass inside an apartment thread and
#	returns what the thread sends back for the install request.
#	Returns undef with $@ set on any failure.
#
	my ($class, %args) = @_;

	unless ($args{AptClass}) {
		$@ = 'Thread::Apartment::new: no AptClass specified';
		return undef;
	}

	$class = $args{AptClass};
#
#	if we're single threaded, invoke constructor directly
#	NOTE(review): AptClass is interpolated into a string eval; it must
#	come from trusted code, never from external input
#
	if ($single_threaded) {
		eval "require $class;";
		return undef if $@;
		delete $args{AptClass};
		return ${class}->new(%args);
	}

	my $cmdq = $args{AptQueue};
	if ($cmdq && (!$cmdq->isa('Thread::Queue::Duplex'))) {
		$@ = 'Thread::Apartment::new: AptQueue is not a Thread::Queue::Duplex.';
		return undef;
	}
#
#	validate/convert AptClosureCalls:
#	either a single string or an arrayref of strings, each of which
#	must be 'simplex' or 'urgent' (any case); the result is the
#	equivalent TA_SIMPLEX/TA_URGENT bitmask
#
	my $closures = $args{AptClosureCalls};
	if ($closures) {
		my $flags = 0;
		if (ref $closures) {
			unless (ref $closures eq 'ARRAY') {
				$@ = 'Thread::Apartment::new: Invalid value for AptClosureCalls.';
				return undef;
			}
			foreach my $behavior (@$closures) {
				my $name = lc $behavior;
				unless (($name eq 'simplex') || ($name eq 'urgent')) {
					$@ = 'Thread::Apartment::new: Invalid value for AptClosureCalls.';
					return undef;
				}
				$flags |= ($name eq 'simplex') ? TA_SIMPLEX : TA_URGENT;
			}
		}
		else {
#
#	scalar form: test the supplied string itself (this branch used to
#	test $_, which is unrelated to the argument at this point)
#
			my $name = lc $closures;
			unless (($name eq 'simplex') || ($name eq 'urgent')) {
				$@ = 'Thread::Apartment::new: Invalid value for AptClosureCalls.';
				return undef;
			}
			$flags = ($name eq 'simplex') ? TA_SIMPLEX : TA_URGENT;
		}
		$args{AptClosureCalls} = $flags;
	}
#
#	check for thread pool
#
	unless ($cmdq) {
		lock(%apt_pool);
#
#	since threads can be removed, we need to shift thru the list
#	until we find a free TID that still has a pool entry
#
		while (scalar @apt_tids) {
			my $tid = shift @apt_tids;
			next unless exists $apt_pool{$tid};
			$cmdq = $apt_pool{$tid};
			last;
		}
#
#	nothing free in the pool: spawn a new apartment thread on a new
#	queue, and record it in the pool map
#
		unless ($cmdq) {
			$cmdq = Thread::Queue::Duplex->new(
				ListenerRequired => 1,
				MaxPending => $args{AptMaxPending});
			my $thread = threads->create(\&_run, $cmdq, 1);
			$apt_pool{$thread->tid()} = $cmdq;
		}
	}
	$cmdq->wait_for_listener();
#
#	if we have parameters, marshal them using the Common methods
#
	$args{AptParams} = (ref $args{AptParams} eq 'ARRAY') ?
		Thread::Apartment::Common->marshal(@{$args{AptParams}}) :
		Thread::Apartment::Common->marshal(%{$args{AptParams}})
		if ($args{AptParams} && ref $args{AptParams});

	my $params = Thread::Apartment::Common->marshal(%args);

	my $resp = $args{AptTimeout} ?
		$cmdq->enqueue_and_wait_until($args{AptTimeout}, 'ta_install', $params) :
		$cmdq->enqueue_and_wait('ta_install', $params);
#
#	no response at all (e.g., AptTimeout ran out): report it rather
#	than leaving $@ undefined
#
	unless ($resp) {
		$@ = 'Thread::Apartment::new: no response to install request.';
		return undef;
	}
#
#	returned value is the TAC for the object; an undefined first
#	element means the second element is the error text
#
	$@ = $resp->[1] unless defined($resp->[0]);
	return $resp->[0] ? $resp->[1] : undef;
}
#/**
# Install an object into the current thread. Similar to <a href="#new">new()</a>,
# except that the current thread is converted to an apartment thread, rather
# than creating a new thread (or allocating one from a thread pool). Useful for
# some legacy packages (e.g., <a href="http://search.cpan.org/perldoc?Tk">Perl/Tk</a>).
# <p>
# Whereas <a href='#new'>new()</a> creates a new instance of
# a class and installs it in <b>another</b> thread, which immediately begins
# monitoring the TQD channel for proxied method calls,
# <code>install()</code> creates a new instance of a class and installs it <b>in the
# current thread</b>, returning a <a href='./Apartment/Container.html'>TACo</a>,
# which references both the actual created object, <b>and</b> its TAC, so the installed object
# can be invoked within the current thread, yet still be distributed to other apartment
# threaded objects.
# <p>
# When an application needs to pass an <code>install()</code>ed object to other threads,
# it has 2 options:
# <ol>
# <li>The TAC objects to receive a reference to the installed object must be
# passed to the installed object constructor, and implement a known method which the
# installed object calls to supply its own TAC. The resulting tight coupling
# requires additional wrappers or subclassing for use by POPOs and legacy classes.
# <p>
# <li>Alternately, install() simply returns a TACo for the installed object
# (rather than its TAC, as for <a href='#new'>new()</a>), and the
# main flow of the application can distribute the TACo object to the other
# apartment threaded objects as needed. Once the installed object is fully distributed,
# and any other initialization is completed,
# the main flow simply calls <a href="#run">Thread::Apartment::run()</a> method,
# which assumes control of the current thread.
# The resulting loosely coupled component based architecture simplifies the assembly
# of apartment threaded objects, and is more easily supported by POPO's and legacy objects.
# </ol>
#
# @static
# @param AptClass	class to be instantiated into an apartment thread
# @param AptMaxPending	(optional) passed through to any TQD created for the thread
# @param AptQueue	(optional) <a href='http://search.cpan.org/perldoc?Thread::Queue::Duplex'>Thread::Queue::Duplex</a>
#					to be used to communicate to the proxied object(s)
# @param AptTimeout (optional) timeout (in seconds) for responses to any non-simplex
#					proxied method call.
# @param AptParams  (optional) arrayref or hashref of parameters required for the proxied
#					class's constructor
# @param AptReentrant  (optional) boolean indicating whether the objects in the apartment
#					thread should permit re-entrancy (i.e., handle inbound method calls)
#					while waiting for the results of outbound calls to other T::A objects;
#					default is undef (false).
# @param AptAutoload  (optional) boolean indicating whether the objects in the apartment
#					thread should permit <i>any</i> method call, rather than be restricted
#					to introspected, public methods. Default is undef (false).
# @param AptClosureCalls  (optional) scalar string, or arrayref of strings, indicating whether
#					proxied closures called from objects in the apartment thread
#					should be treated as 'Simplex', 'Urgent', or both.
#					Default is undef (duplex, non-urgent). Valid (case-insensitive) values
#					are 'Simplex', 'Urgent', or an arrayref containing either or both of
#					those values.
#
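# @return		per the description above, a TACo for the installed object
#				(the bare object when single-threaded); undef, with $@ set,
#				if the arguments are rejected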
# @returnlist	nothing
#*/
sub install {
#
#	Installs an instance of AptClass in the *current* thread: validates
#	the arguments, makes sure there is a TQD for this thread, records
#	it in the shared pool map, starts listening on it, and then lets
#	_install() do the construction. Returns undef with $@ set when the
#	arguments are rejected here.
#
	my ($class, %args) = @_;

	unless ($args{AptClass}) {
		$@ = 'Thread::Apartment::install: no AptClass specified';
		return undef;
	}

	$class = $args{AptClass};
#
#	if we're single threaded, invoke constructor directly
#	NOTE(review): unlike new(), this path does not require() the class
#	first - confirm whether callers are expected to have loaded it
#
	if ($single_threaded) {
		delete $args{AptClass};
		return ${class}->new(%args);
	}

	my $cmdq = $args{AptQueue};
	if ($cmdq && (!$cmdq->isa('Thread::Queue::Duplex'))) {
		$@ = 'Thread::Apartment::install: AptQueue is not a Thread::Queue::Duplex.';
		return undef;
	}
#
#	validate/convert AptClosureCalls:
#	either a single string or an arrayref of strings, each of which
#	must be 'simplex' or 'urgent' (any case); the result is the
#	equivalent TA_SIMPLEX/TA_URGENT bitmask
#	(the error text used to name new() instead of install())
#
	my $closures = $args{AptClosureCalls};
	if ($closures) {
		my $flags = 0;
		if (ref $closures) {
			unless (ref $closures eq 'ARRAY') {
				$@ = 'Thread::Apartment::install: Invalid value for AptClosureCalls.';
				return undef;
			}
			foreach my $behavior (@$closures) {
				my $name = lc $behavior;
				unless (($name eq 'simplex') || ($name eq 'urgent')) {
					$@ = 'Thread::Apartment::install: Invalid value for AptClosureCalls.';
					return undef;
				}
				$flags |= ($name eq 'simplex') ? TA_SIMPLEX : TA_URGENT;
			}
		}
		else {
#
#	scalar form: test the supplied string itself (this branch used to
#	test $_, which is unrelated to the argument at this point)
#
			my $name = lc $closures;
			unless (($name eq 'simplex') || ($name eq 'urgent')) {
				$@ = 'Thread::Apartment::install: Invalid value for AptClosureCalls.';
				return undef;
			}
			$flags = ($name eq 'simplex') ? TA_SIMPLEX : TA_URGENT;
		}
		$args{AptClosureCalls} = $flags;
	}
#
#	create queue if none provided
#	(and pass it along to _install() through the AptQueue argument)
#
	$args{AptQueue} = $cmdq = Thread::Queue::Duplex->new(
		ListenerRequired => 1,
		MaxPending => $args{AptMaxPending})
		unless $cmdq;
#
#	map us into the thread pool
#
	{
		lock(%apt_pool);
		$apt_pool{threads->self()->tid()} = $cmdq;
	}
#
#	we're the listener...
#
	$cmdq->listen();

	return _install(%args);
}
#/**
# Create a thread pool for apartment threaded objects.
# Useful for limiting the amount of context cloned into
# apartment threads. By creating a pool of threads before
# <code>require</code>'ing any modules, the threads will
# have minimal context before the apartment thread objects
# are installed into them.
#
# @static
# @param AptMaxPending	used to set MaxPending on created <a href='http://search.cpan.org/perldoc?Thread::Queue::Duplex'>TQD's</a>.
# @param AptPoolSize    the number of threads to create in the pool
#
# @return		number of threads created
#*/
sub create_pool {
	my ($class, %args) = @_;
#
#	no threads are ever used when single threaded, so there is
#	nothing to pool; just report success
#
	return 1 if $single_threaded;

	my $size = $args{AptPoolSize};
	unless ($size && ($size > 0)) {
		$@ = 'No AptPoolSize specified.';
		return undef;
	}
#
#	the pool map stays locked while the whole batch is created
#
	lock(%apt_pool);
	for my $slot (1..$size) {
#
#	each pooled thread gets its own TQD, and is only recorded
#	(in both the pool map and the free list) once it is listening
#
		my $cmdq = Thread::Queue::Duplex->new(
			ListenerRequired => 1,
			MaxPending => $args{AptMaxPending});
		my $thread = threads->create(\&_run, $cmdq, 1);
		$cmdq->wait_for_listener();

		my $tid = $thread->tid();
		$apt_pool{$tid} = $cmdq;
		push @apt_tids, $tid;
	}
	return $size;
}

#/**
# Stop and remove all threads
# @static
# @return		1
#*/
sub destroy_pool {
	my @stopped = ();
#
#	phase 1: with the pool locked, post a STOP to every pooled thread
#	and remember which TIDs were told to stop
#
	{
		lock(%apt_pool);
		while (my ($tid, $queue) = each %apt_pool) {
			$queue->enqueue_simplex('STOP');
			push @stopped, $tid;
		}
	}
#	print STDERR "Waiting for the following threads: ", join(', ', @stopped), "\n";
#
#	phase 2: with the lock released, join each thread that still has
#	a thread object
#
	for my $tid (@stopped) {
		my $thrd = threads->object($tid);
		next unless $thrd;
		$thrd->join();
	}
#
#	when cleaned up, clear the free pool
#
	lock(%apt_pool);
	@apt_tids = ();
	return 1;
}
#/**
# Return a thread to the pool. Called within the
# thread to be returned.
#
# @static
#
# @return		1
#*/
sub free_thread {
	my $self_tid = threads->self()->tid();
#
#	only threads that are members of the pool go back on the free list
#
	lock(%apt_pool);
	if ($apt_pool{$self_tid}) {
		push @apt_tids, $self_tid;
	}
	return 1;
}
#/**
# Remove a thread from the pool. Called within the
# thread to be removed.
#
# @static
#
# @return		1
#*/
sub remove_thread {
	my $self_tid = threads->self()->tid();
#
#	drop the calling thread's entry from the pool map while holding the pool lock
#
	{
		lock(%apt_pool);
		delete $apt_pool{$self_tid};
	}
	return 1;
}
#
#	install for install()
#
sub _install {
#
#	Installs an object of class AptClass as the root object (object ID 1)
#	of the current thread.
#
#	Takes the named arguments as a flat key/value list. The keys read
#	here are AptClass, AptReentrant, AptAutoload, AptClosureCalls,
#	AptParams, AptTimeout and AptQueue.
#
#	Returns the TAC obtained from introspect() on success; on failure
#	returns undef with the reason left in $@.
#
	my %args = @_;
#
#	slot 1 of the object ID map holds the root object; refuse to
#	install a second one
#
	$@ = 'Apartment thread already occupied.',
	return undef
		if $objidmap[1];
#
#	verify the class
#	(AptClass is removed from %args; the string eval leaves any
#	load failure in $@)
#
	my $class = delete $args{AptClass};
	eval "require $class;";
	$@ = "Cannot require $class: $@",
	return undef
		if $@;
#
#	set re-entrancy
#	(also the autoload and closure call behavior settings; unlike
#	AptClass and AptTimeout these keys are not deleted, so they remain
#	in %args when %args is handed to the constructor below)
#
	$is_reentrant = $args{AptReentrant};
	$autoload_all = $args{AptAutoload};
	$closure_calls = $args{AptClosureCalls};
#
#	use local flags to optimize
#
	my $is_tas = ${class}->isa('Thread::Apartment::Server');
#
#	get any add'l params
#	(dereferenced as an array below, so AptParams is expected to be
#	an arrayref when supplied)
#
	my $req = $args{AptParams};
#
#	set up thread-global variables
#
	my $timeout = delete $args{AptTimeout};
	my $tqd = $args{AptQueue};
#
#	install elements into TAS class variables
#	NOTE(review): the 3rd argument is 1 here but undef in _ta_install();
#	its meaning is defined by init_tas() - confirm there
#
	Thread::Apartment::Server::init_tas($tqd, $timeout, 1);
#
#	get class's ISA and can() lists, and a TAC
#	(TAS subclasses are introspected via their own class method;
#	other classes via the T::A::Server function)
#
	my ($isa, $methods, $tac) = $is_tas ?
		${class}->introspect(1) :
		Thread::Apartment::Server::introspect($class, 1);
#
#	create an instance:
#		since a TAS may need its TAC to pass to other
#		T::A objects, we need to construct it differently
#		so it generates its own TAC
#		Note that POPOs won't knowingly create T::A objects
#
#	four constructor call shapes:
#		TAS with AptParams:	new($tac, @AptParams)
#		TAS without:		new(AptTAC => $tac, remaining %args)
#		other with AptParams:	new(@AptParams)
#		other without:		new(remaining %args)
#
	my $tas = $is_tas?
		($args{AptParams} ?
			${class}->new($tac, @$req) :
			${class}->new(AptTAC => $tac, %args)) :
		($args{AptParams} ?
			${class}->new(@$req) :
			${class}->new(%args));

#
#	a false constructor result is a failure; whatever the constructor
#	left in $@ (or 'Unknown error') is folded into the message
#
	$@ ||= 'Unknown error',
	$@ = "Unable to construct a $class object: $@.",
	return undef
		unless $tas;
#
#	add object to hierarchy map
#	(as object ID 1, with an initial reference count of 1)
#
	$objmap{$tas} = $tac;
	$objidmap[1] = [ $tas, $isa, $methods ];
	$objrefcnts[1] = 1;

	print STDERR "_install: $class installed in thread ", threads->self()->tid(), "\n"
		if $ta_debug;
#
#	set up thread-global proxied closure members;
#	note that closure TAC is TACo
#
#	the closure map and id counter are reset, and a fresh signature is
#	taken from the current time
#
	%closure_map = ();
	$next_closure_id = 1;
	$closure_signature = time();
#	$closure_tac = Thread::Apartment::Container->new(-1,
#		Thread::Apartment::Client->new(
#			'Thread::Apartment::Closure', $tqd, -1, [ 'Thread::Apartment::Closure' ],
#				{ 'ta_invoke_closure' => 0 }, undef, threads->self()->tid()));

#
#	NOTE(review): the method map here names 'ta_invoke_closure' while
#	_ta_install() names 'invoke_closure' - confirm which one the
#	closure dispatch code expects
#
	$closure_tac = Thread::Apartment::Client->new(
			'Thread::Apartment::Closure', $tqd, -1, [ 'Thread::Apartment::Closure' ],
				{ 'ta_invoke_closure' => 0 }, undef, threads->self()->tid());

	return $tac;
}
#
#	install for new()
#
sub _ta_install {
#
#	Installs an object as the root object (object ID 1) of the current
#	thread in response to a queued request.
#
#	$tqd	queue on which the request arrived; the outcome is posted
#			back on it via respond()
#	$req	arrayref holding the marshalled constructor arguments
#	$id		request id used when responding
#
#	On success responds with (1, $tac) and returns the list
#	($tas, $class, $is_tas, $is_evt, $is_mux). On failure responds with
#	(undef, error text) and returns undef.
#
	my ($tqd, $req, $id) = @_;
#
#	must be installing an object; note we shouldn't ever
#	get this if running an installed object
#
	my $tid = threads->self()->tid();
	$tqd->respond($id, undef, 'Apartment thread already occupied.'),
#	print STDERR "Apartment thread $tid already occupied by ", (ref $objidmap[1][0]), ".\n" and
	return undef
		if $objidmap[1];
#
#	for factory purposes, we'll use Common marshal/unmarshal
#	(the unmarshalled result is dereferenced as a flat key/value list)
#
	$req = Thread::Apartment::Common->unmarshal(@$req);
	my %args = @$req;
	my $class = delete $args{AptClass};
	eval "require $class;";
	$tqd->respond($id, undef, "Cannot require $class: $@"),
	return undef
		if $@;
#
#	use local flags to optimize
#	(these three flags are part of the list returned to the caller)
#
	my $is_tas = ${class}->isa('Thread::Apartment::Server');
	my $is_evt = ${class}->isa('Thread::Apartment::EventServer');
	my $is_mux = ${class}->isa('Thread::Apartment::MuxServer');
#
#	check if we're to be re-entrant
#	(also the autoload and closure call behavior settings; all three
#	keys are removed from %args here, unlike in _install())
#
	$is_reentrant = delete $args{AptReentrant};
	$autoload_all = delete $args{AptAutoload};
	$closure_calls = delete $args{AptClosureCalls};
#
#	if we've got any add'l params, unmarshal them
#	($req is reused to hold them and is dereferenced as an array below)
#
	$req = Thread::Apartment::Common->unmarshal($args{AptParams})
		if $args{AptParams};
#
#	in future, check if a TAC pool is requested
#
#	if ($args{AptClientPool}) {
#	}
#
#	return the class's ISA and can() lists,
#	and a TAC
#
#	NOTE(review): the 3rd init_tas() argument is undef here but 1 in
#	_install(); its meaning is defined by init_tas() - confirm there
#
	my $timeout = delete $args{AptTimeout};
	Thread::Apartment::Server::init_tas($tqd, $timeout, undef);

	my ($isa, $methods, $tac) = $is_tas ?
		${class}->introspect(1) :
		Thread::Apartment::Server::introspect($class, 1);
#
#	create an instance:
#		since a TAS may need its TAC to pass to other
#		T::A objects, we need to construct it differently
#		so it generates its own TAC
#		Note that POPOs won't knowingly create T::A objects
#
#	four constructor call shapes:
#		TAS with AptParams:	new($tac, @params)
#		TAS without:		new(AptTAC => $tac, remaining %args)
#		other with AptParams:	new(@params)
#		other without:		new(remaining %args)
#
	my $tas = $is_tas ?
		($args{AptParams} ?
			${class}->new($tac, @$req) :
			${class}->new(AptTAC => $tac, %args)) :
		($args{AptParams} ?
			${class}->new(@$req) :
			${class}->new(%args));

#
#	a false constructor result is reported to the requester together
#	with whatever the constructor left in $@ (or 'Unknown error')
#
	$@ ||= 'Unknown error',
	$tqd->respond($id, undef, "Unable to construct a $class object: $@."),
	return undef
		unless $tas;
#
#	add object to hierarchy map
#	(as object ID 1, with an initial reference count of 1)
#
	$objmap{$tas} = $tac;
	$objidmap[1] = [ $tas, $isa, $methods ];
	$objrefcnts[1] = 1;
#
#	no need to marshal, $tac is a TQQ... (and should be shared...)
#
	print STDERR "run: $class installed in thread $tid\n"
		if $ta_debug;
#
#	set up thread-global proxied closure members
#
#	the closure map and id counter are reset, and a fresh signature is
#	taken from the current time
#
#	NOTE(review): the method map here names 'invoke_closure' while
#	_install() names 'ta_invoke_closure' - confirm which one the
#	closure dispatch code expects
#
	%closure_map = ();
	$next_closure_id = 1;
	$closure_signature = time();
	$closure_tac = Thread::Apartment::Client->new(
		'Thread::Apartment::Closure', $tqd, -1, [ 'Thread::Apartment::Closure' ],
			{ 'invoke_closure' => 0 }, undef, threads->self()->tid());

#
#	report success, handing the TAC back to the requester
#
	$tqd->respond($id, 1, $tac);

	return ($tas, $class, $is_tas, $is_evt, $is_mux);
}

###########################################################
#
#	CLOSURE MGMT METHODS
#
###########################################################
#/**
# Registers a closure with the apartment thread before
# it is passed to another thread as either a parameter, or
# as a return value. Unlike a closure, the returned
# <a href='./Apartment/Closure.html'>Thread::Apartment::Closure</a> <i>aka</i>
# <b>TACl</b> object is suitable for marshalling across threads.
# <p>
# When another thread receives the TACl it will unmarshall it as
# a local closure that invokes a special method on the originating
# thread's TAC, which in turn will cause the originating thread
# to invoke the locally registered closure.
#
# @static
# @param $closure	closure to be registered
# @param $flags		bitmask of flags indicating simplex and/or urgent behavior
#					(see <a href="./Apartment/Common.html#exports">Thread::Apartment::Common</a>
#					for bitmask values)
#
# @return		<a href='./Apartment/Closure.html'>Thread::Apartment::Closure</a> object.
#*/
sub register_closure {
	my ($closure, $flags) = @_;
#
#	take the next closure id and shift it left by 2, which leaves
#	the 2 low-order bits clear; the closure is stored under that value
#
	my $slot = ($next_closure_id++) << 2;
	$closure_map{$slot} = $closure;
#
#	any flags supplied are OR'ed into the id that the returned
#	TACl carries (the map key itself stays flag-free)
#
	my $tagged = $flags ? ($slot | $flags) : $slot;
	return Thread::Apartment::Closure->new($closure_signature, $tagged, $closure_tac);
}

#/**
# Return the closure for a specified closure ID.
#
# @static
#
# @param $sig	closure signature (used to reject stale closures when an apartment thread is recycled)
# @param $id	closure ID
#
# @return		if the signature and ID match, the closure; undef otherwise
#*/
sub get_closure {
	my ($sig, $id) = @_;
#
#	both values must be supplied (and true), and the signature must
#	equal the current closure signature, before the map is consulted
#
	return undef unless $sig && $id;
	return undef unless $sig == $closure_signature;
	return $closure_map{$id};
}
###########################################################
#
#	OBJECT HIERARCHY MGMT METHODS
#	*candidate for a separate class ?*
#
###########################################################
#/**
# Destroy a mapped object. Decrements the object's
# external reference count. If the reference count drops to
# zero, removes the object from the object map.
#
# @static
# @param $objid	object ID
#
# @return		undef once the last reference to the root object (ID 1) has been released; 1 otherwise
#*/
sub destroy_object {
	my $objid = shift;
#
#	drop one external reference; while references remain there is
#	nothing else to do
#
	$objrefcnts[$objid]--;
	return 1
		unless ($objrefcnts[$objid] <= 0);
#
#	last reference released: clamp the count at zero and unmap the
#	object. The slot may already be empty (repeated destroy, or an id
#	that was never mapped), so the object map is only touched when an
#	entry is actually present; this avoids autovivifying the slot and
#	deleting on an undefined key.
#
	$objrefcnts[$objid] = 0;
	my $entry = $objidmap[$objid];
	delete $objmap{$entry->[0]}
		if $entry && defined $entry->[0];
	$objidmap[$objid] = undef;
#
#	undef signals that the root object (ID 1) has been released
#
	return ($objid == 1) ? undef : 1;
}

#/**
# Increment the reference count for an object.
#
# @param $objid	object ID
#
# @return		the object's reference count as it stood before the increment
#*/
sub add_object_reference {
	my ($objid) = @_;
#	post-increment: the value handed back is the count before this call bumped it
	return $objrefcnts[$objid]++;
}

#/**
# Evict the current resident objects from the apartment thread.
#
# @static
#
# @return		undef
#*/
sub evict_objects {
#
#	empty the object-to-TAC map first, then the object ID map,
#	then the reference counts
#
	%objmap = ();
	$#objidmap = -1;
	$#objrefcnts = -1;
	return undef;
}

#/**
# Return the hash map of method names to simplex/urgent flags
# for a specified object ID.
#
# @static
# @param $objid	object ID
#
# @return		hashref mapping method names to behavior flags
#*/
sub get_object_methods {
#
#	fetch the slot first and only dereference it when occupied:
#	indexing straight through an empty slot would autovivify an
#	empty entry there, which get_object_by_id() and
#	alloc_mapped_object() would then treat as a mapped object
#
	my $entry = $objidmap[$_[0]];
	return $entry ? $entry->[2] : undef;
}

#/**
# Return the object for a specified object ID.
#
# @static
# @param $objid	object ID
#
# @return		the object
#*/
sub get_object_by_id {
#	an empty slot means no object is mapped under this id
	my $entry = $objidmap[$_[0]];
	return undef unless $entry;
#	element 0 of a map entry is the object itself
	return $entry->[0];
}

#/**
# Return the TAC for a mapped object
#
# @static
# @param $object	the object (<b>not</b> the object ID!)
#
# @return		<a href='./Apartment/Client.html'>TAC</a> for the object
#*/
sub get_tac_for_object {
	my ($object) = @_;
#	the object map is keyed by the object itself (its string form)
	return $objmap{$object};
}

#/**
# Add an object to the object map
#
# @static
# @param $objid	object ID
# @param $tac		the TAC (or possibly TACo) for the object
# @param $result	the object to map
# @param $isa		the class hierarchy of the object
# @param $methods	the method name map for the object
#
# @return		<a href='./Apartment/Client.html'>TAC</a> for the object
#*/
sub add_mapped_object {
	my ($objid, $tac, $object, $isa, $methods) = @_;
#
#	record the object's TAC, then its map entry
#	(object, class hierarchy, method map), and start
#	its external reference count at 1
#
	$objmap{$object} = $tac;
	my @entry = ($object, $isa, $methods);
	$objidmap[$objid] = \@entry;
	$objrefcnts[$objid] = 1;
	return $tac;
}

#/**
# Allocate a unique object ID. ID's are indexes into the
# object ID map array; a scan of the array is performed
# to locate the first free entry. If no free entries remain,
# the array is extended.
#
# @static
#
# @return		the object ID
#*/
sub alloc_mapped_object {
#
#	ids start at 2; hand back the first empty slot inside the map
#
	my $limit = scalar @objidmap;
	foreach my $candidate (2 .. $limit - 1) {
		return $candidate unless $objidmap[$candidate];
	}
#
#	no free slot: the id is the position just past the end of the
#	map (never less than 2)
#
	return ($limit > 2) ? $limit : 2;
}

#/**
# Get current re-entrancy setting.
#
# @static
#
# @return		re-entrancy flag value
#*/
sub get_reentrancy {
#	hand back the re-entrancy flag exactly as currently set
	my $current = $is_reentrant;
	return $current;
}

#/**
# Set re-entrancy flag.
#
# @static
# @param $reentrancy the boolean value for the flag
#
# @return		previous re-entrancy flag value
#*/
sub set_reentrancy {
	my ($reentrancy) = @_;
#	swap in the new flag, keeping the old one for the caller
	my $previous = $is_reentrant;
	$is_reentrant = $reentrancy;
	return $previous;
}

#/**
# Get current autoload setting.
#
# @static
#
# @return		autoload flag value
#*/
sub get_autoload {
#	hand back the autoload flag exactly as currently set
	my $current = $autoload_all;
	return $current;
}

#/**
# Set autoload flag.
#
# @static
# @param $autoload the boolean value for the flag
#
# @return		previous $autoload_all flag value
#*/
sub set_autoload {
	my ($autoload) = @_;
#	swap in the new flag, keeping the old one for the caller
	my $previous = $autoload_all;
	$autoload_all = $autoload;
	return $previous;
}

#/**
# Get current closure call behaviors
#
# @static
#
# @return		closure call behaviors bitmask
#*/
sub get_closure_behavior {
#	an unset (or otherwise false) bitmask is reported as 0
	return $closure_calls ? $closure_calls : 0;
}

#/**
# Set closure call behaviors
#
# @static
# @param $behavior the bitmask of TA_SIMPLEX and/or TA_URGENT values
#
# @return		previous $closure_calls bitmask value
#*/
sub set_closure_behavior {
	my ($behavior) = @_;
#	the previous bitmask is reported as 0 when it was unset
	my $previous = $closure_calls ? $closure_calls : 0;
	$closure_calls = $behavior;
	return $previous;
}
###########################################################
#
#	ASYNC METHOD CALL METHODS
#
###########################################################
#/**
# Starts an asynchronous method or closure call.
# Sets the <i>thread-local</i> TAC async flag class variable.
# When the next proxied TAC method/closure call
# is invoked within the thread, the TAC will
# <ol>
# <li>add itself and the pending method/closure request identifier,
# to T::A's <i>thread-local</i> async TAC map class variable
# <li>clear the TAC async flag
# <li>return the TAC for the method/closure request.
# </ol>
# <p>
# For async method calls, the parameter is the TAC object; for proxied
# closure calls, the closure is specified. The returned value is the provided
# TAC or closure, in order to support the following syntax<br>
# <pre>
# my $tac = Thread::Apartment::start($tac)->someMethod(@params);
# my $tac = Thread::Apartment::start($closure)->(@params);
# </pre>
#
# @static
# @param $tac_or_closure	TAC or closure to be invoked asynchronously
#
# @return		the input TAC or closure parameter
#*/
sub start {
	my ($tac_or_closure) = @_;
#
#	flag the TAC/closure for an async call, then hand it straight
#	back so the call can be chained onto start()
#
	Thread::Apartment::Client::set_async($tac_or_closure);
	return $tac_or_closure;
}

#/**
# Map a TAC to a async method/closure request ID.
# Called by a TAC when the async request has been initiated.
#
# @static
# @param $key	TAC or closure being mapped
# @param $tac	TAC to map to async request id
# @param $id	request id
#
# @return		the $key parameter (the TAC or closure that was mapped)
#*/
sub map_async_request_id {
#
#	if called as a class or instance method, discard the invocant.
#	A leading argument is kept as the key only when it is a reference
#	that is either a plain CODE ref (a closure) or an object that is
#	not a Thread::Apartment. The CODE test must come first: calling
#	isa() on an unblessed code reference would die.
#
	shift unless
		(ref $_[0] && ((ref $_[0] eq 'CODE') || (! $_[0]->isa('Thread::Apartment'))));
	my $key = shift;
#
#	whatever follows the key (the TAC and the request id) is stored
#	as a fresh array under the key
#
	$tacmap{$key} = [ @_ ];
	return $key;
}

#/**
# Wait for completion of pending method/closure requests
# on <b>all</b> of the specified TACs/closures. If no TACs/closures are
# specified, waits for all TACs/closures currently in the async TAC map.
# Returns when the pending requests are completed, using the
# Thread::Queue::Duplex <code>wait_all()</code> class method.
# <p>
# Note that the application is responsible for calling
# <a href='classdocs/Thread/Apartment/Client.html#get_pending_results'>
# get_pending_results()</a> on the appropriate TACs to get any method/closure
# return values.
# <p>
# For closures, the input parameter is the closure
#
# @static
# @param @tac_list	(optional) list of TACs or closures to wait for;
#					default is all pending TACs
#
# @return		list of TACs that have rendezvoused; note that closures
#				will be replaced by an appropriate TAC
#*/
sub rendezvous {
#
#	if called as a class method, discard the invocant: a leading
#	argument is kept only when it is a CODE ref or an object that
#	is not a Thread::Apartment
#
	my $lead = $_[0];
	my $is_target = ref($lead)
		&& ((ref($lead) eq 'CODE') || !$lead->isa('Thread::Apartment'));
	shift unless $is_target;
#
#	collect the async map entries to wait on: those of the listed
#	TACs/closures that have one, or every entry when nothing is listed
#
	my @waitlist = scalar(@_)
		? (map { $tacmap{$_} } grep { exists $tacmap{$_} } @_)
		: values %tacmap;
#
#	nothing pending means nothing to wait for
#
	return () unless @waitlist;
	return Thread::Queue::Duplex->wait_all(@waitlist);
}

#/**
# Wait for completion of pending method/closure requests
# on <b>any</b> of the specified TACs. If no TACs are
# specified, waits for any TACs currently in the async TAC map.
# Returns when the pending requests are completed, using the
# Thread::Queue::Duplex <code>wait_any()</code> class method.
# <p>
# Note that the application is responsible for calling
# <a href='classdocs/Thread/Apartment/Client.html#get_pending_results'>
# get_pending_results()</a> on the appropriate TACs to get any method/closure
# return values.
# <p>
# For closures, the input TAC parameter is the TAC returned by <A href='#start'>start()</a>.
#
# @static
# @param @tac_list	(optional) list of TACs to wait for;
#					default is all pending TACs
#
# @return		list of TACs that have rendezvoused
#*/
sub rendezvous_any {
#
#	if called as a class method, discard the invocant: a leading
#	argument is kept only when it is a CODE ref or an object that
#	is not a Thread::Apartment
#
	my $lead = $_[0];
	my $is_target = ref($lead)
		&& ((ref($lead) eq 'CODE') || !$lead->isa('Thread::Apartment'));
	shift unless $is_target;
#
#	collect the async map entries to wait on: those of the listed
#	TACs/closures that have one, or every entry when nothing is listed
#
	my @waitlist = scalar(@_)
		? (map { $tacmap{$_} } grep { exists $tacmap{$_} } @_)
		: values %tacmap;
#
#	nothing pending means nothing to wait for
#
	return () unless @waitlist;
	return Thread::Queue::Duplex->wait_any(@waitlist);
}

#/**
# Wait up to $timeout seconds for completion of pending
# method/closure requests on <b>all</b> of the specified TACs.
# If no TACs are specified, waits for all TACs currently in the async TAC map.
# Returns when the pending requests are completed, or the timeout has expired,
# using the Thread::Queue::Duplex <code>wait_all_until()</code> class method.
# <p>
# Note that the application is responsible for calling
# <a href='classdocs/Thread/Apartment/Client.html#get_pending_results'>
# get_pending_results()</a> on the appropriate TACs to get any method/closure
# return values.
# <p>
# For closures, the input TAC parameter is the TAC returned by <A href='#start'>start()</a>.
#
# @static
# @param $timeout	timeout in seconds to wait for completion
# @param @tac_list	(optional) list of TACs to wait for;
#					default is all pending TACs
#
# @return		undef if $timeout expired; otherwise, the list of TACs that have rendezvoused
#*/
sub rendezvous_until {
#
#	if called as a method, discard the invocant: any reference, or
#	the literal class name, in first position is treated as one
#	(the first real argument is the timeout)
#
	my $lead = $_[0];
	shift if (ref($lead) || ($lead eq 'Thread::Apartment'));
	my $timeout = shift;
#
#	collect the async map entries to wait on: those of the listed
#	TACs that have one, or every entry when nothing is listed
#
	my @waitlist = scalar(@_)
		? (map { $tacmap{$_} } grep { exists $tacmap{$_} } @_)
		: values %tacmap;
#
#	nothing pending means nothing to wait for
#
	return () unless @waitlist;
	return Thread::Queue::Duplex->wait_all_until($timeout, @waitlist);
}

#/**
# Wait up to $timeout seconds for completion of pending
# method/closure requests on <b>any</b> of the specified TACs.
# If no TACs are specified, waits for any TACs currently in the async TAC map.
# Returns when the pending requests are completed, or the timeout has expired,
# using the Thread::Queue::Duplex <code>wait_any_until()</code> class method.
# <p>
# Note that the application is responsible for calling
# <a href='classdocs/Thread/Apartment/Client.html#get_pending_results'>
# get_pending_results()</a> on the appropriate TACs to get any method/closure
# return values.
# <p>
# For closures, the input TAC parameter is the TAC returned by <A href='#start'>start()</a>.
#
# @static
# @param $timeout	timeout in seconds to wait for completion
# @param @tac_list	(optional) list of TACs to wait for;
#					default is all pending TACs
#
# @return		undef if $timeout expired; otherwise, the list of TACs that
#				have rendezvoused
#*/
sub rendezvous_any_until {
#
#	if called as a method, discard the invocant: any reference, or
#	the literal class name, in first position is treated as one
#	(the first real argument is the timeout)
#
	my $lead = $_[0];
	shift if (ref($lead) || ($lead eq 'Thread::Apartment'));
	my $timeout = shift;
#
#	collect the async map entries to wait on: those of the listed
#	TACs that have one, or every entry when nothing is listed
#
	my @waitlist = scalar(@_)
		? (map { $tacmap{$_} } grep { exists $tacmap{$_} } @_)
		: values %tacmap;
#
#	nothing pending means nothing to wait for
#
	return () unless @waitlist;
	return Thread::Queue::Duplex->wait_any_until($timeout, @waitlist);
}

#/**
# Return the pending request ID for the input TAC.
# Called from TAC::<a href='./Apartment/Client.html#get_pending_results'>get_pending_results()</a>
# to recover the pending request ID. Causes the TAC and request ID to be removed
# from the TAC map.
#
# @static
# @param $tac	TAC for which the pending request ID is to be returned
#
# @return		undef if the input TAC has no pending requests;
#				otherwise, the request id
#*/
sub get_pending_request {
#
#	if called as a class method, discard the invocant: a leading
#	argument is kept only when it is a CODE ref or an object that
#	is not a Thread::Apartment
#
	shift unless
		(ref $_[0] && ((ref $_[0] eq 'CODE') || (! $_[0]->isa('Thread::Apartment'))));
	my $target = $_[0];
#
#	direct hit: the argument itself is a key of the async map;
#	remove the entry and return its request id (element 1)
#
	if (exists $tacmap{$target}) {
		my $entry = delete $tacmap{$target};
		return $entry->[1];
	}
#
#	otherwise search for the entry whose first element matches the
#	argument. The scan runs over a snapshot of the keys rather than
#	each(): returning from inside an each() loop leaves the hash
#	iterator part-way through, so a later search would resume from
#	there and could miss entries.
#
	foreach my $key (keys %tacmap) {
		my $entry = $tacmap{$key};
		next unless $entry->[0] eq $target;
		delete $tacmap{$key};
		return $entry->[1];
	}
	return undef;
}

sub set_ta_debug {
#	switch the global debug flag on; the flag's new value (1) is returned
	return ($ta_debug = 1);
}

1;

__END__