| Data-Model documentation | Contained in the Data-Model distribution. |
Data::Model::Driver::DBI - storage driver for DBI
package MyDB;
use base 'Data::Model';
use Data::Model::Schema;
use Data::Model::Driver::DBI;
my $dbi_connect_options = {};
my $driver = Data::Model::Driver::DBI->new(
dsn => 'dbi:mysql:host=localhost:database=test',
username => 'user',
password => 'password',
connect_options => $dbi_connect_options,
reuse_dbh => 1, # sharing dbh (experimental option)
# When you use by MySQL, please set up
# connect_options => { mysql_auto_reconnect => 1 },
# simultaneously. but mysql_auto_reconnect is very unsettled.
);
base_driver $driver;
install_model model_name => schema {
....
};
DBD that is working now is only mysql and SQLite.
Kazuhiro Osawa <yappo <at> shibuya <döt> pl>
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
| Data-Model documentation | Contained in the Data-Model distribution. |
package Data::Model::Driver::DBI; use strict; use warnings; use base 'Data::Model::Driver'; use Carp (); $Carp::Internal{(__PACKAGE__)}++; use DBI; use Data::Model::SQL; use Data::Model::Driver::DBI::DBD; sub dbd { $_[0]->{dbd} } sub dbi_config { my($self, $name) = @_; $self->{dbi_config}->{$name} or Carp::croak "has not dbi_config name '$name'"; } sub init { my $self = shift; if (my($type) = $self->{dsn} =~ /^dbi:(\w*)/i) { $self->{dbd} = Data::Model::Driver::DBI::DBD->new($type); } $self->{dbi_config} = +{ rw => +{ dsn => delete $self->{dsn}, username => delete $self->{username}, password => delete $self->{password}, connect_options => delete $self->{connect_options}, dbh => undef, }, }; } my %reuse_handles; sub init_db { my($self, $name, %args) = @_; my $dbi_config = $self->dbi_config($name); my $dsn = $dbi_config->{dsn}; my $dbh; if ($self->{reuse_dbh}) { $dbh = $reuse_handles{$dsn}; } unless ($dbh && ($args{no_ping} || $dbh->ping)) { $dbh = DBI->connect( $dsn, $dbi_config->{username}, $dbi_config->{password}, { RaiseError => 1, PrintError => 0, AutoCommit => 1, %{ $dbi_config->{connect_options} || {} } }, ) or Carp::croak("Connection error: " . $DBI::errstr); if ($self->{reuse_dbh}) { $reuse_handles{$dsn} = $dbh; } } $self->{__dbh_init_by_driver} = 1; $dbh; } sub _get_dbh { my $self = shift; my $name = shift || 'rw'; my %args = @_; # this option is experimental my $dbi_config = $self->dbi_config($name); unless ($args{no_ping}) { $dbi_config->{dbh} = undef if $dbi_config->{dbh} and !$dbi_config->{dbh}->ping; } unless ($dbi_config->{dbh} || $args{cannot_reconnect}) { if (my $getter = $self->{get_dbh}) { $dbi_config->{dbh} = $getter->(); } else { $dbi_config->{dbh} = $self->init_db($name, %args) or Carp::croak $self->last_error; } } $dbi_config->{dbh}; } sub rw_handle { shift->_get_dbh('rw', @_) }; sub r_handle { shift->rw_handle(@_) } sub last_error {} sub add_key_to_where { my($self, $stmt, $columns, $key) = @_; if ($key) { # add where my $i = 0; for my $i (0..( scalar(@{ $key }) - 1 )) { $stmt->add_where( $columns->[$i] => $key->[$i] ); } } } sub add_index_to_where { my($self, $schema, $stmt, $index_obj) = @_; return unless my($index, $index_key) = (%{ $index_obj }); $index_key = [ $index_key ] unless ref($index_key) eq 'ARRAY'; for my $index_type (qw/ unique index /) { if (exists $schema->$index_type->{$index}) { $self->add_key_to_where($stmt, $schema->$index_type->{$index}, $index_key); last; } } } sub bind_params { my($self, $schema, $columns, $sth) = @_; my $i = 1; for my $column (@{ $columns }) { my($col, $val) = @{ $column }; my $type = $schema->column_type($col); my $attr = $self->dbd->bind_param_attributes($type, $columns, $col); $sth->bind_param($i++, $val, $attr || undef); } } sub fetch { my($self, $rec, $schema, $key, $columns, %args) = @_; $columns = +{} unless $columns; $columns->{select} ||= [ $schema->column_names ]; $columns->{from} ||= []; unshift @{ $columns->{from} }, $schema->model; my $index_query = delete $columns->{index}; my $stmt = Data::Model::SQL->new(%{ $columns }); $self->add_key_to_where($stmt, $schema->key, $key) if $key; $self->add_index_to_where($schema, $stmt, $index_query) if $index_query; my $sql = $stmt->as_sql; # bind_params my @params; for my $i (1..scalar(@{ $stmt->bind })) { push @params, [ $stmt->bind_column->[$i - 1], $stmt->bind->[$i - 1] ]; } my @bind; my $map = $stmt->select_map; for my $col (@{ $stmt->select }) { push @bind, \$rec->{ exists $map->{$col} ? $map->{$col} : $col }; } my $sth; eval { my $dbh = $self->r_handle; $self->start_query($sql, $stmt->bind); $sth = $args{no_cached_prepare} ? $dbh->prepare($sql) : $dbh->prepare_cached($sql); $self->bind_params($schema, \@params, $sth); $sth->execute; $sth->bind_columns(undef, @bind); }; if ($@) { $self->_stack_trace($sth, $sql, $stmt->bind, $@); } $sth; } sub lookup { my($self, $schema, $id, %args) = @_; my $rec = +{}; my $sth = $self->fetch($rec, $schema, $id, {}, %args); my $rv = $sth->fetch; $sth->finish; $self->end_query($sth); undef $sth; return unless $rv; return $rec; } sub lookup_multi { my($self, $schema, $ids, %args) = @_; my @keys = @{ $schema->key }; my $query = {}; if (@keys == 1) { my @id_list = map { $_->[0] } @{ $ids }; $query = { where => [ $keys[0] => \@id_list ] }; } else { my @queries; for my $id (@{ $ids }) { my %query; @query{@keys} = @{ $id }; push @queries, '-and' => [ %query ]; } $query = { where => [ -or => \@queries ] }; } my $rec = +{}; local $args{no_cached_prepare} = 1; my $sth = $self->fetch($rec, $schema, undef, $query, %args); my %resultlist; while ($sth->fetch) { my $key = $schema->get_key_array_by_hash($rec); $resultlist{join "\0", @{ $key }} = +{ %{ $rec } }; } $sth->finish; $self->end_query($sth); undef $sth; \%resultlist; } sub get { my($self, $schema, $key, $columns, %args) = @_; my $rec = +{}; my $sth = $self->fetch($rec, $schema, $key, $columns, %args); my $i = 0; my $iterator = sub { return unless $sth; return $rec if $i++ eq 1; unless ($sth->fetch) { $sth->finish; $self->end_query($sth); undef $sth; return; } $rec; }; # pre load return unless $iterator->(); return $iterator, +{ end => sub { if ($sth) { $sth->finish; $self->end_query($sth); undef $sth; } }, }; } # insert or replace sub set { my $self = shift; $self->_insert_or_replace(0, @_); } sub replace { my($self, $schema, $key, $columns, %args) = @_; if ($self->dbd->can_replace) { return $self->_insert_or_replace(1, $schema, $key, $columns, %args); } else { # $self->thx(sub { $self->delete($schema, $key, +{}, %args); return $self->set($schema, $key, $columns, %args); # }); } } sub _on_duplicate_key_update { my($self, $schema, $columns, $args, $sql, $column_list) = @_; my $table = $schema->model; # check unique keys my $keys = $schema->key; my $unique = $schema->unique; my $key_columns = []; if (scalar(@{ $keys }) >= 1) { if (scalar(keys %{ $unique }) >= 1) { Carp::croak "on_duplicate_key_update support: $table has multi unique key"; } # OK $key_columns = $keys; } elsif (scalar(keys %{ $unique }) > 1) { Carp::croak "on_duplicate_key_update support: $table has multi unique key"; } elsif (scalar(keys %{ $unique }) == 1) { # OK while (my($k, $v) = each %{ $unique }) { $key_columns = $v; } } else { Carp::croak "on_duplicate_key_update support: $table not has key or unique index"; } # check key num my $has_keys = 1; for my $k (@{ $key_columns }) { $has_keys = 0 unless defined $columns->{$k}; } Carp::croak "on_duplicate_key_update support: $table is insufficient keys" unless $has_keys; # append sql my @set; for my $column (keys %{ $args }) { my $val = $args->{$column}; if (ref($val) eq 'SCALAR') { push @set, "$column = " . ${ $val }; } elsif (!ref($val)) { push @set, "$column = ?"; push @{ $column_list }, [ $column => $val ]; } else { Carp::confess 'No references other than a SCALAR reference can use a update column'; } } ${ $sql } .= ' ON DUPLICATE KEY UPDATE ' . join(', ', @set) . "\n"; } sub _insert_or_replace { my($self, $is_replace, $schema, $key, $columns, %args) = @_; my $select_or_replace = $is_replace ? 'REPLACE' : 'INSERT'; my $table = $schema->model; my $cols = [ keys %{ $columns } ]; my @column_list = map { [ $_ => $columns->{$_} ] } @{ $cols }; my $sql = "$select_or_replace INTO $table\n"; $sql .= '(' . join(', ', @{ $cols }) . ')' . "\n" . 'VALUES (' . join(', ', ('?') x @{ $cols }) . ')' . "\n"; # ON DUPLICATE KEY UPDATE support for MySQL if ($args{on_duplicate_key_update} && $self->dbd->has_support('on_duplicate_key_update')) { $self->_on_duplicate_key_update($schema, $columns, $args{on_duplicate_key_update}, \$sql, \@column_list); } my $sth; eval { my $dbh = $self->rw_handle; $self->start_query($sql, $columns); $sth = $dbh->prepare_cached($sql); $self->bind_params($schema, \@column_list, $sth); $sth->execute; $sth->finish; $self->end_query($sth); # set autoincrement key $self->_set_auto_increment($schema, $columns, sub { $self->dbd->fetch_last_id( $schema, $columns, $dbh, $sth ) }); }; if ($@) { $self->_stack_trace($sth, $sql, \@column_list, $@); } undef $sth; $columns; } # update sub _update { my($self, $schema, $changed_columns, $columns, $where_sql, $pre_bind, $pre_bind_column) = @_; my @bind; my @bind_column; my @set; for my $column (keys %{ $changed_columns }) { my $val = $columns->{$column}; if (ref($val) eq 'SCALAR') { push @set, "$column = " . ${ $val }; } elsif (!ref($val)) { push @set, "$column = ?"; push @bind, $val; push @bind_column, $column; } else { Carp::confess 'No references other than a SCALAR reference can use a update column'; } } push @bind, @{ $pre_bind }; push @bind_column, @{ $pre_bind_column }; # bind_params my @params; for my $i (1..scalar(@bind)) { push @params, [ $bind_column[$i - 1], $bind[$i - 1] ]; } my $sql = 'UPDATE ' . $schema->model . ' SET ' . join(', ', @set) . ' ' . $where_sql; my $sth; eval { my $dbh = $self->rw_handle; $self->start_query($sql, \@bind); $sth = $dbh->prepare_cached($sql); $self->bind_params($schema, \@params, $sth); $sth->execute; $sth->finish; $self->end_query($sth); }; if ($@) { $self->_stack_trace($sth, $sql, \@params, $@); } if (wantarray) { my @ret = $sth->rows; undef $sth; return @ret; } else { my $ret = $sth->rows; undef $sth; return $ret; } } sub update { my($self, $schema, $old_key, $key, $old_columns, $columns, $changed_columns, %args) = @_; my $stmt = Data::Model::SQL->new; $self->add_key_to_where($stmt, $schema->key, $old_key); my $where_sql = $stmt->as_sql_where; return unless $where_sql; return $self->_update($schema, $changed_columns, $columns, $where_sql, $stmt->bind, $stmt->bind_column); } sub update_direct { my($self, $schema, $key, $query, $columns, %args) = @_; my $index_query = delete $query->{index}; my $stmt = Data::Model::SQL->new(%{ $query }); $self->add_key_to_where($stmt, $schema->key, $key) if $key; $self->add_index_to_where($schema, $stmt, $index_query) if $index_query; my $where_sql = $stmt->as_sql_where; return unless $where_sql; return $self->_update($schema, $columns, $columns, $where_sql, $stmt->bind, $stmt->bind_column); } # delete sub delete { my($self, $schema, $key, $columns, %args) = @_; $columns->{from} = [ $schema->model ]; my $index_query = delete $columns->{index}; my $stmt = Data::Model::SQL->new(%{ $columns }); $self->add_key_to_where($stmt, $schema->key, $key) if $key; $self->add_index_to_where($schema, $stmt, $index_query) if $index_query; # bind_params my @params; for my $i (1..scalar(@{ $stmt->bind })) { push @params, [ $stmt->bind_column->[$i - 1], $stmt->bind->[$i - 1] ]; } my $sql = "DELETE " . $stmt->as_sql; my $sth; eval { my $dbh = $self->rw_handle; $self->start_query($sql, $stmt->bind); $sth = $dbh->prepare_cached($sql); $self->bind_params($schema, \@params, $sth); $sth->execute; $sth->finish; $self->end_query($sth); }; if ($@) { $self->_stack_trace($sth, $sql, $stmt->bind, $@); } if (wantarray) { my @ret = $sth->rows; undef $sth; return @ret; } else { my $ret = $sth->rows; undef $sth; return $ret; } } # for schema sub _as_sql_hook { my $self = shift; $self->dbd->_as_sql_hook(@_); } # stack trace sub _stack_trace { my($self, $sth, $sql, $binds, $reason) = @_; require Data::Dumper; if ($sth) { # finalize sth handle $sth->finish; $self->end_query($sth); } $sql =~ s/\n/\n /gm; Carp::croak sprintf <<"TRACE", $reason, $sql, Data::Dumper::Dumper($binds); **** { Data::Model::Driver::DBI 's Exception **** Reason : %s SQL : %s **** BINDS DUMP **** %s **** Data::Model::Driver::DBI 's Exception } **** TRACE } # profile sub start_query {} sub end_query {} sub DESTROY { my $self = shift; return unless $self->{__dbh_init_by_driver}; # if (my $dbh = $self->dbh) { # $dbh->disconnect if $dbh; # } } # for transactions sub txn_begin { my $self = shift; $self->{active_transaction} = 1; my $dbh = $self->rw_handle; eval { $dbh->begin_work } or Carp::croak $@; } sub txn_rollback { my $self = shift; return unless $self->{active_transaction}; my $dbh = $self->rw_handle; eval { $dbh->rollback } or Carp::croak $@; } sub txn_commit { my $self = shift; return unless $self->{active_transaction}; my $dbh = $self->rw_handle; eval { $dbh->commit } or Carp::croak $@; } sub txn_end { $_[0]->{active_transaction} = 0; } 1;