/usr/local/CPAN/DSlib/DS/Transformer/Buffer.pm


#!perl

# ########################################################################## #
# Title:         Batch-of-rows processor
# Creation date: 2007-03-05
# Author:        Michael Zedeler
#                Henrik Andreasen
# Description:   Process batches of rows in a data stream
#                Data Stream class
#                Data transformer
#                Buffers rows
# File:          $Source: /data/cvs/lib/DSlib/lib/DS/Transformer/Buffer.pm,v $
# Repository:    kronhjorten
# State:         $State: Exp $
# Documentation: inline
# Recipient:     -
# ########################################################################## #

package DS::Transformer::Buffer;

use base qw{ DS::Transformer::TypePassthrough };

use strict;
use Carp;
use Carp::Assert;

# Module version: copied from $DS::VERSION at the moment this statement runs.
# NOTE(review): if the DS package has not been loaded by then, this is undef.
our ($VERSION) = $DS::VERSION;
# File revision: the "major.minor" number extracted from the CVS Revision
# keyword string below (e.g. "1.1").
our ($REVISION) = '$Revision: 1.1 $' =~ /(\d+\.\d+)/;


# new
#
# Class constructor. The object itself is built by the parent class
# (DS::Transformer::TypePassthrough); this adds the buffer bookkeeping.
#
# All positions are absolute row numbers in the stream: a stream of N
# elements is indexed 0 .. N-1 and each buffer slot keeps that index.
#   buffer  - buffered elements; rows are shallow copies, EOF is an undef slot
#   first   - range [0 .. N]:    first buffered position
#             (one past "last" while nothing is buffered)
#   last    - range [-1 .. N-1]: last buffered position
#             (one before "first" while nothing is buffered)
#   current - range [-1 .. N]:   position most recently handed out;
#             starts at -1 and ends at N (past the end)
#
sub new {
    my ($class, $source, $target) = @_;

    my $self = $class->SUPER::new($source, $target);

    @{$self}{qw( buffer first last current )} = ( [], 0, -1, -1 );

    return $self;
}

# process
#
# Handles one incoming element of the stream. The row (or undef, which marks
# end of stream) is appended to the buffer, and the next buffered element is
# returned. Returns undef once the end of the stream is reached. Calling it
# again after the stream has ended keeps returning undef.
#
sub process {
    my $self     = $_[0];
    my $incoming = $_[1];

    $self->push($incoming);

    return $self->shift();
}

# shift
#
# Advances "current" by one position and returns the buffered element found
# there. Returns undef if that element is the end-of-stream marker, or if
# the new position lies beyond "last". Once "current" is already past
# "last", it is left unchanged and undef is returned.
#
sub shift {
    my ($self) = @_;

    my $position = $self->{current};
    my $element  = undef;

    # There is nothing to advance to once we are beyond the last slot.
    if ( $position <= $self->{last} ) {
        $position += 1;
        if ( $position <= $self->{last} ) {
            $element = $self->{buffer}[$position];
        }
    }

    $self->{current} = $position;

    return $element;
}

# push
#
# Appends one element of the incoming stream to the buffer.
# A true $row (a hash reference) is stored as a shallow copy, so later
# top-level changes by the caller to that hash do not alter the buffered row.
# A false $row (undef) marks end of stream. It is stored as an undef slot,
# but only once: repeated EOF markers are ignored until a real row arrives.
#
# Whether the newest buffered element is the EOF marker is tracked in
# $self->{_eof_buffered} rather than by inspecting buffer slot "last".
# That slot is also undef while the buffer is still empty, and after flush
# has released it; relying on it meant EOF went unrecorded in those cases.
#
# Returns nothing.
#
sub push {
    my ($self, $row) = @_;

    if ($row) {
        my $slot = ++$self->{last};
        $self->{buffer}[$slot] = {%$row};    # Make a copy if not EOF
        $self->{_eof_buffered} = 0;
    }
    elsif ( !$self->{_eof_buffered} ) {
        my $slot = ++$self->{last};
        $self->{buffer}[$slot] = undef;      # EOF marker
        $self->{_eof_buffered} = 1;
    }

    return;
}

# fetch
#
# Re-reads rows that have been unfetched. Advances "current" by one position
# and returns the element buffered there (undef if that element is the
# end-of-stream marker).
# Croaks ("Can't fetch past buffer end.") when "current" is already at or
# beyond "last"; fetch never reads past the last buffered position.
#
# TODO: Implement some kind of end-of-stream indicator so that fetch can
#       return undef at end of stream instead of croaking.
#
sub fetch {
    my ($self) = @_;

    my $next = $self->{current} + 1;

    # Make sure we're not beyond end of buffer
    croak("Can't fetch past buffer end.") unless $next <= $self->{last};

    $self->{current} = $next;

    return $self->{buffer}[$next];
}


# unfetch
#
# Moves the "current" position one step backwards within the buffered rows,
# so that the next fetch returns the element at the previous "current"
# position again.
# If "current" has run one past "last" (the caller kept reading after the
# last buffered element), it is first pulled back onto "last". The next
# fetch then returns the last buffered element (the EOF marker, when one is
# buffered).
#
# Returns 1. Dies when the step would make the next fetch return an element
# before "first", i.e. one that was flushed or never buffered. The check is
# made after the past-the-end adjustment. Otherwise, once everything has
# been flushed, a past-the-end position could be moved in front of "first".
#
sub unfetch {
    my ($self) = @_;

    my $first   = $self->{first};
    my $last    = $self->{last};
    my $current = $self->{current};

    # Position that the next fetch should return
    my $target = $current;
    if ( $current > $last ) {
        # If EOF reached, return to the element that contains EOF
        assert( $current == $last + 1, '$current must never be more than one past $last' );
        $target = $last;
    }

    # Validate the request
    if ( $target < $first ) {
        die "Cannot unfetch beyond buffer start (frame starts at row number $first and current record is $current)\n";
    }

    # Move back one step so that the next fetch lands on $target
    $self->{current} = $target - 1;
    return 1;
}


# flush
#
# Clears the buffered rows from "first" up to and including position $point,
# and moves "first" to $point + 1. If $point is omitted (or undef), the
# current position is used. An explicit $point of 0 is honoured.
# If "current" lies before $point, it is moved forward to $point, so the next
# fetch returns the first element still buffered, if any.
#
# Returns 1. Croaks when $point is outside the buffered range first .. last.
#
sub flush {
    my ($self, $point) = @_;

    my $first = $self->{first};
    my $last  = $self->{last};

    # Only a missing point defaults to the current position; "||=" would
    # also replace an explicit row number 0.
    $point = $self->{current} unless defined $point;

    # Validate the request
    if ($point < $first || $point > $last) {
        croak("Cannot flush non-existent elements. ($point is not within valid range: $first .. $last)");
    }

    # Release the flushed rows. Positions are absolute indices, so the slots
    # themselves stay; they are only emptied (delete on array elements is
    # discouraged).
    # NOTE(review): the array therefore grows with the total stream length.
    @{ $self->{buffer} }[ $first .. $point ] = ();
    $self->{first} = $point + 1;
    $self->{current} = $point if $self->{current} < $point;

    return 1;
}

1;