# /usr/local/CPAN/Pixie/Pixie/Store/DBI/Default.pm


package Pixie::Store::DBI::Default;

use strict;
use Carp;

our $VERSION="2.06";

use DBIx::AnyDBD;
use Storable qw/nfreeze thaw/;

use base 'Pixie::Store';

# Open a database connection and wrap it in a store object.
#
#   $dsn        - DBI data source name; a missing "dbi:" scheme is added.
#   %named_args - optional keys: user, pass, object_table, lock_table,
#                 rootset_table (the table names default to px_object,
#                 px_lock_info and px_rootset).
#
# Returns the object produced by DBIx::AnyDBD->connect once its table names
# have been set, or nothing if that call returned a false value.
sub _raw_connect {
  my $class = shift;
  my($dsn, %named_args) = @_;

  # Match the scheme case-insensitively so that a DSN written as "DBI:..."
  # is normalised rather than being turned into "dbi:DBI:...".
  $dsn =~ s/^(?:dbi:)?/dbi:/i;

  my $dbi_args = {AutoCommit => 1, PrintError => 0, RaiseError => 1,};

  my $self = DBIx::AnyDBD->connect($dsn, $named_args{user}, $named_args{pass},
                                   $dbi_args,
                                   'Pixie::Store::DBI');
  return unless $self;

  # Keep a closure that can build a fresh raw handle with the same
  # credentials; reconnect() and release_all_locks() invoke it.
  $self->{'reconnector'} = sub { DBI->connect($dsn,  $named_args{user}, $named_args{pass},
                                              $dbi_args) };
  $self->set_object_table($named_args{object_table}   || 'px_object');
  $self->set_lock_table($named_args{lock_table}       || 'px_lock_info');
  $self->set_rootset_table($named_args{rootset_table} || 'px_rootset');
  return $self;
}

# Connect to the database described by the arguments and check that the
# object table can be queried.  All arguments are handed to _raw_connect
# unchanged; the connected store is returned.
sub connect {
  my($proto, @connect_args) = @_;
  my $store = $proto->_raw_connect(@connect_args);
  $store->verify_connection;
  return $store;
}

# Connect to the database and create the Pixie tables in it.  All arguments
# are handed to _raw_connect unchanged; the connected store is returned.
sub deploy {
  my($proto, @connect_args) = @_;
  my $store = $proto->_raw_connect(@connect_args);

  # Probe the object table first.  Any error is swallowed, and the outcome
  # of the probe is not consulted: deployment below always runs.
  eval { $store->verify_connection };

  # Create the tables, then verify again, this time letting errors escape.
  my $deployed = $store->_do_deployment;
  $deployed->verify_connection;
  return $store;
}

# Run the table-creation steps in order: object table, lock table, index,
# rootset table.  Each step is invoked on whatever the previous step
# returned, and the result of the last step is returned.
sub _do_deployment {
  my($self) = @_;
  my $store = $self->create_object_table;
  $store = $store->create_lock_table;
  $store = $store->create_table_index;
  return $store->create_rootset_table;
}

# Create the table that holds serialised objects, keyed by px_oid.
# The table name is read from $self->{object_table}.  Returns $self.
sub create_object_table {
  my($self) = @_;
  my $dbh = $self->{dbh};
  my $ddl = qq{CREATE TABLE $self->{object_table}
                                                (px_oid varchar(255) NOT NULL,
                                                  px_flat_obj blob NOT NULL,
                                                  PRIMARY KEY (px_oid))};
  $dbh->do($ddl);
  return $self;
}

# Create the table that records which locker holds each object, keyed by
# px_oid.  The table name is read from $self->{lock_table}.  Returns $self.
sub create_lock_table {
  my($self) = @_;
  my $dbh = $self->{dbh};
  my $ddl = qq{CREATE TABLE $self->{lock_table}
                                            (px_oid varchar(255) NOT NULL,
                                              px_locker varchar(255) NOT NULL,
                                              PRIMARY KEY (px_oid))};
  $dbh->do($ddl);
  return $self;
}

