Sort::External - Sort huge lists.


Sort-External documentation Contained in the Sort-External distribution.

Index


Code Index:

NAME

Top

Sort::External - Sort huge lists.

SYNOPSIS

Top

    my $sortex = Sort::External->new( mem_threshold => 1024**2 * 16 );
    while (<HUGEFILE>) {
        $sortex->feed($_);
    }
    $sortex->finish;
    while ( defined( $_ = $sortex->fetch ) ) {
        do_stuff_with($_);
    }

DESCRIPTION

Top

Problem: You have a list which is too big to sort in-memory.

Solution: "feed, finish, and fetch" with Sort::External, the closest thing to a drop-in replacement for Perl's sort() function when dealing with unmanageably large lists.

How it works

Cache sortable items in memory. Periodically sort the cache and flush it to disk, creating a sorted "run". Complete the sort by sorting the input cache and any existing runs into an output stream.

Note that if Sort::External hasn't yet flushed the cache to disk when finish() is called, the whole operation completes in-memory.

In the CompSci world, "internal sorting" refers to sorting data in RAM, while "external sorting" refers to sorting data which is stored on disk, tape, punchcards, or any storage medium except RAM -- hence, this module's name.

Stringification

Items fed to Sort::External will be returned in stringified form (assuming that the cache gets flushed at least once): $foo = "$foo". Since this is unlikely to be desirable when objects or deep data structures are involved, Sort::External throws an error if you feed it anything other than simple scalars.

Expert note: Sort::External does a little extra bookkeeping to sustain each item's taint and UTF-8 flags through the journey to disk and back.

METHODS

Top

new()

    my $sortscheme = sub { $Sort::External::b <=> $Sort::External::a };
    my $sortex = Sort::External->new(
        mem_threshold   => 1024**2 * 16,     # default: 1024**2 * 8 (8 MiB)
        cache_size      => 100_000,          # default: undef (disabled) 
        sortsub         => $sortscheme,      # default sort: standard lexical
        working_dir     => $temp_directory,  # default: see below
    );

Construct a Sort::External object.

mem_threshold - Allow the input cache to consume approximately mem_threshold bytes before sorting it and flushing to disk. Experience suggests that the optimum setting is somewhere in the range of 1-16 MiB.

cache_size - Specify a hard limit for the input cache in terms of sortable items. If set, overrides mem_threshold.

sortsub -- A sorting subroutine. Be advised that you MUST use $Sort::External::a and $Sort::External::b instead of $a and $b in your sub. Before deploying a sortsub, consider using a GRT instead, as described in the Sort::External::Cookbook (Sort::External::Cookbook) -- it's probably a lot faster.

working_dir - The directory where the temporary sortfile will reside. By default, the location of the sortfile is determined by the behavior of File::Temp's constructor.

feed()

    $sortex->feed(@items);

Feed one or more sortable items to your Sort::External object. It is normal for occasional pauses to occur during feeding as caches are flushed.

finish()

    # if you intend to call fetch...
    $sortex->finish; 

    # otherwise....
    use Fcntl;
    $sortex->finish( 
        outfile => 'sorted.txt',
        flags => ( O_CREAT | O_WRONLY ),
    );

Prepare to output items in sorted order.

If you specify the parameter outfile, Sort::External will attempt to write your sorted list to that location. By default, Sort::External will refuse to overwrite an existing file; if you want to override that behavior, you can pass Fcntl flags to finish() using the optional flags parameter.

Note that you can either finish() to an outfile, or finish() then fetch()... but not both.

fetch()

    while ( defined( $_ = $sortex->fetch ) ) {
        do_stuff_with($_);
    }

Fetch the next sorted item.

BUGS

Top

Please report any bugs or feature requests to bug-sort-external@rt.cpan.org, or through the web interface at http://rt.cpan.org/NoAuth/ReportBug.html?Queue=Sort-External.

SEE ALSO

Top

The Sort::External::Cookbook (Sort::External::Cookbook).

File::Sort, File::MergeSort, and Sort::Merge as possible alternatives.

AUTHOR

Top

Marvin Humphrey <marvin at rectangular dot com> http://www.rectangular.com

COPYRIGHT AND LICENSE

Top


Sort-External documentation Contained in the Sort-External distribution.

