| Beanstalk-Client documentation | Contained in the Beanstalk-Client distribution. |
Beanstalk::Client - Client class to talk to beanstalkd server
  use Beanstalk::Client;

  my $client = Beanstalk::Client->new(
    { server       => "localhost",
      default_tube => 'mine',
    }
  );

  # Send a job with explicit data
  my $job = $client->put(
    { data     => "data",
      priority => 100,
      ttr      => 120,
      delay    => 5,
    }
  );

  # Send job, data created by encoding @args. By default with YAML
  my $job2 = $client->put(
    { priority => 100,
      ttr      => 120,
      delay    => 5,
    },
    @args
  );

  # Send job, data created by encoding @args with JSON
  use JSON::XS;
  $client->encoder(sub { encode_json(\@_) });
  my $job3 = $client->put(
    { priority => 100,
      ttr      => 120,
      delay    => 5,
    },
    @args
  );

  # fetch a job
  my $job4 = $client->reserve;
Beanstalk::Client provides a Perl API for version 1.0 of the beanstalkd protocol. beanstalkd is a fast, general-purpose, in-memory workqueue service by Keith Rarick.
The constructor accepts a single argument, which is a reference to a hash containing options. The options can be any of the accessor methods listed below.
Get/set the hostname, and port, to connect to. The port, which defaults to 11300, can be
specified by appending it to the hostname with a : (eg "localhost:1234").
(Default: localhost:11300)
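The default-port rule can be sketched in a few lines. The helper name server_with_port is hypothetical and not part of Beanstalk::Client; it only mirrors the check that the connect routine in the distribution's source appears to perform (fall back to 127.0.0.1 when no server is set, and append :11300 when the string contains no colon).

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Beanstalk::Client): apply the
# default host and port to a server string.
sub server_with_port {
    my $server = shift || "127.0.0.1";
    $server .= ":11300" unless $server =~ /:/;
    return $server;
}

print server_with_port("localhost"), "\n";         # localhost:11300
print server_with_port("localhost:1234"), "\n";    # localhost:1234
```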
Get the socket connection to the server.
Set/get a default value, in seconds, for job delay. A job with a delay will be
placed into a delayed state and will not be placed into the ready queue until
the time period has passed. This value will be used by put and release as
a default. (Default: 0)
Set/get a default value, in seconds, for job ttr (time to run). This value will
be used by put as a default. (Default: 120)
Set/get a default value for job priority. The highest priority job is the job
where the priority value is the lowest (ie jobs with a lower priority value are
run first). This value will be used by put, release and bury as a
default. (Default: 10000)
Set/get serialization encoder. $encoder is a reference to a subroutine
that will be called when arguments to put need to be encoded to send
to the beanstalkd server. The subroutine should accept a list of arguments and
return a string representation to pass to the server. (Default: YAML::Syck::Dump)
Set/get the serialization decoder. $decoder is a reference to a
subroutine that will be called when data from the beanstalkd server needs to be
decoded. The subroutine will be passed the data fetched from the beanstalkd
server and should return a list of values the application can use.
(Default: YAML::Syck::Load)
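An encoder and decoder are usually written as a matching pair. The following is a minimal sketch, assuming JSON (via the core JSON::PP module) is wanted instead of YAML. It exercises the two subroutines on their own, without a server; the variable names are illustrative only.

```perl
use strict;
use warnings;
use JSON::PP qw(encode_json decode_json);

# Encoder: receives the list of arguments given to put, returns a string.
my $encoder = sub { encode_json([@_]) };

# Decoder: receives the job body, returns a list of values.
my $decoder = sub { @{ decode_json($_[0]) } };

# Round trip without a server, to check that the pair agrees.
my $body = $encoder->("resize", { width => 640 });
my ($action, $opts) = $decoder->($body);
print "$action to width $opts->{width}\n";    # resize to width 640

# With a client object these would be installed as:
#   $client->encoder($encoder);
#   $client->decoder($decoder);
```

Producers and workers that share a tube need to agree on the same pair, since the server stores the job body as opaque bytes.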
Fetch the last error that happened.
Get/set timeout, in seconds, to use for the connect to the server.
Set/get the name of a default tube to put jobs into and fetch from.
By default a connection to a beanstalkd server will put into the default
queue and also watch the default queue. If default_tube is set when
connect is called the connection will be initialized so that put will put
into the given tube and reserve will fetch jobs from the given tube.
(Default: none)
Set/get debug value. If set to a true value, all communication with the server
is output with warn.
These methods are used by clients that are placing work into the queue
Insert job into the currently used tube. Options may be
priority to use to queue the job. Jobs with smaller priority values will be scheduled before jobs with larger priorities. The most urgent priority is 0
Defaults to $self->priority
An integer number of seconds to wait before putting the job in the ready queue. The job will be in the "delayed" state during this time
Defaults to $self->delay
"time to run" - An integer number of seconds to allow a worker
to run this job. This time is counted from the moment a worker reserves
this job. If the worker does not delete, release, or bury the job within
ttr seconds, the job will time out and the server will release the job.
The minimum ttr is 1. If the client sends 0, the server will silently
increase the ttr to 1.
The job body. Defaults to the result of calling the current encoder passing @args
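How the options combine with the client defaults can be sketched as follows. The helper build_put_command is hypothetical; it follows the put routine in the distribution's source, where any option present in the hash overrides the client default and the body length is counted in bytes. The command layout "put <pri> <delay> <ttr> <bytes>" is that of the beanstalkd protocol.

```perl
use strict;
use warnings;

# Hypothetical helper: resolve put options against the defaults and
# build the command line "put <pri> <delay> <ttr> <bytes>".
sub build_put_command {
    my ($defaults, $opt, $data) = @_;
    my %o = (%$defaults, %$opt);    # explicit options override defaults
    utf8::encode($data) if utf8::is_utf8($data);    # count bytes, not characters
    my $bytes = length $data;
    return "put $o{priority} $o{delay} $o{ttr} $bytes";
}

my %defaults = (priority => 10_000, delay => 0, ttr => 120);
print build_put_command(\%defaults, { priority => 100, delay => 5 }, "data"), "\n";
# put 100 5 120 4
```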
Change tube that new jobs are inserted into
Reserve a job from the list of tubes currently being watched.
Returns a Beanstalk::Job on success. $timeout is the maximum number
of seconds to wait for a job to become ready. If $timeout is not given then the client
will wait indefinitely.
Returns undef on error or if $timeout expires.
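A typical worker reserves a job, processes it, then deletes it on success or buries it on failure. The sketch below shows that loop under stated assumptions: FakeClient and FakeJob are stand-ins invented here so the example runs without a server, and they offer only the reserve, delete, bury, id and data calls used in the loop. With a real Beanstalk::Client the loop body would be the same.

```perl
use strict;
use warnings;

# Stand-ins so the sketch runs without a server (not part of the distribution).
package FakeClient {
    sub new     { my ($class, @jobs) = @_; bless { jobs => [@jobs], log => [] }, $class }
    sub reserve { my ($self, $timeout) = @_; shift @{ $self->{jobs} } }    # undef when empty
    sub delete  { my ($self, $id) = @_; push @{ $self->{log} }, "delete $id"; 1 }
    sub bury    { my ($self, $id) = @_; push @{ $self->{log} }, "bury $id"; 1 }
}

package FakeJob {
    sub new  { my ($class, %f) = @_; bless {%f}, $class }
    sub id   { $_[0]{id} }
    sub data { $_[0]{data} }
}

# Reserve until reserve returns undef (timeout or error);
# delete a job on success, bury it if the handler dies.
sub work {
    my ($client, $handler) = @_;
    while (my $job = $client->reserve(5)) {
        if (eval { $handler->($job->data); 1 }) {
            $client->delete($job->id);
        }
        else {
            $client->bury($job->id);
        }
    }
}

my $client = FakeClient->new(
    FakeJob->new(id => 1, data => "ok"),
    FakeJob->new(id => 2, data => "bad"),
);
work($client, sub { die "failed\n" if $_[0] eq "bad" });
print "$_\n" for @{ $client->{log} };
```

Because reserve returns undef both on timeout and on error, a real worker would probably inspect the error method after the loop ends to tell the two cases apart.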
Delete the specified job.
Release the specified job. Valid options are
New priority to assign to the job
An integer number of seconds to wait before putting the job in the ready queue. The job will be in the "delayed" state during this time
The bury command puts a job into the "buried" state. Buried jobs are put into a FIFO linked list and will not be touched by the server again until a client kicks them with the "kick" command.
Valid options are
New priority to assign to the job
Calling touch with the id of a reserved job will reset the time left for the job to complete
back to the original ttr value.
Specifies a tube to add to the watch list. If the tube doesn't exist, it will be created
Stop watching $tube
Watch only the list of given tube names
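Watching only a given list amounts to a set difference between the tubes currently watched and the tubes wanted. The following is a minimal sketch of that calculation; the helper name watch_only_plan is hypothetical, and the logic follows the watch_only routine in the distribution's source, which issues the new watches before the ignores.

```perl
use strict;
use warnings;

# Hypothetical helper: given the tubes currently watched and the tubes
# wanted, work out which to watch and which to ignore.
sub watch_only_plan {
    my ($watching, @wanted) = @_;
    my %leftover = map { $_ => 1 } @$watching;
    my @watch  = grep { !delete($leftover{$_}) } @wanted;    # not yet watched
    my @ignore = sort keys %leftover;                         # watched but unwanted
    return (\@watch, \@ignore);
}

my ($watch, $ignore) = watch_only_plan([qw(default mail)], qw(mail images));
print "watch: @$watch\n";      # watch: images
print "ignore: @$ignore\n";    # ignore: default
```

Watching first matters because the beanstalkd server refuses to ignore the last tube on a connection's watch list.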
Connect to server. If successful, set the tube to use and tube to watch if
a default_tube was specified.
Disconnect from server. socket method will return undef.
Disconnect from server and return true. socket method will return undef.
Peek at the job id specified. If the job exists returns a Beanstalk::Job object. Returns
undef on error or if job does not exist.
Peek at the first job that is in the ready queue. If there is a job in the
ready queue returns a Beanstalk::Job object. Returns undef
on error or if there are no ready jobs.
Peek at the first job that is in the delayed queue. If there is a job in the
delayed queue returns a Beanstalk::Job object. Returns undef
on error or if there are no delayed jobs.
Peek at the first job that is in the buried queue. If there is a job in the
buried queue returns a Beanstalk::Job object. Returns undef
on error or if there are no buried jobs.
The kick command applies only to the currently used tube. It moves jobs into
the ready queue. If there are any buried jobs, it will only kick buried jobs.
Otherwise it will kick delayed jobs. The server will not kick more than $bound
jobs. Returns the number of jobs kicked, or undef if there was an error.
Return stats for the specified job $id. Returns undef on error.
If the job exists, the return will be a Beanstalk::Stats object.
Return stats for the specified tube $tube. Returns undef on error.
If the tube exists, the return will be a Beanstalk::Stats object.
Returns a list of tubes
Returns the current tube being used. This is the tube into which put will place jobs.
Returns a list of tubes being watched, or the number of tubes being watched in a scalar context.
These are the tubes that reserve will check to find jobs. On error an empty list, or undef in
a scalar context, will be returned.
Pause from reserving any jobs in $tube for $delay seconds.
Returns true on success and undef on error.
More tests
Large parts of this documentation were lifted from the documentation that comes with beanstalkd
http://xph.us/software/beanstalkd/
Beanstalk::Pool, Beanstalk::Job, Beanstalk::Stats
Graham Barr <gbarr@pobox.com>
Copyright (C) 2008 by Graham Barr.
This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
package Beanstalk::Client;

use strict;
use warnings;

use base qw(Class::Accessor::Fast);

use YAML::Syck;
use Socket;
use IO::Socket::INET;

use Beanstalk::Job;
use Beanstalk::Stats;

our $VERSION = "1.06";

# use namespace::clean;

our $CRLF = "\015\012";
our $MSG_NOSIGNAL = eval { Socket::MSG_NOSIGNAL() } || 0;

BEGIN {
  __PACKAGE__->mk_accessors(
    qw(
      connect_timeout
      debug
      decoder
      default_tube
      delay
      encoder
      error
      priority
      server
      socket
      ttr
      _watching
      _using
      )
  );
}

# no namespace::clean;

sub _interact {
  my ($self, $cmd, $data) = @_;
  my $sock = $self->socket || $self->connect
    or return;

  local $SIG{PIPE} = "IGNORE" unless $MSG_NOSIGNAL;

  my $debug = $self->debug;
  warn $cmd ."\n" if $debug;

  $cmd .= $CRLF;
  $cmd .= $data . $CRLF if defined $data;

  my $offset = 0;
  WRITE: {
    my $sent = send($sock, substr($cmd, $offset), $MSG_NOSIGNAL);
    if ($sent) {
      $offset += $sent;
      redo WRITE if $offset < length($cmd);
    }
    else {
      $self->error("$!");
      redo WRITE if $!{EINTR} and fileno($sock);
      return $self->disconnect;
    }
  }

  my $buffer;
  $offset = 0;
  READ: {
    my $read = sysread($sock, $buffer, 1024, $offset);
    if ($read) {
      if ($buffer =~ /^([^\015\012]+)\015\012/) {
        $self->{_recv_buffer} = substr($buffer, 2 + length($1));
        warn $1,"\n" if $debug;
        return split(' ', $1);
      }
      $offset += length $buffer;
      redo READ;
    }
    else {
      $self->error("$!");
      redo READ if $!{EINTR} and fileno($sock);
    }
  }

  $self->disconnect;
  return;
}

sub _recv_data {
  my ($self, $bytes) = @_;
  my $sock = $self->socket;

  my $need   = $bytes + 2;    # count CRLF
  my $offset = length($self->{_recv_buffer});
  my $more   = $need - $offset;

  READ: while ($more > 0) {
    my $read = sysread($sock, $self->{_recv_buffer}, $more, $offset);
    if ($read) {
      $offset += $read;
      $more -= $read;
      last if $more == 0;
      redo READ;
    }
    else {
      redo READ if $!{EINTR};
      $self->error("$!");
      return $self->disconnect;
    }
  }

  warn substr($self->{_recv_buffer}, 0, $bytes),"\n" if $self->debug;
  return substr($self->{_recv_buffer}, 0, $bytes);
}

sub _interact_yaml_resp {
  my ($self, $cmd) = @_;

  my @resp = _interact($self, $cmd)
    or return;

  if ($resp[0] eq 'OK') {
    my $data = _recv_data($self, $resp[1])
      or return undef;
    return YAML::Syck::Load($data);
  }

  $self->error(join ' ', @resp);
  return undef;
}

sub _interact_stats {
  my $ret = _interact_yaml_resp(@_)
    or return undef;
  return Beanstalk::Stats->new($ret);
}

sub _peek {
  my $self = shift;
  my $cmd  = shift;

  my @resp = _interact($self, $cmd)
    or return undef;

  if ($resp[0] eq 'FOUND') {
    my $data = _recv_data($self, $resp[2]);
    return undef unless defined $data;
    return Beanstalk::Job->new(
      { id     => $resp[1],
        client => $self,
        data   => $data,
      }
    );
  }

  $self->error(join ' ', @resp);
  return undef;
}

sub __watching {
  my $self = shift;
  my $watching = $self->_watching;
  return $watching if $watching;
  $self->list_tubes_watched;
  $self->_watching;
}

# use namespace::clean;

sub new {
  my $proto  = shift;
  my $fields = shift || {};
  my $self   = $proto->SUPER::new(
    { delay    => 0,
      ttr      => 120,
      priority => 10_000,
      encoder  => \&YAML::Syck::Dump,
      decoder  => \&YAML::Syck::Load,
      %$fields,
    }
  );
  $self->{_recv_buffer} = '';
  $self;
}

sub connect {
  my $self = shift;
  my $server = $self->server || "127.0.0.1";

  $server .= ":11300" unless $server =~ /:/;

  my $timeout = $self->connect_timeout;

  my $sock = IO::Socket::INET->new(
    PeerAddr => $server,
    Timeout  => $timeout,
  );

  unless ($sock) {
    $self->error("connect: $@");
    return $self->disconnect;
  }

  $self->socket($sock);

  my $was_watching = $self->_watching;
  my $was_using    = $self->_using;

  $self->list_tubes_watched;

  if ($was_watching) {
    $self->watch_only(keys %$was_watching)
      or return $self->disconnect;
  }
  elsif (my $default_tube = $self->default_tube) {
    $self->use($default_tube) && $self->watch_only($default_tube)
      or return $self->disconnect;
  }

  if (defined $was_using) {
    $self->use($was_using)
      or return $self->disconnect;
  }

  $sock;
}

sub disconnect {
  my $self = shift;
  if (my $sock = $self->socket) {
    close($sock);
  }
  $self->socket(undef);
}

sub quit {
  shift->disconnect;
  return 1;
}

sub put {
  my $self = shift;
  my $opt  = shift || {};

  my $pri   = exists $opt->{priority} ? $opt->{priority} : $self->priority;
  my $ttr   = exists $opt->{ttr}      ? $opt->{ttr}      : $self->ttr;
  my $delay = exists $opt->{delay}    ? $opt->{delay}    : $self->delay;
  my $data  = exists $opt->{data}     ? $opt->{data}     : $self->encoder->(@_);

  utf8::encode($data) if utf8::is_utf8($data);    # need bytes

  my $bytes = length($data);

  my @resp = _interact($self, "put $pri $delay $ttr $bytes", $data)
    or return undef;

  if ($resp[0] =~ /( INSERTED | BURIED )/x) {
    return Beanstalk::Job->new(
      { id     => $resp[1],
        client => $self,
        buried => $1 eq 'BURIED' ? 1 : 0,
        data   => $data,
      }
    );
  }

  $self->error(join ' ', @resp);
  return undef;
}

sub stats {
  my $self = shift;
  _interact_stats($self, "stats");
}

sub stats_tube {
  my $self = shift;
  my $tube = @_ ? shift: 'default';
  _interact_stats($self, "stats-tube $tube");
}

sub stats_job {
  my $self = shift;
  my $id = shift || 0;
  _interact_stats($self, "stats-job $id");
}

sub kick {
  my $self  = shift;
  my $bound = shift || 1;

  my @resp = _interact($self, "kick $bound")
    or return undef;

  return $resp[1] if $resp[0] eq 'KICKED';

  $self->error(join ' ', @resp);
  return undef;
}

sub use {
  my $self = shift;
  my $tube = shift;

  my @resp = _interact($self, "use $tube")
    or return undef;

  return $self->_using($resp[1]) if $resp[0] eq 'USING';

  $self->error(join ' ', @resp);
  return undef;
}

sub reserve {
  my $self    = shift;
  my $timeout = shift;

  my $cmd = defined($timeout) ? "reserve-with-timeout $timeout" : "reserve";
  my @resp = _interact($self, $cmd)
    or return undef;

  if ($resp[0] eq 'RESERVED') {
    my $data = _recv_data($self, $resp[2]);
    return undef unless defined $data;

    return Beanstalk::Job->new(
      { id       => $resp[1],
        client   => $self,
        reserved => 1,
        data     => $data,
      }
    );
  }

  $self->error(join ' ', @resp);
  return undef;
}

sub delete {
  my $self = shift;
  my $id   = shift;

  my @resp = _interact($self, "delete $id")
    or return undef;

  return 1 if $resp[0] eq 'DELETED';

  $self->error(join ' ', @resp);
  return undef;
}

sub touch {
  my $self = shift;
  my $id   = shift;

  my @resp = _interact($self, "touch $id")
    or return undef;

  return 1 if $resp[0] eq 'TOUCHED';

  $self->error(join ' ', @resp);
  return undef;
}

sub release {
  my $self = shift;
  my $id   = shift;
  my $opt  = shift || {};

  my $pri   = exists $opt->{priority} ? $opt->{priority} : $self->priority;
  my $delay = exists $opt->{delay}    ? $opt->{delay}    : $self->delay;

  my @resp = _interact($self, "release $id $pri $delay")
    or return undef;

  return 1 if $resp[0] eq 'RELEASED';

  $self->error(join ' ', @resp);
  return undef;
}

sub bury {
  my $self = shift;
  my $id   = shift;
  my $opt  = shift || {};

  my $pri = exists $opt->{priority} ? $opt->{priority} : $self->priority;

  my @resp = _interact($self, "bury $id $pri")
    or return undef;

  return 1 if $resp[0] eq 'BURIED';

  $self->error(join ' ', @resp);
  return undef;
}

sub watch {
  my $self = shift;
  my $tube = shift;

  my $watching = $self->__watching or return undef;
  return scalar keys %$watching if $watching->{$tube};

  my @resp = _interact($self, "watch $tube")
    or return undef;

  if ($resp[0] eq 'WATCHING') {
    $watching->{$tube}++;
    return $resp[1];
  }

  $self->error(join ' ', @resp);
  return undef;
}

sub ignore {
  my $self = shift;
  my $tube = shift;

  my $watching = $self->__watching or return undef;
  return scalar keys %$watching unless $watching->{$tube};

  my @resp = _interact($self, "ignore $tube")
    or return undef;

  if ($resp[0] eq 'WATCHING') {
    delete $watching->{$tube};
    return $resp[1];
  }

  $self->error(join ' ', @resp);
  return undef;
}

sub watch_only {
  my $self = shift;

  my $watching = $self->__watching or return undef;
  my %watched  = %$watching;
  my $ret;

  foreach my $watch (@_) {
    next if delete $watched{$watch};
    $ret = $self->watch($watch) or return undef;
  }

  foreach my $ignore (keys %watched) {
    $ret = $self->ignore($ignore) or return undef;
  }

  return $ret || scalar keys %$watching;
}

sub peek         { _peek($_[0], "peek $_[1]") }
sub peek_ready   { _peek(shift, "peek-ready") }
sub peek_delayed { _peek(shift, "peek-delayed") }
sub peek_buried  { _peek(shift, "peek-buried") }

sub list_tubes {
  my $self = shift;
  my $ret = _interact_yaml_resp($self, "list-tubes")
    or return undef;
  return @$ret;
}

sub list_tube_used {
  my $self = shift;

  my @resp = _interact($self, "list-tube-used")
    or return undef;

  return $resp[1] if $resp[0] eq 'USING';

  $self->error(join ' ', @resp);
  return undef;
}

sub list_tubes_watched {
  my $self = shift;
  my $ret = _interact_yaml_resp($self, "list-tubes-watched")
    or return;
  $self->_watching( { map { ($_,1) } @$ret });
  @$ret;
}

sub pause_tube {
  my $self  = shift;
  my $tube  = shift;
  my $delay= shift || 0;

  my @resp = _interact($self, "pause-tube $tube $delay")
    or return undef;

  return 1 if $resp[0] eq 'PAUSED';

  $self->error(join ' ', @resp);
  return undef;
}

1;

__END__