/usr/local/CPAN/Net-Z3950-AsyncZ/Net/Z3950/AsyncZ.pm
# $Date: 2004/03/25 22:58:20 $
# $Revision: 1.14 $
package Net::Z3950::AsyncZ;
our $VERSION = '0.10';
use Net::Z3950::AsyncZ::Options::_params;
use Net::Z3950::AsyncZ::Errors;
use Net::Z3950::AsyncZ::ZLoop;
use Net::Z3950::AsyncZ::ErrMsg;
use Event;
use POSIX ":sys_wait_h";
use Symbol;
use Exporter;
use sigtrap qw (die untrapped normal-signals die error-signals);
@ISA=qw (Exporter);
@EXPORT_OK = qw(asyncZOptions isZ_MARC isZ_GRS isZ_RAW isZ_Error isZ_nonRetryable isZ_Info
isZ_DEFAULT noZ_Response isZ_Header isZ_ServerName Z_serverName getZ_RecNum
getZ_RecSize delZ_header delZ_pid delZ_serverName prep_Raw get_ZRawRec
);
%EXPORT_TAGS = (
record => [qw(isZ_MARC isZ_GRS isZ_RAW isZ_DEFAULT getZ_RecNum getZ_RecSize)],
errors => [qw(isZ_Error isZ_nonRetryable)],
header => [qw(isZ_ServerName Z_serverName noZ_Response isZ_Header
delZ_header delZ_pid delZ_serverName isZ_Info)]
);
use IPC::ShareLite qw( LOCK_EX);
use strict;
# %forkedPID: the forked processes of the current cycle
#   keys = pids, values = our indexes to the forked processes
# An entry is deleted once that process's data has been processed.
# (The timer loop does not test this hash: timerCallBack() stops the loop
# when allDone() finds no exit code still at -1, or when the timeout passes.)
my %forkedPID=();
# %exitCode: exit codes of the forked processes
#   keys = pids, values = exit codes (-1 until childhandler() reaps the process)
# Processes whose value is not 0 are killed by DESTROY if they are still alive.
my %exitCode=();
# %resultTable: host and report status of every child process
#   keys = pids, values = [ host, report_results, index, retry_index ]
#
#   SLOT 0  host: server address
#   SLOT 1  report results: 0 until the process's report has been read, then 1
#   SLOT 2  index of the process in its own cycle (original or retry)
#   SLOT 3  retry index:
#             -1 for a process of the original cycle,
#             -2 for a process of the retry cycle;
#             a cycle-1 entry queued for retry has its -1 replaced by its
#             index in the retry cycle (see _getReTries)
#
my %resultTable = ();
# Debug flag: when true, progress and statistics are printed.
# (_processErrors() resets it to 0.)
my $__DBUG = 0;
# NOTE(review): $_ERROR_VAL is not referenced anywhere else in this chunk.
my $_ERROR_VAL = Net::Z3950::AsyncZ::Errors::errorval();
# Reap children as they exit and record their exit codes in %exitCode.
$SIG{CHLD} = \&childhandler;
# Convenience constructor for a Net::Z3950::AsyncZ::Options::_params object;
# every argument is handed unchanged to that class's new().
sub asyncZOptions {
    my @option_args = @_;
    return Net::Z3950::AsyncZ::Options::_params->new(@option_args);
}
# Report-line type tests.  Each matches the line passed as its only argument
# against a pattern supplied by Net::Z3950::AsyncZ::Report and returns the
# result of that match.
sub isZ_Header {
    my $line = shift;
    return $line =~ Net::Z3950::AsyncZ::Report::get_pats();
}

sub isZ_MARC {
    my $line = shift;
    return $line =~ Net::Z3950::AsyncZ::Report::get_MARC_pat();
}

sub isZ_GRS {
    my $line = shift;
    return $line =~ Net::Z3950::AsyncZ::Report::get_GRS_pat();
}

sub isZ_RAW {
    my $line = shift;
    return $line =~ Net::Z3950::AsyncZ::Report::get_RAW_pat();
}

