/usr/local/CPAN/Parallel-MapReduce/Parallel/MapReduce/Utils.pm
package Parallel::MapReduce::Utils;
use strict;
use warnings;
require Exporter;
use base qw(Exporter);
our @EXPORT = qw(Hchunk Hslice Hfetch Hstore chunk_n_store fetch_n_unchunk balance_keys);
use Data::Dumper;
use Storable qw(freeze thaw);
$Storable::Deparse = 1;
$Storable::Eval = 1;
# Hchunk(\%hash, $limit) -> \@chunks
#
# Split %$hash into a list of smaller hashes ("chunks"). Each chunk holds
# roughly $limit bytes of Storable-frozen values. $limit is a soft target,
# not a hard limit:
#   - a single value larger than $limit still gets a chunk of its own;
#   - only values are measured, keys are not counted.
# Returns an array ref of hash refs; the result is empty for an empty input.
sub Hchunk {
    my $h     = shift;
    my $limit = shift;

    my @chunks;
    my %current;
    my $current_size = 0;

    # Iterate over keys() rather than each(): keys() resets the hash's
    # internal iterator. Otherwise a caller that had partially iterated the
    # hash would make us silently skip entries.
    foreach my $k (keys %$h) {
        my $v = $h->{$k};

        # freeze() returns a byte string, so plain length() is its size in
        # bytes. bytes::length() needs bytes.pm loaded, which this module
        # never does.
        my $item_size = length freeze(\$v);

        if ($current_size + $item_size > $limit) {
            push @chunks, {%current} if %current;
            %current      = ();
            $current_size = 0;
        }
        $current{$k} = $v;
        $current_size += $item_size;
    }
    push @chunks, {%current} if %current;

    return \@chunks;
}
# chunk_n_store($memd, \%hash, $prefix, $size) -> @keys
#
# Split %$hash with Hchunk into chunks of roughly $size bytes (default 1000;
# a false $size also means 1000). Each chunk is frozen with Storable and
# stored through $memd->set under the key $prefix . md5_hex(frozen chunk).
# Returns the list of keys written, in the order they were stored; in
# scalar context this is the number of chunks.
sub chunk_n_store {
    my ($memd, $hash, $prefix, $size) = @_;
    $size ||= 1000;

    use Digest::MD5 qw(md5_hex);

    my @stored_keys;
    foreach my $chunk (@{ Hchunk($hash, $size) }) {
        my $frozen     = freeze($chunk);
        my $memd_key   = $prefix . md5_hex($frozen);
        $memd->set($memd_key, $frozen);
        push @stored_keys, $memd_key;
    }
    return @stored_keys;
}
# fetch_n_unchunk($memd, \@chunk_keys) -> \%hash
#
# Inverse of chunk_n_store: fetch every frozen chunk listed in @$chunk_keys
# via $memd->get, thaw it, and merge all chunks into one hash. When the same
# key appears in several chunks, the later chunk wins.
#
# Dies with a message naming the key when a chunk cannot be fetched (get
# returns undef, presumably because the entry is missing or was evicted)
# or cannot be thawed.
#
# NOTE(review): this file sets $Storable::Eval = 1, so thaw() will eval any
# serialized code refs. The cache contents must be fully trusted.
sub fetch_n_unchunk {    #-- find mapper slice
    my $memd   = shift;
    my $chunks = shift;
    use Storable qw(thaw);

    my %merged;
    foreach my $key (@$chunks) {
        my $frozen = $memd->get($key);
        die "fetch_n_unchunk: chunk '$key' could not be fetched"
            unless defined $frozen;

        my $chunk = thaw($frozen);
        die "fetch_n_unchunk: chunk '$key' could not be thawed"
            unless defined $chunk;

        # A hash slice keeps "later chunk wins" semantics without rebuilding
        # %merged on every iteration.
        @merged{ keys %$chunk } = values %$chunk;
    }
    return \%merged;
}
# Hslice(\%hash, $M) -> \%slices
#
# Distribute the keys of %$hash round-robin over $M slices. The result maps
# slice index (0 .. $M-1) to a hash ref holding that slice's key/value
# pairs. Only slices that received at least one key are present, so an
# empty input yields {}. Key order, and hence slice membership, follows
# Perl's hash order.
#
# Dies if $M is undef or truncates to 0; the modulus would be zero.
sub Hslice {
    my $h = shift;
    my $M = shift;

    die "Hslice: slice count must be a non-zero integer"
        unless defined $M && int $M;

    my $H = {};    # will contain the result hash
    my $i = 0;
    foreach my $k (keys %$h) {
        $H->{ $i++ % $M }{$k} = $h->{$k};
    }
    return $H;
}
# Hstore($memd, \%hash, $slice, $job) -> @keys
#
# Flush %$hash out to memcached: every entry is written through
# $memd->set under the key $slice . $original_key. Returns the list of
# memcached keys written (their count in scalar context). $job is accepted
# but not used.
sub Hstore {
    my ($memd, $hash, $slice, $job) = @_;

    my @written;
    for my $key (keys %$hash) {
        my $memd_key = $slice . $key;
        $memd->set($memd_key, $hash->{$key});
        push @written, $memd_key;
    }
    return @written;
}
# Hfetch($memd, \@keys, $jobs) -> \%aggregated
#
# Fetch @$keys in one $memd->get_multi call. The fetched values are
# expected to be array refs. Values are concatenated per original key,
# where the original key is the memcached key with its "sliceNN:" prefix
# removed. A key without such a prefix is used as-is. Keys that were not
# returned by get_multi are simply absent from the result. $jobs is
# accepted but not used.
sub Hfetch {
    my $memd = shift;
    my $keys = shift;
    my $jobs = shift;

    my $h = $memd->get_multi(@$keys) || {};

    my %h2;
    foreach my $k (keys %$h) {
        # Test the match explicitly: a failed match leaves $1/$2 holding
        # values from an earlier (or the caller's) successful match.
        my $orig = $k =~ /^slice\d+:(.+)/s ? $1 : $k;
        push @{ $h2{$orig} }, @{ $h->{$k} };    # aggregate all value lists
    }
    return \%h2;
}
# fetch_n_consolidate($memd, \@keys) -> \%consolidated
#
# Fetch @$keys via $memd->get_multi and regroup the fetched values, which
# are expected to be array refs. Entries whose keys differ only in their
# leading "word:" prefix (e.g. different slices) are merged under the same
# remaining key. A key without such a prefix is used as-is. Always returns
# a hash ref, which is empty when nothing was fetched.
sub fetch_n_consolidate {
    my $memd = shift;
    my $keys = shift;

    my $h = $memd->get_multi(@$keys) || {};

    # resorting of different slices, but same keys
    my %h2;
    foreach my $k (keys %$h) {
        # An explicit conditional avoids reading a stale $1 left over from
        # an earlier successful match when this key does not match.
        my $orig = $k =~ /^\w+:(.+)/s ? $1 : $k;
        push @{ $h2{$orig} }, @{ $h->{$k} };
    }
    return \%h2;
}
# balance_keys(\@keys, $job, $N) -> \%assignment
#
# Distribute memcached keys over $N reducers (default 1; a false $N also
# means 1). Each key's "sliceNN:" prefix is stripped. The remainder is
# hashed as the sum of its character codes, and the key goes to reducer
# (sum % $N). The same logical key coming from different slices therefore
# always lands on the same reducer. A key without a "sliceNN:" prefix is
# hashed in full.
#
# Returns a hash ref mapping reducer index to an array ref of the original
# keys, in input order. $job is accepted but not used.
sub balance_keys {
    my $keys = shift;
    my $job  = shift;
    my $N    = shift || 1;

    my %R;
    foreach my $r (@$keys) {
        # Test the match explicitly: on a failed match $2/$1 would still
        # hold text from a previous (or the caller's) successful match.
        my $s = $r =~ /^slice\d+:(.+)/s ? $1 : $r;

        my $sum = 0;
        $sum += ord($_) for split //, $s;    # cheap, stable hash of the key

        push @{ $R{ $sum % $N } }, $r;       # add the original key to a reducer
    }
    return \%R;
}
1;