Solstice::Database - Wrapper around DBI.


Solstice documentation Contained in the Solstice distribution.

Index


Code Index:

NAME

Top

Solstice::Database - Wrapper around DBI.

SYNOPSIS

Top

  use Solstice::Database;

  my $db = Solstice::Database->new();

  # For a read-only query that can be sent to any read-only server
  # that is synchronized, or to the write master otherwise.
  $db->readQuery("SELECT fname, lname FROM solstice.Person WHERE person_id=?", 15);

  while (my $data_ref = $db->fetchRow()) {
    warn "First: ".$data_ref->{'fname'};
    warn "Last:".$data_ref->{'lname'};
  }

  # For any inserts/updates/deletes, that
  # must go to the master
  $db->writeQuery("INSERT INTO solstice.Person (fname, lname) VALUES (?, ?)", 'Patrick', 'Michaud');

  # Get the id of that person
  my $id = $db->getLastInsertID();

  # Get a read lock (other sessions can still read, but cannot write)
  $db->readLock('solstice.Person');

  # Get a write lock (other sessions can neither read nor write)
  $db->writeLock('solstice.Person');

  # Unlock any locks
  $db->unlockTables();

DESCRIPTION

Top

This object is here to make the database connections reliable and consistent across the Solstice tools source tree. Unlike the most generic methods for database connectivity, these methods are reliable and efficient in the mod_perl environment.

**It is strongly recommended that you use this object to make all of your database connections when programming perl source for the Solstice Tools**

Export

No symbols exported.

Methods

new()

Constructor. Creates a database handle and caches it.

readQuery($sqlCommand [, $param]*)

For a read only query that can be sent to any read only servers that are synchronized, or the write master. Dies on error, and returns undef.

getSlave

Returns the database handle to a slave db. If there are no slaves, returns the master database handle.

hasSlaves

Returns a count of slaves available for connecting.

fetchRow()

After a read query, fetches a row of results. Returns undef when there aren't any more rows to read, otherwise returns a hash ref.

rowCount()

Return a count of rows returned by the last read query, or undef if a read cursor is not defined.

writeQuery($sql_command [, $param]*)

For any inserts/updates/deletes that must go to the master. Dies on error, and returns undef.

getLastInsertID()

Gets the id of the most recently inserted row.

readLock($table_name)

Gets a read lock (other sessions may still read the table, but may not write to it). Dies on error, and returns undef.

writeLock($table_name)

Gets a write lock (lock other people from writing or reading). Dies on error, and returns undef.

unlockTables()

Release any table locks. Dies on error, and returns undef.

DESTROY()

Destructor.

Private methods

_isSlaveCurrent($dbh)

Takes a database handle and returns true if it is caught up with the master, false otherwise.

_releaseCursor()

Releases the statement handle that was used for reading.

_connect()

Opens and returns the database handle.

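_connectToSlave(\%slave_params)

Opens and returns a database handle for the slave described by the given parameters. If no parameters are given, returns the master database handle.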

Private functions

_reportErrorAndDie($function, $error, $caller, $sql)

Sends an email to the admin, and dies.

Modules Used

DBI, Data::Dumper, Time::HiRes, List::Util, Solstice::Configure, Solstice::Email, Solstice::PositionService.

AUTHOR

Top

Catalyst Group, <catalyst@u.washington.edu>

VERSION

Top

$Revision: 2998 $

COPYRIGHT

Top


Solstice documentation Contained in the Solstice distribution.
package Solstice::Database;

use 5.006_000;
use strict;
use warnings;

use base qw(Solstice);

use DBI;
use Data::Dumper;
use Time::HiRes qw(gettimeofday tv_interval);
use Solstice::Configure;
use Solstice::Email;
use Solstice::PositionService;
use List::Util qw(shuffle);

# Fallback slow-query threshold, compared against the elapsed seconds of a
# statement in readQuery()/writeQuery() when the configured
# getSlowQueryTime() value is false.
use constant SLOW_QUERY_TIME => 10;
# Readable boolean constants used throughout this module.
use constant FALSE  => 0;
use constant TRUE   => 1;

# Pull the bare revision number out of the version-control keyword string.
our ($VERSION) = ('$Revision: 2998 $' =~ /^\$Revision:\s*([\d.]*)/);

# Package-level connection caches, all keyed by process id ($$).
# Master handle and the epoch second at which it was last connected/pinged:
our %dbh_cache;
our %dbh_ping_time;
# Slave handles and ping times; second-level key is the slave host name:
our %slave_dbh_cache;
our %slave_dbh_ping_time;


# Constructor.
# Builds the object through the parent class constructor (all arguments are
# passed through unchanged) and stores the master database handle returned
# by _connect() under the '_dbh' key.
sub new {
    my $class = shift;

    my $db = $class->SUPER::new(@_);
    $db->{'_dbh'} = $db->_connect();

    return $db;
}


######## TODO: make this work for a cluster #########
# Runs a read-only statement and keeps its statement handle so that
# fetchRow() and rowCount() can be used afterwards.
#
#   $statement - SQL text, may contain placeholders
#   @params    - bind values for the placeholders
#
# The statement is sent to a slave handle when slaves are configured,
# otherwise to the master handle. In development mode the execution time is
# measured and slow statements are reported on STDERR.
# Returns nothing; failures are passed to _reportErrorAndDie().
sub readQuery {
    my ($self, $statement, @params) = @_;

    # Any cursor left over from a previous query is released first.
    $self->_releaseCursor();

    my $config     = $self->getConfigService();
    my $diagnostic = $config->getDevelopmentMode();

    # NOTE: _diagnostics() is intentionally not called here, even in
    # development mode. We do not yet know how to do intelligent
    # diagnostics; this can be re-enabled once we figure out something good:
    #   $self->_diagnostics(join(" ", caller), $statement, @params);

    my ($started_at, $elapsed);

    my $dbh = $self->{'_dbh'};
    $dbh = $self->getSlave() if $self->hasSlaves();

    eval {
        $started_at = [gettimeofday] if $diagnostic;

        my $sth = $dbh->prepare($statement);
        $sth->execute(@params);

        $elapsed = tv_interval($started_at, [gettimeofday]) if $diagnostic;
        $self->{'_read_cursor'} = $sth;
    };
    if (my $error = $@) {
        _reportErrorAndDie('readQuery()', $error, join(" ", caller), $statement, \@params);
    }

    if ($diagnostic) {
        # The configured threshold wins; the constant is only a fallback.
        my $threshold = $config->getSlowQueryTime() || SLOW_QUERY_TIME;
        if ($elapsed > $threshold) {
            print STDERR "SQL took $elapsed seconds, called from ". join(" ", caller)."\n";
        }
    }

    Solstice::PositionService->new()->enqueue('db_read_count');

    return;
}

#XXX Old implementation - remove if the new one works out
#sub getSlave {
#    my $self = shift;
#
#    my $config = $self->getConfigService();
#    my $seen_slaves = {};
#    my $dbh;
#    while(!$self->_isSlaveCurrent($dbh)){
#
#        #check if we've seen all the slaves
#        if(scalar keys %$seen_slaves == scalar @{$config->getDBSlaves()}){
#            $dbh = $self->{'_dbh'};
#            last;
#        }
#
#        my $slave = $config->getDBSlave();
#
#        my $cached_dbh = $slave_dbh_cache{$slave->{'host_name'}};
#        return $cached_dbh if (defined $cached_dbh && $self->_isSlaveCurrent($cached_dbh));
#
#        #if we've seen this slave host try again
#        next if $seen_slaves->{$slave->{'host_name'}};
#
#        $dbh = (defined $cached_dbh) ? $cached_dbh : $self->_connectToSlave($slave);
#
#        #cache away the slaves so we only connect to each slave once
#        $slave_dbh_cache{$slave->{'host_name'}} = $dbh;
#        
#        $seen_slaves->{$slave->{'host_name'}} = TRUE;
#    }
#
#    return $dbh;
#}

# Returns a database handle suitable for read queries.
#
# The configured slaves are tried in random order. For each slave a cached,
# per-process handle is reused when available (it is pinged at most once per
# second); otherwise a new connection is opened and cached. The first slave
# that _isSlaveCurrent() reports as caught up is returned.
#
# A slave that cannot be reached, or whose replication status cannot be
# read, is skipped with a warning instead of aborting the whole read: slave
# handles are opened with RaiseError, so those failures arrive as
# exceptions. If no slave qualifies, the master handle is returned.
sub getSlave {
    my $self = shift;

    my $config = $self->getConfigService();
    my @slaves = shuffle(@{$config->getDBSlaves()});

    for my $slave (@slaves){
        my $hostname = $slave->{'host_name'};

        my $dbh = $slave_dbh_cache{$$}{$hostname};

        # Only ping a cached handle if it was not already checked during
        # the current second.
        if( $dbh && $slave_dbh_ping_time{$$}{$hostname} != time){
            $dbh = undef unless $dbh->ping();
            $slave_dbh_ping_time{$$}{$hostname} = time;
        }

        unless($dbh){
            $dbh = eval { $self->_connectToSlave($slave) };
            unless ($dbh) {
                warn "Solstice::Database: cannot connect to slave '$hostname': "
                    . ($@ || "no handle returned\n");
                # Forget any dead handle so it is not reused later on.
                delete $slave_dbh_cache{$$}{$hostname};
                delete $slave_dbh_ping_time{$$}{$hostname};
                next;
            }
            $slave_dbh_cache{$$}{$hostname} = $dbh;
            $slave_dbh_ping_time{$$}{$hostname} = time;
        }

        my $is_current = eval { $self->_isSlaveCurrent($dbh) };
        if ($@) {
            warn "Solstice::Database: cannot read replication status of slave '$hostname': $@";
            next;
        }

        return $dbh if $is_current;
    }

    # None of the slaves worked out: fall back to the master.
    return $self->{'_dbh'};
}

# Returns the number of slave definitions reported by the configuration
# service's getDBSlaves().
sub hasSlaves {
    my ($self) = @_;

    my $slaves = $self->getConfigService()->getDBSlaves();
    return scalar @{$slaves};
}

# Fetches the next row of the current cursor as a hash reference.
# Returns undef when no cursor is active or when the rows are exhausted;
# in the latter case the cursor is finished and removed from the object.
sub fetchRow {
    my ($self) = @_;

    my $cursor = $self->{'_read_cursor'};
    return undef unless defined $cursor;

    my $row = $cursor->fetchrow_hashref();
    return $row if $row;

    # Out of rows: close the cursor so later calls return undef right away.
    $cursor->finish();
    delete $self->{'_read_cursor'};
    return $row;
}


# Returns rows() of the current cursor, or undef when no cursor is active.
sub rowCount {
    my ($self) = @_;

    my $cursor = $self->{'_read_cursor'};
    return defined $cursor ? $cursor->rows() : undef;
}


######## TODO: make this work for a cluster #########
# Runs an insert/update/delete statement against the master handle.
#
#   $statement - SQL text, may contain placeholders
#   @params    - bind values for the placeholders
#
# After execution the statement handle's 'mysql_insertid' is remembered for
# getLastInsertID(), and the handle is kept as the current cursor. In
# development mode the execution time is measured and slow statements are
# reported on STDERR.
# Returns nothing; failures are passed to _reportErrorAndDie().
sub writeQuery {
    my ($self, $statement, @params) = @_;

    # Any cursor left over from a previous query is released first.
    $self->_releaseCursor();

    my $config     = $self->getConfigService();
    my $diagnostic = $config->getDevelopmentMode();

    my ($started_at, $elapsed);

    eval {
        $started_at = [gettimeofday] if $diagnostic;

        my $sth = $self->{'_dbh'}->prepare($statement);
        $sth->execute(@params);
        $self->{'_last_insert_id'} = $sth->{'mysql_insertid'};

        $elapsed = tv_interval($started_at, [gettimeofday]) if $diagnostic;
        $self->{'_read_cursor'} = $sth;
    };
    if (my $error = $@) {
        _reportErrorAndDie('writeQuery()', $error, join(" ", caller), $statement, \@params);
    }

    if ($diagnostic) {
        # The configured threshold wins; the constant is only a fallback.
        my $threshold = $config->getSlowQueryTime() || SLOW_QUERY_TIME;
        if ($elapsed > $threshold) {
            print STDERR "SQL took $elapsed seconds, called from ". join(" ", caller)."\n";
        }
    }

    Solstice::PositionService->new()->enqueue('db_write_count');

    return;
}


# Returns the insert id recorded by the most recent writeQuery() on this
# object (undef if none has been recorded).
sub getLastInsertID {
    my ($self) = @_;
    return $self->{'_last_insert_id'};
}


# Acquires a READ table lock on the master handle.
#
#   $table_name - table to lock. It is interpolated into the statement
#                 as-is, so it must come from trusted code, never from
#                 user input.
#
# Returns nothing. On failure the error is reported through
# _reportErrorAndDie(), which dies. The master handle is opened with
# RaiseError, so a failing do() raises instead of returning false; the eval
# makes sure that case is reported as well.
sub readLock {
    my $self = shift;
    my $table_name = shift;

    $self->_releaseCursor();

    my $statement = "LOCK TABLES $table_name READ";
    my $locked = eval { $self->{'_dbh'}->do($statement) };
    unless ($locked) {
        # Prefer the exception text; fall back to the handle's error string
        # when do() merely returned false.
        my $error = $@ || $self->{'_dbh'}->errstr;
        _reportErrorAndDie('readLock()', $error, join(" ", caller), $statement);
    }
    return;
}


# Acquires a WRITE table lock on the master handle.
#
#   $table_name - table to lock. It is interpolated into the statement
#                 as-is, so it must come from trusted code, never from
#                 user input.
#
# Returns nothing. On failure the error is reported through
# _reportErrorAndDie(), which dies. The master handle is opened with
# RaiseError, so a failing do() raises instead of returning false; the eval
# makes sure that case is reported as well.
sub writeLock {
    my $self = shift;
    my $table_name = shift;

    $self->_releaseCursor();

    my $statement = "LOCK TABLES $table_name WRITE";
    my $locked = eval { $self->{'_dbh'}->do($statement) };
    unless ($locked) {
        # Prefer the exception text; fall back to the handle's error string
        # when do() merely returned false.
        my $error = $@ || $self->{'_dbh'}->errstr;
        _reportErrorAndDie('writeLock()', $error, join(" ", caller), $statement);
    }
    return;
}


# Releases all table locks held on the master handle.
#
# Returns nothing. On failure the error is reported through
# _reportErrorAndDie(), which dies. The master handle is opened with
# RaiseError, so a failing do() raises instead of returning false; the eval
# makes sure that case is reported as well.
sub unlockTables {
    my $self = shift;

    $self->_releaseCursor();

    my $statement = 'UNLOCK TABLES';
    my $unlocked = eval { $self->{'_dbh'}->do($statement) };
    unless ($unlocked) {
        # Prefer the exception text; fall back to the handle's error string
        # when do() merely returned false.
        my $error = $@ || $self->{'_dbh'}->errstr;
        _reportErrorAndDie('unlockTables()', $error, join(" ", caller), $statement);
    }
    return;
}


# Destructor: releases the statement handle still held by the object.
#
# A destructor can run at any time, including while an exception is
# unwinding through an eval. The global status variables are therefore
# localized so that nothing done during cleanup overwrites the error the
# caller is about to inspect.
sub DESTROY {
    my $self = shift;

    local ($., $@, $!, $^E, $?);
    $self->_releaseCursor();
    return;
}


# Reports whether the slave behind $dbh has caught up with the master.
#
#   $dbh - database handle of the slave; undef yields FALSE
#
# Runs SHOW SLAVE STATUS and returns true only when the first row reports
# Seconds_Behind_Master as '0'. A missing row or an undefined (NULL) lag
# value counts as "not current".
#
# The status query uses its own local statement handle, so a cursor the
# object currently holds for the caller is left untouched.
sub _isSlaveCurrent {
    my $self = shift;
    my $dbh = shift;
    return FALSE unless defined $dbh;

    my $cursor = $dbh->prepare('SHOW SLAVE STATUS');
    $cursor->execute();
    my $data = $cursor->fetchrow_hashref();
    $cursor->finish();

    return FALSE unless $data;

    my $lag = $data->{'Seconds_Behind_Master'};
    return FALSE unless defined $lag;

    return ($lag eq '0');
}

# Finishes the statement handle currently held by the object, if any, and
# removes it from the object. Callers in this module ignore the result.
sub _releaseCursor {
    my ($self) = @_;

    my $cursor = $self->{'_read_cursor'};
    return unless $cursor;

    $cursor->finish();
    delete $self->{'_read_cursor'};
}


# Returns the master database handle for the current process.
#
# A handle cached for this process id is reused when it was already checked
# during the current second, or when it still answers ping(). Otherwise a
# new MySQL connection is opened from the configuration service's DB
# settings, cached, and returned.
#
# The connection is requested with RaiseError, so a failed connect is
# expected to raise rather than return a false handle. The eval catches
# that, so the failure is still routed through _reportErrorAndDie(), which
# notifies the admin and dies.
sub _connect {
    my $self = shift;

    if (defined $dbh_cache{$$}){
        if($dbh_ping_time{$$} == time){
            return $dbh_cache{$$};
        }else{
            if( $dbh_cache{$$}->ping()) {
                $dbh_ping_time{$$} = time;
                return $dbh_cache{$$};
            }
        }
    }

    # get the configuration information
    my $config = $self->getConfigService();
    my $host = $config->getDBHost();
    my $port = $config->getDBPort();
    my $user = $config->getDBUser();
    my $password = $config->getDBPassword();
    my $name = $config->getDBName();
    my $connection_string = "DBI:mysql:$name:$host:$port";

    # attempt to connect
    my $dbh = eval {
        DBI->connect($connection_string, $user, $password,
                     {RaiseError => TRUE});
    };
    if (!$dbh) {
        # Prefer the exception text; fall back to DBI's global error string.
        my $reason = $@ || $DBI::errstr || 'unknown error';
        _reportErrorAndDie('_connect()', "DBI->connect failed: ".$reason, join(" ", caller), 'n/a');
    }

    $dbh_cache{$$} = $dbh;
    $dbh_ping_time{$$} = time;
    return $dbh;
}

# Opens a MySQL connection to one slave and returns the handle.
#
#   $slave_info - hash ref with the keys host_name, port, user, password
#                 and database_name; when undefined, the master handle from
#                 _connect() is returned instead.
#
# NOTE(review): the connection is requested with RaiseError enabled, so a
# failed connect presumably raises instead of returning undef - confirm
# against the DBI documentation.
sub _connectToSlave {
    my ($self, $slave_info) = @_;

    # No slave specified: hand back the master.
    return $self->_connect() unless defined $slave_info;

    my ($host, $port, $user, $password, $name) =
        @{$slave_info}{qw(host_name port user password database_name)};

    my $dsn = "DBI:mysql:$name:$host:$port";

    # attempt to connect
    my $dbh = DBI->connect($dsn, $user, $password, {RaiseError => TRUE});

    return $dbh;
}

# Emails an SQL error report to the configured admin address and then dies.
#
#   $function - label of the method in which the error was caught
#   $error    - error text
#   $caller   - description of the calling code
#   $sql      - statement that was being executed
#   $params   - optional array ref of bind values; dumped into the report
#
# Never returns. It always dies with "SQL Error: ..." built from $error and
# $caller. Sending the report is best effort: if the mail cannot be built or
# sent, a warning is emitted and the original SQL error is still raised
# rather than being replaced by the mail failure.
sub _reportErrorAndDie {
    my ($function, $error, $caller, $sql, $params) = @_;

    my $param_string = Dumper($params);

    my $sent = eval {
        my $config = Solstice::Configure->new();

        my $mail = Solstice::Email->new();
        $mail->to($config->getAdminEmail());
        $mail->from($config->getServerString().' <'.$config->getAdminEmail().'>');
        $mail->subject("Solstice Tools SQL Error");
        $mail->plainTextBody(
            "The following error string was caught in '$function':\n$error\n\n".
            "The caller was:\n$caller\n\n".
            "The SQL statement that was being executed was:\n$sql\n\n".
            "With params:\n$param_string\n\n"
        );
        $mail->send();
        1;
    };
    unless ($sent) {
        warn "Solstice::Database: could not send SQL error report: "
            . ($@ || "unknown error\n");
    }

    die "SQL Error: $error\n\nCall stack: $caller\n";
}

#Not in use currently
# Development-mode helper: runs EXPLAIN on a statement against the master
# handle and prints a notice to STDERR as soon as a plan row with a false
# 'key' value is seen.
#
#   $caller    - description of the calling code, used in the notice
#   $statement - SQL text to explain
#   @params    - bind values for the statement
#
# Any error raised while explaining is swallowed and reported as a warning.
sub _diagnostics {
    my ($self, $caller, $statement, @params) = @_;

    eval {
        my $explain = $self->{'_dbh'}->prepare("EXPLAIN $statement");
        $explain->execute(@params);

        while( my $plan_row = $explain->fetchrow_hashref() ){
            next if $plan_row->{'key'};

            # One notice per statement is enough.
            print STDERR "SQL not using index called from $caller\n";
            last;
        }
    };

    warn "Development Mode SQL diagnostics died\n" if $@;
}


1;

__END__