| Data-Consumer documentation | Contained in the Data-Consumer distribution. |
Data::Consumer::Dir - Data::Consumer implementation for a directory of files resource
Version 0.09
use Data::Consumer::Dir;
my $consumer = Data::Consumer::Dir->new(
root => '/some/dir',
create => 1,
open_mode => '+<',
);
$consumer->consume( sub {
my $id = shift;
print "processed $id\n";
} );
Constructor for a Data::Consumer::Dir instance.
Either the root option must be provided or both unprocessed and
processed arguments must be defined. Will die if the directories do
not exist unless the create option is set to a true value.
Directory within which unprocessed files will be found.
May also be a callback which is responsible for marking the item as
unprocessed. This will be called with the arguments ($consumer,
'unprocessed', $spec, $fh, $name).
Files will be moved to this directory before being processed.
May also be a callback which is responsible for marking the item as
working. This will be called with the arguments ($consumer,
'working', $spec, $fh, $name).
Once successfully processed, the files will be moved to this directory.
May also be a callback which is responsible for marking the item as
processed. This will be called with the arguments ($consumer,
'processed', $spec, $fh, $name).
If processing fails then the files will be moved to this directory.
May also be a callback which is responsible for marking the item as
failed. This will be called with the arguments ($consumer, 'failed',
$spec, $fh, $name).
Automatically sets any of the unprocessed, working, processed, or failed
directories to a subdirectory of the same name below the specified root. Only
those directories not explicitly defined are derived from the root, so this
can be used in conjunction with the other options. The directories are only
created on disk if the create option is also set.
If true, then any specified directories that do not exist will be created.
If create_mode is specified then the directories will be created with that mode.
In order to lock a file, a filehandle must be opened, normally in
read-only mode (<); however, it may be useful to open with other
modes. The legal values are '<', '+<', '>>' and '+>>'.
Reset the state of the object.
Acquire an item to be processed.
Returns an identifier to be used to identify the item acquired.
Release any locks on the currently held item.
Normally there is no need to call this directly.
Return a filehandle to the currently acquired item. See the open_mode
argument in new() for details on how to control the mode that the
filehandle is opened with.
Return the full filespec for the currently acquired item.
Return the filename (without path) of the currently acquired item.
Note that this is an alias for $object->last_id().
Yves Orton, <YVES at cpan.org>
Please report any bugs or feature requests to
bug-data-consumer at rt.cpan.org, or through the web interface at
http://rt.cpan.org/NoAuth/ReportBug.html?Queue=Data-Consumer.
I will be notified, and then you'll automatically be notified of progress on your bug as I make changes.
Igor Sutton <IZUT@cpan.org> for ideas, testing and support
Copyright 2008 Yves Orton, all rights reserved.
This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
| Data-Consumer documentation | Contained in the Data-Consumer distribution. |
package Data::Consumer::Dir;

# Data::Consumer implementation whose items are the plain files found in a
# directory. A held item is locked with flock() and is moved (renamed)
# between the configured state directories as it is processed.

use warnings;
use strict;
# NOTE(review): nothing in this module appears to use DBI; confirm no
# subclass or caller relies on this load before dropping the dependency.
use DBI;
use Carp qw(confess);
# Every warning raised in this module's lexical scope becomes fatal.
use warnings FATAL => 'all';
use base 'Data::Consumer';
use File::Spec;
use File::Path;
use Fcntl;            # O_* flags passed to sysopen()
use Fcntl ':flock';   # LOCK_EX / LOCK_NB / LOCK_UN passed to flock()
use vars qw/$Debug $VERSION $Cmd $Fail/;

# This code was formatted with the following perltidy options:
# -ple -ce -bbb -bbc -bbs -nolq -l=100 -noll -nola -nwls='=' -isbc -nolc -otr -kis
# If you patch it please use the same options for your patch.

# Alias $Debug, $Cmd and $Fail (glob assignment, so every slot) to the
# same-named package variables of Data::Consumer, so this package and the
# base class share a single copy of each.
*Debug= *Data::Consumer::Debug;
*Cmd= *Data::Consumer::Cmd;
*Fail= *Data::Consumer::Fail;

