RDF::Trine::Iterator::Bindings - Stream (iterator) class for bindings query results


RDF-Trine documentation Contained in the RDF-Trine distribution.

Index


Code Index:

NAME

Top

RDF::Trine::Iterator::Bindings - Stream (iterator) class for bindings query results

VERSION

Top

This document describes RDF::Trine::Iterator::Bindings version 0.135

SYNOPSIS

Top

 use RDF::Trine::Iterator::Bindings;

 my $iterator = RDF::Trine::Iterator::Bindings->new( \&data, \@names );
 while (my $row = $iterator->next) {
   # $row is a HASHref containing variable name -> RDF Term bindings
   my @vars = keys %$row;
   print $row->{ 'var' }->as_string;
 }

METHODS

Top

Beyond the methods documented below, this class inherits methods from the RDF::Trine::Iterator class.

new ( \@results, \@names, %args )
new ( \&results, \@names, %args )

Returns a new SPARQL Result interator object. Results must be either a reference to an array containing results or a CODE reference that acts as an iterator, returning successive items when called, and returning undef when the iterator is exhausted.

materialize

Returns a materialized version of the current binding iterator.

project ( @columns )

Returns a new stream that projects the current bindings to only the given columns.

join_streams ( $stream, $stream )

Performs a natural, nested loop join of the two streams, returning a new stream of joined results.

nested_loop_join ( $outer, $inner )

Performs a natural, nested loop join of the two streams, returning a new stream of joined results.

Note that the values from the $inner iterator are fully materialized for this join, and the results of the join are in the order of values from the $outer iterator. This suggests that:

* If sorting needs to be preserved, the $outer iterator should be used to determine the result ordering.

* If one iterator is much smaller than the other, it should likely be used as the $inner iterator since materialization will require less total memory.

sorted_by
binding_value_by_name ( $name )

Returns the binding of the named variable in the current result.

binding_value ( $i )

Returns the binding of the $i-th variable in the current result.

binding_values

Returns a list of the binding values from the current result.

binding_names

Returns a list of the binding names.

binding_name ( $i )

Returns the name of the $i-th result column.

bindings_count

Returns the number of variable bindings in the current result.

is_bindings

Returns true if the underlying result is a set of variable bindings.

as_json ( $max_size )

Returns a JSON serialization of the stream data.

as_xml ( $max_size )

Returns an XML serialization of the stream data.

as_string ( $max_size [, \$count] )

Returns a string table serialization of the stream data.

as_statements ( @names )
print_xml ( $fh, $max_size )

Prints an XML serialization of the stream data to the filehandle $fh.

construct_args

Returns the arguments necessary to pass to the stream constructor _new to re-create this stream (assuming the same closure as the first argument).

DEPENDENCIES

Top

JSON

Scalar::Util

AUTHOR

Top

Gregory Todd Williams <gwilliams@cpan.org>

COPYRIGHT

Top


RDF-Trine documentation Contained in the RDF-Trine distribution.
# RDF::Trine::Iterator::Bindings
# -----------------------------------------------------------------------------

package RDF::Trine::Iterator::Bindings;

use utf8;
use strict;
use warnings;
no warnings 'redefine';
use Data::Dumper;

use JSON 2.0;
use Text::Table;
use Log::Log4perl;
use Scalar::Util qw(blessed reftype);
use RDF::Trine::Iterator::Bindings::Materialized;
use RDF::Trine::Serializer::Turtle;

use RDF::Trine::Iterator qw(smap);
use base qw(RDF::Trine::Iterator);

our ($VERSION);
BEGIN {
	$VERSION	= '0.135';
}

sub new {
	my $class	= shift;
	my $stream	= shift || sub { undef };
	my $names	= shift || [];
	my %args	= @_;
	
	my $type	= 'bindings';
	my $self	= $class->SUPER::new( $stream, $type, $names, %args );
	
	my $s 	= $args{ sorted_by } || [];
	$self->{sorted_by}	= $s;
	return $self;
}

sub _new {
	my $class	= shift;
	my $stream	= shift;
	my $type	= shift;
	my $names	= shift;
	my %args	= @_;
	return $class->new( $stream, $names, %args );
}

