| XML-Filter-Essex documentation | Contained in the XML-Filter-Essex distribution. |
XML::Handler::Essex::Threaded - Threading support for perls >= 5.8.0
Loaded only if XML::Handler::Essex detects that threads.pm is loaded
Copyright 2002, R. Barrie Slaymaker, Jr., All Rights Reserved
You may use this module under the terms of the BSD, Artistic, or GPL licenses, any version.
Barrie Slaymaker <barries@slaysys.com>
Copyright 2003, R. Barrie Slaymaker, Jr., All Rights Reserved
You may use this module under the terms of the BSD, Artistic, or GPL licenses, any version.
Barrie Slaymaker <barries@slaysys.com>
| XML-Filter-Essex documentation | Contained in the XML-Filter-Essex distribution. |
package XML::Handler::Essex::Threaded;

use strict;

our $VERSION = 0.000_1;
our @ISA     = qw( XML::Handler::Essex );

use threads::shared;
use Scalar::Util ();    # reftype(), called fully qualified in _r_share
use Carp ();            # confess(), called fully qualified in _r_share

# NOTE(review): the barewords debugging, BOD, EOD and SEPPUKU used throughout
# are not defined in this file; presumably they are supplied by the
# XML::Handler::Essex side before this module is compiled -- confirm.

warn "Essex: threads enabled\n";

# Destructor.  Does nothing unless $self->{IsChild} is true.  Otherwise it
# replaces the Events queue with a single SEPPUKU event, signals it, and then
# blocks until the Results queue is non-empty.
#
# NOTE(review): nothing in this file ever assigns IsChild (see the no-op
# statement in threaded_execute), and the debug text says "DESTROYing
# parent", so the guard may be inverted or dead -- confirm before relying on
# this teardown path.
sub DESTROY {
    my $self = shift;

    return unless $self->{IsChild};

    warn "Essex $self: DESTROYing parent\n" if debugging;

    {
        # Hold the Events lock only while queueing the shutdown event.
        lock @{$self->{Events}};
        my @event : shared = ( SEPPUKU );
        @{$self->{Events}} = ( \@event );
        threads::shared::cond_signal( @{$self->{Events}} );
    }

    warn "Essex $self: waiting for child to exit\n" if debugging;

    lock @{$self->{Results}};
    until ( @{$self->{Results}} ) {
        warn "Essex $self: cond_wait on Results\n" if debugging;
        threads::shared::cond_wait( @{$self->{Results}} );
    }

    warn "Essex $self: child exited\n" if debugging;
}

# This is designed to have the parent block until the child finishes
# handling the event.  This is because the child (and especially filters
# downstream of the child) are likely to be non-threadsafe.  So we only
# allow the child to run when there's an event to process, and we block
# until it is finished.
#
# Creates the shared Events and Results queues, spawns the thread running
# threaded_execute, and waits for its first ("thread started") result.
sub _start_child {
    my $self = shift;

    warn "Essex $self: starting child thread\n" if debugging;

    # (An older note here referred to initialising with "= []" to give
    # threads::shared something to chew on; the code below no longer does
    # that.)
    # We pass the type of the event or result in other variables because
    # they don't need to be shared and sharing them can break bless()ing.
    my @events : shared;
    my @results : shared;
    $self->{Events}  = \@events;
    $self->{Results} = \@results;

    # The BOD sync loop in threaded_execute will send us a thread started
    # "result".  Ignore it, other than to sync on it.
    # The lock is taken before the thread is created and is only released
    # inside cond_wait.
    lock @{$self->{Results}};
    $self->{Thread} =
        threads->create( $self->can( "threaded_execute" ), $self );

    warn "Essex $self: waiting for child thread to start\n" if debugging;
    until ( @{$self->{Results}} ) {
        warn "Essex $self: cond_wait on Results\n" if debugging;
        threads::shared::cond_wait( @{$self->{Results}} );
    }
    @{$self->{Results}} = ();

    warn "Essex $self: child thread started\n" if debugging;
}

## This next sub works around a limitation in 5.8.0's share() that
## seems to prevent me from using it; I get core dumps when I try.
## TODO: try again with 5.8.0.
#
# Replaces $_[0] IN PLACE (through the @_ alias) with a threads::shared copy,
# recursing into hash values and array elements.  Handles plain values and
# HASH, ARRAY and SCALAR references; confesses on any other reference type.
# Returns nothing when the value is already recognised as shared, otherwise
# the new value of $_[0].
sub _r_share {
    my $type = Scalar::Util::reftype( $_[0] );

    # Already shared: leave it alone.
    return if ( tied $_[0] or "" ) eq "threads::shared::tie";

    if ( !$type ) {
        my $copy : shared = $_[0];
        $_[0] = $copy;
    }
    elsif ( $type eq "HASH" ) {
        _r_share( $_ ) for values %{$_[0]};
        my %copy : shared = %{$_[0]};
        $_[0] = \%copy;
    }
    elsif ( $type eq "ARRAY" ) {
        # Share the elements in place first and then copy the array, exactly
        # as the HASH branch does.  Building the copy from the recursive
        # call's return value dropped every element that took the
        # "already shared" early return above.
        _r_share( $_ ) for @{$_[0]};
        my @copy : shared = @{$_[0]};
        $_[0] = \@copy;
    }
    elsif ( $type eq "SCALAR" ) {
        my $copy : shared = ${$_[0]};
        $_[0] = \$copy;
    }
    else {
        Carp::confess( "Essex: can't share $type" );
    }

    return $_[0];
}

