/usr/local/CPAN/Stem/Stem/Portal.pm
# File: Stem/Portal.pm
# This file is part of Stem.
# Copyright (C) 1999, 2000, 2001 Stem Systems, Inc.
# Stem is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# Stem is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with Stem; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# For a license to use the Stem under conditions other than those
# described here, to purchase support for this software, or to purchase a
# commercial warranty contract, please contact Stem Systems at:
# Stem Systems, Inc. 781-643-7504
# 79 Everett St. info@stemsystems.com
# Arlington, MA 02474
# USA
package Stem::Portal ;
use strict ;
use Carp ;
use Data::Dumper ;
use Stem::AsyncIO ;
use Stem::Vars ;
use Stem::Packet ;
use Stem::Socket ;
use Stem::Trace 'log' => 'stem_status', 'sub' => 'TraceStatus' ;
use Stem::Trace 'log' => 'stem_error' , 'sub' => 'TraceError' ;
my %name_to_portal ;
my %portal_to_names ;
my $default_portal ;
Stem::Route::register_class( __PACKAGE__, 'port' ) ;
my $attr_spec_portal = [
{
'name' => 'reg_name',
'default' => '',
'help' => <<HELP,
This is a unique name used to register this instance of a Portal.
HELP
},
{
'name' => 'server',
'env' => 'server',
'help' => <<HELP,
This determines if we are a server or a client.
If it is true, we are a server. Otherwise, we are a client.
HELP
},
{
'name' => 'sync',
'type' => 'boolean',
'default' => 1,
'help' => <<HELP,
Mark this as a synchronously connecting Portal. Default is syncronous
connections, set to 0 for asynchronous. In both cases the same method
callbacks are used.
HELP
},
{
'name' => 'port',
'default' => 10_000,
'env' => 'port',
'help' => <<HELP,
This determines which port we bind to if we are a server.
This determines which port we connect to if we are a client.
The default value is 10,000.
HELP
},
{
'name' => 'host',
'default' => 'localhost',
'env' => 'host',
'help' => <<HELP,
This determines which host we attach to when we are a client.
The default value is localhost.
HELP
},
{
'name' => 'spawn_conf_file',
'help' => <<HELP,
This tells the portal to fork another Stem Hub and pass this value as
the configuration file argument to run_stem. The new Hub will be
connected to this Portal and be private to it.
HELP
},
{
'name' => 'spawn_conf_args',
'help' => <<HELP,
This tells the portal to fork another Stem Hub and pass (via a
message) this data to the Stem::Conf as a configuration
The new Hub will be connected to this Portal and be private
to it.
HELP
},
{
'name' => 'run_stem_args',
'help' => <<HELP,
These are the command line arguments to run_stem for the spawned Hub.
HELP
},
{
'name' => 'codec',
'help' => <<HELP,
This is the sub-class that is used to convert messages to/from a byte
stream for this portal
HELP
},
{
'name' => 'disable',
'env' => 'disable',
'type' => 'boolean',
'help' => <<HELP,
This flag will disable this Portal. It will not construct an object and
no errors will be returned.
HELP
},
] ;
sub new {
my( $class ) = shift ;
my $self = Stem::Class::parse_args( $attr_spec_portal, @_ ) ;
return $self unless ref $self ;
return if $self->{ 'disable' } ;
my $name = $Stem::Vars::Hub_name ;
if ( $Env{'portal_use_stdio'} ) {
return $self->new_child_portal() ;
}
if ( $self->{'spawn_conf_file'} || $self->{'spawn_conf_args'} ) {
return $self->new_parent_portal() ;
}
if ( $self->{'server'} ) {
$self->{'type'} = 'listener' ;
$self->{'server_name'} = $name ;
}
else {
$self->{'type'} = 'client' ;
$self->{'name'} = $name ;
}
#print "REG new [$self->{'reg_name'}]\n" ;
my $sock_obj = Stem::Socket->new(
'object' => $self,
'host' => $self->{'host'},
'port' => $self->{'port'},
'server' => $self->{'server'},
'sync' => $self->{'sync'},
) ;
ref $sock_obj or return <<ERR ;
Stem::Portal '$self->{'reg_name'}' tried to connect/listen to $self->{'host'}:$self->{'port'}
$sock_obj
ERR
$self->{'sock_obj'} = $sock_obj ;
return ;
}
sub connected {
my( $self, $connected_sock ) = @_ ;
my( $portal ) ;
TraceStatus "Portal Connected" ;
$self->{'read_fh'} = $connected_sock ;
$self->{'write_fh'} = $connected_sock ;
my $type = $self->{'type'} ;
if ( $type eq 'listener' ) {
# fork off a new portal by making a clone of the listener portal
$portal = bless { %$self } ;
$portal->{'type'} = 'accepted' ;
my $name = $portal->{'server_name'} ;
$portal->{'name'} = $name ;
delete( $portal->{'sock_obj'} ) ;
}
else {
#print "Portal Connected\n" ;
# a client portal is just itself
$portal = $self ;
#print "REG [$self->{'reg_name'}]\n" ;
if ( my $name = $self->{'reg_name'} ) {
$portal->register( $name ) ;
}
unless ( $default_portal ) {
$portal->register( 'DEFAULT' ) ;
$default_portal = $portal ;
}
}
my $err = $portal->_activate() ;
die $err if $err ;
}
my $run_stem_path ;
sub new_parent_portal {
my( $self ) = @_ ;
$run_stem_path ||= do {
require Stem::Proc ;
require Stem::InstallConfig ;
$Stem::InstallConfig{ run_stem_path } ;
} ;
my $conf_file = $self->{'spawn_conf_file'} || 'portal_child' ;
my @run_stem_args = @{$self->{'run_stem_args'} || []} ;
my $proc = Stem::Proc->new(
path => $run_stem_path,
proc_args => [
'portal_use_stdio=1',
@run_stem_args,
$conf_file,
],
spawn_now => 1,
cell_attr => [
no_io => 1,
],
) ;
$self->{'proc'} = $proc ;
TraceStatus "Portal Paren" ;
$self->{'read_fh'} = $proc->read_fh() ;
$self->{'write_fh'} = $proc->write_fh() ;
#print "REG [$self->{'reg_name'}]\n" ;
my $err = $self->_activate() ;
die $err if $err ;
###########
# $self->{'spawn_conf_args'} ) {
#### when can we send the conf data?
##########
}
sub new_child_portal {
my( $self ) = @_ ;
$self->{'type'} = 'child' ;
TraceStatus "Portal Child" ;
$self->{'read_fh'} = \*STDIN ;
$self->{'write_fh'} = \*STDOUT ;
#print "REG [$self->{'reg_name'}]\n" ;
unless ( $default_portal ) {
$self->register( 'DEFAULT' ) ;
$default_portal = $self ;
}
if ( my $portal_name = $Env{'portal_name'} ) {
$self->register( $portal_name ) ;
}
my $err = $self->_activate() ;
die $err if $err ;
}
sub _activate {
my( $self ) = @_ ;
TraceStatus "Active portal" ;
my $aio = Stem::AsyncIO->new(
'object' => $self,
'read_fh' => $self->{'read_fh'},
'write_fh' => $self->{'write_fh'},
'read_method' => 'portal_data',
'closed_method' => 'portal_closed',
) ;
return $aio unless ref $aio ;
$self->{'aio'} = $aio ;
my $packet = Stem::Packet->new( 'codec' => $self->{'codec'} ) ;
return $packet unless ref $packet ;
$self->{'packet'} = $packet ;
my $msg = Stem::Msg->new( 'from' => "${Stem::Vars::Hub_name}:port",
'type' => 'register',
) ;
return $msg unless ref $msg ;
$self->write_msg( $msg ) ;
return ;
}
# this is not a method, but a class sub
sub send_msg {
my ( $msg, $to_hub ) = @_ ;
$to_hub ||= 'DEFAULT' ;
my $self = $name_to_portal{ $to_hub } ;
return "unknown Portal '$to_hub'" unless $self ;
$msg->from_hub( $self->{'name'} ) unless $msg->from_hub() ;
# $msg->from_hub( $self->{'name'} ) ;
unless( $self->{'remote_hub'} ) {
push( @{$self->{'queued_msgs'}}, $msg ) ;
return ;
}
$self->write_msg( $msg ) ;
return ;
}
# this is a regular method called by the above sub.
sub write_msg {
my( $self, $msg ) = @_ ;
my $packet_text = $self->{'packet'}->to_packet( $msg ) ;
#print "PACK SEND [$packet_text]\n" ;
$self->{'aio'}->write( $packet_text ) ;
}
sub portal_data {
my( $self, $packet_text ) = @_ ;
my $packet = $self->{'packet'} ;
# parse out all messages that may be in the input data
while( my $msg = $packet->to_data( $packet_text ) ) {
$self->_portal_msg_in( $msg ) ;
# no more incoming data in this callback
$packet_text = '' ;
}
}
sub _portal_msg_in {
my( $self, $msg ) = @_ ;
if ( $msg->type() eq 'register' ) {
# register the other hub and mark this hub as connecting to it.
$self->{'remote_hub'} = $msg->from_hub() ;
warn( caller(), $msg->dump() ) and die
'Msg Has No Remote Hub' unless $self->{'remote_hub'} ;
$self->register( $self->{'remote_hub'} ) ;
# handle messages that got queued while the portal was down
while( my $queued_msg = shift @{$self->{'queued_msgs'}} ) {
#print $queued_msg->dump( 'QUEUED' ) ;
$self->write_msg( $queued_msg ) ;
}
return ;
}
$msg->in_portal( $self->{'remote_hub'} ) ;
$msg->dispatch() ;
}
sub portal_closed {
my( $self ) = @_ ;
#TraceStatus "Portal closed" ;
Stem::Route::unregister_cell( $self ) ;
my $names = $self->unregister() ;
if ( $self->{'type'} eq 'accepted' ) {
# TraceStatus "client hub '$self->{'name'}' closed" ;
$self->shut_down() ;
return ;
}
my @hub_names = ref $names ? @{$names} : 'UNKNOWN' ;
Stem::Event::end_loop() ;
die "server hub [@hub_names] died" ;
}
sub shut_down {
my( $self ) = @_ ;
TraceStatus "SHUT DOWN port : ". Dumper($self);
$self->{'aio'}->shut_down() ;
delete @{$self}{qw( object aio )} ;
}
# this is for messages directly to this portal. messages are sent out
# the portal via the send class method
#
# UNUSED so far
sub msg_in {
my( $self, $msg ) = @_ ;
TraceStatus "portal msg in" ;
}
sub register {
my( $self, $name ) = @_ ;
#print "NAME [$name]: ", caller(), "\n" ;
TraceStatus "portal arg: [$self] [$name]\n\t",
map( "<$_>", caller() ), "\n" ;
$name_to_portal{ $name } = $self ;
push( @{$portal_to_names{ $self }}, $name ) ;
}
sub unregister {
my( $name ) = @_ ;
# convert a name to its object ;
my $portal = ref $name ? $name : $name_to_portal{ $name } ;
if ( $portal ) {
delete $name_to_portal{ $portal } ;
my $names = delete $portal_to_names{ $portal } ;
return $names ;
}
return ;
}
sub status_cmd {
my ($self, $msg ) = @_ ;
#print $msg->dump( 'PORT' ) ;
my $status = <<STATUS ;
Portal Status for Hub '$Stem::Vars::Hub_name'
STATUS
foreach my $port_name ( sort keys %name_to_portal ) {
my $portal = $name_to_portal{ $port_name } ;
$status .= <<STATUS ;
$port_name
Hub: $portal->{'remote_hub'}
Type: $portal->{'type'}
STATUS
}
return $status ;
}
1 ;