/usr/local/CPAN/AxKit2/AxKit2/Console.pm


# Copyright 2001-2006 The Apache Software Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

package AxKit2::Console;

use strict;
use warnings;

use IO::Socket;
use AxKit2::Constants;
use Socket qw(IPPROTO_TCP TCP_NODELAY);

use base 'Danga::Socket';

use fields qw(
    alive_time
    create_time
    line
    );
    
use constant CLEANUP_TIME => 5; # seconds

our $PROMPT = "\nEnter command (or \"HELP\" for help)\n> ";

Danga::Socket->AddTimer(CLEANUP_TIME, \&_do_cleanup);

sub create {
    my $class    = shift;
    my $config = shift;
    
    my $PORT = $config->console_port;
    
    return unless $PORT;
    
    my $sock = IO::Socket::INET->new(
            LocalAddr => $config->console_addr || '127.0.0.1',
            LocalPort => $PORT,
            Proto     => 'tcp',
            Type      => SOCK_STREAM,
            Blocking  => 0,
            Reuse     => 1,
            Listen    => SOMAXCONN )
               or die "Error creating server on port $PORT : $@\n";

    IO::Handle::blocking($sock, 0);
    
    my $accept_handler = sub {
        my $csock = $sock->accept;
        return unless $csock;

        if ($::DEBUG) {
            AxKit2::Client->log(LOGDEBUG, "Listen child making a AxKit2::Connection for ", fileno($csock));
        }

        IO::Handle::blocking($csock, 0);
        setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;

        if (my $client = eval { AxKit2::Console->new($csock, $config) }) {
            $client->watch_read(1);
            return;
        } else {
            die("Error creating new Console: $@") if $@;
        }
    };

    Danga::Socket->AddOtherFds(fileno($sock) => $accept_handler);
}

sub max_idle_time       { 30 }
sub max_connect_time    { 180 }
sub event_err { my AxKit2::Connection $self = shift; $self->close("Error") }
sub event_hup { my AxKit2::Connection $self = shift; $self->close("Disconnect (HUP)") }

sub new {
    my $self = shift;
    my $sock = shift;
    my $conf = shift;
    $self = fields::new($self) unless ref($self);

    $self->SUPER::new($sock);

    my $now = time;
    $self->{alive_time} = $self->{create_time} = $now;
    $self->{line} = '';
    
    $self->write($PROMPT);
    
    return $self;
}

sub event_read {
    my AxKit2::Console $self = shift;
    $self->{alive_time} = time;

    my $bref = $self->read(8192);
    return $self->close($!) unless defined $bref;
    $self->process_read_buf($bref);
}

sub process_read_buf {
    my AxKit2::Console $self = shift;
    my $bref = shift;
    $self->{line} .= $$bref;
    
    while ($self->{line} =~ s/^(.*?\n)//) {
        my $line = $1;
        $self->process_line($line);
    }
}

sub process_line {
    my AxKit2::Console $self = shift;
    my $line = shift;
    
    $line =~ s/\r?\n//;
    my ($cmd, @params) = split(/ +/, $line);
    my $meth = "cmd_" . lc($cmd);
    if (my $lookup = $self->can($meth)) {
        $lookup->($self, @params);
        $self->write($PROMPT);
    }
    else {
        # No such method - i.e. unrecognized command
        return $self->write("command '$cmd' unrecognised\n$PROMPT");
    }
}

my %helptext;

$helptext{help} = "HELP [CMD] - Get help on all commands or a specific command";

sub cmd_help {
    my $self = shift;
    my ($subcmd) = @_;
    
    $subcmd ||= 'help';
    $subcmd = lc($subcmd);
    
    if ($subcmd eq 'help') {
        my $txt = join("\n", map { substr($_, 0, index($_, "-")) } sort values(%helptext));
        $self->write("Available Commands:\n\n$txt\n");
    }
    my $txt = $helptext{$subcmd} || "Unrecognised help option. Try 'help' for a full list.";
    $self->write("$txt\n");
}

$helptext{quit} = "QUIT - Exit the console";
sub cmd_quit {
    my $self = shift;
    $self->close;
}