# Queues every pending event plus the event given in @_ for the child thread
# (starting the thread first if needed), signals the child, then blocks until
# the Results queue is non-empty.  Results are consumed as (type, value)
# pairs; only the last pair counts.  Dies with the value if its type is
# "exception", otherwise returns the value.
sub _send_event_to_child {
    my $self = shift;

    $self->_start_child unless $self->{Thread};

    {
        lock @{$self->{Events}};
        for my $queued ( @{$self->{PendingEvents}}, \@_ ) {
            # Element 1 is made shareable in place before the event is copied.
            _r_share( $queued->[1] );
            my @event : shared = @$queued;
            warn "Essex $self: sending $event[0] => CHILD \n" if debugging;
            push @{$self->{Events}}, \@event;
        }
        @{$self->{PendingEvents}} = ();
        warn "Essex $self: signalling Events\n" if debugging;
        threads::shared::cond_signal( @{$self->{Events}} );
    }    # unlock

    # Receive result from child.
    warn "Essex $self: waiting for result from child\n" if debugging;
    lock @{$self->{Results}};
    until ( @{$self->{Results}} ) {
        warn "Essex $self: cond_wait on Results\n" if debugging;
        threads::shared::cond_wait( @{$self->{Results}} );
    }

    my ( $result_type, $result );
    do {
        ( $result_type, $result ) = (
            shift( @{$self->{Results}} ),
            shift( @{$self->{Results}} ),
        );
        warn "Essex $self: ",
            @{$self->{Results}} ? "ignoring" : "got",
            " $result_type result ",
            defined $result ? "'$result'" : "undef",
            " from child\n"
            if debugging;
    } while @{$self->{Results}};

    die $result if $result_type eq "exception";
    return $result;
}

# Pushes the pending (type, value) result pair onto the shared Results queue,
# clears both pending slots and signals the waiter.  Does nothing when no
# result type is pending.
sub _send_result_to_parent {
    my $self = shift;

    return unless defined $self->{PendingResultType};

    warn "Essex $self: sending PARENT <= $self->{PendingResultType} result ",
        defined $self->{PendingResult} ? "'$self->{PendingResult}'" : "undef",
        "\n"
        if debugging;

    lock @{$self->{Results}};
    _r_share( $self->{PendingResult} );
    push @{$self->{Results}},
        @{$self}{qw( PendingResultType PendingResult)};
    @$self{ "PendingResultType", "PendingResult" } = ();

    warn "Essex $self: signalling Results\n" if debugging;
    threads::shared::cond_signal( @{$self->{Results}} );
}

# NOTE: returns \@event, whereas _send_event_to_child takes @event.
# This is to speed the queue fudging that threaded_execute does on
# start_document.
#
# Dies with EOD . "\n" if the pending result type is "end_document".
# Otherwise sends any pending result to the parent, blocks until the Events
# queue is non-empty, and delegates to the superclass implementation.
sub _recv_event_from_parent {
    my $self = shift;

    die EOD . "\n" if $self->{PendingResultType} eq "end_document";

    warn "Essex $self: getting event from parent\n" if debugging;

    lock @{$self->{Events}};

    # only send a result if there's one to send and there are no
    # events waiting.
    # TODO when and if we allow multiple events to stack up:
    # If there are events waiting, then we need to
    # wait until the last event to send the result.
    $self->_send_result_to_parent;
    # if defined $self->{PendingResultType};# && ! @{$self->{Events}};

    # We don't block if there're already events; this is used
    # at start_document because the execute routine scans the input
    # until it sees the start_document, then queues up a new
    # set_document_locator and start_document.
    until ( @{$self->{Events}} ) {
        warn "Essex $self: cond_wait on Events\n" if debugging;
        threads::shared::cond_wait( @{$self->{Events}} );
    }

    ## TODO: Lock Events?  The parent should not be running now,
    ## it should be waiting for results.
    return $self->SUPER::_recv_event_from_parent;
}

