| POE-Component-IKC documentation | Contained in the POE-Component-IKC distribution. |
POE::Component::IKC::Responder - POE IKC state handler
use POE;
use POE::Component::IKC::Responder;
create_ikc_responder();
...
$kernel->post('IKC', 'post', $to_state, $state);
$ikc->publish('my_name', [qw(state1 state2 state3)]);
This module implements POE IKC state handling. The responder handles posting states to foreign kernels and calling states in the local kernel at the request of foreign kernels.
There are 2 interfaces to the responder. Either by sending states to the 'IKC' session or the object interface. While the latter is faster, the better behaved, because POE is a cooperative system.
spawnPOE::Component::IKC::Responder->spawn();
This function creates the Responder session and object. Normally, POE::Component::IKC::Client or POE::Component::IKC::Server does this for you. But in some applications you want to make sure that the Responder is up and running before then.
postSends an state request to a foreign kernel. Returns logical true if the state was sent and logical false if it was unable to send the request to the foreign kernel. This does not mean that the foreign kernel was able to post the state, however. Parameters are as follows :
foreign_stateSpecifier for the foreign state. See POE::Component::IKC::Specifier.
parametersA reference to anything you want the foreign state to get as ARG0. If you want to specify several parameters, use an array ref and have the foreign state dereference it.
$kernel->post('IKC', 'post',
{kernel=>'Syslog', session=>'logger', state=>'log'},
[$faculty, $priority, $message];
or
$ikc->post('poe://Syslog/logger/log', [$faculty, $priority, $message]);
This logs an state with a hypothetical logger.
See the PROXY SENDER below.
callThis is identical to post, except it has a 3rd parameter that describes
what state should receive the return value from the foreign kernel.
$kernel->post('IKC', 'call',
'poe://Pulse/timeserver/time', '',
'poe:get_time');
or
$ikc->call({kernel=>'Pulse', session=>'timeserver', state=>'time'},
'', 'poe://me/get_time');
This asks the foreign kernel 'Pulse' for the time. 'get_time' state in the current session is posted with whatever the foreign state returned.
You do not have to publish callback messages, because they are temporarily published. How temporary? They can be posted from a remote kernel ONCE only. This, of course, is a problem because someone else could get in a post before the callback. Such is life.
foreign_stateIdentical to the post foreign_state parameter.
parametersIdentical to the post parameters parameter.
rsvpEvent identification for the callback. That is, this state is called with
the return value of the foreign state. Can be a foreign_state specifier
or simply the name of an state in the current session.
$kernel->call('IKC', 'post',
{kernel=>'e-comm', session=>'CC', state=>'check'},
{CC=>$cc, expiry=>$expiry}, folder=>$holder},
'is_valid');
# or
$ikc->call('poe://e-comm/CC/check',
{CC=>$cc, expiry=>$expiry}, folder=>$holder},
'poe://me/is_valid');
This asks the e-comm server to check if a credit card number is "well formed". Yes, this would probably be massive overkill.
The rsvp state does not need to be published. IKC keeps track of the
rsvp state and will allow the foreign kernel to post to it.
See the PROXY SENDER below.
defaultSets the default foreign kernel. You must be connected to the foreign kernel first.
Unique parameter is the name of the foreign kernel kernel.
Returns logical true on success.
registerRegisters foreign kernel names with the responder. This is done during the
negociation phase of IKC and is normaly handled by IKC::Channel. Will
define the default kernel if no previous default kernel exists.
First parameter is either a single kernel name. Second optional parameter is an array ref of kernel aliases to be registered.
unregisterUnregisters one or more foreign kernel names with the responder. This is done when the foreign kernel disconnects by POE::Component::IKC::Channel. If this is the default kernel, there is no more default kernel.
First parameter is either a single kernel name or a kernel alias. Second optional parameter is an array ref of kernel aliases to be unregistered. This second parameter is a tad silly, because if you unregister a remote kernel, it goes without saying that all it's aliases get unregistered also.
register_localRegisters new aliases for local kernel with the responder. This is done internally by POE::Component::IKC::Server and POE::Component::IKC::Client. Will NOT define the default kernel.
First and only parameter is an array ref of kernel aliases to be registered.
publishTell IKC that some states in the current session are available for use by foreign sessions.
sessionA session alias by which the foreign kernels will call it. The alias must already have been registered with the local kernel.
statesArrayref of states that foreign kernels may post.
$kernel->post('IKC', 'publish', 'me', [qw(foo bar baz)]);
# or
$ikc->publish('me', [qw(foo bar baz)]);
retractTell IKC that some states should no longer be available for use by foreign sessions. You do not have to retract all published states.
sessionSame as in publish
statesSame as in publish. If not supplied, *all* published states are
retracted.
$kernel->post('IKC', 'retract', 'me', [qw(foo mibble dot)]);
# or
$ikc->retract('me', [qw(foo)]);
published$list=$kernel->call(IKC=>'published', $session);
Returns a list of all the published states.
$hash=$kernel->call(IKC=>'published');
Returns a hashref, keyed on session IDs. Values are arrayref of states published by that session.
sessionA session alias that you wish the list of states for.
subscribeSubscribe to foreign sessions or states. When you have subscribed to a foreign session, a proxy session is created on the local kernel that will allow you to post to it like any other local session.
specifiersAn arrayref of the session or state specifiers you wish to subscribe to. While the wildcard '*' kernel may be used, only the first kernel that acknowledges the subscription will be proxied.
callbackEither a state (for the state interface) or a coderef (for the object interface) that is posted (or called) when all subscription requests have either been replied to, or have timed out.
When called, it has a single parameter, an arrayref of all the specifiers that IKC was able to subscribe to. It is up to you to see if you have enough of the foreign sessions or states to get the job done, or if you should give up.
While callback isn't required, it makes a lot of sense to use it because
it is only way to find out when the proxy sessions become available.
Example :
$ikc->subscribe([qw(poe://Pulse/timeserver)],
sub { $kernel->post('poe://Pulse/timeserver', 'connect') });
(OK, that's a bad example because we don't check if we actually managed to subscribe or not.)
$kernel->post('IKC', 'subscribe',
[qw(poe://e-comm/CC poe://TouchNet/validation
poe://Cantax/JDE poe://Informatrix/JDE)
],
'poe:subscribed',
);
# and in state 'subscribed'
sub subscribed
{
my($kernel, $specs)=@_[KERNEL, ARG0];
if(@$specs != 4)
{
die "Unable to find all the foreign sessions needed";
}
$kernel->post('poe://Cantax/JDE', 'write', {...somevalues...});
}
This is a bit of a mess. You might want to use the subscribe parameter
to spawn instead.
Subscription receipt timeout is currently set to 120 seconds.
unsubscribeReverse of the subscribe method. However, it is currently not documented well.
pingResponds with 'PONG'. This is auto-published, so it can be called from remote kernels to see if the local kernel is still around. In fact, I don't see any other use for this.
$kernel->post('poe://remote/IKC', 'ping', 'some_state');
$kernel->delay('some_state', 60); # timeout
sub some_state
{
my($pong)=$_[ARG0];
return if $pong; # all is cool
# YOW! Remote kernel timed out. RUN AROUND SCREAMING!
}
shutdownHopefully causes IKC and all peripheral sessions to dissapear in a puff of smoke. At the very least, any sessions left will be either not related to IKC or barely breathing (that is, only have aliases keeping them from GC). This should allow you to sanely shut down your process.
monitorAllows a session to monitor the state of remote kernels. Currently, a
session is informed when a remote kernel is registered, unregistered,
subscribed to or unsubscribed from. One should make sure that the IKC alias
exists before trying to monitor. Do this by calling
POE::Component::IKC::Responder->spawn
or in an on_connect callback.
$kernel->post('IKC', 'monitor', $remote_kernel_id, $states);
$remote_kernel_idName or alias or IKC specifier of the remote kernel you wish to monitor.
You can also specify * to monitor ALL remote kernels. If you do, your
monitor will be called several times for a given kernel. This is because a
kernel has one name and many aliases. For example, a remote kernel will
have a unique ID within the local kernel, a name (passed to or generated by
create_ikc_{kernel,client}) and a globaly unique ID assigned by the remote
kernel via $kernel->ID. This suprises some people, but see the short note
after the explanation of the callback parameters.
Note: An effort has been made to insure that when monitoring *,
register is first called with the remote kernel's unique ID, and
subsequent calls are aliases. This can't be guaranteed at this time,
however.
$statesHashref that specifies what callback states are called when something interesting happens. If $state is empty or undef, the session will no longer monitor the given remote kernel.
The following states can be monitored:
registerCalled when a remote kernel or alias is registered. This is equivalent to when the connection phase is finished.
unregisterCalled when a remote kernel or alias is unregistered. This is equivalent to when the remote kernel disconnects.
subscribeCalled when IKC succeeds in subscribing to a remote session. ARG3 is an IKC::Specifier of what was subscribed to. Use this for posting to the proxy session.
unsubscribeCalled when IKC succeeds in unsubscribing from a remote session.
shutdownYou are informed whenever someone tries to do a sane shutdown of IKC and all peripheral sessions. This will called only once, after somebody posts an IKC/shutdown event.
dataLittle bit of data (can be scalar or reference) that is passed to the callback. This allows you to more magic.
The callback states are called the following parameters :
ARG0Name of the kernel that was passed to poe://*/IKC/monitor
ARG1ID or alias of remote kernel from IKC's point of view.
ARG2A flag. If this is true, then ARG1 is the remote kernel unique ID, if
false, then ARG1 is an alias. This is mostly useful when monitoring *
and is in fact a bit bloatful.
ARG3$state->{data} ie any data you want.
ARG4 ... ARGNCallback-specific parameters. See above.
Most of the time, ARG0 and ARG1 will be the same. Exceptions are if you are
monitoring * or if you supplied a full IKC event specifier to
IKC/monitor rather then just a plain kernel name.
*There are 2 reasons circonstances in which you will be monitoring all remote kernels : names known in advance and names unknown in advance.
If you know kernel names in advance, you might be better off monitoring a
given kernel name. However, you might prefer doing a case-like compare on
ARG1 (with regexes, say). This would be useful for clustering, where
various redundant kernels could follow a naming convention like
[application]-[host], so you could compare ARG1 with /^credit-/ to
find out if you want to set up specific things for that kernel.
Not knowing the name of a kernel in advance, you could be doing some sort of autodiscovery or maybe just monitoring for debuging, logging or book-keeping purposes. You obviously don't want to do autodiscovery for every alias of every kernel, only for the "cannonical name", hence the need for ARG2.
You are more then allowed (in fact, you are encouraged) to use the same callback states when monitoring multiple kernels. In this case, you will find ARG0 useful for telling them apart.
$kernel->post('IKC', 'monitor', '*',
{register=>'remote_register',
unregister=>'remote_unregister',
subscribe=>'remote_subscribe',
unsubscribe=>'remote_unsubscribe',
data=>'magic box'});
Now remote_{register,unregister,subscribe,unsubscribe} is called for any remote kernel.
$kernel->post('IKC', 'monitor', 'Pulse', {register=>'pulse_connected'});
pulse_connected will be called in current session when you succeed in
connecting to a kernel called 'Pulse'.
$kernel->post('IKC', 'monitor', '*');
Session is no longer monitoring all kernels, only 'Pulse'.
$kernel->post('IKC', 'monitor', 'Pulse', {});
Now we aren't even interested in 'Pulse';
create_ikc_responderThis function creates the Responder session and object. However, you don't need to call this directly, because POE::Component::IKC::Client or POE::Component::IKC::Server does this for you.
Deprecated, use spawn.
Event handlers invoked via IKC will have a proxy SENDER session. You may use it to post back to the remote session.
$poe_kernel->post( $_[SENDER], 'response', @args );
Normally this proxy session is available during the invocation of the event handler. You may claim it for longer by setting an external reference:
$heap->{remote} = $_[SENDER]->ID;
$poe_kernel->refcount_increment( $heap->{remote}, 'MINE' );
POE::Component::IKC will detect this and create a new proxy session for future calls. It will then be UP TO YOU to free the session:
$poe_kernel->refcount_decrement( $heap->{remote}, 'MINE' );
Note that you will have to publish any events that will be posted back.
Sending session references and coderefs to a foreign kernel is a bad idea. At some point it would be desirable to recurse through the paramerters and and turn any session references into state specifiers.
The rsvp state in call is a bit problematic. IKC allows it to be posted
to once, but doesn't check to see if the foreign kernel is the right one.
retract does not currently tell foreign kernels that have subscribed to a
session/state about the retraction.
call()ing a state in a proxied foreign session doesn't work, for obvious
reasons.
Philip Gwyn, <perl-ikc at pied.nu>
Copyright 1999-2009 by Philip Gwyn. All rights reserved.
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
| POE-Component-IKC documentation | Contained in the POE-Component-IKC distribution. |
package POE::Component::IKC::Responder; ############################################################ # $Id: Responder.pm 495 2009-05-08 19:46:42Z fil $ # Based on tests/refserver.perl # Contributed by Artur Bergman <artur@vogon-solutions.com> # Revised for 0.06 by Rocco Caputo <troc@netrus.net> # Turned into a module by Philp Gwyn <fil@pied.nu> # # Copyright 1999-2009 Philip Gwyn. All rights reserved. # This program is free software; you can redistribute it and/or modify # it under the same terms as Perl itself. # # Contributed portions of IKC may be copyright by their respective # contributors. use strict; use vars qw($VERSION @ISA @EXPORT @EXPORT_OK $ikc); use Carp; use Data::Dumper; use POE qw(Session); use POE::Component::IKC::Specifier; use Scalar::Util qw(reftype); require Exporter; @ISA = qw(Exporter); @EXPORT = qw(create_ikc_responder $ikc); $VERSION = '0.2200'; sub DEBUG { 0 } ############################################################################## #---------------------------------------------------- # This is just a convenient way to create only one responder. sub create_ikc_responder { __PACKAGE__->spawn(); } sub spawn { my($package)=@_; return 1 if $ikc; POE::Session->create( package_states => [ $package, [qw( _start _stop _child request post call raw_message post2 remote_error register unregister register_local register_channel default publish retract subscribe unsubscribe published monitor inform_monitors shutdown do_you_have ping sig_INT )] ]); return 1; } #---------------------------------------------------- # Accept POE's standard _start message, and start the responder. sub _start { my($kernel, $heap, $session) = @_[KERNEL, HEAP, SESSION]; DEBUG and warn "$$: Responder started.\n"; $kernel->alias_set('IKC'); # allow it to be called by name # $kernel->signal(INT=>'sig_INT'); # sig_INT is empty, so don't bother $ikc=POE::Component::IKC::Responder::Object->new($kernel, $session); $heap->{self}=$ikc; } sub _stop { DEBUG and warn "$$: $_[HEAP] responder _stop\n"; # use YAML qw(Dump); # use Data::Denter; # warn Denter $poe_kernel; } sub _child { my( $heap, $reason, $session, $ret ) = @_[ HEAP, ARG0, ARG1, ARG2 ]; DEBUG and warn "$$: $_[HEAP] responder _child $reason, $session\n"; } #---------------------------------------------------- # Shutdown everything IKC related that we know about sub shutdown { my($kernel, $heap)=@_[KERNEL, HEAP]; $heap->{self}->shutdown($kernel); } #---------------------------------------------------- # Foreign kernel called something here sub request { my($kernel, $heap, $request) = @_[KERNEL, HEAP, ARG0]; $heap->{self}->request($request); } #---------------------------------------------------- # Register foreign kernels so that we can send states to them sub register { my($heap, $channel, $rid, $aliases) = @_[HEAP, SENDER, ARG0, ARG1]; $heap->{self}->register($channel, $rid, $aliases); } #---------------------------------------------------- # Register new aliases for local kernel sub register_local { my($heap, $aliases) = @_[HEAP, ARG0]; $heap->{self}->register_local($aliases); } #---------------------------------------------------- # Unregister foreign kernels when this disconnect (say) sub unregister { my($kernel, $heap, $channel, $rid, $aliases) = @_[KERNEL, HEAP, SENDER, ARG0, ARG1]; $heap->{self}->unregister($channel, $rid, $aliases); } #---------------------------------------------------- # Set a default foreign channel to send messages to sub default { my($heap, $name) = @_[HEAP, ARG0]; $heap->{self}->default($name); } #---------------------------------------------------- # Register a channel. So we can tell it to shutdown before it finishes # negociating sub register_channel { my($heap, $channel, $rid, $aliases) = @_[HEAP, SENDER, ARG0, ARG1]; $heap->{self}->register_channel($channel); } ############################################################################## ## This state allows sessions to monitor a remote kernel #---------------------------------------------------- # Watch any activity regarding a foreign kernel sub monitor { my($heap, $name, $states, $sender) = @_[HEAP, ARG0, ARG1, SENDER]; $heap->{self}->monitor($sender, $name, $states); return } ############################################################################## ## These are the 4 states that interact with the foreign kernel #---------------------------------------------------- # Send a request to the foreign kernel sub post { my($heap, $to, $params, $sender) = @_[HEAP, ARG0, ARG1, SENDER]; $heap->{self}->post($to, $params, $sender); } #---------------------------------------------------- # Send a request to the foreign kernel sub post2 { my($heap, $to, $sender, $params) = @_[HEAP, ARG0, ARG1, ARG2]; # use Data::Dumper; # warn "post2 params=", Dumper $params; $heap->{self}->post($to, $params, $sender); } #---------------------------------------------------- # Send a request to the foreign kernel and ask it to provide # the state's return value back sub call { my($kernel, $heap, $sender, $to, $params, $rsvp) = @_[KERNEL, HEAP, SENDER, ARG0, ARG1, ARG2]; $heap->{self}->call($to, $params, $rsvp, $sender); return; } #---------------------------------------------------- # Send a raw message over. use at your own risk :) # This is useful for sending errors to remote ClientLite sub raw_message { my($heap, $msg, $sender) = @_[HEAP, ARG0, ARG1]; $heap->{self}->send_msg($msg, $sender); } #---------------------------------------------------- # Remote kernel had an error sub remote_error { my($heap, $msg) = @_[HEAP, ARG0]; warn "$$: Remote error: $msg\n"; } ############################################################################## # publish/retract/subscribe mechanism of setting up foreign sessions #---------------------------------------------------- sub publish { my($kernel, $heap, $sender, $session, $states)= @_[KERNEL, HEAP, SENDER, ARG0, ARG1]; $session||=$sender; $heap->{self}->publish($session, $states); } #---------------------------------------------------- sub published { my($kernel, $heap, $which)=@_[KERNEL, HEAP, ARG0]; $heap->{self}->published($which); } #---------------------------------------------------- sub retract { my($heap, $sender, $session, $states)= @_[HEAP, SENDER, ARG0, ARG1]; $session||=$sender; $heap->{self}->retract($session, $states); } #---------------------------------------------------- sub subscribe { my($kernel, $heap, $sender, $sessions, $callback)= @_[KERNEL, HEAP, SENDER, ARG0, ARG1]; $sessions=[$sessions] unless ref $sessions; return unless @$sessions; if($callback and 'CODE' ne ref $callback) { $sender = $sender->ID if ref $sender; my $state=$callback; $callback=sub { DEBUG and warn "Subscription callback to '$state'\n"; $kernel->post($sender, $state, @_); }; } $heap->{self}->subscribe($sessions, $callback, $sender->ID); } # Called by a foreign IKC session # We respond with the session, or with "NOT $specifier"; sub do_you_have { my($kernel, $heap, $param)=@_[KERNEL, HEAP, ARG0]; my $ses=specifier_parse($param->[0]); die "Bad state $param->[0]\n" unless $ses; my $self=$heap->{self}; DEBUG and warn "Wants to subscribe to ", specifier_name($ses), "\n"; if(exists $self->{'local'}{$ses->{session}} and (not $ses->{state} or exists $self->{'local'}{$ses->{session}}{$ses->{state}} )) { $ses->{kernel}||=$kernel->ID; # make sure we uniquely identify DEBUG and warn "Allowed (we are $ses->{kernel})\n"; return [$ses, $kernel->ID]; # this session } else { DEBUG and warn specifier_name($ses), " is not published in this kernel\n"; return "NOT ".specifier_name($ses); } } #---------------------------------------------------- sub unsubscribe { my($kernel, $heap, $sessions)=@_[KERNEL, HEAP, ARG0]; $heap->{self}->unsubscribe($sessions); } #---------------------------------------------------- sub ping { "PONG"; } #---------------------------------------------------- # User wants to kill process / kernel sub sig_INT { my ($heap, $kernel)=@_[HEAP, KERNEL]; DEBUG && warn "$$: Responder::sig_INT\n"; $kernel->sig_handled(); return; } #---------------------------------------------------- # User wants to kill process / kernel sub inform_monitors { my ($heap)=$_[HEAP]; $heap->{self}->inform_monitors(@_[ARG0..$#_]); } ############################################################################## ############################################################################## # Here is the object interface package POE::Component::IKC::Responder::Object; use strict; use Carp; use POE::Component::IKC::Specifier; use POE::Component::IKC::Proxy; use POE::Component::IKC::LocalKernel; use POE qw(Session); use Data::Dumper; sub DEBUG { 0 } sub DEBUG2 { 0 } sub DEBUGM { DEBUG or 0 } sub new { my($package, $kernel, $session)=@_; my $self=bless { 'local'=>{IKC=>{remote_error=>1, # these states are auto-published do_you_have=>1, ping=>1, }, }, remote=>{}, rsvp=>{}, kernel=>{}, channel=>{}, channel_startup=>{}, default=>{}, monitors=>{}, poe_kernel=>$kernel, # myself=>$session->ID, }, $package; } #---------------------------------------------------- # shutdown sub shutdown { my($self, $kernel)=@_; DEBUG and warn "$$: Some one wants us to go away... off we go\n"; # kill our alias $kernel->alias_remove('IKC'); # tell every channel to shutdown while(my($rid, $c)=each %{$self->{channel}}) { DEBUG and warn "$$: Posting shutdown to $rid (id=$c)\n"; $kernel->post($c, 'shutdown'); } $self->{channel} = {}; # even the channels that haven't negociated yet foreach my $c ( keys %{ $self->{channel_startup} } ) { DEBUG and warn "$$: Posting shutdown to channel (id=$c)\n"; $kernel->post( $c, 'shutdown' ); } $self->{channel_startup} = {}; # tell monitors to shutdown $self->inform_monitors('*', 'shutdown'); # kill pending subscription states foreach my $uevent (keys %{$self->{pending_subscription}}) { $self->_remove_state($uevent); } # use YAML qw(Dump); # warn Dump $kernel; } #---------------------------------------------------- # Foreign kernel called something here sub request { my($self, $request)=@_;; my($kernel)=@{$self}{qw(poe_kernel)}; DEBUG2 and warn "IKC request=", Dumper $request; # We ignore the kernel for now, but we should really use it to decide # weither we should run the request or not my $to=specifier_parse($request->{event}); eval { die "$request->{event} isn't a valid specifier" unless $to; my $args=$request->{params}; ### allow proxied states to have multiple ARGs if($to->{state} eq 'IKC:proxy') { $to->{state}=$args->[0]; $args=$args->[1]; DEBUG and warn "IKC proxied request for ", specifier_name($to), "\n"; } else { DEBUG and warn "IKC request for ", specifier_name($to), "\n"; $args=[$args]; } # this is where we'd catch a disconnect message # 2001/07 : eh? # find out if the state we want to get at has been published if(exists $self->{rsvp}{$to->{session}} and exists $self->{rsvp}{$to->{session}}{$to->{state}} and $self->{rsvp}{$to->{session}}{$to->{state}} ) { $self->{rsvp}{$to->{session}}{$to->{state}}--; DEBUG and warn "Allow $to->{session}/$to->{state} is now $self->{rsvp}{$to->{session}}{$to->{state}}\n"; } elsif(not exists $self->{'local'}{$to->{session}}) { my $p=$self->published; die "Session '$to->{session}' is not available for remote kernels:", join "\n", '', map({ " $_=>[" . join(', ', @{$p->{$_}}) . "]"} keys %$p), ''; } elsif(not exists $self->{'local'}{$to->{session}}{$to->{state}}) { die "Session '$to->{session}' has not published state '", $to->{state}, "'\n"; } # maybe caller specified #arg? This got into $msg->{rsvp}, which # went to the remote side, then came back here as $to if(exists $to->{args}) { push @$args, $to->{args}; # it goes on the end } my $session=$kernel->alias_resolve($to->{session}); die "Unknown session '$to->{session}'\n" unless $session; # warn "No FROM" unless $request->{from}; _thunked_post($request->{rsvp}, ["$session", $to->{state}, @$args], $request->{from}, $request->{wantarray}); }; # Error handling consists of posting a "remote_error" state to # the foreign kernel. # $request->{errors_to} is set by the local IKC::Channel if($@) { chomp($@); my $err=$@.' ['.specifier_name($to).']'; $err.=' sent by ['.specifier_name($request->{from}).']' if $request->{from}; warn "$err\n"; DEBUG && warn "$$: Error in request: $err\n"; unless($request->{is_error}) # don't send an error message back { # if this was an error itself $self->send_msg({ event=>$request->{errors_to}, params=>$err, is_error=>1, }); } else { warn $$, Dumper $request; } } } #---------------------------------------------------- # Register foreign kernels so that we can send states to them sub register { my($self, $channel, $rid, $aliases)=@_; $aliases=[$aliases] if not ref $aliases; my($kernel)=@{$self}{qw(poe_kernel)}; $channel = $channel->ID; delete $self->{channel_startup}{ $channel }; if($self->{channel}{$rid}) { warn "$$: Remote kernel '$rid' already exists\n"; return; } else { DEBUG and warn "$$: Registered remote kernel '$rid' (id=$channel)\n"; $self->{channel}{$rid}=$channel; $self->{remote}{$rid}=[]; # list of proxy sessions $self->{alias}{$rid}=$aliases; $self->{default}||=$rid; } foreach my $name (@$aliases) { unless(defined $name) { warn "$$: attempt to register undefined remote kernel alias\n"; next; } if($self->{kernel}{$name}) { DEBUG and warn "$$: Remote alias '$name' already exists\n"; next; } DEBUG and warn "$$: Registered alias '$name'\n"; $self->{kernel}{$name}=$rid; # find real remote ID $self->{remote}{$name}||=[]; # list of proxy sessions } $self->inform_monitors($rid, 'register'); return 1; } #---------------------------------------------------- # Register a new alias for the local kernel sub register_local { my($self, $aliases)=@_; $aliases=[$aliases] if not ref $aliases; my($kernel)=@{$self}{qw(poe_kernel)}; my $rid=$kernel->ID; DEBUG and warn "$$: Registering local kernel '$rid'\n"; $self->{local_channel}||=POE::Component::IKC::LocalKernel->spawn->ID; my $channel=$self->{local_channel}; $self->{channel}{$rid}||=$channel; $self->{remote}{$rid}||=[]; # list of proxy sessions $self->{alias}{$rid}||=[]; # use Data::Dumper; # die Dumper $aliases; foreach my $name (@$aliases) { unless(defined $name) { DEBUG and warn "$$: attempt to register undefined local kernel alias\n"; next; } if($self->{kernel}{$name}) { DEBUG and warn "$$: Local kernel alias '$name' already exists\n"; next; } DEBUG and warn "$$: Registered local alias '$name'\n"; $self->{kernel}{$name}=$rid; # find real remote ID $self->{remote}{$name}||=[]; # list of proxy sessions push @{$self->{alias}{$rid}}, $name; } return 1; } #---------------------------------------------------- # Register a starting channel sub register_channel { my( $self, $channel ) = @_; $channel = $channel->ID; DEBUG and warn "$$: Registered channel (id=$channel)\n"; $self->{channel_startup}{ $channel } = 1; return; } #---------------------------------------------------- sub default { my($self, $name) = @_; if(exists $self->{kernel}{$name}) { $self->{default}=$self->{kernel}{$name}; } elsif(exists $self->{channel}{$name}) { $self->{default}=$name; } else { carp "We do not know the kernel $name.\n"; return; } DEBUG and warn "Default kernel is on channel $name.\n"; } #---------------------------------------------------- # Unregister foreign kernels when they disconnect (say) sub unregister { my($self, $channel, $rid, $aliases)=@_; my($kernel)=@{$self}{qw(poe_kernel)}; return unless $rid; $channel = $channel->ID; unless($aliases) { unless($self->{channel}{$rid}) { # unregister one alias only $aliases=[$rid]; undef $rid; } } elsif(not ref $aliases) { $aliases=[$aliases]; } my @todo; if($rid) { if($self->{channel}{$rid}) { # this is in fact the real name DEBUG and warn "Unregistered kernel '$rid'.\n"; $self->inform_monitors($rid, 'unregister'); $self->{'default'}='' if $self->{'default'} eq $rid; $kernel->post($self->{channel}{$rid}, 'close'); delete $self->{channel}{$rid}; # delete $self->{monitors}{$rid}; $aliases||=delete $self->{alias}{$rid}; push @todo, $rid; } else { warn "$rid isn't a channel???\n"; } } foreach my $name (@$aliases) { next unless defined $name; # delete $self->{monitors}{$name}; if($self->{kernel}{$name}) { DEBUG and warn "Unregistered kernel alias '$name'.\n"; delete $self->{kernel}{$name}; $self->{'default'}='' if $self->{'default'} eq $name; push @todo, $name; } else { DEBUG and warn "Already done: $name\n"; next; } } # tell the proxies they are no longer needed foreach my $name (@todo) { if($name) { foreach my $alias (@{$self->{remote}{$name}}) { $self->{poe_kernel}->post($alias, '_delete'); } delete $self->{remote}{$name}; } } return 1; } #---------------------------------------------------- # Internal function that does all the work of preparing a request to be sent sub send_msg { my($self, $msg, $sender)=@_; my($kernel)=@{$self}{qw(poe_kernel)}; my $e=$msg->{rsvp} ? 'call' : 'post'; my $to=specifier_parse($msg->{event}); unless($to) { die "Bad state ", Dumper $msg; } unless($to) { warn "Bad or missing 'event' parameter '$msg->{event}' to IKC/$e\n"; return; } unless($to->{session}) { warn "Need a session name for IKC/$e\n", Dumper $to; return; } unless($to->{state}) { warn "Need a state name for IKC/$e\n", Dumper $to; return; } my $name=$to->{kernel}||$self->{'default'}; unless($name) { warn "Unable to decide which kernel to send state '$to->{state}' to."; return; } DEBUG and warn "send_msg poe://IKC/$e to '", specifier_name($to), "'\n"; # This way the thunk session will proxy a request back to us if($sender and not $msg->{from} and # <sungo> Leolo: question. doesnt .13 require bleadPOE? # <Leolo> sungo : i can put the offending code into a conditional # if you want $self->{poe_kernel}->can('alias_list')) { my $sid=$sender; $sid = $sender->ID if ref $sender; foreach my $a ($self->{poe_kernel}->alias_list($sender)) { $sid .= " ($a)"; if($self->{'local'}{$a}) { # SENDER published something $msg->{from}={ kernel=>$self->{poe_kernel}->ID, session=>$a, state=>'IKC:proxy', }; last; } } DEBUG2 and do { unless($msg->{from}) { warn "Session $sid didn't publish anything SENDER isn't set";#, Denter $self->{'local'}, $sender; } else { warn "Session $sid will be thunked"; } } } # This is where we should recurse $msg->{params} to turn anything # extravagant like a subref, $poe_kernel, session etc into a call back to # us. # $msg->{params}=$self->marshall($msg->{params}); # Get a list of channels to send the message to my @channels=$self->channel_list( $name ); unless(@channels) { warn "$$: MSG TO ", Dumper $to; warn (($name eq '*') ? "$$: Not connected to any foreign kernels.\n" : "$$: Unknown kernel '$name'.\n"); warn "$$: Known kernels: ". $self->channel_names; return 0; } # now send the message over the wire # hmmm.... i wonder if this could be stream-lined into a direct call my $count=0; my $rsvp; $rsvp=$msg->{rsvp} if exists $msg->{rsvp}; foreach my $channel (@channels) { # We need to be able to access this state w/out forcing folks # to use publish if($rsvp) { DEBUG and warn "Allow $rsvp->{session}/$rsvp->{state} once\n"; $self->{rsvp}{$rsvp->{session}}{$rsvp->{state}}++; } DEBUG2 and warn "Sending to '$channel'..."; if($kernel->call($channel, 'send', $msg)) { $count++; DEBUG2 and warn " done.\n"; } else { DEBUG2 and warn " failed.\n"; $self->{rsvp}{$rsvp->{session}}{$rsvp->{state}}-- if $rsvp; } } DEBUG2 and warn specifier_name($to), " sent to $count kernel(s).\n"; DEBUG and do {warn "$$: send_msg failed!\n" unless $count}; return $count; } #---------------------------------------------------- sub _true_type { my($self, $data, $can)=@_; my $r=ref $data; return unless $r; return reftype( $data ); } #---------------------------------------------------- sub marshall { my($self, $data)=@_; my $r=$self->_true_type($data, 'ikc_marshall'); return $data unless $r; if($r eq 'HASH') { foreach my $q (values %$data) { $data->{$q}=$self->marshall($data->{$q}) if ref $data->{$q}; } } elsif($r eq 'ARRAY') { foreach my $q (@$data) { $q=$self->marshall($q) if ref $q; } } elsif($r eq 'SCALAR') { $$data=$self->marshall($$data) if ref $$data; } elsif($r eq 'CODE') { my $q=Devel::Peek::CvGV($data); if($q=~/__ANON__$/) { warn "Can't marshall anonymous code ref $q\n"; return; } return "-IKC-CODEREF-$q"; } else { warn "Marshalling $r wouldn't be meaningful\n"; return; } return $data; } #---------------------------------------------------- sub demarshall { my($self, $data)=@_; my $r=$self->_true_type($data, 'ikc_demarshall'); unless($r) { if($r=~/^-IKC-CODEREF-(.+)-(\*[:\w]+)$/) { my $func=$2; my $rk=$1; die "need to call $func in $rk"; $data=sub {$poe_kernel->post(IKC=>'post', "poe://$rk/IKC/coderef"=>$func)}; } return $data; } if($r eq 'HASH') { foreach my $q (values %$data) { $data->{$q}=$self->demarshall($data->{$q}) if ref $data->{$q}; } } elsif($r eq 'ARRAY') { foreach my $q (@$data) { $q=$self->demarshall($q) if ref $q; } } elsif($r eq 'SCALAR') { $$data=$self->demarshall($$data) if ref $$data; } return $data; } #---------------------------------------------------- ## Turn a kernel name or alias into a list of possible channels sub channel_list { my($self, $name)=@_; if($name eq '*') { # all kernels return values %{$self->{channel}}; } if(exists $self->{kernel}{$name}) { # kernel alias my $t=$self->{kernel}{$name}; unless(exists $self->{channel}{$t}) { die "What happened to channel $t!"; } return ($self->{channel}{$t}) } if(exists $self->{channel}{$name}) { # kernel ID return ($self->{channel}{$name}) } return (); } #---------------------------------------------------- ## Get a list of all the channel names (for debugging) sub channel_names { my($self, $name) = @_; if( $name and $name ne '*' ) { return "$name (".join(', ', grep { $self->{kernel}{$_} eq $name } keys %{ $self->{kernel} } ) .")"; } my @ret; foreach $name ( keys %{ $self->{channel} } ) { push @ret, $self->channel_names( $name ); } return @ret if wantarray; return join ', ', @ret; } #---------------------------------------------------- # Send a request to the foreign kernel sub post { my($self, $to, $params, $sender) = @_; $to="poe://$to" unless ref $to or $to=~/^poe:/; # use Data::Dumper; # warn "params=", Dumper $params; $self->send_msg({params=>$params, 'event'=>$to}, $sender); } #---------------------------------------------------- # Send a request to the foreign kernel and ask it to provide # the state's return value back sub call { my($self, $to, $params, $rsvp, $sender)=@_; $to="poe://$to" if $to and not ref $to and $to!~/^poe:/; $rsvp="poe://$rsvp" if $rsvp and not ref $rsvp and $rsvp!~/^poe:/; unless($rsvp) { warn "$$: Missing 'rsvp' parameter in poe:IKC/call\n"; return; } my $t=specifier_parse($rsvp); unless($t) { warn "$$: Bad 'rsvp' parameter '$rsvp' in poe:IKC/call\n"; return; } $rsvp=$t; unless($rsvp->{state}) { DEBUG and warn Dumper $rsvp; warn "$$: rsvp state not set in poe:IKC/call\n"; return; } # Question : should $rsvp->{session} be forced to be the sender? # or will we allow people to point callbacks to other poe:kernel/sessions $rsvp->{session}||=$sender->ID if ref $sender; # maybe a session ID? if(not $rsvp->{session}) # no session alias { die "IKC call requires session IDs, please patch your version of POE\n"; } DEBUG2 and warn "RSVP is ", specifier_name($rsvp), "\n"; # use Data::Dumper; # warn "params=", Dumper $params; $self->send_msg({params=>$params, 'event'=>$to, rsvp=>$rsvp }, $sender ); } ############################################################################## # publish/retract/subscribe mechanism of setting up foreign sessions sub _aliases { my($kernel, $session)=@_; return $session unless ref $session; # make sure it's an object if($kernel->can('alias_list')) { # post-0.15 we register as all aliases for session my @a=$kernel->alias_list($session->ID); return @a if @a; } # pre-0.15 means that we register as session ID... which is less # then useful return $session->ID; } #---------------------------------------------------- sub publish { my($self, $session, $states)=@_; unless($session) { carp "You must specify the session that publishes these states"; return 0; } my @aliases =_aliases($self->{poe_kernel}, $session); foreach my $alias (@aliases) { $self->{'local'}{$alias}||={}; my $p=$self->{'local'}{$alias}; die "\$states isn't an array ref" unless ref($states) eq 'ARRAY'; foreach my $q (@$states) { DEBUG and print STDERR "Published poe:$alias/$q\n"; $p->{$q}=1; } } return 1; } #---------------------------------------------------- sub published { my($self, $session)=@_; if($session) { my $sid=$session; if(not ref $session) { $sid||=$self->{poe_kernel}->ID_lookup($session); } return [keys %{$self->{'local'}{$sid}}]; } my %ret; foreach my $sid (keys %{$self->{'local'}}) { $ret{$sid}=[keys %{$self->{'local'}{$sid}}]; } return \%ret; } #---------------------------------------------------- sub retract { my($self, $session, $states)=@_; unless($session) { warn "You must specify the session that publishes these states"; return 0; } my @aliases=_aliases($self->{poe_kernel}, $session); foreach my $alias (@aliases) { unless($self->{'local'}{$alias}) { warn "Session '$session' ($alias) didn't publish anything, can't retract"; return 0; } if($states) { my $p=$self->{'local'}{$alias}; foreach my $q (@$states) { delete $p->{$q}; } delete $self->{'local'}{$alias} unless keys %$p; } else { delete $self->{'local'}{$alias}; } } return 1; } #---------------------------------------------------- # Subscribing is in two phases # 1- we call a IKC/do_you_have to the foreign kernels # 2- the foreign responds with the session-specifier (if it has published it) # # We create a unique state for the callback for each subscription request # from the user session. It keeps count of how many subscription receipts # it receives and when they are all subscribed, it localy posts the callback # event. # # If more then one kernel sends a subscription receipt, first one is used. sub subscribe { my($self, $sessions, $callback, $s_id)=@_; my($kernel)=@{$self}{qw(poe_kernel)}; $s_id||=join '-', caller; my($ses, $s, $fiddle); # unique identifier for this request $callback||=''; my $unique="IKC:receipt $s_id $callback"; my $id=$kernel->ID; my $count; foreach my $spec (@$sessions) { $ses=specifier_parse($spec); # Session specifier # Create the subscription receipt state $kernel->state($unique.$spec, sub { _subscribe_receipt($self, $unique, $spec, $_[ARG0]) }); $kernel->delay($unique.$spec, 60); # timeout $self->{pending_subscription}{$unique.$spec}=1; if($ses->{kernel}) { $count=$self->send_msg( {event=>{kernel=>$ses->{kernel}, session=>'IKC', state=>'do_you_have' }, params=>[$ses, $id], from=>{kernel=>$id, session=>'IKC'}, rsvp=>{kernel=>$id, session=>'IKC', state=>$unique.$spec}, }, ); # TODO What if this post failed? Session that posted this would # surely want to know } else { # Bleh. User shouldn't be that dumb die "You can't subscribe to a session within the current kernel."; } if($callback) # We need to keep some information around { # for when the subscription receipt comes in $self->{subscription_callback}{$unique}||= { callback=>$callback, sessions=>{}, yes=>[], count=>0, states=>{}, }; $fiddle=$self->{subscription_callback}{$unique}; $fiddle->{states}{$unique.$spec}=$count; $fiddle->{count}+=($count||0); $fiddle->{sessions}->{$spec}=1; if(not $count) { $fiddle->{count}++; $kernel->yield($unique.$spec); } else { DEBUG and warn "Sent $count subscription requests for [$spec]\n"; } } } return 1; } #---------------------------------------------------- # Subscription receipt # All foreign kernel's that have published the desired session # will send back a receipt. # Others will send a "NOT". # This will cause problems when the Proxy session creates an alias :( # # Callback is called we are "done". But what is "done"? When at least # one remote kernel has allowed us to subscribe to each session we are # waiting for. However, at some point we should give up. # # Scenarios : # one foreign kernel says 'yes', one 'no'. # - 'yes' creates a proxy # - 'no' decrements wait count # ... callback is called with session specifier # 2 foreign kernels says 'yes' # - first 'yes' creates a proxy # - 2nd 'yes' should also create a proxy! alias conflict (for now) # ... callback is called with session specifier # one foreign kernel says 'no', and after, another says no # - first 'no' decrements wait count # - second 'no' decrements wait count # ... Subscription failed! callback is called with specifier empty # no answers ever came... # - we wait forever :( sub _subscribe_receipt { my($self, $unique, $spec, $resp)=@_; my $accepted=1; my($ses, $rid)=@$resp if $resp and ref $resp and @$resp; my $del; if(not $ses or not ref $ses) { # REFUSED warn "$$: Refused to subscribe to $spec"; warn "$$: $resp" if $resp; $accepted=0; $del=$unique.$spec; } else { # accepted $ses=specifier_parse($ses); die "Bad state" unless $ses; my($kernel)=@{$self}{qw(poe_kernel)}; DEBUG and warn "Create proxy for ", specifier_name($ses), "\n"; my $proxy=POE::Component::IKC::Proxy->spawn( $ses->{kernel}, $ses->{session}, sub { $kernel->post(IKC=>'inform_monitors', $rid, 'subscribe', $ses)}, # 2002/04 monitor_stop is called in _stop, but we can't # can't post() from _stop, so we call() ourself sub { $kernel->call(IKC=>'inform_monitors', $rid, 'unsubscribe', $ses)}, ); push @{$self->{remote}{$ses->{kernel}}}, $proxy; } # cleanup the subscription request if(exists $self->{subscription_callback}{$unique}) { DEBUG and warn "Subscription [$unique] callback... "; my $fiddle=$self->{subscription_callback}{$unique}; if($fiddle->{sessions}->{$spec} and $accepted) { delete $fiddle->{sessions}->{$spec}; push @{$fiddle->{yes}}, $spec; } $fiddle->{count}-- if $fiddle->{count}; if(0==$fiddle->{count}) { DEBUG and warn "yes."; delete $self->{subscription_callback}{$unique}; # use Data::Denter; # warn "Fiddle =", Denter $fiddle; $fiddle->{callback}->($fiddle->{yes}); } else { DEBUG and warn "no, $fiddle->{count} left."; } $fiddle->{states}{$unique.$spec}--; if($fiddle->{states}{$unique.$spec}<=0) { # this state is no longer needed $del=$unique.$spec; } } else { # this state is no longer needed $del=$unique.$spec; } $self->_remove_state($del) if $del; } # clean-up sub _remove_state { my($self, $del)=@_; return unless $self->{pending_subscription}{$del}; my $kernel=$self->{poe_kernel}; $kernel->delay($del); $kernel->state($del); delete $self->{states}{$del}; delete $self->{pending_subscription}{$del}; } #---------------------------------------------------- sub unsubscribe { my($self, $sessions)=@_; $sessions=[$sessions] unless ref $sessions; return unless @$sessions; foreach my $ses (@$sessions) { $self->{poe_kernel}->post($ses, '_shutdown'); } } #---------------------------------------------------- sub ping { "PONG"; } #------------------------------------------------------------------ sub monitor { my($self, $sender, $name, $states)=@_; # <Leolo_1> dngor : also, if i keep a ref to $_[SENDER], does this mess # up stuff? # <dngxor> yeah, it will mess stuff up. take its ID instead; you can # post to an ID $sender=$sender->ID if ref $sender; my $spec=$name; $spec=specifier_part($spec, 'kernel') unless $spec eq '*'; undef($states) unless ref $states and keys %$states; if($states) { $states->{__name}=$name; DEBUGM and warn "$$: Session $sender is monitoring $spec\n"; $self->{monitors}{$spec} ||= {}; $self->{monitors}{$spec}{$sender}=$states; } else { DEBUGM and warn "$$: Session $sender is neglecting $spec\n"; delete $self->{monitors}{$spec}{$sender}; delete $self->{monitors}{$spec} if 0==keys %{$self->{monitors}{$spec}}; } return; } #---------------------------------------------------- # Tell monitors about something in foreign kernel # $rid == kernel name (in which case we ALSO inform about alises) or alias # or * (tell every monitor about something... future use) # $event == name of event we are informing about # @params == other stuff # NB : inform_monitors *MUST* post or call the monitors before exiting # because unregister will delete {monitors}{$rid} right after sub inform_monitors { my($self, $rid, $event, @params)=@_; my($kernel)=@{$self}{qw(poe_kernel)}; $rid=specifier_part($rid, 'kernel') unless $rid eq '*'; croak "$$: No kernel in $_[1]!" unless $rid; my $real=1 if $self->{channel}{$rid}; DEBUGM and do { warn "$$: inform $event $rid"; warn "$$: $rid is", ($real ? '' : "n't"), " real\n"; }; # got to be a better way of doing this... my @todo=($rid); push @todo, '*' unless $rid eq '*'; foreach my $n (@todo) { next unless $n; my $ms=$self->{monitors}{$n}; unless($ms and %$ms) { DEBUGM and warn "$$: No sessions care about $event $n\n"; next; } foreach my $sender (keys %$ms) { my $states=$ms->{$sender}; my $e=$states->{$event}; next unless $e; DEBUGM and warn "$$: Informing Session $sender/$e about $n/$event\n"; # ARG0 = what Session called the kernel # ARG1 = what kernel calls the kernel # ARG2 = true if kernel is name, false if alias # ARG3 = $states->{data} # ARG4.... = per-message info $kernel->post($sender, $e, $states->{__name}, $rid, $real, $states->{data}, @params); } } # $rid might be an alias to something else, inform about those as well if($self->{channel}{$rid}) { foreach my $ra (@{$self->{alias}{$rid}}) { $self->inform_monitors($ra, $event, @params); } } } ############################################################################## # These are Thunks used to post the actual state on behalf of the foreign # kernel. Currently, the thunks are used as a "proof of concept" and # to accur extra over head. :) # # The original idea was to make sure that $_[SENDER] would be something # valid to the posted state. However, garbage collection becomes a problem # If we are to allow the poste session to "hold on to" $_[SENDER]. # # On the first request, a thunk is created. It is kept alive with an alias. # On the next request, we check to see if the extref_count is zero. If it # is, we reuse the same request. If not, we create a new thunk and continue # using that. What's more, we tell the thunk that it is active, so it should # clear its alias. This way, when the user code decrements the extref_count # back to zero, the thunk they reserved can be cleared. # # Export thunk the quick way. *_thunked_post=\&POE::Component::IKC::Responder::Thunk::thunk; package POE::Component::IKC::Responder::Thunk; use strict; use Carp; use Data::Dumper; use POE::API::Peek; use POE::Component::IKC; use POE::Session; use POE; sub DEBUG { 0 } sub DEBUG2 { 0 } #---------------------------------------------------- { my $NAME=__PACKAGE__.'00000000'; $NAME=~s/\W+//g; my $current_thunk; my $API; #------------------------------ sub thunk { # my($rsvp, $call, $from, $wantarray)=@_; unless( __active_thunk() ) { __create_thunk(); } # we use call to make sure no other call to us could # happen between _start and __thunk $poe_kernel->call( $current_thunk => '__thunk', @_ ); } #------------------------------ sub __create_thunk { my $thunk = POE::Session->create( package_states => [ __PACKAGE__, [ qw(_start _stop _default __thunk __active ) ] ], args => [++$NAME] ); $current_thunk = "$thunk"; } #------------------------------ sub __active_thunk { return unless $current_thunk; $API ||= POE::API::Peek->new; if( $API->resolve_session_to_id( $current_thunk ) ) { if( 0==$API->get_session_extref_count( $current_thunk ) ) { DEBUG and warn "$$: $NAME reuse\n"; return 1; } $poe_kernel->post( $current_thunk => '__active' ); } undef( $current_thunk ); return; } } #---------------------------------------------------- sub _start { my($kernel, $heap, $name )= @_[KERNEL, HEAP, ARG0]; $heap->{alias} = $heap->{name} = $name; DEBUG and warn "$$: $name create\n"; $kernel->alias_set( $heap->{alias} ); } #---------------------------------------------------- sub __active { my($kernel, $heap) = @_[KERNEL, HEAP]; DEBUG and warn "$$: $heap->{name} active\n"; $kernel->alias_set( delete $heap->{alias} ) if $heap->{alias}; return 1; } #---------------------------------------------------- sub _stop { DEBUG and warn "$$: $_[HEAP]->{name} delete\n"; } #---------------------------------------------------- sub __thunk { my($kernel, $heap, $rsvp, $call, $from, $wantarray)= @_[KERNEL, HEAP, ARG0, ARG1, ARG2, ARG3]; $heap->{from} = $from; # warn "no FROM" unless $from; if($rsvp) { # foreign session wants returned value DEBUG2 and warn "Calling ", Dumper $call; DEBUG2 and do { warn "Wants an array" if $wantarray}; my(@ret, $yes); if($wantarray) { @ret=$kernel->call(@$call); $yes = 0<@ret; } else { $ret[0]=$kernel->call(@$call); $yes = defined $ret[0]; } if($yes) { DEBUG2 and do { local $"=', '; warn "Posted response '@ret' to ", Dumper $rsvp; }; # This is the POSTBACK $POE::Component::IKC::Responder::ikc->send_msg( {params=>($wantarray ? \@ret : $ret[0]), event=>$rsvp}, $call->[0]); } } else { # 2009/05 - use ->call() so that {from} can't be modified # before refcount_increment is called DEBUG2 and warn "Posting ", Dumper $call; $kernel->call(@$call); } } #---------------------------------------------------- sub _default { my($kernel, $heap, $sender, $state, $args)= @_[KERNEL, HEAP, SENDER, ARG0, ARG1]; return if $state =~ /^_/; unless($heap->{from}) { warn "$$: Attempt to respond to an anonymous foreign post with '$state'\n"; return; } if( not $heap->{from}{state} ) { my $event = { %{$heap->{from}} }; $event->{state} = $state; $POE::Component::IKC::Responder::ikc->send_msg( {params=>$args, event=>$event}, $sender ); } else { $POE::Component::IKC::Responder::ikc->send_msg( {params=>[$state, $args], event=>$heap->{from}}, $sender ); } } 1; __END__