# /usr/local/CPAN/DJabberd/DJabberd/Connection/Admin.pm


package DJabberd::Connection::Admin;
use strict;
use warnings;
no warnings 'redefine';
use base 'Danga::Socket';

use fields qw(buffer server handle);
use vars qw($initial_memory @Help);

use Devel::Peek ();
# Probe for optional debugging modules.  Each flag is true only when the
# corresponding "use" succeeded inside the string eval; the commands below
# consult these flags before calling into the modules.
my $has_gladiator  = eval "use Devel::Gladiator; 1;";
my $has_cycle      = eval "use Devel::Cycle; 1;";
my $has_devel_leak = eval "use Devel::Leak; 1;";

# Remember the process's memory footprint the first time this runs.  Once a
# true baseline has been stored, later calls leave it untouched.
sub on_startup {
    return $initial_memory if $initial_memory;
    return $initial_memory = get_memory();
}

# Return the resident set size (the VmRSS figure) of the current process, as
# the integer reported by /proc/<pid>/status.  Only implemented on Linux;
# returns 0 on other platforms or whenever the value cannot be determined,
# so callers can always do arithmetic on the result.
sub get_memory {
    return 0 unless $^O eq 'linux';

    # Read the status file directly instead of spawning "cat | grep" through
    # a shell on every call.
    open(my $fh, '<', "/proc/$$/status") or return 0;

    my $mem = 0;
    while (my $line = <$fh>) {
        # First run of digits on the VmRSS line, e.g. "VmRSS:     1234 kB".
        if ($line =~ /^VmRSS\D*(\d+)/) {
            $mem = $1;
            last;
        }
    }
    # CORE:: because this package defines its own close() method.
    CORE::close($fh);

    return $mem;
}

# Constructor: wrap $sock via the parent class constructor and remember the
# server object this admin connection belongs to.
sub new {
    my $class  = shift;
    my $sock   = shift;
    my $server = shift;

    my $conn = $class->SUPER::new($sock);
    $conn->{server} = $server;
    return $conn;
}

# Read callback: pull up to 20,000 bytes from the socket, append them to the
# per-connection buffer and dispatch every complete line (terminated by LF
# or CRLF) to process_line.  A partial trailing line stays in the buffer
# until more data arrives.  The connection is closed when read() returns
# undef.
sub event_read {
    my DJabberd::Connection::Admin $self = shift;

    my $bref = $self->read(20_000);
    return $self->close unless defined $bref;

    $self->{buffer} .= $$bref;

    while ($self->{buffer} =~ s/^(.*?)\r?\n//) {
        my $line = $1;
        $self->process_line($line);

        # A command such as "quit" may have closed this connection; do not
        # keep executing commands that were pipelined after it.
        last if $self->{closed};
    }
}

# Parse one input line and dispatch it.  The first word is the command name;
# it is looked up as a CMD_<name> method of this package and called with the
# connection and the remainder of the line (leading whitespace removed).
# Blank lines are ignored.  Unparseable or unknown commands produce an error
# line on the connection.
sub process_line {
    my DJabberd::Connection::Admin $self = shift;
    my $line = shift;
    return unless defined $line && $line =~ /\S/;

    # Only trust $1 when the substitution actually matched, and test for
    # definedness so that a command word of "0" is not mistaken for a
    # parse failure.
    my $command;
    if ($line =~ s/^\s*(\w+)\s*//) {
        $command = $1;
    }
    unless (defined $command) {
        $self->write("Cannot parse command '$line'");
        return;
    }

    if (my $cref = DJabberd::Connection::Admin->can("CMD_" . $command)) {
        $cref->($self, $line);
    } else {
        $self->write("Unknown command '$command'");
    }
}

# close / quit / exit: three spellings of the same command, each of which
# closes the admin connection.
push @Help, 'close | exit | quit';
sub CMD_close {
    my ($conn) = @_;
    $conn->close;
}
sub CMD_quit {
    my ($conn) = @_;
    $conn->close;
}
sub CMD_exit {
    my ($conn) = @_;
    $conn->close;
}