sub materialize {
	my $self	= shift;
	my @data	= $self->get_all;
	my @args	= $self->construct_args;
	return $self->_mclass->_new( \@data, @args );
}

sub _mclass {
	return 'RDF::Trine::Iterator::Bindings::Materialized';
}

sub project {
	my $self	= shift;
	my $class	= ref($self);
	my @proj	= @_;
	
	my $sub		= sub {
		my $row	= $self->next;
		return unless ($row);
		my $p	= { map { $_ => $row->{ $_ } } @proj };
		return $p;
	};
	
	my $args	= $self->_args();
	my $proj	= $class->new( $sub, [@proj], %$args );
	return $proj;
}

sub join_streams {
	my $self	= shift;
	my $a		= shift;
	my $b		= shift;
# 	my $bridge	= shift;
	my %args	= @_;
	
# 	Carp::confess unless ($a->isa('RDF::Trine::Iterator::Bindings'));
# 	Carp::confess unless ($b->isa('RDF::Trine::Iterator::Bindings'));
	my $l		= Log::Log4perl->get_logger("rdf.trine.iterator.bindings");
	
	my @join_sorted_by;
	if (my $o = $args{ orderby }) {
		my $req_sort	= join(',', map { $_->[1]->name => $_->[0] } @$o);
		my $a_sort		= join(',', $a->sorted_by);
		my $b_sort		= join(',', $b->sorted_by);
		
		if ($l->is_debug) {
			$l->debug('---------------------------');
			$l->debug('REQUESTED SORT in JOIN: ' . Dumper($req_sort));
			$l->debug('JOIN STREAM SORTED BY: ' . Dumper($a_sort));
			$l->debug('JOIN STREAM SORTED BY: ' . Dumper($b_sort));
		}
		my $actual_sort;
		if (substr( $a_sort, 0, length($req_sort) ) eq $req_sort) {
			$l->debug("first stream is already sorted. using it in the outer loop.");
		} elsif (substr( $b_sort, 0, length($req_sort) ) eq $req_sort) {
			$l->debug("second stream is already sorted. using it in the outer loop.");
			($a,$b)	= ($b,$a);
		} else {
			my $a_common	= join('!', $a_sort, $req_sort);
			my $b_common	= join('!', $b_sort, $req_sort);
			if ($a_common =~ qr[^([^!]+)[^!]*!\1]) {	# shared prefix between $a_sort and $req_sort?
				$l->debug("first stream is closely sorted ($1).");
			} elsif ($b_common =~ qr[^([^!]+)[^!]*!\1]) {	# shared prefix between $b_sort and $req_sort?
				$l->debug("second stream is closely sorted ($1).");
				($a,$b)	= ($b,$a);
			}
		}
		@join_sorted_by	= ($a->sorted_by, $b->sorted_by);
	}
	
	my $stream	= $self->nested_loop_join( $a, $b, %args );
	$l->debug("JOINED stream is sorted by: " . join(',', @join_sorted_by));
	$stream->{sorted_by}	= \@join_sorted_by;
	return $stream;
}

