RDF::Trine::Store::SPARQL - RDF Store proxy for a SPARQL endpoint


RDF-Trine documentation Contained in the RDF-Trine distribution.

Index


Code Index:

NAME

Top

RDF::Trine::Store::SPARQL - RDF Store proxy for a SPARQL endpoint

VERSION

Top

This document describes RDF::Trine::Store::SPARQL version 0.135

SYNOPSIS

Top

 use RDF::Trine::Store::SPARQL;

DESCRIPTION

Top

RDF::Trine::Store::SPARQL provides an RDF::Trine::Store API for interacting with a remote SPARQL endpoint.

METHODS

Top

Beyond the methods documented below, this class inherits methods from the RDF::Trine::Store class.

new ( $url )

Returns a new storage object that will act as a proxy for the SPARQL endpoint accessible via the supplied $url.

new_with_config ( $hashref )

Returns a new storage object configured with a hashref with certain keys as arguments.

The storetype key must be SPARQL for this backend.

The following key must also be used:

url

The URL of the remote endpoint.

new_with_config ( \%config )

Returns a new RDF::Trine::Store object based on the supplied configuration hashref.

get_statements ( $subject, $predicate, $object [, $context] )

Returns a stream object of all statements matching the specified subject, predicate, object, and (optional) context. Any of the arguments may be undef to match any value.

get_pattern ( $bgp [, $context] )

Returns an iterator object of all bindings matching the specified graph pattern.

get_contexts

Returns an RDF::Trine::Iterator over the RDF::Trine::Node objects comprising the set of contexts of the stored quads.

add_statement ( $statement [, $context] )

Adds the specified $statement to the underlying model.

remove_statement ( $statement [, $context])

Removes the specified $statement from the underlying model.

remove_statements ( $subject, $predicate, $object [, $context])

Removes all statements matching the specified pattern from the underlying model.

count_statements ( $subject, $predicate, $object, $context )

Returns a count of all the statements matching the specified subject, predicate, object, and context. Any of the arguments may be undef to match any value.

size

Returns the number of statements in the store.

supports ( [ $feature ] )

If $feature is specified, returns true if the feature is supported by the store, false otherwise. If $feature is not specified, returns a list of supported features.

get_sparql ( $sparql )

Returns an iterator object of all bindings matching the specified SPARQL query.

BUGS

Top

Please report any bugs or feature requests to <gwilliams@cpan.org>.

AUTHOR

Top

Gregory Todd Williams <gwilliams@cpan.org>

COPYRIGHT

Top


RDF-Trine documentation Contained in the RDF-Trine distribution.
package RDF::Trine::Store::SPARQL;

use strict;
use warnings;
no warnings 'redefine';
use base qw(RDF::Trine::Store);

use Set::Scalar;
use URI::Escape;
use Data::Dumper;
use List::Util qw(first);
use List::MoreUtils qw(any mesh);
use Scalar::Util qw(refaddr reftype blessed);

use RDF::Trine::Error qw(:try);

######################################################################

# Names of the four statement positions, in subject/predicate/object/context order.
my @pos_names;
# Version string of this store class; also recorded in the registry below.
our $VERSION;
BEGIN {
	@pos_names	= qw(subject predicate object context);
	$VERSION	= "0.135";
	# Record this class (keyed by package name) in RDF::Trine::Store's table
	# of store classes.
	my $store_class	= __PACKAGE__;
	$RDF::Trine::Store::STORE_CLASSES{ $store_class }	= $VERSION;
}

######################################################################

# Constructor.
#   $url - URL of the SPARQL endpoint this store proxies.
# Creates an LWP::UserAgent whose agent string carries the RDF::Trine version
# and whose default Accept header lists SPARQL results XML, RDF/XML, Turtle and
# generic XML. Returns the blessed store object holding that user agent ('ua')
# and the endpoint URL ('url').
sub new {
	my ($class, $url)	= @_;
	my $agent	= LWP::UserAgent->new( agent => "RDF::Trine/${RDF::Trine::VERSION}" );
	my $accept	= "application/sparql-results+xml;q=0.9,application/rdf+xml;q=0.5,text/turtle;q=0.7,text/xml";
	$agent->default_headers->push_header( 'Accept' => $accept );
	return bless( { ua => $agent, url => $url }, $class );
}

# Builds a store from a configuration string, which is passed unchanged to
# new() as the endpoint URL.
sub _new_with_string {
	my ($class, $config)	= @_;
	return $class->new( $config );
}

