HTTP::Async - process multiple HTTP requests in parallel without blocking.


HTTP-Async documentation Contained in the HTTP-Async distribution.

Index


Code Index:

NAME

Top

HTTP::Async - process multiple HTTP requests in parallel without blocking.

SYNOPSIS

Top

Create an object and add some requests to it:

    use HTTP::Async;
    my $async = HTTP::Async->new;

    # create some requests and add them to the queue.
    $async->add( HTTP::Request->new( GET => 'http://www.perl.org/'         ) );
    $async->add( HTTP::Request->new( GET => 'http://www.ecclestoad.co.uk/' ) );

and then EITHER process the responses as they come back:

    while ( my $response = $async->wait_for_next_response ) {
        # Do some processing with $response
    }

OR do something else if there is no response ready:

    while ( $async->not_empty ) {
        if ( my $response = $async->next_response ) {
            # deal with $response
        } else {
            # do something else
        {
    }

OR just use the async object to fetch stuff in the background and deal with the responses at the end.

    # Do some long code...
    for ( 1 .. 100 ) {
      some_function();
      $async->poke;            # lets it check for incoming data.
    }

    while ( my $response = $async->wait_for_next_response ) {
        # Do some processing with $response
    }    

DESCRIPTION

Top

Although using the conventional LWP::UserAgent is fast and easy it does have some drawbacks - the code execution blocks until the request has been completed and it is only possible to process one request at a time. HTTP::Async attempts to address these limitations.

It gives you a 'Async' object that you can add requests to, and then get the requests off as they finish. The actual sending and receiving of the requests is abstracted. As soon as you add a request it is transmitted, if there are too many requests in progress at the moment they are queued. There is no concept of starting or stopping - it runs continuously.

Whilst it is waiting to receive data it returns control to the code that called it meaning that you can carry out processing whilst fetching data from the network. All without forking or threading - it is actually done using select lists.

Default settings:

Top

There are a number of default settings that should be suitable for most uses. However in some circumstances you might wish to change these.

            slots: 20
          timeout: 180 (seconds)
 max_request_time: 300 (seconds)
    max_redirects: 7
    poll_interval: 0.05 (seconds)
       proxy_host: ''
       proxy_port: ''

METHODS

Top

new

    my $async = HTTP::Async->new( %args );

Creates a new HTTP::Async object and sets it up. Variations from the default can be set by passing them in as %args.

slots, timeout, max_request_time, poll_interval, max_redirects, proxy_host and proxy_port

    $old_value = $async->slots;
    $new_value = $async->slots( $new_value );

Get/setters for the $async objects config settings. Timeout is for inactivity and is in seconds.

Slots is the maximum number of parallel requests to make.

add

    my @ids      = $async->add(@requests);
    my $first_id = $async->add(@requests);

Adds requests to the queues. Each request is given an unique integer id (for this $async) that can be used to track the requests if needed. If called in list context an array of ids is returned, in scalar context the id of the first request added is returned.

add_with_opts

    my $id = $async->add_with_opts( $request, \%opts );

This method lets you add a single request to the queue with options that differ from the defaults. For example you might wish to set a longer timeout or to use a specific proxy. Returns the id of the request.

poke

    $async->poke;

At fairly frequent intervals some housekeeping needs to performed - such as reading recieved data and starting new requests. Calling poke lets the object do this and then return quickly. Usually you will not need to use this as most other methods do it for you.

You should use poke if your code is spending time elsewhere (ie not using the async object) to allow it to keep the data flowing over the network. If it is not used then the buffers may fill up and completed responses will not be replaced with new requests.

next_response

    my $response          = $async->next_response;
    my ( $response, $id ) = $async->next_response;

Returns the next response (as a HTTP::Response object) that is waiting, or returns undef if there is none. In list context it returns a (response, id) pair, or an empty list if none. Does not wait for a response so returns very quickly.

wait_for_next_response

    my $response          = $async->wait_for_next_response( 3.5 );
    my ( $response, $id ) = $async->wait_for_next_response( 3.5 );

As next_response but only returns if there is a next response or the time in seconds passed in has elapsed. If no time is given then it blocks. Whilst waiting it checks the queues every c<poll_interval> seconds. The times can be fractional seconds.

to_send_count, to_return_count, in_progress_count and total_count

    my $pending = $async->to_send_count;

Returns the number of items in the various stages of processing.

info

    print $async->info;

Prints a line describing what the current state is.

empty, not_empty

    while ( $async->not_empty ) { ...; }
    while (1) { ...; last if $async->empty; }

Returns true or false depending on whether there are request or responses still on the object.

DESTROY

The destroy method croaks if an object is destroyed but is not empty. This is to help with debugging.

SEE ALSO

Top

HTTP::Async::Polite - a polite form of this module. Slows the scraping down by domain so that the remote server is not overloaded.

GOTCHAS

Top

The responses may not come back in the same order as the requests were made.

THANKS

Top

Egor Egorov contributed patches for proxies, catching connections that die before headers sent and more.

Tomohiro Ikebe from livedoor.jp submitted patches (and a test) to properly handle 304 responses.

AUTHOR

Top

Edmund von der Burg <evdb@ecclestoad.co.uk>.

http://www.ecclestoad.co.uk/

LICENCE AND COPYRIGHT

Top

DISCLAIMER OF WARRANTY

Top

BECAUSE THIS SOFTWARE IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY FOR THE SOFTWARE, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES PROVIDE THE SOFTWARE "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE SOFTWARE IS WITH YOU. SHOULD THE SOFTWARE PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING, REPAIR, OR CORRECTION.

IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR REDISTRIBUTE THE SOFTWARE AS PERMITTED BY THE ABOVE LICENCE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE SOFTWARE (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A FAILURE OF THE SOFTWARE TO OPERATE WITH ANY OTHER SOFTWARE), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.


HTTP-Async documentation Contained in the HTTP-Async distribution.
use strict;
use warnings;

package HTTP::Async;

our $VERSION = '0.09';

use Carp;
use Data::Dumper;
use HTTP::Response;
use IO::Select;
use Net::HTTP::NB;
use Net::HTTP;
use URI;
use Time::HiRes qw( time sleep );

sub new {
    my $class = shift;
    my $self  = bless {

        opts => {
            slots            => 20,
            max_redirects    => 7,
            timeout          => 180,
            max_request_time => 300,
            poll_interval    => 0.05,
        },

        id_opts => {},

        to_send     => [],
        in_progress => {},
        to_return   => [],

        current_id   => 0,
        fileno_to_id => {},
    }, $class;

    $self->_init(@_);

    return $self;
}

sub _init {
    my $self = shift;
    my %args = @_;
    $self->_set_opt( $_ => $args{$_} ) for sort keys %args;
    return $self;
}

sub _next_id { return ++$_[0]->{current_id} }

my %GET_SET_KEYS =
  map { $_ => 1 }
  qw( slots poll_interval
  timeout max_request_time max_redirects
  proxy_host proxy_port );

sub _add_get_set_key {
    my $class = shift;
    my $key   = shift;
    $GET_SET_KEYS{$key} = 1;
}

sub _get_opt {
    my $self = shift;
    my $key  = shift;
    my $id   = shift;
    die "$key not valid for _get_opt" unless $GET_SET_KEYS{$key};

    # If there is an option set for this id then use that, otherwise fall back
    # to the defaults.
    return $self->{id_opts}{$id}{$key}
      if $id && defined $self->{id_opts}{$id}{$key};

    return $self->{opts}{$key};

}

sub _set_opt {
    my $self = shift;
    my $key  = shift;
    die "$key not valid for _set_opt" unless $GET_SET_KEYS{$key};
    $self->{opts}{$key} = shift if @_;
    return $self->{opts}{$key};
}

foreach my $key ( keys %GET_SET_KEYS ) {
    eval "
        sub $key {
                my \$self = shift;
                return scalar \@_
                    ? \$self->_set_opt( '$key', \@_ )
                    : \$self->_get_opt( '$key' );
        }
        ";
}

sub add {
    my $self    = shift;
    my @returns = ();

    foreach my $req (@_) {
        push @returns, $self->add_with_opts( $req, {} );
    }

    return wantarray ? @returns : $returns[0];
}

sub add_with_opts {
    my $self = shift;
    my $req  = shift;
    my $opts = shift;
    my $id   = $self->_next_id;

    push @{ $$self{to_send} }, [ $req, $id ];
    $self->{id_opts}{$id} = $opts;
    $self->poke;

    return $id;
}

sub poke {
    my $self = shift;

    $self->_process_in_progress;
    $self->_process_to_send;

    return 1;
}

sub next_response {
    my $self = shift;
    return $self->_next_response(0);
}

sub wait_for_next_response {
    my $self     = shift;
    my $wait_for = shift;

    $wait_for = $self->max_request_time
      if !defined $wait_for;

    return $self->_next_response($wait_for);
}

sub _next_response {
    my $self        = shift;
    my $wait_for    = shift || 0;
    my $end_time    = time + $wait_for;
    my $resp_and_id = undef;

    while ( !$self->empty ) {
        $resp_and_id = shift @{ $$self{to_return} };

        # last if we have a response or we have run out of time.
        last
          if $resp_and_id
          || time > $end_time;

        # sleep for the default sleep time.
        # warn "sleeping for " . $self->poll_interval;
        sleep $self->poll_interval;
    }

    # If there is no result return false.
    return unless $resp_and_id;

    # We have a response - delete the options for it from the store.
    delete $self->{id_opts}{ $resp_and_id->[1] };

    # If we have a result return list or response depending on
    # context.
    return wantarray
      ? @$resp_and_id
      : $resp_and_id->[0];
}

sub to_send_count   { my $s = shift; $s->poke; scalar @{ $$s{to_send} }; }
sub to_return_count { my $s = shift; $s->poke; scalar @{ $$s{to_return} }; }

sub in_progress_count {
    my $s = shift;
    $s->poke;
    scalar keys %{ $$s{in_progress} };
}

sub total_count {
    my $self = shift;

    my $count = 0                   #
      + $self->to_send_count        #
      + $self->in_progress_count    #
      + $self->to_return_count;

    return $count;
}

sub info {
    my $self = shift;

    return sprintf(
        "HTTP::Async status: %4u,%4u,%4u (send, progress, return)\n",
        $self->to_send_count,        #
        $self->in_progress_count,    #
        $self->to_return_count
    );
}

sub empty {
    my $self = shift;
    return $self->total_count ? 0 : 1;
}

sub not_empty {
    my $self = shift;
    return !$self->empty;
}

sub DESTROY {
    my $self  = shift;
    my $class = ref $self;

    carp "$class object destroyed but still in use"
      if $self->total_count;

    carp "$class INTERNAL ERROR: 'id_opts' not empty"
      if scalar keys %{ $self->{id_opts} };

    return;
}

# Go through all the values on the select list and check to see if
# they have been fully received yet.

sub _process_in_progress {
    my $self = shift;

  HANDLE:
    foreach my $s ( $self->_io_select->can_read(0) ) {

        my $id = $self->{fileno_to_id}{ $s->fileno };
        die unless $id;
        my $hashref = $$self{in_progress}{$id};
        my $tmp     = $hashref->{tmp} ||= {};

        # warn Dumper $hashref;

        # Check that we have not timed-out.
        if (   time > $hashref->{timeout_at}
            || time > $hashref->{finish_by} )
        {

            # warn sprintf "Timeout: %.3f > %.3f",    #
            #   time, $hashref->{timeout_at};

            $self->_add_error_response_to_return(
                id       => $id,
                code     => 504,
                request  => $hashref->{request},
                previous => $hashref->{previous},
                content  => 'Timed out',
            );

            $self->_io_select->remove($s);
            delete $$self{fileno_to_id}{ $s->fileno };
            next HANDLE;
        }

        # If there is a code then read the body.
        if ( $$tmp{code} ) {
            my $buf;
            my $n = $s->read_entity_body( $buf, 1024 * 16 );    # 16kB
            $$tmp{is_complete} = 1 unless $n;
            $$tmp{content} .= $buf;

            # warn "Received " . length( $buf ) ;

            # Reset the timeout.
            # warn( "reseting the timeout " . time );
            $hashref->{timeout_at} = time + $self->_get_opt( 'timeout', $id );

            # warn $buf;
        }

        # If no code try to read the headers.
        else {
            $s->flush;

            my ( $code, $message, %headers );

            eval {
                ( $code, $message, %headers ) =
                  $s->read_response_headers( laxed => 1, junk_out => [] );
            };

            if ($@) {
                $self->_add_error_response_to_return(
                    'code'     => 504,
                    'content'  => $@,
                    'id'       => $id,
                    'request'  => $hashref->{request},
                    'previous' => $hashref->{previous}
                );
                $self->_io_select->remove($s);
                delete $$self{fileno_to_id}{ $s->fileno };
                next HANDLE;
            }

            if ($code) {

                # warn "Got headers: $code $message " . time;

                $$tmp{code}    = $code;
                $$tmp{message} = $message;
                my @headers_array = map { $_, $headers{$_} } keys %headers;
                $$tmp{headers} = \@headers_array;

                # Reset the timeout.
                $hashref->{timeout_at} =
                  time + $self->_get_opt( 'timeout', $id );
            }
        }

        # If the message is complete then create a request and add it
        # to 'to_return';
        if ( $$tmp{is_complete} ) {
            delete $$self{fileno_to_id}{ $s->fileno };
            $self->_io_select->remove($s);

            # warn Dumper $$hashref{content};

            my $response =
              HTTP::Response->new(
                @$tmp{ 'code', 'message', 'headers', 'content' } );

            $response->request( $hashref->{request} );
            $response->previous( $hashref->{previous} ) if $hashref->{previous};

            # If it was a redirect and there are still redirects left
            # create a new request and unshift it onto the 'to_send'
            # array.
            if (
                $response->is_redirect            # is a redirect
                && $hashref->{redirects_left} > 0 # and we still want to follow
                && $response->code != 304         # not a 'not modified' reponse
              )
            {

                $hashref->{redirects_left}--;

                my $loc = $response->header('Location');
                my $uri = $response->request->uri;

                warn "Problem: " . Dumper( { loc => $loc, uri => $uri } )
                  unless $uri && ref $uri && $loc && !ref $loc;

                my $url = _make_url_absolute( url => $loc, ref => $uri );

                my $request = HTTP::Request->new( 'GET', $url );

                $self->_send_request( [ $request, $id ] );
                $hashref->{previous} = $response;
            }
            else {
                $self->_add_to_return_queue( [ $response, $id ] );
                delete $$self{in_progress}{$id};
            }

            delete $hashref->{tmp};
        }
    }

    return 1;
}

sub _add_to_return_queue {
    my $self       = shift;
    my $req_and_id = shift;
    push @{ $$self{to_return} }, $req_and_id;
    return 1;
}

# Add all the items waiting to be sent to 'to_send' up to the 'slots'
# limit.

sub _process_to_send {
    my $self = shift;

    while ( scalar @{ $$self{to_send} }
        && $self->slots > scalar keys %{ $$self{in_progress} } )
    {
        $self->_send_request( shift @{ $$self{to_send} } );
    }

    return 1;
}

sub _send_request {
    my $self     = shift;
    my $r_and_id = shift;
    my ( $request, $id ) = @$r_and_id;

    my $uri = URI->new( $request->uri );

    my %args = ();

    # We need to use a different request_uri for proxied requests. Decide to use
    # this if a proxy port or host is set.
    #
    #   http://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html#sec5.1.2
    $args{Host}     = $uri->host;
    $args{PeerAddr} = $self->_get_opt( 'proxy_host', $id );
    $args{PeerPort} = $self->_get_opt( 'proxy_port', $id );

    my $request_is_to_proxy =
      ( $args{PeerAddr} || $args{PeerPort} )    # if either are set...
      ? 1                                       # ...then we are a proxy request
      : 0;                                      # ...otherwise not

    # If we did not get a setting from the proxy then use the uri values.
    $args{PeerAddr} ||= $uri->host;
    $args{PeerPort} ||= $uri->port;

    my $s = eval { Net::HTTP::NB->new(%args) };

    # We could not create a request - fake up a 503 response with
    # error as content.
    if ( !$s ) {

        $self->_add_error_response_to_return(
            id       => $id,
            code     => 503,
            request  => $request,
            previous => $$self{in_progress}{$id}{previous},
            content  => $@,
        );

        return 1;
    }

    my %headers = %{ $request->{_headers} };

    # Decide what to use as the request_uri
    my $request_uri = $request_is_to_proxy    # is this a proxy request....
      ? $uri->as_string                       # ... if so use full url
      : _strip_host_from_uri($uri);    # ...else strip off scheme, host and port

    croak "Could not write request to $uri '$!'"
      unless $s->write_request( $request->method, $request_uri, %headers,
        $request->content );

    $self->_io_select->add($s);

    $$self{fileno_to_id}{ $s->fileno }   = $id;
    $$self{in_progress}{$id}{request}    = $request;
    $$self{in_progress}{$id}{timeout_at} =
      time + $self->_get_opt( 'timeout', $id );
    $$self{in_progress}{$id}{finish_by} =
      time + $self->_get_opt( 'max_request_time', $id );

    $$self{in_progress}{$id}{redirects_left} =
      $self->_get_opt( 'max_redirects', $id )
      unless exists $$self{in_progress}{$id}{redirects_left};

    return 1;
}

sub _strip_host_from_uri {
    my $uri = shift;

    my $scheme_and_auth = quotemeta( $uri->scheme . '://' . $uri->authority );
    my $url             = $uri->as_string;

    $url =~ s/^$scheme_and_auth//;
    $url = "/$url" unless $url =~ m{^/};

    return $url;
}

sub _io_select {
    my $self = shift;
    return $$self{io_select} ||= IO::Select->new();
}

sub _make_url_absolute {
    my %args = @_;

    my $in  = $args{url};
    my $ref = $args{ref};

    return $in if $in =~ m{ \A http:// }xms;

    my $ret = $ref->scheme . '://' . $ref->authority;
    return $ret . $in if $in =~ m{ \A / }xms;

    $ret .= $ref->path;
    return $ret . $in if $in =~ m{ \A [\?\#\;] }xms;

    $ret =~ s{ [^/]+ \z }{}xms;
    return $ret . $in;
}

sub _add_error_response_to_return {
    my $self = shift;
    my %args = @_;

    use HTTP::Status;

    my $response =
      HTTP::Response->new( $args{code}, status_message( $args{code} ),
        undef, $args{content} );

    $response->request( $args{request} );
    $response->previous( $args{previous} ) if $args{previous};

    $self->_add_to_return_queue( [ $response, $args{id} ] );
    delete $$self{in_progress}{ $args{id} };

    return $response;

}

1;