| mogilefs-server documentation | Contained in the mogilefs-server distribution. |
Returns a new IOStatWatcher object.
Sets the list of hosts to connect to for collecting IOStat information. This call can block if you pass it hostnames instead of ip addresses.
Upon successful connection, the on_stats callback will be called each time the statistics are collected. Error states (failed connections, etc.) will trigger retries on 60 second intervals, and disconnects will trigger an immediate reconnect.
Sets the coderef called for the on_stats callback.
Called each time device use statistics are collected. The host
argument is the value passed in to the set_hosts method. The
stats object is a hashref of mogile device numbers (without leading
"dev") to their corresponding utilization percentages.
| mogilefs-server documentation | Contained in the mogilefs-server distribution. |
package MogileFS::IOStatWatcher; use strict; use Sys::Syscall 0.22; # We use it indirectly, and trigger bugs in earlier versions. use Danga::Socket; use IO::Socket::INET;
sub new { my ($class) = @_; my $self = bless { hosts => {}, }, $class; $self->on_stats; # set an empty handler. return $self; }
sub set_hosts { my ($self, @ips) = @_; my $old_hosts = $self->{hosts}; my $new_hosts = {}; foreach my $host (@ips) { $new_hosts->{$host} = (delete $old_hosts->{$host}) || MogileFS::IOStatWatch::Client->new($host, $self); } # TODO: close hosts that were removed (things in %$old_hosts) $self->{hosts} = $new_hosts; }
sub on_stats { my ($self, $cb) = @_; unless (ref $cb eq 'CODE') { $cb = sub {}; } $self->{on_stats} = $cb; }
# Everything beyond here is internal. sub got_stats { my ($self, $host, $stats) = @_; $self->{on_stats}->($host, $stats); } sub restart_monitoring_if_needed { my ($self, $host) = @_; return unless $self->{hosts}->{$host} && $self->{hosts}->{$host}->{closed}; $self->{hosts}->{$host} = MogileFS::IOStatWatch::Client->new($host, $self); } sub got_error { my ($self, $host) = @_; Danga::Socket->AddTimer(60, sub { $self->restart_monitoring_if_needed($host); }); } sub got_disconnect { my ($self, $host) = @_; $self->{hosts}->{$host} = MogileFS::IOStatWatch::Client->new($host, $self); } # Support class that does the communication with individual hosts. package MogileFS::IOStatWatch::Client; use strict; use warnings; use base 'Danga::Socket'; use fields qw(host watcher buffer active); sub new { my MogileFS::IOStatWatch::Client $self = shift; my $hostspec = shift; my $watcher = shift; my $sock = IO::Socket::INET->new( PeerAddr => $hostspec, PeerPort => MogileFS->config("mogstored_stream_port"), Proto => 'tcp', Blocking => 0, ); return unless $sock; $self = fields::new($self) unless ref $self; $self->SUPER::new($sock); $self->watch_write(1); $self->watch_read(1); $self->{watcher} = $watcher; $self->{buffer} = ''; $self->{host} = $hostspec; return $self; } sub event_write { my MogileFS::IOStatWatch::Client $self = shift; $self->{active} = 1; $self->write("watch\n"); $self->watch_write(0); # I hope I can safely assume that 6 characters will write properly. } sub event_read { my MogileFS::IOStatWatch::Client $self = shift; my $bref = $self->read(10240); return $self->close unless defined $bref; $self->{buffer} .= $$bref; if ($self->{buffer} =~ m/^ERR\s+(.*?)\s* $ /x) { # There was an error on the way to watching this machine, close it and stay quiet. $self->close; } # If we can yank off lines till there is one by itself with a . on it, we've gotten a full set of stats. while ($self->{buffer} =~ s/^(.*?\n)?\.\n//s) { my %stats; foreach my $line (split /\n+/, $1) { next unless $line; my ($devnum, $util) = split /\s+/, $line; $stats{$devnum} = $util; } $self->{watcher}->got_stats($self->{host}, \%stats); } } sub event_err { my MogileFS::IOStatWatch::Client $self = shift; $self->{watcher}->got_error($self->{host}); } sub event_hup { my MogileFS::IOStatWatch::Client $self = shift; $self->{watcher}->got_error($self->{host}); } sub close { my MogileFS::IOStatWatch::Client $self = shift; if ($self->{active}) { $self->{watcher}->got_disconnect($self->{host}); } else { $self->{watcher}->got_error($self->{host}); } $self->SUPER::close(@_); } 1;