# Public configuration-based constructor.
#   $config - hashref of configuration values.
# Sets the 'storetype' key of the supplied hashref to 'SPARQL' (the hashref is
# modified in place) and hands it to the parent class's new_with_config.
sub new_with_config {
	my ($proto, $config)	= @_;
	$config->{storetype}	= 'SPARQL';
	return $proto->SUPER::new_with_config( $config );
}

# Builds a store from a configuration hashref by passing its 'url' value
# to new().
sub _new_with_config {
	my ($class, $config)	= @_;
	my $endpoint	= $config->{url};
	return $class->new( $endpoint );
}

# Returns a hashref describing the configuration this store accepts: the list
# of required keys and, per field, a description and a type.
sub _config_meta {
	my %url_field	= ( description => 'Endpoint URL', type => 'string' );
	my %meta	= (
		required_keys	=> [ 'url' ],
		fields			=> { url => \%url_field },
	);
	return \%meta;
}


# Returns an RDF::Trine::Iterator::Graph over the statements matching the
# given ( $subject, $predicate, $object [, $context] ) pattern.
#
# Any position that is not a blessed node, or that is a variable, is treated
# as a wildcard and queried as ?s / ?p / ?o. When a fourth argument is given
# and it is a concrete, non-nil node, the pattern is evaluated inside
# GRAPH <context> and quads are returned; otherwise the plain pattern is
# evaluated and triples are returned.
#
# The query is sent through $self->get_sparql, so any error it throws
# propagates to the caller.
sub get_statements {
	my $self	= shift;
	my @nodes	= @_[0..3];
	
	# Only a bound, non-nil context selects the GRAPH (quad) form.
	my $use_quad	= 0;
	if (scalar(@_) >= 4) {
		my $g	= $nodes[3];
		if (blessed($g) and not($g->is_variable) and not($g->is_nil)) {
			$use_quad	= 1;
		}
	}
	
	# Replace every unbound position with a variable of a fixed, known name so
	# that result rows can be mapped back to statement positions below.
	my @var_names	= qw(s p o g);
	foreach my $i (0 .. $#nodes) {
		if (not(blessed($nodes[$i])) or $nodes[$i]->is_variable) {
			$nodes[$i]	= RDF::Trine::Node::Variable->new( $var_names[ $i ] );
		}
	}
	
	my $node_count	= ($use_quad) ? 4 : 3;
	my $st_class	= ($use_quad) ? 'RDF::Trine::Statement::Quad' : 'RDF::Trine::Statement';
	my @triple	= @nodes[ 0..2 ];
	
	# In the quad form the graph is always bound, so only the triple positions
	# can contribute projected variables.
	my @vars	= grep { $_->is_variable } @triple;
	# A SELECT clause needs at least one variable or '*'. With a fully bound
	# pattern there is nothing to project, so fall back to '*' rather than
	# emitting the syntactically invalid "SELECT  WHERE".
	my $names	= (@vars) ? join(' ', map { '?' . $_->name } @vars) : '*';
	my $pattern	= join(' ', map { ($_->is_variable) ? '?' . $_->name : $_->as_ntriples } @triple);
	
	my $sparql;
	if ($use_quad) {
		my $g	= $nodes[3]->as_ntriples;
		$sparql	= "SELECT $names WHERE {\n\tGRAPH $g {\n\t\t$pattern\n\t}\n}\n";
	} else {
		$sparql	= "SELECT $names WHERE { $pattern }\n";
	}
	my $iter	= $self->get_sparql( $sparql );
	
	# Turn each result row back into a statement: variables are filled from the
	# row, bound terms are reused as given.
	my $sub		= sub {
		my $row	= $iter->next;
		return unless $row;
		my @terms;
		foreach my $i (0 .. ($node_count-1)) {
			if ($nodes[$i]->is_variable) {
				$terms[$i]	= $row->{ $nodes[$i]->name };
			} else {
				$terms[$i]	= $nodes[$i];
			}
		}
		return $st_class->new( @terms );
	};
	return RDF::Trine::Iterator::Graph->new( $sub );
}

