Spread::Queue::Sender - submit messages to Spread::Queue message queues


Spread-Queue documentation Contained in the Spread-Queue distribution.

Index


Code Index:

NAME

Top

Spread::Queue::Sender - submit messages to Spread::Queue message queues

SYNOPSIS

Top

  use Spread::Queue::Sender;

  my $sender = new Spread::Queue::Sender(QUEUE => "myqueue");

  $sender->submit({ name => "value" });
  my $response = $sender->receive;

or

  my $response = $sender->rpc({ name => "value" });

DESCRIPTION

Top

A Spread::Queue::Sender can submit messages for queued delivery to the first available Spread::Queue::Worker. The sqm queue manager must be running to receive and route messages.

Spread::Queue messages are Perl hashes, serialized by Data::Serializer, by default using Data::Denter.

Spread::Queue does not enforce structure on message contents.

METHODS

Top

  my $serlzr = new Data::Serialization(serializer => "YAML");
  my $sender = new Spread::Queue::Sender(QUEUE => "myqueue",
					 SERIALIZER => $serlzr);

Establish Spread session for transmitting messages to a queue of workers. The SERIALIZER parameter is optional, by default using Data::Denter.

  $sender->submit($data);

$data should be a hashref, which will be serialized and published as a Spread message.

  my $response = $sender->receive($timeout);

Wait for an incoming message on the sender's private Spread address. This is just a pass-through to Spread::Session::receive.

  my $response = $sender->rpc($data [, $timeout]);

RPC-style invocation of a remote operation. Waits $timeout seconds for a response (returns undef if no response arrives).

  $sender->status;

Transmit an administrative status request to the queue manager.


Spread-Queue documentation Contained in the Spread-Queue distribution.
package Spread::Queue::Sender;

require 5.005_03;
use strict;
use vars qw($VERSION);
$VERSION = '0.4';

use Spread::Session;
use Data::Serializer;
use Carp;
use Log::Channel;
use Digest::MD5;
use Time::HiRes;

my $DEFAULT_TIMEOUT = 5;

BEGIN {
    my $sqslog = new Log::Channel;
    sub sqslog { $sqslog->(@_) }
}

my $SingleSession;

sub new {
    my $proto = shift;
    my $class = ref ($proto) || $proto;

    my %config = @_;
    my $self  = \%config;
    bless ($self, $class);

    # configuration options: override default timeout

    $self->{QUEUE} = $ENV{SPREAD_QUEUE} unless $self->{QUEUE};
    croak "Queue name is required" unless $self->{QUEUE};

    $self->{MQNAME} = "MQ_$self->{QUEUE}";

    if ($SingleSession) {
	$self->{SESSION} = $SingleSession;
    } else {
	$self->{SESSION} = new Spread::Session (
						MESSAGE_CALLBACK => \&_message_callback,
						TIMEOUT_CALLBACK => \&_timeout_callback,
					       );
	$SingleSession = $self->{SESSION};
    }

    if (! $self->{SERIALIZER}) {
	$self->{SERIALIZER} = new Data::Serializer(serializer => "Data::Denter");
    }
    my $serlzr = $self->{SERIALIZER}->serializer;

    sqslog "Message queue submitter initialized on $self->{QUEUE}, using $serlzr\n";

    return $self;
}

my $Outbound;

sub submit {
    my ($self, $payload) = @_;

    my $content = $self->{SERIALIZER}->serialize($payload);
#    my $digest = Digest::MD5::md5($content);
    $self->{SESSION}->publish($self->{MQNAME},
			      $content);
    $Outbound = Time::HiRes::gettimeofday;
}

sub receive {
    my $self = shift;
				# a 0-sec timeout is not the same as undef
    my $timeout = defined $_[0] ? shift : $DEFAULT_TIMEOUT;

    my $msg = $self->{SESSION}->receive($timeout, $self);

    if ($msg->{type} eq "ack") {
	# this is an acknowledgement for an earlier outbound msg
	my $elapsed = Time::HiRes::gettimeofday - $Outbound;
	sqslog "Elapsed seconds: $elapsed\n";
	return $self->receive($timeout);
    } else {
	return $msg->{body};
    }
}

sub rpc {
    my ($self, $payload, $timeout) = @_;

    $self->submit($payload);
    return $self->receive($timeout);
}


sub _message_callback {
    my ($msg, $self) = @_;

    return $self->{SERIALIZER}->deserialize($msg->{BODY});
}

sub _timeout_callback {
    my ($self) = @_;

    return;
}

sub status {
    my ($self, $content) = @_;

    $self->{SESSION}->publish($self->{MQNAME},
			      "^^status");
    $Outbound = Time::HiRes::gettimeofday;
}


1;