package Sort::External;
use strict;
use warnings;

use 5.006_001;

our $VERSION = '0.18';

use XSLoader;
XSLoader::load( 'Sort::External', $VERSION );

use File::Temp;
use Fcntl qw( :DEFAULT );
use Carp;

our %instance_vars = (
    sortsub        => undef,
    working_dir    => undef,
    cache_size     => 0,
    mem_threshold  => 1024**2 * 8,
    line_separator => undef,         # no-op, backwards compatibility only
);

# Maximum number of runs that SortEx object is allowed without triggering
# multi-level consolidation at finish().
our $MAX_RUNS = 30;

sub new {
    my $class = shift;
    $class = ref($class) || $class;

    # Process labeled params.
    my %args = %instance_vars;
    while (@_) {
        my ( $var, $val ) = ( shift, shift );
        # Back compat: labeled params used to start with a dash.
        $var =~ s/^-//;
        croak("Illegal parameter: '$var'") unless exists $args{$var};
        $args{$var} = $val;
    }

    # Verify that supplied working directory is valid.
    if ( defined $args{working_dir} ) {
        croak("Invalid working_dir '$args{working_dir}'")
            unless -d $args{working_dir};
    }

    # Open a temp file.
    my $tempfile_fh = File::Temp->new(
        DIR    => $args{working_dir},
        UNLINK => 1,
    );

    return _new( $class, $args{working_dir}, $args{sortsub}, $args{cache_size},
        $args{mem_threshold}, $tempfile_fh );
}

my %finish_defaults = (
    outfile => undef,
    flags   => ( O_CREAT | O_EXCL | O_WRONLY ),
);

sub finish {
    my $self = shift;
    my $runs = $self->_get_runs;

    # If we've never flushed the cache, perform the final sort in-memory.
    if ( !@$runs ) {
        my $item_cache = $self->_get_item_cache;
        my $sortsub    = $self->_get_sortsub;
        @$item_cache
            = $sortsub ? sort $sortsub @$item_cache : sort @$item_cache;
    }
    else {
        $self->_write_item_cache_to_tempfile;
        while ( @$runs > $MAX_RUNS ) { 
            $self->_consolidate;
        }
    }

    # If called with arguments, we must be printing everything to an outfile.
    if (@_) {
        # Verify args.
        my %args = %finish_defaults;
        while (@_) {
            my ( $var, $val ) = ( shift, shift );
            # Back compat: labeled params used to start with a dash.
            $var =~ s/^-//;
            croak("Illegal parameter: '$var'")
                unless exists $finish_defaults{$var};
            $args{$var} = $val;
        }

        # Get an outfile and print everything to it.
        croak('Argument outfile is required') unless defined $args{outfile};
        sysopen( my $out_fh, $args{outfile}, $args{flags} )
            or croak("Couldn't open outfile '$args{outfile}': $!");
        $self->_finish_to_filehandle($out_fh);
        close $out_fh or croak("Couldn't close '$args{outfile}': $!");
    }
}

sub _finish_to_filehandle {
    my ( $self, $fh ) = @_;
    my $item_cache = $self->_get_item_cache;
    do {
        print $fh $_ for @$item_cache;
        @$item_cache = ();
        $self->_gatekeeper; # Refreshes @$item_cache.
    } while (@$item_cache);
}

sub _consolidate {
    my $self = shift;
    my $item_cache = $self->_get_item_cache;
    my $runs = $self->_get_runs;
    my $fh   = File::Temp->new(
        DIR    => $self->_get_working_dir,
        UNLINK => 1,
    );
    my @to_consolidate = @$runs;
    @$runs = ();
    my @consolidated;
    while (@to_consolidate) {
        my $num_to_splice = @to_consolidate < 10 ? @to_consolidate : 10;
        push @$runs, splice( @to_consolidate, 0, 10 );
        my $start = tell $fh;
        do {
            $self->_print_to_sortfile( $item_cache, $fh );
            @$item_cache = ();
            $self->_gatekeeper;    # Refreshes @$item_cache.
        } while (@$item_cache);
        my $run = Sort::External::SortExRun->_new( $fh, $start, tell($fh) );
        push @consolidated, $run;
        @$runs = ();
    }
    @$runs = @consolidated;
    $self->_set_temp_fh($fh);
}

