Monitoring::Livestatus::MULTI - connector with multiple peers


Monitoring-Livestatus documentation Contained in the Monitoring-Livestatus distribution.

Index


Code Index:

NAME

Top

Monitoring::Livestatus::MULTI - connector with multiple peers

SYNOPSIS

Top

    use Monitoring::Livestatus;
    my $nl = Monitoring::Livestatus::MULTI->new( qw{nagioshost1:9999 nagioshost2:9999 /var/spool/nagios/live.socket} );
    my $hosts = $nl->selectall_arrayref("GET hosts");

CONSTRUCTOR

Top

new ( [ARGS] )

Creates a Monitoring::Livestatus::MULTI object. new requires at least one peer, socket or server. All other arguments are the same as in Monitoring::Livestatus.

METHODS

Top

do

See Monitoring::Livestatus for more information.

selectall_arrayref

See Monitoring::Livestatus for more information.

selectall_hashref

See Monitoring::Livestatus for more information.

selectcol_arrayref

See Monitoring::Livestatus for more information.

selectrow_array

See Monitoring::Livestatus for more information.

selectrow_arrayref

See Monitoring::Livestatus for more information.

selectrow_hashref

See Monitoring::Livestatus for more information.

selectscalar_value

See Monitoring::Livestatus for more information.

errors_are_fatal

See Monitoring::Livestatus for more information.

warnings

See Monitoring::Livestatus for more information.

verbose

See Monitoring::Livestatus for more information.

peer_addr

See Monitoring::Livestatus for more information.

peer_name

See Monitoring::Livestatus for more information.

peer_key

See Monitoring::Livestatus for more information.

disable

 $ml->disable()

Disables this connection and returns its previous state.

enable

 $ml->enable()

Enables this connection and returns its previous state.

AUTHOR

Top

Sven Nierlein, <nierlein@cpan.org>

COPYRIGHT AND LICENSE

Top


Monitoring-Livestatus documentation Contained in the Monitoring-Livestatus distribution.
package Monitoring::Livestatus::MULTI;

use 5.000000;
use strict;
use warnings;
use Carp;
use Data::Dumper;
use Config;
use Time::HiRes qw/gettimeofday tv_interval/;
use Scalar::Util qw/looks_like_number/;
use Monitoring::Livestatus;
use base "Monitoring::Livestatus";

# Constructor of the multi peer connector.
#
# Takes the same arguments as Monitoring::Livestatus->new. A single argument
# is treated as the value of the 'peer' option. Unless the base constructor
# already provided 'peers', one UNIX or INET connector object is created per
# configured peer.
#
# Croaks if no peer could be set up. Returns the blessed connector object.
sub new {
    my $class = shift;
    unshift(@_, "peer") if scalar @_ == 1;
    my(%options) = @_;

    $options{'backend'} = $class;
    my $self = Monitoring::Livestatus->new(%options);
    bless $self, $class;

    if(!defined $self->{'peers'}) {
        $self->{'peer'} = $self->_get_peers();

        # set our peer(s) from the options; every peer gets a copy of our
        # own options, but errors are never fatal on the peer itself, they
        # are evaluated later in _do_on_peers
        my %peer_options = %options;
        my $peers;
        $peer_options{'errors_are_fatal'} = 0;
        for my $peer (@{$self->{'peer'}}) {
            $peer_options{'name'} = $peer->{'name'};
            $peer_options{'peer'} = $peer->{'peer'};
            delete $peer_options{'socket'};
            delete $peer_options{'server'};

            # explicit class method calls instead of the ambiguous indirect
            # object syntax ("new Class(...)")
            if($peer->{'type'} eq 'UNIX') {
                push @{$peers}, Monitoring::Livestatus::UNIX->new(%peer_options);
            }
            elsif($peer->{'type'} eq 'INET') {
                push @{$peers}, Monitoring::Livestatus::INET->new(%peer_options);
            }
        }
        $self->{'peers'} = $peers;
        delete $self->{'socket'};
        delete $self->{'server'};
    }

    if(!defined $self->{'peers'}) {
        croak('please specify at least one peer, socket or server');
    }

    # dont use threads with only one peer
    if(scalar @{$self->{'peers'}} == 1) { $self->{'use_threads'} = 0; }

    # check for threads support, threads are only used by default
    # if this perl has been built with ithreads
    if(!defined $self->{'use_threads'}) {
        $self->{'use_threads'} = 0;
        if($Config{useithreads}) {
            $self->{'use_threads'} = 1;
        }
    }
    if($self->{'use_threads'}) {
        eval {
            require threads;
            require Thread::Queue;
        };
        if($@) {
            # fall back to sequential queries if the modules cannot be loaded
            $self->{'use_threads'} = 0;
            $self->{'logger'}->debug('error initializing threads: '.$@) if defined $self->{'logger'};
        } else {
            $self->_start_worker;
        }
    }

    # initialize peer keys, used to look up peers by key or address
    $self->{'peer_by_key'} = {};
    $self->{'peer_by_addr'} = {};
    for my $peer (@{$self->{'peers'}}) {
        $self->{'peer_by_key'}->{$peer->peer_key}   = $peer;
        $self->{'peer_by_addr'}->{$peer->peer_addr} = $peer;
    }

    $self->{'name'} = 'multiple connector' unless defined $self->{'name'};
    $self->{'logger'}->debug('initialized Monitoring::Livestatus::MULTI '.($self->{'use_threads'} ? 'with' : 'without' ).' threads') if $self->{'verbose'};

    return $self;
}