# Create the single-column table listing rootset oids.  The table name is
# read from $self->{rootset_table}.  Returns $self.
sub create_rootset_table {
  my($self) = @_;
  my $dbh = $self->{dbh};
  my $ddl = qq{CREATE TABLE $self->{rootset_table}
                                            (px_oid varchar(255) NOT NULL,
                                              PRIMARY KEY (px_oid))};
  $dbh->do($ddl);
  return $self;
}

# Deployment step that does nothing here beyond returning its invocant, so
# that it can sit in the _do_deployment call chain.
sub create_table_index {
  my($self) = @_;
  return $self;
}

# Set the name of the object table.  Returns $self so calls can be chained.
sub set_object_table {
  my($self, $table_name) = @_;
  $self->{object_table} = $table_name;
  return $self;
}

# Return the name of the object table.
sub object_table {
  my($self) = @_;
  return $self->{object_table};
}

# Set the name of the lock table.  Returns $self so calls can be chained.
sub set_lock_table {
  my($self, $table_name) = @_;
  $self->{lock_table} = $table_name;
  return $self;
}

# Return the name of the lock table.
sub lock_table {
  my($self) = @_;
  return $self->{lock_table};
}

# Set the name of the rootset table.  Returns $self so calls can be chained.
sub set_rootset_table {
  my($self, $table_name) = @_;
  $self->{rootset_table} = $table_name;
  return $self;
}

# Return the name of the rootset table.
sub rootset_table {
  my($self) = @_;
  return $self->{rootset_table};
}

# Check that the object table can be queried by selecting at most one row
# from it.  Errors raised by prepare_execute propagate to the caller.
# Returns $self.
sub verify_connection {
  my($self) = @_;
  my $probe = $self->prepare_execute(qq{SELECT px_oid, px_flat_obj
				      				      FROM $self->{object_table} LIMIT 1});
  if ($probe) {
    $probe->finish;
  }
  return $self;
}

# Post-(re)connection hook; this implementation only returns its invocant.
sub _init {
  my($self) = @_;
  return $self;
}

# Replace the current database handle with a fresh one built by the stored
# 'reconnector' closure.  Any existing handle is disconnected first.  After
# the new handle is installed the object is reblessed and _init is run if
# the object has one.  Croaks if there is no reconnector or if it returns a
# false value.  Returns $self.
sub reconnect {
  my($self) = @_;

  my $reconnector = $self->{'reconnector'};
  croak("Can't reconnect; reconnector is missing.") unless $reconnector;

  $self->{'dbh'}->disconnect if $self->{'dbh'};

  # The result is stored before it is checked, so a failed reconnect leaves
  # the false value in $self->{dbh}.
  $self->{'dbh'} = $reconnector->();
  croak("Can't reconnect; reconnector returned nothing") unless $self->{'dbh'};

  $self->rebless;
  if ($self->can('_init')) {
    $self->_init;
  }
  return $self;
}

# Return the database handle stored in the object.  Dies with a stack trace
# when invoked on anything that is not a reference (e.g. a class name).
sub get_dbh {
  my($self) = @_;
  Carp::confess("Not a class method") unless ref $self;
  return $self->{'dbh'};
}


# Delete every row from the object, lock and rootset tables.  Returns $self.
sub clear {
  my($self) = @_;

  # Resolve all three table names before issuing the first DELETE.
  my @tables = map { $self->$_() } qw/object_table lock_table rootset_table/;
  foreach my $table (@tables) {
    $self->prepare_execute(qq{DELETE FROM $table});
  }
  return $self;
}


