/usr/local/CPAN/UR/UR/Service/DataSourceProxy.pm
package UR::Service::DataSourceProxy;
use strict;
use warnings;
use UR;
# Class metadata for UR::Service::DataSourceProxy.
#
# Every property is transient.  An instance is identified by the
# (host, port) pair it listens on.
UR::Object::Type->define(
    class_name => 'UR::Service::DataSourceProxy',
    is         => 'UR::Object',
    properties => [
        # --- where to listen ---
        host => {
            doc           => 'The local address to listen on',
            type          => 'String',
            default_value => '0.0.0.0',
            is_transient  => 1,
        },
        port => {
            doc           => 'The local port to listen on',
            type          => 'String',
            default_value => 10293,
            is_transient  => 1,
        },

        # --- socket bookkeeping, filled in by _create_listen_socket() ---
        listen_socket => {
            doc          => 'The listen socket',
            type         => 'IO::Socket',
            is_optional  => 1,
            is_transient => 1,
        },
        all_select => {
            doc          => 'Select object for the listen socket and all the client sockets',
            type         => 'IO::Select',
            is_optional  => 1,
            is_transient => 1,
        },
        sockets_select => {
            doc          => 'Select object for just the client connected sockets',
            type         => 'IO::Select',
            is_optional  => 1,
            is_transient => 1,
        },

        # --- behaviour switches ---
        use_sigio => {
            doc           => 'The program should use signals to handle requests automaticly',
            type          => 'Boolean',
            default_value => 0,
            is_transient  => 1,
        },
        sync_through => {
            doc           => 'When a client syncs changes to us, automaticly do a sync_database to propogate changes to our data sources',
            type          => 'Boolean',
            default_value => 0,
            is_optional   => 1,
            is_transient  => 1,
        },
    ],
    id_by => [qw(host port)],
    doc   => 'An object to manage connections from other processes and appear as a data source to them',
);
# FIXME needs real pod docs. In the mean time, here's how you'd create a program that's just
# a proxy server getting requests:
# use GSC;
# my $proxy = UR::Service::DataSourceProxy->create(host => '0.0.0.0', port => 10293);
# $proxy->process_messages(undef);
# For _read_message. When the messaging is refactored, this won't be needed
use UR::DataSource::RemoteCache;
use IO::Socket;
use IO::Select;
use FreezeThaw;
use Time::HiRes;
use Fcntl;
# Constructor: build the proxy object and make sure it has a listen socket.
#
# Accepts the same key/value parameters as the parent class's create().
# Returns the new object, or nothing when the parent constructor fails or
# the listen socket cannot be created.
sub create {
    my $class = shift;

    my $obj = $class->SUPER::create(@_);
    return unless $obj;

    # A listen_socket that is already set needs no further setup.
    return $obj if $obj->listen_socket;

    unless ($obj->_create_listen_socket()) {
        $class->error_message("Failed to create listen socket");
        # Do not leave a half-initialized object behind: the caller never
        # receives it, so nobody else could clean it up.
        # NOTE(review): assumes UR keeps created objects registered by id, so
        # that a stale one would block a retry with the same host/port - confirm.
        $obj->delete;
        return;
    }

    # enable_sigio_processing() ignores its argument; the 1 is kept for
    # readability at the call site.
    $obj->enable_sigio_processing(1) if $obj->use_sigio;

    return $obj;
}
# Value handed to IO::Socket::INET as the Listen argument when the listen
# socket is created.
sub _listen_queue_size {
    return 5;
}
# FIXME This might work better as a UDP socket?
# Open the listening socket on host/port and set up the two IO::Select
# objects: all_select starts out holding the listen socket, sockets_select
# starts out empty.
#
# Returns 1 on success.  On failure records an error message and returns
# nothing.
sub _create_listen_socket {
    my ($self) = @_;

    my @socket_args = (
        LocalHost => $self->host,
        LocalPort => $self->port,
        Listen    => $self->_listen_queue_size,
        ReuseAddr => 1,
    );

    my $listener = IO::Socket::INET->new(@socket_args);
    if (!$listener) {
        $self->error_message("Couldn't create socket: $!");
        return;
    }

    $self->listen_socket($listener);
    $self->all_select(IO::Select->new($listener));
    $self->sockets_select(IO::Select->new());

    return 1;
}
# Add the given socket to the list of connected clients.
# if socket is undef, it blocks waiting on an incoming connection
# Register a client socket with both select sets.
#
# $socket - optional, an already-connected client socket.  When it is not
#           given, accept() is called on the listen socket to obtain one.
#
# Returns the client socket, or nothing (after recording an error message)
# when accept() fails.
sub accept_connection {
    my ($self, $socket) = @_;

    if (!$socket) {
        $socket = $self->listen_socket->accept();
        if (!$socket) {
            $self->error_message("accept() failed: $!");
            return;
        }
    }

    my $client_set = $self->sockets_select;
    $client_set->add($socket);

    my $everything_set = $self->all_select;
    $everything_set->add($socket);

    # In signal-driven mode the new handle must raise SIGIO as well
    if ($self->use_sigio) {
        $self->_enable_async_io_on_handle($socket);
    }

    return $socket;
}
# Drop a client socket: take it out of both select sets and close it.
#
# $socket - a socket previously registered in sockets_select.
#
# Returns 1 on success.  If the socket is not a registered client, records
# an error message and returns nothing without touching the socket.
sub close_connection {
    my ($self, $socket) = @_;

    my $client_set = $self->sockets_select;
    if (!$client_set->exists($socket)) {
        $self->error_message("Passed-in socket is not on the list of connected clients");
        return;
    }

    # Async notification has to be switched off before the handle goes away
    if ($self->use_sigio) {
        $self->_disable_async_io_on_handle($socket);
    }

    $client_set->remove($socket);
    $self->all_select->remove($socket);
    $socket->close();

    return 1;
}
# process a message on the indicated socket. If socket is undef,
# then process a single message out of the many that may be ready
# to read
# Read one request from a client socket, act on it and write the reply.
#
# $socket - optional.  When omitted, the first readable client socket is
#           used; if none is readable the sub returns nothing.
#
# Request command ids handled here:
#   -1   the peer closed the connection: the socket is closed, no reply sent
#    1   remote get(): the payload thaws to a rule that is run locally
#    2   remote sync_database(): the payload thaws to objects grouped by
#        class name and is passed to _merge_object_changes()
#   any other value is answered with reply code 255.
#
# The reply is two packed "L" values (payload length, reply code) followed
# by the frozen results, or by an empty payload when there are no results.
# Returns 1 once a message has been handled.
#
# NOTE(review): the payload is thawed exactly as it arrived on the socket,
# with no validation - confirm that only trusted clients can connect.
sub process_message_from_client {
    my ($self, $socket) = @_;

    # FIXME this always picks the first in the list; it's not fair
    unless ($socket) {
        ($socket) = $self->sockets_select->can_read();
        return unless $socket;
    }

    my ($payload, $command) = UR::DataSource::RemoteCache::_read_message(undef, $socket);

    # The other end closed the socket
    if ($command == -1) {
        $self->close_connection($socket);
        return 1;
    }

    my $reply_code;
    my @reply;
    if ($command == 1) {
        my $rule          = (FreezeThaw::thaw($payload))[0]->[0];
        my $subject_class = $rule->subject_class_name();
        @reply      = $subject_class->get($rule);
        $reply_code = $command | 128;    # High bit set means a result code
    }
    elsif ($command == 2) {
        my ($objects_by_class_name) = FreezeThaw::thaw($payload);
        @reply      = $self->_merge_object_changes($objects_by_class_name);
        $reply_code = $command | 128;
    }
    else {
        $self->error_message("Unknown command request ID $command");
        $reply_code = 255;
    }

    my $encoded = @reply ? FreezeThaw::safeFreeze(\@reply) : '';
    $socket->print(pack("LL", length($encoded), $reply_code), $encoded);

    return 1;
}
# Merge a client's changed objects into the local context.
#
# $objects_by_class_name - hashref: class name => arrayref of the client's
#                          objects of that class (deleted objects arrive as
#                          UR::Object::Ghost instances under "<Class>::Ghost").
#
# The merge is all-or-nothing with respect to conflicts: every object is
# checked first, and changes are applied only if none is rejected.  When
# sync_through is set, UR::Context->commit() is called afterwards and its
# result becomes the return value.
#
# Returns a two element list: (true/false result, error text).  The error
# text is undef on success, so a message left over from an earlier failed
# request is never handed back with a successful result.
#
# FIXME this would be way cooler if we could lean on UR::Context::Transaction.
# Before that could work, we'd need some way to ask a prior transaction what
# an object's state is.  And while we're at it, why not just get rid of the
# db_saved_uncommitted/db_committed nonsense and just use the transaction
# for that
sub _merge_object_changes {
    my ($self, $objects_by_class_name) = @_;

    my $return_value = 0;

    # The trailing 1 lets us tell "the block died" apart from "the block
    # finished", without depending on $@ still being intact afterwards.
    my $completed = eval {
        if ($self->_check_remote_changes_for_conflicts($objects_by_class_name)) {
            $self->_apply_remote_changes($objects_by_class_name);
            $return_value = $self->sync_through ? UR::Context->commit() : 1;
        }
        1;
    };
    unless ($completed) {
        $return_value = 0;
        $self->error_message($@ || 'Unknown error while merging remote changes');
    }

    return ($return_value, ($return_value ? undef : $self->error_text()));
}