########################################

# Sends a command to the selected peers ('backends' option) or to all peers.
# The result of the peers is discarded, this method always returns 1.
sub do {
    my $self    = shift;
    my $options = $self->_lowercase_and_verify_options($_[1]);
    my $started = [gettimeofday];

    $self->_do_on_peers("do", $options->{'backends'}, @_);

    my $seconds = tv_interval($started);
    if($self->{'verbose'}) {
        $self->{'logger'}->debug(sprintf('%.4f', $seconds).' sec for do('.$_[0].') in total');
    }

    return 1;
}


########################################

# Runs selectall_arrayref on the selected peers and returns the merged
# result of all of them (see _merge_answer).
sub selectall_arrayref {
    my $self    = shift;
    my $options = $self->_lowercase_and_verify_options($_[1]);
    my $started = [gettimeofday];

    if($self->{'verbose'}) {
        $self->_log_statement($_[0], $options, 0);
    }

    my $merged  = $self->_merge_answer($self->_do_on_peers("selectall_arrayref", $options->{'backends'}, @_));
    my $seconds = tv_interval($started);

    if($self->{'verbose'}) {
        # an undefined result counts as zero rows
        my $row_count = defined $merged ? scalar @{$merged} : 0;
        $self->{'logger'}->debug(sprintf('%.4f', $seconds).' sec for selectall_arrayref() in total, results: '.$row_count);
    }

    return $merged;
}


########################################

# Runs selectall_hashref on the selected peers and returns the merged result.
# The query options are expected as third argument here, after the statement
# and the key name(s).
sub selectall_hashref {
    my $self    = shift;
    my $options = $self->_lowercase_and_verify_options($_[2]);
    my $started = [gettimeofday];

    my $merged  = $self->_merge_answer($self->_do_on_peers("selectall_hashref", $options->{'backends'}, @_));
    my $seconds = tv_interval($started);

    if($self->{'verbose'}) {
        $self->{'logger'}->debug(sprintf('%.4f', $seconds).' sec for selectall_hashref() in total');
    }

    return $merged;
}


########################################

# Runs selectcol_arrayref on the selected peers and returns the merged result.
sub selectcol_arrayref {
    my $self    = shift;
    my $options = $self->_lowercase_and_verify_options($_[1]);
    my $started = [gettimeofday];

    my $merged  = $self->_merge_answer($self->_do_on_peers("selectcol_arrayref", $options->{'backends'}, @_));
    my $seconds = tv_interval($started);

    if($self->{'verbose'}) {
        $self->{'logger'}->debug(sprintf('%.4f', $seconds).' sec for selectcol_arrayref() in total');
    }

    return $merged;
}


