| IO-Lambda documentation | Contained in the IO-Lambda distribution. |
IO::Lambda::Mutex - wait for a shared resource
Objects of class IO::Lambda::Mutex are mutexes, that as normal mutexes,
can be taken and released. The mutexes allow lambdas to wait for their
availability with method waiter, that creates and returns a new lambda,
that in turn will finish as soon as the caller can acquire the mutex.
use IO::Lambda qw(:lambda);
use IO::Lambda::Mutex qw(mutex);
my $mutex = IO::Lambda::Mutex-> new;
# wait for mutex that shall be available immediately
my $waiter = $mutex-> waiter;
my $error = $waiter-> wait;
die "error:$error" if $error;
# create and start a lambda that sleeps 2 seconds and then releases the mutex
my $sleeper = lambda {
context 2;
timeout { $mutex-> release }
};
$sleeper-> start;
# Create a new lambda that shall only wait for 0.5 seconds.
# It will surely fail, since $sleeper is well, still sleeping
lambda {
context $mutex-> waiter(0.5);
tail {
my $error = shift;
print $error ? "error:$error\n" : "ok\n";
# $error is expected to be 'timeout'
}
}-> wait;
# Again, wait for the same mutex but using different syntax.
# This time should be ok - $sleeper will sleep for 1.5 seconds and
# then the mutex will be available.
lambda {
context $mutex, 3;
mutex {
my $error = shift;
print $error ? "error:$error\n" : "ok\n";
# expected to be 'ok'
}
}->wait;
The constructor creates a new free mutex.
Returns boolean flag whether the mutex is free or not. Opposite of is_taken.
Returns boolean flag whether the mutex is taken or not. Opposite of is_free.
Attempts to take the mutex. If the mutex is free, the operation is successful and true value is returned. Otherwise, the operation is failed and false value is returned.
Tries to releases the taken mutex. If there are lambdas waiting (see waiter)
in the queue, the first lambda will be terminated, and thus whoever waits for
the lambda can be notified; it will be up to the code associated with the
waiter lambda to call release again. If there are no waiters in the queue,
the mutex is set free.
Creates a new lambda, that is finished when the mutex becomes available.
The lambda is inserted into the internal waiting queue. It takes as
many calls to release as many lambdas are in queue, until the mutex
becomes free. The lambda returns an error flags, which is undef if
the mutex was acquired successfully, or the error string.
If $timeout is defined, and by the time it is expired the mutex
could not be obtained, the lambda is removed from the queue, and
returned error value is 'timeout'.
Note, that after waiter succeeds, a release call is needed
should mutex be reused. See pipeline method for automatic mutex
release.
Creates a new lambda, that wraps over $lambda so that it is executed
after mutex had been obtained. Also, as soon as $lambda is finished,
the mutex is released, thus allowing others to take it.
Internal function, do not use directly, use $lambda-> terminate
instead.
Removes the lambda created previously by waiter() from internal queue. Note that after that operation the lambda will never finish by itself.
Condition wrapper over waiter.
Dmitry Karasik, <dmitry@karasik.eu.org>.
| IO-Lambda documentation | Contained in the IO-Lambda distribution. |
# $Id: Mutex.pm,v 1.13 2010/03/27 19:56:34 dk Exp $ package IO::Lambda::Mutex; use vars qw($DEBUG @ISA); $DEBUG = $IO::Lambda::DEBUG{mutex} || 0; @ISA = qw(Exporter); @EXPORT_OK = qw(mutex); %EXPORT_TAGS = ( all => \@EXPORT_OK); use strict; use warnings; use IO::Lambda qw(:all); sub new { return bless { taken => 0, queue => [], }, shift; } sub is_taken { $_[0]-> {taken} } sub is_free { not $_[0]-> {taken} } # non-blocking take sub take { my $self = shift; warn "$self is taken\n" if $DEBUG and not $self->{taken}; return $self-> {taken} ? 0 : ($self-> {taken} = 1); } # remove the lambda from queue sub remove { my ( $self, $lambda) = @_; my $found; my $q = $self-> {queue}; for ( my $i = 0; $i < @$q; $i ++) { next if $q->[$i] != $lambda; $found = $i; last; } if ( defined $found) { splice( @$q, $found, 1); $self-> release unless @$q; } else { warn "$self failed to remove $lambda from queue\n" if $DEBUG; } } sub waiter { my ( $self, $timeout) = @_; # mutex is free, can take now unless ( $self-> {taken}) { $self-> take; return lambda { undef }; } # mutex is not free, wait for it my $waiter = IO::Lambda-> new; my $bind = $waiter-> bind( sub { my ($w,$rec) = (shift,shift); # lambda was terminated, relinquish waiting and kill timeout $self-> remove($w); $w-> cancel_event($timeout) if defined $timeout; return @_; # propagate error }); push @{$self-> {queue}}, $waiter; $timeout = $waiter-> watch_timer( $timeout, sub { $self-> remove($waiter); $waiter-> resolve($bind); return 'timeout'; }) if defined $timeout; return $waiter; } sub release { my $self = shift; return unless $self-> {taken}; unless (@{$self-> {queue}}) { warn "$self is free\n" if $DEBUG; $self-> {taken} = 0; return; } my $lambda = shift @{$self-> {queue}}; warn "$self gives ownership to $lambda\n" if $DEBUG; $lambda-> terminate(undef); } sub DESTROY { $_-> terminate('dead') for @{shift-> {queue}} } sub mutex(&) { my ( $self, $timeout) = context; $self-> waiter($timeout)-> condition(shift, \&mutex, 'mutex') } sub pipeline { my ( $self, $lambda, $timeout) = @_; lambda { my @p = @_; context $self-> waiter($timeout); tail { context $lambda, @p; autocatch tail { $self-> release; return @_; }}} } 1;