| Makefile-Parallel documentation | Contained in the Makefile-Parallel distribution. |
Makefile::Parallel - A distributed parallel makefile
This module should not be called directly. Please see the perldoc of the pmake program on the /examples directory of this distribution.
Main function. Accepts a file to parse and a hash reference with options.
TODO: Document options
Tries to recover the journal of the last makefile run.
This function is responsible for cleaning all the temporary files created by the PBS system. It should be used only with the PBS scheduler method.
Loop it baby :D
This function is responsible for reaping the jobs that have finished. If the job needs to run something at the end (for example, find i <- grep | awk...), it is executed and the job queue is expanded.
This function goes through the finished job and tries to find asShell commands to run, performing all the necessary expansions.
This function goes through the finished job and tries to find asPerl commands to run, performing all the necessary expansions.
This function evaluates a Perl action and returns a list of strings. The action can:
. return an ARRAY reference, . print a list of lines to STDOUT (to be split and chomped), . or return a string (to be split and chomped)
This function is responsible for expanding all the jobs when a variable is evaluated. It expands both forks and joins.
Print a pretty report bla bla bla
This function is responsible for dispatching the jobs that can run.
This function checks whether the specified job is already done, i.e. present in the finished list.
This sub is called at the program exit
Saves the scheduler state to disk.
Calculates the MD5 of the current makefile
This one finds out if a job can run (all the dependencies are met).
Launch a process (really??)
Builds a pretty GraphViz file after the execution of the makefile
This function is called every time the user sends a SIGINT to this process. The objective is to kill all the running processes and wait for them to die.
Ruben Fonseca, <root@cpan.org>
Alberto Simões <ambs@cpan.org>
José João Almeida <jj@di.uminho.pt>
Please report any bugs or feature requests to
bug-makefile-parallel@rt.cpan.org, or through the web interface at
http://rt.cpan.org/NoAuth/ReportBug.html?Queue=Makefile-Parallel.
I will be notified, and then you'll automatically be notified of progress on
your bug as I make changes.
Copyright 2006-2011 Ruben Fonseca, et al, all rights reserved.
This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
| Makefile-Parallel documentation | Contained in the Makefile-Parallel distribution. |
package Makefile::Parallel; use Makefile::Parallel::Grammar; use Log::Log4perl; use Proc::Simple; use Clone qw(clone); use Time::HiRes qw(gettimeofday tv_interval); use Time::Interval; use Time::Piece::ISO; use GraphViz; use Digest::MD5; use Data::Dumper; use warnings; use strict; our $VERSION = '0.08';
# Module Stuff
# Exporter is loaded explicitly: naming it in @ISA does not load it, and
# without it "use Makefile::Parallel" would silently export nothing unless
# some other module happened to load Exporter first.
require Exporter;
our @ISA         = qw(Exporter);
our %EXPORT_TAGS = ( 'all' => [ qw() ] );
our @EXPORT_OK   = ( @{ $EXPORT_TAGS{'all'} } );
our @EXPORT      = qw( process_makefile );

my $logger;             # Log::Log4perl logger, created by process_makefile
my $queue;              # Jobs still waiting to be dispatched (array ref)
my $running   = {};     # Holds the running ID's (ID -> info)
my $finnished = {};     # Holds the finnished ID's (ID -> info)
my $scheduler;          # Holds the scheduler engine
my $counter = 0;        # Holds the order of the executed processes
my $filename;           # Holds the filename of the makefile
my $debug;              # TRUE if we got debug enabled