########################################

# Fetches a single row from the selected peers and returns it as list.
#
# If the 'sum' option is 1, or if it is not set and the statement contains a
# "Stats:" header, the rows of all peers are summed up column by column.
# Otherwise the rows of all peers are merged, which usually is not what the
# caller wants, so a warning is issued if warnings are enabled.
#
# Returns an empty list if there is no result, for example after a failed
# query or when no peer could be asked.
sub selectrow_array {
    my $self      = shift;
    my $statement = $_[0];
    my $opts      = $self->_lowercase_and_verify_options($_[1]);
    my $t0        = [gettimeofday];
    my @return;

    if((defined $opts->{'sum'} and $opts->{'sum'} == 1) or (!defined $opts->{'sum'} and $statement =~ m/^Stats:/mx)) {
        my $sum = $self->_sum_answer($self->_do_on_peers("selectrow_arrayref", $opts->{'backends'}, @_));
        # the sum is undefined if no peer returned data, dereferencing it
        # blindly would die under strict refs
        @return = @{$sum} if ref $sum eq 'ARRAY';
    } else {
        if($self->{'warnings'}) {
            carp("selectrow_arrayref without Stats on multi backend will not work as expected!");
        }
        my $rows = $self->_merge_answer($self->_do_on_peers("selectrow_arrayref", $opts->{'backends'}, @_));
        @return = @{$rows} if defined $rows;
    }

    my $elapsed = tv_interval ( $t0 );
    $self->{'logger'}->debug(sprintf('%.4f', $elapsed).' sec for selectrow_array() in total') if $self->{'verbose'};

    return @return;
}


########################################

# Fetches a single row from the selected peers.
#
# If the 'sum' option is 1, or if it is not set and the statement contains a
# "Stats:" header, the summed up answer of all peers is returned. Otherwise
# the answers are merged and only the first element of the merged result is
# returned, a warning is issued in that case if warnings are enabled.
# NOTE(review): in the merge case the first element is whatever
# _merge_answer puts first, verify that callers really expect this.
sub selectrow_arrayref {
    my $self    = shift;
    my $query   = $_[0];
    my $options = $self->_lowercase_and_verify_options($_[1]);
    my $started = [gettimeofday];
    my $row;

    # an explicit 'sum' option wins over the autodetection by Stats: header
    my $want_sum = defined $options->{'sum'} ? $options->{'sum'} == 1 : $query =~ m/^Stats:/mx;

    if($want_sum) {
        $row = $self->_sum_answer($self->_do_on_peers("selectrow_arrayref", $options->{'backends'}, @_));
    }
    else {
        carp("selectrow_arrayref without Stats on multi backend will not work as expected!") if $self->{'warnings'};
        my $merged = $self->_merge_answer($self->_do_on_peers("selectrow_arrayref", $options->{'backends'}, @_));
        $row = $merged->[0] if defined $merged->[0];
    }

    my $seconds = tv_interval($started);
    if($self->{'verbose'}) {
        $self->{'logger'}->debug(sprintf('%.4f', $seconds).' sec for selectrow_arrayref() in total');
    }

    return $row;
}


########################################

# Fetches a single row as hash from the selected peers.
#
# If the 'sum' option is 1, or if it is not set and the statement contains a
# "Stats:" header, the summed up answer of all peers is returned. Otherwise
# the merged answer of all peers is returned and a warning is issued if
# warnings are enabled.
sub selectrow_hashref {
    my $self    = shift;
    my $query   = $_[0];
    my $options = $self->_lowercase_and_verify_options($_[1]);
    my $started = [gettimeofday];
    my $row;

    # an explicit 'sum' option wins over the autodetection by Stats: header
    my $want_sum = defined $options->{'sum'} ? $options->{'sum'} == 1 : $query =~ m/^Stats:/mx;

    if($want_sum) {
        $row = $self->_sum_answer($self->_do_on_peers("selectrow_hashref", $options->{'backends'}, @_));
    }
    else {
        carp("selectrow_hashref without Stats on multi backend will not work as expected!") if $self->{'warnings'};
        $row = $self->_merge_answer($self->_do_on_peers("selectrow_hashref", $options->{'backends'}, @_));
    }

    my $seconds = tv_interval($started);
    if($self->{'verbose'}) {
        $self->{'logger'}->debug(sprintf('%.4f', $seconds).' sec for selectrow_hashref() in total');
    }

    return $row;
}