# Pass 1 of the merge: make sure none of the proposed changes conflicts with
# local state.  Each accepted remote object gets its local counterpart (or
# undef for a brand new object) stored under the '__my_obj' hash key for
# pass 2.  Returns 1 if everything is acceptable; otherwise records an error
# message and returns nothing.
sub _check_remote_changes_for_conflicts {
    my ($self, $objects_by_class_name) = @_;

    foreach my $class_name (keys %$objects_by_class_name) {
        foreach my $their_obj (@{ $objects_by_class_name->{$class_name} }) {
            my $their_id = $their_obj->id;
            my $my_obj;

            if ($their_obj->isa('UR::Object::Ghost')) {
                # A remote delete must refer to an object that exists here
                (my $my_class = $class_name) =~ s/::Ghost\z//;
                $my_obj = $my_class->get($their_id);
                unless ($my_obj) {
                    $self->error_message("Rejecting commit from remote client. Remotely deleted $class_name id $their_id does not exist locally as $my_class");
                    return;
                }
            }
            else {
                $my_obj = $class_name->get($their_id);
            }

            if (!$my_obj && $their_obj->{'db_committed'}) {
                $self->error_message("Rejecting commit from remote client. $class_name id $their_id is new locally, but transmitted as changed");
                return;
            }
            if ($my_obj && $my_obj->__changes__()) {
                # FIXME is it worth going through all the properties to see if they're compatible?
                $self->error_message("Rejecting commit from remote client. $class_name id $their_id is already changed");
                return;
            }

            $their_obj->{'__my_obj'} = $my_obj;
        }
    }

    return 1;
}

