/usr/local/CPAN/RWDE/RWDE/DB/DbRegistry.pm
## @file
# (Enter your file info here)
#
# $Id: DbRegistry.pm 496 2008-08-20 14:18:09Z damjan $
# Database connection and handling registry
## @class RWDE::DB::DbRegistry
# Database registry which provides the underlying access API for RWDE objects. Once instantiated and initialized the registry
# provides a one stop shop for managing database connections (open/close), connection setting modifications and
# transaction management (begin/prepare/commit/abort)
package RWDE::DB::DbRegistry;
use strict;
use warnings;
use DBI;
use Error qw(:try);
use RWDE::Configuration;
use RWDE::Exceptions;
use RWDE::RObject;
use base qw(RWDE::Singleton);
our (%dbh, %prepared_transactions, $transaction_connection, $transaction_sequence, $DB_CONFIG, $DB_CONNECTIONS, $impose_transaction);
our ($unique_instance);
use vars qw($VERSION);
$VERSION = sprintf "%d", q$Revision: 561 $ =~ /(\d+)/;
## @method object get_instance()
# Retrieve the registry instance
# @return retrieved registry instance
sub get_instance {
my ($self, $params) = @_;
if (ref $unique_instance ne $self) {
$unique_instance = $self->new;
}
return $unique_instance;
}
## @method void initialize()
# Read the configuration file for DB names and locations
# these data are used for opening connections
sub initialize {
my ($self, $params) = @_;
# initialize the params for connecting to the db
$DB_CONFIG = RWDE::Configuration->DB;
# and mapping between the db names and connections
$DB_CONNECTIONS = RWDE::Configuration->DB_CONNECTIONS;
$transaction_sequence = 0;
$impose_transaction = undef;
return ();
}
## @method object get_dbh()
# Retrieve the database handle for a specific database name. In the event that a transaction was
# previously signalled the transaction will begin here
# ¶m db Database name
# @return The database handle for a specific database
sub get_dbh {
my ($self, $params) = @_;
my $connection_name = $self->_get_connection_name($params);
unless (defined $dbh{$connection_name} and $dbh{$connection_name}->ping()) {
$self->_connect_db({ connection => $connection_name });
}
#we have received an earlier signal to start a transaction and it hasn't completed yet
if (transaction_signalled()) {
#if there is no transaction defined yet
if (!defined($transaction_connection)) {
$self->_begin_transaction($params);
}
}
return $dbh{$connection_name};
}
## @method void add_db_settings($db_settings)
# Pass a setting to the database. This needs to be set before the connection is established
# @param db_settings Specific setting to be applied to the database at connect time
sub add_db_settings {
my ($self, $params) = @_;
my $db_setting_param = $$params{db_settings};
if (not defined($db_setting_param) || ref($db_setting_param) ne "ARRAY") {
throw RWDE::DevelException({ info => "DB setting supplied undefined or not an array" });
}
my $connection_name = RWDE::DB::DbRegistry->_get_connection_name($params);
push (@{$$DB_CONFIG{$connection_name}{db_settings}}, @$db_setting_param);
return ();
}
## @method object get_db_notifications($sleeptime)
# Retrieve notifications (pg_notifies) from the database
# ¶m db The database name that you want notifications for
# @param sleeptime Length of time to block while waiting for notifications
# @return Array reference of notifications the db raised
sub get_db_notifications {
my ($self, $params) = @_;
my $dbh = $self->get_dbh($params);
my @required = qw( sleeptime );
RWDE::RObject->check_params({ required => \@required, supplied => $params });
use IO::Select;
my @notifications;
my $dbpid = $dbh->{pg_pid};
my $dbsock = $dbh->{pg_socket};
my $select = new IO::Select($dbsock)
or throw RWDE::DevelException({ info => "Failed to create selector" });
if (my @ready = $select->can_read($$params{sleeptime})) {
while (my $notification = $dbh->func('pg_notifies')) {
push(@notifications, $notification); # "got NOTIFY $n->[0] from $n->[1]";
}
}
return \@notifications;
}
## @method void destroy_dbh()
# Tear down and clear (from the registry) the database associated with the database parameter it was invoked with
# param db The database name you want to destroy the handle for
sub destroy_dbh {
my ($self, $params) = @_;
my $connection_name = $self->_get_connection_name($params);
$dbh{$connection_name}->{InactiveDestroy} = 1;
$self->closeDB($params);
return;
}
## @method void closeDB()
# Close a specific database handle and the delete referenced db from the registry
# -
# Note: Don't do explicit disconnect if we're a child process clearing out the old handle just prior to re-opening it.
# In this case, we set InactiveDestroy to keep DBI from doing the implicit disconnect when the handle goes away.
sub closeDB {
my ($self, $params) = @_;
my $connection_name = $self->_get_connection_name($params);
if ($dbh{$connection_name} and !$dbh{$connection_name}->{InactiveDestroy}) {
$dbh{$connection_name}->disconnect;
}
delete $dbh{$connection_name};
return ();
}
## @method void close_all()
# Close all database connections we are currently maintaining
sub close_all {
my ($self, $params) = @_;
foreach my $connection_name (keys %$DB_CONNECTIONS) {
if ($dbh{$connection_name} and !$dbh{$connection_name}->{InactiveDestroy}) {
$dbh{$connection_name}->disconnect;
}
delete $dbh{$connection_name};
}
return ();
}
## @method void close_all()
# Release the handles w/o explicitly disconnecting: i.e. on fork
sub destroy_all {
my ($self, $params) = @_;
foreach my $db_name (keys %$DB_CONNECTIONS) {
$self->destroy_dbh({ db => $db_name});
}
return ();
}
#================================================================
#everything below is transaction related - global can call these
#================================================================
## @method void signal_transaction()
# Signal the database backend to begin a transaction before the next database operation takes place
sub signal_transaction {
my ($self, $params) = @_;
#if this flag is already set and we ended up here something is wrong
if (!transaction_signalled()) {
$impose_transaction = 1;
}
else {
warn 'Received a signal for impose transaction, but transaction already in progress';
}
return ();
}
## @method object transaction_signalled()
# Determine if a transaction has been signalled in the backend
# @return true if a transaction is signalled, false otherwise
sub transaction_signalled {
my ($self, $params) = @_;
return (defined($impose_transaction));
}
## @method void commit_transaction()
# Commit the running transaction within the database. Will invoke an exception if a transaction
# is not currently defined or if the transaction does not execute properly. After committing a
# transaction the related state variables are cleared.
sub commit_transaction {
my ($self, $params) = @_;
if (!(defined $transaction_connection and $dbh{$transaction_connection} and $dbh{$transaction_connection}->commit())) {
warn "Commit transaction called with no outstanding transaction -> probably aborted before";
}
$transaction_connection = undef;
$impose_transaction = undef;
return ();
}
## @method void abort_transaction()
# Abort the running transaction within the database. If no transaction is running then this operation
# acts as a no-op
sub abort_transaction {
my ($self, $params) = @_;
if (defined($transaction_connection)) {
$dbh{$transaction_connection}->rollback();
$transaction_connection = undef;
}
$impose_transaction = undef;
return ();
}
## @method object prepare_transaction()
# Prepare and save the currently running transaction within the database. An exception will be thrown if
# no transaction is running or there is a problem preparing the transaction. Once the transaction is prepared a
# string handle/reference is returned back to the caller to allow for processing of the transaction at a later time
# @return string handle/reference to the prepared transaction
sub prepare_transaction {
my ($self, $params) = @_;
$self->check_transaction();
my $transaction_name = $self->_get_transaction_name($params);
#unfortunately DBI does not have a call to support 2pc yet
my $sth = $dbh{$transaction_connection}->prepare("PREPARE TRANSACTION ?");
if (!($sth && $sth->execute($transaction_name))) {
throw RWDE::DevelException({ info => $dbh{$transaction_connection}->errstr() });
}
#store the transaction connection name associated with this prepared transaction
$prepared_transactions{$transaction_name} = $transaction_connection;
#clear out transaction connection
$transaction_connection = undef;
$impose_transaction = undef;
return $transaction_name;
}
## @method void commit_prepared_transaction($transaction_name)
# Commit a previously prepared transaction within the database. An exception will be thrown if
# the transaction doesn't exist or if there is a problem committing the transaction.
# @param transaction_name Handle/reference to a previously prepared transaction
sub commit_prepared_transaction {
my ($self, $params) = @_;
#if the transaction is not defined then something is wrong
if (!defined($$params{transaction_name})) {
my ($package, $filename, $line, $subroutine, $hasargs, $wantarray, $evaltext, $is_require, $hints, $bitmask) = caller(1);
throw RWDE::DevelException({ info => "Attempt to commit an unspecified prepared transaction without a name: $package on $line" });
}
my $transaction_name = $$params{transaction_name};
my $connection = $prepared_transactions{$transaction_name};
if (not defined $connection){
$connection = $self->_get_connection_name($params);
}
if (defined($connection) && defined($transaction_name)) {
#unfortunately DBI does not have a call to support 2pc yet
if (!defined($dbh{$connection}->do("COMMIT PREPARED " . $dbh{$connection}->quote($transaction_name)))) {
throw RWDE::DevelException({ info => $dbh{$connection}->errstr() });
}
}
#clear out transaction
$prepared_transactions{$transaction_name} = undef;
return ();
}
## @method void abort_prepared_transaction($transaction_name)
# Abort a previously prepared transaction within the database. An exception will be thrown if
# the transaction doesn't exist or if there is a problem committing the transaction.
# @param transaction_name Handle/reference to a previously prepared transaction
sub abort_prepared_transaction {
my ($self, $params) = @_;
#if the transaction is not defined then something is wrong
if (!defined($$params{transaction_name})) {
my ($package, $filename, $line, $subroutine, $hasargs, $wantarray, $evaltext, $is_require, $hints, $bitmask) = caller(1);
throw RWDE::DevelException({ info => "Attempt to commit an unspecified prepared transaction without a name: $package on $line" });
}
my $transaction_name = $$params{transaction_name};
my $connection = $prepared_transactions{$transaction_name};
if (not defined $connection){
$connection = $self->_get_connection_name($params);
}
if (defined($connection) && defined($transaction_name)) {
#unfortunately DBI does not have a call to support 2pc yet
if (!defined($dbh{$connection}->do("ROLLBACK PREPARED " . $dbh{$connection}->quote($transaction_name)))) {
throw RWDE::DevelException({ info => $dbh{$connection}->errstr() });
}
}
else {
warn 'Either connection or transaction_name not defined';
}
#clear out transaction
$prepared_transactions{$transaction_name} = undef;
return ();
}
## @method void check_transaction()
# Test to see if we have a transaction and that everything is consistent. Throw a devel exception if no
# transaction is running.
# -
# This is used to ensure that we are in ANY transaction - regardless of connection. Maybe this is bad
sub check_transaction {
my ($self, $params) = @_;
if (!defined($transaction_connection) || !defined($dbh{$transaction_connection}) || !transaction_signalled()) {
my ($package, $filename, $line, $subroutine, $hasargs, $wantarray, $evaltext, $is_require, $hints, $bitmask) = caller(1);
throw RWDE::DevelException({ info => "Attempt to get transaction name for nonexistent transaction: $package on $line" });
}
return ();
}
## @method object has_transaction()
# Determine if the argument db connection has a transaction
# ¶m db name of the database supposedly in a transaction
# @return true if the database is in a transaction, false otherwise
sub has_transaction {
my ($self, $params) = @_;
#check to see if a transaction is signalled and if the argument connection is associated with such a signalled transaction
return (transaction_signalled() && defined($transaction_connection) && ($transaction_connection eq $self->_get_connection_name($params)));
}
## @method void db_check_transaction()
# Determine if the current transaction is associated with the specified db param - otherwise devel exception
sub db_check_transaction {
my ($self, $params) = @_;
unless (has_transaction($params)) {
my ($package, $filename, $line, $subroutine, $hasargs, $wantarray, $evaltext, $is_require, $hints, $bitmask) = caller(1);
throw RWDE::DevelException({ info => "$package on $line requires exclusive access to the affecting rows. Add transaction around the function call." });
}
return ();
}
## @method void cleanup()
# Cleanup running transactions on all connections by aborting them
sub cleanup {
my ($self, $params) = @_;
foreach my $db (keys %dbh) {
if (defined($transaction_connection)) {
$self->abort_transaction();
}
$dbh{$db}->do("RESET ALL"); # unset any special variables
}
return ();
}
#================================================================
#everything below is private - do not call externally
#================================================================
## @method protected object _get_connection_name($db)
# Private call to determine the connection name for a database name argument
# @param db Database name you are seeking the connection name for
# @return the connection name associated with the database name argument
sub _get_connection_name {
my ($self, $params) = @_;
my $label = $$params{db};
if (not defined $label) {
my ($package, $filename, $line, $subroutine, $hasargs, $wantarray, $evaltext, $is_require, $hints, $bitmask) = caller(1);
throw RWDE::DevelException({ info => "DB not specified: $package on $line" });
}
my $connection_name = $$DB_CONNECTIONS{$label};
if (!defined($connection_name)) {
my ($package, $filename, $line, $subroutine, $hasargs, $wantarray, $evaltext, $is_require, $hints, $bitmask) = caller(2);
throw RWDE::DevelException({ info => "No connection specified for given db:$label: from $package on $line" });
}
return $connection_name;
}
sub get_host {
my ($self, $params) = @_;
my $connection_name = $self->_get_connection_name({ db => $$params{db}});
my $DB = $$DB_CONFIG{$connection_name};
return $$DB{db_host};
}
## @method protected void _connect_db($connection)
# Create a database connection for the connection_name argument given. If there are any problems an exception will be thrown.
# Upon success the database connection handle is added to the registry and may be retrieved using "get_dbh".
# @param connection The connection_name you are trying to create a connection for
sub _connect_db {
my ($self, $params) = @_;
my $connection_name = $$params{connection};
my $DB = $$DB_CONFIG{$connection_name};
my $port = $$DB{db_port} ? $$DB{db_port} : 5432;
my $datasource = 'dbi:' . $$DB{db_type} . ':dbname=' . $$DB{db_name} . ';host=' . $$DB{db_host} . ';port=' . $port;
if (RWDE::Configuration->Debug) {
#disable preparing of queries so we can see them
$dbh{$connection_name} = DBI->connect($datasource, $$DB{db_user}, $$DB{db_pass}, { PrintError => 0, pg_server_prepare => 0 })
or throw RWDE::DevelException({ info => "dbi connect failure: " . $DBI::errstr });
}
else {
#tmp fix, put prepare to 0, looks like pg_bouncer is not liking it a lot
$dbh{$connection_name} = DBI->connect($datasource, $$DB{db_user}, $$DB{db_pass}, { PrintError => 0, pg_server_prepare => 0 })
or throw RWDE::DevelException({ info => "dbi connect failure: " . $DBI::errstr });
}
# and listen for events to wake us up
foreach my $db_setting (@{$$DB{db_settings}}) {
$dbh{$connection_name}->do($db_setting)
or throw RWDE::DevelException({ info => "failed to set: $db_setting for $connection_name" });
}
return ();
}
## @method protected object _get_transaction_name()
# This method provides a uniq naming scheme for naming prepared transactions.
# @return Unique name that is suitable to be used for preparing a transaction
sub _get_transaction_name {
my ($self, $params) = @_;
$self->check_transaction();
my $dbh = $dbh{$transaction_connection};
my $transaction_name = $$ . "|" . $dbh->{pg_pid} . "|" . $transaction_sequence;
$transaction_sequence++;
return $transaction_name;
}
## @method protected void _begin_transaction()
# Private method to begin a transaction. The only appropriate place to use it within this registry construct
# is immediately after a transaction signal has been raised - but before any other database queries are executed.
sub _begin_transaction {
my ($self, $params) = @_;
#if there are ANY other transactions then something is wrong
if (defined($transaction_connection)) {
throw RWDE::DevelException({ info => "Attempt to start multiple transactions" });
}
my $connection_name = $self->_get_connection_name($params);
$transaction_connection = $connection_name;
if (!($dbh{$connection_name})) {
throw RWDE::DevelException({ info => 'Error while attempting to _begin_transaction -- no connection_name' });
}
if (!($dbh{$connection_name}->begin_work())) {
throw RWDE::DevelException({ info => 'Error while attempting to _begin_transaction failed: ' . $dbh{$connection_name}->errstr() });
}
return ();
}
1;