sub isZ_DEFAULT {
    my $line = shift;
    return $line =~ Net::Z3950::AsyncZ::Report::get_DEFAULT_pat();
}
# Returns the record number from a report header line (the digits that
# follow whitespace and precede a closing "]"), or undef when the line has
# no record number.
sub getZ_RecNum {
    my $line = shift;
    # Test the match itself: after a failed match $1 would still hold the
    # caller's last capture and be returned as a bogus record number.
    return (defined $line && $line =~ /\s(\d+)\]/) ? $1 : undef;
}
# Load MARC::Charset, which _utf8() needs for its UTF-8 conversion.
# Returns 1 when the module is available (loaded now or by an earlier call)
# and 0, after a warning, when it cannot be loaded.
sub _setupUTF8 {
    return 1 if is_utf8_init();
    my $loaded = do {
        local $^W = 0;   # keep the load quiet; the caller's $^W returns after the block
        eval { require MARC::Charset; 1 };
    };
    if (!$loaded) {
        warn "UTF8 requires MARC::Charset\n";
        return 0;
    }
    set_uft8_init();
    return 1;
}
# Header-removal helpers.  The public ones take:
#   $str   - a string, or a reference to a string
#   $g     - when true every match is replaced ('g' modifier), otherwise only the first
#   $subst - replacement text; the empty string when undefined
# and return a string, or a reference to a new string when $str was a
# reference.  The caller's string is never modified.

# Remove the header(s) matched by Net::Z3950::AsyncZ::Report::get_pats().
sub delZ_header {
    my ($str, $g, $subst) = @_;
    my $header_pat = Net::Z3950::AsyncZ::Report::get_pats();
    return _del_headers($str, $header_pat, $g, $subst);
}

# Remove "<#--pid-->" markers.
sub delZ_pid {
    my ($str, $g, $subst) = @_;
    return _del_headers($str, '<#--\d+-->', $g, $subst);
}

# Remove "<!--server name-->" markers.
sub delZ_serverName {
    my ($str, $g, $subst) = @_;
    return _del_headers($str, '<!--.*?-->', $g, $subst);
}

# Shared worker: substitute $pat in a copy of the (possibly referenced) string.
sub _del_headers {
    my ($source, $pat, $global, $replacement) = @_;
    my $text = ref $source ? ${$source} : $source;
    $replacement = "" unless defined $replacement;
    if ($global) {
        $text =~ s/$pat/$replacement/g;
    }
    else {
        $text =~ s/$pat/$replacement/;
    }
    return ref $source ? \$text : $text;
}
# Join an array of raw report lines into one string and normalize its
# markers so get_ZRawRec() can split it into records: the first header and
# every pid and server-name marker are removed, and each remaining header is
# turned into the "<!##!>" record separator.
#   param:  reference to an array of raw records
#   return: reference to the resulting string
sub prep_Raw {
    my ($lines) = @_;
    my $joined = join '', @{$lines};
    my $ref = delZ_header(\$joined);              # first header only; ref in, ref out
    $ref = delZ_pid($ref, 1);                     # every pid marker
    $ref = delZ_serverName($ref, 1);              # every server name
    $ref = delZ_header($ref, 1, '<!##!>');        # remaining headers become separators
    return $ref;
}
# Return the next record of a string prepared by prep_Raw() and remove it,
# together with its "<!##!>" separator, from that string.
#   param:  reference to the string of raw records
#   return: the next record; when no separator is left, the remaining text
#           (presumed to be the last record), after which the string is
#           emptied so further calls return ""; undef when no reference is given
sub get_ZRawRec {
    my $raw = shift;
    return undef if !$raw;
    # /s lets a record span newlines; without it only the last line before
    # the separator was returned and the earlier lines stayed in the buffer.
    if ($$raw =~ s/\A(.*?)<!##!>//s) {
        return $1;
    }
    my $rec = $$raw;   # presumed last record
    $$raw = "";
    return $rec;
}
# Report-line predicates.  Each returns the result of its pattern match on
# the line passed as the first argument.

# True for our substitute for a missing report, e.g.
#   {!-- library.anu.edu.au --}
# i.e. the server's name in curly braces, replacing the angle brackets
# (like an HTML comment) that hold the server name in the header of each
# report item.
sub noZ_Response {
    my $line = shift;
    return $line =~ /\{!--\s+.*\s+--\}/;
}

# True if the line contains a server name; in list context, the name itself.
sub isZ_ServerName {
    my $line = shift;
    return $line =~ /<!--(.*)-->/;
}

# True if the line contains a "<#--pid-->" marker.
sub isZ_PID {
    my $line = shift;
    return $line =~ /<#--\d+-->/;
}