# Store a frozen copy of $obj under $oid, replacing any existing row.
#
#   $oid      - key to store the object under
#   $obj      - the object; serialised with Storable::nfreeze
#   $strategy - object providing pre_store and post_store hooks, which are
#               called around the write with the current pixie
#
# The work runs inside a transaction.  If any step fails, the transaction is
# rolled back (which also restores AutoCommit) and the original error is
# rethrown, so a failed store never leaves the handle in an open
# transaction.  Returns ($oid, $obj) on success.
sub store_at {
  my $self = shift;
  my($oid, $obj, $strategy) = @_;

  $self->begin_transaction;
  my $stored = eval {
    my $did_lock = $strategy->pre_store($oid, Pixie->get_the_current_pixie);
    $self->prepare_execute(qq{ DELETE FROM @{[ $self->object_table ]}
                                                          WHERE px_oid = ? },
                           $oid);
    $self->prepare_execute(qq{ INSERT INTO @{[ $self->object_table ]}
                                                          (px_oid, px_flat_obj)
                                                          VALUES ( ?, ? )},
                           $oid, nfreeze $obj);
    $strategy->post_store($did_lock, Pixie->get_the_current_pixie);
    $self->commit;
    1;
  };
  unless ($stored) {
    my $error = $@ || "Unknown failure while storing $oid";
    # A failing rollback must not mask the error that caused it.
    eval { $self->rollback_db };
    die $error;
  }
  return($oid, $obj);
}

# Fetch and thaw the object stored under $oid.
# Returns nothing when no row matches, the thawed object when exactly one
# row matches, and croaks when more than one row matches.
sub get_object_at {
  my($self, $oid) = @_;

  my $sql = q{SELECT px_flat_obj FROM } . $self->{object_table} . q{ WHERE px_oid = ? };
  my $sth = $self->prepare_execute($sql, $oid);
  my $rows = $sth->fetchall_arrayref();
  $sth->finish;

  my $matches = @$rows;
  return if $matches == 0;
  croak "Too many objects matched OID: $oid" if $matches > 1;
  return thaw $rows->[0][0];
}

# Delete the object row stored under $oid and return the statement handle's
# rows() value for that DELETE.
sub _delete {
  my($self, $oid) = @_;

  my $sql = q{DELETE FROM } . $self->object_table . q{ WHERE px_oid = ?};
  my $sth = $self->prepare_execute($sql, $oid);
  return $sth->rows;
}

# Prepare (via prepare_cached), bind and execute a statement.
#
#   $sql    - statement text
#   @params - bind values, bound in order starting at position 1.  A
#             parameter that is an array reference is expanded into the
#             argument list of bind_param after the position.
#
# Dies with a stack trace if execute raises an error.  Returns the
# statement handle.
sub prepare_execute {
  my $self   = shift;
  my $sql    = shift;
  my @params = @_;

  my $sth = $self->prepare_cached($sql);

  my $position = 0;
  foreach my $param (@params) {
    $position++;
    if (ref($param) eq 'ARRAY') {
      $sth->bind_param($position, @$param);
    }
    else {
      $sth->bind_param($position, $param);
    }
  }

  eval { $sth->execute };
  if ($@) {
    Carp::confess $@;
  }
  return $sth;
}

# Start a transaction: bump the tran_count counter (starting from 0 when it
# is undefined) and switch AutoCommit off on the database handle.
# The value of the AutoCommit assignment is the sub's return value.
sub begin_transaction {
  my($self) = @_;
  my $depth = defined $self->{tran_count} ? $self->{tran_count} : 0;
  $self->{tran_count} = $depth + 1;

  $self->get_dbh->{AutoCommit} = 0;
}

# Roll back the current transaction, if AutoCommit is off, then switch
# AutoCommit back on.  tran_count is reset to undef rather than
# decremented, so one call ends every begin_transaction made so far.
sub rollback_db {
  my($self) = @_;

  my $dbh = $self->get_dbh;
  $dbh->rollback unless $dbh->{AutoCommit};
  $dbh->{AutoCommit} = 1;
  $self->{tran_count} = undef;
}