$helptext{list} = "LIST [LIMIT] - List current connections, specify limit or negative limit to shrink list";
sub cmd_list {
    my $self = shift;
    my ($count) = @_;
    
    my $descriptors = Danga::Socket->DescriptorMap;
    
    my $list = "Current" . ($count ? (($count > 0) ? " Oldest $count" : " Newest ".-$count) : "") . " Connections: \n\n";
    my @all;
    foreach my $fd (keys %$descriptors) {
        my $pob = $descriptors->{$fd};
        if ($pob->isa("AxKit2::Connection")) {
            next unless $pob->peer_addr_string; # haven't even started yet
            push @all, [$pob+0, $pob->peer_addr_string, $pob->uptime];
        }
    }
    
    @all = sort { $a->[2] <=> $b->[2] } @all;
    if ($count) {
        if ($count > 0) {
            @all = @all[$#all-($count-1) .. $#all];
        }
        else {
            @all = @all[0..(abs($count) - 1)];
        }
    }
    foreach my $item (@all) {
        $list .= sprintf("%x : %s [%s] Connected %0.2fs\n", map { defined()?$_:'' } @$item);
    }
    
    $self->write( $list );
}

$helptext{kill} = "KILL (\$IP | \$REF) - Disconnect all connections from \$IP or connection reference \$REF";
sub cmd_kill {
    my $self = shift;
    my ($match) = @_;
    
    return $self->write("SYNTAX: KILL (\$IP | \$REF)\n") unless $match;
    
    my $descriptors = Danga::Socket->DescriptorMap;
    
    my $killed = 0;
    my $is_ip = (index($match, '.') >= 0);
    foreach my $fd (keys %$descriptors) {
        my $pob = $descriptors->{$fd};
        if ($pob->isa("Qpsmtpd::PollServer")) {
            if ($is_ip) {
                next unless $pob->connection->remote_ip; # haven't even started yet
                if ($pob->connection->remote_ip eq $match) {
                    $pob->write("550 Your connection has been killed by an administrator\r\n");
                    $pob->disconnect;
                    $killed++;
                }
            }
            else {
                # match by ID
                if ($pob+0 == hex($match)) {
                    $pob->write("550 Your connection has been killed by an administrator\r\n");
                    $pob->disconnect;
                    $killed++;
                }
            }
        }
    }
    
    $self->write("Killed $killed connection" . ($killed > 1 ? "s" : "") . "\n");
}

$helptext{dump} = "DUMP \$REF - Dump a connection using Data::Dumper";
sub cmd_dump {
    my $self = shift;
    my ($ref) = @_;
    
    require Data::Dumper;
    $Data::Dumper::Indent=1;
    $Data::Dumper::Terse=1;
    
    my $descriptors = Danga::Socket->DescriptorMap;
    foreach my $fd (keys %$descriptors) {
        my $pob = $descriptors->{$fd};
        if ($pob->isa("AxKit2::Connection")) {
            if ($pob+0 == hex($ref)) {
                return $self->write( Data::Dumper::Dumper($pob) );
            }
        }
    }
    
    $self->write("Unable to find the connection: $ref. Try the LIST command\n");
}

sub DBI::FIRSTKEY {}

$helptext{leaks} = "LEAKS [DUMP] - Run Devel::GC::Helper to list leaks with optional Dumper output";
sub cmd_leaks {
    my $self = shift;
    my $dump = shift || '';
    $dump = (uc($dump) eq 'DUMP') ? 1 : 0;
    
    $self->write("Gathering GC stats in the background...\n");
    
    my $pid = fork;
    die "Can't fork" unless defined $pid;
    return if $pid;

    require Devel::GC::Helper;
    if ($dump) {
        require Data::Dumper;
        $Data::Dumper::Terse = 1;
        $Data::Dumper::Indent = 1;
        #$Data::Dumper::Deparse = 1;
    }
    
    # Child - run the leak sweep...
    my $leaks = Devel::GC::Helper::sweep();
    foreach my $leak (@$leaks) {
        $self->write("Leaked $leak\n");
        $self->write( Data::Dumper::Dumper($leak) ) if $dump;
    }
    $self->write( "Total leaks: " . scalar(@$leaks) . "\n");
    $self->write($PROMPT);
    
    exit;
}

$helptext{stats} = "STATS - Show status and statistics";
sub cmd_stats {
    my $self = shift;
    
    my $output = "Current Status as of " . gmtime() . " GMT\n\n";
    
    if (defined &AxKit2::Plugin::stats::get_stats) {
        # Stats plugin is loaded
        $output .= AxKit2::Plugin::stats->get_stats;
    }
    
    my $descriptors = Danga::Socket->DescriptorMap;
    
    my $current_connections = 0;
    my $current_dns = 0;
    foreach my $fd (keys %$descriptors) {
        my $pob = $descriptors->{$fd};
        if ($pob->isa("AxKit2::Connection")) {
            $current_connections++;
        }
    }
    
    $output .= "Current Connections: $current_connections\n";
    
    $self->write($output);
}

sub cmd_shutdown {
    my $self = shift;
    Danga::Socket->SetPostLoopCallback(sub { 0 });
    $self->close("shutdown");
}

# Cleanup routine to get rid of timed out sockets
sub _do_cleanup {
    my $now = time;
    
    # AxKit2::Client->log(LOGDEBUG, "do cleanup");
    
    Danga::Socket->AddTimer(CLEANUP_TIME, \&_do_cleanup);
    
    my $sf = __PACKAGE__->get_sock_ref;
    
    my $conns = 0;

    my %max_age;  # classname -> max age (0 means forever)
    my %max_connect; # classname -> max connect time
    my @to_close;
    while (my $k = each %$sf) {
        my AxKit2::Connection $v = $sf->{$k};
        my $ref = ref $v;
        next unless $v->isa('AxKit2::Console');
        $conns++;
        unless (defined $max_age{$ref}) {
            $max_age{$ref}      = $ref->max_idle_time || 0;
            $max_connect{$ref}  = $ref->max_connect_time || 0;
        }
        if (my $t = $max_connect{$ref}) {
            if ($v->{create_time} < $now - $t) {
                push @to_close, $v;
                next;
            }
        }
        if (my $t = $max_age{$ref}) {
            if ($v->{alive_time} < $now - $t) {
                push @to_close, $v;
            }
        }
    }
    
    $_->close("Timeout") foreach @to_close;
}

1;