# /usr/local/CPAN/Devel-RingBuffer/Devel/RingBuffer.pm
#/**
# Shared memory ring buffers for diagnosis/debug of Perl scripts.
# Uses IPC::Mmap to create/access/manage a memory mapped file (or namespace
# on Win32) as a ring buffer structure that can be used by "applications
# under test" that use an appropriate debug module (e.g., Devel::STrace)
# along with an external monitoring application (e.g., Devel::STrace::Monitor).
# <p>
# Note that significant functionality is written in XS/C in order to minimize
# tracing/debugging overhead.
# <p>
# Permission is granted to use this software under the same terms as Perl itself.
# Refer to the <a href='http://perldoc.perl.org/perlartistic.html'>Perl Artistic License</a>
# for details.
#
# @author D. Arnold
# @since 2006-05-01
# @self $self
#*/
package Devel::RingBuffer;
use Carp qw(cluck carp confess);
#use threads;
#use threads::shared;
use Config;
use IPC::Mmap;
use DynaLoader;
use Exporter;
BEGIN {
    our @ISA = qw(Exporter DynaLoader);

    #
    # Byte offsets of the fields in the global header at the start of the
    # mmap'ed region. Note that RINGBUF_GLOBAL_MSG (start of the global
    # message area) and RINGBUF_RINGHDR_SZ (size of the header) are both 40.
    #
    use constant {
        RINGBUF_SINGLE       => 0,
        RINGBUF_MSGAREA_SZ   => 4,
        RINGBUF_BUFFERS      => 8,
        RINGBUF_SLOTS        => 12,
        RINGBUF_SLOT_SZ      => 16,
        RINGBUF_CREATE_STOP  => 20,
        RINGBUF_CREATE_TRACE => 24,
        RINGBUF_GLOBAL_SZ    => 28,
        RINGBUF_TOTALMSG_SZ  => 32,
        RINGBUF_GLOBMSG_SZ   => 36,
        RINGBUF_GLOBAL_MSG   => 40,
        RINGBUF_RINGHDR_SZ   => 40,
    };

    #
    # Byte offsets and sizes of the members of a watchlist entry:
    # the expression buffer spans 8 .. 263 (RINGBUF_WATCH_EXPRSZ bytes),
    # the result buffer spans 272 .. 783 (RINGBUF_WATCH_RESSZ bytes).
    # RINGBUF_WATCH_EXPRSZ and RINGBUF_WATCH_RESSZ are not part of any
    # export tag below.
    #
    use constant {
        RINGBUF_WATCH_INUSE   => 0,
        RINGBUF_WATCH_EXPRLEN => 4,
        RINGBUF_WATCH_EXPR    => 8,
        RINGBUF_WATCH_READY   => 264,
        RINGBUF_WATCH_RESLEN  => 268,
        RINGBUF_WATCH_RESULT  => 272,
        RINGBUF_WATCH_SZ      => 784,
        RINGBUF_WATCH_CNT     => 4,
        RINGBUF_WATCH_EXPRSZ  => 256,
        RINGBUF_WATCH_RESSZ   => 512,
    };

    #
    # Byte offsets of the members of a single ring's header, plus slot
    # sizing/packing parameters. RINGBUF_WATCH_OFFSET and RINGBUF_BUFHDR_SZ
    # are both 28. RINGBUF_BASEADDR is not part of any export tag below.
    #
    use constant {
        RINGBUF_PID          => 0,
        RINGBUF_TID          => 4,
        RINGBUF_CURRSLOT     => 8,
        RINGBUF_DEPTH        => 12,
        RINGBUF_TRACE        => 16,
        RINGBUF_SIGNAL       => 20,
        RINGBUF_BASEADDR     => 24,
        RINGBUF_WATCH_OFFSET => 28,
        RINGBUF_BUFHDR_SZ    => 28,
        RINGBUF_DFLT_SLOTSZ  => 214,
        RINGBUF_ENTRY_SZ     => 200,
        RINGBUF_SLOT_PACKSTR => 'l d S/a*',
    };

    #
    # Indexes into the blessed array that implements a Devel::RingBuffer
    # object (see new() and _lcl_open() for the order in which the array
    # is populated). The constructors in this file fill indexes 0 .. 16
    # only; RINGBUF_NEXT_IDX (17) is not initialized by them.
    #
    use constant {
        RINGBUF_FILENAME       => 0,
        RINGBUF_SIZE           => 1,
        RINGBUF_COUNT          => 2,
        RINGBUF_BUFSIZE        => 3,
        RINGBUF_SLOT_CNT       => 4,
        RINGBUF_FLD_TID        => 5,
        RINGBUF_FLD_PID        => 6,
        RINGBUF_RING           => 7,
        RINGBUF_FH             => 8,
        RINGBUF_FLD_MSGAREA_SZ => 9,
        RINGBUF_FLD_GLOBAL_SZ  => 10,
        RINGBUF_MAP_OFFSET     => 11,
        RINGBUF_RINGS_OFFSET   => 12,
        RINGBUF_MAP_ADDR       => 13,
        RINGBUF_RINGS_ADDR     => 14,
        RINGBUF_ADDRESS        => 15,
        RINGBUF_SLOT_SIZE      => 16,
        RINGBUF_NEXT_IDX       => 17,
    };

    #
    # Polling interval (in seconds) used by the global message
    # handshake in setGlobalMsg()/getGlobalMsg(); not exported.
    #
    use constant RINGBUF_RING_WAIT => 0.3;

    #
    # Nothing is exported by default before the tags are registered;
    # Exporter::export_tags() then adds every tagged name to @EXPORT.
    #
    our @EXPORT    = ();
    our @EXPORT_OK = ();
    our %EXPORT_TAGS = (
        ringbuffer_consts => [
            qw/
                RINGBUF_SINGLE
                RINGBUF_MSGAREA_SZ
                RINGBUF_BUFFERS
                RINGBUF_SLOTS
                RINGBUF_SLOT_SZ
                RINGBUF_CREATE_STOP
                RINGBUF_CREATE_TRACE
                RINGBUF_GLOBAL_SZ
                RINGBUF_TOTALMSG_SZ
                RINGBUF_GLOBMSG_SZ
                RINGBUF_GLOBAL_MSG
                RINGBUF_RINGHDR_SZ
                RINGBUF_WATCH_INUSE
                RINGBUF_WATCH_EXPRLEN
                RINGBUF_WATCH_EXPR
                RINGBUF_WATCH_READY
                RINGBUF_WATCH_RESLEN
                RINGBUF_WATCH_RESULT
                RINGBUF_WATCH_SZ
                RINGBUF_WATCH_CNT
                RINGBUF_PID
                RINGBUF_TID
                RINGBUF_CURRSLOT
                RINGBUF_DEPTH
                RINGBUF_TRACE
                RINGBUF_SIGNAL
                RINGBUF_WATCH_OFFSET
                RINGBUF_BUFHDR_SZ
                RINGBUF_DFLT_SLOTSZ
                RINGBUF_ENTRY_SZ
                RINGBUF_SLOT_PACKSTR
            /
        ],
        ringbuffer_members => [
            qw/
                RINGBUF_FILENAME
                RINGBUF_SIZE
                RINGBUF_COUNT
                RINGBUF_BUFSIZE
                RINGBUF_SLOT_CNT
                RINGBUF_FLD_TID
                RINGBUF_FLD_PID
                RINGBUF_RING
                RINGBUF_FH
                RINGBUF_FLD_MSGAREA_SZ
                RINGBUF_FLD_GLOBAL_SZ
                RINGBUF_MAP_OFFSET
                RINGBUF_RINGS_OFFSET
                RINGBUF_MAP_ADDR
                RINGBUF_RINGS_ADDR
                RINGBUF_ADDRESS
                RINGBUF_SLOT_SIZE
                RINGBUF_NEXT_IDX
            /
        ],
    );
    Exporter::export_tags(keys %EXPORT_TAGS);
};
# Module version; also handed to bootstrap() below when the XS code is loaded.
our $VERSION = '0.31';
# Set to 1 (at compile time, in the BEGIN block below) when thread support
# is enabled; left undefined otherwise.
our $hasThreads;
BEGIN {
# Thread support is enabled only when this perl was built with ithreads AND
# the DEVEL_RINGBUF_NOTHREADS environment variable is not set to a true value.
if ($Config{useithreads} && (!$ENV{DEVEL_RINGBUF_NOTHREADS})) {
require Devel::RingBuffer::ThreadFacade;
$hasThreads = 1;
}
}
# Loaded unconditionally; supplies the share() and lock() calls used in this file.
use threads::shared;
# NOTE: strict/warnings are lexically scoped, so they only apply to the code
# from this point to the end of the file (not to the BEGIN blocks above).
use strict;
use warnings;
# Load the XS portion of the module.
bootstrap Devel::RingBuffer $VERSION;
use Devel::RingBuffer::Ring;
# Lock variable used with lock() around XS ring allocation/free and global
# message transfer; new() calls share() on it when $hasThreads is set.
our $thrdlock = undef;
#/**
# Constructor. Using a combination of the optional C<%args> and
# various environment variables, creates and initializes a
# mmap'ed file in read/write mode with the ring buffer structures.
# <p>
# Each setting is taken from the named argument first, then from the
# listed environment variable, then from the built-in default:
# <ul>
# <li>File / DEVEL_RINGBUF_FILE / generated from the script name, PID and a timestamp
# <li>Slots / DEVEL_RINGBUF_SLOTS / 10
# <li>SlotSize / DEVEL_RINGBUF_SLOTSZ / 200
# <li>Rings / DEVEL_RINGBUF_BUFFERS / 20
# <li>MessageSize / DEVEL_RINGBUF_MSGSZ / 256
# <li>GlobalSize / DEVEL_RINGBUF_GLOBALSZ / 16384
# <li>StopOnCreate / DEVEL_RINGBUF_SOC / 0
# <li>TraceOnCreate / DEVEL_RINGBUF_TOC / 0
# </ul>
# Dies (with a stack trace for file creation errors) if the backing
# file cannot be created or mapped.
#
# @param File name of the file to be created for memory mapping.
# @param GlobalSize size of global monitor <=> AUT message buffer.
# @param MessageSize size of per-thread monitor <=> AUT message buffer.
# @param Rings Number of rings to create in the ring buffer.
# @param Slots Number of slots per ring.
# @param SlotSize Slot size in bytes.
# @param StopOnCreate Initial value for stop_on_create flag.
# @param TraceOnCreate Initial value for trace_on_create flag.
#
# @return Devel::RingBuffer object on success; undef on failure
#*/
sub new {
    my $class = shift;
    my %args = @_;
    my $file = $args{File} || $ENV{DEVEL_RINGBUF_FILE};
    my $anon;
    unless (defined($file)) {
        #
        # default name is derived from the script's basename, minus its
        # extension. Strip the extension BEFORE prepending the directory
        # so that a dot in the directory name can't truncate the path.
        #
        my @paths = split(/[\/\\]/, $0);
        $file = pop @paths;
        $file =~ s/^(.+)\..+/$1/;
        if ($^O eq 'MSWin32') {
            $anon = 1;
        }
        else {
            $file = defined($ENV{TEMP}) ? "$ENV{TEMP}/$file" : "/tmp/$file";
        }
        #
        # use timestamp sans weekday and year
        #
        my @pieces = split(/\s+/, scalar localtime);
        pop @pieces;            # get rid of year
        $pieces[0] = $$;        # replace weekday w/ PID
        $pieces[-1] =~ tr/:/_/; # Win32 can't handle colons in filenames
        $file .= '.' . join('_', @pieces);
    }

    my $ringslots = $args{Slots}       || $ENV{DEVEL_RINGBUF_SLOTS}    || 10;
    my $slotsz    = $args{SlotSize}    || $ENV{DEVEL_RINGBUF_SLOTSZ}   || 200;
    my $ringcount = $args{Rings}       || $ENV{DEVEL_RINGBUF_BUFFERS}  || 20;
    my $ringmsgsz = $args{MessageSize} || $ENV{DEVEL_RINGBUF_MSGSZ}    || 256;
    my $globmsgsz = $args{GlobalSize}  || $ENV{DEVEL_RINGBUF_GLOBALSZ} || (16 * 1024);
    #
    # the create flags are booleans for which 0 is a meaningful value:
    # an explicitly supplied (defined) argument always wins over the
    # environment, even when it is false
    #
    my $create_stop = defined($args{StopOnCreate})
        ? ($args{StopOnCreate} || 0)
        : ($ENV{DEVEL_RINGBUF_SOC} || 0);
    my $create_trace = defined($args{TraceOnCreate})
        ? ($args{TraceOnCreate} || 0)
        : ($ENV{DEVEL_RINGBUF_TOC} || 0);
    #
    # in order to avoid issues with word alignment, we'll always
    # force slotsz, msg size, and global size to be word aligned
    # (who knows, we may need to be 8 byte aligned on some platforms)
    #
    $slotsz    += (4 - ($slotsz & 3))    if ($slotsz & 3);
    $ringmsgsz += (4 - ($ringmsgsz & 3)) if ($ringmsgsz & 3);
    $globmsgsz += (4 - ($globmsgsz & 3)) if ($globmsgsz & 3);

    my $freemap_offs = RINGBUF_RINGHDR_SZ + $globmsgsz;
    my $ringbufsz = _get_ring_size($ringslots, $slotsz, $ringmsgsz);
    my $ringsize = _get_total_size($ringcount, $ringslots, $slotsz, $ringmsgsz, $globmsgsz) +
        1024;   # Win32 needs some extra room
    #
    # object layout: see the RINGBUF_* member index constants. The
    # MAP_ADDR/RINGS_ADDR entries start out as offsets and are rebased
    # to absolute addresses once the mapping's base address is known.
    #
    my $self = bless [
        $file,
        $ringsize,
        $ringcount,
        $ringbufsz,
        $ringslots,
        ($hasThreads ? Devel::RingBuffer::ThreadFacade->tid() : 0),
        $$,
        undef,
        undef,
        $ringmsgsz,
        $globmsgsz,
        $freemap_offs,
        _get_rings_addr(0, $ringcount, $globmsgsz),
        $freemap_offs,
        _get_rings_addr(0, $ringcount, $globmsgsz),
        0,
        $slotsz
    ], $class;
    #
    # create the mmap'ed ring
    #
    if ($anon) {
        #
        # on Win32 only...anonymous mmap is useless to us on POSIX
        #
        $self->[RINGBUF_RING] = IPC::Mmap->new($file, $ringsize,
            PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANON)
            or die "Can't open mmap file $file: $!";
    }
    else {
        #
        # pre-size the backing file with zeros. This package defines its
        # own open()/close() methods, so name the builtins explicitly;
        # use the 3-arg form with a lexical handle and check every step.
        #
        CORE::open(my $fh, '>', $file)
            or confess "Can't open mmap file $file: $!";
        print {$fh} "\0" x $ringsize
            or confess "Can't initialize mmap file $file: $!";
        CORE::close($fh)
            or confess "Can't close mmap file $file: $!";
        $self->[RINGBUF_RING] = IPC::Mmap->new($file, $ringsize,
            PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FILE)
            or die "Can't open mmap file $file: $!";
    }
    #
    # share the thrdlock
    #
    share($thrdlock) if $hasThreads;
    #
    # clear the ringbuffer (Win32 needs this)
    #
    my $ringbuffer = $self->[RINGBUF_RING];
    my $var = "\0" x ($ringsize - 1024);
    $ringbuffer->write($var, 0, $ringsize - 1024);
    #
    # then init the global header: single, msgarea_sz, buffers, slots,
    # slot_sz, stop_on_create, trace_on_create, global_sz, globmsg_total
    #
    return undef
        unless $ringbuffer->pack(0, 'l l l l l l l l l',
            0, $ringmsgsz, $ringcount, $ringslots, $slotsz,
            $create_stop, $create_trace, $globmsgsz, 0);
    #
    # rebase the free map and rings offsets to absolute addresses
    #
    my $addr = $self->[RINGBUF_ADDRESS] = $self->[RINGBUF_RING]->getAddress();
    $self->[RINGBUF_MAP_ADDR] += $addr;
    $self->[RINGBUF_RINGS_ADDR] += $addr;
    my $mapaddr = $self->[RINGBUF_MAP_ADDR];
    my $ringsaddr = $self->[RINGBUF_RINGS_ADDR];
    #
    # let XS do init: mark every ring as free
    #
    _free_ring($mapaddr, $ringsaddr, $ringbufsz, $_)
        foreach (0..$ringcount-1);
    return $self;
}
#/**
# Accessor for the file name (or Win32 namespace) backing this
# ring buffer, as stored when the object was constructed.
#
# @return the name of the mmap'ed file
#*/
sub getName {
    my ($self) = @_;
    return $self->[RINGBUF_FILENAME];
}
#/**
# Accessor for the base address of the mapping, as recorded from
# the IPC::Mmap object's getAddress() at construction time.
#
# @return the address of the mmap'ed file
#*/
sub getAddress {
    my ($self) = @_;
    return $self->[RINGBUF_ADDRESS];
}
#/**
# Allocate a ring buffer. Should only be used on ringbuffers created with new().
# <p>
# The free-ring search (under the mmap lock and the thread lock) is
# delegated to C<reallocate()>, which returns the allocated ring's
# number and address; this method wraps them in a
# Devel::RingBuffer::Ring object.
#
# @return a Devel::RingBuffer::Ring object on success.
#		If no rings are available, returns undef.
#*/
sub allocate {
    my $self = shift;
    #
    # find a free ring; an empty list means none are available
    #
    my ($ring, $ringaddr) = $self->reallocate();
    return undef unless defined($ring);

    return Devel::RingBuffer::Ring->new(
        $self,
        $ringaddr,
        $self->[RINGBUF_ADDRESS],
        $ring,
        $self->[RINGBUF_SLOT_CNT],
        $self->[RINGBUF_FLD_MSGAREA_SZ],
    );
}
#/**
# Re-allocates a ring buffer. Required to handle threads' CLONE()
# of the existing ring buffer object when a new thread is created.
# C<reallocate()> simply allocates a ring buffer and returns its
# ring number and its base address; the caller then updates
# an existing ring object with the returned values.
#
# @return the allocated ring index and address; an empty list
#		if no ring could be allocated
#*/
sub reallocate {
    my $self = shift;
    my $mmap = $self->[RINGBUF_RING];
    my $ringnum;

    $mmap->lock();
    {
        lock($thrdlock);
        #
        # the free ring search is done in XS (for performance reasons)
        #
        $ringnum = _alloc_ring($self->[RINGBUF_MAP_ADDR], $self->[RINGBUF_COUNT]);
    }
    $mmap->unlock();

    return unless defined($ringnum);

    return ($ringnum,
        _get_ring_addr(
            $self->[RINGBUF_RINGS_ADDR],
            $ringnum,
            $self->[RINGBUF_SLOT_CNT],
            $self->[RINGBUF_SLOT_SIZE],
            $self->[RINGBUF_FLD_MSGAREA_SZ]));
}
#/**
# Constructor. Opens an existing mmap'd file for read/write
# access (for interactive debuggers)
#
# @param $file optional name of mmap'ed file (or namespace for Win32)
#
# @return Devel::RingBuffer object on success; undef on failure
#*/
sub open {
    #
    # pass the arguments positionally: appending the mode to @_ would
    # put it in the $file position whenever $file is omitted
    #
    my ($class, $file) = @_;
    return _lcl_open($class, $file, PROT_READ | PROT_WRITE);
}
#/**
# Constructor. Opens an existing mmap'd file for read-only
# access (for simple monitor applications)
#
# @param $file optional name of mmap'ed file (or namespace for Win32)
#
# @return Devel::RingBuffer object on success; undef on failure
#*/
sub monitor {
    #
    # pass the arguments positionally: appending the mode to @_ would
    # put it in the $file position whenever $file is omitted
    #
    my ($class, $file) = @_;
    return _lcl_open($class, $file, PROT_READ);
}
#
# Common implementation of open() and monitor(): maps an existing ring
# buffer file (or Win32 namespace) with the requested protection mode
# and builds the object from the configuration stored in its header.
#
# $class - class to bless the new object into
# $file  - name of the mmap'ed file; if undefined, falls back to the
#          DEVEL_RINGBUF_FILE environment variable
# $mode  - IPC::Mmap protection flags (PROT_READ, optionally | PROT_WRITE)
#
# Returns the new object; dies if no file name is available or if
# either mapping attempt fails.
#
sub _lcl_open {
    my ($class, $file, $mode) = @_;
    $file = $ENV{DEVEL_RINGBUF_FILE}
        unless defined($file);
    die "Can't mmap file: no file name given and DEVEL_RINGBUF_FILE is not set"
        unless defined($file);
    #
    # open twice: first to get config params, then
    # to map the whole file
    #
    # use anonymous open for Win32
    #
    my $flags = ($^O eq 'MSWin32') ?
        MAP_SHARED | MAP_ANON :
        MAP_SHARED | MAP_FILE;
    my $ringbuffer =
        IPC::Mmap->new($file, RINGBUF_RINGHDR_SZ, PROT_READ, $flags) or
        die "Can't mmap file $file: $!";
    #
    # read the 7 consecutive 32-bit header fields from msgarea_sz up to
    # (but not including) globmsg_total
    #
    my ($msgareasz, $count, $slots, $slotsz, $stop, $trace, $globmsgsz) =
        $ringbuffer->unpack(RINGBUF_MSGAREA_SZ,
            RINGBUF_TOTALMSG_SZ - RINGBUF_MSGAREA_SZ, 'l7');
    my $freemap_offs = RINGBUF_RINGHDR_SZ + $globmsgsz;
    my $ringbufsz = _get_ring_size($slots, $slotsz, $msgareasz);
    my $ringsize = _get_total_size($count, $slots, $slotsz, $msgareasz, $globmsgsz) +
        1024;   # Win32 needs some extra room
    $ringbuffer->close();

    $ringbuffer = IPC::Mmap->new($file, $ringsize, $mode, $flags)
        or die "Can't mmap file $file: $!";
    my $baseaddr = $ringbuffer->getAddress();
    #
    # object layout: see the RINGBUF_* member index constants
    #
    return bless [
        $file,
        $ringsize,
        $count,
        $ringbufsz,
        $slots,
        ($hasThreads ? Devel::RingBuffer::ThreadFacade->tid() : 0),
        $$,
        $ringbuffer,
        undef,
        $msgareasz,
        $globmsgsz,
        $freemap_offs,
        _get_rings_addr(0, $count, $globmsgsz),
        $baseaddr + $freemap_offs,
        _get_rings_addr($baseaddr, $count, $globmsgsz),
        $baseaddr,
        $slotsz
    ], $class;
}
#/**
# Get the free buffer map: reads one unsigned byte per configured
# ring, starting at the free map's offset in the mmap'ed region.
#
# @return list of bytes, one per ring; if an element is 'true', the associated
#	ring is free; otherwise the ring is in use.
#*/
sub getMap {
    my ($self) = @_;
    my $count = $self->[RINGBUF_COUNT];
    return $self->[RINGBUF_RING]->unpack(
        $self->[RINGBUF_MAP_OFFSET],
        $count,
        "C$count");
}
#/**
# Get the RingBuffer global header fields, read as ten consecutive
# 32-bit values from the start of the mmap'ed region. The fields
# returned include:
# <p>
# <ol>
# <li>single - global control variable
# <li>msgarea_sz - size of per-thread message area
# <li>max_buffer - number of configured rings
# <li>slots - number of slots per ring
# <li>slot_sz - size of each slot (excluding linenumber and timestamp header)
# <li>stop_on_create - 1 => new threads created with signal = 1
# <li>trace_on_create - 1 => new threads created with trace = 1
# <li>global_sz - size of global message buffer
# <li>globmsg_total - size of complete global message contents
# <li>globmsg_sz - size of current global message fragment
# </ol>
#
# @return list of the specified header values
#*/
sub getHeader {
    my ($self) = @_;
    my $mmap = $self->[RINGBUF_RING];
    return $mmap->unpack(RINGBUF_SINGLE, RINGBUF_RINGHDR_SZ, 'l10');
}
#/**
# Open and return a Devel::RingBuffer::Ring object
# for the specified ring number. The ring's address is
# computed (in XS) from the rings base address, the ring
# number, and the configured slot count, slot size and
# message area size.
#
# @param $ringnum	number of ring to be opened
#
# @return Devel::RingBuffer::Ring object
#*/
sub getRing {
    my ($self, $ringnum) = @_;
    my $slotcnt   = $self->[RINGBUF_SLOT_CNT];
    my $msgareasz = $self->[RINGBUF_FLD_MSGAREA_SZ];

    return Devel::RingBuffer::Ring->open(
        $self,
        _get_ring_addr(
            $self->[RINGBUF_RINGS_ADDR],
            $ringnum,
            $slotcnt,
            $self->[RINGBUF_SLOT_SIZE],
            $msgareasz),
        $self->[RINGBUF_ADDRESS],
        $ringnum,
        $slotcnt,
        $msgareasz
    );
}
#/**
# Accessor for the per-ring slot count recorded in this object.
#
# @return the number of slots configured for the ring buffer.
#*/
sub getSlots {
    my ($self) = @_;
    return $self->[RINGBUF_SLOT_CNT];
}
#/**
# Accessor for the slot size recorded in this object.
#
# @return the slot size
#*/
sub getSlotSize {
    my ($self) = @_;
    return $self->[RINGBUF_SLOT_SIZE];
}
#/**
# Accessor for the ring count recorded in this object.
#
# @return the count of rings
#*/
sub getCount {
    my ($self) = @_;
    return $self->[RINGBUF_COUNT];
}
#/**
# Close the ring buffer. Drops this object's reference to the
# IPC::Mmap object; subsequent C<free()> calls on this object
# become no-ops.
#
# @deprecated
#
# @return always 1
#*/
sub close {
    my $self = shift;
    #
    # clear the slot by assignment; delete() on an array element
    # is discouraged
    #
    $self->[RINGBUF_RING] = undef;
    return 1;
}
#/**
# Free a ring. Returns a ring to the free list. Does nothing
# (and returns 1) if this object no longer holds its mmap object,
# e.g., after C<close()>.
#
# @param $ring	the ring to be freed; it is passed unchanged to the XS
#		_free_ring() as its ring argument
#
# @return 1 if the ring buffer has been closed; otherwise the result
#		of the mmap object's unlock()
#*/
sub free {
    my ($self, $ring) = @_;
    my $mmap = $self->[RINGBUF_RING];
    return 1 unless $mmap;

    $mmap->lock();
    {
        lock($thrdlock);
        #
        # the locks are taken here; XS does the rest
        #
        _free_ring(
            $self->[RINGBUF_MAP_ADDR],
            $self->[RINGBUF_RINGS_ADDR],
            $self->[RINGBUF_BUFSIZE],
            $ring);
    }
    return $mmap->unlock();
}
#/**
# Accessor for the IPC::Mmap object that holds the ringbuffer.
#
# @return the IPC::Mmap object
#*/
sub getMmap {
    my ($self) = @_;
    return $self->[RINGBUF_RING];
}
#
#	Looks up the ring belonging to the current process/thread and
#	frees it. Note this can be a lengthy process, since the mmap'd
#	ring buffer headers must be scanned for a matching PID/TID
#	before the ring can be freed.
#
#	!!!DEPRECATED!!! We can't permit DESTROY if cloned versions
#	might destroy things; just let process run down deal with
#	closing the file
#
sub OLDDESTROY {
    my $self = shift;
    my $tid = $hasThreads ? Devel::RingBuffer::ThreadFacade->tid() : 0;
    my $mmap = $self->[RINGBUF_RING];
    return unless $mmap;

    print STDERR "RingBuffer DESTROYING in thread $tid\n";
    $mmap->lock();
    {
        lock($thrdlock);
        #
        # the locks are taken here; XS does the search and the free
        #
        my $ring = _find_ring(
            $self->[RINGBUF_RINGS_ADDR],
            $self->[RINGBUF_BUFSIZE],
            $self->[RINGBUF_COUNT],
            $$,
            $tid);
        if (defined($ring)) {
            _free_ring(
                $self->[RINGBUF_MAP_ADDR],
                $self->[RINGBUF_RINGS_ADDR],
                $self->[RINGBUF_BUFSIZE],
                $ring);
        }
    }
    return $mmap->unlock();
}
#/**
# Sets the value of the global single field (the first 32-bit
# field of the global header).
#
# @param value to set
#
# @return the result of the mmap object's pack() call.
#		NOTE(review): previously documented as "the prior value of
#		the field"; new() treats pack()'s result as a success flag,
#		so confirm against IPC::Mmap before relying on it.
#*/
sub setSingle {
    my ($self, $value) = @_;
    return $self->[RINGBUF_RING]->pack(RINGBUF_SINGLE, 'l', $value);
}
#/**
# Gets the value of the global single field (the first 32-bit
# field of the global header).
#
# @return the value of the field.
#*/
sub getSingle {
    my ($self) = @_;
    return $self->[RINGBUF_RING]->unpack(RINGBUF_SINGLE, 4, 'l');
}
#/**
# Sets the value of the stop_on_create field in the global header.
#
# @param value to set
#
# @return the result of the mmap object's pack() call.
#		NOTE(review): previously documented as "the prior value of
#		the field"; new() treats pack()'s result as a success flag,
#		so confirm against IPC::Mmap before relying on it.
#*/
sub setStopOnCreate {
    my ($self, $value) = @_;
    return $self->[RINGBUF_RING]->pack(RINGBUF_CREATE_STOP, 'l', $value);
}
#/**
# Get the value of the stop_on_create field from the global header.
#
# @return the current value of the field.
#*/
sub getStopOnCreate {
    my ($self) = @_;
    return $self->[RINGBUF_RING]->unpack(RINGBUF_CREATE_STOP, 4, 'l');
}
#/**
# Sets the value of the trace_on_create field in the global header.
#
# @param $trace_on_create value to set
#
# @return the result of the mmap object's pack() call.
#		NOTE(review): previously documented as "the prior value of
#		the field"; new() treats pack()'s result as a success flag,
#		so confirm against IPC::Mmap before relying on it.
#*/
sub setTraceOnCreate {
    my ($self, $trace_on_create) = @_;
    return $self->[RINGBUF_RING]->pack(RINGBUF_CREATE_TRACE, 'l', $trace_on_create);
}
#/**
# Get the value of the trace_on_create field from the global header.
#
# @return the value of the field
#*/
sub getTraceOnCreate {
    my ($self) = @_;
    return $self->[RINGBUF_RING]->unpack(RINGBUF_CREATE_TRACE, 4, 'l');
}
#/**
# Sets a message into the global message area. Note that
# this operation requires locking the entire ring buffer
# header until the message is completely transferred.
# Messages larger than the configured global message size
# will be transferred in chunks; each chunk must be ACK'd by
# the message receiver (which zeroes the fragment length field).
# <p>
# While waiting for an ACK the fragment length field is polled every
# RINGBUF_RING_WAIT seconds. The wait uses 4-arg select() because the
# core sleep() only takes whole seconds: a fractional interval would
# be truncated to 0 and turn the wait into a busy loop.
#
# @param $msg	the message to send
#
# @return the RingBuffer object
#*/
sub setGlobalMsg {
    my ($self, $msg) = @_;
    my $ringbuffer = $self->[RINGBUF_RING];
    my $globsz = $self->[RINGBUF_FLD_GLOBAL_SZ];
    my $first = 1;
    $ringbuffer->lock();
    {
        lock($thrdlock);
        my ($t, $frag) = (0, 0);
        my $len = length($msg);
        while ($len) {
            #
            # may need to fragment
            #
            $t = ($len > $globsz) ? $globsz : $len;
            $ringbuffer->write(substr($msg, $frag, $t), RINGBUF_GLOBAL_MSG, $t);
            $ringbuffer->pack(RINGBUF_GLOBMSG_SZ, 'l', $t);
            #
            # set the total length last (first fragment only) so the
            # reader doesn't read too soon
            #
            if ($first) {
                $ringbuffer->pack(RINGBUF_TOTALMSG_SZ, 'l', $len);
                $first = undef;
            }
            $len -= $t;
            $frag += $t;
            #
            # wait for ACK that it's been read
            #
            while ($t) {
                select(undef, undef, undef, RINGBUF_RING_WAIT);
                $t = $ringbuffer->unpack(RINGBUF_GLOBMSG_SZ, 4, 'l');
            }
        }
        #
        # mark the message area as empty again
        #
        $ringbuffer->pack(RINGBUF_TOTALMSG_SZ, 'l', 0);
    }
    $ringbuffer->unlock();
    return $self;
}
#/**
# Gets a message from the global message area. Note that
# this operation B<does not> lock the entire ring buffer
# header, but instead relies on signalling of the message
# chunk lengths.
# Messages larger than the configured global message size
# will be received in chunks; each chunk is ACK'd here by
# zeroing the fragment length field once it has been read.
# <p>
# Blocks until a message is available, polling every
# RINGBUF_RING_WAIT seconds. The wait uses 4-arg select() because the
# core sleep() only takes whole seconds: a fractional interval would
# be truncated to 0 and turn the wait into a busy loop.
#
# @return the re-assembled global message buffer contents
#
#*/
sub getGlobalMsg {
    my $self = shift;
    my $ringbuffer = $self->[RINGBUF_RING];
    my $result = '';
    #
    # wait for indication that msg is available
    #
    my $len = $ringbuffer->unpack(RINGBUF_TOTALMSG_SZ, 4, 'l');
    until ($len) {
        select(undef, undef, undef, RINGBUF_RING_WAIT);
        $len = $ringbuffer->unpack(RINGBUF_TOTALMSG_SZ, 4, 'l');
    }
    while ($len > 0) {
        #
        # may be fragmented: wait for the length field of THIS fragment.
        # The length must be re-read for every fragment; reusing the
        # previous fragment's length would skip the wait and read the
        # message area before the sender has refilled it.
        #
        my $fraglen = $ringbuffer->unpack(RINGBUF_GLOBMSG_SZ, 4, 'l');
        until ($fraglen) {
            select(undef, undef, undef, RINGBUF_RING_WAIT);
            $fraglen = $ringbuffer->unpack(RINGBUF_GLOBMSG_SZ, 4, 'l');
        }
        my $frag;
        $ringbuffer->read($frag, RINGBUF_GLOBAL_MSG, $fraglen);
        $len -= $fraglen;
        $result .= $frag;
        #
        # ACK it
        #
        $ringbuffer->pack(RINGBUF_GLOBMSG_SZ, 'l', 0);
    }
    return $result;
}
1;