# conns / connections: write one line holding the space-separated keys of
# Danga::Socket's descriptor map.  With the argument "clients" only
# DJabberd::Connection::ClientIn objects are listed; with "servers" only
# ServerIn and ServerOut objects.  Any other argument lists everything.
push @Help, 'conns | connections';
*CMD_conns = \&CMD_connections;
sub CMD_connections {
    my ($self, $filter) = @_;

    my $sockets = Danga::Socket->DescriptorMap;
    my @wanted = grep {
        my $class = ref $sockets->{$_};
        my $not_client = $filter eq "clients"
            && $class ne "DJabberd::Connection::ClientIn";
        my $not_server = $filter eq "servers"
            && $class ne "DJabberd::Connection::ServerIn"
            && $class ne "DJabberd::Connection::ServerOut";
        !($not_client || $not_server);
    } keys %$sockets;

    $self->write(join(' ', @wanted));
}

# latency_log: dump every defined entry of
# @DJabberd::Stats::stanza_process_latency_log as "<elem 0>\t<elem 1>",
# followed by the '.' terminator.
push @Help, 'latency_log';
sub CMD_latency_log {
    my ($self) = @_;

    for my $entry (@DJabberd::Stats::stanza_process_latency_log) {
        defined $entry or next;
        my $row = "$entry->[0]\t$entry->[1]";
        $self->write($row);
    }

    $self->end;
}

# latency: print a histogram of @DJabberd::Stats::stanza_process_latency.
# Each defined sample is counted in the first bucket whose upper bound it is
# strictly below; samples at or above the largest bound are not counted.
# Only non-empty buckets are printed, as "-<bound>\t<count>".
push @Help, 'latency';
sub CMD_latency {
    my ($self) = @_;

    my @limits = qw(
        0.0005
        0.001 0.002 0.005
        0.01  0.02  0.05
        0.1   0.2
        1.0 2.0 10.0
    );
    my %count_for;

  SAMPLE:
    for my $elapsed (@DJabberd::Stats::stanza_process_latency) {
        next SAMPLE unless defined $elapsed;
        for my $limit (@limits) {
            next unless $elapsed < $limit;
            $count_for{$limit}++;
            next SAMPLE;
        }
    }

    for my $limit (grep { $count_for{$_} } @limits) {
        $self->write("-$limit\t$count_for{$limit}");
    }
    $self->end;
}

# help: print a header, the registered help strings in sorted order, and the
# '.' terminator, one per line.
push @Help, 'help';
sub CMD_help {
    my ($self) = @_;
    my @lines = ('Available commands:', sort(@Help), '.');
    $self->write($_) foreach @lines;
}

# list: when the argument starts with "vhost" or "vhosts", print each key of
# the server's vhosts hash followed by the '.' terminator.  Anything else
# yields a "Cannot list" error line.
push @Help, 'list vhosts';
sub CMD_list {
    my ($self, $type) = @_;

    # Guard clause: reject anything that is not a vhost listing request.
    return $self->write("Cannot list '$type'")
        unless $type =~ /^vhosts?/;

    for my $name (keys %{$self->{server}->{vhosts}}) {
        $self->write($name);
    }
    $self->end;
}

# stats: print connection, user and memory statistics as tab-separated
# "<name>\t<value>\t<unit>" lines followed by the '.' terminator.
#   connections            number of entries in Danga::Socket's descriptor map
#   users                  total jid2sock entries summed over all vhosts
#   connections_quickcalc  connect counter minus disconnect counter
#   mem_total              current get_memory() value
#   mem_connections        growth since the baseline taken by on_startup
#   mem_per_connection     that growth divided by the connection count
push @Help, 'stats';
sub CMD_stats {
    my $self = shift;

    my $conns = scalar keys %{ Danga::Socket->DescriptorMap };

    # Counters that have never been bumped are undef; treat them as zero so
    # the subtraction does not raise "uninitialized value" warnings.
    my $conns_quick = ($DJabberd::Stats::counter{connect}    || 0)
                    - ($DJabberd::Stats::counter{disconnect} || 0);

    my $users = 0;
    DJabberd->foreach_vhost(sub {
        my $vhost = shift;
        $users += keys %{$vhost->{jid2sock}};
    });

    $self->write("connections\t$conns\tconnections");
    $self->write("users\t$users\tusers");
    $self->write("connections_quickcalc\t$conns_quick\tconnections");

    # Both the current reading and the baseline may be unset (on_startup
    # never called, or memory could not be read); fall back to 0.
    my $mem      = get_memory() || 0;
    my $user_mem = $mem - ($initial_memory || 0);

    $self->write("mem_total\t$mem\tkB");
    $self->write("mem_connections\t$user_mem\tkB");
    # "|| 1" avoids dividing by zero when the descriptor map is empty.
    $self->write("mem_per_connection\t". ($user_mem / ($conns || 1) ) . "\tkB/conn");
    $self->end;
}

