/usr/local/CPAN/DSlib/DS/Transformer/Buffer.pm
#!perl
# ########################################################################## #
# Title: Batch-of-rows processor
# Creation date: 2007-03-05
# Author: Michael Zedeler
# Henrik Andreasen
# Description: Process batches of rows in a data stream
# Data Stream class
# Data transformer
# Buffers rows
# File: $Source: /data/cvs/lib/DSlib/lib/DS/Transformer/Buffer.pm,v $
# Repository: kronhjorten
# State: $State: Exp $
# Documentation: inline
# Recipient: -
# ########################################################################## #
package DS::Transformer::Buffer;
use base qw{ DS::Transformer::TypePassthrough };
use strict;
use Carp;
use Carp::Assert;
# Reuse the version number of the DS distribution; presumably $DS::VERSION is
# set by the main DS module - confirm it is loaded before this file.
our ($VERSION) = $DS::VERSION;
# Numeric CVS revision (e.g. "1.1") extracted from the RCS keyword string.
our ($REVISION) = '$Revision: 1.1 $' =~ /(\d+\.\d+)/;
# new
#
# Class constructor. Builds the underlying transformer through SUPER::new and
# then attaches an empty row buffer.
#
# Buffer bookkeeping (a stream of N entries occupies positions 0 .. N-1):
#   buffer  - array ref indexed by stream position; holds shallow copies of
#             pushed rows, plus a false (undef) entry marking end of stream.
#   first   - position of the oldest entry still buffered; one past 'last'
#             when nothing is buffered. Advanced by flush.
#   last    - position of the newest buffered entry; -1 before any push.
#   current - position of the entry most recently handed out; starts at -1
#             and can be at most one past 'last' after reading beyond the end.
#
sub new {
    my ($class, $source, $target) = @_;
    my $self = $class->SUPER::new($source, $target);
    @{$self}{qw( buffer first last current )} = ( [], 0, -1, -1 );
    return $self;
}
# process
#
# Buffers an incoming row (a false row means end of stream) and returns the
# next unread entry from the buffer. Returns undef once the stream has ended;
# process may keep being called after that, and each such call returns undef.
#
sub process {
    my $self = $_[0];
    my $row  = $_[1];
    $self->push($row);
    my $next_row = $self->shift;
    return $next_row;
}
# shift
#
# Advances 'current' by one and returns the buffered entry at the new
# position. Once the end of the buffer is reached, 'current' stops one past
# 'last' and undef is returned on this and every later call.
#
sub shift {
    my ($self) = @_;
    my $end = $self->{last};
    my $pos = $self->{current};
    if ($pos > $end) {
        # Already beyond the end: stay put and report nothing.
        $self->{current} = $pos;
        return undef;
    }
    $pos++;
    $self->{current} = $pos;
    return $pos > $end ? undef : $self->{buffer}[$pos];
}
# push
#
# Appends a row to the buffer. True rows are stored as shallow copies, so
# later changes to the caller's hash do not leak into the buffer. A false row
# marks end of stream and is stored as-is. It is skipped when the newest
# buffered entry is already such a marker, so EOF is recorded once.
#
# The newest entry can only be inspected while it is inside the live frame
# [first..last]. Before anything is buffered, or after flush has deleted it,
# its slot reads as undef. That must not be mistaken for an EOF marker that
# was already registered, or the end of stream would be lost.
#
sub push {
    my ($self, $row) = @_;
    my $last = $self->{last};
    my $eof_already_registered = !$row
        && $last >= $self->{first}
        && !$self->{buffer}[$last];
    unless ($eof_already_registered) {
        ++$last;
        $row = { %$row } if $row;    # shallow copy; EOF marker stored as given
        $self->{buffer}[$last] = $row;
    }
    $self->{last} = $last;
    return;
}
# fetch
#
# Re-reads rows that have been given back with unfetch. It advances 'current'
# by one and returns the buffered entry there; that entry is undef if it is
# the EOF marker.
# It is a fatal error (croak) to fetch when 'current' is already at or beyond
# the last buffered entry; the position is left unchanged in that case.
#
#TODO Implement some kind of end of stream indicator that will allow fetch to return undef indicating end of stream
sub fetch {
    my ($self) = @_;
    my $position = $self->{current};
    # Never step past the last buffered entry; new rows only arrive via push.
    croak("Can't fetch past buffer end.") unless $position < $self->{last};
    $self->{current} = ++$position;
    return $self->{buffer}[$position];
}
# unfetch
#
# Moves the "current" position one step backwards within the buffered rows,
# so the next fetch/shift hands out the most recently read entry again.
# If the position is one past the end (after reading beyond the last entry),
# it is first moved back onto the last entry, which normally holds the EOF
# marker. With nothing buffered there is no such entry; the position is then
# left just before the frame rather than drifting below it.
# It is a fatal error (die) to try to move before the start of the currently
# buffered rows. Returns 1.
#
sub unfetch {
    my ($self) = @_;
    my $first   = $self->{first};
    my $last    = $self->{last};
    my $current = $self->{current};
    # Validate the request
    if ($current < $first) {
        die "Cannot unfetch beyond buffer start (frame starts at row number $first and current record is $current)\n";
    }
    my $target = $current - 1;
    if ($current > $last) {
        # If EOF reached, return to the element that contains EOF, then step back
        assert( $current == $last + 1, '$current must never be more than one past $last' );
        $target = $last - 1;
        # An empty frame (last == first - 1) has no EOF element to step onto;
        # parking at first - 1 keeps the invariant current >= first - 1, and
        # the next shift again returns undef.
        $target = $first - 1 if $target < $first - 1;
    }
    $self->{current} = $target;
    return 1;
}
# flush
#
# Discards the buffered rows up to and including position $point. When $point
# is omitted (undef), the "current" position is used. If "current" lies
# before the flushed point, it is moved forward so that the next fetch
# returns the first row that is still buffered, if any.
# It is a fatal error (croak) to flush non-existent rows. Returns 1.
#
sub flush {
    my ($self, $point) = @_;
    my $first   = $self->{first};
    my $last    = $self->{last};
    my $current = $self->{current};
    # Use current position if no point is provided. Test definedness rather
    # than truth: position 0 is a valid explicit point.
    $point = $current unless defined $point;
    # Validate the request
    if ($point < $first || $point > $last) {
        croak("Cannot flush non-existent elements. ($point is not within valid range: $first .. $last)");
    }
    # Delete buffer elements and adjust pointers
    for my $i ($first .. $point) {
        delete $self->{buffer}[$i];
    }
    $self->{first} = $point + 1;
    $self->{current} = $point if $current < $point;
    return 1;
}
1;