########################################

# Fetches a single value from the selected peers.
#
# If the 'sum' option is 1, or if it is not set and the statement contains a
# "Stats:" header, the values of all peers are summed up. Otherwise the
# values are merged and the first one is returned, a warning is issued in
# that case if warnings are enabled.
#
# The total runtime is logged in verbose mode for both variants.
sub selectscalar_value {
    my $self  = shift;
    my $statement = $_[0];
    my $opts      = $self->_lowercase_and_verify_options($_[1]);

    my $t0 = [gettimeofday];

    my $return;

    if((defined $opts->{'sum'} and $opts->{'sum'} == 1) or (!defined $opts->{'sum'} and $statement =~ m/^Stats:/mx)) {
        # do not return directly here, otherwise the runtime of summed
        # queries would never show up in the verbose log
        $return = $self->_sum_answer($self->_do_on_peers("selectscalar_value", $opts->{'backends'}, @_));
    } else {
        if($self->{'warnings'}) {
            carp("selectscalar_value without Stats on multi backend will not work as expected!");
        }
        my $rows = $self->_merge_answer($self->_do_on_peers("selectscalar_value", $opts->{'backends'}, @_));

        $return = $rows->[0] if defined $rows->[0];
    }

    my $elapsed = tv_interval ( $t0 );
    $self->{'logger'}->debug(sprintf('%.4f', $elapsed).' sec for selectscalar_value() in total') if $self->{'verbose'};

    return $return;
}


########################################

# Gets or sets the 'errors_are_fatal' setting, see _change_setting.
sub errors_are_fatal {
    my($self, $value) = @_;
    return $self->_change_setting('errors_are_fatal', $value);
}


########################################

# Gets or sets the 'warnings' setting, see _change_setting.
sub warnings {
    my($self, $value) = @_;
    return $self->_change_setting('warnings', $value);
}


########################################

# Gets or sets the 'verbose' setting, see _change_setting.
sub verbose {
    my($self, $value) = @_;
    return $self->_change_setting('verbose', $value);
}


########################################

# Returns the addresses of all peers in list context.
# There is no single address for multiple peers, so undef is returned in
# scalar context.
sub peer_addr {
    my($self) = @_;

    my @addresses;
    for my $backend (@{$self->{'peers'}}) {
        push @addresses, $backend->peer_addr;
    }

    return @addresses if wantarray;
    return undef;
}


########################################

# Returns the names of all peers in list context and the name of the
# multi connector itself in scalar context.
sub peer_name {
    my($self) = @_;

    my @peer_names;
    for my $backend (@{$self->{'peers'}}) {
        push @peer_names, $backend->peer_name;
    }

    return @peer_names if wantarray;
    return $self->{'name'};
}


########################################

# Returns the keys of all peers in list context and the key of the
# multi connector itself in scalar context.
sub peer_key {
    my($self) = @_;

    my @peer_keys;
    for my $backend (@{$self->{'peers'}}) {
        push @peer_keys, $backend->peer_key;
    }

    return @peer_keys if wantarray;
    return $self->{'key'};
}


########################################

# Disables a single peer if a peer key is given and returns the previous
# value of its 'disabled' flag. Without peer key, disable() is called on
# every peer and 1 is returned.
sub disable {
    my($self, $peer_key) = @_;

    if(defined $peer_key) {
        my $backend  = $self->_get_peer_by_key($peer_key);
        my $previous = $backend->{'disabled'};
        $backend->{'disabled'} = 1;
        return $previous;
    }

    for my $backend (@{$self->{'peers'}}) {
        $backend->disable();
    }
    return 1;
}


########################################

