| Queue-Worker documentation | Contained in the Queue-Worker distribution. |
Queue::Worker - Database based work queue abstraction.
package MyWorker;
use base 'Queue::Worker';

sub name { 'my_work' }

sub process {
    my ($self, $item) = @_;
    # do your work here
}

# create worker table in db
MyWorker->create_table($dbh);

# and somewhere else
MyWorker->enqueue($dbh, 'some work order string');

# and finally, to run the queue (run needs an instance, which holds the semaphore)
MyWorker->new->run($dbh);
This module provides a simple worker abstraction built on a database-backed queue. Worker instances are locked against each other with a named semaphore from POSIX::RT::Semaphore.
Strings representing work orders are added to the queue with the enqueue method. The run method
removes those items from the queue and hands each one to process.
create_table($dbh, $name) creates the table queue_worker_$name in the database. The $name parameter
is optional: if it is undef, the value of the name accessor is used.
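To make the naming rule concrete, the following sketch builds the statement that create_table would issue for a worker named my_work. It only formats a string and does not touch a database. The statement text mirrors the one in the module source; the helper name table_ddl is hypothetical and is not part of Queue::Worker.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Queue::Worker): formats the DDL
# that would be sent for a given worker name.
sub table_ddl {
    my ($name) = @_;
    return sprintf(
        'create table queue_worker_%s (id serial primary key, msg text) without oids',
        $name);
}

my $ddl = table_ddl('my_work');
print "$ddl\n";
```

The serial column type and the without oids clause are PostgreSQL syntax, so the statement as written is likely to need adjusting for other databases.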
enqueue($dbh, $msg) adds the work order $msg to the queue.
new creates a new instance of the worker. It also opens the underlying named semaphore, creating it if it does not exist yet.
run($dbh) runs the queue. It calls the process method on each work item and returns the number of items processed.
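The module source removes each item from the table before calling process, outside any transaction, so that a work order whose processing dies is not picked up again on the next run. A minimal sketch of that ordering follows, assuming an in-memory array in place of the database table; the names @queue, process_item and drain are hypothetical and exist only for this illustration.

```perl
use strict;
use warnings;

# In-memory stand-in for the queue table (an assumption for illustration).
my @queue = ('first', 'bad', 'third');
my @done;

# Hypothetical process callback: fails on one particular item.
sub process_item {
    my ($item) = @_;
    die "cannot handle '$item'\n" if $item eq 'bad';
    push @done, $item;
}

# Remove one item, then process it. The removal happens first, so an item
# whose processing dies is already gone and cannot be picked up again.
sub drain {
    my $cnt = 0;
    while (defined(my $item = shift @queue)) {
        process_item($item);
        $cnt++;
    }
    return $cnt;
}

my $first_run  = eval { drain() };   # dies on 'bad'
my $error      = $@;
my $second_run = drain();            # continues with the remaining item

print "first run died: $error";
print "second run processed $second_run item(s)\n";
```

The trade-off of this ordering is that a failing work order is lost rather than retried, which keeps one bad item from crashing the worker repeatedly.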
unlink_semaphore unlinks the semaphore.
get_semaphore returns the underlying semaphore.
The following methods should be implemented by the inheriting class.
name should return the name of the worker.
process($item) is the callback that processes a work order.
Boris Sukholitko CPAN ID: BOSU boriss@gmail.com
This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
The full text of the license can be found in the LICENSE file included with this module.
use strict;
use warnings FATAL => 'all';

package Queue::Worker;
our $VERSION = '0.01';
use POSIX::RT::Semaphore;
use Fcntl; # O_CREAT, O_EXCL for named semaphore creation

sub create_table {
    my ($class, $dbh, $name) = @_;
    $name ||= $class->name;
    $dbh->do(sprintf(<<'ENDS', $name));
create table queue_worker_%s (id serial primary key, msg text) without oids
ENDS
}

sub enqueue {
    my ($class, $dbh, $msg) = @_;
    $dbh->do(sprintf('insert into queue_worker_%s (msg) values (?)'
            , $class->name), undef, $msg);
}

sub new {
    my $class = shift;
    my $sem = POSIX::RT::Semaphore->open("/" . $class->name
            , O_CREAT, 0660, 1);
    return bless({ semaphore => $sem }, $class);
}

sub run {
    my ($self, $dbh) = @_;
    my $t = 'queue_worker_' . $self->name;
    my $sql = <<ENDS;
delete from $t where id in (select id from $t order by id limit 1)
    returning msg;
ENDS
    my $cnt = 0;
AGAIN:
    (!$self->{semaphore}->trywait) and goto OUT; # has 0 but true
    eval {
        for (;;) {
            # delete should not be in transaction: we should always make
            # progress, even if we die. Otherwise, we'll be crashing again
            # and again.
            my $res = $dbh->selectcol_arrayref($sql);
            last unless @$res;
            $self->process($res->[0]);
            $cnt++;
        }
    };
    $self->{semaphore}->post;
    die "Rethrowing: $@" if $@;
    my $more = $dbh->selectcol_arrayref("select id from $t limit 1");
    goto AGAIN if @$more;
OUT:
    return $cnt;
}

sub unlink_semaphore {
    my $class = shift;
    POSIX::RT::Semaphore->unlink('/' . $class->name);
}

sub get_semaphore {
    return shift()->new->{semaphore};
}

1;