# Returns an iterator of variable bindings matching the graph pattern $bgp.
#
#   $bgp     - a pattern object, or a single statement (which is wrapped in an
#              RDF::Trine::Pattern).
#   $context - only forwarded to the parent implementation; not used for
#              triple-only patterns.
#   %args    - optional 'orderby' => [ name => direction, ... ], rendered as
#              "ORDER BY direction(?name) ...".
#
# Patterns containing any quad are delegated to the parent class's
# get_pattern. Triple-only patterns are sent to the endpoint as one SELECT *
# query via get_sparql.
sub get_pattern {
	my $self	= shift;
	my $bgp		= shift;
	my $context	= shift;
	my @args	= @_;
	my %args	= @args;
	
	if ($bgp->isa('RDF::Trine::Statement')) {
		$bgp	= RDF::Trine::Pattern->new($bgp);
	}
	
	# Quad patterns are not translated here; let the parent class handle them.
	if (grep { $_->type eq 'QUAD' } $bgp->triples) {
		return $self->SUPER::get_pattern( $bgp, $context, @args );
	}
	
	my @lines;
	foreach my $t (grep { $_->type eq 'TRIPLE' } $bgp->triples) {
		my @terms	= map { $_->is_variable ? '?' . $_->name : $_->as_ntriples } $t->nodes;
		push(@lines, join(' ', @terms) . ' .');
	}
	my $triples	= join("\n\t", @lines);
	my $sparql	= "SELECT * WHERE {\n\t$triples\n}\n";
	
	if (my $o = delete $args{orderby}) {
		# Work on a copy: the caller's orderby array must not be emptied.
		my @pairs	= @$o;
		my @order;
		while (@pairs) {
			my ($k, $dir)	= splice(@pairs, 0, 2);
			push(@order, "${dir}(?$k)");
		}
		if (@order) {
			$sparql	.= "ORDER BY " . join(' ', @order);
		}
	}
	
	return $self->get_sparql( $sparql );
}

# Returns an RDF::Trine::Iterator yielding the value bound to ?g in each
# result row of a "SELECT DISTINCT ?g" query over GRAPH ?g {}.
sub get_contexts {
	my ($self)	= @_;
	my $bindings	= $self->get_sparql( 'SELECT DISTINCT ?g WHERE { GRAPH ?g {} }' );
	# Unwrap each result row to the graph node it carries; stop at the first
	# false row.
	my $next_graph	= sub {
		my $binding	= $bindings->next or return;
		return $binding->{g};
	};
	return RDF::Trine::Iterator->new( $next_graph );
}

# Adds $st (with optional $context) to the remote store.
#
# Throws RDF::Trine::Error::MethodInvocationError unless $st is an
# RDF::Trine::Statement object. While bulk operations are active the request
# is only queued on $self->{ops}; otherwise the update is built and posted
# immediately. Always returns nothing.
sub add_statement {
	my ($self, $st, $context)	= @_;
	if (not blessed($st) or not $st->isa('RDF::Trine::Statement')) {
		RDF::Trine::Error::MethodInvocationError->throw( -text => "Not a valid statement object passed to add_statement" );
	}
	
	unless ($self->_bulk_ops) {
		# Immediate mode: build the update for this one statement and send it.
		my $update	= $self->_add_statements_sparql( [ $st, $context ] );
		my $results	= $self->_get_post_iterator( $update );
		my $first	= $results->next;
		return;
	}
	
	# Bulk mode: remember the operation for later.
	push(@{ $self->{ ops } }, ['_add_statements', $st, $context]);
	return;
}

# Builds one "INSERT DATA { ... }" update for a list of operations.
#
# Each operation is an array reference [ $statement, $context ]. The target
# graph is chosen as follows:
#   * a quad statement is inserted into its own context;
#   * a triple with a concrete, non-nil $context is inserted into that graph
#     (previously $context was ignored and the triple went to the default
#     graph);
#   * any other triple is inserted without a GRAPH wrapper.
# Returns the SPARQL update string.
sub _add_statements_sparql {
	my $self	= shift;
	my @parts;
	foreach my $op (@_) {
		my ($st, $context)	= @$op;
		my @nodes	= $st->nodes;
		my $triple	= join(' ', map { $_->as_ntriples } @nodes[0..2]);
		
		my $graph;
		if ($st->isa('RDF::Trine::Statement::Quad')) {
			$graph	= $st->context;
		} elsif (blessed($context) and not($context->is_variable) and not($context->is_nil)) {
			$graph	= $context;
		}
		
		if (defined($graph)) {
			push(@parts, 'GRAPH ' . $graph->as_ntriples . " { $triple }");
		} else {
			push(@parts, "$triple .");
		}
	}
	return sprintf( 'INSERT DATA { %s }', join("\n\t", @parts) );
}