# Pass 2 of the merge: apply the remote changes to the local objects found
# in pass 1.  Ghosts delete the local object, changed objects copy their
# changed properties across, and objects without a local counterpart are
# created from their changed properties.
sub _apply_remote_changes {
    my ($self, $objects_by_class_name) = @_;

    foreach my $class_name (keys %$objects_by_class_name) {
        foreach my $their_obj (@{ $objects_by_class_name->{$class_name} }) {
            # Seems to be one Change for each changed property
            my @changes    = $their_obj->__changes__;
            my @properties = map { $_->properties } @changes;
            my $my_obj     = $their_obj->{'__my_obj'};

            if ($their_obj->isa('UR::Object::Ghost')) {
                $my_obj->delete();
            }
            elsif ($my_obj) {
                foreach my $property (@properties) {
                    $my_obj->$property($their_obj->$property);
                }
            }
            else {
                my %create_params = map { ($_, $their_obj->$_) } @properties;
                $class_name->create(%create_params);
            }
        }
    }

    return 1;
}
# go into a loop processing messages from all the connected sockets (and
# the listen socket), for the given time period in seconds. 0 seconds
# means do one pass through all that are readable and return, undef
# means stay in the loop forever
#
# FIXME socket communication needs to be refactored out to some common location
# for example, the listen socket and client sockets can both implement
# process_message(), where the listen socket does what is accept_connection()
# Service the listen socket and all client sockets.
#
# $timeout - seconds to keep going.  0 means one non-blocking pass; undef
#            means loop forever.
#
# A positive timeout is a budget for the whole call: each select waits only
# for the time that remains, so the call does not overrun its deadline by
# waiting the full timeout again after every batch of messages.
#
# Always returns 1.
sub process_messages {
    my ($self, $timeout) = @_;

    my $select        = $self->all_select;
    my $listen_socket = $self->listen_socket;

    # Only a positive timeout has a deadline; 0 and undef are handled below
    my $deadline;
    if (defined($timeout) && $timeout > 0) {
        $deadline = Time::HiRes::time() + $timeout;
    }

    SELECT_LOOP:
    while (1) {
        my $wait = $timeout;
        if (defined $deadline) {
            $wait = $deadline - Time::HiRes::time();
            $wait = 0 if $wait < 0;
        }

        my @ready = $select->can_read($wait);
        foreach my $handle (@ready) {
            if ($handle eq $listen_socket) {
                $self->accept_connection();
                # If we're running as a signal handler for sigio, and the client has already sent
                # data down the newly-connected socket before we can leave the handler, then we've
                # already lost the signal for that new data since it was masked.  This workaround
                # gives the above select() a chance to see the data being ready.
                next SELECT_LOOP;
            }
            $self->process_message_from_client($handle);
        }

        # undef timeout: never leave.  No deadline (timeout of 0, or a
        # negative one): a single pass.  Otherwise stop at the deadline.
        last if defined($timeout)
            && (!defined($deadline) || Time::HiRes::time() >= $deadline);
    }

    return 1;
}
# FIXME There's a more efficient method of determining which handles
# need attention during a sigio than select()ing. See the manpage of
# fcntl(2) and the section on F_SETSIG and setting SA_SIGINFO. Implement
# this later
# FIXME there should probably be a way to turn it off, too
# Switch the proxy to signal-driven operation: install a SIGIO handler that
# processes pending messages, and enable async IO on every handle already
# in all_select.  Any argument is ignored.
#
# If a code ref was installed as the SIGIO handler beforehand, it is called
# after our own processing, with the handler's arguments passed through.
# String dispositions ('DEFAULT', 'IGNORE' or a sub name) are not chained:
# they are true values but not code refs, and invoking one as a code ref
# dies under strict refs.
#
# Returns 1.
sub enable_sigio_processing {
    my $self = shift;

    $self->use_sigio(1);

    my $previous = $SIG{'IO'};
    my $chained  = ref($previous) eq 'CODE' ? $previous : undef;

    # Step 1, set the closure for handling the signal
    $SIG{'IO'} = sub {
        $self->process_messages(0);
        $chained->(@_) if $chained;
        return;
    };

    # Step 2, set up async IO on all the active handles
    foreach my $fh ($self->all_select->handles) {
        $self->_enable_async_io_on_handle($fh);
    }

    return 1;
}
# Arrange for SIGIO to be sent to this process when $fh has activity: make
# this process the handle's owner, then set the O_ASYNC flag on it.
#
# $fh - the file handle (socket) to configure.
#
# Returns 1 on success.  On failure records an error message and returns
# nothing; if the owner had already been changed, the previous owner is put
# back first.
sub _enable_async_io_on_handle {
    my ($self, $fh) = @_;

    # At one point, there was a bug in fcntl that could cause it to die
    # with a "Modification of a read-only value attempted" exception
    # if the constant was right in fcntl()'s arg list.  I don't know if
    # or when the bug was fixed, but the workaround is to copy the constant
    # values into mutable scalars first
    my $getowner = Fcntl::F_GETOWN();
    my $setowner = Fcntl::F_SETOWN();
    my $getflags = Fcntl::F_GETFL();
    my $setflags = Fcntl::F_SETFL();

    # undef here means the previous owner could not be determined
    my $prev_owner = fcntl($fh, $getowner, 0);

    # fcntl() passes a pointer rather than a number when its argument has a
    # string value, hence the "+ 0" on every numeric argument.  That includes
    # the previous owner, which comes back as the string "0 but true" when
    # the handle had no owner.
    my $restore_owner = sub {
        return unless defined $prev_owner;
        fcntl($fh, $setowner, $prev_owner + 0);
        return;
    };

    # Set the pid for processing the signal (that would be us)
    unless (fcntl($fh, $setowner, $$ + 0)) {
        $self->error_message("fcntl F_SETOWN failed: $!");
        return;
    }

    # Enable the async IO flag
    my $flags = fcntl($fh, $getflags, 0);
    unless (defined $flags) {
        $self->error_message("fcntl F_GETFL failed: $!");
        $restore_owner->();
        return;
    }

    my $async_flags = ($flags + 0) | Fcntl::O_ASYNC();
    unless (fcntl($fh, $setflags, $async_flags)) {
        $self->error_message("fcntl F_SETFL failed: $!");
        $restore_owner->();
        return;
    }

    return 1;
}
# This needs to be called when a handle closes or it'll generate an endless stream
# of signals to let us know that it's closed
# Clear the O_ASYNC flag on $fh so that it stops generating SIGIO.
#
# $fh - the file handle (socket) to reconfigure.
#
# Returns 1 on success.  On failure records an error message and returns
# nothing.
sub _disable_async_io_on_handle {
    my ($self, $fh) = @_;

    # I think it's good enough to just turn off the async flag, and not
    # have to reset the FH's owner
    my $current_flags = fcntl($fh, Fcntl::F_GETFL(), 0);
    if (!$current_flags) {
        $self->error_message("fcntl F_GETFL failed: $!");
        return;
    }

    my $async_bit     = Fcntl::O_ASYNC();
    my $without_async = $current_flags & ~$async_bit;
    if (!fcntl($fh, Fcntl::F_SETFL(), $without_async)) {
        $self->error_message("fcntl F_SETFL failed: $!");
        return;
    }

    return 1;
}
1;