# counters: print every entry of %DJabberd::Stats::counter as
# "<name>\t<value>", sorted by name, followed by the '.' terminator.
push @Help, 'counters';
sub CMD_counters {
    my ($self) = @_;

    my @names = sort keys %DJabberd::Stats::counter;
    $self->write("$_\t$DJabberd::Stats::counter{$_}") for @names;

    $self->end;
}

# users: for each vhost print its server_name on one line, then every key of
# its jid2sock hash on tab-indented lines; finish with the '.' terminator.
push @Help, 'users';
sub CMD_users {
    my ($self) = @_;

    my $dump_vhost = sub {
        my ($vh) = @_;
        $self->write("$vh->{server_name}");
        for my $jid (keys %{$vh->{jid2sock}}) {
            $self->write("\t$jid");
        }
    };

    DJabberd->foreach_vhost($dump_vhost);
    $self->end;
}

# version: print $DJabberd::VERSION on a single line.
push @Help, 'version';
sub CMD_version {
    my ($self) = @_;
    $self->write($DJabberd::VERSION);
}

# Run $code in a forked child and hand its result back to the parent.
#
# The child calls $code, serializes the returned reference with
# Storable::nfreeze into a temporary file in the current directory, and
# exits.  The parent waits for that child, thaws the file and returns the
# reconstructed data structure.
#
# Parameters: $code - code ref; must return a reference Storable can freeze.
# Returns:    the thawed result, or undef when the fork, the callback or the
#             file hand-off failed.
sub _in_sub_process {
    my $code = shift;

    # Storable is only referenced by fully-qualified name, so make sure it
    # is actually loaded.
    require Storable;

    my $rand = rand();
    my $file = ".tmp.$$.$rand.answer";

    my $cpid = fork;

    # fork failed: bail out here, otherwise the code below would run the
    # child branch in the parent and end by exiting the whole server.
    return unless defined $cpid;

    if ($cpid) {
        # Reap exactly the child we started rather than any child.
        waitpid($cpid, 0);

        my $res;
        if (open(my $fh, '<', $file)) {
            binmode $fh;
            my $data = do { local $/; <$fh> };
            CORE::close($fh);
            $res = eval { Storable::thaw($data); };
        }
        # Remove the answer file and any half-written leftover.
        unlink $file, "$file.writing";
        return $res;
    }

    # Child.  Whatever happens, this process must exit: returning (or dying)
    # would let a duplicate of the server carry on running.
    my $ok = eval {
        my $res = $code->();
        open(my $fh, '>', "$file.writing") or die "open $file.writing: $!";
        binmode $fh;
        print {$fh} Storable::nfreeze($res) or die "write $file.writing: $!";
        CORE::close($fh) or die "close $file.writing: $!";
        # Rename so the parent never sees a partially written file.
        rename "$file.writing", $file or die "rename $file.writing: $!";
        1;
    };
    exit($ok ? 0 : 1);
}

# Walk the arena returned by Devel::Gladiator::walk_arena and tally what is
# in it.  Every item is counted under its ref() type; some types get an
# additional, more specific counter:
#   REF                 -> "REF-<type of the referenced thing>"
#   DJabberd::IQ        -> "DJabberd::IQ-<signature>"
#   Gearman::Task       -> "Gearman::Task-func:<func>"
#   DJabberd::Callback  -> "DJabberd::Callback-<_phase>" (only if _phase is true)
#   CODE                -> the value of Devel::Peek::CvGV for the sub
# Returns a reference to the hash of counts.
sub arena_ref_counts {
    my $objects = Devel::Gladiator::walk_arena();
    my %tally;

    for my $sv (@$objects) {
        my $type = ref $sv;
        $tally{$type}++;

        if ($type eq "REF") {
            $tally{"REF-" . ref $$sv}++;
        }
        elsif ($type eq "DJabberd::IQ") {
            $tally{"DJabberd::IQ-" . $sv->signature}++;
        }
        elsif ($type eq "Gearman::Task") {
            $tally{"Gearman::Task-func:$sv->{func}"}++;
        }
        elsif ($type eq "DJabberd::Callback") {
            next unless $sv->{_phase};
            $tally{"DJabberd::Callback-" . $sv->{_phase}}++;
        }
        elsif ($type eq "CODE") {
            $tally{Devel::Peek::CvGV($sv)}++;
        }
    }

    # Drop the arena list before returning.
    $objects = undef;
    return \%tally;
}

