/usr/local/CPAN/dvdrip/Video/DVDRip/Cluster/Pipe.pm
# $Id: Pipe.pm 2187 2006-08-16 19:34:38Z joern $
#-----------------------------------------------------------------------
# Copyright (C) 2001-2006 Jörn Reder <joern AT zyn.de>.
# All Rights Reserved. See file COPYRIGHT for details.
#
# This program is part of Video::DVDRip, which is free software; you can
# redistribute it and/or modify it under the same terms as Perl itself.
#-----------------------------------------------------------------------
package Video::DVDRip::Cluster::Pipe;
use Locale::TextDomain qw (video.dvdrip);
use base Video::DVDRip::Base;
use Event;
use constant NICE => -1;
use FileHandle;
use Carp;
use strict;
my $LIFO_SIZE = 40;
sub command { shift->{command} }
sub timeout { shift->{timeout} }
sub cb_finished { shift->{cb_finished} }
sub cb_line_read { shift->{cb_line_read} }
sub no_log { shift->{no_log} }
sub set_no_log { shift->{no_log} = $_[1] }
sub lifo { shift->{lifo} }
sub lifo_idx { shift->{lifo_idx} }
sub fh { shift->{fh} }
sub pid { shift->{pid} }
sub line_buffer { shift->{line_buffer} }
sub event_waiter { shift->{event_waiter} }
sub closed { shift->{closed} }
sub set_fh { shift->{fh} = $_[1] }
sub set_pid { shift->{pid} = $_[1] }
sub set_event_waiter { shift->{event_waiter} = $_[1] }
sub set_line_buffer { shift->{line_buffer} = $_[1] }
sub set_closed { shift->{closed} = $_[1] }
sub new {
my $class = shift;
my %par = @_;
my ( $command, $cb_line_read, $cb_finished, $timeout, $no_log )
= @par{ 'command', 'cb_line_read', 'cb_finished', 'timeout',
'no_log' };
$timeout ||= 30;
my $self = {
timeout => $timeout,
command => $command,
cb_line_read => $cb_line_read,
cb_finished => $cb_finished,
no_log => $no_log,
event_waiter => undef,
output_lifo => [ (undef) x $LIFO_SIZE ],
lifo_idx => -1,
};
return bless $self, $class;
}
sub open {
my $self = shift;
my $timeout = $self->timeout;
my $command = $self->command;
my $fh = FileHandle->new;
my $pid;
# we use fork & exec, because we want to have
# STDERR on STDOUT in the child.
$pid = open( $fh, "-|" );
croak "can't fork child process" if not defined $pid;
if ( not $pid ) {
# we are the child. Copy STDERR to STDOUT
close STDERR;
open( STDERR, ">&STDOUT" )
or croak "can't dup STDOUT to STDERR";
my $command = $self->command;
$command = "execflow $command" if $command !~ /execflow/;
exec($command)
or croak "can't exec program: $!";
}
# we are the parent and go further, holding the
# pid of our child in $pid
$self->set_fh($fh);
$self->set_pid($pid);
my %timeout_options;
%timeout_options = (
timeout => $timeout,
timeout_cb => sub { $self->timeout_expired },
)
if $timeout;
$self->set_event_waiter(
Event->io(
fd => $fh,
poll => 'r',
desc => "command execution",
nice => NICE,
cb => sub { $self->input( $_[1] ) },
%timeout_options,
)
);
$self->log(
3,
__x("execute command: {command} (timeout={timeout})",
command => $command,
timeout => $timeout
)
)
unless $self->no_log;
return $self;
}
sub add_lifo_line {
my $self = shift;
$self->{lifo}
->[ $self->{lifo_idx} = ( $self->{lifo_idx} + 1 ) % $LIFO_SIZE ]
= $_[0];
1;
}
sub output_tail {
my $self = shift;
my $tail = '';
my $lifo_idx = $self->{lifo_idx};
my $i = $lifo_idx;
while () {
last if not defined $self->{lifo}->[$i];
$tail .= $self->{lifo}->[ $i++ ];
$i = $i % $LIFO_SIZE;
last if $i == $lifo_idx;
}
return $tail;
}
sub timeout_expired {
my $self = shift;
$self->log( __ "Command cancelled due to timeout" )
unless $self->no_log;
kill 15, $self->pid;
$self->cancel;
1;
}
sub input {
my $self = shift;
my ($abort) = @_;
my $fh = $self->fh;
my $line_buffer;
# eof or abort?
if ( $abort or !sysread( $fh, $line_buffer, 4096 ) ) {
$self->close;
return;
}
# read next line
my ( $rc, $got_empty_line );
# get job's PID
my ($pid) = ( $line_buffer =~ /DVDRIP_JOB_PID=(\d+)/ );
if ( defined $pid ) {
$self->set_pid($pid);
$self->log( __x( "Job has PID {pid}", pid => $pid ) )
unless $self->no_log;
$line_buffer =~ s/DVDRIP_JOB_PID=(\d+)//;
$rc =~ s/DVDRIP_JOB_PID=(\d+)//;
}
# append line to lifo
$self->add_lifo_line($line_buffer);
# call the line_read callback, if we have one
my $cb_line_read = $self->cb_line_read;
&$cb_line_read($line_buffer) if $cb_line_read;
1;
}
sub close {
my $self = shift;
return if $self->closed;
my $fh = $self->fh;
$self->event_waiter->cancel if $self->event_waiter;
$self->set_event_waiter(undef);
close $fh;
waitpid $self->pid, 0;
$self->set_closed(1);
my $cb_finished = $self->cb_finished;
&$cb_finished() if $cb_finished;
$self->log( 5, "command finished: " . $self->command )
unless $self->no_log;
1;
}
sub cancel {
my $self = shift;
my $pid = $self->pid;
if ($pid) {
$self->log(
__x("Aborting command. Sending signal 1 to PID {pid}...",
pid => $pid
)
)
unless $self->no_log;
kill 1, $pid;
}
$self->close;
1;
}
1;