Verby::Dispatcher - Takes steps and executes them. Sort of like what make(1) is to a Makefile.


Verby documentation Contained in the Verby distribution.

Index


Code Index:

NAME

Top

Verby::Dispatcher - Takes steps and executes them. Sort of like what make(1) is to a Makefile.

SYNOPSIS

Top

	use Verby::Dispatcher;
	use Verby::Config::Data; # or something equiv

	my $c = Verby::Config::Data->new(); # ... needs the "logger" field set

	my $d = Verby::Dispatcher->new;
	$d->config_hub($c);

	$d->add_steps(@steps);

	$d->do_all;

DESCRIPTION

Top

ATTRIBUTES

Top

resource_pool: If provided with a POE::Component::ResourcePool instance, that resource pool will be used to handle resource allocation.

The resources method in Verby::Step is used to declare the resources that each step requires.

step_set: Returns the Set::Object that is used for internal bookkeeping of the steps involved.

satisfied_set: Returns the Set::Object that is used to track which steps are satisfied.

config_hub: The configuration hub that all contexts inherit from.

Defaults to an empty parameter set.

global_context: The global context object.

Defaults to a derivation of config_hub.

METHODS

Top

new

Returns a new Verby::Dispatcher. Duh!

add_steps *@steps
add_step *@steps

Add a number of steps into the dispatcher pool.

Anything returned by the depends method in Verby::Step is collected recursively and added to the batch as well.

do_all

Calculate all the dependencies, and then dispatch in order.


Verby documentation Contained in the Verby distribution.

#!/usr/bin/perl

package Verby::Dispatcher;
use Moose;

our $VERSION = "0.05";

use Set::Object;
use Verby::Context;
use Carp qw/croak/;
use Tie::RefHash;

use POE;

require overload;

# Every step registered through add_step/add_steps.
has step_set => (
	is      => "ro",
	isa     => "Set::Object",
	default => sub { return Set::Object->new },
);

# Steps that have finished; create_poe_sessions skips these.
has satisfied_set => (
	is      => "ro",
	isa     => "Set::Object",
	default => sub { return Set::Object->new },
);

# Cache of step => context, filled by get_cxt. Tied to Tie::RefHash so
# that step objects can be used as keys.
has cxt_of_step => (
	is      => "ro",
	isa     => "HashRef",
	default => sub {
		my %by_step;
		tie %by_step, "Tie::RefHash";
		return \%by_step;
	},
);

# Cache of step => array ref of contexts, filled by get_derivable_cxts.
# Also tied to Tie::RefHash.
has derivable_cxts => (
	is      => "ro",
	isa     => "HashRef",
	default => sub {
		my %by_step;
		tie %by_step, "Tie::RefHash";
		return \%by_step;
	},
);

# Configuration hub that the global context is derived from. A fresh
# Verby::Config::Data is created when none is supplied.
has config_hub => (
	is      => "rw",
	isa     => "Object",
	default => sub {
		require Verby::Config::Data;
		return Verby::Config::Data->new;
	},
);

# Built on first access by deriving a Verby::Context from config_hub.
has global_context => (
	is      => "ro",
	isa     => "Object",
	lazy    => 1,
	default => sub {
		my ( $self ) = @_;
		return $self->config_hub->derive("Verby::Context");
	},
);

# Optional; has_resource_pool tells whether one was supplied.
has resource_pool => (
	is        => "ro",
	isa       => "POE::Component::ResourcePool",
	predicate => "has_resource_pool",
);

# Register one or more steps with the dispatcher, together with everything
# they (transitively) depend on.
#
# Arguments: a list of step objects; each must respond to ->depends.
# Returns:   nothing useful.
#
# Steps already present in step_set are skipped. A step is put into the set
# *before* its dependencies are visited, so a dependency cycle ends at the
# "already included" check instead of recursing without bound. The debug
# message is still emitted after the dependencies have been handled, which
# keeps dependencies logged ahead of the steps that need them.
#
# NOTE(review): a cyclic graph is now accepted here, but create_poe_sessions
# only starts a step once its dependency set is empty, so the members of a
# cycle would never run -- confirm whether cycles should be reported instead.
sub add_step {
	my ( $self, @new_steps ) = @_;

	my $steps = $self->step_set;

	foreach my $step (@new_steps) {
		next if $steps->includes($step);

		# Mark as known first; this is what terminates cyclic graphs.
		$steps->insert($step);

		$self->add_step($step->depends);

		$self->global_context->logger->debug("adding step $step");
	}

	return;
}

# Plural alias for add_step: hands every argument straight through and
# returns whatever add_step returns.
sub add_steps {
	my ( $self, @steps ) = @_;
	return $self->add_step(@steps);
}