# Removes $st (with optional $context) from the remote store.
#
# Throws RDF::Trine::Error::MethodInvocationError unless $st is an
# RDF::Trine::Statement object. While bulk operations are active the request
# is only queued on $self->{ops}; otherwise the update is built and posted
# immediately. Always returns nothing.
sub remove_statement {
	my ($self, $st, $context)	= @_;
	
	if (not blessed($st) or not $st->isa('RDF::Trine::Statement')) {
		RDF::Trine::Error::MethodInvocationError->throw( -text => "Not a valid statement object passed to remove_statement" );
	}
	
	unless ($self->_bulk_ops) {
		# Immediate mode: build the update for this one statement and send it.
		my $update	= $self->_remove_statements_sparql( [ $st, $context ] );
		my $results	= $self->_get_post_iterator( $update );
		my $first	= $results->next;
		return;
	}
	
	# Bulk mode: remember the operation for later.
	push(@{ $self->{ ops } }, ['_remove_statements', $st, $context]);
	return;
}

# Builds one "DELETE DATA { ... }" update for a list of operations.
#
# Each operation is an array reference [ $statement, $context ]. The target
# graph is chosen as follows:
#   * a quad statement is deleted from its own context;
#   * a triple with a concrete, non-nil $context is deleted from that graph
#     (previously $context was ignored and the default graph was targeted);
#   * any other triple is deleted without a GRAPH wrapper.
# Returns the SPARQL update string.
sub _remove_statements_sparql {
	my $self	= shift;
	my @parts;
	foreach my $op (@_) {
		my ($st, $context)	= @$op;
		my @nodes	= $st->nodes;
		my $triple	= join(' ', map { $_->as_ntriples } @nodes[0..2]);
		
		my $graph;
		if ($st->isa('RDF::Trine::Statement::Quad')) {
			$graph	= $st->context;
		} elsif (blessed($context) and not($context->is_variable) and not($context->is_nil)) {
			$graph	= $context;
		}
		
		if (defined($graph)) {
			push(@parts, 'GRAPH ' . $graph->as_ntriples . " { $triple }");
		} else {
			push(@parts, "$triple .");
		}
	}
	return sprintf( 'DELETE DATA { %s }', join("\n\t", @parts) );
}

# Removes every statement matching the statement pattern $st (with optional
# $context) from the remote store.
#
# Throws RDF::Trine::Error::MethodInvocationError unless $st is an
# RDF::Trine::Statement object. While bulk operations are active the request
# is only queued on $self->{ops}; otherwise the update is built and posted
# immediately. Always returns nothing.
#
# NOTE(review): this method takes a single statement pattern, not separate
# subject/predicate/object arguments - confirm callers pass a statement.
sub remove_statements {
	my ($self, $st, $context)	= @_;
	
	if (not blessed($st) or not $st->isa('RDF::Trine::Statement')) {
		RDF::Trine::Error::MethodInvocationError->throw( -text => "Not a valid statement object passed to remove_statements" );
	}
	
	unless ($self->_bulk_ops) {
		# Immediate mode: build the update for this one pattern and send it.
		my $update	= $self->_remove_statement_patterns_sparql( [ $st, $context ] );
		my $results	= $self->_get_post_iterator( $update );
		my $first	= $results->next;
		return;
	}
	
	# Bulk mode: remember the operation for later.
	push(@{ $self->{ ops } }, ['_remove_statement_patterns', $st, $context]);
	return;
}

# Builds one "DELETE WHERE { ... }" update for a list of pattern operations.
#
# Each operation is an array reference [ $statement, $context ]; statement
# nodes may be variables, which are written as ?name. The target graph is
# chosen as follows:
#   * a quad pattern uses its own context, which may itself be a variable
#     (previously a variable context was rendered with as_ntriples instead of
#     ?name);
#   * a triple pattern with a non-nil $context is wrapped in that graph
#     (previously $context was ignored);
#   * any other triple pattern is written without a GRAPH wrapper.
# Returns the SPARQL update string.
sub _remove_statement_patterns_sparql {
	my $self	= shift;
	# Render a node as it appears in a pattern: ?name for variables,
	# N-Triples syntax for everything else.
	my $term	= sub {
		my $n	= shift;
		return $n->is_variable ? '?' . $n->name : $n->as_ntriples;
	};
	
	my @parts;
	foreach my $op (@_) {
		my ($st, $context)	= @$op;
		my @nodes	= $st->nodes;
		my $triple	= join(' ', map { $term->($_) } @nodes[0..2]);
		
		my $graph;
		if ($st->isa('RDF::Trine::Statement::Quad')) {
			$graph	= $st->context;
		} elsif (blessed($context) and not($context->is_nil)) {
			$graph	= $context;
		}
		
		if (defined($graph)) {
			push(@parts, 'GRAPH ' . $term->($graph) . " { $triple }");
		} else {
			push(@parts, "$triple .");
		}
	}
	return sprintf( 'DELETE WHERE { %s }', join("\n\t", @parts));
}

