| RDF-Trine documentation | Contained in the RDF-Trine distribution. |
new ( \@results, \@names, %args )new ( \&results, \@names, %args )materializeproject ( @columns )join_streams ( $stream, $stream )nested_loop_join ( $outer, $inner )sorted_bybinding_value_by_name ( $name )binding_value ( $i )binding_valuesbinding_namesbinding_name ( $i )bindings_countis_bindingsas_json ( $max_size )as_xml ( $max_size )as_string ( $max_size [, \$count] )as_statements ( @names )print_xml ( $fh, $max_size )construct_args
RDF::Trine::Iterator::Bindings - Stream (iterator) class for bindings query results
This document describes RDF::Trine::Iterator::Bindings version 0.135
use RDF::Trine::Iterator::Bindings;
my $iterator = RDF::Trine::Iterator::Bindings->new( \&data, \@names );
while (my $row = $iterator->next) {
# $row is a HASHref containing variable name -> RDF Term bindings
my @vars = keys %$row;
print $row->{ 'var' }->as_string;
}
Beyond the methods documented below, this class inherits methods from the RDF::Trine::Iterator class.
new ( \@results, \@names, %args )
new ( \&results, \@names, %args )
Returns a new SPARQL Result iterator object. Results must be either a reference to an array containing results or a CODE reference that acts as an iterator, returning successive items when called, and returning undef when the iterator is exhausted.
materializeReturns a materialized version of the current binding iterator.
project ( @columns )Returns a new stream that projects the current bindings to only the given columns.
join_streams ( $stream, $stream )Performs a natural, nested loop join of the two streams, returning a new stream of joined results.
nested_loop_join ( $outer, $inner )Performs a natural, nested loop join of the two streams, returning a new stream of joined results.
Note that the values from the $inner iterator are fully materialized for
this join, and the results of the join are in the order of values from the
$outer iterator. This suggests that:
* If sorting needs to be preserved, the $outer iterator should be used to
determine the result ordering.
* If one iterator is much smaller than the other, it should likely be used as
the $inner iterator since materialization will require less total memory.
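sorted_by
Returns the list of sort keys recorded for this iterator (the contents of the sorted_by constructor argument; an empty list if none was given).
binding_value_by_name ( $name )
Returns the binding of the named variable in the current result.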
binding_value ( $i )Returns the binding of the $i-th variable in the current result.
binding_valuesReturns a list of the binding values from the current result.
binding_namesReturns a list of the binding names.
binding_name ( $i )Returns the name of the $i-th result column.
bindings_countReturns the number of variable bindings in the current result.
is_bindingsReturns true if the underlying result is a set of variable bindings.
as_json ( $max_size )Returns a JSON serialization of the stream data.
as_xml ( $max_size )Returns an XML serialization of the stream data.
as_string ( $max_size [, \$count] )Returns a string table serialization of the stream data.
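as_statements ( @names )
Returns a graph iterator of statements built from the values bound to @names in each result: a triple from the first three values, or a quad when a fourth value is defined.
print_xml ( $fh, $max_size )
Prints an XML serialization of the stream data to the filehandle $fh.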
construct_argsReturns the arguments necessary to pass to the stream constructor _new to re-create this stream (assuming the same closure as the first argument).
Gregory Todd Williams <gwilliams@cpan.org>
Copyright (c) 2006-2010 Gregory Todd Williams. This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
| RDF-Trine documentation | Contained in the RDF-Trine distribution. |
# RDF::Trine::Iterator::Bindings # -----------------------------------------------------------------------------
package RDF::Trine::Iterator::Bindings; use utf8; use strict; use warnings; no warnings 'redefine'; use Data::Dumper; use JSON 2.0; use Text::Table; use Log::Log4perl; use Scalar::Util qw(blessed reftype); use RDF::Trine::Iterator::Bindings::Materialized; use RDF::Trine::Serializer::Turtle; use RDF::Trine::Iterator qw(smap); use base qw(RDF::Trine::Iterator); our ($VERSION); BEGIN { $VERSION = '0.135'; }
# new ( $stream, \@names, %args )
#
# Constructor. $stream defaults to a code reference that always returns
# undef, and \@names defaults to an empty array reference. The parent
# constructor is invoked with the fixed type string 'bindings'. The
# 'sorted_by' entry of %args (or an empty array reference when it is absent
# or false) is stored on the object under the 'sorted_by' key.
sub new {
	my ($class, $stream, $names, %args) = @_;
	$stream	||= sub { undef };
	$names	||= [];

	my $self	= $class->SUPER::new( $stream, 'bindings', $names, %args );
	$self->{sorted_by}	= $args{ sorted_by } || [];
	return $self;
}

# _new ( $stream, $type, \@names, %args )
#
# Alternate constructor that accepts a type argument in the third position.
# The type is discarded and construction is delegated to new().
sub _new {
	my ($class, $stream, undef, $names, %args) = @_;
	return $class->new( $stream, $names, %args );
}
# materialize
#
# Drains this iterator with get_all() and passes the collected rows, followed
# by the values returned from construct_args(), to the _new constructor of
# the class named by _mclass(). Returns whatever that constructor returns.
sub materialize {
	my ($self)		= @_;
	my @rows		= $self->get_all;
	my @ctor_args	= $self->construct_args;
	my $target		= $self->_mclass;
	return $target->_new( \@rows, @ctor_args );
}

# _mclass
#
# Returns the name of the class used by materialize().
sub _mclass {
	return 'RDF::Trine::Iterator::Bindings::Materialized';
}
# project ( @columns )
#
# Returns a new iterator of the same class as the invocant. Each row it
# yields is a fresh hash containing only the keys in @columns, with values
# copied from the next row of the invocant (undef for keys the row lacks).
# The new iterator is built with the invocant's _args() and pulls rows from
# the invocant lazily; when the invocant yields a false row, the projected
# iterator yields nothing.
sub project {
	my ($self, @columns)	= @_;
	my $class	= ref($self);

	my $projector	= sub {
		my $full	= $self->next;
		return unless ($full);
		my %slice;
		$slice{ $_ } = $full->{ $_ } for @columns;
		return \%slice;
	};

	my $args	= $self->_args();
	return $class->new( $projector, [ @columns ], %$args );
}
# join_streams ( $stream, $stream, %args )
#
# Joins two binding streams by delegating to nested_loop_join(). When
# $args{orderby} is given, the requested ordering is compared (as
# comma-joined strings) with each stream's sorted_by() list in order to pick
# which stream drives the outer loop:
#   * a stream whose sort string starts with the requested sort string is
#     used as the outer stream (the first stream is checked first);
#   * otherwise a stream that shares a leading prefix with the requested
#     sort is used as the outer stream.
# The returned stream's sorted_by attribute is set to the concatenation of
# both streams' sort keys (outer first) when orderby was given, and to an
# empty list otherwise.
sub join_streams {
	my ($self, $outer, $inner, %args)	= @_;
	my $log	= Log::Log4perl->get_logger("rdf.trine.iterator.bindings");

	my @join_sorted_by;
	if (my $orderby = $args{ orderby }) {
		my $wanted		= join(',', map { ($_->[1]->name, $_->[0]) } @$orderby);
		my $outer_order	= join(',', $outer->sorted_by);
		my $inner_order	= join(',', $inner->sorted_by);

		if ($log->is_debug) {
			$log->debug('---------------------------');
			$log->debug('REQUESTED SORT in JOIN: ' . Dumper($wanted));
			$log->debug('JOIN STREAM SORTED BY: ' . Dumper($outer_order));
			$log->debug('JOIN STREAM SORTED BY: ' . Dumper($inner_order));
		}

		if (substr( $outer_order, 0, length($wanted) ) eq $wanted) {
			$log->debug("first stream is already sorted. using it in the outer loop.");
		}
		elsif (substr( $inner_order, 0, length($wanted) ) eq $wanted) {
			$log->debug("second stream is already sorted. using it in the outer loop.");
			($outer, $inner)	= ($inner, $outer);
		}
		else {
			# Matches when the text before the '!' and the text after it
			# begin with the same run of characters (captured in $1).
			my $shared_prefix	= qr[^([^!]+)[^!]*!\1];
			my $outer_probe		= join('!', $outer_order, $wanted);
			my $inner_probe		= join('!', $inner_order, $wanted);
			if ($outer_probe =~ $shared_prefix) {
				$log->debug("first stream is closely sorted ($1).");
			}
			elsif ($inner_probe =~ $shared_prefix) {
				$log->debug("second stream is closely sorted ($1).");
				($outer, $inner)	= ($inner, $outer);
			}
		}

		@join_sorted_by	= ($outer->sorted_by, $inner->sorted_by);
	}

	my $stream	= $self->nested_loop_join( $outer, $inner, %args );
	$log->debug("JOINED stream is sorted by: " . join(',', @join_sorted_by));
	$stream->{sorted_by}	= \@join_sorted_by;
	return $stream;
}
# nested_loop_join ( $outer, $inner, %args )
#
# Performs a natural nested-loop join of two binding streams and returns a
# new stream (built with $outer->_new and $outer->_args) over the joined rows.
#
# Both streams are first projected onto the union of their binding names.
# The inner stream is then read completely into memory with get_all(); the
# outer stream is consumed lazily, one row per pass over the inner rows, so
# results appear in outer-stream order.
#
# Two rows join when, for every key on which both have a defined value, the
# values are equal according to ->equal. If they are not equal and both
# $args{query} and $args{bridge} are supplied, blank nodes may still be
# treated as equal using the 'bnode-map' entries of each stream's
# extra_result_data.
# NOTE(review): that fallback uses Set::Scalar and
# RDF::Query::Algebra::Service, which this sub does not load itself;
# presumably the caller has loaded them -- confirm.
#
# The joined row contains only keys with defined values; values from the
# inner row take precedence over values from the outer row.
#
# Debug log messages that need Data::Dumper or per-row string building are
# only constructed when debug logging is enabled, since this code runs once
# per (outer row, inner row) pair.
sub nested_loop_join {
	my $self	= shift;
	my $astream	= shift;
	my $bstream	= shift;
	my %args	= @_;

	my $l		= Log::Log4perl->get_logger("rdf.trine.iterator.bindings");

	################################################
	### BNODE MAP STUFF
	# Flatten each stream's list of { id => [names] } hashes into one
	# id => [names] map per stream.
	my $a_extra	= $astream->extra_result_data || {};
	my $b_extra	= $bstream->extra_result_data || {};
	my (%a_map, %b_map);
	foreach my $h (@{ $a_extra->{'bnode-map'} || [] }) {
		foreach my $id (keys %$h) {
			my @values	= @{ $h->{ $id } };
			push( @{ $a_map{ $id } }, @values );
		}
	}
	foreach my $h (@{ $b_extra->{'bnode-map'} || [] }) {
		foreach my $id (keys %$h) {
			my @values	= @{ $h->{ $id } };
			push( @{ $b_map{ $id } }, @values );
		}
	}
	my $a_map	= (%a_map) ? \%a_map : undef;
	my $b_map	= (%b_map) ? \%b_map : undef;
	################################################

	my @names	= RDF::Trine::_uniq( map { $_->binding_names() } ($astream, $bstream) );
	my $outer	= $astream->project( @names );
	my $inner	= $bstream->project( @names );

	# The inner side is fully materialized so it can be rescanned for every
	# outer row.
	my @data	= $inner->get_all();
	no warnings 'uninitialized';

	# State shared across calls of the closure below.
	my $inner_index;
	my $rowa;
	my $need_new_a	= 1;
	my $sub	= sub {
		OUTER: while (1) {
			if ($need_new_a) {
				$l->debug("### fetching new outer tuple");
				$rowa			= $outer->next;
				$inner_index	= 0;
				$need_new_a		= 0;
			}
			if ($l->is_debug) {
				$l->debug("OUTER: " . Dumper($rowa));
			}
			# Deliberately an explicit undef: this closure is the iterator
			# callback and signals exhaustion with undef.
			return undef unless ($rowa);

			LOOP: while ($inner_index <= $#data) {
				my $rowb	= $data[ $inner_index++ ];
				if ($l->is_debug) {
					$l->debug("- INNER[ $inner_index ]: " . Dumper($rowb));
					$l->debug("[--JOIN--] " . join(' ', map {
						my $row	= $_;
						'{' . join(', ', map { join('=', $_, ($row->{$_}) ? $row->{$_}->as_string : '(undef)') } (keys %$row)) . '}'
					} ($rowa, $rowb)));
				}

				my %keysa	= map {$_=>1} (keys %$rowa);
				my @shared	= grep { $keysa{ $_ } } (keys %$rowb);
				foreach my $key (@shared) {
					my $val_a	= $rowa->{ $key };
					my $val_b	= $rowb->{ $key };
					my $defined	= 0;
					foreach my $n ($val_a, $val_b) {
						$defined++ if (defined($n));
					}
					# Only keys bound on both sides constrain the join.
					if ($defined == 2) {
						my $equal	= $val_a->equal( $val_b );
						if (not $equal) {
							my $query	= $args{ query };
							my $bridge	= $args{ bridge };
							if ($query and $bridge) {
								warn 'join values and bnode maps: ' . Dumper($val_a, $val_b, $a_map, $b_map) if ($a_map or $b_map);
								if ($a_map and $val_a->isa('RDF::Trine::Node::Blank')) {
									my $anames	= Set::Scalar->new( @{ $a_map{ $val_a->blank_identifier } } );
									my $bnames	= Set::Scalar->new( RDF::Query::Algebra::Service->_names_for_node( $val_b, $query, $bridge, {} ) );
									if (my $int = $anames->intersection( $bnames )) {
										warn "node equality based on $int";
										$equal	= 1;
									}
								} elsif ($b_map and $val_b->isa('RDF::Trine::Node::Blank')) {
									my $bnames	= Set::Scalar->new( @{ $b_map{ $val_b->blank_identifier } } );
									my $anames	= Set::Scalar->new( RDF::Query::Algebra::Service->_names_for_node( $val_a, $query, $bridge, {} ) );
									warn "anames: $anames\n";
									warn "bnames: $bnames\n";
									if (my $int = $anames->intersection( $bnames )) {
										warn "node equality based on $int";
										$equal	= 1;
									}
								}
							} else {
								if ($l->is_debug) {
									$l->logcluck("no query,bridge in args");
								}
							}
						}

						unless ($equal) {
							if ($l->is_debug) {
								$l->debug("can't join because mismatch of $key (" . join(' <==> ', map {$_->as_string} ($val_a, $val_b)) . ")");
							}
							next LOOP;
						}
					}
				}

				# Merge the two rows, dropping undefined values; inner-row
				# values overwrite outer-row values for the same key.
				my $row	= {
					(map { $_ => $rowa->{$_} } grep { defined($rowa->{$_}) } keys %$rowa),
					(map { $_ => $rowb->{$_} } grep { defined($rowb->{$_}) } keys %$rowb)
				};
				if ($l->is_debug) {
					$l->debug("JOINED:");
					foreach my $key (keys %$row) {
						$l->debug("$key\t=> " . $row->{ $key }->as_string);
					}
				}
				return $row;
			}

			# Inner rows exhausted for this outer row; advance the outer side.
			$need_new_a	= 1;
		}
	};

	my $args	= $astream->_args;
	return $astream->_new( $sub, 'bindings', \@names, %$args );
}
# sorted_by
#
# Returns the elements of the array referenced by the object's 'sorted_by'
# attribute as a list.
sub sorted_by {
	my ($self)		= @_;
	my $order_ref	= $self->{sorted_by};
	return @{ $order_ref };
}
# binding_value_by_name ( $name )
#
# Looks up $name in the current row: the row returned by current() when the
# iterator reports itself open, otherwise the row returned by next().
# Returns the value stored under $name, or nothing (empty list / undef) when
# the row has no such key.
sub binding_value_by_name {
	my ($self, $name)	= @_;

	my $current_row;
	if ($self->open) {
		$current_row	= $self->current;
	} else {
		$current_row	= $self->next;
	}

	return unless (exists( $current_row->{ $name } ));
	return $current_row->{ $name };
}
# binding_value ( $i )
#
# Translates the column index $i into a variable name using binding_names()
# and returns the result of binding_value_by_name() for that name.
sub binding_value {
	my ($self, $index)	= @_;
	my @columns	= $self->binding_names;
	my $name	= $columns[ $index ];
	return $self->binding_value_by_name( $name );
}
# binding_values
#
# Returns the values of the current row (current() when the iterator is
# open, otherwise next()) in the order given by binding_names().
sub binding_values {
	my ($self)	= @_;

	my $current_row;
	if ($self->open) {
		$current_row	= $self->current;
	} else {
		$current_row	= $self->next;
	}

	return @{ $current_row }{ $self->binding_names };
}
# binding_names
#
# Returns the elements of the array referenced by the object's '_names'
# attribute as a list.
sub binding_names {
	my ($self)		= @_;
	my $names_ref	= $self->{_names};
	return @{ $names_ref };
}
# binding_name ( $i )
#
# Returns the element at index $i of the array referenced by the object's
# '_names' attribute (undef when there is no such element).
sub binding_name {
	my ($self, $index)	= @_;
	my $names_ref	= $self->{_names};
	return ${ $names_ref }[ $index ];
}
# bindings_count
#
# Returns the number of elements in the array referenced by the object's
# '_names' attribute (0 when it is empty).
sub bindings_count {
	my ($self)		= @_;
	my $names_ref	= $self->{_names};
	my $total		= scalar( @{ $names_ref } );
	return $total ? $total : 0;
}
# is_bindings
#
# Always returns 1.
sub is_bindings {
	my ($self)	= @_;
	return 1;
}
# as_json ( $max_size )
#
# Serializes the remaining results to a JSON string with to_json().
#
# The structure has a 'head' member listing the (true-valued) variable names
# and a 'results' member with 'ordered' (true when sorted_by() is non-empty),
# 'distinct' (taken from the 'distinct' entry of _args()) and 'bindings' (one
# hash per result row, keyed by variable name). Only blessed values that
# format_node_json() recognizes are included in a row.
#
# 'bindings' is always present and is an empty array when there are no
# results, so the output remains a well-formed result document for empty
# result sets.
#
# When $max_size is a true value, at most that many rows are serialized.
sub as_json {
	my $self			= shift;
	my $max_result_size	= shift || 0;
	my $width			= $self->bindings_count;

	my @variables;
	for (my $i=0; $i < $width; $i++) {
		my $name	= $self->binding_name($i);
		push(@variables, $name) if $name;
	}

	my $count	= 0;
	my @sorted	= $self->sorted_by;
	my $order	= scalar(@sorted) ? JSON::true : JSON::false;
	my $dist	= $self->_args->{distinct} ? JSON::true : JSON::false;

	my $data	= {
		head	=> { vars => \@variables },
		results	=> { ordered => $order, distinct => $dist, bindings => [] },
	};

	while (!$self->finished) {
		my %row;
		for (my $i = 0; $i < $width; $i++) {
			my $name	= $self->binding_name($i);
			my $value	= $self->binding_value($i);
			if (blessed($value)) {
				if (my ($k, $v) = format_node_json($value, $name)) {
					$row{ $k }	= $v;
				}
			}
		}

		push(@{ $data->{results}{bindings} }, \%row);
		# 'last' skips the continue block, so the iterator is not advanced
		# past the final row that was emitted.
		last if ($max_result_size and ++$count >= $max_result_size);
	} continue { $self->next_result }

	return to_json( $data );
}
# as_xml ( $max_size )
#
# Returns, as a string, everything print_xml() writes for this iterator.
# $max_size (default 0) is passed through to print_xml() unchanged.
#
# Dies with the system error if the in-memory filehandle cannot be opened or
# closed, matching the checked open used in print_xml().
sub as_xml {
	my $self			= shift;
	my $max_result_size	= shift || 0;
	my $string			= '';
	open( my $fh, '>', \$string ) or die $!;
	$self->print_xml( $fh, $max_result_size );
	close($fh) or die $!;
	return $string;
}
# as_string ( $max_size [, \$count] )
#
# Consumes the iterator and returns the rows rendered as a text table built
# with Text::Table, using the binding names as column headers. Blessed
# values are rendered with
# RDF::Trine::Serializer::Turtle->node_as_concise_string; anything else is
# rendered as an empty string.
#
# When $max_size is a true value, at most that many rows are read. When the
# third argument is a reference, the number of rows read is stored through
# it. Returns the empty string when no rows were read. Dies if the number of
# headers differs from the number of columns in the first row.
sub as_string {
	my $self			= shift;
	my $max_result_size	= shift || 0;
	my $rescount		= shift;
	my @names			= $self->binding_names;
	my $headers			= \@names;

	my @rows;
	my $count	= 0;
	while (my $row = $self->next) {
		push(@rows, [ map { blessed($_) ? RDF::Trine::Serializer::Turtle->node_as_concise_string($_) : '' } @{ $row }{ @names } ]);
		last if ($max_result_size and ++$count >= $max_result_size);
	}

	if (ref($rescount)) {
		$$rescount	= scalar(@rows);
	}

	# Column specification for Text::Table: header names interleaved with
	# separator strings (passed as scalar references), framed by '| ' and ' |'.
	my @rule	= qw(- +);
	my @headers	= (\q"| ");
	push(@headers, map { $_ => \q" | " } @$headers);
	pop	@headers;
	push @headers => (\q" |");

	if ('ARRAY' eq ref $rows[0]) {
		if (@$headers == @{ $rows[0] }) {
			my $table	= Text::Table->new(@headers);
			$table->rule(@rule);
			$table->body_rule(@rule);
			$table->load(@rows);
			return join('',
				$table->rule(@rule),
				$table->title,
				$table->rule(@rule),
				# Valid body line indices are 0 .. $#rows; the original
				# upper bound of @rows requested one line too many.
				map({ $table->body($_) } 0 .. $#rows),
				$table->rule(@rule)
			);
		} else {
			die("make_table() rows must be an AoA with rows being same size as headers");
		}
	} else {
		return '';
	}
}
# as_statements ( @names )
#
# Returns an RDF::Trine::Iterator::Graph wrapping a generator that pulls rows
# from this iterator and turns each one into a statement. The values bound to
# @names are taken in order: if there are exactly three of them, or the
# fourth is undefined, an RDF::Trine::Statement is built from the first
# three; otherwise an RDF::Trine::Statement::Quad is built from all of them.
# The generator returns undef once this iterator returns an undefined row.
sub as_statements {
	my ($self, @names)	= @_;

	my $generator	= sub {
		my $bindings	= $self->next;
		return undef unless (defined $bindings);

		my @terms	= @{ $bindings }{ @names };
		my $statement;
		if (scalar(@terms) == 3 or not(defined($terms[3]))) {
			$statement	= RDF::Trine::Statement->new( @terms[ 0 .. 2 ] );
		} else {
			$statement	= RDF::Trine::Statement::Quad->new( @terms );
		}
		return $statement;
	};

	return RDF::Trine::Iterator::Graph->new( $generator )
}
# print_xml ( $fh, $max_size )
#
# Writes a SPARQL XML results document for the remaining rows to $fh.
#
# The document consists of an XML declaration, a <sparql> root, a <head>
# with one <variable> element per (true-valued) binding name, and a
# <results> section with one <result> per row; each binding is rendered by
# format_node_xml(). When $max_size is a true value, at most that many rows
# are written.
#
# When extra_result_data() returns a true value, the results are first
# buffered in memory. After all rows are read, the extra data is serialized
# to <extra>/<extrakey> XML, embedded in a data: URI, and emitted as a
# <link href="..."/> element in the head, followed by the buffered output.
#
# Keys and values of the extra data, and the resulting URI, are XML-escaped
# before being placed in attribute values / element content.
#
# Dies with the system error if the in-memory buffer cannot be opened.
sub print_xml {
	my $self			= shift;
	my $fh				= shift;
	my $max_result_size	= shift || 0;
	my $width			= $self->bindings_count;

	my @variables;
	for (my $i=0; $i < $width; $i++) {
		my $name	= $self->binding_name($i);
		push(@variables, $name) if $name;
	}

	no strict 'refs';
	print {$fh} <<"END";
<?xml version="1.0" encoding="utf-8"?>
<sparql xmlns="http://www.w3.org/2005/sparql-results#">
<head>
END

	my $t	= join("\n", map { qq(\t<variable name="$_"/>) } @variables);

	# With extra result data the head cannot be finished until every row has
	# been read, so output is redirected to an in-memory buffer and the real
	# handle is remembered in $delay_output.
	my $delay_output	= 0;
	my $delayed			= '';
	if ($self->extra_result_data) {
		$delay_output	= $fh;
		undef $fh;
		open( $fh, '>', \$delayed ) or die $!;
	} else {
		if ($t) {
			print {$fh} "${t}\n";
		}
	}

	print {$fh} <<"END";
</head>
<results>
END

	my $count	= 0;
	while (my $row = $self->next) {
		print {$fh} "\t\t<result>\n";
		for (my $i = 0; $i < $width; $i++) {
			my $name	= $self->binding_name($i);
			my $value	= $row->{ $name };
			print {$fh} "\t\t\t" . $self->format_node_xml($value, $name) . "\n";
		}
		print {$fh} "\t\t</result>\n";

		last if ($max_result_size and ++$count >= $max_result_size);
	}

	if ($delay_output) {
		my $extra		= $self->extra_result_data;
		my $extraxml	= '';
		foreach my $tag (keys %$extra) {
			$extraxml	.= qq[<extra name="${tag}">\n];
			my $value	= $extra->{ $tag };
			foreach my $e (@$value) {
				foreach my $k (keys %$e) {
					my $v		= $e->{ $k };
					my @values	= @$v;
					# Escape in place; '&' must be handled first so the
					# entities introduced below are not escaped again.
					foreach ($k, @values) {
						s/&/&amp;/g;
						s/</&lt;/g;
						s/"/&quot;/g;
					}
					$extraxml	.= qq[\t<extrakey id="$k">] . join(',', @values) . qq[</extrakey>\n];
				}
			}
			$extraxml	.= "</extra>\n";
		}

		my $u	= URI->new('data:');
		$u->media_type('text/xml');
		$u->data($extraxml);
		my $uri	= "$u";

		# The URI is placed inside a double-quoted XML attribute below.
		$uri	=~ s/&/&amp;/g;
		$uri	=~ s/</&lt;/g;
		$uri	=~ s/'/&apos;/g;
		$uri	=~ s/"/&quot;/g;

		# Switch back to the caller's handle and flush the buffered output
		# after the completed head content.
		$fh	= $delay_output;
		print {$fh} "${t}\n";
		print {$fh} qq[\t<link href="$uri" />\n];
		print {$fh} $delayed;
	}

	print {$fh} "</results>\n";
	print {$fh} "</sparql>\n";
}
# format_node_json ( $node, $name )
#
# Plain function (not a method). Returns a two-element list
# ( $name => { type => ..., value => ... } ) describing $node:
#   RDF::Trine::Node::Resource  => type 'uri',     value from uri_value
#   RDF::Trine::Node::Literal   => type 'literal', value from literal_value
#   RDF::Trine::Node::Blank     => type 'bnode',   value from blank_identifier
# Returns an empty list for undef, for values that are not blessed objects,
# and for objects of any other class.
#
# The sub previously carried a ($$$) prototype although it only ever reads
# two arguments; the prototype has been removed so that two-argument calls
# are accepted wherever the sub is visible. Calls that pass three arguments
# keep working, with the third argument ignored as before.
sub format_node_json {
	my $node	= shift;
	my $name	= shift;

	# Unblessed input cannot answer ->isa; treat it like an unknown node.
	return unless (blessed($node));

	if ($node->isa('RDF::Trine::Node::Resource')) {
		return $name => { type => 'uri', value => $node->uri_value };
	} elsif ($node->isa('RDF::Trine::Node::Literal')) {
		return $name => { type => 'literal', value => $node->literal_value };
	} elsif ($node->isa('RDF::Trine::Node::Blank')) {
		return $name => { type => 'bnode', value => $node->blank_identifier };
	} else {
		return;
	}
}
# construct_args
#
# Returns the list ( $type, \@names, %args ) where $type comes from type(),
# \@names is a fresh array reference holding binding_names(), and %args is
# the flattened hash returned by _args() (nothing when _args() is false).
sub construct_args {
	my ($self)		= @_;
	my $type		= $self->type;
	my $names_ref	= [ $self->binding_names ];
	my $opts		= $self->_args;
	$opts			= {} unless ($opts);
	return ($type, $names_ref, %{ $opts });
}

1;

__END__