| Queue-Q4M documentation | Contained in the Queue-Q4M distribution. |
Queue::Q4M - Simple Interface To q4m
use Queue::Q4M;
my $q = Queue::Q4M->connect(
connect_info => [
'dbi:mysql:dbname=mydb',
$username,
$password
],
);
for (1..10) {
$q->insert($table, \%fieldvals);
}
while ($q->next($table)) {
my ($col1, $col2, $col3) = $q->fetch($table, \@fields);
print "col1 = $col1, col2 = $col2, col3 = $col3\n";
}
while ($q->next($table)) {
my $cols = $q->fetch_arrayref($table, \@fields);
print "col1 = $cols->[0], col2 = $cols->[1], col3 = $cols->[2]\n";
}
while ($q->next($table)) {
my $cols = $q->fetch_hashref($table, \@fields);
print "col1 = $cols->{col1}, col2 = $cols->{col2}, col3 = $cols->{col3}\n";
}
# to use queue_wait(table_cond1,table_cond2,timeout)
while (my $which = $q->next(@table_conds)) {
# $which contains the table name
}
$q->disconnect;
Queue::Q4M is a simple wrapper to q4m, which is an implementation of a queue using mysql.
Creates a new Queue::Q4M instance. Normally you should use connect() instead.
Connects to the target database.
my $q = Queue::Q4M->connect(
connect_info => [
'dbi:mysql:dbname=q4m',
]
);
Blocks until the next item is available. This is equivalent to calling queue_wait() on the given table.
my $which = $q->next( $table_cond1, $table_cond2, $table_cond3 );
Fetches the next available row. Takes a table name and the list of columns to be fetched.
my ($col1, $col2, $col3) = $q->fetch( $table, [ qw(col1 col2 col3) ] );
Same as fetch_array, but fetches using fetchrow_arrayref()
my $arrayref = $q->fetch_arrayref( $table, [ qw(col1 col2 col3) ] );
Same as fetch_array, but fetches using fetchrow_hashref()
my $hashref = $q->fetch_hashref( $table, [ qw(col1 col2 col3) ] );
Inserts into the queue. The first argument should be a scalar specifying a table name. The second argument is a hashref that specifies the mapping between column names and their respective values.
$q->insert($table, { col1 => $val1, col2 => $val2, col3 => $val3 });
For backwards compatibility, you may omit $table if you specified $table in the constructor.
Deletes everything the specified queue. Be careful!
Returns an instance of Queue::Q4M::Status (actually, a subclass there of).
Returns the database handle after making sure that it's connected.
Disconnects.
These are defined as part of Moose infrastructure
The minimum version of q4m that Queue::Q4M supports
Copyright (c) 2008 Daisuke Maki <daisuke@endeworks.jp>
Taro Funaki
This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
See http://www.perl.com/perl/misc/Artistic.html
| Queue-Q4M documentation | Contained in the Queue-Q4M distribution. |
# $Id: /mirror/coderepos/lang/perl/Queue-Q4M/trunk/lib/Queue/Q4M.pm 103794 2009-04-13T11:38:30.159603Z daisuke $ # # Copyright (c) 2008 Daisuke Maki <daisuke@endeworks.jp> # All rights reserved. package Queue::Q4M; use Any::Moose; use Any::Moose '::Util::TypeConstraints'; use Carp(); use DBI; use SQL::Abstract; use Queue::Q4M::Status; class_type 'Queue::Q4M::Result'; has 'auto_reconnect' => ( is => 'rw', isa => 'Bool', required => 1, default => 1, ); has 'owner_mode' => ( is => 'rw', isa => 'Bool', default => 0 ); has '_connect_pid' => ( is => 'rw', isa => 'Int' ); has 'connect_info' => ( is => 'rw', isa => 'ArrayRef', required => 1, ); has 'sql_maker' => ( is => 'rw', isa => 'SQL::Abstract', required => 1, default => sub { SQL::Abstract->new } ); has '_dbh' => ( is => 'rw', ); has '__table' => ( is => 'rw', ); has '__res' => ( is => 'rw', # isa => 'Maybe[Queue::Q4M::Result]' ); __PACKAGE__->meta->make_immutable; no Any::Moose; no Any::Moose '::Util::TypeConstraints'; our $AUTHORITY = 'cpan:DMAKI'; our $VERSION = '0.00019'; use constant Q4M_MINIMUM_VERSION => '0.8'; sub connect { my $self = shift; if (! ref $self) { $self = $self->new(@_); } if (my $old = $self->_dbh()) { $old->disconnect(); } my $dbh = $self->_connect(); $self->_dbh( $dbh ); # Make sure we have the minimum supported API version # (or, a Q4M enabled mysql, for that matter) my $version; eval { my $sth = $dbh->prepare(<<' EOSQL'); SELECT PLUGIN_VERSION from information_schema.plugins WHERE plugin_name = ? EOSQL $sth->execute('QUEUE'); $sth->bind_columns(\$version); $sth->fetchrow_arrayref; $sth->finish; }; warn if $@; if (! $version || $version < Q4M_MINIMUM_VERSION) { Carp::confess( "Connected database does not meet the minimum required q4m version (" . Q4M_MINIMUM_VERSION . "). Got version " . (defined $version ? $version : '(undef)' ) ); } $self; } sub _connect { my $self = shift; return DBI->connect(@{ $self->connect_info }); } sub dbh { my $self = shift; my $dbh = $self->_dbh; my $pid = $self->_connect_pid; if ( ($pid || '') ne $$ || ! $dbh || ! $dbh->ping) { $self->auto_reconnect or die "not connect"; $dbh = $self->_connect(); $self->_dbh( $dbh ); $self->_connect_pid($$); } return $dbh; } sub next { my $self = shift; my @args = @_; # First, undef any cached table name that we might have had $self->__table(undef); my @tables = grep { !/^\d+$/ } map { (my $v = $_) =~ s/:.*$//; $v } @args ; # Cache this statement handler so we don't unnecessarily create # string or handles my $dbh = $self->dbh; my $sql = sprintf( "SELECT queue_wait(%s)", join(',', (('?') x scalar(@args))) ); my ($index) = $dbh->selectrow_array($sql, undef, @args); my $table = defined $index && $index > 0 ? $tables[$index - 1] : undef; my $res = Queue::Q4M::Result->new( rv => defined $table, table => $table, on_release => sub { $self->__table(undef) } ); if (defined $table) { $self->__table($table); } $self->__res($res) if $res; $self->owner_mode(1); return $res; } *fetch = \&fetch_array; BEGIN { foreach my $type qw(array arrayref hashref) { eval sprintf( <<'EOSUB', $type, $type ); sub fetch_%s { my $self = shift; my $table = shift; $table ||= $self->__table; if (Scalar::Util::blessed $table && $table->isa('Queue::Q4M::Result')) { $table = $table->[1]; } $table or die "no table"; my ($sql, @bind) = $self->sql_maker->select($table, @_); my $dbh = $self->dbh; $self->owner_mode(0); return $dbh->selectrow_%s($sql, undef, @bind); } EOSUB die if $@; } } sub insert { my $self = shift; my $table = shift; my ($sql, @bind) = $self->sql_maker->insert($table, @_); my $dbh = $self->dbh; my $sth = $dbh->prepare($sql); my $rv = $sth->execute(@bind); $sth->finish; return $rv; } sub disconnect { my $self = shift; my $dbh = $self->dbh; if ($dbh) { $dbh->do("select queue_end()"); $dbh->disconnect; $self->_dbh(undef); } } sub clear { my ($self, $table) = @_; return $self->dbh->do("DELETE FROM $table"); } sub status { return Queue::Q4M::Status->fetch( shift->dbh ); } sub DEMOLISH { my $self = shift; local $@; eval { $self->dbh->do("SELECT queue_abort()") if $self->owner_mode; $self->disconnect; }; } package Queue::Q4M::Result; use overload bool => \&as_bool, '""' => \&as_string, fallback => 1 ; use Scope::Guard; sub new { my $class = shift; my %args = @_; return bless [ $args{rv}, $args{table}, Scope::Guard->new( $args{on_release} ) ], $class; } sub as_bool { $_[0]->[0] } sub as_string { $_[0]->[1] } sub DESTROY { $_[0]->[2]->dismiss(1) if $_[0]->[2] } 1; __END__