/usr/local/CPAN/ORM/ORM/Db/DBI.pm
#
# DESCRIPTION
# PerlORM - Object relational mapper (ORM) for Perl. PerlORM is Perl
# library that implements object-relational mapping. Its features are
# much similar to those of Java's Hibernate library, but interface is
# much different and easier to use.
#
# AUTHOR
# Alexey V. Akimov <akimov_alexey@sourceforge.net>
#
# COPYRIGHT
# Copyright (C) 2005-2006 Alexey V. Akimov
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#
package ORM::Db::DBI;
$VERSION = 0.83;
use DBI;
use base 'ORM::Db';
use ORM::Db::DBIResultSet;
use ORM::Db::DBIResultSetFull;
## use: $db = $class->new
## (
## host => string,
## database => string,
## options => string,
## user => string,
## password => string,
##
## delayed_connect => boolean,
## connect_retries => integer,
## retry_sleep => integer,
## )
##
## 'retry_sleep' in seconds.
##
sub new
{
my $class = shift;
my $self = {};
my %arg = @_;
my $data_source;
$self->{connect_retries} = defined $arg{connect_retries} ? int( $arg{connect_retries} ) : 3;
$self->{retry_sleep} = defined $arg{retry_sleep} ? int( $arg{retry_sleep} ) : 1;
$self->{delayed_connect} = $arg{delayed_connect};
$self->{database} = $arg{database};
if( $arg{data_source} )
{
$data_source = $arg{data_source};
}
else
{
$data_source =
"DBI:$arg{driver}:$arg{database}"
. ($arg{host} ? ":$arg{host}" : '')
. ($arg{options} ? ";$arg{options}" : '');
}
$self->{db_arg} = [ $data_source, $arg{user}, $arg{password} ];
$self->{db} = DBI->connect( @{$self->{db_arg}} ) unless( $arg{delayed_connect} );
return bless $self, $class;
}
sub disconnect
{
my $self = shift;
$self->{db} = undef;
$self->{delayed_connect} = 1;
}
sub database { $_[0]->{database}; }
sub count
{
my $self = shift;
my %arg = @_;
my $tjoin;
my $cond;
$tjoin = ORM::Tjoin->new( class=>$arg{class}, all_tables=>1 );
$tjoin->merge( $arg{filter}->_tjoin ) if( $arg{filter} );
$tjoin->assign_aliases;
if( $arg{filter} )
{
$cond = $arg{filter}->_sql_str( tjoin=>$tjoin );
}
my $res = $self->select
(
error => $arg{error},
query =>
(
"SELECT count(DISTINCT "
. $self->qt( $tjoin->first_basic_table_alias ) . ".id) AS "
. $self->qi( 'count' ) . "\n"
. 'FROM ' . $tjoin->sql_table_list . "\n"
. ( $cond && "WHERE $cond\n" )
),
);
return $res ? $res->next_row->{count} : 0;
}
sub select_base
{
my $self = shift;
my %arg = @_;
# Prepare $tjoin object
my $tjoin = ORM::Tjoin->new( class=>$arg{class}, all_tables=>1 );
if( $arg{data} )
{
for my $name ( keys %{$arg{data}} )
{
$tjoin->merge( $arg{data}{$name}->_tjoin ) if( defined $arg{data}{$name} );
}
for my $group_by ( @{$arg{group_by}} )
{
if( ref $group_by && UNIVERSAL::isa( $group_by, 'ORM::Metaprop' ) )
{
$tjoin->merge( $group_by->_tjoin );
}
}
}
$tjoin->merge( $arg{order}->_tjoin ) if( $arg{order} );
$tjoin->merge( $arg{filter}->_tjoin ) if( $arg{filter} );
$tjoin->merge( $arg{post_filter}->_tjoin ) if( $arg{post_filter} );
$tjoin->assign_aliases;
# Prepare WHERE statement for SQL query
my $cond = $arg{filter} && $arg{filter}->_sql_str( tjoin=>$tjoin );
# Prepare HAVING statement for SQL query
my $having = $arg{post_filter} && $arg{post_filter}->_sql_str( tjoin=>$tjoin );
# Prepare GROUP BY statement for SQL query
my $group_by;
for my $grp ( @{$arg{group_by}} )
{
$group_by .= ', ' if( $group_by );
if( UNIVERSAL::isa( $grp, 'ORM::Expr' ) )
{
$group_by .= $grp->_sql_str( tjoin=>$tjoin );
}
else
{
$group_by .= $self->qi( $grp );
}
}
# Prepare ORDER statement for SQL query
my $order = $arg{order} && $arg{order}->sql_order_by( tjoin=>$tjoin );
# Prepare SELECT statement for SQL query
my $select;
if( $arg{data} )
{
$select = '';
for my $alias ( keys %{$arg{data}} )
{
my $data = ref $arg{data}{$alias} ? $arg{data}{$alias} : ORM::Const->new( $arg{data}{$alias} );
$select .= ",\n" if( $select );
$select .= ' ' . $data->_sql_str( tjoin=>$tjoin ) . ' AS ' . $self->qi( $alias );
}
}
else
{
$select = ' DISTINCT ' . $tjoin->sql_select_basic_tables;
}
# Prepare LIMIT statement for SQL query
my $limit = $self->_sql_limit( $arg{page}, $arg{pagesize} );
# Prepare query string and fetch data
my $query =
"SELECT\n$select\n"
. 'FROM ' . $tjoin->sql_table_list . "\n"
. ( $cond ? "WHERE\n $cond\n" : '' )
. ( $group_by ? "GROUP BY $group_by\n" : '' )
. ( $having ? "HAVING\n $having\n" : '' )
. ( $order ? "ORDER BY $order\n" : '' )
. ( $limit ? "$limit\n" : '' )
. ( $self->{ta} ? $self->_ta_select."\n" : '' );
$self->select
(
tables => $tjoin->select_basic_tables,
query => $query,
error => $arg{error},
);
}
sub select_full
{
my $self = shift;
my %arg = @_;
my $error = ORM::Error->new;
my $fullres;
my $res = $self->select_base
(
class => $arg{class},
filter => $arg{filter},
order => $arg{order},
page => $arg{page},
pagesize => $arg{pagesize},
error => $error,
);
unless( $error->fatal )
{
my %class2id;
my %id2data;
my %residual_tables;
my $residual;
my $residual_data;
my $data;
$fullres = ORM::Db::DBIResultSetFull->new;
while( $data = $res->next_row )
{
my $obj = $arg{class}->_cache->get( $data->{id}, 0 );
if( $obj )
{
$fullres->add_row( $obj );
}
else
{
$fullres->add_row( $data );
if( $data->{class} ne $arg{class} )
{
$class2id{ $data->{class} } .= $data->{id}.',';
$id2data{ $data->{id} } = $data;
}
}
}
for my $inh_class ( keys %class2id )
{
$arg{class}->_load_ORM_class( $inh_class );
%residual_tables = ();
chop $class2id{ $inh_class };
for
(
my $i = scalar( @{$res->result_tables} );
$i < $inh_class->_db_tables_count;
$i++
)
{
$residual_tables{ $inh_class->_db_table( $i ) } = 1;
}
if( %residual_tables )
{
$residual = $self->select_tables
(
id => $class2id{ $inh_class },
tables => \%residual_tables,
error => $error,
);
last if( $error->fatal );
while( $residual_data = $residual->next_row )
{
$data = $id2data{ $residual_data->{id} };
for my $key ( keys %$residual_data )
{
$data->{$key} = $residual_data->{$key};
}
}
}
}
}
$error->upto( $arg{error} );
return $error->fatal ? undef : $fullres;
}
sub select_tables
{
my $self = shift;
my %arg = @_;
my @tables = keys %{$arg{tables}};
my $fields_to_select = '';
my $tables_str = '';
my $inner_join = '';
for( my $i=0; $i<@tables; $i++ )
{
if( ref $arg{tables}{$tables[$i]} eq 'HASH' )
{
for my $prop ( keys %{$arg{tables}{$tables[$i]}} )
{
$fields_to_select .= $self->qt($tables[$i]).'.'.$self->qf($prop).',';
}
}
else
{
$fields_to_select .= $self->qt( $tables[$i] ).'.*,';
}
$tables_str .= $self->qt( $tables[$i] ).',';
if( $i < $#tables )
{
$inner_join .= $self->qt($tables[$i]).".id=".$self->qt($tables[$i+1]).".id AND ";
}
}
chop $fields_to_select;
chop $tables_str;
my $query =
'SELECT ' . $fields_to_select
. ' FROM ' . $tables_str
. ' WHERE ' . $inner_join . $self->qt( $tables[0] ).'.id IN ('.$arg{id}.')'
. ( $self->{ta} ? ' '.$self->_ta_select : '' );
$self->select
(
tables => [ keys %{$arg{tables}} ],
query => $query,
error => $arg{error},
);
}
sub select_stat { shift->select_base( @_ ); }
sub insert_object
{
my $self = shift;
my %arg = @_;
my $obj = $arg{object};
my $obj_class = ref $obj;
my $id = $arg{id};
my $error = ORM::Error->new;
my $ta = $obj_class->new_transaction( error=>$error );
# Insert new records into tables
my @table = $obj_class->_db_tables;
my %values;
my $i;
for( $i=0; $i<@table && !$error->fatal; $i++ )
{
%values = ();
if( $i == 0 )
{
$values{id} = $id if( $id );
$values{class} = $obj_class if( !$obj_class->_is_sealed );
}
else
{
$values{id} = $id;
}
for my $field ( $obj_class->_db_table_fields( $table[$i] ) )
{
$values{$field} = $obj->_property_id( $field );
}
my $rows_affected = $self->insert
(
table => $table[$i],
values => \%values,
error => $error,
);
if( $rows_affected == 1 )
{
$id = $self->insertid if( $i==0 && ! $id );
}
else
{
$error->add_fatal
(
"Insert into table '$table[$i]' failed, $rows_affected rows affected"
);
}
}
$error->upto( $arg{error} );
return $id;
}
sub update_object
{
my $self = shift;
my %arg = @_;
my $obj = $arg{object};
my $obj_class = ref $obj;
my $error = ORM::Error->new;
my $ta = $obj_class->new_transaction( error=>$error );
my %table;
unless( $error->fatal )
{
for my $prop ( keys %{$arg{values}} )
{
$table{ $obj_class->_prop2table($prop) }{ $prop } = $arg{values}{$prop};
}
for my $table ( keys %table )
{
$self->update_object_part
(
object => $obj,
values => $table{ $table },
error => $error,
);
}
}
$error->upto( $arg{error} );
}
sub update_object_part
{
my $self = shift;
my %arg = @_;
my $obj = $arg{object};
my $obj_class = ref $obj;
my $check_all_props = 0;
my $left_prop = (each %{$arg{values}})[0];
my $tjoin = ORM::Tjoin->new( class=>$obj_class, left_prop=>$left_prop, all_tables=>$arg{all_tables} );
for my $prop ( keys %{$arg{values}} )
{
$check_all_props = 1 if( ! ref $arg{values}{$prop} );
if( UNIVERSAL::isa( $arg{values}{$prop}, 'ORM::Expr' ) )
{
$tjoin->merge( $arg{values}{$prop}->_tjoin );
}
}
$tjoin->assign_aliases;
# Prepare WHERE statement
my $where;
my $filter = ORM::Expr->_and( $obj->M->id == $obj->id );
if( $check_all_props )
{
for my $prop ( keys %{$arg{old_values}} )
{
$filter->add_expr
(
defined $arg{old_values}{$prop}
? $obj->M->_prop( $prop ) == $arg{old_values}{$prop}
: $obj->M->_prop( $prop )->_is_undef
);
}
}
$where = $filter->_sql_str( tjoin=>$tjoin );
# Prepare SET statement
my $set = '';
for my $prop ( keys %{$arg{values}} )
{
$set .=
$obj->M( $prop )->_sql_str( tjoin=>$tjoin )
. '='
. (
( UNIVERSAL::isa( $arg{values}{$prop}, 'ORM::Expr' ) )
? $arg{values}{$prop}
: ORM::Const->new( $arg{values}{$prop} )
)->_sql_str( tjoin=>$tjoin )
. ',';
}
chop $set;
my $rows_affected = $self->do
(
error => $arg{error},
query =>
(
"UPDATE " . $tjoin->sql_table_list . "\n"
. " SET $set\n"
. " WHERE $where"
),
);
if( $rows_affected == 0 )
{
$arg{error} && $arg{error}->add_fatal
(
"Failed to update object with id#".$obj->id
. " of class '$obj_class', $rows_affected rows affected,"
. " may be object was updated elsewhere."
);
}
elsif( $rows_affected > $tjoin->tables_count )
{
$arg{error} && $arg{error}->add_fatal
(
"Internal error occured!"
. " More than expected number of rows was updated ($rows_affected)."
. " Please report to developer."
);
}
}
sub delete_object
{
my $self = shift;
my %arg = @_;
my $obj = $arg{object};
my $obj_class = ref $obj;
my $error = ORM::Error->new;
my $ta = $obj_class->new_transaction( error=>$error );
my @table = $obj_class->_db_tables;
my $rows_affected;
$self->check_object_referers
(
object => $obj,
error => $error,
check => $arg{emulate_foreign_keys},
);
unless( $error->fatal )
{
for( $i=$#table; $i>=0 && !$error->fatal; $i-- )
{
my $rows_affected = $self->delete_by_id
(
table => $table[$i],
id => $obj->id,
error => $error,
);
if( $rows_affected != 1 )
{
$error->add_fatal( "Failed to delete row with id#$id from '$table[$i]' during object delete" );
}
}
# must check twise, new referers could be created during deletion
unless( $error->fatal )
{
$self->check_object_referers
(
object => $obj,
error => $error,
check => $arg{emulate_foreign_keys},
);
}
}
$error->upto( $arg{error} );
}
sub optimize_tables
{
my $self = shift;
my %arg = @_;
$self->do
(
query => 'OPTIMIZE TABLE '.$arg{class}->_db_tables_str,
error => $arg{error},
);
}
sub referencing_classes
{
my $self = shift;
my %arg = @_;
my $error = ORM::Error->new;
my $res;
my $data;
my @res;
$res = $self->select
(
error => $error,
query =>
'SELECT class,prop FROM '.$self->qt('_ORM_refs').' WHERE ref_class='
. $self->qc( $arg{class} )
);
unless( $error->fatal )
{
while( $data = $res->next_row )
{
push @res, $data;
}
}
$error->upto( $arg{error} );
return @res;
}
sub begin_transaction
{
my $self = shift;
my %arg = @_;
my $error = ORM::Error->new;
$self->{ta} = 1;
$self->_db_handler->begin_work();
$error->add_fatal( $self->_db_handler->errstr ) if( $self->_db_handler->err );
ORM::DbLog->new( sql=>"BEGIN", error=>$error->text );
$error->upto( $arg{error} );
}
sub commit_transaction
{
my $self = shift;
my %arg = @_;
my $error = ORM::Error->new;
delete $self->{ta};
$self->_db_handler->commit();
$error->add_fatal( $self->_db_handler->errstr ) if( $self->_db_handler->err );
ORM::DbLog->new( sql=>"COMMIT", error=>$error->text );
$error->upto( $arg{error} );
}
sub rollback_transaction
{
my $self = shift;
my %arg = @_;
my $error = ORM::Error->new;
delete $self->{ta};
unless( $self->{lost_connection} )
{
$self->_db_handler->rollback();
$error->add_fatal( $self->_db_handler->errstr ) if( $self->_db_handler->err );
ORM::DbLog->new( sql=>"ROLLBACK", error=>$error->text );
}
$error->upto( $arg{error} );
}
##
## PROTECTED METHODS
##
## use: $db->insert( table=>string, values=>hash, error=>ORM::Error )
##
sub insert
{
my $self = shift;
my %arg = @_;
my $keys;
my $values;
my $table;
$table = $self->qt( $arg{table} );
for my $key ( keys %{$arg{values}} )
{
$keys .= $self->qf( $key ) . ',';
$values .= $self->qc( $arg{values}{$key} ) . ',';
}
chop $keys;
chop $values;
$self->do
(
query => "INSERT INTO $table ($keys) VALUES ($values)",
error => $arg{error},
);
}
## use: $db->delete_by_id
## (
## table => $string,
## id => number,
## error => ORM::Error,
## )
##
sub delete_by_id
{
my $self = shift;
my %arg = @_;
$self->do
(
query => ( "DELETE FROM ".$self->qt($arg{table})." WHERE id=".$self->qc($arg{id}) ),
error => $arg{error},
);
}
## use: $db->do( query=>$string, error=>ORM::Error )
##
sub do
{
my $self = shift;
$self->select( return_rows_count=>1, @_ );
}
## use: $result = $db->select
## (
## tables => ARRAY,
## query => string,
## error => ORM::Error,
##
## return_rows_count => 1,
## );
##
sub select
{
my $self = shift;
my %arg = @_;
my $error = ORM::Error->new;
my $h_error = ORM::Error->new;
my $retry = 1;
my $tries = $self->{connect_retries};
my $query = $arg{query};
my $rows_affected;
my $st;
$self->{lost_connection} = 0;
while( $retry )
{
$retry = 0;
if( ! $self->{db} )
{
$self->_db_reconnect;
$retry = $tries--;
if( $retry )
{
if( $self->{delayed_connect} )
{
delete $self->{delayed_connect};
}
else
{
print STDERR
"No connection, connecting to SQL server '"
. $self->{db_arg}[0]
. "' ($tries tries left)\n";
}
next;
}
else
{
$error->add_fatal( "No db connection" );
$self->{lost_connection} = 1;
last;
}
}
$st = $self->{db}->prepare( $query );
$h_error = ORM::Error->new;
$st && ( $DBI::VERSION >= 1.21 ) &&
(
$st->{HandleError} = sub
{
$h_error->add_fatal( "DBI Error Handler($_[1],'".($_[2]||'')."'): $_[0]" );
return 1;
}
);
if( $st )
{
$st->execute;
}
else
{
$error->add_fatal( "Failed to execute query, 'prepare' returned undef, query='$query'" );
}
if( $st && $st->err && $self->_lost_connection( $st->err ) )
{
$self->_db_reconnect;
$retry = $tries--;
print STDERR
"Lost connection, reconnecting to SQL server '"
. $self->{db_arg}[0]
. "' ($tries tries left)\n";
}
}
# Catch up errors
if( $st && $st->err )
{
$error->add_fatal
(
'DBI Error '.$st->err.': ' . $st->errstr
. ', Query="' . $query . '"'
);
}
elsif( $h_error->any )
{
$error->add( error=>$h_error );
}
ORM::DbLog->new
(
sql => $query,
error => $error->text,
);
$error->upto( $arg{error} );
return $arg{return_rows_count}
? ( $st && $st->rows != 4294967294 ? $st->rows : 0 )
: (
$error->fatal
? undef
: ORM::Db::DBIResultSet->new( result=>$st, tables=>$arg{tables} )
);
}
sub ql
{
my $class = shift;
my $str = shift;
$str =~ s/_/\\_/g;
$str =~ s/%/\\%/g;
return $class->qc( $str );
}
sub _db_handler { $_[0]->{db}; }
sub _db_reconnect
{
my $self = shift;
$self->{db} = undef;
sleep $self->{retry_sleep};
$self->{db} = DBI->connect( @{$self->{db_arg}} );
}
sub _ta_select { 'FOR UPDATE'; }
sub _sql_limit
{
my $self = shift;
my $page = (int shift)||1;
my $pagesize = int shift;
my $sql;
if( $pagesize )
{
$sql = "LIMIT ".(($page-1)*$pagesize).",$pagesize";
}
return $sql;
}
sub check_object_referers
{
my $self = shift;
my %arg = @_;
my $obj = $arg{object};
my $obj_class = ref $obj;
if( $arg{check} )
{
for my $ref ( $obj_class->_rev_refs )
{
my $referers = $ref->[0]->count
(
filter => ( $ref->[0]->M->_prop($ref->[1])==$obj->id ),
error => $arg{error},
);
if( $referers )
{
$arg{error}->add_fatal
(
"Can't delete instance ID#" . $obj->id
. " of '$obj_class', because there're "
. "$referers instances of '$ref->[0]' refer to it."
);
}
}
}
return undef;
}
##
## ABSTRACT METHODS
##
sub insertid
{
die "You forget to override 'insertid' in '$_[0]'";
}
sub _lost_connection
{
die "You forget to override '_lost_connection' in '$_[0]'";
}
##
## SQL FUNCTIONS
##
sub _func_concat { shift; ORM::Filter::Cmp->new( '||', @_ ); }