# Enables a single peer if a peer key is given and returns the previous
# value of its 'disabled' flag. Without peer key, enable() is called on
# every peer and 1 is returned.
sub enable {
    my($self, $peer_key) = @_;

    if(defined $peer_key) {
        my $backend  = $self->_get_peer_by_key($peer_key);
        my $previous = $backend->{'disabled'};
        $backend->{'disabled'} = 0;
        return $previous;
    }

    for my $backend (@{$self->{'peers'}}) {
        $backend->enable();
    }
    return 1;
}

########################################
# INTERNAL SUBS
########################################

# Gets or sets a setting of the connector.
#
# If a defined value is given, it is stored on the connector itself and on
# all peers. The worker threads are restarted afterwards if threads are in
# use. Always returns the previous value of the setting.
sub _change_setting {
    my($self, $key, $value) = @_;
    my $previous = $self->{$key};

    # nothing to change, just report the current value
    return $previous unless defined $value;

    $self->{$key} = $value;
    for my $backend (@{$self->{'peers'}}) {
        $backend->{$key} = $value;
    }

    # restart workers
    if($self->{'use_threads'}) {
        _stop_worker();
        $self->_start_worker();
    }

    return $previous;
}


########################################
# Creates the job and result queues and starts one worker thread per peer.
#
# The queues are stored in 'WorkQueue' and 'WorkResults', the thread objects
# in 'threads'. Returns nothing.
sub _start_worker {
    my $self = shift;

    # create job transports
    $self->{'WorkQueue'}   = Thread::Queue->new;
    $self->{'WorkResults'} = Thread::Queue->new;

    # set signal handler before thread is started
    # otherwise they would be killed when started
    # and stopped immediately after start.
    # The handler is only meant for the threads, so it is localized and the
    # previous handler of the application is restored when this sub returns
    # instead of being replaced by 'DEFAULT'.
    local $SIG{'USR1'} = sub { threads->exit(); };

    # start worker threads, one for each peer
    my $threadcount = scalar @{$self->{'peers'}};
    for my $x (0 .. $threadcount - 1) {
        $self->{'threads'}->[$x] = threads->new(\&_worker_thread, $self->{'peers'}, $self->{'WorkQueue'}, $self->{'WorkResults'}, $self->{'logger'});
    }

    return;
}


########################################
# Signals every thread returned by threads->list() to exit and detaches it.
# This is a best effort cleanup only, all errors are ignored (for example if
# the threads module has never been loaded). Returns nothing.
sub _stop_worker {
    eval {
        my @running = threads->list();
        for my $thread (@running) {
            $thread->kill('USR1')->detach();
        }
    };
    return;
}


########################################
# Main loop of a worker thread.
#
# Arguments: array ref of peers, job queue, result queue and an optional
# logger. Every job is a hash ref with the index of the peer ('peer'), the
# name of the method to run ('sub') and the method arguments ('opts').
# For each job one hash ref with 'peer' and 'result' is put on the result
# queue, the result is undefined if the job died. The loop ends as soon as
# the job queue returns a false value.
sub _worker_thread {
    local $SIG{'USR1'} = sub { threads->exit(); };

    my $peers       = shift;
    my $workQueue   = shift;
    my $workResults = shift;
    my $logger      = shift;

    while (my $job = $workQueue->dequeue) {
        my $erg;
        eval {
            $erg = _do_wrapper($peers->[$job->{'peer'}], $job->{'sub'}, $logger, @{$job->{'opts'}});
        };
        if($@) {
            warn("Error in Thread ".$job->{'peer'}." :".$@);
            # jobs do not carry a logger, use the one passed to the thread
            $logger->error("Error in Thread ".$job->{'peer'}." :".$@) if defined $logger;
        };
        # always answer, the parent waits for exactly one result per job
        $workResults->enqueue({ peer => $job->{'peer'}, result => $erg });
    }
    return;
}


