KeyedMutex - An interprocess keyed mutex


KeyedMutex documentation Contained in the KeyedMutex distribution.

Index


Code Index:

NAME

Top

KeyedMutex - An interprocess keyed mutex

SYNOPSIS

Top

  # start server
  % keyedmutexd >/dev/null &

  use KeyedMutex;

  my $km = KeyedMutex->new;

  until ($value = $cache->get($key)) {
    if (my $lock = $km->lock($key, 1)) {
      #locked read from DB
      $value = get_from_db($key);
      $cache->set($key, $value);
      last;
    }
  }

DESCRIPTION

Top

KeyedMutex is an interprocess keyed mutex. Its intended use is to prevent sending identical requests to database servers at the same time. By using KeyedMutex, only a single client would send a request to the database, and others can retrieve the result from a shared cache (namely memcached or Cache::Swifty) instead.

THE CONSTRUCTOR

Top

Following parameters are recognized.

sock

Optional. Path to a unix domain socket or a tcp port on which keyedmutexd is running. Defaults to /tmp/keyedmutexd.sock.

auto_reconnect

Optional. Whether or not to automatically reconnect to server on communication failure. Default is on.

METHODS

Top

lock($key, [ use_raii ])

Tries to obtain a mutex lock for given key. When the use_raii flag is not set (or omitted), the method would return 1 if successful, or undef if not. If successful, the client should later on release the lock by calling release. A return value undef means some other client that held the lock has released it. When the use_raii flag is being set, the method would return a KeyedMutex::Lock object when successful. The lock would be automatically released when the lock object is being destroyed.

release

Releases the lock acquired by a procedural-style lock (i.e. use_raii flag not being set).

locked

Returns if the object is currently holding a lock.

auto_reconnect

Sets or retrieves auto_reconnect flag.

SEE ALSO

Top

http://labs.cybozu.co.jp/blog/kazuhoatwork/

AUTHOR

Top

Copyright (c) 2007 Cybozu Labs, Inc. All rights reserved.

written by Kazuho Oku <kazuhooku@gmail.com>

LICENSE

Top

This program is free software; you can redistribute it and/or modify it under th e same terms as Perl itself.

See http://www.perl.com/perl/misc/Artistic.html


KeyedMutex documentation Contained in the KeyedMutex distribution.

package KeyedMutex;

use strict;
use warnings;

use Digest::MD5 qw/md5/;
use IO::Socket::INET;
use IO::Socket::UNIX;
use POSIX qw/:errno_h/;
use Socket qw/IPPROTO_TCP TCP_NODELAY/;

use KeyedMutex::Lock;

package KeyedMutex;

our $VERSION = '0.06';

my $MSG_NOSIGNAL = 0;
eval {
    $MSG_NOSIGNAL = Socket::MSG_NOSIGNAL;
};

use constant DEFAULT_SOCKPATH => '/tmp/keyedmutexd.sock';
use constant KEY_SIZE         => 16;

sub new {
    my ($klass, $opts) = @_;
    $klass = ref($klass) || $klass;
    $opts ||= {};
    my $self = bless {
        sock           => undef,
        locked         => undef,
        auto_reconnect =>
            defined $opts->{auto_reconnect} ? $opts->{auto_reconnect} : 1,
        _peer          => $opts->{sock} || DEFAULT_SOCKPATH,
    }, $klass;
    $self->_connect();
    $self;
}

sub DESTROY {
    my $self = shift;
    $self->{sock}->close if $self->{sock};
}

sub locked {
    my $self = shift;
    $self->{locked};
}

sub auto_reconnect {
    my $self = shift;
    $self->{auto_reconnect} = shift if @_;
    $self->{auto_reconnect};
}

sub lock {
    my ($self, $key, $use_raii) = @_;
    
    # check state
    die "already holding a lock\n" if $self->{locked};
    
    # send key
    my $hashed_key = md5($key);
    $self->_connect(1) unless $self->{sock};
    unless ($self->_send($hashed_key, KEY_SIZE)) {
        $self->_connect(1);
        $self->_send($hashed_key, KEY_SIZE)
            or die 'communication error';
    }
    # wait for response
    my $res;
    while ($self->{sock}->sysread($res, 1) != 1) {
        if ($! != EINTR) {
            $self->{sock}->close;
            $self->{sock} = undef;
            $res = 'R';
            last;
        }
    }
    return unless $res eq 'O';
    $self->{locked} = 1;
    return $use_raii ? KeyedMutex::Lock->_new($self) : 1;
}

sub release {
    my ($self) = @_;
    
    # check state
    die "not holding a lock\n" unless $self->{locked};
    
    unless ($self->_send('R', 1)) {
        $self->{sock}->close;
        $self->{sock} = undef;
    }
    $self->{locked} = undef;
    1;
}

sub _connect {
    my ($self, $is_reconnect) = @_;
    
    if ($is_reconnect) {
        die 'communication error' unless $self->{auto_reconnect};
        if ($self->{sock}) {
            $self->{sock}->close;
            $self->{sock} = undef;
        }
    }
    
    if ($self->{_peer} =~ /^(?:|(.*):)(\d+)$/) {
        my ($host, $port) = ($1 || '127.0.0.1', $2);
        $self->{sock} = IO::Socket::INET->new(
            PeerHost => $host,
            PeerPort => $port,
            Proto    => 'tcp',
        ) or die 'failed to connect to keyedmutexd';
        setsockopt($self->{sock}, IPPROTO_TCP, TCP_NODELAY, 1)
            or die 'failed to set TCP_NODELAY';
    } else {
        $self->{sock} = IO::Socket::UNIX->new(
            Type => SOCK_STREAM,
            Peer => $self->{_peer},
        ) or die 'failed to connect to keyedmutexd';
    }
}

sub _send {
    my ($self, $data, $size) = @_;
    local $SIG{PIPE} = 'IGNORE' unless $MSG_NOSIGNAL;
    my $ret = undef;
    eval {
        no warnings;
        $ret = $self->{sock}->send($data, $MSG_NOSIGNAL) == $size;
    };
    $ret;
}

1;

__END__