# Returns the number of statements matching
# ( $subject, $predicate, $object [, $context] ), as reported by a
# SELECT (COUNT(*) AS ?count) query sent through get_sparql.
#
# Any of the first three positions that is not a blessed node is treated as a
# wildcard, and a variable node is written as ?name. Previously an undef
# position caused a method call on undef (so size() died), and the
# three-argument form ignored its arguments and always counted ?s ?p ?o.
#
# When four arguments are passed the graph position decides the query shape:
#   * undef or a variable  -> GRAPH ?__rt_graph { ... }
#   * a nil node           -> the bare triple pattern
#   * any other node       -> GRAPH <node> { ... }
#
# Returns nothing when the result has no usable ?count binding.
sub count_statements {
	my $self	= shift;
	my @nodes	= @_[0..3];
	my $use_quad	= (scalar(@_) >= 4) ? 1 : 0;
	
	# Render the triple part, inventing a distinct variable for each unbound
	# position.
	my @names	= qw(subject predicate object);
	my @terms;
	foreach my $i (0 .. 2) {
		my $n	= $nodes[$i];
		if (not blessed($n)) {
			push(@terms, '?__rt_' . $names[$i]);
		} elsif ($n->is_variable) {
			push(@terms, '?' . $n->name);
		} else {
			push(@terms, $n->as_ntriples);
		}
	}
	my $triple	= join(' ', @terms);
	
	my $pattern	= $triple;
	if ($use_quad) {
		my $g	= $nodes[3];
		if (not(blessed($g)) or $g->isa('RDF::Trine::Node::Variable')) {
			$pattern	= "GRAPH ?__rt_graph { $triple }";
		} elsif ($g->isa('RDF::Trine::Node::Nil')) {
			# Nil context: count with the bare pattern, no GRAPH wrapper.
			$pattern	= $triple;
		} else {
			my $graph	= $g->is_variable ? '?' . $g->name : $g->as_ntriples;
			$pattern	= "GRAPH $graph { $triple }";
		}
	}
	
	my $sparql	= "SELECT (COUNT(*) AS ?count) WHERE { $pattern }";
	my $iter	= $self->get_sparql( $sparql );
	my $row		= $iter->next;
	return unless ($row);
	my $count	= $row->{count};
	return unless ($count);
	return $count->literal_value;
}

# Returns the statement count obtained by calling count_statements with all
# four positions (subject, predicate, object, context) left undefined.
sub size {
	my ($self)	= @_;
	my @any	= (undef) x 4;
	return $self->count_statements( @any );
}

# Feature query.
#   With a feature URI: returns 1 if this store lists it, undef otherwise.
#   Without arguments:  returns the list of feature URIs this store lists
#                       (SPARQL 1.0 query, SPARQL 1.1 query, SPARQL 1.1 update).
sub supports {
	my ($self, @wanted)	= @_;
	my @known	= (
		'http://www.w3.org/ns/sparql-service-description#SPARQL10Query',
		'http://www.w3.org/ns/sparql-service-description#SPARQL11Query',
		'http://www.w3.org/ns/sparql-service-description#SPARQL11Update',
	);
	my %features;
	@features{ @known }	= (1) x scalar(@known);
	
	return keys %features unless (@wanted);
	return $features{ $wanted[0] };
}

# Runs the query $sparql against the endpoint with an HTTP GET and returns the
# iterator produced by parsing the response body with a SAX parser feeding an
# RDF::Trine::Iterator::SAXHandler.
#
# The escaped query is appended as the "query" parameter. If the endpoint URL
# already contains a '?', the parameter is joined with '&' so that the
# existing query string is kept intact (previously '?query=' was always
# appended, yielding a malformed URL for such endpoints).
#
# Throws RDF::Trine::Error, including the endpoint and the HTTP status line,
# when the response is not successful.
sub get_sparql {
	my $self	= shift;
	my $sparql	= shift;
	my $handler	= RDF::Trine::Iterator::SAXHandler->new();
	my $p		= XML::SAX::ParserFactory->parser(Handler => $handler);
	my $ua		= $self->{ua};
	
	my $endpoint	= $self->{url};
	my $sep			= ($endpoint =~ /\?/) ? '&' : '?';
	my $url			= $endpoint . $sep . 'query=' . uri_escape($sparql);
	my $response	= $ua->get( $url );
	unless ($response->is_success) {
		my $status	= $response->status_line;
		RDF::Trine::Error->throw( -text => "Error making remote SPARQL call to endpoint $endpoint ($status)" );
	}
	
	$p->parse_string( $response->content );
	return $handler->iterator;
}