########################################
# Runs a single method on a single peer.
#
# Arguments: peer object, name of the method, optional logger and the
# arguments for the method. The method is called in scalar context.
# Returns a hash ref with the data ('data') together with the current
# values of $Monitoring::Livestatus::ErrorMessage ('msg') and
# $Monitoring::Livestatus::ErrorCode ('code'), which default to an empty
# string and 0.
sub _do_wrapper {
    my($peer, $sub, $logger, @opts) = @_;

    my $started = [gettimeofday];
    my $data    = $peer->$sub(@opts);
    my $seconds = tv_interval($started);

    if(defined $logger) {
        $logger->debug(sprintf('%.4f', $seconds).' sec for fetching data on '.$peer->peer_name.' ('.$peer->peer_addr.')');
    }

    # make sure the error variables are always defined
    if(!defined $Monitoring::Livestatus::ErrorCode) {
        $Monitoring::Livestatus::ErrorCode = 0;
    }
    if(!defined $Monitoring::Livestatus::ErrorMessage) {
        $Monitoring::Livestatus::ErrorMessage = '';
    }

    my %answer = (
        'msg'  => $Monitoring::Livestatus::ErrorMessage,
        'code' => $Monitoring::Livestatus::ErrorCode,
        'data' => $data,
    );
    return \%answer;
}