# True if the line is a pid marker or a missing-report substitute.
sub isZ_Info {
    return isZ_PID(@_) || noZ_Response(@_);
}
# Returns the server name held in a "<!--name-->" marker, or undef when the
# line has no such marker or the name between the markers is false
# (empty or "0").
sub Z_serverName {
    my ($line) = @_;
    return undef unless $line =~ /<!--(.*?)-->/;
    my $name = $1;
    return $name ? $name : undef;
}
# Classify an error pair as returned by getErrors(), i.e.
# [cycle-1 error, cycle-2 error]:
#   2 - both cycles recorded an error (the retry failed too)
#   1 - a cycle-1 error whose retry flag is false (non-recoverable)
#   0 - anything else: no error, or a retryable cycle-1 error alone
sub isZ_Error {
    my ($err) = @_;
    return 0 unless $err;
    my ($first, $second) = @{$err};
    return 0 unless $first;
    return 2 if $second;
    return $first->{retry} ? 0 : 1;
}

# Given the value returned by isZ_Error(): true for a cycle-1 fatal error.
sub isZ_nonRetryable {
    my ($code) = @_;
    return $code == 1;
}
{
    # State shared by the result/error accessors below.  _getResult() fills
    # @results in the main process from the children's reports.
    my @results   = ();   # $results[$index] = ref to array of report lines
    my @errors    = ();   # $errors[$index]  = [cycle-1 error, cycle-2 error]
    my @recSize   = ();   # $recSize[$index] = size from the "*==N==*" line, or 0
    my @utf8_done = ();   # true once $results[$index] has been converted to UTF-8
    my $busy      = 0;    # set while results are being saved or handed to a callback
    my $utf8_init = 0;    # set once MARC::Charset has been loaded

    sub is_utf8_init {
        $utf8_init;
    }

    # (The misspelled name is kept: _setupUTF8() calls it by this name.)
    sub set_uft8_init {
        $utf8_init = 1;
    }

    # Convert the records saved for $index with MARC::Charset->to_utf8().
    # Each saved result set is converted at most once.  Both _callback() and
    # getResult() call this, and converting already-converted data a second
    # time would garble it.
    sub _utf8 {
        my $index = shift;
        _setupUTF8() if !$utf8_init;
        return if !$utf8_init;
        return if $utf8_done[$index] || !$results[$index];
        my $cs = MARC::Charset->new();
        $_ = $cs->to_utf8($_) for @{$results[$index]};
        $utf8_done[$index] = 1;
    }

    # Store the report lines for $index, replacing any earlier ones.
    sub _saveResults {
        $busy = 1;
        my ($arr, $index) = @_;
        $results[$index] = $arr;
        $utf8_done[$index] = 0;   # fresh data has not been converted yet
        $busy = 0;
    }

    sub _saveErrors {
        @errors = @_;
    }

    sub _isBusy { return $busy; }

    # returns reference to results array (converted to UTF-8 when the
    # 'utf8' option is set for this index)
    sub getResult {
        my ($self, $index) = @_;
        _utf8($index) if $self->{options}[$index]->_getFieldValue('utf8');
        return $results[$index];
    }

    sub getZ_RecSize { $recSize[$_[0]]; }

    # Returns [cycle-1 error, cycle-2 error] for $index, or undef if none.
    sub getErrors {
        my ($self, $index) = @_;
        return [$errors[$index]->[0], $errors[$index]->[1]] if $errors[$index];
        return undef;
    }

    sub getMaxErrors { return scalar @errors; }

    # Hand the results for $index to the per-server callback option, or the
    # object's default cb.  First record the size carried by the "*==N==*"
    # marker on the last line, and strip that marker.
    sub _callback {
        $busy = 1;
        my ($self, $index) = @_;
        _utf8($index) if $self->{options}[$index]->_getFieldValue('utf8');
        my $cb = $self->{options}[$index]->_getFieldValue('cb') || $self->{cb};
        my $records = $results[$index];
        my $last_el = $#{$records};
        # Test the match itself: after a failed match $1 still holds the
        # caller's last capture (e.g. the pid matched in _getResult).
        $recSize[$index] =
            $records->[$last_el] =~ /\*==(\d+)==\*/ ? ($1 || 0) : 0;
        $records->[$last_el] =~ s/\*==(\d+)==\*//;
        $cb->($index, $records) if $cb;
        $busy = 0;
    }
}
#-------------------------------------------------------------------#
# private parameters:
#   start:       start time for the timers
#   zl:          ZLoop objects created in the forked processes
#   errors:      reference to the Net::Z3950::AsyncZ::Errors object for the main process
#   share:       reference to the IPC::ShareLite segment the children write to
#   timer:       reference to the current timer watcher
#   unlooped:    tells DESTROY when all pipes have been processed.  DESTROY is
#                called in every process, so this flag marks when it is safe
#                to do the cleanup that applies to the main process.
#   monitor_pid: pid of the monitor, so it can be killed
#--------------------------------------------------------------------#