# Sends the update request $sparql to the endpoint with an HTTP POST and
# returns the iterator produced by parsing the response body with a SAX parser
# feeding an RDF::Trine::Iterator::SAXHandler.
#
# The request text is sent as a URL-encoded form field named "update", the
# parameter the SPARQL 1.1 Protocol defines for update operations; every
# caller in this class passes an INSERT/DELETE request. The form must be
# passed to LWP::UserAgent::post as a reference: a bare key => value pair
# after the URL is treated as an HTTP header, which is what the earlier
# "query => $sparql" form did, leaving the request body empty.
#
# Throws RDF::Trine::Error, including the endpoint and the HTTP status line,
# when the response is not successful.
sub _get_post_iterator {
	my $self	= shift;
	my $sparql	= shift;
	my $handler	= RDF::Trine::Iterator::SAXHandler->new();
	my $p		= XML::SAX::ParserFactory->parser(Handler => $handler);
	my $ua		= $self->{ua};
	
	my $url			= $self->{url};
	my $response	= $ua->post( $url, { update => $sparql } );
	unless ($response->is_success) {
		my $status	= $response->status_line;
		RDF::Trine::Error->throw( -text => "Error making remote SPARQL call to endpoint $url ($status)" );
	}
	
	$p->parse_string( $response->content );
	return $handler->iterator;
}

# Returns the object's BulkOps flag (true while bulk operations are active).
sub _bulk_ops {
	my ($self)	= @_;
	my $active	= $self->{BulkOps};
	return $active;
}

# Switches the store into bulk mode by setting its BulkOps flag to 1.
# The value of that assignment (1) is the return value.
sub _begin_bulk_ops {
	my ($self)	= @_;
	return ( $self->{BulkOps} = 1 );
}

# Leaves bulk mode. If operations are queued on $self->{ops}, the queue is
# emptied, the operations are grouped with _group_bulk_ops, each group is
# turned into SPARQL by the method named "<type>_sparql", and the pieces are
# joined with ";\n" and posted as a single request. Finally the BulkOps flag
# is reset to 0 (which is also the return value).
sub _end_bulk_ops {
	my ($self)	= @_;
	my $queued	= $self->{ ops } || [];
	if (@$queued) {
		# splice() empties the queue stored on the object while handing us
		# its contents.
		my @pending	= splice(@$queued);
		my @updates;
		foreach my $group ($self->_group_bulk_ops( @pending )) {
			my ($kind, @members)	= @$group;
			my $builder	= $kind . '_sparql';
			push(@updates, $self->$builder( @members ));
		}
		my $results	= $self->_get_post_iterator( join(";\n", @updates) );
		my $first	= $results->next;
	}
	$self->{BulkOps}	= 0;
}

# Groups consecutive queued operations of the same type.
#
# Input:  a list of operations, each [ $type, @args ].
# Output: a list of groups, each [ $type, [ @args ], [ @args ], ... ], holding
#         one argument array per operation, in the original order. A new
#         group starts whenever the type differs from the previous
#         operation's type. Returns nothing for an empty input list.
#
# Earlier code pushed follow-up operations into the *first* operation's
# argument array ([ $type, [ @args1, [ @args2 ] ] ]). The consumer in
# _end_bulk_ops treats every element after $type as one operation, so all but
# the first operation of each run were silently dropped.
sub _group_bulk_ops {
	my $self	= shift;
	return unless (scalar(@_));
	
	my @bulkops;
	foreach my $op (@_) {
		my ($type, @args)	= @$op;
		if (@bulkops and $bulkops[-1][0] eq $type) {
			# Same type as the current run: add this operation to the group.
			push(@{ $bulkops[-1] }, [ @args ]);
		} else {
			push(@bulkops, [ $type, [ @args ] ]);
		}
	}
	return @bulkops;
}

1;

__END__