sub nested_loop_join {
	my $self	= shift;
	my $astream	= shift;
	my $bstream	= shift;
#	my $bridge	= shift;
	my %args	= @_;
	
# 	Carp::confess unless ($astream->isa('RDF::Trine::Iterator::Bindings'));
# 	Carp::confess unless ($bstream->isa('RDF::Trine::Iterator::Bindings'));
	my $l		= Log::Log4perl->get_logger("rdf.trine.iterator.bindings");
	
	################################################
	### BNODE MAP STUFF
	my $a_extra	= $astream->extra_result_data || {};
	my $b_extra	= $bstream->extra_result_data || {};
	my (%a_map, %b_map);
	foreach my $h (@{ $a_extra->{'bnode-map'} || [] }) {
		foreach my $id (keys %$h) {
			my @values	= @{ $h->{ $id } };
			push( @{ $a_map{ $id } }, @values );
		}
	}
	foreach my $h (@{ $b_extra->{'bnode-map'} || [] }) {
		foreach my $id (keys %$h) {
			my @values	= @{ $h->{ $id } };
			push( @{ $b_map{ $id } }, @values );
		}
	}
	my $a_map	= (%a_map) ? \%a_map : undef;
	my $b_map	= (%b_map) ? \%b_map : undef;
	################################################
	
	my @names	= RDF::Trine::_uniq( map { $_->binding_names() } ($astream, $bstream) );
	my $a		= $astream->project( @names );
	my $b		= $bstream->project( @names );
	
	
	my @data	= $b->get_all();
	no warnings 'uninitialized';
	
	my $inner_index;
	my $rowa;
	my $need_new_a	= 1;
	my $sub	= sub {
		OUTER: while (1) {
			if ($need_new_a) {
				$l->debug("### fetching new outer tuple");
				$rowa = $a->next;
				$inner_index	= 0;
				$need_new_a		= 0;
			}
			$l->debug("OUTER: " . Dumper($rowa));
			return undef unless ($rowa);
			LOOP: while ($inner_index <= $#data) {
				my $rowb	= $data[ $inner_index++ ];
				$l->debug("- INNER[ $inner_index ]: " . Dumper($rowb));
				$l->debug("[--JOIN--] " . join(' ', map { my $row = $_; '{' . join(', ', map { join('=', $_, ($row->{$_}) ? $row->{$_}->as_string : '(undef)') } (keys %$row)) . '}' } ($rowa, $rowb)));
				my %keysa	= map {$_=>1} (keys %$rowa);
				my @shared	= grep { $keysa{ $_ } } (keys %$rowb);
				foreach my $key (@shared) {
					my $val_a	= $rowa->{ $key };
					my $val_b	= $rowb->{ $key };
					my $defined	= 0;
					foreach my $n ($val_a, $val_b) {
						$defined++ if (defined($n));
					}
					if ($defined == 2) {
						my $equal	= $val_a->equal( $val_b );
						if (not $equal) {
							my $query 	= $args{ query };
							my $bridge	= $args{ bridge };
							if ($query and $bridge) {
								warn 'join values and bnode maps: ' . Dumper($val_a, $val_b, $a_map, $b_map) if ($a_map or $b_map);
								if ($a_map and $val_a->isa('RDF::Trine::Node::Blank')) {
									my $anames	= Set::Scalar->new( @{ $a_map{ $val_a->blank_identifier } } );
									my $bnames	= Set::Scalar->new( RDF::Query::Algebra::Service->_names_for_node( $val_b, $query, $bridge, {} ) );
									if (my $int = $anames->intersection( $bnames )) {
										warn "node equality based on $int";
										$equal	= 1;
									}
								} elsif ($b_map and $val_b->isa('RDF::Trine::Node::Blank')) {
									my $bnames	= Set::Scalar->new( @{ $b_map{ $val_b->blank_identifier } } );
									my $anames	= Set::Scalar->new( RDF::Query::Algebra::Service->_names_for_node( $val_a, $query, $bridge, {} ) );
									warn "anames: $anames\n";
									warn "bnames: $bnames\n";
									if (my $int = $anames->intersection( $bnames )) {
										warn "node equality based on $int";
										$equal	= 1;
									}
								}
							} else {
								if ($l->is_debug) {
									$l->logcluck("no query,bridge in args");
								}
							}
						}
						
						unless ($equal) {
							$l->debug("can't join because mismatch of $key (" . join(' <==> ', map {$_->as_string} ($val_a, $val_b)) . ")");
							next LOOP;
						}
					}
				}
				
				my $row	= { (map { $_ => $rowa->{$_} } grep { defined($rowa->{$_}) } keys %$rowa), (map { $_ => $rowb->{$_} } grep { defined($rowb->{$_}) } keys %$rowb) };
				if ($l->is_debug) {
					$l->debug("JOINED:");
					foreach my $key (keys %$row) {
						$l->debug("$key\t=> " . $row->{ $key }->as_string);
					}
				}
				return $row;
			}
			$need_new_a	= 1;
		}
	};
	
	my $args	= $astream->_args;
	return $astream->_new( $sub, 'bindings', \@names, %$args );
}


sub sorted_by {
	my $self	= shift;
	my $sorted	= $self->{sorted_by};
	return @$sorted;
}

