Win32::ProcFarm::PerpetualPool - manages a pool of child processes for perpetual jobs


Win32-ProcFarm documentation Contained in the Win32-ProcFarm distribution.

Index


Code Index:

NAME

Top

Win32::ProcFarm::PerpetualPool - manages a pool of child processes for perpetual jobs

SYNOPSIS

Top

	use Win32::ProcFarm::PerpetualPool;

	$Pool = Win32::ProcFarm::PerpetualPool->new($poolsize, $portnum, $scriptname, Win32::GetCwd,
		command => 'whatever',
		list_check_intvl => 30,
		exit_check_intvl => 5,
		list_sub => sub { return ('Fred', 'Julie', 'Joe') },
		exit_sub => sub { return -e 'killme'; },
		result_sub => sub { print join(', ' @_)."\n"; },
	);

	$Pool->start_pool(0.1);

DESCRIPTION

Top

Installation instructions

This installs with MakeMaker as part of Win32::ProcFarm.

To install via MakeMaker, it's the usual procedure - download from CPAN, extract, type "perl Makefile.PL", "nmake" then "nmake install". Don't do an "nmake test" because the I haven't written a test suite yet.

More usage instructions

This is a version of Win32::ProcFarm::Pool designed for continuous operation. You supply a single command name and a subroutine that returns a list of keys. The keys are passed as the sole parameter to the command (it is presumed that the child process can do whatever needs to be done based on that single key). The subroutine that returns the list of keys will be periodically executed (every 120 seconds by default, but adjustable via list_intvl) and the running list updated as needed. Whenever a job finishes, that key is added back onto the end of the waiting pool.

METHODS

Top


Win32-ProcFarm documentation Contained in the Win32-ProcFarm distribution.

#############################################################################
#
# Win32::ProcFarm::PerpetualPool - manages a pool of child processes for perpetual jobs
#
# Author: Toby Everett
# Revision: 2.15
# Last Change: Patched to fix problems when Win32::GetTickCount > 2**31
#############################################################################
# Copyright 1999, 2000 Toby Everett.  All rights reserved.
#
# This file is distributed under the Artistic License. See
# http://www.ActiveState.com/corporate/artistic_license.htm or
# the license that comes with your perl distribution.
#
# For comments, questions, bugs or general interest, feel free to
# contact Toby Everett at teverett@alascom.att.com
#############################################################################


use Win32::ProcFarm::Pool;
use Win32::ProcFarm::TickCount;

package Win32::ProcFarm::PerpetualPool;

use strict;
use vars qw($VERSION @ISA);

$VERSION = '2.15';

@ISA = qw(Win32::ProcFarm::Pool);

sub new {
	my $class = shift;

	my($num_threads, $port_num, $script, $curdir, %options) = @_;
	my $self = $class->SUPER::new($num_threads, $port_num, $script, $curdir, %options);

	foreach my $i (qw(command list_check_intvl list_sub exit_check_intvl exit_sub)) {
		exists $options{$i} and $self->{$i} = $options{$i};
	}

	$self->{current_hash} = {};
	$self->{death_hash} = {};
	$self->{next_list_check} = Win32::GetTickCount();
	$self->{next_exit_check} = Win32::GetTickCount();
	$self->{state} = 'running';
	defined $self->{list_check_intvl} or $self->{list_check_intvl} = 60;
	defined $self->{exit_check_intvl} or $self->{exit_check_intvl} = 2;

	bless $self, $class;
	return $self;
}

sub list_check {
	my $self = shift;

	$self->{state} eq 'running' or return;

	my %temp;
	@temp{$self->{list_sub}->($self)} = ();

	foreach my $i (keys %temp) {
		if (!exists $self->{current_hash}->{$i}) {
			$self->add_waiting_job($i, $self->{command}, $i);
			$self->{current_hash}->{$i} = undef;
		}
	}

	foreach my $i (keys %{$self->{current_hash}}) {
		if (!exists $temp{$i}) {
			$self->{death_hash}->{$i} = undef;
			delete $self->{current_hash}->{$i};
		}
	}
}

sub exit_check {
	my $self = shift;

	$self->{state} eq 'running' or return;

	ref($self->{exit_sub}) eq 'CODE' or return;
	if ($self->{exit_sub}->($self)) {
		$self->{waiting_pool} = [];
		$self->{death_hash} = {};
		$self->{state} = 'stopping';
	}
}

sub start_pool {
	my $self = shift;
	my($sleep) = @_;

	while (1) {
		$self->cleanse_and_dispatch();
		if ($self->{state} eq 'stopping' && ($self->count_waiting + $self->count_running) == 0 ) {
			$self->{state} = 'stopped';
			last;
		}
		$sleep and Win32::Sleep($sleep*1000);
	}
}

sub cleanse_and_dispatch {
	my $self = shift;

	if (Win32::ProcFarm::TickCount::compare($self->{next_list_check}, Win32::GetTickCount()) == -1) {
		$self->list_check();
		$self->{next_list_check} = Win32::GetTickCount() + $self->{list_check_intvl} * 1000;
	}

	if (Win32::ProcFarm::TickCount::compare($self->{next_exit_check}, Win32::GetTickCount()) == -1) {
		$self->exit_check();
		$self->{next_exit_check} = Win32::GetTickCount() + $self->{exit_check_intvl} * 1000;
	}

	return $self->SUPER::cleanse_and_dispatch;
}

sub cleanse_thread {
	my $self = shift;
	my($thread) = @_;

	$thread->{Parent}->get_state eq 'fin' or return 0;

	my @temp = $thread->{Parent}->get_retval;
	if (ref($self->{result_sub}) eq 'CODE') {
		$self->{result_sub}->($thread->{key}, @temp);
	}

	if ($self->{state} eq 'running') {
		$self->add_waiting_job($thread->{key}, $self->{command}, $thread->{key});
	}

	$thread->{key} = undef;
	return 1;
}

sub dispatch_job {
	my $self = shift;
	my($thread) = @_;

	$thread->{Parent}->get_state eq 'idle' or return 0;
	my $job = $self->get_next_job() or return 0;
	$thread->{Parent}->execute($job->{command}, @{$job->{params}});
	$thread->{key} = $job->{key};
	return 1;
}

sub get_next_job {
	my $self = shift;

	my $job;
	while ($job = shift(@{$self->{waiting_pool}})) {
		if (exists $self->{death_hash}->{$job->{key}}) {
			delete $self->{death_hash}->{$job->{key}};
			next;
		}
		return $job;
	}
	return undef;
}

1;