# Constructor: runs the whole query.  It forks one process per server, at
# most maxpipes at a time, and collects their reports.  It then retries the
# servers whose failures are retryable and records the errors.
# Named arguments (defaults in parentheses):
#   servers                array ref of server entries; entry->[0] is the host
#   query, log             query, and log passed on to Net::Z3950::AsyncZ::Errors
#   cb                     default callback, called as cb($index, $results_ref)
#   options                array ref of Net::Z3950::AsyncZ::Options::_params objects
#   format, num_to_fetch   used to build options for servers that have none
#   timeout (25), timeout_min (5), interval (1), maxpipes (4)
#   monitor                seconds before the monitor process aborts the run (off)
#   swap_check (0), swap_attempts (5)  swap-activity throttling, see processHosts
sub new {
    my ($class, %args) = @_;
    my $self = {
        start => time(), zl => [], query => $args{query}, errors => undef,
        log => $args{log} || undef, cb => $args{cb}, timer => undef,
        timeout => $args{timeout} || 25, timeout_min => $args{timeout_min} || 5,
        interval => $args{interval} || 1, servers => $args{servers},
        options => $args{options}, unlooped => 0, maxpipes => $args{maxpipes} || 4,
        share => undef, monitor => $args{monitor}, monitor_pid => undef,
        swap_check => $args{swap_check} || 0, swap_attempts => $args{swap_attempts} || 5,
    };
    bless $self, $class;
    $self->{errors} = Net::Z3950::AsyncZ::Errors->new($self->{log});
    %forkedPID = ();
    %exitCode = ();
    %resultTable = ();
    # shared-memory segment through which the children hand back their reports
    $self->{share} = IPC::ShareLite->new(-key     => $$ + 5000,
                                         -create  => 'yes',
                                         -destroy => 'yes');
    $self->{monitor_pid} = $self->_monitor() if $self->{monitor};
    # A HUP (sent by the monitor process when its alarm expires) aborts the run.
    $SIG{HUP} = sub {
        $self->{abort} = 1;
        $self->{unlooped} = 1;   # notify DESTROY that it's safe to kill outstanding processes
        $! = 227;
        die "Aborting."
    };
    # cycle 1: query every server
    $self->processHosts(-1, %args);
    # cycle 2: retry the servers that failed without a fatal error code
    my @retries = $self->_getReTries();
    $args{'servers'} = \@retries;
    $self->{'servers'} = $args{'servers'};
    $self->processHosts(-2, %args);
    $self->_showStats(\%resultTable) if $__DBUG;
    $self->_processErrors();
    kill KILL => $self->{monitor_pid} if $self->{monitor};
    $self->{share} = undef;
    return $self;
}
# Fork one query process per server in $args{servers}, in batches of at most
# maxpipes processes.  The Event loop runs after each full batch, and once
# more for a final partial batch, so timerCallBack() can collect the results.
#   $retry_marker: -1 for the original cycle, -2 for the retry cycle; stored
#                  as each process's retry index in %resultTable
#   %args:         constructor arguments; servers, format and num_to_fetch are read
# Previously a final batch was never waited on when the number of servers
# was a multiple of maxpipes, and batches held maxpipes+1 processes.
sub processHosts {
    my ($self, $retry_marker, %args) = @_;
    my $index = 0;
    my $pending = 0;   # processes started since the Event loop last ran
    $self->{unlooped} = 0;
    # NOTE(review): start is set once per cycle, so later batches share the
    # first batch's timeout window in timerCallBack() — confirm this is intended.
    $self->{start} = time();
    %forkedPID = ();
    foreach my $server (@{$args{servers}}) {
        $self->{server} = $server;
        $self->{options}[$index] =
            Net::Z3950::AsyncZ::Options::_params->new(format => $args{format},
                                                    num_to_fetch => $args{num_to_fetch})
            if !defined $self->{options}[$index];
        $self->{options}[$index]->option(_this_server => $server->[0]);
        $self->start($index, $retry_marker);
        $index++;
        $pending++;
        if ($pending >= $self->{maxpipes}) {
            $self->_waitForMemory();
            $self->_runBatchLoop();
            $pending = 0;
        }
    }
    # collect the processes of a final, partial batch
    $self->_runBatchLoop() if $pending;
    $self->{unlooped} = 1;
}

