/usr/local/CPAN/Thread-Apartment/Thread/Apartment/MuxServer.pm


#/**
# Abstract base class for Multiplexing server objects.
# Extends <a href='./Server.html'>Thread::Apartment::Server</a>
# to provide a multiplexing server which explicitly polls its
# TQD for incoming method calls (i.e., inverts the control scheme
# of T::A::Server). (Useful for object which implement their
# own control loop, e.g., Perl/Tk).
# <p>
# Uses <a href='./Server.html'>T::A::Server's</a> constructor.
# <p>
# <a href='../Apartment.html#run'>T::A::run</a> calls the object's run() method,
# which is responsible for testing the TQD at regular intervals
# <p>
# Licensed under the Academic Free License version 2.1, as specified in the
# License.txt file included in this software package, or at
# <a href="http://www.opensource.org/licenses/afl-2.1.php">OpenSource.org</a>.
#
# @author D. Arnold
# @since 2005-12-01
# @self	$self
#*/
package Thread::Apartment::MuxServer;

use Thread::Apartment;
use Thread::Apartment::Server;
use Thread::Apartment::Server qw($tqd $timeout $installed);
use Thread::Apartment::Common qw(:ta_method_flags);

use base qw(Thread::Apartment::Server);

use strict;
use warnings;

our $VERSION = '0.50';

#/**
# Thread governor for MuxServer subclasses.
# Pure virtual function to be implemented by concrete MuxServer
# classes. Interleaves method request handling with its own
# class-specific control loop.
# <p>
# Returns when either the class has determined it is completed,
# or when a STOP command is received.
#
# @return 1 if the object is voluntarily vacating the thread;
#		undef if the thread has been STOP'ed.
#
#*/
sub run {
	my $self = shift;
}

#/**
# Polls the TQD and handles any received method/closure
# requests. Mimics the behavior of Thread::Apartment's
# internal thread governor.
#
# @return		1
#*/
sub handle_method_requests {
	my $self = shift;

	my $tid = threads->self()->tid();
	print STDERR "Mux Polling TQD\n"
		if $self->{_debug};
#
#	copied from T::A::_run, but different enough that we
#	can't refactor to a common method
#
	my ($req, $id, $method, $objid, $wantary, $obj) =
		(undef, undef, undef, undef, undef, undef);
	my ($simplex, $async, $sig, $cid, $closure) = (undef, undef, undef, undef, undef);
	my $methods = undef;
#
#	create closure for responses; we'll switch between this
#	and any async respons closure
#
	my $tqdresp = sub { $tqd->respond(@_); };
	my $respmeth = undef;
	my @resp = ();
	my $resp = undef;

	while ($req = $tqd->dequeue_nb()) {

#print STDERR ref $self, " got request in thread $tid\n";

#		print STDERR "Stopping...\n" and
		return undef
			if ($req->[1] eq 'STOP');
#
#	must be function call
#
		($id, $method) = (shift @$req, shift @$req);

#print STDERR "MuxServer: $method\n";

		($objid, $wantary) = ($method eq 'ta_install') ?
			(undef, undef) : (shift @$req, shift @$req);
#
#	if unknown object id, post error
#
print STDERR "Unknown object for method $method\n"
	unless $objid;
		$obj = Thread::Apartment::get_object_by_id($objid);
		$tqd->respond($id, undef, "Unknown object (ID $objid)."),
		next
			unless $obj;
#
#	check for ref count (this is simplex)
#
		Thread::Apartment::add_object_reference($objid),
		next
			if ($method eq '_add_ref');

		if ($method eq 'DESTROY') {
#
#	only destroy when refcount <= 0
#	also a simplex method
#
			return undef
				unless Thread::Apartment::destroy_object($objid);
			next;
		}
#
#	permits the object to be forced out, wo/ regard to objrefcnts
#
		$self->evict(),
		return Thread::Apartment::evict_objects()
			if ($method eq 'evict');
#
#	verify method
#
		$respmeth = $tqdresp;
		$methods = Thread::Apartment::get_object_methods($objid);
		$tqd->respond($id, undef, "Unknown method $method."),
		next
			unless ($method eq 'ta_async') ||
				($method eq 'ta_invoke_closure') ||
				(exists $methods->{$method}) ||
				(exists $methods->{AUTOLOAD});

		$async = undef;
		@resp = ();
		$async = 1,
		$method = shift @$req
			if ($method eq 'ta_async');
#print STDERR "MuxServer real method is $method\n"
#	if $async;
#
#	unmarshal args
#
		$req = scalar @$req ? $self->unmarshal($req->[0]) : [];

		if ($async) {
#
#	async method call
#
#print STDERR "MuxServer:: handling async\n";

			$respmeth = $req->[0];
			$tqd->respond($id, undef, "No closure provided for async call to $method."),
			next
				unless $respmeth && (ref $respmeth) && (ref $respmeth eq 'CODE');

			$respmeth->($id, undef, "Unknown method $method."),
			next
				unless (exists $methods->{$method}) ||
					(exists $methods->{AUTOLOAD});
		}

		$simplex = (exists $methods->{$method}) ?
			($methods->{$method} & TA_SIMPLEX) : 0;

		if ($method eq 'ta_invoke_closure') {
#
#	closure call
#	verify the signature and the ID
#	then call the closure
#
#print STDERR "MuxServer:: handling closure\n";

			($sig, $cid) = (shift @$req, shift @$req);

			print STDERR "T::A::Mux::handle_method_requests: calling closure $cid\n"
				if $self->{_debug};

			$closure = Thread::Apartment::get_closure($sig, $cid);
			$respmeth->($id, undef, "Stale closure call to thread $tid."),
			next
				unless $closure;

			$simplex = $cid & TA_SIMPLEX;

			if ($wantary) {
				@resp = $closure->(@$req);
			}
			elsif (defined($wantary)) {
				$resp[0] = $closure->(@$req);
			}
			else {
				$closure->(@$req);
				$resp[0] = 1;
			}

			print STDERR "T::A::Mux::handle_method_requests: returned from $method on object $objid\n"
				if $self->{_debug};
		}
		else {
			print STDERR "T::A::Mux::handle_method_requests: calling $method on object $objid\n"
				if $self->{_debug};

			if ($wantary) {
				@resp = $obj->$method(@$req);
			}
			elsif (defined($wantary)) {
				$resp[0] = $obj->$method(@$req);
			}
			else {
				$obj->$method(@$req);
				$resp[0] = 1;
			}

			print STDERR "T::A::Mux::handle_method_requests: returned from $method on object $objid\n"
				if $self->{_debug};
		}
#
#	if simplex, we're done
#
		@resp = (),	# just to GC any results
		next
			if $simplex;
#
#	now return results
#	check for errors
#
		$resp[1] = $@,
		$respmeth->($id, @resp),
		next
			unless defined($resp[0]);
#
#	marshal the results per TAS's methods and return
#	(presumes that the TAC implements a complementary unmarshal)
#	NOTE: TAS marshal scans for new objects, and creates/registers
#	TACs for them
#
		$resp = $self->marshalResults(@resp);

		$async ? $respmeth->(@resp) : $respmeth->($id, $resp);

		print STDERR ($async ?
			"T::A::Mux::handle_method_requests: async response for $method on object $objid\n" :
			"T::A::Mux::handle_method_requests: responding for $method on object $objid\n")
			if $self->{_debug};
	}
	return 1;
}

1;