# Runs at compile time. register() is inherited from Data::Consumer;
# presumably it records this subclass with the base class -- confirm there.
BEGIN { __PACKAGE__->register(); }
$VERSION= '0.09';
# Wrapped in BEGIN so that @keys and %m are initialised at compile time,
# before any code can reach new().
BEGIN {

    # The item states, each of which may name a directory or hold a callback.
    my @keys= qw(unprocessed working processed failed);

    # Perl-style open modes accepted by the open_mode option, mapped to the
    # sysopen() flags used by acquire(). O_NONBLOCK is OR'ed into each entry.
    my %m= (
        '<'   => O_RDONLY,
        '+<'  => O_RDWR,
        '>>'  => O_APPEND | O_WRONLY,
        '+>>' => O_APPEND | O_RDWR,
    );
    $_= $_ | O_NONBLOCK for values %m;

    # new(%opts)
    #
    # Construct a Data::Consumer::Dir.
    #
    #   root        - if true, any of unprocessed/working/processed/failed not
    #                 given explicitly defaults to a same-named subdirectory
    #                 of root.
    #   unprocessed, working, processed, failed
    #               - a directory path, or a reference (treated as a callback
    #                 by _mark_as). unprocessed and processed are mandatory
    #                 (directly or via root).
    #   create      - if true, missing state directories are created with
    #                 mkpath(). States that are unset or hold a reference are
    #                 skipped.
    #   create_mode - optional mode passed to mkpath().
    #   open_mode   - one of the keys of %m ('<', '+<', '>>', '+>>'); it is
    #                 stored as the corresponding sysopen() flags. Defaults
    #                 to O_RDONLY | O_NONBLOCK.
    #
    # Dies (confess) when unprocessed/processed are missing or open_mode is
    # not recognised.
    # NOTE(review): the POD says new() dies when the directories do not exist
    # and create is false, but no such check is made here.
    sub new {
        my ( $class, %opts )= @_;
        my $self= $class->SUPER::new();    # let Data::Consumer bless the hash

        if ( $opts{root} ) {
            my ( $v, $p )= File::Spec->splitpath( $opts{root}, 'nofile' );
            for my $type (@keys) {
                $opts{$type} ||=
                  File::Spec->catpath( $v, File::Spec->catdir( $p, $type ), '' );
            }
        }

        ( $opts{unprocessed} and $opts{processed} )
          or confess "Arguments 'unprocessed' and 'processed' are mandatory";

        if ( $opts{create} ) {
            for my $type (@keys) {
                my $dir= $opts{$type};

                # Only plain directory names can be created. Skip states that
                # are unset (-d undef would be a fatal warning here) or that
                # hold a callback (a code ref would stringify to
                # "CODE(0x...)" and be created as a directory). This mirrors
                # the truthiness/ref tests used by _mark_as().
                next if !$dir or ref $dir;
                next if -d $dir;
                mkpath( $dir, $Debug, $opts{create_mode} || () );
            }
        }

        if ( $opts{open_mode} ) {
            exists $m{ $opts{open_mode} }
              or confess "Illegal open mode '$opts{open_mode}' legal options are "
              . join( ',', map { "'$_'" } sort keys %m ) . "\n";
            $opts{open_mode}= $m{ $opts{open_mode} };
        } else {
            $opts{open_mode}= O_RDONLY | O_NONBLOCK;
        }

        # Replaces whatever the base constructor stored in the hash.
        %$self= %opts;
        return $self;
    }
}
# reset()
#
# Release any held item, then rescan the unprocessed directory and cache the
# sorted list of plain files found there in $self->{files}.
# Dies if the directory cannot be opened. Returns $self.
sub reset {
    my $self= shift;
    $self->debug_warn( 5, "reset (scanning $self->{unprocessed})" );
    $self->release();

    opendir my $dh, $self->{unprocessed}
      or die "Failed to opendir '$self->{unprocessed}': $!";
    # Copying each name through a regex capture untaints it under -T.
    my @files= map { /(.*)/s && $1 } readdir($dh);
    closedir $dh;

    # -f also drops '.' and '..' and any subdirectories.
    @files= sort grep { -f _cf( $self->{unprocessed}, $_ ) } @files;
    $self->{files}= \@files;
    return $self;
}

# _cf($dir, $file)
#
# Join a directory path and a file name into one path using File::Spec.
sub _cf {    # cat file
    my ( $r, $f )= @_;
    my ( $v, $p )= File::Spec->splitpath( $r, 'nofile' );
    return File::Spec->catpath( $v, $p, $f );
}

