Queue::Worker - Database based work queue abstraction.


Queue-Worker documentation Contained in the Queue-Worker distribution.

NAME

Queue::Worker - Database based work queue abstraction.

SYNOPSIS

    package MyWorker;
    use base 'Queue::Worker';

    sub name { 'my_work'; }

    sub process {
        my ($self, $item) = @_;
        # do your work here
    }

    # create worker table in db
    MyWorker->create_table($dbh);

    # and somewhere else
    MyWorker->enqueue($dbh, 'some work order string');

    # and finally to run the queue (run is an instance method)
    MyWorker->new->run($dbh);

DESCRIPTION

This module provides a simple worker abstraction on top of a database-backed queue. Worker instances are serialized with a named POSIX::RT::Semaphore, so only one instance processes a given queue at a time.

Work orders are strings. They are added to the queue with the enqueue method and removed from it, one at a time, by the run method, which passes each one to process.

METHODS

$class->create_table($dbh, $name)

Creates the table queue_worker_$name in the database. The $name parameter is optional: if it is omitted or false, the value returned by the name method is used.

$class->enqueue($dbh, $msg)

Enqueues work order $msg into the queue.

$class->new

Creates a new instance of the worker. Also opens the underlying named semaphore, creating it if it does not exist yet.

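$self->run($dbh)

Runs the queue. Calls the process method on each work item until the queue is empty and returns the number of items processed. If another worker instance holds the semaphore, run returns without processing anything.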
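
The loop inside run removes each item from the table before calling process, so an item whose processing dies is not retried on the next run. The following is a minimal sketch of that ordering only, assuming an in-memory array in place of the database table. The names @queue, drain and the failing callback are hypothetical stand-ins and are not part of the module.

```perl
use strict;
use warnings;

my @queue = ('a', 'bad', 'c');     # stand-in for the queue_worker_* table

# Remove items one at a time, oldest first, and hand each to $process.
# The item is removed before it is processed, mirroring the
# delete-then-process order used by run.
sub drain {
    my ($process) = @_;
    my $cnt = 0;
    while (defined(my $item = shift @queue)) {
        $process->($item);
        $cnt++;
    }
    return $cnt;
}

my @done;
my $process = sub {
    my ($item) = @_;
    die "cannot handle '$item'\n" if $item eq 'bad';
    push @done, $item;
};

# The first run dies on 'bad'; that item is already gone from the queue.
my $ok  = eval { drain($process); 1 };
my $err = $@;
print "first run failed: $err" unless $ok;

# The second run makes progress with the remaining item.
my $count = drain($process);
print "processed on second run: $count\n";
```

The trade-off is that a failing work order is lost rather than retried, which is what keeps a crashing worker from getting stuck on the same item.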

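$class->get_semaphore

Returns the underlying semaphore, obtained by constructing a new worker instance.

$class->unlink_semaphore

Removes the named semaphore from the system.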

ABSTRACT METHODS

The following methods must be implemented by the inheriting class.

$class->name

Should return the name of the worker. The name is used both in the table name (queue_worker_$name) and as the semaphore name.

$self->process($msg)

Callback invoked by run to process the work order $msg.
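
Because work orders are plain strings, structured jobs have to be serialized by the caller and decoded inside process. The following is a minimal sketch, assuming JSON encoding with the core JSON::PP module. The package name MailWorker, the encode_order helper, the to and subject fields and the @sent array are all hypothetical. The class is shown without `use base 'Queue::Worker'` so that it runs without a database or this distribution installed.

```perl
use strict;
use warnings;

package MailWorker;             # hypothetical worker class
use JSON::PP ();

# In real use this package would also declare: use base 'Queue::Worker';

our @sent;                      # records processed orders, for illustration only

sub name { 'mail' }

# Hypothetical helper: serialize a hash reference into the string form
# that the queue stores. canonical() sorts keys for a stable encoding.
sub encode_order {
    my ($class, $order) = @_;
    return JSON::PP->new->canonical->encode($order);
}

# Callback invoked with one work order string.
sub process {
    my ($self, $msg) = @_;
    my $order = JSON::PP->new->decode($msg);
    push @sent, "$order->{to}: $order->{subject}";
    return;
}

package main;

my $msg = MailWorker->encode_order({ to => 'alice', subject => 'hello' });
MailWorker->process($msg);      # run would make this call for each queued item
print "$MailWorker::sent[0]\n";
```

With the real base class in place, the encoded string would be handed to MailWorker->enqueue($dbh, $msg) instead of being passed to process directly.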

AUTHOR

	Boris Sukholitko
	CPAN ID: BOSU

	boriss@gmail.com




COPYRIGHT

SEE ALSO

POSIX::RT::Semaphore


use strict;
use warnings FATAL => 'all';

package Queue::Worker;
our $VERSION = '0.01';
use POSIX::RT::Semaphore;
use Fcntl;            # O_CREAT, O_EXCL for named semaphore creation

sub create_table {
	my ($class, $dbh, $name) = @_;
	$name ||= $class->name;
	$dbh->do(sprintf(<<'ENDS', $name));
create table queue_worker_%s (id serial primary key, msg text) without oids
ENDS
}

sub enqueue {
	my ($class, $dbh, $msg) = @_;
	$dbh->do(sprintf('insert into queue_worker_%s (msg) values (?)'
				, $class->name), undef, $msg);
}

sub new { 
	my $class = shift;
	my $sem = POSIX::RT::Semaphore->open("/" . $class->name
		, O_CREAT, 0660, 1);
	return bless({ semaphore => $sem }, $class);
}

sub run {
	my ($self, $dbh) = @_;
	my $t = 'queue_worker_' . $self->name;
	my $sql = <<ENDS;
delete from $t where id in (select id from $t order by id limit 1)
	returning msg;
ENDS
	my $cnt = 0;
AGAIN:
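	# trywait reports success as "0 but true", so test truthiness only.
	# If the semaphore is held by another worker, give up.
	(!$self->{semaphore}->trywait) and goto OUT;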
	eval { for (;;) {
		# delete should not be in transaction: we should always make
		# progress, even if we die. Otherwise, we'll be crashing again
		# and again.
		my $res = $dbh->selectcol_arrayref($sql);
		last unless @$res;
		$self->process($res->[0]);
		$cnt++;
	} };
	my $err = $@;	# save the error before post, which could clobber $@
	$self->{semaphore}->post;
	die "Rethrowing: $err" if $err;

	my $more = $dbh->selectcol_arrayref("select id from $t limit 1");
	goto AGAIN if @$more;
OUT:
	return $cnt;
}

sub unlink_semaphore { 
	my $class = shift;
	POSIX::RT::Semaphore->unlink('/' . $class->name);
}

sub get_semaphore {
	return shift()->new->{semaphore};
}

1;