# gladiator [lite | delta | all]: print arena counts gathered in a
# sub-process, sorted by descending count.  Each line shows the count, the
# change since the previous gladiator run on this server, and the key.
#   lite  (default) hide "REF-..." keys and anything matching log4perl
#   delta           only keys whose count changed since the last run
#   all             everything
#   anything else   only keys with a count above 1
# When Devel::Gladiator is not installed only the '.' terminator is sent.
push @Help, 'gladiator [lite | delta | all] ' . ( $has_gladiator ? '' : '(unavailable)' );
my %last_gladiator;
sub CMD_gladiator {
    my ($self, $cmd) = @_;

    unless ($has_gladiator) {
        $self->end;
        return;
    }

    $cmd ||= "lite";

    my $ct = _in_sub_process(sub { arena_ref_counts() });
    my $ret;
    $ret .= "ARENA COUNTS:\n";
    foreach my $k (sort {$ct->{$b} <=> $ct->{$a}} keys %$ct) {
        # The delta bookkeeping happens for every key, even ones filtered
        # out of the report below.
        my $delta = $ct->{$k} - ($last_gladiator{$k} || 0);
        $last_gladiator{$k} = $ct->{$k};
        if ($cmd eq "delta") {
            next unless $delta;
        } elsif ($cmd eq "lite") {
            next if $k =~ /^REF-/;
            next if $k =~ /log4perl/i;
        } else {
            next unless $ct->{$k} > 1 || $cmd eq "all";
        }
        # Pass the key as a %s argument: interpolating it into the format
        # would let a '%' in the key be read as a conversion.
        $ret .= sprintf(" %4d %-4d %s\n", $ct->{$k}, $delta, $k);
    }

    $self->write($ret);
    $self->end;
}

# cycle: collect arena objects whose class matches the filter below and hand
# them to find_cycle (imported from Devel::Cycle).  Nothing is written to the
# admin connection apart from the '.' terminator.  The command needs both
# Devel::Gladiator and Devel::Cycle, so the help text marks it unavailable
# unless both loaded.
push @Help, 'cycle ' . ( $has_gladiator && $has_cycle ? '' : '(unavailable)' );
sub CMD_cycle {
    my $self = shift;

    unless ($has_gladiator && $has_cycle) {
        $self->end;
        return;
    }

    my $array = Devel::Gladiator::walk_arena();
    # NOTE(review): the alternation is only anchored for "DJabberd";
    # "Gearman" and "CODE" match anywhere in the class name.  Confirm
    # whether /^(?:DJabberd|Gearman|CODE)/ was intended.
    my @list = grep { ref($_) =~ /^DJabberd|Gearman|CODE/ } @$array;
    $array = undef;

    use Data::Dumper;
#    my $flist = \%DJabberd::Connection::ClientIn::FIELDS;
#    foreach my $k (sort { $flist->{$a} <=> $flist->{$b} } keys %$flist) {
#        printf STDERR " %4d %s\n", $flist->{$k}, $k;
#    }
    find_cycle(\@list);
    $self->end;

}

# Advertise the 'fields' command (argument: a package name) in the help list.
push @Help, 'fields package_name';
# fields: print the %FIELDS table of the named package to STDERR, one
# " <index> <field>" line per entry ordered by index, then send the '.'
# terminator on the admin connection.
#
# The argument must be a plain package name (Foo or Foo::Bar).  Anything
# else is rejected with an error line: the name is admin-supplied text, so
# it must never be evaluated as code.
sub CMD_fields {
    my ($self, $arg) = @_;

    $arg = '' unless defined $arg;
    $arg =~ s/^\s+//;
    $arg =~ s/\s+$//;

    unless ($arg =~ /\A[A-Za-z_]\w*(?:::\w+)*\z/) {
        $self->write("ERROR bogus package name");
        return;
    }

    # Symbolic lookup of the validated name instead of a string eval.
    my $flist = do {
        no strict 'refs';
        \%{"${arg}::FIELDS"};
    };

    foreach my $k (sort { $flist->{$a} <=> $flist->{$b} } keys %$flist) {
        printf STDERR " %4d %s\n", $flist->{$k}, $k;
    }
    $self->end;
}