# _same_file($fh, $spec)
#
# Returns 1 if $spec currently names the file that $fh has open (same device
# and inode numbers), otherwise 0 (including when $spec no longer exists).
sub _same_file {
    my ( $fh, $spec )= @_;
    my @held= stat $fh or return 0;
    my @named= stat $spec or return 0;
    return ( $held[0] == $named[0] && $held[1] == $named[1] ) ? 1 : 0;
}

# _do_callback($callback)
#
# Invoke $callback->($self, $lock_spec, $lock_fh, $last_id) inside eval, with
# $Fail localised. Returns nothing on success, or an error string if the
# callback died or set $Fail.
sub _do_callback {
    my ( $self, $callback )= @_;
    local $Fail;
    if ( eval { $callback->( $self, @{$self}{qw(lock_spec lock_fh last_id)} ); 1; } ) {
        if ($Fail) {
            return "Callback reports an error: $Fail";
        }
        return;
    } else {
        return "Callback failed: $@";
    }
}

# acquire()
#
# Take the next file from the cached list (rescanning when it is empty) that
# is not ignored, can be sysopen()ed with open_mode, can be locked with a
# non-blocking exclusive flock(), and is still present at its unprocessed
# path once locked. Records lock_fh/lock_spec/last_id and returns the file
# name, or returns nothing when no such file remains.
sub acquire {
    my $self= shift;
    $self->reset if !@{ $self->{files} || [] };
    my $files= $self->{files};
    while (@$files) {
        my $file= shift @$files;
        next if $self->is_ignored($file);
        my $spec= _cf( $self->{unprocessed}, $file );
        my $fh;

        # The _same_file() check closes a race: another consumer may have
        # locked this file, renamed it out of the unprocessed directory and
        # released it between our sysopen() and our flock(). The lock we then
        # obtain is on a file that is no longer at $spec, so skip it. On skip,
        # $fh goes out of scope, which closes it and drops any lock.
        if (    sysopen( $fh, $spec, $self->{open_mode} )
            and flock( $fh, LOCK_EX | LOCK_NB )
            and _same_file( $fh, $spec ) )
        {
            $self->{lock_fh}= $fh;
            $self->{lock_spec}= $spec;
            $self->debug_warn( 5, "acquired '$file': $spec" );
            $self->{last_id}= $file;
            return $file;
        }
    }
    $self->debug_warn( 5, "acquire failed -- resource has been exhausted" );
    return;
}

# release()
#
# Unlock the currently held item (if any) and forget its handle, path and
# id. Dropping the last reference to the handle closes it. Always returns 1.
sub release {
    my $self= shift;
    flock( $self->{lock_fh}, LOCK_UN ) if $self->{lock_fh};
    delete $self->{lock_fh};
    delete $self->{lock_spec};
    delete $self->{last_id};
    return 1;
}
# Accessors for the item currently held after acquire(). Because release()
# deletes these fields, each accessor returns undef when nothing is held.

# fh() - the locked filehandle of the held item.
sub fh {
    my $self= $_[0];
    return $self->{lock_fh};
}

# spec() - the full path of the held item.
sub spec {
    my $self= $_[0];
    return $self->{lock_spec};
}

# file() - the bare file name (the id) of the held item.
sub file {
    my $self= $_[0];
    return $self->{last_id};
}

# _mark_as($state, $id)
#
# Move the held item to wherever $self->{$state} says:
#   * false  - nothing is configured for this state; nothing happens.
#   * a ref  - treated as a callback and called as
#              ($consumer, $state, $lock_spec, $lock_fh, $last_id);
#              the item is not renamed.
#   * other  - a directory; the file is renamed into it and lock_spec is
#              updated to the new path. Dies (confess) if the rename fails.
# $id is accepted but not used; the item is identified by last_id.
sub _mark_as {
    my ( $self, $key, $id )= @_;
    my $target= $self->{$key};

    # Nothing configured for this state: hand the (false) setting back.
    return $target if !$target;

    if ( ref $target ) {
        # assume it must be a callback
        $self->debug_warn( 5, "executing mark_as callback for '$key'" );
        $target->( $self, $key, @{$self}{qw(lock_spec lock_fh last_id)} );
        return;
    }

    my $dest= _cf( $target, $self->{last_id} );
    rename( $self->{lock_spec}, $dest )
      or confess "$$: Failed to rename '$self->{lock_spec}' to '$dest':$!";
    $self->{lock_spec}= $dest;
    return $dest;
}

# Drop any lock still held when the consumer goes away.
sub DESTROY {
    my ($self)= @_;
    return unless $self;
    $self->release();
}

1; # End of Data::Consumer::Dir