# Result handling:
#
# We track the event just received so threaded_execute() can tell what
# to do when the main routine exits or throws an exception.  If it was
# (this sentence is left unfinished in the original source)
#
# We don't know if execute() actually does anything, or if it will do
# anything if entered a second time, so we need to return the
# result here.
#
# BOD notes.  We wait for the BOD here and not in _recv_event_from_parent
# because we can't be sure that main() will ever call _recv_event_from_parent.
# It might throw an exception or return a result instead.  This has the
# side effect of making it seem like (to the programmer) the child thread
# is respawned each start_document() event: main() will not be entered until
# after the BOD is sent, and that's sent in start_document().  Whether or
# not that's good or bad depends on what's happening in main(), but it's
# a necessary implementation detail.  It also reserves the right to actually
# implement threading that way one day, either as an option for the caller
# or due to some shift in perl's threading implementation.
#
# Thread entry point (passed to threads->create by _start_child).  Loops:
# wait for a BOD, run the superclass execute(), record its result or
# exception as the pending result.  Leaves the loop on SEPPUKU and then sends
# a final SEPPUKU result to the parent.
sub threaded_execute {
    my $self = shift;

    # NOTE(review): this statement only evaluates the hash element; it does
    # not assign it.  It may have been meant as "$self->{IsChild} = 1", but
    # that would also change what DESTROY does, so it is left as found --
    # confirm the intent.
    $self->{IsChild};

    threads->self->detach;

    # This thread only exits if it gets an undefined eventtype.  This is
    # to avoid the extreme cost of starting a thread on every document if
    # the caller is handling a series of documents.
    my $pending_end_document_result = "Essex: default end_document result";
    $self->{PendingResultType} = $self->{PendingResult} = "thread started";

    EXECUTE:
    while (1) {
        while (1) {
            # Wait for BOD.
            if ( $self->{PendingResultType} eq "end_document" ) {
                $self->{PendingResultType} = "end_document again";
                $self->{PendingResult}     = $pending_end_document_result;
                $pending_end_document_result =
                    "Essex: default end_document result";
            }

            my $event = eval { $self->_recv_event_from_parent };

            # A defined event is simply discarded and the wait continues;
            # control pseudo-events arrive as exceptions in $@.
            unless ( defined $event ) {
                if ( $@ eq EOD . "\n" ) {
                    lock @{$self->{Events}};
                    shift @{$self->{Events}};
                }
                elsif ( $@ eq BOD . "\n" ) {
                    lock @{$self->{Events}};
                    shift @{$self->{Events}};
                    last;
                }
                elsif ( $@ eq SEPPUKU . "\n" ) {
                    last EXECUTE;
                }
                else {
                    die $@;
                }
            }
        }

        $pending_end_document_result = "Essex: default end_document result";

        eval {
            undef $pending_end_document_result;
            warn "Essex $self: running execute()\n" if debugging;
            my $r = $self->SUPER::execute( @_ );
            warn "Essex $self: execute() exited with ",
                defined $r ? "'$r'" : "undef",
                "\n"
                if debugging;
            $pending_end_document_result = $r;
            1;
        } or do {
            warn "Essex $self: execute() exited with exception $@\n"
                if debugging;
            if ( $@ eq EOD . "\n" ) {
                lock @{$self->{Events}};
                shift @{$self->{Events}};
            }
            elsif ( $@ eq SEPPUKU . "\n" ) {
                # Leaves the EXECUTE loop (a do block is not a loop).
                last;
            }
            else {
                $self->{PendingResultType} = "exception";
                $self->{PendingResult}     = $@;
            }
        };
    }

    $self->{PendingResultType} = $self->{PendingResult} = SEPPUKU;
    $self->_send_result_to_parent;
}

# Hopefully, this handles inline set_document_locator events relatively
# gracefully, by queueing them up until the next event arrives.  This is
# necessary because set_document_locator events can arrive *before* the
# start_document, and we need to wait for the next event to see whether
# to insert the BOD before the set_document_locator.  This is all so that
# the initial set_document_locator event(s) will arrive before the
# start_document event in the main() routine, given that we need to
# send the BOD pseudo event in case the main() routine is still running.
sub set_document_locator {
    my $self = shift;
    push @{$self->{PendingEvents}}, [ "set_document_locator", @_ ];
    return "Essex: document locator queued";
}

# Puts a BOD pseudo-event at the front of the pending queue, then hands the
# start_document event to the superclass and returns its result.
sub start_document {
    my $self = shift;
    unshift @{$self->{PendingEvents}}, [ BOD ];
    return $self->SUPER::start_document( @_ );
}

# Sends end_document to the child and waits for its result, then queues an
# EOD pseudo-event without waiting for a reply.  Returns the end_document
# result.
sub end_document {
    my $self = shift;

    ## Must send EOD after the end_document so that we get the end_document
    ## result back first otherwise it would be lost because
    ## _recv_event_from_parent does not send results back if there are any
    ## other events in the queue.  If this were not so, we could add a hack
    ## here to queue up both end_document and EOD at once.
    my $r = _send_event_to_child( $self, "end_document", @_ );

    my @event : shared = ( EOD );
    lock @{$self->{Events}};
    push @{$self->{Events}}, \@event;
    threads::shared::cond_signal( @{$self->{Events}} );

    return $r;
}

1;