# When swap_check is set, wait (swap_check seconds per attempt) until
# is_mem_left() reports low swap activity.  Dies after swap_attempts failures.
sub _waitForMemory {
    my $self = shift;
    return if !$self->{swap_check};
    my $attempts = 0;
    until (is_mem_left()) {
        $attempts++;
        die "Memory resources appear to be too low to continue;\n",
            "try setting the swap_check to a higher value and/or ",
            "allowing for more than $self->{swap_attempts} swap_attempts\n"
            if $attempts > $self->{swap_attempts};
        Event->timer(at => time + $self->{swap_check}, cb => sub { $_[0]->w->cancel; });
        Event::loop();
    }
}

# Run the Event loop until timerCallBack() ends it (all done or timed out).
sub _runBatchLoop {
    my $self = shift;
    $self->{timer} =
        Event->timer(interval => $self->{interval}, hard => 1,
                     cb => sub { $self->timerCallBack(); });
    Event::loop();
}
# Collect the servers of cycle-1 processes that produced no report and whose
# exit code Net::Z3950::AsyncZ::ErrMsg marks as retryable.
# For each such server, in the order of its original index:
#   - its options are moved into slot $count of $self->{options}
#   - $count is stored as the entry's retry index (%resultTable slot 3)
# Returns the list of server entries for the retry cycle.
sub _getReTries {
    my $self = shift;
    my @retries = ();
    my $count = 0;
    # Read from a snapshot: the loop overwrites the low slots of
    # $self->{options}, and in hash order a later lookup could otherwise pick
    # up an options object that had already been moved (another server's).
    my @cycle1_options = @{ $self->{options} || [] };
    # Visit in original-index order so the retry cycle is deterministic.
    my @pids = sort { $resultTable{$a}->[2] <=> $resultTable{$b}->[2] } keys %resultTable;
    foreach my $pid (@pids) {
        next if $resultTable{$pid}->[1] != 0;   # the process reported results
        my $err = Net::Z3950::AsyncZ::ErrMsg->new($exitCode{$pid});
        next if !$err->doRetry();
        my $index = $resultTable{$pid}->[2];
        push @retries, $self->{servers}[$index];
        # NOTE(review): slot $count now holds the retried server's options,
        # so later $self->{options}[$count] lookups for cycle-1 index $count
        # see them — confirm this is intended.
        $self->{options}[$count] = $cycle1_options[$index];
        $resultTable{$pid}->[3] = $count;   # save retry index
        $count++;
    }
    return @retries;
}
# Fork a child process that runs the query for server $index.
#   $index:        index of the server in $self->{servers} (and of its options)
#   $retry_marker: -1 in the original cycle, -2 in the retry cycle
# Parent: records the child in %forkedPID, in %exitCode (-1 = not yet reaped)
#         and in %resultTable, then returns.
# Child:  runs the query through Net::Z3950::AsyncZ::ZLoop.  It stores the
#         report plus a "*==rsize==*" line (or "" when there is no report)
#         in the shared-memory segment and exits 0.  It exits with _EINVAL()
#         when there is no segment.
# Does nothing once the HUP handler has set $self->{abort}; dies if fork fails.
sub start {
my $self=shift;
return if defined $self->{abort};
my $index = shift;
my $retry_marker = shift;
my $pid;
if($pid = fork) {
# parent
$forkedPID{$pid} = $index;
$exitCode{$pid} = -1;
$resultTable{$pid}->[0] = @{$self->{servers}[$index]}[0]; # server name
$resultTable{$pid}->[1] = 0; # report = false
$resultTable{$pid}->[2] = $index; # current index
$resultTable{$pid}->[3] = $retry_marker; # retry index
print "process $index: \$pid = $pid $resultTable{$pid}->[0] @{$self->{servers}[$index]}[1] @{$self->{servers}[$index]}[2]\n" if $__DBUG;
}
else {
# child (or fork failure)
die "Server cannot handle your request at this time" unless defined $pid;
# presumably keeps this child's exit from removing the segment the parent
# still reads (IPC::ShareLite destroy flag) — confirm
$self->{share}->destroy(0);
# per-server options may override the object's query and log
my $update = $self->{options}[$index]->_updateObjectHash($self);
my $query = $update->{query} ? $update->{query} : $self->{query};
my $log = $update->{log} ? $update->{log} : $self->{log};
$self->{options}[$index]->_setFieldValue('_this_pid', $$);
# $self->{server} is the entry processHosts() is starting, i.e. the same
# entry as $self->{servers}[$index].
# NOTE(review): $zerrs is not used below; presumably the Errors object is
# created for its constructor's side effects — confirm.
my $zerrs = Net::Z3950::AsyncZ::Errors->new($log, @{$self->{server}}[0], $query,
$self->{options}[$index]->get_preferredRecordSyntax(),
@{$self->{server}}[2]
);
$self->{zl}[$index] =
Net::Z3950::AsyncZ::ZLoop->new(@{$self->{server}},$query,$self->{options}[$index]);
$self->{zl}[$index]->setTimer($self->{interval});
# NOTE(review): $host is unused.
my $host = @{$self->{servers}[$index]}[0];
if ($self->{zl}[$index]->{report} && $self->{share}) {
# append the size marker that _callback() reads back into getZ_RecSize()
push @{$self->{zl}[$index]->{report}},
"*==" . $self->{zl}[$index]->{rsize} . "==*\n";
$self->{share}->store(join '',@{$self->{zl}[$index]->{report}});
}
elsif ($self->{share}) {
$self->{share}->store("");
}
else { exit (Net::Z3950::AsyncZ::ErrMsg::_EINVAL()); }
exit 0;
}
}
{
    my $in_getResult = 0;   # true while _getResult() runs; polled by timerCallBack()

    sub _gettingResult { $in_getResult; }

    # Read one child's report from the shared-memory segment and mark the
    # child as having reported.  Save the lines under the process's result
    # index and hand them to _callback().
    #   $pid: pid taken from %forkedPID.  It is replaced by the pid found in
    #         the report's "<#--pid-->" line when there is one.
    # The segment is then overwritten with "{!-- host --}", the substitute
    # that noZ_Response() recognizes.
    sub _getResult {
        my ($self, $pid) = @_;
        $in_getResult = 1;
        exit (Net::Z3950::AsyncZ::ErrMsg::_EINVAL()) if !$self->{share};
        $self->{share}->lock(LOCK_EX);
        while(_isBusy()) { }
        my $data = $self->{share}->fetch();
        if (!$data) {
            # Presumably should never occur, but it happened once.  Release
            # the lock and the flag, or timerCallBack() would spin forever.
            $self->{share}->unlock;
            $in_getResult = 0;
            return;
        }
        my @data = split "\n", $data;
        # Line 0 carries the server name, line 1 the child's pid.  Each
        # capture is read only when its own match succeeded (a failed match
        # leaves $1 holding an earlier capture).
        my $host;
        $host = $1 if defined $data[0] && $data[0] =~ /<!--(.*)-->/;
        $self->{share}->store("\{!\-\- $host \-\-\}") if $host;
        my $_this_pid;
        $_this_pid = $1 if $data[1] && $data[1] =~ /<#--(\d+)-->/;
        $resultTable{$_this_pid}->[1] = 1
            if $_this_pid && exists $resultTable{$_this_pid};
        splice(@data,1,1);
        $pid = $_this_pid if $_this_pid;
        my $index = _getIndex($pid);
        while(_isBusy()) { }
        _saveResults(\@data, $index);
        while(_isBusy()) { }
        $self->_callback($index);
        $self->{share}->unlock;
        $in_getResult = 0;
    }
}
# Map a pid to the index its results and errors are reported under.
#   - A cycle-1 process (retry index -1, or >= 0 once queued for retry)
#     reports under its own index.
#   - A cycle-2 process (retry index -2) reports under the cycle-1 index of
#     the entry whose retry index equals the cycle-2 process's own index.
#     Retried results therefore land in the original query's slot.
# Returns undef for a pid that is not in %resultTable, without adding it.
# Previously a retried cycle-1 pid was looked up like a cycle-2 pid and
# could be mapped to a different server's index.
sub _getIndex {
    my $pid = shift;
    return undef if !defined $pid || !exists $resultTable{$pid};
    my $entry = $resultTable{$pid};
    return $entry->[2] if $entry->[3] != -2;   # cycle-1 process
    my $current_index = $entry->[2];
    foreach my $other (values %resultTable) {
        return $other->[2] if $other->[3] == $current_index;
    }
    return $entry->[2];   # default: the process's own index
}
# Returns 1 when no forked process still has the "not yet reaped" exit code
# -1, otherwise 0.
sub allDone {
    my $still_running = grep { $exitCode{$_} == -1 } keys %exitCode;
    return $still_running ? 0 : 1;
}
# Timer callback for the Event loop run by processHosts().  For every child
# that exited with code 0, collect its report and drop it from %forkedPID.
# Stop the loop once the timeout has passed or allDone() is true.
sub timerCallBack {
    my $self = shift;
    my $now = time();
    for my $child (keys %forkedPID) {
        1 while _gettingResult();   # wait while a report is being read
        next unless $exitCode{$child} == 0;
        $self->_getResult($child);
        delete $forkedPID{$child};
    }
    my $elapsed = $now - $self->{start};
    return unless $elapsed > $self->{timeout} || allDone();
    $self->{timer}->cancel();
    Event::unloop();
}
# Build the per-index error table for the processes that produced no report
# and hand it to _saveErrors():
#   $errors[$index][0] - Net::Z3950::AsyncZ::ErrMsg for the cycle-1 failure
#   $errors[$index][1] - ErrMsg for the cycle-2 (retry) failure
# $index is the one returned by _getIndex().  $__DBUG is forced to 0 on
# entry and on exit, so the debug output here is normally off.
sub _processErrors {
    my $self = shift;
    my %first_cycle = ();
    my %second_cycle = ();
    my @errors = ();
    $__DBUG = 0;
    print "\n\nProcessing Errors\n" if $__DBUG;

    # Split the processes without a report by cycle.
    for my $pid (keys %resultTable) {
        my $entry = $resultTable{$pid};
        next if $entry->[1] != 0;
        if ($entry->[3] == -2) {
            $second_cycle{$pid} = $entry;
        }
        else {
            $first_cycle{$pid} = $entry;
        }
    }

    # Record each failure: slot 0 for cycle 1, slot 1 for cycle 2.
    my @passes = (
        [ "\nCycle 1\n", \%first_cycle,  0 ],
        [ "\nCycle 2\n", \%second_cycle, 1 ],
    );
    for my $pass (@passes) {
        my ($heading, $cycle, $slot) = @{$pass};
        print $heading if $__DBUG;
        $self->_showStats($cycle) if $__DBUG;
        for my $pid (keys %{$cycle}) {
            my $err = Net::Z3950::AsyncZ::ErrMsg->new($exitCode{$pid});
            my $index = _getIndex($pid);
            $errors[$index]->[$slot] = $err;
            print $pid, " " if $__DBUG;
            $self->_printError($err) if $__DBUG;
        }
    }
    _saveErrors(@errors);
    $__DBUG = 0;
}
# Debug helper: print one error object on one line.  The line starts with
# "[errno] " and the message, then adds a tag for each classification
# method of the object that returns true.
sub _printError {
    my ($self, $err) = @_;
    my $line = sprintf("[%3d]", $err->{errno}) . " ";
    $line .= $err->{msg} if $err->{msg};
    my @tags = (
        [ isNetwork  => " NET" ],
        [ isSystem   => " SYSTEM" ],
        [ isTryAgain => " TRY AGAIN" ],
        [ isSuccess  => " SUCCESS" ],
        [ isZ3950    => " --Z3950 ERROR " ],
        [ doRetry    => " --RETRY " ],
    );
    for my $tag (@tags) {
        my ($method, $label) = @{$tag};
        $line .= $label if $err->$method();
    }
    print "$line\n";
}
# SIGCHLD handler: reap every child that has exited, without blocking, and
# record its exit status in %exitCode.  Dies, with $? set to that status,
# when Net::Z3950::AsyncZ::ErrMsg::_abort() says the status calls for it.
# The handler reinstalls itself before returning.
sub childhandler {
    my $reaped;
    while (($reaped = waitpid(-1, WNOHANG)) > 0) {
        my $status = $? >> 8;
        $exitCode{$reaped} = $status;
        if (Net::Z3950::AsyncZ::ErrMsg::_abort($status)) {
            $? = $status;
            die;
        }
    }
    $SIG{CHLD} = \&childhandler;
}
use Carp;
# Destructor.  Every process runs this DESTROY, so nothing happens until
# unlooped is set (by processHosts() or by the HUP handler).  Then it:
#   - kills (signal 9) every recorded child that is still alive and whose
#     exit code is not 0
#   - kills the monitor process and releases the shared-memory segment
#   - sleeps one second so the killed processes can be reaped
sub DESTROY {
    my $self = shift;
    return unless $self->{unlooped};
    print "DESTROY\n" if $__DBUG;
    for my $pid (keys %exitCode) {
        next unless kill 0 => $pid;
        next if $exitCode{$pid} == 0;
        kill 9 => $pid;
        print "killing $pid\n" if $__DBUG;
    }
    kill KILL => $self->{monitor_pid} if $self->{monitor};
    $self->{share} = undef if defined $self->{share};
    sleep(1);   # allow time for remaining killed processes to be reaped
}
# Fork a watchdog process.  After $self->{monitor} seconds its SIGALRM
# handler sends HUP to its parent and then kills itself.  Returns the
# watchdog's pid in the parent; the child never returns.  Dies if the fork
# fails.
sub _monitor {
    my $self = shift;
    $SIG{ALRM} = sub {
        my $parent = getppid();
        kill HUP => $parent;
        kill KILL => $$;
    };
    my $pid = fork;
    return $pid if $pid;
    die "Unable to fork" unless defined $pid;
    alarm($self->{monitor});
    sleep(10) while 1;
}
# Sample swap activity with three vmstat cycles ("vmstat -S" off Linux).
# Returns 0 when the si or so (swap-in/swap-out) column differs by 20 or
# more between the last two samples, otherwise 1.  When vmstat yields fewer
# than three samples, or no si/so header, it sleeps 3 seconds and returns 1.
# Dies if vmstat cannot be started.
sub is_mem_left {
    my @vmstat_args = ($^O =~ /linux/) ? (1, 3) : ('-S', 1, 3);
    # list-form pipe open: no shell, lexical handle
    open(my $vmstat, '-|', 'vmstat', @vmstat_args)
        or die "can't open vmstat: $!";
    my (@si, @so, $si_index, $so_index);
    while (my $line = <$vmstat>) {
        sleep(1);   # helps to insure that vmstat produces 3 lines of output
        $line =~ s/^\s+//;
        $line =~ s/\s+$//;
        my @fields = split /\s+/, $line;
        if ($line =~ /si/i && $line =~ /so/i) {
            # header line: locate the si and so columns
            for my $i (0 .. $#fields) {
                $si_index = $i if $fields[$i] =~ /^si$/i;
                $so_index = $i if $fields[$i] =~ /^so$/i;
            }
        }
        elsif ($line =~ /\d/ && defined $si_index && defined $so_index) {
            push @si, $fields[$si_index];
            push @so, $fields[$so_index];
        }
    }
    close $vmstat;
    # Fix for when vmstat returns after fewer than three cycles.  The checks
    # below compare samples 2 and 3 (the first sample is not compared).
    if (@si < 3) {
        sleep 3;
        return 1;
    }
    return 0 if abs($si[2] - $si[1]) >= 20;
    return 0 if abs($so[2] - $so[1]) >= 20;
    return 1;
}
# Debug helper: print one line per entry of a %resultTable-style hash
# (server, report flag, index, retry index), plus the process's exit code
# when known.  Called by new() and _processErrors() only when $__DBUG is
# set.  It must come before __END__: code after __END__ is never compiled,
# so turning on $__DBUG used to make those calls die.
sub _showStats {
    my $self = shift;
    my $table = shift;
    print "\nStats\n";
    foreach my $pid (keys %$table) {
        print "$pid:\t";
        print "$table->{$pid}->[0]",
            " result: $table->{$pid}->[1]",
            " index: $table->{$pid}->[2]",
            " retry index: $table->{$pid}->[3]";
        print "\texit code: $exitCode{$pid}\n" if exists $exitCode{$pid};
    }
}
1;
__END__