Spread::Queue::ManagedWorker - utility class for Spread::Queue::Manager


Spread-Queue documentation Contained in the Spread-Queue distribution.

Index


Code Index:

NAME

Top

  Spread::Queue::ManagedWorker - utility class for Spread::Queue::Manager

DESCRIPTION

Top

Tracks each worker that is registered with the queue manager. Retains worker state.

TODO: eventually add activity metrics (number of messages assigned, uptime, utilization, etc.).

States are:

ready - available for task assignment

Worker controls this by sending a 'ready' message to the queue manager.

assigned - allocated to a task

Set by manager, after a message is transmitted to a ready worker.

acknowledged - worker is working on a task

Set when a 'working' message is received from an assigned worker.

terminated - no longer available for task assignment

A 'terminated' message has been received from the worker, or an expected status update has not arrived, so the queue manager marks the worker as dead.

If an assigned worker is terminated, then the task that was assigned to that worker will be re-assigned to another worker.

METHODS

Top

AUTHOR

Top

Jason W. May <jmay@pobox.com>

COPYRIGHT

Top

SEE ALSO

Top

  Spread::Queue


Spread-Queue documentation Contained in the Spread-Queue distribution.
package Spread::Queue::ManagedWorker;

require 5.005_03;
use strict;
use vars qw($VERSION);
$VERSION = '0.3';

# Age limit, in seconds, applied to a worker's LAST_PING timestamp by the
# timing predicates is_talking() and is_stuck(): each compares LAST_PING
# against (current time - $AGE_THRESHOLD).
my $AGE_THRESHOLD = 5;

# Constructor.  May be invoked on a class name or on an existing object,
# in which case the new object is blessed into that object's class.
# The (optional) second argument is stored as-is under PRIVATE and can be
# read back with private().  No STATUS is set at construction time.
sub new {
    my ($invocant, $payload) = @_;

    my $class = ref($invocant) || $invocant;

    return bless { PRIVATE => $payload }, $class;
}

# Accessor: returns the value that was handed to new() and stored under
# PRIVATE.
sub private {
    my ($worker) = @_;

    return $worker->{PRIVATE};
}

# Accessor: returns the raw STATUS string currently recorded for this
# worker (undef if no state-setting method has been called yet).
sub status {
    my ($worker) = @_;

    return $worker->{STATUS};
}

# Predicate: true when the recorded STATUS is the string "ready".
sub is_ready {
    my ($worker) = @_;

    my $state = $worker->{STATUS};
    return $state eq 'ready';
}

# State transition: mark the worker as "ready" and stamp LAST_PING with
# the current time.
sub ready {
    my ($worker) = @_;

    $worker->{STATUS}    = 'ready';
    $worker->{LAST_PING} = time();
}

# State transition: a message has been handed to this worker, but the
# worker has not yet confirmed that it is working on it.  Records the
# status "assigned" and stamps LAST_PING with the current time.
sub assigned {
    my ($worker) = @_;

    $worker->{STATUS}    = 'assigned';
    $worker->{LAST_PING} = time();
}

# Predicate: true when the recorded STATUS is the string "assigned".
sub is_assigned {
    my ($worker) = @_;

    my $state = $worker->{STATUS};
    return $state eq 'assigned';
}

# State transition: mark the worker as "working" and discard its
# LAST_PING timestamp (the key is removed, not set to undef).
sub working {
    my ($worker) = @_;

    $worker->{STATUS} = 'working';
    delete $worker->{LAST_PING};
}

# Predicate: true when the recorded STATUS is the string "working".
sub is_working {
    my ($worker) = @_;

    my $state = $worker->{STATUS};
    return $state eq 'working';
}

# State transition: record the status "ack" and stamp LAST_PING with the
# current time.
sub acknowledged {
    my ($worker) = @_;

    $worker->{STATUS}    = 'ack';
    $worker->{LAST_PING} = time();
}

# State transition: mark the worker as no longer available.  The state is
# stored as the string "dead" (that is the value status() will report),
# and the LAST_PING timestamp is removed.
sub terminated {
    my ($worker) = @_;

    $worker->{STATUS} = 'dead';
    delete $worker->{LAST_PING};
}

# Predicate: true when the worker has been marked as terminated.
#
# terminated() records this state as the string "dead", so that is the
# value that must be recognised here; comparing only against "terminated"
# could never match a status set by this class.  The literal "terminated"
# is still accepted so that any status value this predicate previously
# matched keeps matching.
sub is_terminated {
    my ($self) = shift;

    my $status = $self->{STATUS};
    return ($status eq "dead" || $status eq "terminated");
}

# Predicate: true when the worker is in the ready state AND its LAST_PING
# timestamp is more recent than $AGE_THRESHOLD seconds before now.
# When the worker is not ready, the (false) result of is_ready is
# returned directly and LAST_PING is not examined.
sub is_talking {
    my ($worker) = @_;

    my $ready = $worker->is_ready;
    return $ready unless $ready;

    my $cutoff = time() - $AGE_THRESHOLD;
    return $worker->{LAST_PING} > $cutoff;
}

# Predicate: true when a task was assigned to the worker but it has not
# acknowledged it in time, i.e. the worker is still in the assigned state
# AND its LAST_PING timestamp is older than $AGE_THRESHOLD seconds before
# now.  When the worker is not assigned, the (false) result of
# is_assigned is returned directly and LAST_PING is not examined.
sub is_stuck {
    my ($worker) = @_;

    my $assigned = $worker->is_assigned;
    return $assigned unless $assigned;

    my $cutoff = time() - $AGE_THRESHOLD;
    return $worker->{LAST_PING} < $cutoff;
}

1;