| Tibco-Rv documentation | Contained in the Tibco-Rv distribution. |
Tibco::Rv::Queue - Tibco Queue event-managing object
$queue = new Tibco::Rv::Queue;
$queue->name( 'myQueue' );
$queue->createListener( subject => 'ABC', callback => sub { } );
while ( 1 ) { $queue->dispatch }
A Tibco::Rv::Queue manages events waiting to be dispatched.
%args:
policy => $policy,
maxEvents => $maxEvents,
discardAmount => $discardAmount,
name => $name,
priority => $priority,
hook => undef
Creates a Tibco::Rv::Queue. If not specified, policy defaults to
Tibco::Rv::Queue::DEFAULT_POLICY (discard none), maxEvents defaults to
0 (unlimited), discardAmount defaults to 0, name defaults to 'tibrvQueue',
priority defaults to Tibco::Rv::Queue::DEFAULT_PRIORITY (1), and hook
defaults to undef (no hook).
The settings policy, maxEvents, and discardAmount are described under the limitPolicy method. The name setting is described under the name method, priority is described under the priority method, and hook is described under the hook method.
%args:
transport => $transport,
subject => $subject,
callback => sub { ... }
Creates a Tibco::Rv::Listener with this $queue as
the queue. See the Listener constructor
for more details.
%args:
interval => $interval,
callbcak => sub { ... }
Creates a Tibco::Rv::Timer with this $queue as the
queue. See the Timer constructor for more
details.
%args:
socketId => $socketId,
ioType => $ioType,
callback => sub { ... }
Creates a Tibco::Rv::IO with this $queue as the queue.
See the IO constructor for more details.
%args:
name => $name,
idleTimeout => $idleTimeout
Creates a Tibco::Rv::Dispatcher with this $queue
as the dispatchable. See the
Dispatcher constructor for more
details.
Dispatch a single event. If there are no events currently on the queue, then this method blocks until an event arrives.
Dispatch a single event if there is at least one event waiting on the queue. If there are no events on the queue, then this call returns immediately. Returns a Tibco::Rv::OK Status object if an event was dispatched, or Tibco::Rv::TIMEOUT if there were no events on the queue.
Dispatches a single event if there is at least one event waiting on the
queue, or if an event arrives before $timeout seconds have passed. In
either case, returns Tibco::Rv::OK. If $timeout is reached before
dispatching an event, returns Tibco::Rv::TIMEOUT. If Tibco::Rv::WAIT_FOREVER
is passed as $timeout, behaves the same as dispatch. If
Tibco::Rv::NO_WAIT is passed as $timeout, behaves the same as poll.
Returns the number of events waiting on the queue.
Returns or sets the three limitPolicy parameters. $policy is described
in the Constants|"CONSTANTS" section below. $maxEvents is the maximum
number of events allowed in the queue. 0 represents unlimited events. If
$maxEvents is greater than 0, then events are discarded according to the
other two limitPolicy parameters. $discardAmount is the number of events
to discard when $queue reaches its $maxEvents limit.
Returns the name of $queue.
Sets $queue's name to $name. The queue's name appears in advisory
messages concerning queues, so it should be set to a unique value in order
to assist troubleshooting. If $name is undef, sets name to ''.
Returns the $queue's priority.
Sets the $queue's priority. Within a queue group, queues with higher
priorities have their events dispatched before queues with lower priorities.
The default setting is 1. 0 is the lowest possible priority.
Returns the $queue's event arrival hook.
Set the $queue's event arrival hook to the given sub reference. This
hook is called every time an event is added to the queue.
Destroys this queue and discards all events left on the queue. If the
optional $callback is specified, then it (which should be a sub
reference) will be called after all event callbacks currently being
dispatched from this queue finish. Called automatically when $queue
goes out of scope. Calling DESTROY more than once has no effect.
These constants control the queue's behaviour when it overflows and in how different queues are dispatched relative to each other.
The DISCARD_* policies determine which events will be discarded when the
queue reaches its maxEvents limit (set by limitPolicy). DISCARD_NONE
should be used when the queue has no limit. DISCARD_NEW causes the event
that would otherwise cause the queue to overflow its limit to be discarded.
DISCARD_FIRST causes the oldest event (the one that would otherwise be
dispatched next) to be discarded. DISCARD_LAST casues the youngest event
to be discarded.
DEFAULT_PRIORITY is the default priority given to queues when they are created. Queues with higher priorities are dispatched before queues with lower priorities, when multiple priorities are in the same queue group.
The Default Queue is a queue that is automatically created when a new
Tibco::Rv object is created. It is available as
$Tibco::Rv::Queue::DEFAULT. It never discards events and has a priority
of 1. Advisories pertaining to queue overflow are placed on this queue.
Paul Sturm <sturm@branewave.com>
| Tibco-Rv documentation | Contained in the Tibco-Rv distribution. |
package Tibco::Rv::Queue; use vars qw/ $VERSION $DEFAULT @CARP_NOT /; $VERSION = '1.14'; use constant DEFAULT_QUEUE => 1; use constant DISCARD_NONE => 0; use constant DISCARD_NEW => 1; use constant DISCARD_FIRST => 2; use constant DISCARD_LAST => 3; use constant DEFAULT_POLICY => 0; use constant DEFAULT_PRIORITY => 1; use Tibco::Rv::Listener; use Tibco::Rv::Timer; use Tibco::Rv::IO; use Tibco::Rv::Cm::Listener; use Tibco::Rv::Dispatcher; @CARP_NOT = qw/ Tibco::Rv::Listener Tibco::Rv::Timer Tibco::Rv::IO Tibco::Rv::Cm::Listener Tibco::Rv::Dispatcher /; use Inline with => 'Tibco::Rv::Inline'; use Inline C => 'DATA', NAME => __PACKAGE__, VERSION => $Tibco::Rv::Inline::VERSION; my ( @limitProperties, %defaults ); BEGIN { @limitProperties = qw/ policy maxEvents discardAmount /; %defaults = ( policy => DEFAULT_POLICY, maxEvents => 0, discardAmount => 0, name => 'tibrvQueue', priority => DEFAULT_PRIORITY, hook => undef ); } sub new { my ( $proto ) = shift; my ( %args ) = @_; map { Tibco::Rv::die( Tibco::Rv::INVALID_ARG ) unless ( exists $defaults{$_} ) } keys %args; my ( %params ) = ( %defaults, %args ); my ( $class ) = ref( $proto ) || $proto; my ( $self ) = $class->_new; my ( $status ) = Queue_Create( $self->{id} ); Tibco::Rv::die( $status ) unless ( $status == Tibco::Rv::OK ); $self->limitPolicy( @params{ qw/ policy maxEvents discardAmount / } ) if ( $params{policy} != DISCARD_NONE or $params{maxEvents} != 0 or $params{discardAmount} != 0 ); $self->name( $params{name} ) if ( $params{name} ne 'tibrvQueue' ); $self->priority( $params{priority} ) if ( $params{priority} != 1 ); $self->hook( $params{hook} ) if ( defined $params{hook} ); return $self; } sub _new { my ( $class, $id ) = @_; return bless { id => $id, %defaults }, $class; } sub _adopt { my ( $proto, $id ) = @_; my ( $class ) = ref( $proto ); return $proto->_new( $id ) unless ( $class ); $proto->DESTROY; @$proto{ 'id', keys %defaults } = ( $id, values %defaults ); } sub createListener { my ( $self, %args ) = @_; return new Tibco::Rv::Listener( queue => $self, %args ); } sub createTimer { my ( $self, %args ) = @_; return new Tibco::Rv::Timer( queue => $self, %args ); } sub createIO { my ( $self, %args ) = @_; return new Tibco::Rv::IO( queue => $self, %args ); } sub createCmListener { my ( $self, %args ) = @_; return new Tibco::Rv::Cm::Listener( queue => $self, %args ); } sub createDispatcher { my ( $self, %args ) = @_; return new Tibco::Rv::Dispatcher( dispatchable => $self, %args ); } sub dispatch { my ( $self ) = @_; $self->timedDispatch( Tibco::Rv::WAIT_FOREVER ); } sub poll { my ( $self ) = @_; return $self->timedDispatch( Tibco::Rv::NO_WAIT ); } sub timedDispatch { my ( $self, $timeout ) = @_; my ( $status ) = tibrvQueue_TimedDispatch( $self->{id}, $timeout ); Tibco::Rv::die( $status ) unless ( $status == Tibco::Rv::OK or $status == Tibco::Rv::TIMEOUT ); return new Tibco::Rv::Status( status => $status ); } sub count { my ( $self ) = @_; my ( $count ); my ( $status ) = Queue_GetCount( $self->{id}, $count ); Tibco::Rv::die( $status ) unless ( $status == Tibco::Rv::OK ); return $count; } sub limitPolicy { my ( $self ) = shift; return @_ ? $self->_setLimitPolicy( @_ ) : @$self{ @limitProperties }; } sub _setLimitPolicy { my ( $self, %policy ); ( $self, @policy{ @limitProperties } ) = @_; my ( $status ) = tibrvQueue_SetLimitPolicy( $self->{id}, @policy{ @limitProperties } ); Tibco::Rv::die( $status ) unless ( $status == Tibco::Rv::OK ); return @$self{ @limitProperties } = @policy{ @limitProperties }; } sub name { my ( $self ) = shift; return @_ ? $self->_setName( @_ ) : $self->{name}; } sub _setName { my ( $self, $name ) = @_; $name = '' unless ( defined $name ); my ( $status ) = tibrvQueue_SetName( $self->{id}, $name ); Tibco::Rv::die( $status ) unless ( $status == Tibco::Rv::OK ); return $self->{name} = $name; } sub priority { my ( $self ) = shift; return @_ ? $self->_setPriority( @_ ) : $self->{priority}; } sub _setPriority { my ( $self, $priority ) = @_; my ( $status ) = tibrvQueue_SetPriority( $self->{id}, $priority ); Tibco::Rv::die( $status ) unless ( $status == Tibco::Rv::OK ); return $self->{priority} = $priority; } sub hook { my ( $self ) = shift; return @_ ? $self->_setHook( @_ ) : $self->{hook}; } sub _setHook { my ( $self, $hook ) = @_; die "hook not supported"; # see BUGS $self->{hook} = $hook; my ( $status ) = Tibco::Rv::Event::Queue_SetHook( $self->{id}, $self->{hook} ); Tibco::Rv::die( $status ) unless ( $status == Tibco::Rv::OK ); return $self->{hook}; } sub DESTROY { my ( $self, $callback ) = @_; return unless ( exists $self->{id} ); my ( $id ) = $self->{id}; delete @$self{ keys %$self }; return if ( $id == DEFAULT_QUEUE ); my ( $status ) = Tibco::Rv::Event::Queue_DestroyEx( $id, $callback ); Tibco::Rv::die( $status ) unless ( $status == Tibco::Rv::OK ); } BEGIN { $DEFAULT = Tibco::Rv::Queue->_adopt( DEFAULT_QUEUE ) } 1;
__DATA__ __C__ tibrv_status tibrvQueue_TimedDispatch( tibrvQueue queue, tibrv_f64 timeout ); tibrv_status tibrvQueue_SetLimitPolicy( tibrvQueue queue, tibrvQueueLimitPolicy policy, tibrv_u32 maxEvents, tibrv_u32 discardAmount );tibrv_status tibrvQueue_SetName( tibrvQueue queue, const char * name ); tibrv_status tibrvQueue_SetPriority( tibrvQueue queue, tibrv_u32 priority ); tibrv_status Queue_Create( SV * sv_queue ) { tibrvQueue queue = (tibrvQueue)NULL; tibrv_status status = tibrvQueue_Create( &queue ); sv_setiv( sv_queue, (IV)queue ); return status; } tibrv_status Queue_GetCount( tibrvQueue queue, SV * sv_count ) { tibrv_u32 count; tibrv_status status = tibrvQueue_GetCount( queue, &count ); sv_setuv( sv_count, (UV)count ); return status; }