########################################
# Runs a method on all affected peers and collects the results.
#
# Arguments: name of the method, the backends to use (undef for all peers,
# a single peer key or an array ref of peer keys) and the arguments for the
# method. Disabled peers and peers marked as bad are skipped.
#
# Returns a hash ref with the data of each peer by peer key. Returns nothing
# if the query failed, in which case $Monitoring::Livestatus::ErrorMessage
# and $Monitoring::Livestatus::ErrorCode are set.
# Croaks on unknown or unsupported backends, and on failed queries if
# 'errors_are_fatal' is set.
sub _do_on_peers {
    my $self        = shift;
    my $sub         = shift;
    my $backends    = shift;
    my @opts        = @_;
    my $statement   = $opts[0];
    my $use_threads = $self->{'use_threads'};
    my $t0          = [gettimeofday];

    my $return;
    my %codes;
    my $query_options;
    # selectall_hashref has the key name(s) as second argument,
    # so the options are one position later
    if($sub eq 'selectall_hashref') {
        $query_options = $self->_lowercase_and_verify_options($opts[2]);
    } else {
        $query_options = $self->_lowercase_and_verify_options($opts[1]);
    }

    # which peers affected?
    my @peers;
    if(defined $backends) {
        my @backends;
        if(ref $backends eq '') {
            push @backends, $backends;
        }
        elsif(ref $backends eq 'ARRAY') {
            @backends = @{$backends};
        } else {
            croak("unsupported type for backend: ".ref($backends));
        }

        for my $key (@backends) {
            my $backend = $self->_get_peer_by_key($key);
            # fail early with a clear message, otherwise an empty hash would
            # be autovivified here and the first method call on it would die
            if(!defined $backend) {
                croak("unknown backend: ".(defined $key ? $key : 'undef'));
            }
            push @peers, $backend unless $backend->{'disabled'};
        }
    } else {
        # use all backends
        @peers = @{$self->{'peers'}};
    }

    # its faster without threads for only one peer
    if(scalar @peers <= 1) { $use_threads = 0; }

    # if we have limits set, we cannot use threads
    if(defined $query_options->{'limit_start'}) { $use_threads = 0; }

    if($use_threads) {
        # use the threaded variant
        $self->{'logger'}->debug('using threads') if $self->{'verbose'};

        my $peers_to_use = {};
        for my $peer (@peers) {
            if($peer->{'disabled'}) {
                # dont send any query
            }
            elsif($peer->marked_bad) {
                warn($peer->peer_name.' ('.$peer->peer_key.') is marked bad') if $self->{'verbose'};
            }
            else {
                $peers_to_use->{$peer->peer_key} = 1;
            }
        }

        # jobs reference the peer by its index in the list of all peers
        my $x = 0;
        for my $peer (@{$self->{'peers'}}) {
            if(defined $peers_to_use->{$peer->peer_key}) {
                my $job = {
                        'peer'   => $x,
                        'sub'    => $sub,
                        'opts'   => \@opts,
                };
                $self->{'WorkQueue'}->enqueue($job);
            }
            $x++;
        }

        # wait for exactly one result per job
        my $expected = scalar keys %{$peers_to_use};
        for my $nr (1 .. $expected) {
            my $result = $self->{'WorkResults'}->dequeue;
            my $peer   = $self->{'peers'}->[$result->{'peer'}];
            if(defined $result->{'result'}) {
                push @{$codes{$result->{'result'}->{'code'}}}, { 'peer' => $peer->peer_key, 'msg' => $result->{'result'}->{'msg'} };
                $return->{$peer->peer_key} = $result->{'result'}->{'data'};
            } else {
                warn("undefined result for: $statement");
            }
        }
    } else {
        $self->{'logger'}->debug('not using threads') if $self->{'verbose'};
        for my $peer (@peers) {
            if($peer->{'disabled'}) {
                # dont send any query
            }
            elsif($peer->marked_bad) {
                warn($peer->peer_name.' ('.$peer->peer_key.') is marked bad') if $self->{'verbose'};
            } else {
                my $erg = _do_wrapper($peer, $sub, $self->{'logger'}, @opts);
                $return->{$peer->peer_key} = $erg->{'data'};
                # store the peer key only, like the threaded variant does,
                # so the debug dump below does not contain whole peer objects
                push @{$codes{$erg->{'code'}}}, { 'peer' => $peer->peer_key, 'msg' => $erg->{'msg'} };

                # compute limits
                if(defined $query_options->{'limit_length'} and $peer->{'meta_data'}->{'result_count'}) {
                    last;
                }
                # set a new start if we had rows already
                if(defined $query_options->{'limit_start'}) {
                    $query_options->{'limit_start'} = $query_options->{'limit_start'} - $peer->{'meta_data'}->{'row_count'};
                }
            }
        }
    }


    # check if we different result stati
    undef $Monitoring::Livestatus::ErrorMessage;
    $Monitoring::Livestatus::ErrorCode = 0;
    my @codes = sort keys %codes;
    if(scalar @codes > 1) {
        # got different results for our backends
        if($self->{'verbose'}) {
            $self->{'logger'}->warn("got different result stati: ".Dumper(\%codes));
        }
    }

    # only the first of the sorted codes is evaluated, so the query counts
    # as failed only if even the lowest code is an error code
    my $failed = 0;
    my $code = $codes[0];
    if(defined $code and $code >= 300) {
        $failed = 1;
    }

    if($failed) {
        my $msg  = $codes{$code}->[0]->{'msg'};
        $self->{'logger'}->debug("same: $code -> $msg") if $self->{'verbose'};
        $Monitoring::Livestatus::ErrorMessage = $msg;
        $Monitoring::Livestatus::ErrorCode    = $code;
        if($self->{'errors_are_fatal'}) {
            croak("ERROR ".$code." - ".$Monitoring::Livestatus::ErrorMessage." in query:\n'".$statement."'\n");
        }
        return;
    }

    my $elapsed = tv_interval ( $t0 );
    $self->{'logger'}->debug(sprintf('%.4f', $elapsed).' sec for fetching all data') if $self->{'verbose'};

    # deep copy result?
    if($use_threads
       and (
            (defined $query_options->{'deepcopy'} and $query_options->{'deepcopy'} == 1)
            or
            (defined $self->{'deepcopy'}          and $self->{'deepcopy'} == 1)
        )
       ) {
        # result has to be cloned to avoid "Invalid value for shared scalar" error

        $return = $self->_clone($return, $self->{'logger'});
    }

    return($return);
}


########################################
# Merges the answers of all peers into a single result.
#
# Expects a hash ref with the data of each peer by peer key. Arrays are
# concatenated, hashes are merged (later peers overwrite keys of earlier
# peers) and plain values are collected in an array. The peers are processed
# in their configured order, peers without data are skipped.
#
# Returns the merged structure or undef if no peer had any data.
sub _merge_answer {
    my $self   = shift;
    my $data   = shift;
    my $return;

    my $t0 = [gettimeofday];

    # iterate over original peers to retain order
    for my $peer (@{$self->{'peers'}}) {
        my $key = $peer->peer_key;
        next if !defined $data->{$key};
        my $answer = $data->{$key};

        # append in place instead of rebuilding the complete result
        # for every single peer
        if(ref $answer eq 'ARRAY') {
            $return = [] unless defined $return;
            push @{$return}, @{$answer};
        } elsif(ref $answer eq 'HASH') {
            $return = {} unless defined $return;
            @{$return}{keys %{$answer}} = values %{$answer};
        } else {
            push @{$return}, $answer;
        }
    }

    my $elapsed = tv_interval ( $t0 );
    $self->{'logger'}->debug(sprintf('%.4f', $elapsed).' sec for merging data') if $self->{'verbose'};

    return($return);
}


