| Net-DBus documentation | Contained in the Net-DBus distribution. |
Net::DBus::Reactor - application event loop
Create and run an event loop:
use Net::DBus::Reactor; my $reactor = Net::DBus::Reactor->main(); $reactor->run();
Manage some file handlers
$reactor->add_read($fd,
Net::DBus::Callback->new(method => sub {
my $fd = shift;
...read some data...
}, args => [$fd]);
$reactor->add_write($fd,
Net::DBus::Callback->new(method => sub {
my $fd = shift;
...write some data...
}, args => [$fd]);
Temporarily (dis|en)able a handle
# Disable $reactor->toggle_read($fd, 0); # Enable $reactor->toggle_read($fd, 1);
Permanently remove a handle
$reactor->remove_read($fd);
Manage a regular timeout every 100 milliseconds
my $timer = $reactor->add_timeout(100,
Net::DBus::Callback->new(
method => sub {
...process the alarm...
}));
Temporarily (dis|en)able a timer
# Disable $reactor->toggle_timeout($timer, 0); # Enable $reactor->toggle_timeout($timer, 1);
Permanently remove a timer
$reactor->remove_timeout($timer);
Add a post-dispatch hook
my $hook = $reactor->add_hook(Net::DBus::Callback->new(
method => sub {
... do some work...
}));
Remove a hook
$reactor->remove_hook($hook);
This class provides a general purpose event loop for the purposes of multiplexing I/O events and timeouts in a single process. The underlying implementation is done using the select system call. File handles can be registered for monitoring on read, write and exception (out-of-band data) events. Timers can be registered to expire with a periodic frequency. These are implemented using the timeout parameter of the select system call. Since this parameter merely represents an upper bound on the amount of time the select system call is allowed to sleep, the actual period of the timers may vary. Under normal load this variance is typically 10 milliseconds. Finally, hooks may be registered which will be invoked on each iteration of the event loop (ie after processing the file events, or timeouts indicated by the select system call returning).
Creates a new event loop ready for monitoring file handles, or generating timeouts. Except in very unsual circumstances (examples of which I can't think up) it is not neccessary or desriable to explicitly create new reactor instances. Instead call the main method to get a handle to the singleton instance.
Return a handle to the singleton instance of the reactor. This is the recommended way of getting hold of a reactor, since it removes the need for modules to pass around handles to their privately created reactors.
Registers a Net::DBus::Connection or Net::DBus::Server object
for management by the event loop. This basically involves
hooking up the watch & timeout callbacks to the event loop.
For connections it will also register a hook to invoke the
dispatch method periodically.
Starts the event loop monitoring any registered
file handles and timeouts. At least one file
handle, or timer must have been registered prior
to running the reactor, otherwise it will immediately
exit. The reactor will run until all registered
file handles, or timeouts have been removed, or
disabled. The reactor can be explicitly stopped by
calling the shutdown method.
Explicitly shutdown the reactor after pending events have been processed.
Perform one iteration of the event loop, going to sleep until an event occurs on a registered file handle, or a timeout occurrs. This method is generally not required in day-to-day use.
Registers a file handle for monitoring of read
events. The $callback parameter specifies an
instance of the Net::DBus::Callback object to invoke
each time an event occurs. The optional $status
parameter is a boolean value to specify whether the
watch is initially enabled.
Registers a file handle for monitoring of write
events. The $callback parameter specifies an
instance of the Net::DBus::Callback object to invoke
each time an event occurs. The optional $status
parameter is a boolean value to specify whether the
watch is initially enabled.
Registers a file handle for monitoring of exception
events. The $callback parameter specifies an
instance of the Net::DBus::Callback object to invoke
each time an event occurs. The optional $status
parameter is a boolean value to specify whether the
watch is initially enabled.
Registers a new timeout to expire every $interval
milliseconds. The $callback parameter specifies an
instance of the Net::DBus::Callback object to invoke
each time the timeout expires. The optional $status
parameter is a boolean value to specify whether the
timeout is initially enabled. The return parameter is
a unique identifier which can be used to later remove
or disable the timeout.
Removes a previously registered timeout specified by
the $id parameter.
Updates the state of a previously registered timeout
specifed by the $id parameter. The $status
parameter specifies whether the timeout is to be enabled
or disabled, while the optional $interval parameter
can be used to change the period of the timeout.
Registers a new hook to be fired on each iteration
of the event loop. The $callback parameter
specifies an instance of the Net::DBus::Callback
class to invoke. The $status parameter determines
whether the hook is initially enabled, or disabled.
The return parameter is a unique id which should
be used to later remove, or disable the hook.
Removes the previously registered hook identified
by $id.
Updates the status of the previously registered
hook identified by $id. The $status parameter
determines whether the hook is to be enabled or
disabled.
Removes a watch on the file handle $fd.
Updates the status of a watch on the file handle $fd.
The $status parameter species whether the watch is
to be enabled or disabled.
Net::DBus::Callback, Net::DBus::Connection, Net::DBus::Server
Daniel Berrange <dan@berrange.com>
Copyright 2004 by Daniel Berrange
| Net-DBus documentation | Contained in the Net-DBus distribution. |
# -*- perl -*- # # Copyright (C) 2004-2006 Daniel P. Berrange # # This program is free software; You can redistribute it and/or modify # it under the same terms as Perl itself. Either: # # a) the GNU General Public License as published by the Free # Software Foundation; either version 2, or (at your option) any # later version, # # or # # b) the "Artistic License" # # The file "COPYING" distributed along with this file provides full # details of the terms and conditions of the two licenses.
package Net::DBus::Reactor; use 5.006; use strict; use warnings; use Net::DBus::Binding::Watch; use Net::DBus::Callback; use Time::HiRes qw(gettimeofday);
sub new { my $proto = shift; my $class = ref($proto) || $proto; my %params = @_; my $self = {}; $self->{fds} = { read => {}, write => {}, exception => {} }; $self->{timeouts} = []; $self->{hooks} = []; bless $self, $class; return $self; } use vars qw($main_reactor);
sub main { my $class = shift; $main_reactor = $class->new() unless defined $main_reactor; return $main_reactor; }
sub manage { my $self = shift; my $object = shift; if ($object->can("set_watch_callbacks")) { $object->set_watch_callbacks(sub { my $object = shift; my $watch = shift; $self->_manage_watch_on($object, $watch); }, sub { my $object = shift; my $watch = shift; $self->_manage_watch_off($object, $watch); }, sub { my $object = shift; my $watch = shift; $self->_manage_watch_toggle($object, $watch); }); } if ($object->can("set_timeout_callbacks")) { $object->set_timeout_callbacks(sub { my $object = shift; my $timeout = shift; my $key = $self->add_timeout($timeout->get_interval, Net::DBus::Callback->new(object => $timeout, method => "handle", args => []), $timeout->is_enabled); $timeout->set_data($key); }, sub { my $object = shift; my $timeout = shift; my $key = $timeout->get_data; $self->remove_timeout($key); }, sub { my $object = shift; my $timeout = shift; my $key = $timeout->get_data; $self->remove_timeout($key, $timeout->is_enabled, $timeout->get_interval); }); } if ($object->can("dispatch")) { $self->add_hook(Net::DBus::Callback->new(object => $object, method => "dispatch", args => []), 1); } if ($object->can("flush")) { $self->add_hook(Net::DBus::Callback->new(object => $object, method => "flush", args => []), 1); } } sub _manage_watch_on { my $self = shift; my $object = shift; my $watch = shift; my $flags = $watch->get_flags; if ($flags & &Net::DBus::Binding::Watch::READABLE) { $self->add_read($watch->get_fileno, Net::DBus::Callback->new(object => $watch, method => "handle", args => [&Net::DBus::Binding::Watch::READABLE]), $watch->is_enabled); } if ($flags & &Net::DBus::Binding::Watch::WRITABLE) { $self->add_write($watch->get_fileno, Net::DBus::Callback->new(object => $watch, method => "handle", args => [&Net::DBus::Binding::Watch::WRITABLE]), $watch->is_enabled); } # $self->add_exception($watch->get_fileno, $watch, # Net::DBus::Callback->new(object => $watch, # method => "handle", # args => [&Net::DBus::Binding::Watch::ERROR]), # $watch->is_enabled); } sub _manage_watch_off { my $self = shift; my $object = shift; my $watch = shift; my $flags = $watch->get_flags; if ($flags & &Net::DBus::Binding::Watch::READABLE) { $self->remove_read($watch->get_fileno); } if ($flags & &Net::DBus::Binding::Watch::WRITABLE) { $self->remove_write($watch->get_fileno); } # $self->remove_exception($watch->get_fileno); } sub _manage_watch_toggle { my $self = shift; my $object = shift; my $watch = shift; my $flags = $watch->get_flags; if ($flags & &Net::DBus::Binding::Watch::READABLE) { $self->toggle_read($watch->get_fileno, $watch->is_enabled); } if ($flags & &Net::DBus::Binding::Watch::WRITABLE) { $self->toggle_write($watch->get_fileno, $watch->is_enabled); } $self->toggle_exception($watch->get_fileno, $watch->is_enabled); }
sub run { my $self = shift; $self->{running} = 1; while ($self->{running}) { $self->step }; }
sub shutdown { my $self = shift; $self->{running} = 0; }
sub step { my $self = shift; my @callbacks = $self->_dispatch_hook(); foreach my $callback (@callbacks) { $callback->invoke; } my ($ri, $ric) = $self->_bits("read"); my ($wi, $wic) = $self->_bits("write"); my ($ei, $eic) = $self->_bits("exception"); my $timeout = $self->_timeout($self->_now); if (!$ric && !$wic && !$eic && !(defined $timeout)) { $self->{running} = 0; return; } my ($ro, $wo, $eo); my $n = select($ro=$ri,$wo=$wi,$eo=$ei, (defined $timeout ? ($timeout ? $timeout/1000 : 0) : undef)); @callbacks = (); if ($n) { push @callbacks, $self->_dispatch_fd("read", $ro); push @callbacks, $self->_dispatch_fd("write", $wo); push @callbacks, $self->_dispatch_fd("error", $eo); } push @callbacks, $self->_dispatch_timeout($self->_now); #push @callbacks, $self->_dispatch_hook(); foreach my $callback (@callbacks) { $callback->invoke; } return 1; } sub _now { my $self = shift; my @now = gettimeofday; return $now[0] * 1000 + (($now[1] - ($now[1] % 1000)) / 1000); } sub _bits { my $self = shift; my $type = shift; my $vec = ''; my $count = 0; foreach (keys %{$self->{fds}->{$type}}) { next unless $self->{fds}->{$type}->{$_}->{enabled}; $count++; vec($vec, $_, 1) = 1; } return ($vec, $count); } sub _timeout { my $self = shift; my $now = shift; my $timeout; foreach (@{$self->{timeouts}}) { next unless $_->{enabled}; my $expired = $now - $_->{last_fired}; my $interval = ($expired > $_->{interval} ? 0 : $_->{interval} - $expired); $timeout = $interval if !(defined $timeout) || ($interval < $timeout); } return $timeout; } sub _dispatch_fd { my $self = shift; my $type = shift; my $vec = shift; my @callbacks; foreach my $fd (keys %{$self->{fds}->{$type}}) { next unless $self->{fds}->{$type}->{$fd}->{enabled}; if (vec($vec, $fd, 1)) { my $rec = $self->{fds}->{$type}->{$fd}; push @callbacks, $self->{fds}->{$type}->{$fd}->{callback}; } } return @callbacks; } sub _dispatch_timeout { my $self = shift; my $now = shift; my @callbacks; foreach my $timeout (@{$self->{timeouts}}) { next unless $timeout->{enabled}; my $expired = $now - $timeout->{last_fired}; # Select typically returns a little (0-10 ms) before we # asked it for. (8 milliseconds seems reasonable balance # between early timeouts & extra select calls if ($expired >= ($timeout->{interval}-8)) { $timeout->{last_fired} = $now; push @callbacks, $timeout->{callback}; } } return @callbacks; } sub _dispatch_hook { my $self = shift; my $now = shift; my @callbacks; foreach my $hook (@{$self->{hooks}}) { next unless $hook->{enabled}; push @callbacks, $hook->{callback}; } return @callbacks; }
sub add_read { my $self = shift; $self->_add("read", @_); }
sub add_write { my $self = shift; $self->_add("write", @_); }
sub add_exception { my $self = shift; $self->_add("exception", @_); }
sub add_timeout { my $self = shift; my $interval = shift; my $callback = shift; my $enabled = shift; $enabled = 1 unless defined $enabled; my $key; for (my $i = 0 ; $i <= $#{$self->{timeouts}} && !(defined $key); $i++) { $key = $i unless defined $self->{timeouts}->[$i]; } $key = $#{$self->{timeouts}}+1 unless defined $key; $self->{timeouts}->[$key] = { interval => $interval, last_fired => $self->_now, callback => $callback, enabled => $enabled }; return $key; }
sub remove_timeout { my $self = shift; my $key = shift; die "no timeout active with key '$key'" unless defined $self->{timeouts}->[$key]; $self->{timeouts}->[$key] = undef; }
sub toggle_timeout { my $self = shift; my $key = shift; my $enabled = shift; $self->{timeouts}->[$key]->{enabled} = $enabled; $self->{timeouts}->[$key]->{interval} = shift if @_; }
sub add_hook { my $self = shift; my $callback = shift; my $enabled = shift; $enabled = 1 unless defined $enabled; my $key; for (my $i = 0 ; $i <= $#{$self->{hooks}} && !(defined $key); $i++) { $key = $i unless defined $self->{hooks}->[$i]; } $key = $#{$self->{hooks}}+1 unless defined $key; $self->{hooks}->[$key] = { callback => $callback, enabled => $enabled }; return $key; }
sub remove_hook { my $self = shift; my $key = shift; die "no hook present with key '$key'" unless defined $self->{hooks}->[$key]; $self->{hooks}->[$key] = undef; }
sub toggle_hook { my $self = shift; my $key = shift; my $enabled = shift; $self->{hooks}->[$key]->{enabled} = $enabled; } sub _add { my $self = shift; my $type = shift; my $fd = shift; my $callback = shift; my $enabled = shift; $enabled = 1 unless defined $enabled; $self->{fds}->{$type}->{$fd} = { callback => $callback, enabled => $enabled }; }
sub remove_read { my $self = shift; $self->_remove("read", @_); } sub remove_write { my $self = shift; $self->_remove("write", @_); } sub remove_exception { my $self = shift; $self->_remove("exception", @_); } sub _remove { my $self = shift; my $type = shift; my $fd = shift; die "no handle ($type) active with fd '$fd'" unless exists $self->{fds}->{$type}->{$fd}; delete $self->{fds}->{$type}->{$fd}; }
sub toggle_read { my $self = shift; $self->_toggle("read", @_); } sub toggle_write { my $self = shift; $self->_toggle("write", @_); } sub toggle_exception { my $self = shift; $self->_toggle("exception", @_); } sub _toggle { my $self = shift; my $type = shift; my $fd = shift; my $enabled = shift; $self->{fds}->{$type}->{$fd}->{enabled} = $enabled; } 1;