/usr/local/CPAN/RPC-Object/RPC/Object/Broker.pm


package RPC::Object::Broker;
use constant DEFAULT_PORT => 8049;
use constant DEFAULT_LISTENER => 5;
use constant DEFAULT_WORKER => 5;
use constant DEFAULT_HEART_BEAT => 1000;
use constant DEFAULT_WORKER_PENDING_SIZE => 10;
use strict;
use threads;
use threads::shared;
use warnings;
use Carp;
use IO::Socket;
use Scalar::Util qw(blessed refaddr);
use Storable qw(nfreeze thaw);
use Thread::Semaphore;
use Thread::Queue;
use RPC::Object::Container;
use RPC::Object::Transport;

sub new {
    my ($class, %arg) = @_;
    my $self = &share({});
    $self->{config} = &share({});
    my $preload = delete $arg{preload};
    $self->{config}{port} = DEFAULT_PORT;
    $self->{config}{listener} = DEFAULT_LISTENER;
    $self->{config}{worker} = DEFAULT_WORKER;
    $self->{config}{heart_beat} = DEFAULT_HEART_BEAT;
    $self->{config}{worker_pending_size} = DEFAULT_WORKER_PENDING_SIZE;
    for (keys %arg) {
        $self->{config}{$_} = $arg{$_};
    }
    $self->{listener_state} = &share({});
    $self->{worker_state} = &share({});
    my $accept_lock = 0;
    $self->{accept_lock} = &share(\$accept_lock);
    $self->{job_pending} = Thread::Queue->new();
    $self->{job_done} = &share({});
    $self->{container} = &share(RPC::Object::Container->new());
    $self->{preload} = &share({});
    my @preload;
    @preload = @{$self->{config}{preload}} if ref $self->{config}{preload} eq 'ARRAY';
    for (@preload) {
        eval { $self->load_module($_) };
        $@ ? carp $@ : ($self->{preload}{$_} = 1);
    }
    bless $self, $class;
    return $self;
}

sub start {
    my $self = shift;
    my %config = %{$self->{config}};

    my $trans = RPC::Object::Transport->new({LocalPort => $config{port},
                                             Listen => SOMAXCONN,
                                             ReuseAddr => 1,
                                            },
                                            Thread::Semaphore->new(),
                                           );

    $self->add_listener($config{listener}, $trans);
    $self->add_worker($config{worker});

    while (1) {
        $self->add_listener(1, $trans) if $self->need_add_listener();
        $self->add_worker(1) if $self->need_add_worker();
        sleep 1;
    }
}

sub need_add_listener {
    my ($self) = @_;
    lock %{$self->{listener_state}};
    return $self->{listener_state}{count} < $self->{config}{listener}
      || $self->{listener_state}{busy};
}

sub need_add_worker {
    my ($self) = @_;
    lock %{$self->{worker_state}};
    return $self->{worker_state}{count} < $self->{config}{worker}
      || $self->{job_pending}->pending() > $self->{worker_state}{count} * $self->{config}{worker_pending_size};
}

sub add_listener {
    my ($self, $n, $trans) = @_;
    while ($n--) {
        warn "adding new listener\n";
        eval {
            lock %{$self->{listener_state}};
            threads->create(\&listener_handler, $self, $trans);
            ++$self->{listener_state}{count};
            $self->{listener_state}{busy} = 0;
        };
        carp "failed to add new listener: $@" if $@;
    }
}

sub add_worker {
    my ($self, $n) = @_;
    while ($n--) {
        warn "adding new worker\n";
        eval {
            lock %{$self->{worker_state}};
            threads->create(\&worker_handler, $self);
            ++$self->{worker_state}{count};
        };
        carp "failed to add new worker: $@" if $@;
    }
}

sub listener_handler {
    my ($self, $trans) = @_;
    threads->detach();
    my $heart_beat = $self->{config}{heart_beat};
    while ($heart_beat--) {
        eval {
            $trans->response(sub{
                                 my ($retry, $req) = @_;
                                 my $cmd = substr $req, 0, 1;
                                 my $arg = substr $req, 1;
                                 my $ret;
                                 if ($retry) {
                                     lock %{$self->{listener_state}};
                                     $self->{listener_state}{busy} = 1;
                                 }
                                 if ($cmd eq 'a') {
                                     $ret = $self->add_job($arg);
                                 }
                                 elsif ($cmd eq 'r') {
                                     $ret = $self->remove_job($arg);
                                 }
                                 return $ret;
                             });
        };
        carp $@ if $@;
    }
    lock %{$self->{listener_state}};
    --$self->{listener_state}{count};
}

sub worker_handler {
    my ($self) = @_;
    threads->detach();
    my $heart_beat = $self->{config}{heart_beat};
    while ($heart_beat--) {
        eval {
            my ($id, $job);
            $job = $self->{job_pending}->dequeue();
            ($id, $job) = unpack('Na*', $job);
            my $arg = thaw($job);
            my $ret = $self->handle_method_call($arg);
            {
                lock %{$self->{job_done}};
                $self->{job_done}{$id} = nfreeze($ret);
            }
        };
        carp $@ if $@;
    }
    lock %{$self->{worker_state}};
    --$self->{worker_state}{count};
}

sub handle_method_call {
    my ($self, $arg) = @_;
    my $context = shift @$arg;
    my $func = shift @$arg;
    my $ref = shift @$arg;
    my $container = $self->{container};

    if ($func eq '_rpc_object_find_instance' && $ref eq __PACKAGE__) {
        my $ret = $container->find($arg->[0]);
        return ['r', $ret];
    }

    my $obj = $container->get($ref);
    $obj = $ref unless $obj;
    my $pack = blessed($obj);
    $pack = $ref unless $pack;
    eval { $self->load_module($pack) };
    return ['e', $@] if $@;

    no strict;
    no warnings 'uninitialized';
    my @ret = ();
    if ($context) {
       @ret = eval { $obj->$func(@$arg) };
    } elsif (defined $context) {
        $ret[0] = eval { $obj->$func(@$arg) };
    } else {
        eval { $obj->$func(@$arg) };
    }
    if (blessed $ret[0]) {
        $ret[0] = $container->insert($ret[0]);
    }
    return $@ ? ['e', $@] : ['o', @ret];
}

sub load_module {
    my ($self, $pack) = @_;
    return if $pack eq __PACKAGE__;
    return if !$pack || $self->{preload}{$pack};
    eval qq(require $pack);
    croak $@ if $@;
}

sub add_job {
    my ($self, $arg) = @_;
    my $id;
    while (1) {
        $id = int(rand(time()));
        lock %{$self->{job_done}};
        last unless exists $self->{job_done}{$id};
    }
    $self->{job_pending}->enqueue(pack('Na*', $id, $arg));
    return $id
}

sub remove_job {
    my ($self, $id) = @_;
    lock %{$self->{job_done}};
    my $ret = delete $self->{job_done}{$id};
    return $ret;
}

1;