# This stuff deals with the interruption (Ctrl + C)
$SIG{INT} = \&process_interrupt;
my $interrupted = 0;
# Main entry point: parses the makefile $file and runs the scheduling loop.
#
# $file    - path of the makefile to parse
# $options - optional hash reference; recognised keys and their defaults:
#              scheduler => 'LOCAL'   'PBS' selects the PBS scheduler
#              local     => 1         "max" value given to the local scheduler
#              mail      => (unset)   copied to the PBS scheduler when set
#              dump      => 0         dump the parsed queue and die
#              clean     => 0         call clean(), which exits
#              clock     => 10        seconds slept between two loop passes
#              debug     => 0         DEBUG level, also logged to log/makefile.log
#              continue  => 0         recover the journal of a previous run
#
# Returns nothing when the parse fails. Otherwise it does not return: the
# loop is left through at_exit(), which calls exit.
sub process_makefile {
    my ($file, $options) = @_;

    # Set sensible defaults
    $options ||= {};
    $options->{scheduler} ||= 'LOCAL';
    $options->{local}     ||= '1';    # Default CPU's on local mode
    $options->{dump}      ||= 0;
    $options->{clean}     ||= 0;
    $options->{clock}     ||= 10;
    $options->{debug}     ||= 0;
    $options->{continue}  ||= 0;

    # TODO: Give more flexibility
    # "require" loads only the scheduler that was asked for; a "use" inside
    # the branch would load both at compile time.
    if ($options->{scheduler} eq 'PBS') {
        require Makefile::Parallel::Scheduler::PBS;
        $scheduler = Makefile::Parallel::Scheduler::PBS->new();
        $scheduler->{mail} = $options->{mail} if $options->{mail};
    }
    else {
        require Makefile::Parallel::Scheduler::Local;
        $scheduler = Makefile::Parallel::Scheduler::Local->new({ max => $options->{local} });
    }

    # Debug settings
    if ($options->{debug}) {
        # Clean logs without shelling out to the operating system
        require File::Path;
        File::Path::remove_tree('log');
        -d 'log' or mkdir 'log' or die "Can't create the log directory: $!";

        my $conf = q(
            log4perl.category.PMake = DEBUG, Logfile, Screen
            log4perl.appender.Logfile = Log::Log4perl::Appender::File
            log4perl.appender.Logfile.filename = log/makefile.log
            log4perl.appender.Logfile.layout = Log::Log4perl::Layout::PatternLayout
            log4perl.appender.Logfile.layout.ConversionPattern = [%d] [%p] %F(%L) %m%n
            log4perl.appender.Screen = Log::Log4perl::Appender::Screen
            log4perl.appender.Screen.stderr = 0
            log4perl.appender.Screen.layout = Log::Log4perl::Layout::PatternLayout
            log4perl.appender.Screen.layout.ConversionPattern = [%d] %m%n
        );
        Log::Log4perl::init(\$conf);
        $debug = 1;
    }
    else {
        my $conf = q(
            log4perl.category.PMake = INFO, Screen
            log4perl.appender.Screen = Log::Log4perl::Appender::Screen
            log4perl.appender.Screen.stderr = 0
            log4perl.appender.Screen.layout = Log::Log4perl::Layout::PatternLayout
            log4perl.appender.Screen.layout.ConversionPattern = [%d] %m%n
        );
        Log::Log4perl::init(\$conf);
        $debug = 0;
    }
    $logger = Log::Log4perl::get_logger("PMake");

    # Parse the file
    $logger->info("Trying to parse \"$file\"");
    $queue = Makefile::Parallel::Grammar->parseFile($file);
    if ($queue) {
        $logger->info("Parse ok.. proceeding to plan the scheduling");
    }
    else {
        $logger->error("Parse failed, aborting...");
        return;
    }
    $filename = $file;

    # Copy perl routines to perl actions. The emptiness guard matters:
    # subscripting [-1] on an empty array and then dereferencing it dies.
    if (@{$queue} && defined $queue->[-1]{perl}) {
        my $routines = $queue->[-1]{perl};
        for my $job (@{$queue}) {
            if (defined $job->{action}[0]{perl}) {
                $job->{perl} = $routines;
            }
        }
        # The trailing entry only carried the routines; drop it from the queue
        pop @{$queue};
    }

    # Dump if the user want it
    die Dumper $queue if ($options->{dump});

    # Clean the temporary files if we are PBS
    clean() if ($options->{clean});

    # Recover the journal if the user wants to continue
    journal_recover() if ($options->{continue});

    # Enter the loop
    while (1) {
        # $logger->debug("New loop starting");
        loop();
        # $logger->debug("Loop processed, sleeping");
        sleep $options->{clock};
    }
}
# Restores the state saved by write_journal() in "$filename.journal":
# the finnished list and the launch counter. Jobs already finnished have
# their asShell/asPerl actions re-run (so the variables are recomputed)
# and are then removed from the queue.
#
# Dies when the journal cannot be read or parsed. A changed makefile (MD5
# mismatch) only produces a warning.
sub journal_recover {
    require File::Spec;

    my $journal_file = "$filename.journal";

    # "do FILE" looks relative paths up in @INC, and "." is not part of @INC
    # since Perl 5.26, so the path is made absolute first.
    my $journal_path = File::Spec->rel2abs($journal_file);
    my $journal      = do $journal_path;
    unless ($journal) {
        # $@ is set when the file was read but did not compile
        die "Can't parse $journal_file: $@" if $@;
        die "Can't open $journal_file: $!";
    }

    my $md5 = calc_makefile_md5();
    if ($journal->{md5} ne $md5) {
        $logger->warn("MD5 Check Failed... The original Makefile was changed!! CONTINUE AT YOUR OWN RISK!");
    }

    # Restore the finnished list
    $finnished = $journal->{finnished};
    $counter   = $journal->{counter};

    # Ignore jobs already concluded
    # First pass - compute the variables
    for my $job (@{$queue}) {
        next unless $job;
        if (is_finnished($job->{rule}{id})) {
            # If we got asShell to run, run it!
            find_and_run_asShell($job->{rule}{id});

            # If we got asPerl to run, run it!
            find_and_run_asPerl($job->{rule}{id});
        }
    }

    # Second pass - drop the jobs that were already executed
    $queue = [ grep { $_ && !is_finnished($_->{rule}{id}) } @{$queue} ];

    $logger->warn("Journal recovered.. Cross your fingers now...");
}
# Asks the scheduler to clean up for the current queue, logs it, and
# terminates the program with exit status 0.
sub clean {
    $scheduler->clean($queue);

    $logger->info("Temporary files cleaned");

    exit 0;
}
# One pass of the main loop: collect the jobs that ended, launch the jobs
# that are ready, then save the state to the journal.
sub loop {
    reap_dead_bodies();
    dispatch();

    return write_journal();
}
# Moves every job the scheduler no longer reports as running from the
# running list to the finnished list, recording its stop time and elapsed
# time. A job that ended with a non-zero exit status aborts the whole queue
# through process_interrupt(1) and is flagged as fatal. Jobs that were not
# interrupted then get their asShell/asPerl actions run.
sub reap_dead_bodies {
    # Search all running procs for someone who died
    JOB:
    foreach my $runid (keys %{$running}) {
        my $job = $running->{$runid};

        # Still running: nothing to do for this one
        next JOB if $scheduler->poll($job, $logger);

        # Not running anymore: save the time stats
        my $now     = [gettimeofday];
        my $seconds = tv_interval($job->{starttime}, $now);
        $job->{stoptime} = $now;
        my $elapsed = parseInterval(seconds => int($seconds), Small => 1);
        $job->{elapsed} = $elapsed;

        # Give user some feedback
        $logger->info("Process " . $scheduler->get_id($job) . " (" . $job->{rule}->{id} . ") has terminated [$elapsed]");

        # Remove from running and save as finnished
        $finnished->{$runid} = $job;
        delete $running->{$runid};

        # Nothing more to do if it was interrupted
        next JOB if $job->{interrupted};

        # Verify the exit status
        $scheduler->get_dead_job_info($job);
        if ($job->{exitstatus} && !$job->{interrupted}) {
            # Boom!! Cancel everything!
            $logger->fatal("Process " . $scheduler->get_id($job) . " exited with exit status " . $job->{exitstatus} . "! Aborting all queue...");
            process_interrupt(1);    # Forced
            $job->{fatal} = 1;       # To graphviz later...
        }

        # If we got asShell to run, run it!
        find_and_run_asShell($runid);

        # If we got asPerl to run, run it!
        find_and_run_asPerl($runid);
    }
}
# Runs the asShell actions of the finnished job $runid whose variable
# ($action->{def}) has no value yet. Every output line of the command
# becomes one value of the variable, stored in $finnished->{__var__}, and
# the queue is then expanded with expand_forks().
#
# A command that cannot be started, or that ends with an error, is logged;
# the variable then holds whatever lines were read (possibly none), as before.
sub find_and_run_asShell {
    my ($runid) = @_;

    for my $action (@{$finnished->{$runid}->{action}}) {
        next unless $action->{asShell};
        next if defined $finnished->{__var__}->{$action->{def}};

        $logger->info("Running shell action $action->{asShell}");

        my @values;
        # Three-argument pipe open on a lexical handle; a single command
        # string is still interpreted by the shell, as it was before.
        if (open my $pipe, '-|', $action->{asShell}) {
            # A named variable keeps the caller's $_ untouched
            while (my $line = <$pipe>) {
                chomp $line;
                $logger->warn("Return value from the shell action is not a integer") unless $line =~ /^\d+$/;
                push @values, $line;
            }
            # For a pipe, close fails on a system error ($! set) or when
            # the command exits with a non-zero status ($? set).
            unless (close $pipe) {
                $logger->error(
                    $!
                    ? "Can't close the shell action $action->{asShell}: $!"
                    : "Shell action $action->{asShell} exited with status " . ($? >> 8)
                );
            }
        }
        else {
            $logger->error("Can't run the shell action $action->{asShell}: $!");
        }
        $finnished->{__var__}->{$action->{def}} = \@values;

        # Now expand the queue
        expand_forks($action->{def});
    }
}
# Runs the asPerl actions of the finnished job $runid whose variable
# ($action->{def}) has no value yet. The list returned by paction_list()
# becomes the value of the variable in $finnished->{__var__}, and the
# queue is then expanded with expand_forks().
sub find_and_run_asPerl {
    my ($runid) = @_;

    foreach my $action (@{ $finnished->{$runid}->{action} }) {
        next unless $action->{asPerl};

        my $name = $action->{def};
        next if defined $finnished->{__var__}->{$name};

        $logger->info("Running perl action $action->{asPerl}");

        # The variable is an empty list while the action is being evaluated
        $finnished->{__var__}->{$name} = [];
        $finnished->{__var__}{$name} = paction_list($action->{asPerl});

        # Now expand the queue
        expand_forks($name);
    }
}
# Evaluates the perl action $act (a string of Perl code, run in package
# main without strict) and returns an array reference of strings.
#
# The result is taken from, in this order:
#   - the ARRAY reference returned by the action, used as is;
#   - what the action printed to the selected output handle, split in lines;
#   - the string returned by the action, split in lines.
# Lines without any non-blank character are dropped in the last two cases.
# An action that returns undef and prints nothing yields an empty list.
#
# Dies with the error of the action when the evaluation fails. The handle
# that was selected before the call is restored in every case.
#
# NOTE(review): this is a string eval; the action must come from a trusted
# makefile.
sub paction_list {
    my $act   = shift;
    my $var   = "";
    my $final = [];

    # Capture everything the action prints in $var
    open(my $out, '>', \$var) or die "Can't capture the output of the perl action: $!";
    my $old = select $out;
    my $res = eval( "package main; no strict; " . $act );
    my $err = $@;

    # Restore the selected handle before a possible die, otherwise every
    # later print of the program would end up in the capture buffer.
    select $old;
    close $out;
    die $err if $err;

    if (ref($res) eq "ARRAY") {
        $final = $res;
    }
    elsif ($var =~ /\S/) {
        push @$final, grep { /\S/ } split("\n", $var);
    }
    elsif (defined $res) {
        push @$final, grep { /\S/ } split("\n", $res);
    }
    return $final;
}
# Expands the queue for the variable $var, whose values are read from
# $finnished->{__var__}{$var}.
#
# First loop, for every queued job:
#   - fork: a job listing "$var" in its rule vars is removed from the queue
#     and replaced by one clone per value; the value is appended to the
#     clone's id, substituted for "$var" in its shell/perl actions, and
#     appended to the id of each of its dependencies that also carry "$var";
#   - join: a dependency carrying "$var" is replaced by one dependency per
#     value (id . value).
# Second loop: "@var" inside shell, perl, asShell and asPerl action strings
# is replaced by the list of values.
#
# NOTE(review): $var is interpolated into the patterns without \Q..\E;
# presumably variable names never contain regex metacharacters - confirm.
# NOTE(review): elements are deleted from and pushed onto @{$queue} and
# @{$job->{depend_on}} while those arrays are being iterated; the statement
# order below is significant.
sub expand_forks {
    my ($var) = @_;
    my $values = $finnished->{__var__}->{$var};

    # For all queue items that has a $var, expand
    my $index = -1;    # position of the current job inside @{$queue}
    for my $job (@{$queue}) {
        $index++;
        next unless $job;    # holes left by an earlier "delete" are skipped
        if($job->{rule}{vars} && (grep { $_ eq "\$$var" } @{$job->{rule}{vars}} )) {
            $logger->info("Found a fork on $job->{rule}->{id}. Expanding...");

            # Expand, expand, expand
            # The variable is consumed: drop it from the job's list of vars
            $job->{rule}{vars} = [ grep { $_ ne "\$$var" } @{$job->{rule}{vars} }];
            delete $job->{rule}{vars} unless scalar @{$job->{rule}{vars}};
            # The template job leaves the queue; its clones are pushed below
            delete $queue->[$index];
            my $count = 0;
            my @added_jobs = ();
            # This inner $index is a value of the variable, not a position
            for my $index (@{$values}) {
                my $newjob = clone($job);
                $count++;

                # Update the id
                $newjob->{rule}{id} .= $index;

                # Update the line to execute
                for my $act (@{$newjob->{action}}) {
                    if($act->{shell}){
                        $act->{shell} =~ s/\$$var\b/$index/g;
                    }
                    elsif($act->{perl}){
                        $act->{perl} =~ s/\$$var\b/$index/g;
                    }
                }

                # Expand pipelines
                for my $dep (@{$newjob->{depend_on}}) {
                    if ($dep->{vars} && (grep { $_ eq "\$$var"} @{$dep->{vars}} )) {
                        # Expand the dependency: it now points to the
                        # dependency expanded with the same value
                        $dep->{vars} = [ grep { $_ ne "\$$var" } @{$dep->{vars}} ];
                        delete $dep->{vars} unless scalar @{$dep->{vars}};
                        $dep->{id} .= $index;
                    }
                }
                push @{$queue}, $newjob;
                push @added_jobs, $newjob->{rule}{id};
            }
            $logger->info("Expanded.. Created new $count jobs: @added_jobs");
        }

        # Find joiners
        my $pos = 0;    # position of the current dependency in depend_on
        for my $dep (@{$job->{depend_on}}) {
            if ($dep->{vars} && (grep { $_ eq "\$$var" } @{$dep->{vars}} )) {
                $dep->{vars} = [ grep { $_ ne "\$$var" } @{$dep->{vars}}];

                # Expand the dependencies: the original entry is removed and
                # one entry per value is appended
                delete $job->{depend_on}->[$pos];
                for my $index (@{$values}) {
                    # Keep the "vars" key only while variables remain
                    my @vars = (scalar @{$dep->{vars}})?(vars => $dep->{vars}):();
                    push @{$job->{depend_on}}, { @vars, id => $dep->{id} . $index };
                }
            }
            $pos++;
        }
    }

    # Now find constructors like @var
    for my $job (@{$queue}) {
        next unless $job;

        # Search on actions
        for my $action (@{$job->{action}}) {
            if($action->{shell} && $action->{shell} =~ /\@$var\b/) {
                # Shell: the values separated (and followed) by one space
                my $string = '';
                map { $string .= "$_ " } @{$values};
                $action->{shell} =~ s/\@$var\b/$string/g;
                $logger->info("The job $job->{rule}->{id} has been action expanded with $string");
            } elsif($action->{perl} && $action->{perl} =~ /\@$var\b/) {
                # Perl: a parenthesised list of q{...} strings
                my $string = join(",", map { "q{$_}" } @{$values});
                $action->{perl} =~ s/\@$var\b/($string)/g;
                $logger->info("The job $job->{rule}->{id} has been action expanded with ($string)");
            }
        }

        # Search on asShell
        for my $action (@{$job->{action}}) {
            if($action->{asShell} && $action->{asShell} =~ /\@$var\b/) {
                my $string = '';
                map { $string .= "$_ " } @{$values};
                $action->{asShell} =~ s/\@$var\b/$string/g;
                $logger->info("The job $job->{rule}->{id} has been shell expanded with $string");
            }
        }

        # Search on asPerl
        for my $action (@{$job->{action}}) {
            if($action->{asPerl} && $action->{asPerl} =~ /\@$var\b/) {
                # asPerl: the values wrapped in a qw/.../ list
                my $string = 'qw/';
                map { $string .= "$_ " } @{$values};
                $string .= "/";
                $action->{asPerl} =~ s/\@$var\b/$string/g;
                $logger->info("The job $job->{rule}->{id} has been Perl expanded with $string");
            }
        }
    }
}
# Writes "$filename.html": an HTML table with one row per finnished job
# (id, start time, end time, elapsed time), in launch order. Entries of the
# finnished list without a rule (such as __var__) are skipped.
#
# Dies when the file cannot be created; a failure while closing it is logged.
sub report {
    $logger->info("Creating HTML report");

    my $report_file = "$filename.html";
    open my $report, '>', $report_file or die "Can't create $report_file: $!";

    print {$report} "<table>\n";
    print {$report} "<tr><td>ID</td><td>Start Time</td><td>End Time</td><td>Elapsed</td></tr>\n";

    for my $job (sort sortcallback keys %{$finnished}) {
        my $info = $finnished->{$job};
        next unless $info->{rule};

        my $id    = $info->{rule}{id};
        my $start = (localtime($info->{starttime}[0]))->iso;
        my $stop  = (localtime($info->{stoptime}[0]))->iso;
        # The time measured by the scheduler is preferred when available
        my $interval = $info->{realtime} || $info->{elapsed};
        print {$report} "<tr><td>$id</td><td>$start</td><td>$stop</td><td>$interval</td></tr>\n";
    }

    print {$report} "</table>\n";

    # Buffered write errors only show up when the handle is closed
    close $report or $logger->error("Can't write $report_file: $!");
}