# note_arena: take a Devel::Leak snapshot via NoteSV and keep its handle on
# this connection for a later check_arena.  When Devel::Leak is not
# installed only the '.' terminator is sent, matching the other optional
# debugging commands.
push @Help, 'note_arena ' . ( $has_devel_leak ? '' : '(unavailable)' );
sub CMD_note_arena {
    my ($self) = @_;

    unless ($has_devel_leak) {
        # Still terminate the response so the client is not left waiting.
        $self->end;
        return;
    }

    # NoteSV is handed the handle slot itself; the slot is pre-set to 1
    # first, as the original code did.
    $self->{handle} = 1;
    Devel::Leak::NoteSV($self->{handle});
    $self->end;
}

# check_arena: call Devel::Leak::CheckSV with the handle stored by a previous
# note_arena on this connection, then send the '.' terminator.  When
# Devel::Leak is not installed only the terminator is sent.  Without a prior
# note_arena an error line is written and CheckSV is not called.
push @Help, 'check_arena ' . ( $has_devel_leak ? '' : '(unavailable)' );
sub CMD_check_arena {
    my ($self) = @_;

    unless ($has_devel_leak) {
        # Still terminate the response so the client is not left waiting.
        $self->end;
        return;
    }

    # Never hand an undefined handle to CheckSV.
    unless (defined $self->{handle}) {
        $self->write("ERROR no arena noted; run note_arena first");
        return;
    }

    Devel::Leak::CheckSV($self->{handle});
    $self->end;
}

# reload: forget that this module was loaded and "use" it again so that
# edited command implementations take effect.  Writes "OK" on success or
# "ERROR: <message>" when the reload fails.
push @Help, 'reload';
sub CMD_reload {
    my $self = shift;

    delete $INC{"DJabberd/Connection/Admin.pm"};
    my $rv  = eval "use DJabberd::Connection::Admin; 1;";
    my $err = $@;

    # Re-running the file executes every "push @Help, ..." again; drop the
    # duplicates so the help listing does not grow with each reload.
    my %seen;
    @Help = grep { !$seen{$_}++ } @Help;

    if ($rv) {
        $self->write("OK");
    } else {
        $self->write("ERROR: $err");
    }
}

# send_stanza VHOST JID STANZA: write a stanza to every connection that the
# named vhost reports for the given bare JID.  The three arguments are
# whitespace-separated; STANZA is passed through DJabberd::Util::durl before
# being written.  Replies "OK" on success or an "ERROR ..." line naming the
# first problem found.
push @Help, 'send_stanza VHOST JID STANZA';
sub CMD_send_stanza {
    my ($self, $params) = @_;
    my ($vname, $barejid_str, $e_stanza) = split(/\s+/, $params);

    my $vhost = DJabberd->lookup_vhost($vname);
    unless ($vhost) {
        $self->write("ERROR bogus vhost");
        return;
    }

    my $jid = DJabberd::JID->new($barejid_str);
    unless ($jid) {
        $self->write("ERROR bogus jid");
        return;
    }

    my @dconns = $vhost->find_conns_of_bare($jid);
    unless (@dconns) {
        $self->write("ERROR not connected");
        return;
    }

    # Checked last so the earlier error replies keep their precedence; an
    # absent stanza must not be written to client connections.
    unless (defined $e_stanza && length $e_stanza) {
        $self->write("ERROR missing stanza");
        return;
    }

    foreach my $c (@dconns) {
        $c->write(DJabberd::Util::durl($e_stanza));
    }
    $self->write("OK");
}

# Send the response terminator: a line holding a single '.'.
sub end {
    my ($self) = @_;
    return $self->write('.');
}

# Write one line to the connection.  A defined string is sent with CRLF
# appended; undef is passed through untouched.
sub write {
    my ($self, $string) = @_;

    # The default event_write just kicks off more events by calling write
    # with undef, so that case must reach the parent class unchanged.
    return $self->SUPER::write(undef) unless defined $string;

    return $self->SUPER::write($string . "\r\n");
}

# Close the connection, bumping the global disconnect counter unless the
# connection is already marked closed.  Extra arguments go to the parent
# class's close.
sub close {
    my ($self, @args) = @_;

    if (!$self->{closed}) {
        $DJabberd::Stats::counter{disconnect}++;
    }

    return $self->SUPER::close(@args);
}

1;