# Return the context belonging to $step, creating and caching it in
# cxt_of_step on first use. A new context is a Verby::Context built from
# the list returned by get_derivable_cxts($step).
sub get_cxt {
	my ( $self, $step ) = @_;

	my $cache = $self->cxt_of_step;

	unless ( $cache->{$step} ) {
		$cache->{$step} = Verby::Context->new( $self->get_derivable_cxts($step) );
	}

	return $cache->{$step};
}

# Return the list of contexts that steps depending on $step derive from.
#
# The list is computed once per step and memoized in derivable_cxts:
#   * if $step->provides_cxt is true, it is a single new Verby::Context
#     built on top of the step's parent contexts;
#   * otherwise it is the parent contexts themselves.
#
# In scalar context the number of contexts is returned.
sub get_derivable_cxts {
	my ( $self, $step ) = @_;

	my $memo = $self->derivable_cxts;

	unless ( $memo->{$step} ) {
		my $has_own_cxt = $step->provides_cxt;
		my @parents     = $self->get_parent_cxts($step);

		$memo->{$step} = [ $has_own_cxt ? Verby::Context->new(@parents) : @parents ];
	}

	return @{ $memo->{$step} };
}

# Return the contexts $step inherits from: the derivable contexts of all of
# its dependencies, or the global context when that list is empty.
sub get_parent_cxts {
	my ( $self, $step ) = @_;

	my @inherited;
	foreach my $dependency ( $step->depends ) {
		push @inherited, $self->get_derivable_cxts($dependency);
	}

	return @inherited if @inherited;
	return $self->global_context;
}

# Build the POE session tree that drives execution.
#
# One parent session is created. Its _start handler creates one child
# session for every step that is in step_set but not in satisfied_set.
# Each child waits until its own dependency set is empty, optionally asks
# the resource pool for the step's resources, and then hands the step to
# start_step. When a child goes away the parent records the step as
# satisfied and signals VERBY_STEP_FINISHED.
#
# Nothing runs until the POE kernel is started (do_all does that).
sub create_poe_sessions {
	my ( $self ) = @_;

	my $g_cxt = $self->global_context;
	$g_cxt->logger->debug("Creating parent POE session");

	POE::Session->create(
		inline_states => {
			_start => sub {
				my ( $kernel, $heap ) = @_[KERNEL, HEAP];
				# deliberately taken from the heap; shadows the outer $self
				my $self = $heap->{verby_dispatcher};

				# FIXME
				# handle sigint

				my $g_cxt = $self->global_context;

				my $all_steps = $self->step_set;
				my $satisfied = $self->satisfied_set;

				# only steps not yet marked satisfied get a session
				my $pending = $all_steps->difference( $satisfied );

				foreach my $step ( $pending->members ) {
					$g_cxt->logger->debug("Creating POE session for step $step");

					POE::Session->create(
						inline_states => {
							# Child start-up: listen for finished steps, pin the
							# session with a refcount while it waits, then make a
							# first attempt to run.
							_start => sub {
								my ( $kernel, $session) = @_[KERNEL, SESSION];

								$kernel->sig("VERBY_STEP_FINISHED" => "step_finished");
								$kernel->refcount_increment( $session->ID, "unresolved_dependencies" );

								$kernel->yield("try_executing_step");
							},
							# Handler for VERBY_STEP_FINISHED. $done is presumably the
							# step the parent passed to signal() -- TODO confirm the
							# ARG1 position against the POE signal documentation.
							step_finished => sub {
								my ( $kernel, $heap, $done ) = @_[KERNEL, HEAP, ARG1];

								my $deps = $heap->{dependencies};

								if ( $deps->includes($done) ) {
									$deps->remove( $done );
									# retry only once the last dependency is gone
									$kernel->yield("try_executing_step") unless $deps->size;
								}
							},
							# Runs the step at most once, and only when no
							# dependencies are outstanding.
							try_executing_step => sub {
								my ( $kernel, $session, $heap ) = @_[KERNEL, SESSION, HEAP];

								return if $heap->{dependencies}->size; # don't run if we're waiting
								return if $heap->{ran}++; # don't run twice

								# $step here is the loop variable captured by this closure
								$heap->{g_cxt}->logger->debug("All dependencies of '$step' have finished, starting");

								$kernel->sig("VERBY_STEP_FINISHED"); # we're no longer waiting for other steps to finish
								# release the reference taken in _start
								$kernel->refcount_decrement( $session->ID, "unresolved_dependencies" );

								# With a resource pool and a non-empty resource list, ask
								# the pool and name "execute_step" as the event for the
								# request; otherwise invoke execute_step right away.
								if ( my $pool = $heap->{resource_pool} and my @req = $heap->{step}->resources ) {
									# kept on the heap so _stop can dismiss it
									$heap->{resource_request} = $pool->request(
										params => { @req },
										event  => "execute_step",
									);
								} else {
									$kernel->call( $session, "execute_step" );
								}
							},
							# Hands the step to the dispatcher, passing along this
							# handler's full POE argument list as an array reference.
							execute_step => sub {
								my ( $kernel, $session, $heap ) = @_[KERNEL, SESSION, HEAP];

								# this may create child sessions. If it doesn't this session will go away
								$heap->{verby_dispatcher}->start_step( $heap->{step}, \@_ );
							},
							# Clean-up: dismiss any resource request, log, run the
							# post hooks, and return the step. The parent's _child
							# handler reads its ARG2 as the finished step.
							_stop => sub {
								my ( $kernel, $heap ) = @_[KERNEL, HEAP];
								my $step = $heap->{step};

								if ( my $request = delete $heap->{resource_request} ) {
									$request->dismiss;
								}

								$heap->{g_cxt}->logger->info("step $step has finished.");

								# post_hooks starts out empty; nothing in this sub adds to it
								$_->() for @{ $heap->{post_hooks} };

								return $step;
							},
							# NOTE(review): no $kernel->sig( DIE => ... ) call is visible
							# in this session -- confirm that this state is ever reached.
							DIE     => sub { $_[HEAP]{g_cxt}->logger->warn("cought exception: @_") },
							_child  => sub { $_[HEAP]{g_cxt}->logger->debug("Step $_[HEAP]{step} _child event: $_[ARG0]") },
						},
						heap => {
							# start from a copy of the parent session's heap entries
							%{ $heap },
							step         => $step,
							# dependencies that are not yet satisfied
							dependencies => Set::Object->new( $step->depends )->difference($satisfied),
							ran          => 0,
							post_hooks   => [],
						},
					);
				}
			},
			# Parent bookkeeping: when a child session is lost, mark its step
			# (ARG2) as satisfied and signal VERBY_STEP_FINISHED to this session.
			# NOTE(review): the waiting children presumably receive the signal
			# through POE's propagation to child sessions -- confirm.
			_child => sub {
				my ( $kernel, $session, $heap, $type, $step ) = @_[KERNEL, SESSION, HEAP, ARG0, ARG2];

				if ( $type eq "lose" ) {
					$heap->{satisfied}->insert($step);
					$kernel->signal( $session, "VERBY_STEP_FINISHED", $step );
				}
			},
			# NOTE(review): as in the child sessions, no sig() registration for
			# DIE is visible -- confirm that this state is ever reached.
			DIE   => sub { $_[HEAP]{g_cxt}->logger->warn("cought exception: @_") },
			_stop => sub { $_[HEAP]{g_cxt}->logger->debug("parent POE session closing") },
		},
		heap => {
			verby_dispatcher => $self,
			g_cxt            => $g_cxt, # convenience
			satisfied        => $self->satisfied_set,
			# the key is only present when a pool was supplied
			( $self->has_resource_pool ? ( resource_pool => $self->resource_pool ) : () ),
		}
	);
}

