/usr/local/CPAN/DJabberd/DJabberd/Connection/Admin.pm
package DJabberd::Connection::Admin;
use strict;
use warnings;
no warnings 'redefine';
use base 'Danga::Socket';
use fields qw(buffer server handle);
use vars qw($initial_memory @Help);
use Devel::Peek ();
my $has_gladiator = eval "use Devel::Gladiator; 1;";
my $has_cycle = eval "use Devel::Cycle; 1;";
my $has_devel_leak = eval "use Devel::Leak; 1;";
sub on_startup {
$initial_memory ||= get_memory();
}
sub get_memory {
my $mem;
if ($^O eq 'linux') {
my $pid = $$;
($mem) = `cat /proc/$pid/status | grep ^VmRSS` =~ /(\d+)/;
} else {
$mem = 0;
}
return $mem;
}
sub new {
my ($class, $sock, $server) = @_;
my $self = $class->SUPER::new($sock);
$self->{server} = $server;
return $self;
}
sub event_read {
my DJabberd::Connection::Admin $self = shift;
my $bref = $self->read(20_000);
return $self->close unless defined $bref;
$self->{buffer} .= $$bref;
while ($self->{buffer} =~ s/^(.*?)\r?\n//) {
my $line = $1;
$self->process_line($line);
}
}
sub process_line {
my DJabberd::Connection::Admin $self = shift;
my $line = shift;
return unless $line =~ /\S/;
$line =~ s/^\s*(\w+)\s*//;
my $command = $1;
unless ($command) {
$self->write("Cannot parse command '$line'");
return;
}
if (my $cref = DJabberd::Connection::Admin->can("CMD_" . $command)) {
$cref->($self, $line);
} else {
$self->write("Unknown command '$command'");
}
}
push @Help, 'close | exit | quit';
sub CMD_close { $_[0]->close }
sub CMD_quit { $_[0]->close }
sub CMD_exit { $_[0]->close }
push @Help, 'conns | connections';
*CMD_conns = \&CMD_connections;
sub CMD_connections {
my ($self, $filter) = @_;
my $map = Danga::Socket->DescriptorMap;
my @list;
foreach (keys %$map) {
my $obj = $map->{$_};
next if $filter eq "clients" && ref $obj ne "DJabberd::Connection::ClientIn";
next if $filter eq "servers" && ref $obj ne "DJabberd::Connection::ServerIn" &&
ref $obj ne "DJabberd::Connection::ServerOut";
push @list, $_;
}
my $conns = join(' ', @list);
$self->write($conns);
}
push @Help, 'latency_log';
sub CMD_latency_log {
my $self = shift;
foreach my $le (@DJabberd::Stats::stanza_process_latency_log) {
next unless defined $le;
$self->write("$le->[0]\t$le->[1]");
}
$self->end;
}
push @Help, 'latency';
sub CMD_latency {
my $self = shift;
my %hist;
my @buckets = (
'0.0005',
'0.001', '0.002', '0.005',
'0.01', '0.02', '0.05',
'0.1', '0.2',
'1.0', '2.0', '10.0',
);
foreach my $td (@DJabberd::Stats::stanza_process_latency) {
next unless defined $td;
foreach my $bk (@buckets) {
if ($td < $bk) {
$hist{$bk}++;
last;
}
}
}
foreach my $bk (@buckets) {
next unless $hist{$bk};
$self->write("-$bk\t$hist{$bk}");
}
$self->end;
}
push @Help, 'help';
sub CMD_help {
my $self = shift;
$self->write($_) for 'Available commands:', (sort @Help), '.';
}
push @Help, 'list vhosts';
sub CMD_list {
my $self = shift;
my $type = shift;
if ($type =~ /^vhosts?/) {
$self->write($_) foreach keys %{$self->{server}->{vhosts}};
$self->end;
} else {
$self->write("Cannot list '$type'");
}
}
push @Help, 'stats';
sub CMD_stats {
my $self = shift;
my $conns = keys %{ Danga::Socket->DescriptorMap};
my $conns_quick = $DJabberd::Stats::counter{connect} - $DJabberd::Stats::counter{disconnect};
my $users = 0;
DJabberd->foreach_vhost(sub {
my $vhost = shift;
$users += keys %{$vhost->{jid2sock}};
});
$self->write("connections\t$conns\tconnections");
$self->write("users\t$users\tusers");
$self->write("connections_quickcalc\t$conns_quick\tconnections");
my $mem = get_memory();
my $user_mem = $mem - $initial_memory;
$self->write("mem_total\t$mem\tkB");
$self->write("mem_connections\t$user_mem\tkB");
$self->write("mem_per_connection\t". ($user_mem / ($conns || 1) ) . "\tkB/conn");
$self->end;
}
push @Help, 'counters';
sub CMD_counters {
my $self = shift;
foreach my $name (sort keys %DJabberd::Stats::counter) {
$self->write("$name\t$DJabberd::Stats::counter{$name}");
}
$self->end;
}
push @Help, 'users';
sub CMD_users {
my $self = shift;
DJabberd->foreach_vhost(sub {
my $vhost = shift;
$self->write("$vhost->{server_name}");
foreach my $jid (keys %{$vhost->{jid2sock}}) {
$self->write("\t$jid");
}
});
$self->end;
}
push @Help, 'version';
sub CMD_version {
$_[0]->write($DJabberd::VERSION);
}
sub _in_sub_process {
my $code = shift;
my $rand = rand();
my $file = ".tmp.$$.$rand.answer";
my $cpid = fork;
if ($cpid) {
wait;
open (my $fh, $file);
my $data = do { local $/; <$fh> };
my $res = eval { Storable::thaw($data); };
unlink $file;
return $res;
}
my $res = $code->();
if (open (my $fh, ">$file.writing")) {
print $fh Storable::nfreeze($res);
CORE::close($fh);
rename "$file.writing", $file;
exit 0;
}
}
sub arena_ref_counts {
my $all = Devel::Gladiator::walk_arena();
my %ct;
foreach my $it (@$all) {
$ct{ref $it}++;
if (ref $it eq "REF") {
$ct{"REF-" . ref $$it}++;
}
elsif (ref $it eq "DJabberd::IQ") {
$ct{"DJabberd::IQ-" . $it->signature}++;
}
elsif (ref $it eq "Gearman::Task") {
$ct{"Gearman::Task-func:$it->{func}"}++;
}
elsif (ref $it eq "DJabberd::Callback") {
$ct{"DJabberd::Callback-" . $it->{_phase}}++ if $it->{_phase};
}
elsif (ref $it eq "CODE") {
$ct{Devel::Peek::CvGV($it)}++;
}
}
$all = undef;
return \%ct;
}
push @Help, 'gladiator [lite | delta | all] ' . ( $has_gladiator ? '' : '(unavailable)' );
my %last_gladiator;
sub CMD_gladiator {
my ($self, $cmd) = @_;
unless ($has_gladiator) {
$self->end;
return;
}
$cmd ||= "lite";
my $ct = _in_sub_process(sub { arena_ref_counts() });
my $ret;
$ret .= "ARENA COUNTS:\n";
foreach my $k (sort {$ct->{$b} <=> $ct->{$a}} keys %$ct) {
my $delta = $ct->{$k} - ($last_gladiator{$k} || 0);
$last_gladiator{$k} = $ct->{$k};
if ($cmd eq "delta") {
next unless $delta;
} elsif ($cmd eq "lite") {
next if $k =~ /^REF-/;
next if $k =~ /log4perl/i;
} else {
next unless $ct->{$k} > 1 || $cmd eq "all";
}
$ret .= sprintf(" %4d %-4d $k\n", $ct->{$k}, $delta);
}
$self->write($ret);
$self->end;
}
push @Help, 'cycle ' . ( $has_gladiator ? '' : '(unavailable)' );
sub CMD_cycle {
my $self = shift;
unless ($has_gladiator && $has_cycle) {
$self->end;
return;
}
my $array = Devel::Gladiator::walk_arena();
my @list = grep { ref($_) =~ /^DJabberd|Gearman|CODE/ } @$array;
$array = undef;
use Data::Dumper;
# my $flist = \%DJabberd::Connection::ClientIn::FIELDS;
# foreach my $k (sort { $flist->{$a} <=> $flist->{$b} } keys %$flist) {
# printf STDERR " %4d %s\n", $flist->{$k}, $k;
# }
find_cycle(\@list);
$self->end;
}
push @Help, 'fields package_name';
sub CMD_fields {
my ($self, $arg) = @_;
my $flist = eval "\\%${arg}::FIELDS";
foreach my $k (sort { $flist->{$a} <=> $flist->{$b} } keys %$flist) {
printf STDERR " %4d %s\n", $flist->{$k}, $k;
}
$self->end;
}
push @Help, 'note_arena ' . ( $has_devel_leak ? '' : '(unavailable)' );
sub CMD_note_arena {
my ($self, $arg) = @_;
return unless $has_devel_leak;
$self->{handle} = 1;
Devel::Leak::NoteSV($self->{handle});
$self->end;
}
push @Help, 'check_arena ' . ( $has_devel_leak ? '' : '(unavailable)' );
sub CMD_check_arena {
my ($self, $arg) = @_;
return unless $has_devel_leak;
Devel::Leak::CheckSV($self->{handle});
$self->end;
}
push @Help, 'reload';
sub CMD_reload {
my $self = shift;
delete $INC{"DJabberd/Connection/Admin.pm"};
my $rv = eval "use DJabberd::Connection::Admin; 1;";
if ($rv) {
$self->write("OK");
} else {
$self->write("ERROR: $@");
}
}
push @Help, 'send_stanza JID STANZA';
sub CMD_send_stanza {
my ($self, $params) = @_;
my ($vname, $barejid_str, $e_stanza) = split(/\s+/, $params);
my $vhost = DJabberd->lookup_vhost($vname);
unless ($vhost) {
$self->write("ERROR bogus vhost");
return;
}
my $jid = DJabberd::JID->new($barejid_str);
unless ($jid) {
$self->write("ERROR bogus jid");
return;
}
my @dconns = $vhost->find_conns_of_bare($jid);
unless (@dconns) {
$self->write("ERROR not connected");
return;
}
foreach my $c (@dconns) {
$c->write(DJabberd::Util::durl($e_stanza));
}
$self->write("OK");
}
sub end {
$_[0]->write('.');
}
sub write {
my $self = shift;
my $string = shift;
if (defined $string) {
$self->SUPER::write($string . "\r\n");
} else {
# because event_write by default just kicks off more events, calling
# write with undef...
$self->SUPER::write(undef);
}
}
sub close {
my $self = shift;
$DJabberd::Stats::counter{disconnect}++ unless $self->{closed};
$self->SUPER::close(@_);
}
1;