# Sort comparator for the keys of the finnished list: entries without a
# launch order come first, the others are sorted by ascending order.
# "defined" is used because 0 is a valid order (the first launched job).
sub sortcallback {
    my $foo = $finnished->{$a};
    my $bar = $finnished->{$b};

    my $foo_ordered = defined $foo->{order};
    my $bar_ordered = defined $bar->{order};

    return 0  if !$foo_ordered && !$bar_ordered;
    return -1 unless $foo_ordered;
    return 1  unless $bar_ordered;
    return $foo->{order} <=> $bar->{order};
}
# Launches every queued job whose dependencies are met and keeps the others
# in the queue. When nothing is running and there is nothing left to do
# (interrupted, or empty queue) the pipeline is terminated via at_exit().
sub dispatch {
    # If we aren't running anything and ($interrupted || $queue empty), exit
    if ((scalar keys %{$running}) == 0 && ($interrupted || (scalar @{$queue} == 0))) {
        $logger->info("Terminating the pipeline");
        at_exit();
    }

    # Nothing is dispatched once we have been interrupted
    return if $interrupted;

    my @waiting;    # jobs that can't be dispatched yet
    foreach my $job (@{$queue}) {
        next unless $job;

        # Dependencies not finnished yet: keep the job for a later pass
        unless (can_run_job($job->{rule}->{id}, $job->{depend_on})) {
            push @waiting, $job;
            next;
        }

        $logger->info(Dumper($job)) unless $job->{rule}{id};
        $logger->info("The job \"" . $job->{rule}->{id} . "\" is ready to run. Launching");
        launch($job);
        $job->{starttime} = [gettimeofday];
    }

    # The new queue holds only the jobs that are still waiting
    $queue = \@waiting;
}
# Returns 1 when an entry of the finnished list has a rule whose id equals
# $jobid, 0 otherwise. Entries without a rule are ignored.
sub is_finnished {
    my ($jobid) = @_;

    foreach my $key (keys %{$finnished}) {
        my $rule = $finnished->{$key}{rule} or next;
        return 1 if $rule->{id} eq $jobid;
    }

    return 0;
}
# End of the run: writes the GraphViz files, the HTML report and the
# journal, then terminates the program with exit status 0.
sub at_exit {
    graphviz();
    report();
    write_journal();

    exit 0;
}
# Saves the scheduler state to "$filename.journal" as a Data::Dumper dump
# holding the MD5 of the makefile, the launch counter and a copy of the
# finnished list. Fatal and interrupted jobs are left out of the copy, as
# is the __var__ entry, so a recovered run executes them again.
#
# A journal that cannot be written is logged as an error; the run goes on.
sub write_journal {
    my $journal = {};
    $journal->{md5} = calc_makefile_md5();

    # Pass all interrupted and failed processes back to queue
    my $acabados = clone($finnished);
    for my $job (keys %{$acabados}) {
        next unless $acabados->{$job}{rule};
        if ($acabados->{$job}{fatal} || $acabados->{$job}{interrupted}) {
            delete $acabados->{$job};
        }
    }
    delete $acabados->{__var__};

    $journal->{finnished} = $acabados;
    $journal->{counter}   = $counter;

    my $journal_file = "$filename.journal";
    my $fh;
    unless (open $fh, '>', $journal_file) {
        $logger->error("Can't create $journal_file: $!");
        return;
    }
    print {$fh} Dumper($journal);

    # Buffered write errors only show up when the handle is closed
    close $fh or $logger->error("Can't write $journal_file: $!");
    return;
}
# Returns the base64 MD5 digest of the current makefile ($filename).
# Dies when the makefile cannot be opened.
sub calc_makefile_md5 {
    open my $fh, '<', $filename or die "Can't open $filename: $!";

    # Digest the raw bytes, with no newline translation on any platform
    binmode $fh;

    my $ctx = Digest::MD5->new;
    $ctx->addfile($fh);
    close $fh;

    return $ctx->b64digest;
}
# Tells whether a job can run now.
#
# $id   - id of the job (not used by the check itself)
# $deps - array reference of dependencies, each a hash with an "id" key
#
# Returns 0 when the scheduler refuses new jobs or when a dependency id is
# missing from the finnished list, 1 otherwise. Empty dependencies and
# dependencies without an id are ignored.
sub can_run_job {
    my ($id, $deps) = @_;

    return 0 unless $scheduler->can_run();

    foreach my $dep (@{$deps}) {
        next unless $dep && $dep->{id};
        return 0 if !defined $finnished->{ $dep->{id} };
    }

    return 1;
}
# Hands $job to the scheduler, stamps it with its launch order and records
# it in the running list under its rule id.
sub launch {
    my ($job) = @_;

    # Launch the process
    $scheduler->launch($job, $debug);

    # The counter value before the increment is the order of this job
    $job->{order} = $counter;
    $counter++;

    # Save in the running list
    my $rule_id = $job->{rule}->{id};
    $running->{$rule_id} = $job;

    $logger->info("Launched \"" . $rule_id . "\" (" . $scheduler->get_id($job) . ")");
}
# Builds a GraphViz graph of the finnished jobs and writes it to
# "$filename.ps" (PostScript) and "$filename.dot" (text).
#
# Each job is a box labelled with its id and its time (the scheduler's
# realtime when set, the elapsed time otherwise); fatal jobs are red and
# interrupted jobs are yellow. Each dependency is an edge towards the job.
#
# A file that cannot be written is logged as an error instead of being
# silently skipped, so the report and the journal are still produced.
sub graphviz {
    my $time_for = {};    # Holds the walltime for the job id
    my $g = GraphViz->new(rankdir => 1);

    $logger->info("Creating GraphViz nodes");

    # Create all nodes
    for my $job (keys %{$finnished}) {
        next unless $finnished->{$job}{rule};
        my $id = $finnished->{$job}{rule}{id};
        $time_for->{$id} = $finnished->{$job}{realtime} || $finnished->{$job}{elapsed};

        # "interrupted" wins over "fatal" when both are set
        my $color = 'black';
        $color = 'red'    if $finnished->{$job}{fatal};
        $color = 'yellow' if $finnished->{$job}{interrupted};

        $g->add_node($id, label => "$id\n$time_for->{$id}", shape => 'box', color => $color);
    }

    $logger->info("Creating GraphViz edges");

    # Create edges
    for my $job (keys %{$finnished}) {
        next unless $finnished->{$job}{rule};
        for my $dep (@{$finnished->{$job}{depend_on}}) {
            next unless $dep;
            $g->add_edge($dep->{id}, $finnished->{$job}{rule}{id});
        }
    }

    # One output file per format, each rendered only once its file is open
    my %render_for = (
        ps  => sub { $g->as_ps },
        dot => sub { $g->as_text },
    );
    my %written;
    for my $ext (qw(ps dot)) {
        my $path = "$filename.$ext";
        my $fh;
        unless (open $fh, '>', $path) {
            $logger->error("Can't create $path: $!");
            next;
        }
        print {$fh} $render_for{$ext}->();
        if (close $fh) {
            $written{$ext} = 1;
        }
        else {
            $logger->error("Can't write $path: $!");
        }
    }

    $logger->info("GraphViz file created on $filename.ps") if $written{ps};
}
# Interrupts the run: flags every running job as interrupted and asks the
# scheduler to interrupt it.
#
# $forced - true when called by the program itself (a job failed). As the
#           SIGINT handler the sub receives the signal name "INT", which
#           means "not forced"; a missing argument means the same.
#
# When not forced, the user has to confirm by typing QUIT; any other answer
# (or end of input) cancels the interruption. A non-forced call while an
# interruption is already in progress only logs a warning.
sub process_interrupt {
    my $forced = shift;
    $forced = 0 if !defined $forced || $forced eq "INT";    # Hack O:-)

    if ($interrupted && !$forced) {
        $logger->warn("Interrupt already called, please wait while cleaning");
        return;
    }

    if (!$forced) {
        $logger->warn("Interrupt pressed, enter QUIT to quit, other thing to continue");
        my $linha = <STDIN>;
        # End of input reads as undef; treat it as an empty answer
        $linha = '' unless defined $linha;
        chomp($linha);
        if ($linha ne 'QUIT') {
            $logger->info("Interrupt canceled... Keeping the loop");
            return;
        }
    }

    $interrupted = 1;
    $logger->info("Interrupt pressed, cleaning all the running processes");
    for my $runid (keys %{$running}) {
        $logger->info("Terminating job " . $scheduler->get_id($running->{$runid}));
        $running->{$runid}{interrupted} = 1;
        $scheduler->interrupt($running->{$runid});
    }
    return;
}
1; # End of Makefile::Parallel