# Set up the POE sessions for all pending steps, then run the POE kernel.
# Returns whatever the kernel's run method returns.
sub do_all {
	my ( $self ) = @_;

	$self->create_poe_sessions;

	my $log = $self->global_context->logger;
	$log->debug("Starting POE main loop");

	return $poe_kernel->run;
}

# Run a single step unless it reports itself as already satisfied.
#
# Arguments: the step, and $poe -- create_poe_sessions passes a reference
#            to the POE handler's argument list; it is handed through to
#            the step untouched.
# Returns:   the result of $step->do when the step is run, nothing when
#            it is skipped.
sub start_step {
	my $self = shift;
	my ( $step, $poe ) = @_;

	my $g_cxt = $self->global_context;
	my $cxt   = $self->get_cxt($step);

	unless ( $step->is_satisfied($cxt, $poe) ) {
		$g_cxt->logger->debug("starting step $step");
		return $step->do($cxt, $poe);
	}

	$g_cxt->logger->debug("step $step has already been satisfied, running isn't necessary.");
	return;
}

# Context-sensitive view of a set: its members in list context, its size
# otherwise.
sub _set_members_query {
	my ( $self, $set ) = @_;

	return $set->members if wantarray;
	return $set->size;
}

# The registered steps: a list of them in list context, their number in
# scalar context (see _set_members_query).
sub steps {
	my ( $self ) = @_;

	my $registered = $self->step_set;
	return $self->_set_members_query($registered);
}

# Tell whether $step has been recorded in satisfied_set.
# Croaks if the step was never registered with this dispatcher.
sub is_satisfied {
	my ( $self, $step ) = @_;

	unless ( $self->step_set->contains($step) ) {
		croak "$step is not registered at all";
	}

	return $self->satisfied_set->contains($step);
}

__PACKAGE__

__END__