| UR documentation | Contained in the UR distribution. |
UR::DataSource::RDBMS - Abstract base class for RDBMS-type data sources
This class implements the interface UR uses to query RDBMS databases with DBI. It encapsulates the system's knowledge of classes/properties relation to tables/columns, and how to generate SQL to create, retrieve, update and delete table rows that represent object instances.
UR::DataSource, UR::DataSource::Oracle, UR::DataSource::Pg, UR::DataSource::SQLite UR::DataSource::MySQL
| UR documentation | Contained in the UR distribution. |
package UR::DataSource::RDBMS; use strict; use warnings; use Scalar::Util; use File::Basename; require UR; UR::Object::Type->define( class_name => 'UR::DataSource::RDBMS', is => ['UR::DataSource','UR::Singleton'], english_name => 'ur datasource rdbms', is_abstract => 1, properties => [ server => { is => 'String', doc => 'the "server" part of the DBI connect string' }, login => { is => 'String', doc => 'user name to connect as', is_optional => 1 }, auth => { is => 'String', doc => 'authentication for the given user', is_optional => 1 }, owner => { is => 'String', doc => 'Schema/owner name to connect to', is_optional => 1 }, _all_dbh_hashref => { type => 'HASH', len => undef, is_transient => 1 }, _default_dbh => { type => 'DBI::db', len => undef, is_transient => 1 }, _last_savepoint => { type => 'String', len => undef, is_transient => 1 }, ], doc => 'A logical DBI-based database, independent of prod/dev/testing considerations or login details.', ); sub _resolve_ddl_for_table { my ($self,$table) = @_; my $table_name = $table->table_name; my $ddl; if ($table->{db_committed}) { #$ddl = "alter table $table_name "; } else { my @columns = $table->columns; for my $column (@columns) { next unless $column->last_object_revision eq '-'; my $column_name = $column->column_name; $ddl = 'create table ' . $table_name . "(\n" unless defined $ddl; $ddl .= "\t$column_name " . $column->data_type; if ($column->data_length) { $ddl .= '(' . $column->data_length . ')'; } $ddl .= ",\n" unless $column eq $columns[-1]; } $ddl .= "\n)" if defined $ddl; } return $ddl; } sub generate_schema_for_class_meta { my ($self,$class_meta,$temp) = @_; # We now support on-the-fly database introspection # this gets called with the temp flag when _sync_database realizes # it knows nothing about the table in question. # We basically presume the schema is the one we would have generated if # TODO: We still need to presume foreign keys are constrained. my $method = ($temp ? '__define__' : 'create'); my @defined; for my $p ($class_meta->parent_class_metas) { next if $p->class_name eq 'UR::Object'; next unless $p->class_name->isa("UR::Object"); my @new = $self->generate_schema_for_class_meta($p); push @defined, @new; } my %properties_with_expected_columns = map { $_->column_name => $_ } grep { $_->column_name } $class_meta->direct_property_metas; my $table_name = $class_meta->table_name; unless ($table_name) { if (my @column_names = keys %properties_with_expected_columns) { die "class " . $class_meta->__display_name__ . " has no table_name specified for columns @column_names!"; } else { # no table, but no storable columns. all ok. return; } } my %expected_constraints = map { $_->column_name => $_ } grep { $_->class_meta eq $class_meta } map { $class_meta->property_meta_for_name($_) } map { @{ $_->id_by } } grep { $_->id_by } $class_meta->all_property_metas; ## print "handling table $table_name\n"; my $t = '-'; my $table = $self->refresh_database_metadata_for_table_name($table_name); my %existing_columns; if ($table) { ## print "found table $table_name\n"; %existing_columns = map { $_->column_name => $_ } grep { $_->column_name } $table->columns; } else { ## print "adding table $table_name\n"; $table = UR::DataSource::RDBMS::Table->$method( table_name => $table_name, data_source => $self->id, owner => $self->owner, remarks => $class_meta->doc, er_type => 'entity', last_object_revision => $t, table_type => ($table_name =~ /\s/ ? 'view' : 'table'), ); die unless $table; push @defined, $table; } my ($update,$add,$extra) = _intersect_lists([keys %properties_with_expected_columns],[keys %existing_columns]); for my $column_name (@$extra) { $self->warning_message("unused table column: $table_name.$column_name\n"); } for my $column_name (@$add) { my $property = $properties_with_expected_columns{$column_name}; print "adding column $column_name\n"; my $column = UR::DataSource::RDBMS::TableColumn->$method( column_name => $column_name, table_name => $table->table_name, data_source => $table->data_source, namespace => $table->namespace, owner => $table->owner, data_type => $self->object_to_db_type($property->data_type) || 'Text', data_length => $property->data_length, nullable => $property->is_optional, remarks => $property->doc, last_object_revision => $t, ); push @defined, $column; } for my $column_name (@$update) { my $property = $properties_with_expected_columns{$column_name}; my $column = $existing_columns{$column_name}; ##print "updating column $column_name with data from property " . $property->property_name . "\n"; if ($column->data_type) { $column->data_type($self->object_to_db_type($property->data_type)) if $property->data_type; } else { $column->data_type($self->object_to_db_type($property->data_type) || 'Text'); } $column->data_length($property->data_length); $column->nullable($property->is_optional); $column->remarks($property->doc); $column->last_object_revision($t); } # handle missing meta datasource on the fly... if (@defined) { my $ns = $class_meta->namespace; my $exists = UR::Object::Type->get($ns . "::DataSource::Meta"); unless ($exists) { UR::DataSource::Meta->generate_for_namespace($ns); } } my $ddl = $self->_resolve_ddl_for_table($table); ##print "DDL2: $ddl\n"; $t = UR::Time->now; if (defined($ddl)) { my $dbh = $table->data_source->get_default_handle; $dbh->do($ddl) or die "Failed to modify the database schema!"; for my $o ($table, $table->columns) { $o->last_object_revision($t); } } return @defined; } # why isn't something like this in List::Util? sub _intersect_lists { my ($m,$n) = @_; my %shared; my %monly; my %nonly; @monly{@$m} = @$m; for my $v (@$n) { if ($monly{$v}) { $shared{$v} = delete $monly{$v}; } else{ $nonly{$v} = $v; } } return ( [ values %shared ], [ values %monly ], [ values %nonly ], ); } # override in architecture-oriented subclasses sub object_to_db_type { my ($self, $object_type) = @_; my $db_type = $object_type; # ... return $db_type; } # override in architecture-oriented subclasses sub db_to_object_type { my ($self, $db_type) = @_; my $object_type = $db_type; # ... return $object_type; } # FIXME - shouldn't this be a property of the class instead of a method? sub does_support_joins { 1 } sub get_class_meta_for_table { my $self = shift; my $table = shift; my $table_name = $table->table_name; return $self->get_class_meta_for_table_name($table_name); } sub get_class_meta_for_table_name { my($self,$table_name) = @_; # There is an unique constraint on classes, but only those which use # tables in an RDBMS, which dicates that there can be only two for # a given table in a given data source: one for the ghost and one # for the regular entity. We can't just fix this with a unique constraint # since classes with a null data source would be lost in some queries. my @class_meta = grep { not $_->class_name->isa("UR::Object::Ghost") } UR::Object::Type->get( table_name => $table_name, data_source => $self->class, ); unless (@class_meta) { # This will load every class in the namespace on the first execution :( #$DB::single = 1; @class_meta = grep { not $_->class_name->isa("UR::Object::Ghost") } UR::Object::Type->get( table_name => $table_name, data_source => $self->class, ); } $self->context_return(@class_meta); } sub dbi_data_source_name { my $self = shift->_singleton_object; my $driver = $self->driver; my $server = $self->server; unless ($driver) { Carp::confess("Cannot resolve a dbi_data_source_name with an undefined driver()"); } unless ($server) { Carp::confess("Cannot resolve a dbi_data_source_name with an undefined server()"); } return 'dbi:' . $driver . ':' . $server; } sub get_default_handle { my $self = shift->_singleton_object; my $dbh = $self->_default_dbh; unless ($dbh && $dbh->{Active}) { $dbh = $self->create_dbh(); $self->_default_dbh($dbh); } return $dbh; } *get_default_dbh = \&get_default_handle; sub has_default_handle { my $self = shift->_singleton_object; return 1 if $self->_default_dbh; return; } *has_default_dbh = \&has_default_handle; sub disconnect_default_dbh { my $self = shift->_singleton_object; my $dbh = $self->_default_dbh; unless ($dbh) { Carp::cluck("Cannot disconnect. Not connected!"); return; } $dbh->disconnect; $self->_default_dbh(undef); return $dbh; } sub set_all_dbh_to_inactive_destroy { my $self = shift->_singleton_object; my $dbhs = $self->_all_dbh_hashref; for my $k (keys %$dbhs) { $dbhs->{$k}->{InactiveDestroy} = 1; delete $dbhs->{$k}; } my $dbh = $self->_default_dbh; if ($dbh) { $dbh->disconnect; $self->_default_dbh(undef); } return 1; } sub get_for_dbh { my $class = shift; my $dbh = shift; my $ds_name = $dbh->{"private_UR::DataSource::RDBMS_name"}; return unless($ds_name); my $ds = UR::DataSource->get($ds_name); return $ds; } sub has_changes_in_base_context { shift->has_default_dbh; # TODO: actually check, as this is fairly conservative # If used for switching contexts, we'd need to safely rollback any transactions first. } sub _dbi_connect_args { my $self = shift; my @connection; $connection[0] = $self->dbi_data_source_name; $connection[1] = $self->login; $connection[2] = $self->auth; $connection[3] = { AutoCommit => 0, RaiseError => 0 }; return @connection; } sub create_dbh { my $self = shift->_singleton_object; # get connection information my @connection = $self->_dbi_connect_args(); # connect my $dbh = UR::DBI->connect(@connection); $dbh or Carp::confess("Failed to connect to the database!\n" . UR::DBI->errstr . "\n"); # used for reverse lookups $dbh->{'private_UR::DataSource::RDBMS_name'} = $self->class; # this method may be implemented in subclasses to do extra initialization if ($self->can("_init_created_dbh")) { unless ($self->_init_created_dbh($dbh)) { $dbh->disconnect; die "Failed to initialize new database connection!\n" . $self->error_message . "\n"; } } # store the handle in a hash, since it's not a UR::Object my $all_dbh_hashref = $self->_all_dbh_hashref; unless ($all_dbh_hashref) { $all_dbh_hashref = {}; $self->_all_dbh_hashref($all_dbh_hashref); } $all_dbh_hashref->{$dbh} = $dbh; Scalar::Util::weaken($all_dbh_hashref->{$dbh}); $self->__signal_change__("connect"); return $dbh; } sub _init_created_dbh { # override in sub-classes 1; } # The default is to ignore no tables, but derived classes # will probably override this sub _ignore_table { 0; } sub _get_table_names_from_data_dictionary { my $self = shift->_singleton_object; if (@_) { Carp::confess("get_tables does not currently take filters! FIXME."); } my $dbh = $self->get_default_dbh; my $owner = $self->owner; # FIXME This will fix the immediate problem of getting classes to be created out of # views. We still need to somehow mark the resulting class as read-only my $sth = $dbh->table_info("%", $owner, "%", "TABLE,VIEW"); my $table_name; $sth->bind_col(3,\$table_name); my @names; while ($sth->fetch) { next if $self->_ignore_table($table_name); $table_name =~ s/"|'//g; # Postgres puts quotes around entities that look like keywords push @names, $table_name; } return @names; } # A wrapper for DBI's table_info() since the DBD implementations of them # aren't always exactly what we need in other places in the system. Other # subclasses can override it to get custom behavior sub get_table_details_from_data_dictionary { return shift->_get_whatever_details_from_data_dictionary('table_info',@_); } sub _get_whatever_details_from_data_dictionary { my $self = shift; my $method = shift; my $dbh = $self->get_default_dbh(); return unless $dbh; return $dbh->$method(@_); } sub get_column_details_from_data_dictionary { return shift->_get_whatever_details_from_data_dictionary('column_info',@_); } sub get_foreign_key_details_from_data_dictionary { return shift->_get_whatever_details_from_data_dictionary('foreign_key_info',@_); } sub get_primary_key_details_from_data_dictionary { return shift->_get_whatever_details_from_data_dictionary('primary_key_info',@_); } sub get_table_names { map { $_->table_name } shift->get_tables(@_); } sub get_tables { my $self = shift; #my $class = shift->_singleton_class_name; #return UR::DataSource::RDBMS::Table->get(data_source_id => $class); my $ds_id; if (ref $self) { if ($self->can('id')) { $ds_id = $self->id; } else { $ds_id = ref $self; } } else { $ds_id = $self; } return UR::DataSource::RDBMS::Table->get(data_source => $ds_id); } # TODO: make "env" an optional characteristic of a class attribute # for all of the places we do this crap... sub access_level { my $self = shift; my $env = $self->_method2env("access_level"); if (@_) { if ($self->has_default_dbh) { Carp::confess("Cannot change the db access level for $self while connected!"); } $ENV{$env} = lc(shift); } else { $ENV{$env} ||= "ro"; } return $ENV{$env}; } sub _method2env { my $class = shift; my $method = shift; unless ($method =~ /^(.*)::([^\:]+)$/) { $class = ref($class) if ref($class); $method = $class . "::" . $method; } $method =~ s/::/__/g; return $method; } sub resolve_class_name_for_table_name { my $self = shift->_singleton_class_name; my $table_name = shift; my $relation_type = shift; # Should be 'TABLE' or 'VIEW' # When a table_name conflicts with a reserved word, it ends in an underscore. $table_name =~ s/_$//; my $namespace = $self->get_namespace; my $vocabulary = $namespace->get_vocabulary; my @words; $vocabulary = 'UR::Vocabulary' unless eval { $vocabulary->__meta__ }; if ($vocabulary) { @words = map { $vocabulary->convert_to_title_case($_) } map { $vocabulary->plural_to_singular($_) } map { lc($_) } split("_",$table_name); } else { @words = map { ucfirst(lc($_)) } split("_",$table_name); } if ($self->can('_resolve_class_name_for_table_name_fixups')) { @words = $self->_resolve_class_name_for_table_name_fixups(@words); } my $class_name; my $addl; if ($relation_type && $relation_type =~ m/view/i) { $addl = 'View::'; } else { # Should just be for tables, temp tables, etc $addl = ''; } $class_name = $namespace . "::" . $addl . join("",@words); return $class_name; } sub resolve_type_name_for_table_name { my $self = shift->_singleton_class_name; my $table_name = shift; my $namespace = $self->get_namespace; my $vocabulary = $namespace->get_vocabulary; $vocabulary = 'UR::Vocabulary' unless eval { $vocabulary->__meta__ }; my $vocab_obj = eval { $vocabulary->__meta__ }; my @words = ( ( map { $vocabulary->plural_to_singular($_) } map { lc($_) } split("_",$table_name) ) ); my $type_name = join(" ",@words); return $type_name; } sub resolve_property_name_for_column_name { my $self = shift->_singleton_class_name; my $column_name = shift; my @words = map { lc($_) } split("_",$column_name); my $type_name = join("_",@words); return $type_name; } sub resolve_attribute_name_for_column_name { my $self = shift->_singleton_class_name; my $column_name = shift; my @words = map { lc($_) } split("_",$column_name); my $type_name = join(" ",@words); return $type_name; } sub can_savepoint { my $class = ref($_[0]); die "Class $class didn't supply can_savepoint()"; } sub set_savepoint { my $class = ref($_[0]); die "Class $class didn't supply set_savepoint, but can_savepoint is true"; } sub rollback_to_savepoint { my $class = ref($_[0]); die "Class $class didn't supply rollback_to_savepoint, but can_savepoint is true"; } sub refresh_database_metadata_for_table_name { my ($self,$table_name) = @_; my $data_source = $self; my @column_objects; my @all_constraints; # this must be on or before the actual data dictionary queries my $revision_time = UR::Time->now(); # TABLE my $table_sth = $data_source->get_table_details_from_data_dictionary('%', $data_source->owner, $table_name, "TABLE,VIEW"); my $table_data = $table_sth->fetchrow_hashref(); unless ($table_data && %$table_data) { $self->error_message("No data for table $table_name in data source $data_source."); return; } my $data_source_id = $data_source->id; my $table_object = UR::DataSource::RDBMS::Table->get(data_source => $data_source_id, table_name => $table_name); if ($table_object) { # Already exists, update the existing entry # Instead of deleting and recreating the table object (the old way), # modify its attributes in-place. The name can't change but all the other # stuff might. $table_object->table_type($table_data->{TABLE_TYPE}); $table_object->owner($table_data->{TABLE_SCHEM}); $table_object->data_source($data_source->class); $table_object->remarks($table_data->{REMARKS}); $table_object->last_object_revision($revision_time) if ($table_object->__changes__()); } else { # Create a brand new one from scratch $table_object = UR::DataSource::RDBMS::Table->create( table_name => $table_name, table_type => $table_data->{TABLE_TYPE}, owner => $table_data->{TABLE_SCHEM}, data_source => $data_source_id, remarks => $table_data->{REMARKS}, last_object_revision => $revision_time, ); unless ($table_object) { Carp::confess("Failed to get/create table object for $table_name"); } } # COLUMNS # mysql databases seem to require you to actually put in the database name in the first arg my $db_name = ($data_source->can('db_name')) ? $data_source->db_name : '%'; my $column_sth = $data_source->get_column_details_from_data_dictionary($db_name, $data_source->owner, $table_name, '%'); unless ($column_sth) { $self->error_message("Error getting column data for table $table_name in data source $data_source."); return; } my $all_column_data = $column_sth->fetchall_arrayref({}); unless (@$all_column_data) { $self->error_message("No column data for table $table_name in data source $data_source_id"); return; } my %columns_to_delete = map {$_->column_name, $_} UR::DataSource::RDBMS::TableColumn->get(table_name => $table_name, data_source => $data_source_id); for my $column_data (@$all_column_data) { #my $id = $table_name . '.' . $column_data->{COLUMN_NAME} $column_data->{'COLUMN_NAME'} =~ s/"|'//g; # Postgres puts quotes around things that look like keywords $column_data->{'COLUMN_NAME'} = uc($column_data->{'COLUMN_NAME'}); delete $columns_to_delete{$column_data->{'COLUMN_NAME'}}; my $column_obj = UR::DataSource::RDBMS::TableColumn->get(table_name => $table_name, data_source => $data_source_id, column_name => $column_data->{'COLUMN_NAME'}); if ($column_obj) { # Already exists, change the attributes $column_obj->owner($table_object->{owner}); $column_obj->data_source($table_object->{data_source}); $column_obj->data_type($column_data->{TYPE_NAME}); $column_obj->nullable(substr($column_data->{IS_NULLABLE}, 0, 1)); $column_obj->data_length($column_data->{COLUMN_SIZE}); $column_obj->remarks($column_data->{REMARKS}); $column_obj->last_object_revision($revision_time) if ($column_obj->__changes__()); } else { # It's new, create it from scratch $column_obj = UR::DataSource::RDBMS::TableColumn->create( column_name => $column_data->{COLUMN_NAME}, table_name => $table_object->{table_name}, owner => $table_object->{owner}, data_source => $table_object->{data_source}, data_type => $column_data->{TYPE_NAME}, nullable => substr($column_data->{IS_NULLABLE}, 0, 1), data_length => $column_data->{COLUMN_SIZE}, remarks => $column_data->{REMARKS}, last_object_revision => $revision_time, ); } unless ($column_obj) { Carp::confess("Failed to create a column ".$column_data->{'COLUMN_NAME'}." for table $table_name"); } push @column_objects, $column_obj; } for my $to_delete (values %columns_to_delete) { $self->status_message("Detected column " . $to_delete->column_name . " has gone away."); $to_delete->delete; } my $bitmap_data = $data_source->get_bitmap_index_details_from_data_dictionary($table_name); for my $index (@$bitmap_data) { #push @{ $embed{bitmap_index_names}{$table_object} }, $index->{'index_name'}; my $column_object = UR::DataSource::RDBMS::TableColumn->is_loaded( table_name => uc($index->{'table_name'}), data_source => $data_source_id, column_name => uc($index->{'column_name'}), ); } # Make a note of what FKs exist in the Meta DB involving this table my @fks_in_meta_db = UR::DataSource::RDBMS::FkConstraint->get(data_source => $data_source_id, table_name => $table_name); push @fks_in_meta_db, UR::DataSource::RDBMS::FkConstraint->get(data_source => $data_source_id, r_table_name => $table_name); my %fks_in_meta_db_by_fingerprint; foreach my $fk ( @fks_in_meta_db ) { my $fingerprint = $self->_make_foreign_key_fingerprint($fk); $fks_in_meta_db_by_fingerprint{$fingerprint} = $fk; } # constraints on this table against columns in other tables my $db_owner = $data_source->owner; my $fk_sth = $data_source->get_foreign_key_details_from_data_dictionary('', $db_owner, $table_name, '', '', ''); my %fk; # hold the fk constraints that this # invocation of foreign_key_info created my @constraints; my %fks_in_real_db; if ($fk_sth) { while (my $data = $fk_sth->fetchrow_hashref()) { #push @$ref_fks, [@$data{qw(FK_NAME FK_TABLE_NAME)}]; foreach ( qw( FK_TABLE_NAME UK_TABLE_NAME FK_NAME FK_COLUMN_NAME UK_COLUMN_NAME ) ) { $data->{$_} = uc($data->{$_}); } my $fk = UR::DataSource::RDBMS::FkConstraint->get(table_name => $data->{'FK_TABLE_NAME'}, data_source => $data_source_id, fk_constraint_name => $data->{'FK_NAME'}, r_table_name => $data->{'UK_TABLE_NAME'}, ); unless ($fk) { # Postgres puts quotes around things that look like keywords foreach ( $data->{'FK_TABLE_NAME'}, $data->{'UK_TABLE_NAME'}, $data->{'UK_TABLE_NAME'}, $data->{'UK_COLUMN_NAME'}) { s/"|'//g; } $fk = UR::DataSource::RDBMS::FkConstraint->create( fk_constraint_name => $data->{'FK_NAME'}, table_name => $data->{'FK_TABLE_NAME'}, r_table_name => $data->{'UK_TABLE_NAME'}, owner => $table_object->{owner}, r_owner => $table_object->{owner}, data_source => $table_object->{data_source}, last_object_revision => $revision_time, ); $fk{$fk->id} = $fk; } if ($fk{$fk->id}) { my $fkcol = UR::DataSource::RDBMS::FkConstraintColumn->get_or_create( fk_constraint_name => $data->{'FK_NAME'}, table_name => $data->{'FK_TABLE_NAME'}, column_name => $data->{'FK_COLUMN_NAME'}, r_table_name => $data->{'UK_TABLE_NAME'}, r_column_name => $data->{'UK_COLUMN_NAME'}, owner => $table_object->{owner}, data_source => $table_object->{data_source}, ); } my $fingerprint = $self->_make_foreign_key_fingerprint($fk); $fks_in_real_db{$fingerprint} = $fk; push @constraints, $fk; } } # get foreign_key_info the other way # constraints on other tables against columns in this table my $fk_reverse_sth = $data_source->get_foreign_key_details_from_data_dictionary('', '', '', '', $db_owner, $table_name); %fk = (); # resetting this prevents data_source referencing # tables from fouling up their fk objects if ($fk_reverse_sth) { while (my $data = $fk_reverse_sth->fetchrow_hashref()) { foreach ( qw( FK_TABLE_NAME UK_TABLE_NAME FK_NAME FK_COLUMN_NAME UK_COLUMN_NAME ) ) { $data->{$_} = uc($data->{$_}); } my $fk = UR::DataSource::RDBMS::FkConstraint->get(fk_constraint_name => $data->{'FK_NAME'}, table_name => $data->{'FK_TABLE_NAME'}, r_table_name => $data->{'UK_TABLE_NAME'}, data_source => $table_object->{'data_source'}, ); unless ($fk) { # Postgres puts quotes around things that look like keywords foreach ( $data->{'FK_TABLE_NAME'}, $data->{'UK_TABLE_NAME'}, $data->{'UK_TABLE_NAME'}, $data->{'UK_COLUMN_NAME'}) { s/"|'//g; } $fk = UR::DataSource::RDBMS::FkConstraint->create( fk_constraint_name => $data->{'FK_NAME'}, table_name => $data->{'FK_TABLE_NAME'}, r_table_name => $data->{'UK_TABLE_NAME'}, owner => $table_object->{owner}, r_owner => $table_object->{owner}, data_source => $table_object->{data_source}, last_object_revision => $revision_time, ); unless ($fk) { #$DB::single=1; 1; } $fk{$fk->fk_constraint_name} = $fk; } if ($fk{$fk->fk_constraint_name}) { UR::DataSource::RDBMS::FkConstraintColumn->get_or_create( fk_constraint_name => $data->{'FK_NAME'}, table_name => $data->{'FK_TABLE_NAME'}, column_name => $data->{'FK_COLUMN_NAME'}, r_table_name => $data->{'UK_TABLE_NAME'}, r_column_name => $data->{'UK_COLUMN_NAME'}, owner => $table_object->{owner}, data_source => $table_object->{data_source}, ); } my $fingerprint = $self->_make_foreign_key_fingerprint($fk); $fks_in_real_db{$fingerprint} = $fk; push @constraints, $fk; } } # Find FKs still in the Meta db that don't exist in the real database anymore foreach my $fingerprint ( keys %fks_in_meta_db_by_fingerprint ) { unless ($fks_in_real_db{$fingerprint}) { my $fk = $fks_in_meta_db_by_fingerprint{$fingerprint}; my @fk_cols = $fk->get_related_column_objects(); $_->delete foreach @fk_cols; $fk->delete; } } # get primary_key_info my $pk_sth = $data_source->get_primary_key_details_from_data_dictionary(undef, $db_owner, $table_name); if ($pk_sth) { my @new_pk; while (my $data = $pk_sth->fetchrow_hashref()) { $data->{'COLUMN_NAME'} =~ s/"|'//g; # Postgres puts quotes around things that look like keywords my $pk = UR::DataSource::RDBMS::PkConstraintColumn->get( table_name => $table_name, data_source => $data_source_id, column_name => $data->{'COLUMN_NAME'}, ); if ($pk) { # Since the rank/order is pretty much all that might change, we # just delete and re-create these. # It's a no-op at save time if there are no changes. $pk->delete; } push @new_pk, [ table_name => $table_name, data_source => $data_source_id, owner => $data_source->owner, column_name => $data->{'COLUMN_NAME'}, rank => $data->{'KEY_SEQ'} || $data->{'ORDINAL_POSITION'}, ]; # $table_object->{primary_key_constraint_name} = $data->{PK_NAME}; # $embed{primary_key_constraint_column_names} ||= {}; # $embed{primary_key_constraint_column_names}{$table_object} ||= []; # push @{ $embed{primary_key_constraint_column_names}{$table_object} }, $data->{COLUMN_NAME}; } for my $data (@new_pk) { my $pk = UR::DataSource::RDBMS::PkConstraintColumn->create(@$data); unless ($pk) { $self->error_message("Failed to create primary key @$data"); return; } } } ## Get the unique constraints ## Unfortunately, there appears to be no DBI catalog ## method which will find these. So we have to use ## some custom SQL # # The SQL that used to live here was moved to the UR::DataSource::Oracle # and each other DataSource class needs its own implementation # The above was moved into each data source's class if (my $uc = $data_source->get_unique_index_details_from_data_dictionary($table_name)) { my %uc = %$uc; # check for redundant unique constraints # there may be both an index and a constraint for my $uc_name_1 ( keys %uc ) { my $uc_columns_1 = $uc{$uc_name_1} or next; my $uc_columns_1_serial = join ',', sort @$uc_columns_1; for my $uc_name_2 ( keys %uc ) { next if ( $uc_name_2 eq $uc_name_1 ); my $uc_columns_2 = $uc{$uc_name_2} or next; my $uc_columns_2_serial = join ',', sort @$uc_columns_2; if ( $uc_columns_2_serial eq $uc_columns_1_serial ) { delete $uc{$uc_name_1}; } } } # compare primary key constraints to unique constraints my $pk_columns_serial = join(',', sort map { $_->column_name } UR::DataSource::RDBMS::PkConstraintColumn->get(data_source => $data_source_id, table_name => $table_name, owner => $data_source->owner, )); for my $uc_name ( keys %uc ) { # see if primary key constraint has the same name as # any unique constraints # FIXME - disabling this for now, the Meta DB dosen't track PK constraint names # Isn't it just as goot to check the involved columns? #if ( $table_object->primary_key_constraint_name eq $uc_name ) { # delete $uc{$uc_name}; # next; #} # see if any unique constraints cover the exact same column(s) as # the primary key column(s) my $uc_columns_serial = join ',', sort @{ $uc{$uc_name} }; if ( $pk_columns_serial eq $uc_columns_serial ) { delete $uc{$uc_name}; } } # Create new UniqueConstraintColumn objects for the columns that don't exist, and delete the # objects if they don't apply anymore foreach my $uc_name ( keys %uc ) { my %constraint_objs = map { $_->column_name => $_ } UR::DataSource::RDBMS::UniqueConstraintColumn->get( data_source => $data_source_id, table_name => $table_name, owner => $data_source->owner || '', constraint_name => $uc_name, ); foreach my $col_name ( @{$uc{$uc_name}} ) { if ($constraint_objs{$col_name} ) { delete $constraint_objs{$col_name}; } else { my $uc = UR::DataSource::RDBMS::UniqueConstraintColumn->create( data_source => $data_source_id, table_name => $table_name, owner => $data_source->owner, constraint_name => $uc_name, column_name => $col_name, ); 1; } } foreach my $obj ( values %constraint_objs ) { $obj->delete(); } } } # Now that all columns know their foreign key constraints, # have the column objects resolve the various names # associated with the column. #for my $col (@column_objects) { $col->resolve_names } # Determine the ER type. # We have 'validation item', 'entity', and 'bridge' my $column_count = scalar($table_object->column_names) || 0; my $pk_column_count = scalar($table_object->primary_key_constraint_column_names) || 0; my $constraint_count = scalar($table_object->fk_constraint_names) || 0; if ($column_count == 1 and $pk_column_count == 1) { $table_object->er_type('validation item'); } else { if ($constraint_count == $column_count) { $table_object->er_type('bridge'); } else { $table_object->er_type('entity'); } } return $table_object; } sub _make_foreign_key_fingerprint { my($self,$fk) = @_; my @fk_cols = sort {$a->column_name cmp $b->column_name} $fk->get_related_column_objects(); my $fingerprint = join(':', $fk->table_name, $fk->r_table_name, map { $_->column_name, $_->r_column_name } @fk_cols); return $fingerprint; } # Derived classes should define a method to return a ref to an array of hash refs # describing all the bitmap indicies in the DB. Each hash ref should contain # these keys: table_name, column_name, index_name # If the DB dosen't support bitmap indicies, it should return an empty listref # This is used by the part that writes class defs based on the DB schema, and # possibly by sync_database() # Implemented methods should take one optional argument: a table name # # FIXME The API for bitmap_index and unique_index methods here aren't the same as # the other data_dictionary methods. These tqo return hashrefs of massaged # data while the others return DBI statement handles. sub get_bitmap_index_details_from_data_dictionary { my $class = shift; Carp::confess("Class $class didn't define its own bitmap_index_info() method"); } # Derived classes should define a method to return a ref to a hash keyed by constraint # names. Each value holds a listref of hashrefs containing these keys: # CONSTRAINT_NAME and COLUMN_NAME sub get_unique_index_details_from_data_dictionary { my $class = shift; Carp::confess("Class $class didn't define its own unique_index_info() method"); } sub autogenerate_new_object_id_for_class_name_and_rule { # The sequences in the database are named by a naming convention which allows us to connect them to the table # whose surrogate keys they fill. Look up the sequence and get a unique value from it for the object. # If and when we save, we should not get any integrity constraint violation errors. my $self = shift; my $class_name = shift; my $rule = shift; # Not used for the moment... if ($self->use_dummy_autogenerated_ids) { return $self->next_dummy_autogenerated_id; } my $class = UR::Object::Type->get(class_name => $class_name); my $table_name; unless ($table_name) { for my $parent_class_name ($class_name, $class_name->inheritance) { # print "checking $parent_class_name (for $class_name)\n"; my $parent_class = UR::Object::Type->get(class_name => $parent_class_name); # print "object $parent_class\n"; next unless $parent_class; if ($table_name = $parent_class->table_name) { # print "found table $table_name\n"; last; } } } my $table = UR::DataSource::RDBMS::Table->get(table_name => $table_name, data_source => $self->id); unless ($table_name && $table) { Carp::confess("Failed to find a table for class $class_name!"); } my @keys = $table->primary_key_constraint_column_names; my $key; if (@keys > 1) { Carp::confess("Tables with multiple primary keys (i.e." . $table->table_name . " @keys) cannot have a surrogate key created from a sequence."); } elsif (@keys == 0) { Carp::confess("No primary keys found for table " . $table->table_name . "\n"); } else { $key = $keys[0]; } # FIXME Each class should have a way to override what sequence generator to use my $sequence; unless ($sequence = $class->id_sequence_generator_name) { $sequence = $self->_get_sequence_name_for_table_and_column($table_name, $key); } my $new_id = $self->_get_next_value_from_sequence($sequence); return $new_id; } sub _get_sequence_name_for_table_and_column { my($self,$table_name,$column_name) = @_; # The default is to take the column name (should be a primary key from a table) and # change the _ID at the end of the column name with _SEQ $column_name =~ s/_ID/_SEQ/; return $column_name; } sub create_iterator_closure_for_rule { my ($self, $rule) = @_; my ($rule_template, @values) = $rule->template_and_values(); my $template_data = $self->_get_template_data_for_loading($rule_template); # # the template has general class data # my $class_name = $template_data->{class_name}; my $class = $class_name; my @lob_column_names = @{ $template_data->{lob_column_names} }; my @lob_column_positions = @{ $template_data->{lob_column_positions} }; my $query_config = $template_data->{query_config}; my $post_process_results_callback = $template_data->{post_process_results_callback}; # # the template has explicit template data # my $select_clause = $template_data->{select_clause}; my $select_hint = $template_data->{select_hint}; my $from_clause = $template_data->{from_clause}; my $where_clause = $template_data->{where_clause}; my $connect_by_clause = $template_data->{connect_by_clause}; my $group_by_clause = $template_data->{group_by_clause}; my $order_by_clause = $template_data->{order_by_clause}; my $sql_params = $template_data->{sql_params}; my $filter_specs = $template_data->{filter_specs}; my @property_names_in_resultset_order = @{ $template_data->{property_names_in_resultset_order} }; # TODO: we get 90% of the way to a full where clause in the template, but # actually have to build it here since ther is no way to say "in (?)" and pass an arrayref :( # It _is_ possible, however, to process all of the filter specs with a constant number of params. # This would optimize the common case. my @all_sql_params = @$sql_params; for my $filter_spec (@$filter_specs) { my ($expr_sql, $operator, $value_position) = @$filter_spec; my $value = $values[$value_position]; my ($more_sql, @more_params) = $self->_extend_sql_for_column_operator_and_value($expr_sql, $operator, $value); $where_clause .= ($where_clause ? "\nand " : ($connect_by_clause ? "start with " : "where ")); if ($more_sql) { $where_clause .= $more_sql; push @all_sql_params, @more_params; } else { # error return; } } # The full SQL statement for the template, besides the filter logic, is built here. my $sql = "\nselect "; if ($select_hint) { $sql .= $select_hint . " "; } $sql .= $select_clause; $sql .= "\nfrom $from_clause"; $sql .= "\n$where_clause" if defined($where_clause) and length($where_clause); $sql .= "\n$connect_by_clause" if $connect_by_clause; $sql .= "\n$group_by_clause" if $group_by_clause; $sql .= "\n$order_by_clause"; my $dbh = $self->get_default_dbh; my $sth = $dbh->prepare($sql,$query_config); unless ($sth) { $class->error_message("Failed to prepare SQL $sql\n" . $dbh->errstr . "\n"); Carp::confess($class->error_message); } unless ($sth->execute(@all_sql_params)) { $class->error_message("Failed to execute SQL $sql\n" . $sth->errstr . "\n" . Data::Dumper::Dumper(\@$sql_params) . "\n"); Carp::confess($class->error_message); } die unless $sth; $self->__signal_change__('query',$sql); # buffers for the iterator my $next_db_row; my $pending_db_object_data; my $ur_test_filldb = $ENV{'UR_TEST_FILLDB'}; my $iterator = sub { unless ($sth) { #$DB::single = 1; return; } $next_db_row = $sth->fetchrow_arrayref; #$self->__signal_change__('fetch',$next_db_row); # FIXME: commented out because it may make fetches too slow unless ($next_db_row) { $sth->finish; $sth = undef; return; } # this handles things lik BLOBS, which have a special interface to get the 'real' data if ($post_process_results_callback) { $next_db_row = $post_process_results_callback->($next_db_row); } # this is used for automated re-testing against a private database $self->_CopyToAlternateDB($class,$dbh,$next_db_row) if $ur_test_filldb; return $next_db_row; }; # end of iterator closure return $iterator; } # This allows the size of an autogenerated IN-clause to be adjusted. # The limit for Oracle is 1000, and a bug requires that, in some cases # we drop to 250. my $in_clause_size_limit = 250; # This method is used when generating SQL for a rule template, in the joins # and also on a per-query basis to turn specific values into a where clause sub _extend_sql_for_column_operator_and_value { my ($self, $expr_sql, $op, $val, $escape) = @_; $op ||= ''; if ($op eq '[]' and not ref($val) eq 'ARRAY') { #$DB::single = 1; $val = []; } my $sql; my @sql_params; if ($op eq '' or $op eq '=' or $op eq 'eq') { $sql .= $expr_sql; if ($self->_value_is_null($val)) { $sql = "$expr_sql is NULL"; } else { $sql = "$expr_sql = ?"; push @sql_params, $val; } } elsif ($op eq '[]' or $op =~ /in/i) { no warnings 'uninitialized'; unless (@$val) { # an empty list was passed-in. # since "in ()", like "where 1=0", is self-contradictory, # there is no data to return, and no SQL required Carp::carp("Null in-clause passed to default_load_sql"); return; } my @list = sort @$val; my $has_null = ( (grep { length($_) == 0 } @list) ? 1 : 0); my $wrap = ($has_null or @$val > $in_clause_size_limit ? 1 : 0); my $cnt = 0; $sql .= "\n(\n " if $wrap; while (my @set = splice(@list,0,$in_clause_size_limit)) { $sql .= "\n or " if $cnt++; $sql .= $expr_sql; $sql .= " in (" . join(",",map { "'$_'" } @set) . ")"; } if ($has_null) { $sql .= "\n or $expr_sql is null" } $sql .= "\n)\n" if $wrap; } elsif($op =~ /^(like|not like|in|not in|\<\>|\<|\>|\=|\<\=|\>\=)$/i ) { # SQL operator. Use this directly. $sql .= "$expr_sql $op ?"; push @sql_params, $val; if($op =~ /like/i) { $escape ||= '\\'; $sql .= " escape '" . $escape . "'"; } } elsif($op =~ /^(ne|\!\=)$/i) { # Perlish inequality. Special SQL to handle this. if (not defined($val)) { # ne undef =~ is not null $sql .= "$expr_sql is not null"; pop @sql_params; } elsif ($op =~ /^(ne|\!\=)$/i) { # ne $v =~ should match everything but $v, including nulls # != is the same, and will rely on is_loaded to # filter out any cases where "hello" != "goodbye" returns # but Perl wants to exclude the value because they match numerically. $sql .= "( $expr_sql != ?" . " or $expr_sql is null)"; push @sql_params, $val; } } elsif ($op eq "between") { $sql .= "$expr_sql $op ? and ?"; push @sql_params, @$val; } elsif ($op eq 'true' ) { $sql .= "( $expr_sql is not null and $expr_sql != 0 )"; } elsif ($op eq 'false' ) { $sql .= "( $expr_sql is null or $expr_sql = 0)"; } else { # Something else? die "Unkown operator $op!"; } if (@sql_params > 256) { Carp::confess("A bug in Oracle causes queries using > 256 placeholders to return incorrect results."); } return ($sql, @sql_params) } sub _value_is_null { # this is a separate method since some databases, like Oracle, treat empty strings as null values my ($self, $value) = @_; return 1 if not defined $value; return if not ref($value); if (ref($value) eq 'HASH') { if ($value->{operator} eq '=' or $value->{operator} eq 'eq') { if (not defined $value->{value}) { return 1; } else { return; } } } return; } sub _resolve_ids_from_class_name_and_sql { my $self = shift; my $class_name = shift; my $sql = shift; my $query; my @params; if (ref($sql) eq "ARRAY") { ($query, @params) = @{$sql}; } else { $query = $sql; } my $class_meta = $class_name->__meta__; my @id_columns = map { $class_meta->property_meta_for_name($_)->column_name } $class_meta->id_property_names; # query for the ids my $dbh = $self->get_default_dbh(); my $sth = $dbh->prepare($query); unless ($sth) { confess("could not prepare query $query"); } $sth->execute(@params); my $data; my @id_fetch_set; while ($data = $sth->fetchrow_hashref()) { #ensure everything is uppercased. this is totally a hack right now but it makes sql queries work again. foreach my $key (keys %$data) { $data->{uc($key)} = delete $data->{$key}; } my @id_vals = map {$data->{uc($_)}} @id_columns; my $cid = $class_name->__meta__->resolve_composite_id_from_ordered_values(@id_vals); push @id_fetch_set, $cid; } return @id_fetch_set; } sub _sync_database { my $self = shift; my %params = @_; unless (ref($self)) { if ($self->isa("UR::Singleton")) { $self = $self->_singleton_object; } else { die "Called as a class-method on a non-singleton datasource!"; } } my $changed_objects = delete $params{changed_objects}; my %objects_by_class_name; for my $obj (@$changed_objects) { my $class_name = ref($obj); $objects_by_class_name{$class_name} ||= []; push @{ $objects_by_class_name{$class_name} }, $obj; } my $dbh = $self->get_default_dbh; # # Determine what commands need to be executed on the database # to sync those changes, and categorize them by type and table. # # As we iterate through changes, keep track of all of the involved tables. my %all_tables; # $all_tables{$table_name} = $number_of_commands; # Make a hash for each type of command keyed by table name. my %insert; # $insert{$table_name} = [ $change1, $change2, ...]; my %update; # $update{$table_name} = [ $change1, $change2, ...]; my %delete; # $delete{$table_name} = [ $change1, $change2, ...]; # Make a master hash referencing each of the above. # $explicit_commands_by_type_and_table{'insert'}{$table} = [ $change1, $change2 ...] my %explicit_commands_by_type_and_table = ( 'insert' => \%insert, 'update' => \%update, 'delete' => \%delete ); # Build the above data structures. { no warnings; for my $class_name (sort keys %objects_by_class_name) { for my $obj (@{ $objects_by_class_name{$class_name} }) { my @commands = $self->_default_save_sql_for_object($obj); next unless @commands; for my $change (@commands) { #$commands{$change} = $change; # Example change: # { type => 'update', table_name => $table_name, # column_names => \@changed_cols, sql => $sql, # params => \@values, class => $table_class, id => $id }; # There are often multiple changes per object, espeically # when the object is spread across multiple tables because of # inheritance. We classify each change by the table and # the class immediately associated with the table, even if # the class in an abstract parent class on the object. my $table_name = $change->{table_name}; my $id = $change->{id}; $all_tables{$table_name}++; my $table = UR::DataSource::RDBMS::Table->get(table_name => $table_name, data_source => $self->class) || UR::DataSource::RDBMS::Table->get(table_name => $table_name, data_source => 'UR::DataSource::Meta'); if ($change->{type} eq 'insert') { push @{ $insert{$change->{table_name}} }, $change; } elsif ($change->{type} eq 'update') { push @{ $update{$change->{table_name}} }, $change; } elsif ($change->{type} eq 'delete') { push @{ $delete{$change->{table_name}} }, $change; } else { print "UNKNOWN COMMAND TYPE $change->{type} $change->{sql}\n"; } } } } } # Determine which tables require a lock; my %tables_requiring_lock; for my $table_name (keys %all_tables) { my $table_object = UR::DataSource::RDBMS::Table->get(table_name => $table_name, data_source => $self->class) || UR::DataSource::RDBMS::Table->get(table_name => $table_name, data_source => 'UR::DataSource::Meta'); unless ($table_object) { warn "looking up schema for RDBMS table $table_name...\n"; $table_object = $self->refresh_database_metadata_for_table_name($table_name); unless ($table_object) { die "Failed to generate table data for $table_name!"; } } if (my @bitmap_index_names = $table_object->bitmap_index_names) { my $changes; if ($changes = $insert{$table_name} or $changes = $delete{$table_name}) { $tables_requiring_lock{$table_name} = 1; } elsif (not $tables_requiring_lock{$table_name}) { $changes = $update{$table_name}; my @column_names = sort map { @{ $_->{column_names} } } @$changes; my $last_column_name = ""; for my $column_name (@column_names) { next if $column_name eq $last_column_name; my $column_obj = UR::DataSource::RDBMS::TableColumn->get( data_source => $table_object->data_source, table_name => $table_name, column_name => $column_name, ); if ($column_obj->bitmap_index_names) { $tables_requiring_lock{$table_name} = 1; last; } $last_column_name = $column_name; } } } } # # Make a mapping of prerequisites for each command, # and a reverse mapping of dependants for each command. # my %all_table_commands; my %prerequisites; my %dependants; for my $table_name (keys %all_tables) { my $table = UR::DataSource::RDBMS::Table->get(table_name => $table_name, data_source => $self->class) || UR::DataSource::RDBMS::Table->get(table_name => $table_name, data_source => 'UR::DataSource::Meta'); my @fk = $table->fk_constraints; if ($insert{$table_name}) { $all_table_commands{"insert $table_name"} = 1; } if ($update{$table_name}) { $all_table_commands{"update $table_name"} = 1; } if ($delete{$table_name}) { $all_table_commands{"delete $table_name"} = 1; } # Go through the constraints. my $tmparray; for my $fk (@fk) { my $r_table_name = $fk->r_table_name; my $r_table = UR::DataSource::RDBMS::Table->get(table_name => $r_table_name, data_source => $self->class) || UR::DataSource::RDBMS::Table->get(table_name => $r_table_name, data_source => 'UR::DataSource::Meta'); # RULES: # insert r_table_name before insert table_name # insert r_table_name before update table_name # delete table_name before delete r_table_name # update table_name before delete r_table_name if ($insert{$table_name} and $insert{$r_table_name}) { $tmparray = $prerequisites{"insert $table_name"}{"insert $r_table_name"} ||= []; push @$tmparray, $fk; $tmparray = $dependants{"insert $r_table_name"}{"insert $table_name"} ||= []; push @$tmparray, $fk; } if ($update{$table_name} and $insert{$r_table_name}) { $tmparray = $prerequisites{"update $table_name"}{"insert $r_table_name"} ||= []; push @$tmparray, $fk; $tmparray = $dependants{"insert $r_table_name"}{"update $table_name"} ||= []; push @$tmparray, $fk; } if ($delete{$r_table_name} and $delete{$table_name}) { $tmparray = $prerequisites{"delete $r_table_name"}{"delete $table_name"} ||= []; push @$tmparray, $fk; $tmparray = $dependants{"delete $table_name"}{"delete $r_table_name"} ||= []; push @$tmparray, $fk; } if ($delete{$r_table_name} and $update{$table_name}) { $tmparray = $prerequisites{"delete $r_table_name"}{"update $table_name"} ||= []; push @$tmparray, $fk; $tmparray = $dependants{"update $table_name"}{"delete $r_table_name"} ||= []; push @$tmparray, $fk; } } } # # Use the above mapping to build an ordered list of general commands. # Note that the general command is something like "insert EMPLOYEES", # while the explicit command is an exact insert statement with params. # my @general_commands_in_order; my %self_referencing_table_commands; my %all_unresolved = %all_table_commands; my $unresolved_count; my $last_unresolved_count = 0; my @ready_to_add = (); while ($unresolved_count = scalar(keys(%all_unresolved))) { if ($unresolved_count == $last_unresolved_count) { # We accomplished nothing on the last iteration. # We are in an infinite loop unless something is done. # Rather than die with an error, issue a warning and attempt to # brute-force the sync. # Process something with minimal deps as a work-around. my @ordered_by_least_number_of_prerequisites = sort{ scalar(keys(%{$prerequisites{$a}})) <=> scalar(keys(%{$prerequisites{$b}})) } grep { $prerequisites{$_} } keys %all_unresolved; @ready_to_add = ($ordered_by_least_number_of_prerequisites[0]); warn "Circular dependency! Pushing @ready_to_add to brute-force the save.\n"; #print STDERR Data::Dumper::Dumper(\%objects_by_class_name, \%prerequisites, \%dependants ) . "\n"; } else { # This is the normal case. It is either the first iteration, # or we are on additional iterations with some progress made # in the last iteration. # Find commands which have no unresolved prerequisites. @ready_to_add = grep { not $prerequisites{$_} } keys %all_unresolved; # If there are none of the above, find commands # with only self-referencing prerequisites. unless (@ready_to_add) { # Find commands with only circular dependancies. @ready_to_add = # The circular prerequisite must be the only prerequisite on the table. grep { scalar(keys(%{$prerequisites{$_}})) == 1 } # The prerequisite must be the same as the the table itself. grep { $prerequisites{$_}{$_} } # There must be prerequisites for the given table, grep { $prerequisites{$_} } # Look at all of the unresolved table commands. keys %all_unresolved; # Note this for below. # It records the $fk object which is circular. for my $table_command (@ready_to_add) { $self_referencing_table_commands{$table_command} = $prerequisites{$table_command}{$table_command}; } } } # Record our current unresolved count for comparison on the next iteration. $last_unresolved_count = $unresolved_count; for my $db_command (@ready_to_add) { # Put it in the list. push @general_commands_in_order, $db_command; # Delete it from the main hash of command/table pairs # for which dependencies are not resolved. delete $all_unresolved{$db_command}; # Find anything which depended on this command occurring first # and remove this command from that command's prerequisite list. for my $dependant (keys %{ $dependants{$db_command} }) { # Tell it to take us out of its list of prerequisites. delete $prerequisites{$dependant}{$db_command} if $prerequisites{$dependant}; # Get rid of the prereq entry if it is empty; delete $prerequisites{$dependant} if (keys(%{ $prerequisites{$dependant} }) == 0); } # Note that nothing depends on this command any more since it has been queued. delete $dependants{$db_command}; } } # Go through the ordered list of general commands (ie "insert TABLE_NAME") # and build the list of explicit commands. my @explicit_commands_in_order; for my $general_command (@general_commands_in_order) { my ($dml_type,$table_name) = split(/\s+/,$general_command); if (my $circular_fk_list = $self_referencing_table_commands{$general_command}) { # A circular foreign key requires that the # items be inserted in a specific order. my (@rcol_sets) = map { [ $_->column_names ] } @$circular_fk_list; # Get the IDs and objects which need to be saved. my @cmds = @{ $explicit_commands_by_type_and_table{$dml_type}{$table_name} }; my @ids = map { $_->{id} } @cmds; # my @objs = $cmds[0]->{class}->is_loaded(\@ids); my $is_loaded_class = ($dml_type eq 'delete') ? $cmds[0]->{class}->ghost_class : $cmds[0]->{class}; my @objs = $is_loaded_class->is_loaded(\@ids); my %objs = map { $_->id => $_ } @objs; # Produce the explicit command list in dep order. my %unsorted_cmds = map { $_->{id} => $_ } @cmds; my $add; my @local_explicit_commands; my %adding; $add = sub { my ($cmd) = @_; if ($adding{$cmd}) { #$DB::single = 1; Carp::confess("Circular foreign key!") unless $main::skip_croak; } $adding{$cmd} = 1; my $obj = $objs{$cmd->{id}}; for my $rcol_set (@rcol_sets) { my $pid = $obj->class->__meta__->resolve_composite_id_from_ordered_values(map { $obj->$_ } @$rcol_set); if (defined $pid) { # This recursive foreign key dep may have been optional my $pcmd = delete $unsorted_cmds{$pid}; $add->($pcmd) if $pcmd; } } delete $adding{$cmd}; push @local_explicit_commands, $cmd; }; for my $cmd (@cmds) { next unless $unsorted_cmds{$cmd->{id}}; $add->(delete $unsorted_cmds{$cmd->{id}}); } if ($dml_type eq 'delete') { @local_explicit_commands = reverse @local_explicit_commands; } push @explicit_commands_in_order, @local_explicit_commands; } else { # Order is irrelevant on non-self-referencing tables. push @explicit_commands_in_order, @{ $explicit_commands_by_type_and_table{$dml_type}{$table_name} }; } } my %table_objects_by_class_name; my %column_objects_by_class_and_column_name; # Make statement handles. my %sth; for my $cmd (@explicit_commands_in_order) { my $sql = $cmd->{sql}; unless ($sth{$sql}) { my $class_name = $cmd->{class}; # get the db handle to use for this class my $dbh = $cmd->{'dbh'}; #$class_name->dbh; my $sth = $dbh->prepare($sql); $sth{$sql} = $sth; if ($dbh->errstr) { $self->error_message("Error preparing SQL:\n$sql\n" . $dbh->errstr . "\n"); return; } my $tables = $table_objects_by_class_name{$class_name}; my $class_object = $class_name->__meta__; unless ($tables) { my $tables; my @all_table_names = $class_object->all_table_names; for my $table_name (@all_table_names) { my $table = UR::DataSource::RDBMS::Table->get(table_name => $table_name, data_source => $self->class) || UR::DataSource::RDBMS::Table->get(table_name => $table_name, data_source => 'UR::DataSource::Meta'); push @$tables, $table; $column_objects_by_class_and_column_name{$class_name} ||= {}; my $columns = $column_objects_by_class_and_column_name{$class_name}; unless (%$columns) { for my $column ($table->columns) { $columns->{$column->column_name} = $column; } } } $table_objects_by_class_name{$class_name} = $tables; } my @column_objects = map { my $column = $column_objects_by_class_and_column_name{$class_name}{$_}; unless ($column) { print "looking at parent classes for $class_name\n"; for my $ancestor_class_name ($class_object->ancestry_class_names) { $column = $column_objects_by_class_and_column_name{$ancestor_class_name}{$_}; if ($column) { $column_objects_by_class_and_column_name{$class_name}{$_} = $column; last; } } unless ($column) { #$DB::single = 1; die "Failed to find a column object column $_ for class $class_name"; } } $column; } @{ $cmd->{column_names} }; # print "Column Types: @column_types\n"; for my $n (0 .. $#column_objects) { if ($column_objects[$n]->data_type eq 'BLOB') { $sth->bind_param($n+1, undef, { ora_type => 113, ora_field => $column_objects[$n]->column_name }); } } } } # Set a savepoint if possible. my $savepoint; if ($self->can_savepoint) { $savepoint = $self->_last_savepoint; if ($savepoint) { $savepoint++; } else { $savepoint=1; } my $sp_name = "sp".$savepoint; unless ($self->set_savepoint($sp_name)) { $self->error_message("Failed to set a savepoint on " . $self->class . ": " . $dbh->errstr ); return; } $self->_last_savepoint($savepoint); } else { # FIXME SQLite dosen't support savepoints, but autocommit is already off so this dies?! #$dbh->begin_work; } # Do any explicit table locking necessary. if (my @tables_requiring_lock = sort keys %tables_requiring_lock) { $self->debug_message("Locking tables: @tables_requiring_lock."); my $max_failed_attempts = 10; for my $table_name (@tables_requiring_lock) { my $table = UR::DataSource::RDBMS::Table->get(table_name => $table_name, data_source => $self->class) || UR::DataSource::RDBMS::Table->get(table_name => $table_name, data_source => 'UR::DataSource::Meta'); my $dbh = $table->dbh; my $sth = $dbh->prepare("lock table $table_name in exclusive mode"); my $failed_attempts = 0; my @err; for (1) { unless ($sth->execute) { $failed_attempts++; $self->warning_message( "Failed to lock $table_name (attempt # $failed_attempts): " . $sth->errstr ); push @err, $sth->errstr; unless ($failed_attempts >= $max_failed_attempts) { redo; } } } if ($failed_attempts > 1) { my $err = join("\n",@err); #$UR::Context::current->send_email( # To => 'example@example.edu', # From => UR::Context::Process->prog_name . ' <example@example.edu>', # Subject => ( # $failed_attempts >= $max_failed_attempts # ? "sync_database lock failure after $failed_attempts attempts" # : "sync_database lock success after $failed_attempts attempts" # ) # . " in " . UR::Context::Process->prog_name # . " on $table_name", # Message => qq/ # $failed_attempts attempts to lock table $table_name # # Errors: # $err # # The complete table lock list for this sync: # @tables_requiring_lock # / #); if ($failed_attempts >= $max_failed_attempts) { $self->error_message( "Could not obtain an exclusive table lock on table " . $table_name . " after $failed_attempts attempts" ); $self->rollback_to_savepoint($savepoint); return; } } } } # Execute the commands in the correct order. my @failures; my $last_failure_count = 0; my @previous_failure_sets; # If there are failures, we fall-back to brute force and send # a message to support to debug the inefficiency. my $skip_fault_tolerance_check = 1; for (1) { @failures = (); for my $cmd (@explicit_commands_in_order) { unless ($sth{$cmd->{sql}}->execute(@{$cmd->{params}})) { #my $dbh = $cmd->{class}->dbh; # my $dbh = UR::Context->resolve_data_source_for_object($cmd->{class})->get_default_dbh; push @failures, {cmd => $cmd, error_message => $sth{$cmd->{sql}}->errstr}; last if $skip_fault_tolerance_check; } $sth{$cmd->{sql}}->finish(); } if (@failures) { # There have been some failures. In case the error has to do with # a failure to correctly determine dependencies in the code above, # we will retry the set of failed commands. This repeats as long # as some progress is made on each iteration. if ( (@failures == $last_failure_count) or $skip_fault_tolerance_check) { # We've tried this exact set of comands before and failed. # This is a real error. Stop retrying and report. for my $error (@failures) { $self->error_message("Error executing SQL:\n$error->{cmd}{sql}\n" . $error->{error_message} . "\n"); } last; } else { # We've failed, but we haven't retried this exact set of commands # and found the exact same failures. This is either the first failure, # or we had failures before and had success on the last brute-force # approach to sorting commands. Try again. push @previous_failure_sets, \@failures; @explicit_commands_in_order = map { $_->{cmd} } @failures; $last_failure_count = scalar(@failures); $self->warning_message("RETRYING SAVE"); redo; } } } # Rollback to savepoint if there are errors. if (@failures) { if ($savepoint eq "NONE") { # A failure on a database which does not support savepoints. # We must rollback the entire transacation. # This is only a problem for a mixed raw-sql and UR::Object environment. $dbh->rollback; } else { $self->_reverse_sync_database(); } # Return false, indicating failure. return; } unless ($self->_set_specified_objects_saved_uncommitted($changed_objects)) { Carp::confess("Error setting objects to a saved state after sync_database. Exiting."); return; } if (exists $params{'commit_on_success'} and ($params{'commit_on_success'} eq '1')) { # Commit the current transaction. # The handles will automatically update their objects to # a committed state from the one set above. # It will throw an exception on failure. $dbh->commit; } # Though we succeeded, see if we had to use the fault-tolerance code to # do so, and warn software support. This should never occur. if (@previous_failure_sets) { my $msg = "Dependency failure saving: " . Dumper(\@explicit_commands_in_order) . "\n\nThe following error sets were produced:\n" . Dumper(\@previous_failure_sets) . "\n\n" . Carp::cluck . "\n\n"; $self->warning_message($msg); $UR::Context::current->send_email( To => UR::Context::Process->support_email, Subject => 'sync_database dependency sort failure', Message => $msg ) or $self->warning_message("Failed to send error email!"); } return 1; } sub _reverse_sync_database { my $self = shift; unless ($self->can_savepoint) { # This will not respect manual DML # Developers must not use this back door on non-savepoint databases. $self->get_default_dbh->rollback; return "NONE"; } my $savepoint = $self->_last_savepoint; unless ($savepoint) { Carp::confess("No savepoint set!"); } my $sp_name = "sp".$savepoint; unless ($self->rollback_to_savepoint($sp_name)) { $self->error_message("Error removing savepoint $savepoint " . $self->get_default_dbh->errstr); return 1; } $self->_last_savepoint(undef); return $savepoint; } # Given a table object and a list of primary key values, return # a where clause to match a row. Some values may be undef (NULL) # and it properly writes "column IS NULL". As a side effect, the # @$values list is altered to remove the undef value sub _matching_where_clause { my($self,$table_obj,$values) = @_; unless ($table_obj) { Carp::confess("No table passed to _matching_where_clause for $self!"); } my @pks = $table_obj->primary_key_constraint_column_names; my @where; # in @$values, the updated data values always seem to be before the where clause # values but still in the right order, so start at the right place my $skip = scalar(@$values) - scalar(@pks); for (my($pk_idx,$values_idx) = (0,$skip); $pk_idx < @pks;) { if (defined $values->[$values_idx]) { push(@where, $pks[$pk_idx] . ' = ?'); $pk_idx++; $values_idx++; } else { push(@where, $pks[$pk_idx] . ' IS NULL'); splice(@$values, $values_idx, 1); $pk_idx++; } } return join(' and ', @where); } sub _id_values_for_primary_key { my ($self,$table_obj,$object_to_save) = @_; unless ($table_obj && $object_to_save) { Carp::confess("Both table and class object should be passed for $self!"); } my $class_obj; # = $object_to_save->__meta__; foreach my $possible_class_obj ($object_to_save->__meta__->all_class_metas) { if (lc($possible_class_obj->table_name) eq lc($table_obj->table_name)) { $class_obj = $possible_class_obj; last; } } unless (defined $class_obj) { Carp::confess("Can't find class object for this table! " . $table_obj->table_name); } my @pk_cols = $table_obj->primary_key_constraint_column_names; # this previously went to $object_to_save->__meta__, which is nearly the same thing but not quite my @values = $class_obj->resolve_ordered_values_from_composite_id($object_to_save->id); my @columns = $class_obj->direct_id_column_names; my $i=0; my %column_index = map { $_ => $i++ } @columns; my @id_values_in_pk_order = @values[@column_index{@pk_cols}]; return @id_values_in_pk_order; } sub _default_save_sql_for_object { my $self = shift; my $object_to_save = shift; my %params = @_; my ($class,$id) = ($object_to_save->class, $object_to_save->id); # This was in some of the UR::Object::* meta-data stuff. # Reason unknown. #my $self = shift; #my $class_obj = UR::Object::Type->get(type_name => $self->type_name); #if ($class_obj and $class_obj->table_name) { # return $self->SUPER::default_save_sql(@_); #} #else { # return; #} my $class_object = $object_to_save->__meta__; # This object may have uncommitted changes already saved. # If so, work from the last saved data. # Normally, we go with the last committed data. my $compare_version = ($object_to_save->{'db_saved_uncommitted'} ? 'db_saved_uncommitted' : 'db_committed'); # Determine what the overall save action for the object is, # and get a specific change summary if we're doing an update. my ($action,$change_summary); if ($object_to_save->isa('UR::Object::Ghost')) { $action = 'delete'; } elsif ($object_to_save->{$compare_version}) { $action = 'update'; $change_summary = $object_to_save->property_diff($object_to_save->{$compare_version}); } else { $action = 'insert'; } # Handle each table. There is usually only one, unless, # there is inheritance within the schema. my @save_table_names = map { uc } grep { $_ } $class_object->all_table_names; @save_table_names = reverse @save_table_names unless ($object_to_save->isa('UR::Entity::Ghost')); my @commands; for my $table_name (@save_table_names) { # Get general info on the table we're working-with. my $dsn = ref($self) ? $self->id : $self; # The data source name my $table = UR::DataSource::RDBMS::Table->get(table_name => $table_name, data_source => $dsn) || UR::DataSource::RDBMS::Table->get(table_name => $table_name, data_source => 'UR::DataSource::Meta'); unless ($table) { Carp::confess("No table $table_name found for data source $dsn!"); } my @table_class_obj = grep { $_->class_name !~ /::Ghost$/ } UR::Object::Type->is_loaded(table_name => $table_name); my $table_class; my $table_class_obj; if (@table_class_obj == 1) { $table_class_obj = $table_class_obj[0]; $table_class = $table_class_obj->class_name; } else { Carp::confess("NO CLASS FOR $table_name: @table_class_obj!\n"); } my $table_name_to_update = $table_name; my $data_source = $UR::Context::current->resolve_data_source_for_object($object_to_save); unless ($data_source) { Carp::confess("No ds on $object_to_save!"); } my $db_owner = $data_source->owner; # The "action" now can vary on a per-table basis. my $table_action = $action; # Handle re-classification of objects. # We skip deletion and turn insert into update in these cases. if ( ($table_class ne $class) and ( ($table_class . "::Ghost") ne $class) ) { if ($action eq 'delete') { # see if the object we're deleting actually exists reclassified my $replacement = $table_class->is_loaded($id); if ($replacement) { next; } } elsif ($action eq 'insert') { # see if the object we're inserting is actually a reclassification # of a pre-existing object my $replacing = $table_class->ghost_class->is_loaded($id); if ($replacing) { $table_action = 'update'; $change_summary = $object_to_save->property_diff(%$replacing); } } } # Determine the $sql and @values needed to save this object. my ($sql, @changed_cols, @values, @value_properties, %value_properties); if ($table_action eq 'delete') { # A row loaded from the database with its object deleted. # Delete the row in the database. @values = $self->_id_values_for_primary_key($table,$object_to_save); my $where = $self->_matching_where_clause($table, \@values); $sql = " DELETE FROM "; $sql .= "${db_owner}." if ($db_owner); $sql .= "$table_name_to_update WHERE $where"; push @commands, { type => 'delete', table_name => $table_name, column_names => undef, sql => $sql, params => \@values, class => $table_class, id => $id, dbh => $data_source->get_default_dbh }; } elsif ($table_action eq 'update') { # Pre-existing row. # Update in the database if there are columns which have changed. my $changes_for_this_table; if (@save_table_names > 1) { my @changes = map { $_ => $change_summary->{$_} } grep { $class_object->table_for_property($_) eq $table_name } keys %$change_summary; $changes_for_this_table = {@changes}; } else { # Shortcut and use the overall changes summary when # there is only one table. $changes_for_this_table = $change_summary; } for my $property (keys %$changes_for_this_table) { my $column_name = $class_object->column_for_property($property); Carp::confess("No column in table $table_name for property $property?") unless $column_name; push @changed_cols, $column_name; push @values, $changes_for_this_table->{$property}; } #$object_to_save->debug_message("Changed cols: @changed_cols", 4); if (@changed_cols) { @values = ( (map { $object_to_save->$_ } @changed_cols) , $self->_id_values_for_primary_key($table,$object_to_save)); my $where = $self->_matching_where_clause($table, \@values); $sql = " UPDATE "; $sql .= "${db_owner}." if ($db_owner); $sql .= "$table_name_to_update SET " . join(",", map { "$_ = ?" } @changed_cols) . " WHERE $where"; push @commands, { type => 'update', table_name => $table_name, column_names => \@changed_cols, sql => $sql, params => \@values, class => $table_class, id => $id, dbh => $data_source->get_default_dbh }; } } elsif ($table_action eq 'insert') { # An object without a row in the database. # Insert into the database. my @changed_cols = reverse sort $table->column_names; $sql = " INSERT INTO "; $sql .= "${db_owner}." if ($db_owner); $sql .= "$table_name_to_update (" . join(",", @changed_cols) . ") VALUES (" . join(',', split(//,'?' x scalar(@changed_cols))) . ")"; @values = map { # when there is a column but no property, use NULL as the value $object_to_save->can($_) ? $object_to_save->$_ : undef } (@changed_cols); push @commands, { type => 'insert', table_name => $table_name, column_names => \@changed_cols, sql => $sql, params => \@values, class => $table_class, id => $id, dbh => $data_source->get_default_dbh }; } else { die "Unknown action $table_action for $object_to_save" . Dumper($object_to_save) . "\n"; } } # next table return @commands; } sub _do_on_default_dbh { my $self = shift; my $method = shift; return 1 unless $self->has_default_dbh(); my $dbh = $self->get_default_dbh; unless ($dbh->$method(@_)) { $self->error_message("DataSource ",$self->get_name," failed to $method: ",$dbh->errstr); return undef; } return 1; } sub commit { my $self = shift; $self->_do_on_default_dbh('commit', @_); } sub rollback { my $self = shift; $self->_do_on_default_dbh('rollback', @_); } sub disconnect { my $self = shift; $self->_do_on_default_dbh('disconnect', @_); } sub resolve_dbic_schema_name { my $self = shift; my @schema_parts = split(/::/, ref($self) ? $self->class_name : $self); # This will be something like namespace::DataSource::name, change it to namespace::DBIC::name $schema_parts[1] = 'DBIC'; my $schema_name = join('::',@schema_parts); return $schema_name; } sub get_dbic_schema { my $self = shift; my $schema_name = $self->resolve_dbic_schema_name(); eval "use $schema_name;"; die $@ if $@; # require DBIx::Class::Schema; # # my $schema_isa = $schema_name . '::ISA'; # { no strict 'refs'; # @$schema_isa = ('DBIx::Class::Schema'); # } # # $schema_name->load_classes(); return $schema_name->connect($self->_dbi_connect_args); } sub _generate_class_data_for_loading { my ($self, $class_meta) = @_; my $parent_class_data = $self->SUPER::_generate_class_data_for_loading($class_meta); my @class_hierarchy = ($class_meta->class_name,$class_meta->ancestry_class_names); my $order_by_clause; do { my @id_column_names; for my $inheritance_class_name (@class_hierarchy) { my $inheritance_class_object = UR::Object::Type->get($inheritance_class_name); unless ($inheritance_class_object->table_name) { next; } @id_column_names = map { my $t = $inheritance_class_object->table_name; ($t) = ($t =~ /(\S+)\s*$/); $t . '.' . $_ } grep { defined } map { my $p = $inheritance_class_object->property_meta_for_name($_); die ("No property $_ found for " . $inheritance_class_object->class_name . "?") unless $p; $p->column_name; } map { $_->property_name } grep { $_->column_name } $inheritance_class_object->direct_id_property_metas; last if (@id_column_names); } $order_by_clause = "order by " . join(",", @id_column_names); }; my @all_table_properties; my @direct_table_properties; my $first_table_name; my $sub_classification_method_name; my ($sub_classification_meta_class_name, $subclassify_by); my @base_joins; my $prev_table_name; my $prev_id_column_name; for my $co ( $class_meta, @{ $parent_class_data->{parent_class_objects} } ) { my $table_name = $co->table_name; next unless $table_name; $first_table_name ||= $co->table_name; $sub_classification_method_name ||= $co->sub_classification_method_name; $sub_classification_meta_class_name ||= $co->sub_classification_meta_class_name; $subclassify_by ||= $co->subclassify_by; push @all_table_properties, map { [$co, $_, $table_name, 0 ] } sort { $a->property_name cmp $b->property_name } grep { (defined $_->column_name && $_->column_name ne '') or (defined $_->calculate_sql && $_->calculate_sql ne '') } UR::Object::Property->get( type_name => $co->type_name ); @direct_table_properties = @all_table_properties if $class_meta eq $co; } my @lob_column_names; my @lob_column_positions; my $pos = 0; for my $class_property (@all_table_properties) { my ($sql_class,$sql_property,$sql_table_name) = @$class_property; my $data_type = $sql_property->data_type || ''; if ($data_type =~ /LOB$/) { push @lob_column_names, $sql_property->column_name; push @lob_column_positions, $pos; } $pos++; } my $query_config; my $post_process_results_callback; if (@lob_column_names) { $query_config = $self->_prepare_for_lob; if ($query_config) { my $dbh = $self->get_default_dbh; my $results_row_arrayref; my @lob_ids; my @lob_values; $post_process_results_callback = sub { $results_row_arrayref = shift; @lob_ids = @$results_row_arrayref[@lob_column_positions]; @lob_values = $self->_post_process_lob_values($dbh,\@lob_ids); @$results_row_arrayref[@lob_column_positions] = @lob_values; $results_row_arrayref; }; } } my $class_data = { %$parent_class_data, all_table_properties => \@all_table_properties, direct_table_properties => \@direct_table_properties, first_table_name => $first_table_name, sub_classification_method_name => $sub_classification_method_name, sub_classification_meta_class_name => $sub_classification_meta_class_name, subclassify_by => $subclassify_by, base_joins => \@base_joins, order_by_clause => $order_by_clause, lob_column_names => \@lob_column_names, lob_column_positions => \@lob_column_positions, query_config => $query_config, post_process_results_callback => $post_process_results_callback, }; return $class_data; } sub _generate_template_data_for_loading { my ($self, $rule_template) = @_; #$DB::single = 1; # class-based values my $class_name = $rule_template->subject_class_name; my $class_meta = $class_name->__meta__; my $class_data = $self->_get_class_data_for_loading($class_meta); my @parent_class_objects = @{ $class_data->{parent_class_objects} }; my @all_table_properties = @{ $class_data->{all_table_properties} }; my $first_table_name = $class_data->{first_table_name}; my @all_id_property_names = @{ $class_data->{all_id_property_names} }; my @id_properties = @{ $class_data->{id_properties} }; my $id_property_sorter = $class_data->{id_property_sorter}; my $order_by_clause = $class_data->{order_by_clause}; my @lob_column_names = @{ $class_data->{lob_column_names} }; my @lob_column_positions = @{ $class_data->{lob_column_positions} }; my $query_config = $class_data->{query_config}; my $post_process_results_callback = $class_data->{post_process_results_callback}; my $class_table_name = $class_data->{class_table_name}; # individual query/boolexpr based my $recursion_desc = $rule_template->recursion_desc; my $recurse_property_on_this_row; my $recurse_property_referencing_other_rows; if ($recursion_desc) { ($recurse_property_on_this_row,$recurse_property_referencing_other_rows) = @$recursion_desc; } my $hints = $rule_template->hints; my $order_by = $rule_template->order_by; my $group_by = $rule_template->group_by; my $is_paged = $rule_template->is_paged; my %group_by_property_names; if ($group_by) { # we only pull back columns we're grouping by if there is grouping happening for my $name (@$group_by) { unless ($class_name->can($name)) { die "$class_name has no property/method $name. Cannot group by it!"; } $group_by_property_names{$name} = 1; } for my $data (@all_table_properties) { my $name = $data->[1]->property_name; if ($group_by_property_names{$name}) { $group_by_property_names{$name} = $data; } } @all_table_properties = grep { ref($_) } values %group_by_property_names; } my %order_by_property_names; if ($order_by) { # we only pull back columns we're ordering by if there is ordering happening for my $name (@$order_by) { unless ($class_name->can($name)) { die "$class_name has no property/method $name. Cannot order by it!"; } $order_by_property_names{$name} = 1; } for my $data (@all_table_properties) { my $name = $data->[1]->property_name; if ($order_by_property_names{$name}) { $order_by_property_names{$name} = $data; } } } #my @group_table_debug = %group_by_property_names; #print "GROUP STARTING WITH @group_table_debug\n" if $group_by; #my @order_table_debug = %order_by_property_names; #print "ORDER STARTING WITH @order_table_debug\n" if $order_by; # the following two sets of variables hold the net result of the logic my $select_clause; my $select_hint; my $from_clause; my $connect_by_clause; my $group_by_clause; # _usually_ items freshly loaded from the DB don't need to be evaluated through the rule # because the SQL gets constructed in such a way that all the items returned would pass anyway. # But in certain cases (a delegated property trying to match a non-object value (which is a bug # in the caller's code from one point of view) or with calculated non-sql properties, then the # sql will return a superset of the items we're actually asking for, and the loader needs to # validate them through the rule my $needs_further_boolexpr_evaluation_after_loading; my @sql_params; my @filter_specs; my @property_names_in_resultset_order; my $object_num = 0; # 0-based, usually zero unless there are joins my @filters = $rule_template->_property_names; my %filters = map { $_ => 0 } grep { substr($_,0,1) ne '-' } @filters; #print Data::Dumper::Dumper($rule_template->constant_value_id, $rule_template->logic_type, $rule_template->logic_detail); unless (@all_id_property_names == 1 && $all_id_property_names[0] eq "id") { delete $filters{'id'}; } my ( @sql_joins, @sql_filters, $prev_table_name, $prev_id_column_name, $eav_class, @eav_properties, $eav_cnt, %pcnt, $pk_used, @delegated_properties, %outer_joins, ); for my $co ( $class_meta, @parent_class_objects ) { my $type_name = $co->type_name; my $class_name = $co->class_name; my @id_property_objects = $co->direct_id_property_metas; my %id_properties = map { $_->property_name => 1 } @id_property_objects; my @id_column_names = map { $_->column_name } @id_property_objects; my $table_name = $co->table_name; if ($table_name) { $first_table_name ||= $table_name; if ($prev_table_name) { die "Database-level inheritance cannot be used with multi-value-id classes ($class_name)!" if @id_property_objects > 1; my $prev_table_alias; if ($prev_table_name =~ /.*\s+(\w+)\s*$/) { $prev_table_alias = $1; } else { $prev_table_alias = $prev_table_name; } push @sql_joins, $table_name => { $id_property_objects[0]->column_name => { link_table_name => $prev_table_alias, link_column_name => $prev_id_column_name }, -is_required => 1, }; } $prev_table_name = $table_name; $prev_id_column_name = $id_property_objects[0]->column_name; } my @properties_to_query = sort keys(%filters), ($hints ? @$hints : ()), ($order_by ? @$order_by : ()), ($group_by ? @$group_by : ()); while (my $property_name = shift @properties_to_query) { my $property = UR::Object::Property->get(type_name => $type_name, property_name => $property_name); next unless $property; my ($operator, $value_position); if (exists $filters{$property_name}) { $operator = $rule_template->operator_for($property_name); $value_position = $rule_template->value_position_for_property_name($property_name); unless (defined $value_position) { die "No value position found in rule template for filter property $property_name?!" . Data::Dumper::Dumper($rule_template); } delete $filters{$property_name}; } $pk_used = 1 if $id_properties{ $property_name }; if ($property->can("expr_sql")) { unless ($table_name) { $self->warning_message("Property '$property_name' of class '$class_name' can 'expr_sql' but has no table!"); next; } my $expr_sql = $property->expr_sql; if (defined $value_position) { push @sql_filters, $table_name => { # cheap hack of putting a whitespace differentiates # from a regular column below " " . $expr_sql => { operator => $operator, value_position => $value_position } }; delete $filters{$property_name}; } next; } if (my $column_name = $property->column_name) { unless ($table_name) { $self->warning_message("Property '$property_name' of class '$class_name' has column '$column_name' but has no table!"); next; } # normal column: filter on it if (defined $value_position) { push @sql_filters, $table_name => { $column_name => { operator => $operator, value_position => $value_position } }; delete $filters{$property_name}; } } elsif ($property->is_transient) { die "Query by transient property $property_name on $class_name cannot be done!"; } elsif ($property->is_delegated) { push @delegated_properties, $property; delete $filters{$property_name}; } elsif ($property->is_calculated) { $needs_further_boolexpr_evaluation_after_loading = 1; delete $filters{$property_name}; } else { next; } } } # end of inheritance loop if ( my @errors = keys(%filters) ) { my $class_name = $class_meta->class_name; $self->error_message("Unknown param(s) >@errors< used to generate SQL for $class_name!"); print Data::Dumper::Dumper($rule_template); Carp::confess(); } my $last_class_name = $class_name; my $last_class_object = $class_meta; my $last_object_num = 0; my $alias_num = 1; my %alias_sql_join; my %joins_done; my @joins_done; DELEGATED_PROPERTY: for my $delegated_property (@delegated_properties) { my $last_alias_for_this_chain; my $alias_for_property_value; my $property_name = $delegated_property->property_name; my @joins = $delegated_property->_get_joins; my $relationship_name = $delegated_property->via; unless ($relationship_name) { $relationship_name = $property_name; $needs_further_boolexpr_evaluation_after_loading = 1; } my $is_optional = $delegated_property->is_optional or $delegated_property->is_many; my $delegate_class_meta = $delegated_property->class_meta; my $via_accessor_meta = $delegate_class_meta->property_meta_for_name($relationship_name); my $final_accessor = $delegated_property->to; if (my $final_accessor_meta = $via_accessor_meta->data_type->__meta__->property_meta_for_name($final_accessor)) { while($final_accessor_meta && $final_accessor_meta->via) { $final_accessor_meta = $final_accessor_meta->to_property_meta(); } $final_accessor = $final_accessor_meta->property_name; } #print "$property_name needs join " # . " via $relationship_name " # . " to $final_accessor" # . " using joins "; my $last_class_object_excluding_inherited_joins; my $final_join = $joins[-1]; my @source_table_and_column_names; my %aliases_for_this_delegate; while (my $object_join = shift @joins) { #$DB::single = 1; #print "\tjoin $join\n"; # print Data::Dumper::Dumper($join); $last_object_num = $object_num; $object_num++; my @joins_for_object = ($object_join); my $joins_for_object = 0; while (my $join = shift @joins_for_object) { $joins_for_object++; my $source_class_name = $join->{source_class}; my $source_class_object = $join->{'source_class_meta'} || $source_class_name->__meta__; my $foreign_class_name = $join->{foreign_class}; my $foreign_class_object = $join->{'foreign_class_meta'} || $foreign_class_name->__meta__; my($foreign_data_source) = UR::Context->resolve_data_sources_for_class_meta_and_rule($foreign_class_object, $rule_template); if ($foreign_data_source ne $self) { # FIXME - do something smarter in the future where it can do a join-y thing in memory $needs_further_boolexpr_evaluation_after_loading = 1; next DELEGATED_PROPERTY; } my $where = $join->{where}; # This will get filled in during the first pass, and every time after we've successfully # performed a join - ie. that the delegated property points directly to a class/property # that is a real table/column, and not a tableless class or another delegated property unless (@source_table_and_column_names) { my @source_property_names = @{ $join->{source_property_names} }; @source_table_and_column_names = map { if ($_->[0] =~ /^(.*)\s+(\w+)\s*$/s) { # This "table_name" was actually a bit of SQL with an inline view and an alias # FIXME - this won't work if they used the optional "as" keyword $_->[0] = $1; $_->[2] = $2; } $_; } map { my $p = $source_class_object->property_meta_for_name($_); unless ($p) { Carp::confess("No property $_ for class ".$source_class_object->class_name); } my($table_name,$column_name) = $p->table_and_column_name_for_property(); if ($table_name && $column_name) { [$table_name, $column_name]; } else { #Carp::confess("Can't determine table and column for property $_ in class " . # $source_class_object->class_name); (); } } @source_property_names; } my @foreign_property_names = @{ $join->{foreign_property_names} }; my @foreign_property_meta = map { $foreign_class_object->property_meta_for_name($_) } @foreign_property_names; my $foreign_table_name; my @foreign_column_names = map { # TODO: encapsulate if ($_->is_calculated) { if ($_->calculate_sql) { $_->calculate_sql; } else { (); } } else { my $foreign_column_name; ($foreign_table_name, $foreign_column_name) = $_->table_and_column_name_for_property(); $foreign_column_name; } } @foreign_property_meta; unless (@foreign_column_names) { # all calculated properties: don't try to join any further last; } unless (@foreign_column_names == @foreign_property_meta) { # some calculated properties, be sure to re-check for a match after loading the object $needs_further_boolexpr_evaluation_after_loading = 1; } if ($foreign_table_name =~ /^(.*)\s+(\w+)\s*$/s) { $foreign_table_name = $1; } unless ($foreign_table_name) { # If we can't make the join because there is no datasource representation # for this class, we're done following the joins for this property # and will NOT try to filter on it at the datasource level $needs_further_boolexpr_evaluation_after_loading = 1; next DELEGATED_PROPERTY; } my $foreign_class_loading_data = $self->_get_class_data_for_loading($foreign_class_object); my $alias = $joins_done{$join->{id}}; unless ($alias) { $alias = "${relationship_name}_${alias_num}"; $alias_num++; if ($foreign_class_object->table_name) { my @extra_filters; # TODO This may not work correctly if the property we're joining on doesn't # have a table to get data from if ($where) { # temp hack # todo: switch to rule processing my @keys; for (my $n = 0; $n < @$where; $n += 2) { push @keys, $where->[$n]; } my @foreign_filter_property_meta = map { $foreign_class_object->property_meta_for_name($_) } @keys; my @foreign_filter_column_names = map { # TODO: encapsulate $_->is_calculated ? (defined($_->calculate_sql) ? ($_->calculate_sql) : () ) : ($_->column_name) } @foreign_filter_property_meta; for (my $n = 0; $n < @keys; $n++) { my $meta = $foreign_filter_property_meta[$n]; my $value = $where->[$n*2+1]; push @extra_filters, $meta->column_name => { value => $value }; } } push @sql_joins, "$foreign_table_name $alias" => { ( map { $foreign_column_names[$_] => { link_table_name => $last_alias_for_this_chain # join alias || $source_table_and_column_names[$_][2] # SQL inline view alias || $source_table_and_column_names[$_][0], # table_name link_column_name => $source_table_and_column_names[$_][1] } } (0..$#foreign_column_names) ), @extra_filters, }; @source_table_and_column_names = (); # Flag that we need to re-derive this at the top of the loop $alias_sql_join{$alias} = $sql_joins[-1]; # Add all of the columns in the join table to the return list # Note that we increment the object numbers. # Note: we add grouping columns individually instead of in chunks if ($group_by) { #$DB::single = 1; } else { push @all_table_properties, map { my $new = [@$_]; $new->[2] = $alias; $new->[3] = $object_num; $new } @{ $foreign_class_loading_data->{direct_table_properties} }; } $last_alias_for_this_chain = $alias; } } if ($group_by) { if ($group_by_property_names{$property_name}) { my ($p) = map { my $new = [@$_]; $new->[2] = $alias; $new->[3] = 0; $new } grep { $_->[1]->property_name eq $final_accessor } @{ $foreign_class_loading_data->{direct_table_properties} }; push @all_table_properties, $p; #print "PROPERTY $property_name IS INVOLVED IN GROUPING: $p\n"; } #else { # $DB::single = 1; # #print "PROPERTY $property_name IS NOT INVOLVDED IN GROUPING!\n"; #} } if ($order_by) { if ($order_by_property_names{$property_name}) { my ($p) = map { my $new = [@$_]; $new->[2] = $alias; $new->[3] = 0; $new } grep { $_->[1]->property_name eq $final_accessor } @{ $foreign_class_loading_data->{direct_table_properties} }; $order_by_property_names{$property_name} = $p if $p; #print "PROPERTY $property_name IS INVOLVED IN ORDERING: $p\n"; } #else { # $DB::single = 1; # #print "PROPERTY $property_name IS NOT INVOLVDED IN ORDERING!\n"; #} #my @order_table_debug = %order_by_property_names; #print " ORDER HAS @order_table_debug\n" if $order_by; } unless ($is_optional) { # if _any_ part requires this, mark it required $alias_sql_join{$alias}{-is_required} = 1; } $joins_done{$join->{id}} = $alias; push @joins_done, $join; # Set these for after all of the joins are done $last_class_name = $foreign_class_name; $last_class_object = $foreign_class_object; if (!@joins and not $alias_for_property_value) { if (grep { $_->[1]->property_name eq $final_accessor } @{ $foreign_class_loading_data->{direct_table_properties} }) { $alias_for_property_value = $alias; #print "found alias for $property_name on $foreign_class_name: $alias\n"; } else { #print "no alias for $property_name on $foreign_class_name\n"; } } if ($joins_for_object == 1) { $last_class_object_excluding_inherited_joins = $last_class_object if ($last_class_object->property_meta_for_name($final_accessor)); # on the first iteration, we figure out the remaining inherited iterations # TODO: get this into the join logic itself in the property meta my @parents = grep { $_->table_name } $foreign_class_object->ancestry_class_metas; if (@parents) { my @last_id_property_names = $foreign_class_object->id_property_names; for my $parent (@parents) { my @parent_id_property_names = $parent->id_property_names; die if @parent_id_property_names > 1; unshift @joins_for_object, { source_class => $last_class_name, source_property_names => [@last_id_property_names], # we change content below foreign_class => $parent->class_name, foreign_property_names => \@parent_id_property_names, is_optional => $is_optional, id => "${last_class_name}::" . join(',',@last_id_property_names), }; @last_id_property_names = @parent_id_property_names; $last_class_name = $foreign_class_name; } next; } } } # next join for this object } # next object join unless ($delegated_property->via) { next; } my $final_accessor_property_meta = $last_class_object_excluding_inherited_joins->property_meta_for_name($final_accessor); unless ($final_accessor_property_meta) { die "Failed to find property $final_accessor for class " . $last_class_object_excluding_inherited_joins->class_name . "!"; } # we don't know for all of the joined properties how they connect back, # but we do know for those which drove the joining #for my $pmeta (reverse @all_table_properties) { # if ($pmeta->[1] == $final_accessor_property_meta) { # $pmeta->[4] = $property_name; # } #} my $sql_lvalue; if ($final_accessor_property_meta->is_calculated) { $sql_lvalue = $final_accessor_property_meta->calculate_sql; unless (defined($sql_lvalue)) { $needs_further_boolexpr_evaluation_after_loading = 1; next; } } else { $sql_lvalue = $final_accessor_property_meta->column_name; unless (defined($sql_lvalue)) { Carp::confess("No column name set for non-delegated/calculated property $property_name of $class_name"); } } my $value_position = $rule_template->value_position_for_property_name($property_name); if (defined $value_position) { my $operator = $rule_template->operator_for($property_name); unless ($alias_for_property_value) { die "No alias found for $property_name?!"; } push @sql_filters, $alias_for_property_value => { $sql_lvalue => { operator => $operator, value_position => $value_position } }; } } # next delegated property # Build the SELECT clause explicitly. $select_clause = $self->_select_clause_for_table_property_data(@all_table_properties); # Oracle places group_by in a comment in the select $select_hint = $class_meta->query_hint; #print Data::Dumper::Dumper(\@sql_joins, \@sql_filters); # Build the FROM clause base. # Add joins to the from clause as necessary, then $from_clause = (defined $first_table_name ? "$first_table_name" : ''); my $cnt = 0; while (@sql_joins) { my $table_name = shift (@sql_joins); my $condition = shift (@sql_joins); my ($table_alias) = ($table_name =~ /(\S+)\s*$/s); my $join_type; if ($condition->{-is_required}) { $join_type = 'INNER'; } else { $join_type = 'LEFT'; } $from_clause .= "\n$join_type join " . $table_name . " on "; # Restart the counter on each join for the from clause, # but for the where clause keep counting w/o reset. $cnt = 0; for my $column_name (keys %$condition) { next if substr($column_name,0,1) eq '-'; my $linkage_data = $condition->{$column_name}; my $expr_sql = (substr($column_name,0,1) eq " " ? $column_name : "${table_alias}.${column_name}"); my @keys = qw/operator value_position value link_table_name link_column_name/; my ($operator, $value_position, $value, $link_table_name, $link_column_name) = @$linkage_data{@keys}; $from_clause .= "\n and " if ($cnt++); if ($link_table_name and $link_column_name) { # the linkage data is a join specifier $from_clause .= "${link_table_name}.${link_column_name} = $expr_sql"; } elsif (defined $value_position) { die "Joins cannot use variable values currently!" } else { my ($more_sql, @more_params) = $self->_extend_sql_for_column_operator_and_value($expr_sql, $operator, $value); if ($more_sql) { $from_clause .= $more_sql; push @sql_params, @more_params; } else { # error return; } } } # next column } # next join # build the WHERE clause by making a data structure which will be parsed outside of this module # special handling of different size lists, and NULLs, make a completely reusable SQL template very hard. while (@sql_filters) { my $table_name = shift (@sql_filters); my $condition = shift (@sql_filters); my ($table_alias) = ($table_name =~ /(\S+)\s*$/s); for my $column_name (keys %$condition) { my $linkage_data = $condition->{$column_name}; my $expr_sql = (substr($column_name,0,1) eq " " ? $column_name : "${table_alias}.${column_name}"); my @keys = qw/operator value_position value link_table_name link_column_name/; my ($operator, $value_position, $value, $link_table_name, $link_column_name) = @$linkage_data{@keys}; if ($link_table_name and $link_column_name) { # the linkage data is a join specifier Carp::confess("explicit column linkage in where clause?"); #$sql .= "${link_table_name}.${link_column_name} = $expr_sql"; } else { # the linkage data is a value position from the @values list unless (defined $value_position) { Carp::confess("No value position for $column_name in query!"); } push @filter_specs, [$expr_sql, $operator, $value_position]; } } # next column } # next join/filter $connect_by_clause = ''; if ($recursion_desc) { my ($this,$prior) = @{ $recursion_desc }; my $this_property_meta = $class_meta->property_meta_for_name($this); my $prior_property_meta = $class_meta->property_meta_for_name($prior); my $this_class_meta = $this_property_meta->class_meta; my $prior_class_meta = $prior_property_meta->class_meta; my $this_table_name = $this_class_meta->table_name; my $prior_table_name = $prior_class_meta->table_name; my $this_column_name = $this_property_meta->column_name || $this; my $prior_column_name = $prior_property_meta->column_name || $prior; $connect_by_clause = "connect by $this_table_name.$this_column_name = prior $prior_table_name.$prior_column_name\n"; #$DB::single = 1; } for my $property_meta_array (@all_table_properties) { push @property_names_in_resultset_order, $property_meta_array->[1]->property_name; } # this is only used when making a real instance object instead of a "set" my $per_object_in_resultset_loading_detail; unless ($group_by) { $per_object_in_resultset_loading_detail = $self->_generate_loading_templates_arrayref(\@all_table_properties); } my $parent_template_data = $self->SUPER::_generate_template_data_for_loading($rule_template); if ($group_by) { # when grouping, we're making set objects instead of regular objects # this means that we re-constitute the select clause and add a group_by clause #$DB::single = 1; $group_by_clause = 'group by ' . $select_clause; $order_by_clause = 'order by ' . $select_clause; # TODO: handle aggregates present in the class definition $select_clause .= ', count(*) count'; unless (@$group_by == @all_table_properties) { print "mismatch table properties vs group by!\n"; } } my @order_table_debug = %order_by_property_names; if ($order_by) { my @data; for my $name (@$order_by) { my $data = $order_by_property_names{$name}; unless (ref($data)) { next; } push @data, $data; } if (@data) { $order_by_clause = 'ORDER BY ' . $self->_select_clause_for_table_property_data(@data); } } my $template_data = $rule_template->{loading_data_cache} = { %$parent_template_data, # custom for RDBMS select_clause => $select_clause, select_hint => $select_hint, from_clause => $from_clause, connect_by_clause => $connect_by_clause, group_by_clause => $group_by_clause, order_by_clause => $order_by_clause, filter_specs => \@filter_specs, sql_params => \@sql_params, # override defaults in the regular datasource $parent_template_data needs_further_boolexpr_evaluation_after_loading => $needs_further_boolexpr_evaluation_after_loading, property_names_in_resultset_order => \@property_names_in_resultset_order, properties_meta_in_resultset_order => \@all_table_properties, loading_templates => $per_object_in_resultset_loading_detail, }; return $template_data; } # We're overriding the method in UR::Object because we support 2 more # event types: connect and query sub validate_subscription { my ($self,$subscription_property) = @_; my $retval = $self->SUPER::validate_subscription(@_); return $retval if $retval; return 1 if ($subscription_property eq 'connect' or $subscription_property eq 'query'); return; } sub _select_clause_for_table_property_data { my $self = shift; my $select_clause = ''; for my $class_property (@_) { my ($sql_class,$sql_property,$sql_table_name) = @$class_property; $sql_table_name ||= $sql_class->table_name; my ($select_table_name) = ($sql_table_name =~ /(\S+)\s*$/s); $select_clause .= ($class_property == $_[0] ? "" : ", "); # FIXME - maybe a better way would be for these sql-calculated properties, the column_name() # or maybe some other related property name) is actually calculated, so this logic # gets encapsulated in there? if (my $sql_function = $sql_property->calculate_sql) { my @calculate_from = ref($sql_property->calculate_from) eq 'ARRAY' ? @{$sql_property->calculate_from} : ( $sql_property->calculate_from ); foreach my $sql_column_name ( @calculate_from ) { $sql_function =~ s/($sql_column_name)/$sql_table_name\.$1/g; } $select_clause .= $sql_function; } else { $select_clause .= $select_table_name . "." . $sql_property->column_name; } } return $select_clause; } 1;