Hadoop::Streaming::Reducer::Input - Parse input stream for reducer


Hadoop-Streaming documentation Contained in the Hadoop-Streaming distribution.

Index


Code Index:

NAME

Top

Hadoop::Streaming::Reducer::Input - Parse input stream for reducer

VERSION

Top

version 0.110030

METHODS

Top

next_key

    $Input->next_key();

Looks at the next line without consuming it (reading and buffering a line if none is pending), splits it on the first tab into key/value, and returns the key portion.

Returns undef if there is no next line.

next_line

    $Input->next_line();

Reads the next line into buffer and returns it.

Returns undef if there are no more lines (end of file).

getline

    $Input->getline();

Returns the next available line. Clears the internal line buffer if set.

iterator

    $Input->iterator();

Returns a new Hadoop::Streaming::Reducer::Input::Iterator for this object.

each

    $Input->each();

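Grabs the next line, strips its trailing newline, and splits it on the first tab into at most two fields (key and value). Returns the list produced by the split, or an empty list if there are no more lines.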

AUTHORS

Top

COPYRIGHT AND LICENSE

Top


Hadoop-Streaming documentation Contained in the Hadoop-Streaming distribution.

package Hadoop::Streaming::Reducer::Input;
# Assign the package version inside BEGIN so that it is set at compile time.
BEGIN {
  $Hadoop::Streaming::Reducer::Input::VERSION = '0.110030';
}
use Moose;
use Hadoop::Streaming::Reducer::Input::Iterator;

#ABSTRACT: Parse input stream for reducer

# Input filehandle the reducer reads from; must be passed to the constructor.
# NOTE(review): `does` normally names a role, whereas 'FileHandle' looks like
# a type name -- `isa` may have been intended; confirm before changing.
has handle => (
    is       => 'ro',
    does     => 'FileHandle',
    required => 1,
);

# Holds the most recently read line until getline hands it out (see next_line
# and getline); undef when no line is pending.
has buffer => (
    is   => 'rw',
);


# Peek at the key of the next input line without consuming the line.
#
# Uses the buffered line when one is pending; otherwise reads a fresh line
# through next_line, which stores it in the buffer.  The line stays buffered,
# so a following getline/each call still sees it.
#
# Returns the text before the first tab, or nothing (undef in scalar context)
# when there is no further input.
sub next_key
{
    my $self = shift;

    # Test definedness, not truth: a buffered line such as "0" is valid input
    # and must not trigger another read that would overwrite (and lose) it.
    my $line = $self->buffer;
    $line = $self->next_line if not defined $line;
    return if not defined $line;

    # The line is not chomped here, so a line that contains no tab yields a
    # key that still carries its trailing newline.
    my ($key) = split /\t/, $line, 2;
    return $key;
}


# Read one more line from the handle into the buffer and return it.
#
# Returns nothing (undef in scalar context) once the handle reports end of
# file; in that case the buffer is left untouched.
sub next_line {
    my ($self) = @_;
    my $fh = $self->handle;

    unless ( $fh->eof ) {
        $self->buffer( $fh->getline );
        return $self->buffer;
    }

    return;
}


# Consume and return the next available line.
#
# A pending buffered line (left there by next_key or next_line) is handed out
# first; otherwise a new line is read via next_line.  Either way the buffer is
# empty afterwards, so each line is returned exactly once.
#
# Returns nothing (undef in scalar context) when there are no more lines.
sub getline {
    my $self = shift;

    my $line = $self->buffer;
    $line = $self->next_line if not defined $line;
    return if not defined $line;

    # next_line leaves what it read in the buffer; clear it so the following
    # call does not return the same line a second time.
    $self->buffer(undef);
    return $line;
}


# Build a new Hadoop::Streaming::Reducer::Input::Iterator that reads from
# this input object (passed to the constructor as `input`).
sub iterator {
    my ($self) = @_;
    my $iterator_class = 'Hadoop::Streaming::Reducer::Input::Iterator';
    return $iterator_class->new( input => $self );
}


# Consume the next line and split it into key and value.
#
# The line is fetched with getline, its trailing newline is removed, and it
# is split on the first tab only (limit 2), so the value may itself contain
# tabs.
#
# Returns the list produced by the split, or nothing when there are no more
# lines.
sub each
{
    my $self = shift;

    # Only undef means end of input; a defined-but-false line such as "0"
    # is real data and must still be returned.
    my $line = $self->getline;
    return if not defined $line;

    chomp $line;
    return split /\t/, $line, 2;
}

# Finalize the class definition now that all attributes and methods exist.
__PACKAGE__->meta->make_immutable;

# A module file must end with a true value.
1;

__END__