Net::MemcacheQ - Net::MemcacheQ documentation


Net-MemcacheQ documentation Contained in the Net-MemcacheQ distribution.

Index


Code Index:

NAME

Top

Net::MemcacheQ

VERSION

Top

$LastChangedRevision$

SYNOPSIS

Top

  my $oNMQ = Net::MemcacheQ->new({
    host => '192.168.0.1',
    port => 22202,
  });

  $oNMQ->push('myqueue', '{"some data":"abcdefg"}');

  my $message = $oNMQ->shift('myqueue');

DESCRIPTION

Top

MemcacheQ implements a BerkeleyDB-backed FIFO message queue service serviced using the Memcache protocol. Net::MemcacheQ provides a simple interface against a single memcacheq instance.

For more information about MemcacheQ, please see: http://memcachedb.org/memcacheq/

SUBROUTINES/METHODS

Top

new - constructor

  my $oNMQ = Net::MemcacheQ->new({...});

  Optional arguments:
  host => 'localhost'  # memcacheq server hostname
  port => 22201        # memcacheq server port

queues - arrayref of queue names

  my $arQueueNames = $oNMQ->queues();

delete_queue - delete a queue, messages and all

  $oNMQ->delete_queue($sQueueName);

push - push a message onto a given queue

  $oNMQ->push($sQueueName, $sQueueMessage);

shift - pull a message from a given queue

  my $sMessage = $oNMQ->shift($sQueueName);

DESTROY - disconnect socket on destruction

DIAGNOSTICS

Top

CONFIGURATION AND ENVIRONMENT

Top

Debugging messages are available by setting:

  $Net::MemcacheQ::DEBUG = $Net::MemcacheQ::DEBUG_INFO;

DEPENDENCIES

Top

strict
warnings
IO::Socket::INET
Readonly
Carp
English -no_match_vars

INCOMPATIBILITIES

Top

BUGS AND LIMITATIONS

Top

See those of memcacheq, in particular about message size.

AUTHOR

Top

$Author: Roger Pettett$

LICENSE AND COPYRIGHT

Top


Net-MemcacheQ documentation Contained in the Net-MemcacheQ distribution.

#########
# Author:        rmp
# Last Modified: $Date$
# Id:            $Id$
# Source:        $Source$
# $HeadURL$
#
package Net::MemcacheQ;
use strict;
use warnings;
use IO::Socket::INET;
use Readonly;
use Carp;
use English qw(-no_match_vars);

Readonly::Scalar our $DEFAULT_HOST    => '127.0.0.1';
Readonly::Scalar our $DEFAULT_PORT    => 22_201;

our $DEBUG      = 0;
our $DEBUG_INFO = 1;
our $VERSION    = '1.04';

sub new {
  my ($class, $ref) = @_;

  if(!$ref) {
    $ref = {};
  }

  bless $ref, $class;
  return $ref;
}

sub _host {
  my ($self) = @_;
  if($self->{host}) {
    return $self->{host};
  }
  return $DEFAULT_HOST;
}

sub _port {
  my ($self) = @_;
  if($self->{port}) {
    return $self->{port};
  }
  return $DEFAULT_PORT;
}

sub _sock {
  my ($self) = @_;

  if($self->{_sock}) {
    return $self->{_sock};
  }

  $self->{_sock} = IO::Socket::INET->new(
					 PeerAddr  => $self->_host,
					 PeerPort  => $self->_port,
					 Proto     => 'tcp',
					) or croak $EVAL_ERROR;
  return $self->{_sock};
}

sub _request {
  my ($self, $txt) = @_;

  my $sock = $self->_sock;
  ($DEBUG == $DEBUG_INFO) and carp q[Socket connected];

  print {$sock} $txt or croak $EVAL_ERROR;
  ($DEBUG == $DEBUG_INFO) and carp qq[Sent '$txt'];

  my $response = q[];

  ($DEBUG == $DEBUG_INFO) and carp q[Going to read response];
  while(my $buf = <$sock>) {
    ($DEBUG == $DEBUG_INFO) and carp qq[Read '$buf'];
    $buf =~ s/[\r\n]+$//smx;
    ($DEBUG == $DEBUG_INFO) and carp qq[Processed '$buf'];

    if($buf =~ /^STAT/smx) {
      #########
      # retain the rest of the line
      #
      $buf      =~ s/^.*?\s//smx;
      if(!ref $response) {
	$response = [];
      }
      push @{$response}, $buf;

    } elsif($buf =~ /^VALUE/smx) {
      #########
      # retain the expected number of bytes from the next line onwwards
      #
      my ($size) = $buf =~ /(\d+)$/smx;
      my $tmp = q[];

      while(my $buf2 = <$sock>) {
	($DEBUG == $DEBUG_INFO) and carp qq[Read '$buf2'];
	if($buf2 =~ /^END/smx) {
	  last;
	}

	$tmp .= $buf2;
      }
      $response = substr $tmp, 0, $size;
      $buf      = 'END';
    }

    if($buf eq 'END' ||
       $buf eq 'STORED') {
      last;
    }
  }

  ($DEBUG == $DEBUG_INFO) and carp q[Finished request];

  return $response;
}

sub queues {
  my ($self)   = @_;
  my $response = $self->_request("stats queue\r\n");
  if(!$response) {
    $response = [];
  }
  return $response;
}

sub delete_queue {
  my ($self, $queuename) = @_;
  my $response = $self->_request("delete $queuename\r\n");
  return $response;
}

sub push { ## no critic (Homonym)
  my ($self, $queuename, $message) = @_;
  my $len = length $message;
  return $self->_request("set $queuename 0 0 $len\r\n$message\r\n");
}

sub shift { ## no critic (Homonym)
  my ($self, $queuename) = @_;
  return $self->_request("get $queuename\r\n");
}

sub DESTROY {
  my ($self) = @_;
  if($self->{_sock}) {
    $self->{_sock}->close;
    delete $self->{_sock};
  }
  return 1;
}

1;
__END__