sub binding_value_by_name {
	my $self	= shift;
	my $name	= shift;
	my $row		= ($self->open) ? $self->current : $self->next;
	if (exists( $row->{ $name } )) {
		return $row->{ $name };
	} else {
# 		warn "No variable named '$name' is present in query results.\n";
		return;
	}
}

sub binding_value {
	my $self	= shift;
	my $val		= shift;
	my @names	= $self->binding_names;
	return $self->binding_value_by_name( $names[ $val ] );
}


sub binding_values {
	my $self	= shift;
	my $row		= ($self->open) ? $self->current : $self->next;
	return @{ $row }{ $self->binding_names };
}


sub binding_names {
	my $self	= shift;
	my $names	= $self->{_names};
	return @$names;
}

sub binding_name {
	my $self	= shift;
	my $val		= shift;
	my $names	= $self->{_names};
	return $names->[ $val ];
}


sub bindings_count {
	my $self	= shift;
	my $names	= $self->{_names};
	return scalar( @$names ) if (scalar(@$names));
	return 0;
}

sub is_bindings {
	my $self			= shift;
	return 1;
}

sub as_json {
	my $self			= shift;
	my $max_result_size	= shift || 0;
	my $width			= $self->bindings_count;
# 	my $bridge			= $self->bridge;
	
	my @variables;
	for (my $i=0; $i < $width; $i++) {
		my $name	= $self->binding_name($i);
		push(@variables, $name) if $name;
	}
	
	my $count	= 0;
	my @sorted	= $self->sorted_by;
	my $order	= scalar(@sorted) ? JSON::true : JSON::false;
	my $dist	= $self->_args->{distinct} ? JSON::true : JSON::false;
	
	my $data	= {
					head	=> { vars => \@variables },
					results	=> { ordered => $order, distinct => $dist },
				};
	my @bindings;
	while (!$self->finished) {
		my %row;
		for (my $i = 0; $i < $width; $i++) {
			my $name		= $self->binding_name($i);
			my $value		= $self->binding_value($i);
			if (blessed($value)) {
				if (my ($k, $v) = format_node_json($value, $name)) {
					$row{ $k }		= $v;
				}
			}
		}
		
		push(@{ $data->{results}{bindings} }, \%row);
		last if ($max_result_size and ++$count >= $max_result_size);
	} continue { $self->next_result }
	
	return to_json( $data );
}

sub as_xml {
	my $self			= shift;
	my $max_result_size	= shift || 0;
	my $string	= '';
	open( my $fh, '>', \$string );
	$self->print_xml( $fh, $max_result_size );
	close($fh);
	return $string;
}

sub as_string {
	my $self			= shift;
	my $max_result_size	= shift || 0;
	my $rescount		= shift;
	my @names			= $self->binding_names;
	my $headers			= \@names;
	my @rows;
	my $count	= 0;
	while (my $row = $self->next) {
		push(@rows, [ map { blessed($_) ? RDF::Trine::Serializer::Turtle->node_as_concise_string($_) : '' } @{ $row }{ @names } ]);
		last if ($max_result_size and ++$count >= $max_result_size);
	}
	if (ref($rescount)) {
		$$rescount	= scalar(@rows);
	}
	
	my @rule			= qw(- +);
	my @headers			= (\q"| ");
	push(@headers, map { $_ => \q" | " } @$headers);
	pop	@headers;
	push @headers => (\q" |");
	
	if ('ARRAY' eq ref $rows[0]) {
		if (@$headers == @{ $rows[0] }) {
			my $table = Text::Table->new(@headers);
			$table->rule(@rule);
			$table->body_rule(@rule);
			$table->load(@rows);
		
			return join('',
					$table->rule(@rule),
					$table->title,
					$table->rule(@rule),
					map({ $table->body($_) } 0 .. @rows),
					$table->rule(@rule)
				);
		} else {
			die("make_table() rows must be an AoA with rows being same size as headers");
		}
	} else {
		return '';
	}
}