# Reload the main cache using elements from the individual run caches.
# 
# Examine all SortExRun objects, making sure that they have at least one
# recovered element in memory.  Find the element among the run caches which we
# can guarantee sorts before any element yet to be recovered from disk.  Move
# all elements from the run caches which sort before or equal to this cutoff
# into the main object's cache.
sub _gatekeeper {
    my $self       = shift;
    my $runs       = $self->_get_runs;
    my $item_cache = $self->_get_item_cache;
    my $sortsub    = $self->_get_sortsub;

    # Discard exhausted runs.
    @$runs = grep { $#{ $_->_get_buffarray } != -1 or $_->_refill_buffer } @$runs;

    if ( @$runs == 0 ) {
        @$item_cache = ();
    }
    elsif ( @$runs == 1 ) {
        # If there's only one SortExRun, no need to sort.
        my $run       = $runs->[0];
        my $buffarray = $run->_get_buffarray;
        $run->_set_buffarray( [] );
        @$item_cache = @$buffarray;
    }
    else {
        # Choose the cutoff from among the lowest elements present in each 
        # run's cache.
        my @on_the_bubble = map { $_->_get_buffarray->[-1] } @$runs;
        @on_the_bubble
            = $sortsub
            ? sort $sortsub @on_the_bubble
            : sort @on_the_bubble;
        my $cutoff = $on_the_bubble[0];

        # Let all qualified items into the out_batch.
        my @out_batch;
        for my $run (@$runs) {
            my $buffarray = $run->_get_buffarray;
            my $tick = $self->_define_range( $buffarray, $cutoff );
            next if $tick == -1;
            my $num_to_splice = $tick + 1;
            push @out_batch, splice( @$buffarray, 0, $num_to_splice );
        }

        # Refresh the item cache and prepare to return elements.
        @$item_cache = $sortsub ? sort $sortsub @out_batch : sort @out_batch;
    }

    $self->_set_fetch_tick(0);
    return;
}

# Compare two elements using either standard lexical comparison or the sortsub
# provided to the object's constructor.
sub _compare {
    my ( $self, $item_a, $item_b ) = @_;
    my $sortsub = $self->_get_sortsub;
    if ( defined $sortsub ) {
        local $a = $item_a;
        local $b = $item_b;
        return $sortsub->( $a, $b );
    }
    else {
        return $item_a cmp $item_b;
    }
}

# Flush the items in the input cache to a tempfile, sorting as we go.
sub _write_item_cache_to_tempfile {
    my $self       = shift;
    my $item_cache = $self->_get_item_cache;
    my $sortsub    = $self->_get_sortsub;

    return unless @$item_cache;

    # Print the sorted cache to the tempfile.
    @$item_cache = $sortsub ? sort $sortsub @$item_cache : sort @$item_cache;
    my $tempfile_fh = $self->_get_tempfile_fh;
    my $start       = tell($tempfile_fh);
    $self->_print_to_sortfile( $item_cache, $tempfile_fh );

    # Add a SortExRun object to the runs array.
    my $run = Sort::External::SortExRun->_new( $tempfile_fh, $start,
        tell($tempfile_fh) );
    push @{ $self->_get_runs }, $run;

    # Reset cache variables.
    $self->_set_mem_bytes(0);
    $#$item_cache = -1;
}

# Return the highest index in an array representing an element lower than
# a given cutoff.
sub _define_range {
    my ( $self, $array, $target ) = @_;
    my ( $lo, $mid, $hi ) = ( 0, 0, $#$array );

    # Binary search.
    while ( $hi - $lo > 1 ) {
        $mid = ( $lo + $hi ) >> 1;
        my $delta = $self->_compare( $array->[$mid], $target );
        if    ( $delta < 0 ) { $lo = $mid }
        elsif ( $delta > 0 ) { $hi = $mid }
        elsif ( $delta == 0 ) { $lo = $hi = $mid }
    }

    # Get that last item in...
    while ( $mid < $#$array
        and $self->_compare( $array->[ $mid + 1 ], $target ) < 1 )
    {
        $mid++;
    }
    while ( $mid >= 0 and $self->_compare( $array->[$mid], $target ) > 0 ) {
        $mid--;
    }

    return $mid;
}

1;

__END__