Mvalve::Base - Base Class For Mvalve Reader/Writer


Mvalve documentation Contained in the Mvalve distribution.

Index


Code Index:

NAME

Top

Mvalve::Base - Base Class For Mvalve Reader/Writer

METHODS

Top

defer

Inserts in the the retry_wait queue.

clear_all

Clears all known queues that are listed under the registered QueueSet

queue

queue is the actual queue instance that we'll be dealing with. While the architecture is such that you can replace the queue with your custom object, we currently only support Q4M

  $self->queue( {
    module => "Q4M",
    connect_info => [ 'dbi:mysql:...', ..., ... ]
 } );


Mvalve documentation Contained in the Mvalve distribution.

# $Id: /mirror/coderepos/lang/perl/Mvalve/trunk/lib/Mvalve/Base.pm 72443 2008-09-08T14:21:42.664054Z daisuke  $

package Mvalve::Base;
use Moose;
use Mvalve;
use Mvalve::QueueSet;
use Mvalve::Logger;
use Mvalve::Types;
use Time::HiRes;
use Scalar::Util ();

with 'MooseX::KeyedMutex';

has 'logger' => (
    is       => 'rw',
    does     => 'Mvalve::Logger',
    coerce   => 1
);

has 'queue' => (
    is       => 'rw',
    does     => 'Mvalve::Queue',
    required => 1,
    coerce   => 1,
    handles => {
        map { ( "q_$_" => $_ ) }
            qw(next fetch insert clear)
    },
);

{
    my $default = sub {
        my $class = shift;
        return sub {
            Class::MOP::load_class($class);
            $class->new;
        };
    };

    has 'queue_set' => (
        is  => 'rw',
        isa => 'Mvalve::QueueSet',
        required => 1,
        default => $default->( 'Mvalve::QueueSet' )
    );

    has 'state' => (
        is => 'rw',
        does => 'Mvalve::State',
        coerce => 1,
        required => 1,
        default => $default->( 'Mvalve::State::Memory' ),
        handles => {
            map { ("state_$_" => $_) } qw(get set remove incr decr)
        }
    );
}

__PACKAGE__->meta->make_immutable;

no Moose;

sub log {
    my $self = shift;
    my $logger = $self->logger ;
    return () unless $logger;

    $logger->log(@_);
}

sub clear_all {
    my $self = shift;

    foreach my $table ($self->queue_set->all_tables) {
        $self->q_clear($table);
    }
}

sub defer
{
    my( $self, %args ) = @_;

    my $message  = $args{message};
    my $interval = $args{interval} || 0;
    my $duration = $args{duration} ||
        $message->header( &Mvalve::Const::DURATION_HEADER ) ||
        0;

    my $factor = 100_000;
    $interval *= $factor;
    $duration *= $factor;

    if ( ! Scalar::Util::blessed($message) || ! $message->isa( 'Mvalve::Message' ) ) {
        return () ;
    }

    my $qs          = $self->queue_set;
    my $destination = $message->header( &Mvalve::Const::DESTINATION_HEADER );
    my $time_key    = [ $destination, 'retry time' ];
    my $retry_key   = [ $destination, 'retry' ];

    my $done = 0;
    my $rv;
    while (! $done) {
        my $lock = $self->lock( join('.', @$time_key ) );
        next unless $lock;

        $done = 1;

        my $now    = Time::HiRes::time() * $factor;
        my $retry  = int($self->state_get($time_key) || $now);

        # we always prefer duration
        my $offset = $duration || $interval;
        my $myturn = 0;

        if ($retry > $now) {
            $myturn = $retry;
        } else {
            if ( $retry + $offset >= $now ) {
                $myturn = $retry + $offset;
            } else {
                $myturn = $now;
            }
        }
        my $next   = $myturn + $offset;

        $message->header( &Mvalve::Const::RETRY_HEADER, $myturn );

        Mvalve::trace( "defer (retry = $retry)" ) if &Mvalve::Const::MVALVE_TRACE;
        $rv = $self->q_insert( 
            table => $qs->choose_table('timed'),
            data => {
                destination => $destination,
                ready       => $myturn,
                message     => $message->serialize,
            }
        );

        Mvalve::trace( "q_insert results in $rv" ) if &Mvalve::Const::MVALVE_TRACE;

        if ($rv) {
            $self->state_set($time_key, $next);
        }
    }

    return $rv;
}

1;

__END__