sub as_statements {
	my $self	= shift;
	my @names	= @_;
	my $sub		= sub {
		my $row	= $self->next;
		return undef unless (defined $row);
		my @values	= @{ $row }{ @names };
		my $statement	= (scalar(@values) == 3 or not(defined($values[3])))
						? RDF::Trine::Statement->new( @values[ 0 .. 2 ] )
						: RDF::Trine::Statement::Quad->new( @values );
		return $statement;
	};
	return RDF::Trine::Iterator::Graph->new( $sub )
}

sub print_xml {
	my $self			= shift;
	my $fh				= shift;
	my $max_result_size	= shift || 0;
	my $width			= $self->bindings_count;
	
	my @variables;
	for (my $i=0; $i < $width; $i++) {
		my $name	= $self->binding_name($i);
		push(@variables, $name) if $name;
	}
	
	no strict 'refs';
	print {$fh} <<"END";
<?xml version="1.0" encoding="utf-8"?>
<sparql xmlns="http://www.w3.org/2005/sparql-results#">
<head>
END
	
	my $t	= join("\n", map { qq(\t<variable name="$_"/>) } @variables);
	
	my $delay_output	= 0;
	my $delayed			= '';
	
	if ($self->extra_result_data) {
		$delay_output	= $fh;
		undef $fh;
		open( $fh, '>', \$delayed ) or die $!;
	} else {
		if ($t) {
			print {$fh} "${t}\n";
		}
	}
	
	print {$fh} <<"END";
</head>
<results>
END
	
	my $count	= 0;
	while (my $row = $self->next) {
		my @row;
		print {$fh} "\t\t<result>\n";
		for (my $i = 0; $i < $width; $i++) {
			my $name	= $self->binding_name($i);
			my $value	= $row->{ $name };
			print {$fh} "\t\t\t" . $self->format_node_xml($value, $name) . "\n";
		}
		print {$fh} "\t\t</result>\n";
		
		last if ($max_result_size and ++$count >= $max_result_size);
	}
	
	if ($delay_output) {
		my $extra = $self->extra_result_data;
		my $extraxml	= '';
		foreach my $tag (keys %$extra) {
			$extraxml	.= qq[<extra name="${tag}">\n];
			my $value	= $extra->{ $tag };
			foreach my $e (@$value) {
				foreach my $k (keys %$e) {
					my $v		= $e->{ $k };
					my @values	= @$v;
					foreach ($k, @values) {
						s/&/&amp;/g;
						s/</&lt;/g;
						s/"/&quot;/g;
					}
					$extraxml	.= qq[\t<extrakey id="$k">] . join(',', @values) . qq[</extrakey>\n];
				}
			}
			$extraxml	.= "</extra>\n";
		}
		my $u	= URI->new('data:');
		$u->media_type('text/xml');
		$u->data($extraxml);
		my $uri	= "$u";
		$uri	=~ s/&/&amp;/g;
		$uri	=~ s/</&lt;/g;
		$uri	=~ s/'/&apos;/g;
		$uri	=~ s/"/&quot;/g;
		
		$fh		= $delay_output;
		print {$fh} "${t}\n";
		print {$fh} qq[\t<link href="$uri" />\n];
		print {$fh} $delayed;
	}
	
	print {$fh} "</results>\n";
	print {$fh} "</sparql>\n";
}

sub format_node_json ($$$) {
# 	my $bridge	= shift;
# 	return undef unless ($bridge);
	
	my $node	= shift;
	my $name	= shift;
	my $node_label;
	
	if(!defined $node) {
		return;
	} elsif ($node->isa('RDF::Trine::Node::Resource')) {
		$node_label	= $node->uri_value;
		return $name => { type => 'uri', value => $node_label };
	} elsif ($node->isa('RDF::Trine::Node::Literal')) {
		$node_label	= $node->literal_value;
		return $name => { type => 'literal', value => $node_label };
	} elsif ($node->isa('RDF::Trine::Node::Blank')) {
		$node_label	= $node->blank_identifier;
		return $name => { type => 'bnode', value => $node_label };
	} else {
		return;
	}
}

sub construct_args {
	my $self	= shift;
	my $type	= $self->type;
	my @names	= $self->binding_names;
	my $args	= $self->_args || {};
	return ($type, \@names, %{ $args });
}


1;

__END__