# Commit the current transaction, if AutoCommit is off, then switch
# AutoCommit back on.  tran_count is reset to undef rather than
# decremented, so one call ends every begin_transaction made so far.
sub commit {
  my($self) = @_;

  my $dbh = $self->get_dbh;
  $dbh->commit unless $dbh->{AutoCommit};
  $dbh->{AutoCommit} = 1;
  $self->{tran_count} = undef;
}

# Lock the store by starting a transaction.  Returns $self.
sub lock {
  my($self) = @_;
  $self->begin_transaction;
  return $self;
}

# Unlock the store by committing the current transaction.  Returns $self.
sub unlock {
  my($self) = @_;
  $self->commit;
  return $self;
}

# Abandon the current transaction via rollback_db.  Returns $self.
sub rollback {
  my($self) = @_;
  $self->rollback_db;
  return $self;
}

# Delete $oid from the rootset table.  Returns $self.
sub remove_from_rootset {
  my($self, $oid) = @_;

  my $sql = qq{DELETE FROM @{[ $self->rootset_table ]}
                                                        WHERE px_oid = ?};
  $self->prepare_execute($sql, $oid);
  return $self;
}

# Record $thing's oid in the rootset table, replacing any existing row for
# the same oid.
#
# The oid is looked up once, before the transaction starts.  The delete and
# insert then run inside a transaction; if either fails, the transaction is
# rolled back (which also restores AutoCommit) and the original error is
# rethrown.  Returns $self on success.
sub _add_to_rootset {
  my $self = shift;
  my($thing) = @_;
  my $oid = $thing->PIXIE::oid;

  $self->begin_transaction;
  my $added = eval {
    $self->prepare_execute(qq{DELETE FROM @{[ $self->rootset_table ]}
                                                        WHERE px_oid = ?},
                           $oid);
    $self->prepare_execute(qq{INSERT INTO @{[ $self->rootset_table ]} (px_oid)
                                                        VALUES ( ? )},
                           $oid);
    $self->commit;
    1;
  };
  unless ($added) {
    my $error = $@ || "Unknown failure while adding $oid to the rootset";
    # A failing rollback must not mask the error that caused it.
    eval { $self->rollback_db };
    die $error;
  }
  return $self;
}


# List the oids in the rootset table, leaving out those matching the
# pattern '<NAME:PIXIE::%'.  Returns the list in list context and an array
# reference otherwise.
sub rootset {
  my($self) = @_;
  my $rows = $self->selectall_arrayref(qq{SELECT px_oid FROM @{[$self->rootset_table]}
					  					  WHERE px_oid NOT LIKE '<NAME:PIXIE::\%'}) ;
  my @oids = map { $_->[0] } @$rows;
  return @oids if wantarray;
  return \@oids;
}

# List the oids in the object table for $p, leaving out those matching the
# pattern '<NAME:PIXIE::%' and the oid of the object returned by
# object_graph_for($p).
#
# The excluded oid is passed as a bind value rather than being pasted into
# the SQL text, so an oid containing a quote character cannot break or
# alter the statement.  Returns the list in list context and an array
# reference otherwise.
sub working_set_for {
  my $self = shift;
  my $p = shift;
  my $graph_oid = $self->object_graph_for($p)->PIXIE::oid;
  my $rows = $self->selectall_arrayref(
    qq{SELECT px_oid FROM @{[$self->object_table]}
              WHERE px_oid NOT LIKE '<NAME:PIXIE::\%' AND
              NOT(px_oid = ?)},
    undef,          # no statement attributes
    $graph_oid);

  my @ary = map $_->[0], @$rows;
  return wantarray ? @ary : \@ary;
}