########################################
# Sums up the answers of all peers.
#
# Expects a hash ref with the data of each peer by peer name. Hashes are
# summed up key by key, arrays column by column and plain values are added
# to a single total. The first defined value of a key or column is taken as
# it is, values of further peers are only added if they look like a number.
# Peers are processed in sorted order to get reproducible results.
#
# Returns the summed structure or undef if no peer had any data.
sub _sum_answer {
    my $self   = shift;
    my $data   = shift;
    $data      = {} unless defined $data;
    my $return;
    my $t0 = [gettimeofday];
    for my $peername (sort keys %{$data}) {
        my $answer = $data->{$peername};
        if(ref $answer eq 'HASH') {
            for my $key (keys %{$answer}) {
                if(!defined $return->{$key}) {
                    $return->{$key} = $answer->{$key};
                } elsif(looks_like_number($answer->{$key})) {
                    $return->{$key} += $answer->{$key};
                }
            }
        }
        elsif(ref $answer eq 'ARRAY') {
            for my $x (0 .. $#{$answer}) {
                if(!defined $return->[$x]) {
                    $return->[$x] = $answer->[$x];
                } elsif(looks_like_number($answer->[$x])) {
                    # same check as for hashes, undefined or non numeric
                    # columns must not be added
                    $return->[$x] += $answer->[$x];
                }
            }
        } elsif(defined $answer) {
            $return = 0 unless defined $return;
            $return += $answer;
        }
    }

    my $elapsed = tv_interval ( $t0 );
    $self->{'logger'}->debug(sprintf('%.4f', $elapsed).' sec for summarizing data') if $self->{'verbose'};

    return $return;
}


########################################
# Creates a deep copy of a data structure made of plain values, array refs
# and hash refs. Croaks on any other kind of reference.
# The runtime is logged if a logger is given, nested elements are cloned
# without logger.
sub _clone {
    my($self, $data, $logger) = @_;
    my $started = [gettimeofday];
    my $type    = ref $data;

    my $copy;
    if($type eq 'HASH') {
        $copy = {};
        for my $name (keys %{$data}) {
            $copy->{$name} = $self->_clone($data->{$name});
        }
    }
    elsif($type eq 'ARRAY') {
        $copy = [];
        for my $element (@{$data}) {
            push @{$copy}, $self->_clone($element);
        }
    }
    elsif($type eq '') {
        # plain values are copied by assignment
        $copy = $data;
    }
    else {
        croak("cant clone: ".$type);
    }

    my $seconds = tv_interval($started);
    if(defined $logger) {
        $logger->debug(sprintf('%.4f', $seconds).' sec for cloning data');
    }

    return $copy;
}


########################################
# Returns the peer for the given peer key.
# Returns nothing if no key is given or if there is no such peer.
sub _get_peer_by_key {
    my($self, $key) = @_;

    return unless defined $key;

    my $backend = $self->{'peer_by_key'}->{$key};
    return unless defined $backend;

    return $backend;
}


########################################
# Returns the peer for the given peer address.
# Returns nothing if no address is given or if there is no such peer.
sub _get_peer_by_addr {
    my($self, $addr) = @_;

    return unless defined $addr;

    my $backend = $self->{'peer_by_addr'}->{$addr};
    return unless defined $backend;

    return $backend;
}


########################################

END {
    # try to kill our threads safely
    # runs when the interpreter exits, _stop_worker ignores all errors
    # so this is harmless if no threads have ever been started
    _stop_worker();
}

########################################

1;

__END__