# Acquire the lock on $oid for $pixie by inserting a row into the lock
# table, retrying until it succeeds or $timeout seconds have passed.
#
#   $oid     - oid of the object to lock
#   $pixie   - the would-be lock holder; identified by $pixie->_oid
#   $timeout - seconds to keep trying (default 30)
#
# Returns 0 if $pixie already holds the lock, 1 once the lock has been
# taken.  Dies if, after the attempts, the lock is held by someone else.
sub lock_object_for {
  my $self = shift;
  my($oid, $pixie, $timeout) = @_;
  $timeout = 30 unless defined $timeout;
  my $lock_holder = $self->locker_for($oid);
  return 0 if $lock_holder eq $pixie->_oid;

  # The alarm handler only clears the flag; the loop notices it on its next
  # check.
  my $keep_trying = 1;
  local $SIG{ALRM} = sub { $keep_trying = 0 };
  alarm $timeout;
  while ($keep_trying) {
    eval {$self->prepare_execute(q{INSERT INTO } . $self->lock_table .
                                 q{ ( px_oid, px_locker )
				    				    VALUES ( ?, ? )},
                                 $oid, $pixie->_oid)};
    last unless $keep_trying && $@;
    # Back off for a random interval of up to two seconds before retrying.
    # The timeout argument of select is in seconds, so the previous
    # rand(2 * 1000) could sleep for up to 2000 seconds, which left no room
    # for a retry inside the default timeout.
    select undef, undef, undef, rand(2);
  }
  alarm 0;
  $lock_holder = $self->locker_for($oid);
  unless ($lock_holder eq $pixie->_oid) {
    die "Cannot lock $oid for $pixie. Lock is held by ", $lock_holder;
  }
  $self->SUPER::lock_object_for($oid, $pixie);
  return 1;
}

# Release the lock that $pixie holds on $oid.
#
#   $oid   - oid of the locked object
#   $pixie - either an object (identified by its _oid) or the locker id
#            itself
#
# Dies if the DELETE fails, or if a lock row for $oid still exists
# afterwards.  Returns 1 on success.
sub unlock_object_for {
  my($self, $oid, $pixie) = @_;

  my $pixie_oid = $pixie;
  $pixie_oid = $pixie->_oid if ref $pixie;

  eval {
    my $sql = q{DELETE FROM } . $self->lock_table .
              q{ WHERE px_oid = ? AND px_locker = ? };
    $self->prepare_execute($sql, $oid, $pixie_oid);
  };
  if ($@) {
    die "Couldn't unlock $oid for $pixie_oid: $@";
  }

  my $other_locker = $self->locker_for($oid);
  if ($other_locker) {
    die "$oid is locked by another process: $other_locker";
  }

  $self->SUPER::unlock_object_for($oid, $pixie);
  return 1;
}

# Look up who holds the lock on $oid.  $oid may be an object, in which case
# its px_oid is used.  Returns the empty string when no lock row exists, the
# locker id when exactly one exists, and croaks when several rows match.
sub locker_for {
  my($self, $oid) = @_;
  if (ref $oid) {
    $oid = $oid->px_oid;
  }

  my $sql = q{SELECT px_locker FROM } . $self->lock_table . q{ WHERE px_oid = ? };
  my $sth = $self->prepare_execute($sql, $oid);
  my $rows = $sth->fetchall_arrayref();
  $sth->finish;

  my $matches = @$rows;
  return '' if $matches == 0;
  croak "Too many objects matched OID: $oid" if $matches > 1;
  return $rows->[0][0];
}

# Release every lock via the parent class, first building a database handle
# with the stored reconnector if the object has none.  Returns whatever the
# parent class's release_all_locks returns.
sub release_all_locks {
    my($self) = @_;
    # Ensure a connection
    if (!$self->{dbh}) {
        $self->{dbh} = $self->{reconnector}->();
    }
    return $self->SUPER::release_all_locks;
}

# Destructor: release any locks, then run the parent class's destructor.
sub DESTROY {
    my $self = shift;
    # A destructor can run at any point, including while an exception is
    # propagating.  release_all_locks may reconnect to the database, so keep
    # that work from overwriting the caller's global status variables.
    local($., $@, $!, $^E, $?);
    $self->release_all_locks;
    $self->SUPER::DESTROY;
}
1;