Mojo::UserAgent - Async IO HTTP 1.1 And WebSocket User Agent


Mojolicious documentation Contained in the Mojolicious distribution.

NAME

Mojo::UserAgent - Async IO HTTP 1.1 And WebSocket User Agent

SYNOPSIS

  use Mojo::UserAgent;
  my $ua = Mojo::UserAgent->new;

  # Grab the latest Mojolicious release :)
  my $latest = 'http://latest.mojolicio.us';
  print $ua->max_redirects(3)->get($latest)->res->body;

  # Quick JSON request
  my $trends = 'https://api.twitter.com/1/trends.json';
  print $ua->get($trends)->res->json->{trends}->[0]->{name};

  # Extract data from HTML and XML resources
  print $ua->get('mojolicio.us')->res->dom->html->head->title->text;

  # Scrape the latest headlines from a news site
  my $news = 'http://digg.com';
  $ua->max_redirects(3);
  $ua->get($news)->res->dom('h3.story-item-title > a[href]')->each(sub {
    my $e = shift;
    print "$e->{href}:\n";
    print $e->text, "\n";
  });

  # Form post with exception handling
  my $cpan   = 'http://search.cpan.org/search';
  my $search = {q => 'mojo'};
  my $tx     = $ua->post_form($cpan => $search);
  if (my $res = $tx->success) { print $res->body }
  else {
    my ($message, $code) = $tx->error;
    print "Error: $message";
  }

  # TLS certificate authentication
  $ua->cert('tls.crt')->key('tls.key')->get('https://mojolicio.us');

  # WebSocket request
  $ua->websocket('ws://websockets.org:8787' => sub {
    my $tx = pop;
    $tx->on_finish(sub { Mojo::IOLoop->stop });
    $tx->on_message(sub {
      my ($tx, $message) = @_;
      print "$message\n";
      $tx->finish;
    });
    $tx->send_message('hi there!');
  });
  Mojo::IOLoop->start;

DESCRIPTION

Mojo::UserAgent is a full-featured async I/O HTTP 1.1 and WebSocket user agent with IPv6, TLS, epoll and kqueue support.

Optional modules IO::KQueue, IO::Epoll, IO::Socket::IP and IO::Socket::SSL are supported transparently and used if installed.

ATTRIBUTES

Mojo::UserAgent implements the following attributes.

app

  my $app = $ua->app;
  $ua     = $ua->app(MyApp->new);

A Mojo application to associate this user agent with. If set, local requests will be processed in this application.

cert

  my $cert = $ua->cert;
  $ua      = $ua->cert('tls.crt');

Path to TLS certificate file, defaults to the value of MOJO_CERT_FILE.

http_proxy

  my $proxy = $ua->http_proxy;
  $ua       = $ua->http_proxy('http://sri:secret@127.0.0.1:8080');

Proxy server to use for HTTP and WebSocket requests.

https_proxy

  my $proxy = $ua->https_proxy;
  $ua       = $ua->https_proxy('http://sri:secret@127.0.0.1:8080');

Proxy server to use for HTTPS and WebSocket requests.

ioloop

  my $loop = $ua->ioloop;
  $ua      = $ua->ioloop(Mojo::IOLoop->new);

Loop object to use for blocking I/O operations. By default a Mojo::IOLoop object will be used.

keep_alive_timeout

  my $keep_alive_timeout = $ua->keep_alive_timeout;
  $ua                    = $ua->keep_alive_timeout(15);

Maximum amount of time in seconds a connection can be inactive before being dropped, defaults to 15.

key

  my $key = $ua->key;
  $ua     = $ua->key('tls.key');

Path to TLS key file, defaults to the value of MOJO_KEY_FILE.

log

  my $log = $ua->log;
  $ua     = $ua->log(Mojo::Log->new);

A Mojo::Log object used for logging, by default the application log will be used.

max_connections

  my $max_connections = $ua->max_connections;
  $ua                 = $ua->max_connections(5);

Maximum number of keep alive connections that the user agent will retain before it starts closing the oldest cached ones, defaults to 5.
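
The eviction order can be pictured with a small first-in, first-out queue. This is a simplified sketch, not the module's internal code: the helper name cache_connection, its argument order and the example connection name are assumptions made for illustration.

```perl
use strict;
use warnings;

# Hypothetical helper: remember a connection id under a name and evict the
# oldest entries first, so that at most $max ids are retained.
# Returns the ids that were evicted.
sub cache_connection {
  my ($cache, $max, $name, $id) = @_;
  my @dropped;
  return @dropped unless $max;    # a limit of 0 disables caching
  push @$cache, [$name, $id];
  push @dropped, (shift @$cache)->[1] while @$cache > $max;
  return @dropped;
}

# Cache three connections with a limit of two
my (@cache, @evicted);
for my $id (1 .. 3) {
  push @evicted, cache_connection(\@cache, 2, 'http:example.com:80', $id);
}
my @kept = map { $_->[1] } @cache;
print "kept: @kept, evicted: @evicted\n";
```

With a limit of two, the third connection pushes the first one out of the queue.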

max_redirects

  my $max_redirects = $ua->max_redirects;
  $ua               = $ua->max_redirects(3);

Maximum number of redirects the user agent will follow before it fails, defaults to the value of MOJO_MAX_REDIRECTS or 0.
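
A rough sketch of the decision made for each response, assuming the rules visible in the module source: only 3xx responses other than 305 that carry a Location header are followed, and any method other than GET or HEAD becomes GET. The helper name next_redirect_method and its arguments are hypothetical.

```perl
use strict;
use warnings;

# Hypothetical helper: returns the method for the follow-up request,
# or nothing if the redirect should not be followed.
sub next_redirect_method {
  my ($code, $location, $method, $followed, $max) = @_;
  return unless $code >= 300 && $code < 400;    # only 3xx responses
  return if $code == 305;                       # "Use Proxy" is not followed
  return unless defined $location && length $location;
  return unless $followed < $max;               # redirect limit reached
  return $method =~ /^(?:GET|HEAD)$/i ? $method : 'GET';
}

# A POST answered with 302 is retried as GET while the limit allows it
my $method = next_redirect_method(302, '/login', 'POST', 0, 3);
print defined $method ? "follow with $method\n" : "stop\n";
```

With the default limit of 0 the last check always fails, so no redirect is followed unless max_redirects is raised.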

name

  my $name = $ua->name;
  $ua      = $ua->name('Mojolicious');

Value for User-Agent request header, defaults to Mojolicious (Perl).

no_proxy

  my $no_proxy = $ua->no_proxy;
  $ua          = $ua->no_proxy(['localhost', 'intranet.mojolicio.us']);

Domains that don't require a proxy server to be used. Note that this attribute is EXPERIMENTAL and might change without warning!

on_start

  my $cb = $ua->on_start;
  $ua    = $ua->on_start(sub {...});

Callback to be invoked whenever a new transaction is about to start. This includes automatically prepared proxy CONNECT requests and followed redirects.

  $ua->on_start(sub {
    my ($ua, $tx) = @_;
    $tx->req->headers->header('X-Bender', 'Bite my shiny metal ass!');
  });

websocket_timeout

  my $websocket_timeout = $ua->websocket_timeout;
  $ua                   = $ua->websocket_timeout(300);

Maximum amount of time in seconds a WebSocket connection can be inactive before being dropped, defaults to 300.

METHODS

Mojo::UserAgent inherits all methods from Mojo::Base and implements the following new ones.

build_form_tx

  my $tx = $ua->build_form_tx('http://kraih.com/foo' => {test => 123});
  my $tx = $ua->build_form_tx(
    'http://kraih.com/foo',
    'UTF-8',
    {test => 123}
  );
  my $tx = $ua->build_form_tx(
    'http://kraih.com/foo',
    {test => 123},
    {Accept => '*/*'}
  );
  my $tx = $ua->build_form_tx(
    'http://kraih.com/foo',
    'UTF-8',
    {test => 123},
    {Accept => '*/*'}
  );
  my $tx = $ua->build_form_tx(
    'http://kraih.com/foo',
    {file => {file => '/foo/bar.txt'}}
  );
  my $tx = $ua->build_form_tx(
    'http://kraih.com/foo',
    {file => {content => 'lalala'}}
  );
  my $tx = $ua->build_form_tx(
    'http://kraih.com/foo',
    {myzip => {file => $asset, filename => 'foo.zip'}}
  );

Versatile Mojo::Transaction::HTTP builder for forms.

  my $tx = $ua->build_form_tx('http://kraih.com/foo' => {test => 123});
  $tx->res->body(sub { print $_[1] });
  $ua->start($tx);
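
How the form decides between the two encodings can be sketched as follows. The helper form_content_type is hypothetical and deliberately simplified: it only looks at the form values and ignores the case where the caller sets a multipart Content-Type header explicitly.

```perl
use strict;
use warnings;

# Hypothetical helper: pick the request content type for a form hash.
# Hash reference values describe uploads, which need multipart encoding.
sub form_content_type {
  my ($form) = @_;
  for my $value (values %$form) {
    return 'multipart/form-data' if ref $value eq 'HASH';
  }
  return 'application/x-www-form-urlencoded';
}

my $plain  = form_content_type({test => 123});
my $upload = form_content_type({file => {content => 'lalala'}});
print "$plain\n$upload\n";
```

Plain scalars and array references stay urlencoded, while a single upload value switches the whole request to multipart.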

build_tx

  my $tx = $ua->build_tx(GET => 'mojolicio.us');
  my $tx = $ua->build_tx(POST => 'http://mojolicio.us');
  my $tx = $ua->build_tx(GET => 'http://kraih.com' => {Accept => '*/*'});
  my $tx = $ua->build_tx(
    POST => 'http://kraih.com' => {Accept => '*/*'} => 'Hi!'
  );

Versatile general purpose Mojo::Transaction::HTTP builder.

  # Streaming response
  my $tx = $ua->build_tx(GET => 'http://mojolicio.us');
  $tx->res->body(sub { print $_[1] });
  $ua->start($tx);

  # Custom socket
  my $tx = $ua->build_tx(GET => 'http://mojolicio.us');
  $tx->connection($socket);
  $ua->start($tx);
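
A URL without a scheme, such as 'mojolicio.us' in the first signature, is treated as an HTTP URL. A minimal sketch of that normalization, assuming the rule used in the module source (prefix "http://" unless the URL starts with a slash or contains "://"); the helper name normalize_url is hypothetical.

```perl
use strict;
use warnings;

# Hypothetical helper: add a default scheme to bare host names while leaving
# relative paths and URLs that already carry a scheme untouched.
sub normalize_url {
  my ($url) = @_;
  return $url if $url =~ m{^/|://};
  return "http://$url";
}

for my $url ('mojolicio.us', 'https://mojolicio.us', '/foo') {
  my $normalized = normalize_url($url);
  print "$url -> $normalized\n";
}
```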

build_websocket_tx

  my $tx = $ua->build_websocket_tx('ws://localhost:3000');

Versatile Mojo::Transaction::HTTP builder for WebSocket handshakes. An upgrade to Mojo::Transaction::WebSocket will happen automatically after a successful handshake is performed.

delete

  my $tx = $ua->delete('http://kraih.com');
  my $tx = $ua->delete('http://kraih.com' => {Accept => '*/*'});
  my $tx = $ua->delete('http://kraih.com' => {Accept => '*/*'} => 'Hi!');

Perform blocking HTTP DELETE request and return resulting Mojo::Transaction::HTTP object. You can also append a callback to perform requests non-blocking.

  $ua->delete('http://kraih.com' => sub {
    print pop->res->body;
    Mojo::IOLoop->stop;
  });
  Mojo::IOLoop->start;

detect_proxy

  $ua = $ua->detect_proxy;

Check environment variables HTTP_PROXY, http_proxy, HTTPS_PROXY, https_proxy, NO_PROXY and no_proxy for proxy information.
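
The lookup order can be sketched with a plain hash standing in for %ENV. The helper name proxy_settings and the shape of its return value are assumptions for illustration; the precedence (uppercase names win, NO_PROXY is split on commas) follows the module source.

```perl
use strict;
use warnings;

# Hypothetical helper: derive proxy settings from an environment hash.
sub proxy_settings {
  my ($env) = @_;
  my %settings = (
    http_proxy  => $env->{HTTP_PROXY}  || $env->{http_proxy},
    https_proxy => $env->{HTTPS_PROXY} || $env->{https_proxy},
  );
  if (my $no = $env->{NO_PROXY} || $env->{no_proxy}) {
    $settings{no_proxy} = [split /,/, $no];
  }
  return \%settings;
}

my $settings = proxy_settings({
  HTTP_PROXY => 'http://127.0.0.1:8080',
  http_proxy => 'http://127.0.0.1:3128',
  no_proxy   => 'localhost,intranet.mojolicio.us'
});
print "http proxy: $settings->{http_proxy}\n";
print "no proxy:   @{$settings->{no_proxy}}\n";
```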

get

  my $tx = $ua->get('http://kraih.com');
  my $tx = $ua->get('http://kraih.com' => {Accept => '*/*'});
  my $tx = $ua->get('http://kraih.com' => {Accept => '*/*'} => 'Hi!');

Perform blocking HTTP GET request and return resulting Mojo::Transaction::HTTP object. You can also append a callback to perform requests non-blocking.

  $ua->get('http://kraih.com' => sub {
    print pop->res->body;
    Mojo::IOLoop->stop;
  });
  Mojo::IOLoop->start;

head

  my $tx = $ua->head('http://kraih.com');
  my $tx = $ua->head('http://kraih.com' => {Accept => '*/*'});
  my $tx = $ua->head('http://kraih.com' => {Accept => '*/*'} => 'Hi!');

Perform blocking HTTP HEAD request and return resulting Mojo::Transaction::HTTP object. You can also append a callback to perform requests non-blocking.

  $ua->head('http://kraih.com' => sub {
    print pop->res->body;
    Mojo::IOLoop->stop;
  });
  Mojo::IOLoop->start;

need_proxy

  my $need_proxy = $ua->need_proxy('intranet.mojolicio.us');

Check if request for domain would use a proxy server. Note that this method is EXPERIMENTAL and might change without warning!
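
The check amounts to a suffix match of the host against each no_proxy entry. A minimal sketch under that assumption; the helper name needs_proxy is hypothetical, and unlike the method it returns 0 rather than an empty value when no proxy is needed.

```perl
use strict;
use warnings;

# Hypothetical helper: true unless the host ends with one of the listed domains.
sub needs_proxy {
  my ($host, $no_proxy) = @_;
  return 1 unless $no_proxy;    # no list, always use the proxy
  for my $domain (@$no_proxy) {
    return 0 if $host =~ /\Q$domain\E$/;
  }
  return 1;
}

my $no_proxy = ['localhost', 'intranet.mojolicio.us'];
for my $host ('intranet.mojolicio.us', 'api.intranet.mojolicio.us', 'kraih.com') {
  my $answer = needs_proxy($host, $no_proxy) ? 'proxy' : 'direct';
  print "$host: $answer\n";
}
```

Because the match is anchored only at the end, subdomains of a listed domain are also reached directly.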

post

  my $tx = $ua->post('http://kraih.com');
  my $tx = $ua->post('http://kraih.com' => {Accept => '*/*'});
  my $tx = $ua->post('http://kraih.com' => {Accept => '*/*'} => 'Hi!');

Perform blocking HTTP POST request and return resulting Mojo::Transaction::HTTP object. You can also append a callback to perform requests non-blocking.

  $ua->post('http://kraih.com' => sub {
    print pop->res->body;
    Mojo::IOLoop->stop;
  });
  Mojo::IOLoop->start;

post_form

  my $tx = $ua->post_form('http://kraih.com/foo' => {test => 123});
  my $tx = $ua->post_form(
    'http://kraih.com/foo',
    'UTF-8',
    {test => 123}
  );
  my $tx = $ua->post_form(
    'http://kraih.com/foo',
    {test => 123},
    {Accept => '*/*'}
  );
  my $tx = $ua->post_form(
    'http://kraih.com/foo',
    'UTF-8',
    {test => 123},
    {Accept => '*/*'}
  );
  my $tx = $ua->post_form(
    'http://kraih.com/foo',
    {file => {file => '/foo/bar.txt'}}
  );
  my $tx = $ua->post_form(
    'http://kraih.com/foo',
    {file => {content => 'lalala'}}
  );
  my $tx = $ua->post_form(
    'http://kraih.com/foo',
    {myzip => {file => $asset, filename => 'foo.zip'}}
  );

Perform blocking HTTP POST request with form data and return resulting Mojo::Transaction::HTTP object. You can also append a callback to perform requests non-blocking.

  $ua->post_form('http://kraih.com' => {q => 'test'} => sub {
    print pop->res->body;
    Mojo::IOLoop->stop;
  });
  Mojo::IOLoop->start;

put

  my $tx = $ua->put('http://kraih.com');
  my $tx = $ua->put('http://kraih.com' => {Accept => '*/*'});
  my $tx = $ua->put('http://kraih.com' => {Accept => '*/*'} => 'Hi!');

Perform blocking HTTP PUT request and return resulting Mojo::Transaction::HTTP object. You can also append a callback to perform requests non-blocking.

  $ua->put('http://kraih.com' => sub {
    print pop->res->body;
    Mojo::IOLoop->stop;
  });
  Mojo::IOLoop->start;

start

  $ua = $ua->start($tx);

Process blocking transaction. You can also append a callback to perform transactions non-blocking.

  $ua->start($tx => sub {
    print pop->res->body;
    Mojo::IOLoop->stop;
  });
  Mojo::IOLoop->start;

test_server

  my $port = $ua->test_server;
  my $port = $ua->test_server('https');

Starts a test server for app if necessary and returns the port number. Note that this method is EXPERIMENTAL and might change without warning!

websocket

  $ua->websocket('ws://localhost:3000' => sub {...});
  $ua->websocket(
    'ws://localhost:3000' => {'User-Agent' => 'Agent 1.0'} => sub {...}
  );

Open a non-blocking WebSocket connection with transparent handshake.

  $ua->websocket('ws://localhost:3000/echo' => sub {
    my $tx = pop;
    $tx->on_finish(sub  { Mojo::IOLoop->stop });
    $tx->on_message(sub {
      my ($tx, $message) = @_;
      print "$message\n";
    });
    $tx->send_message('Hi!');
  });
  Mojo::IOLoop->start;

DEBUGGING

You can set the MOJO_USERAGENT_DEBUG environment variable to get some advanced diagnostics information printed to STDERR.

  MOJO_USERAGENT_DEBUG=1

SEE ALSO

Mojolicious, Mojolicious::Guides, http://mojolicio.us.


package Mojo::UserAgent;
use Mojo::Base -base;

use Carp 'croak';
use Mojo::Asset::File;
use Mojo::Asset::Memory;
use Mojo::Content::MultiPart;
use Mojo::Content::Single;
use Mojo::CookieJar;
use Mojo::IOLoop;
use Mojo::Log;
use Mojo::Parameters;
use Mojo::Server::Daemon;
use Mojo::Transaction::HTTP;
use Mojo::Transaction::WebSocket;
use Mojo::Util qw/encode url_escape/;
use Mojo::URL;
use Scalar::Util 'weaken';

use constant DEBUG => $ENV{MOJO_USERAGENT_DEBUG} || 0;

# "You can't let a single bad experience scare you away from drugs."
has [qw/app http_proxy https_proxy no_proxy on_start/];
has cert       => sub { $ENV{MOJO_CERT_FILE} };
has cookie_jar => sub { Mojo::CookieJar->new };
has ioloop     => sub { Mojo::IOLoop->new };
has keep_alive_timeout => 15;
has key                => sub { $ENV{MOJO_KEY_FILE} };
has log                => sub { Mojo::Log->new };
has max_connections    => 5;
has max_redirects      => sub { $ENV{MOJO_MAX_REDIRECTS} || 0 };
has name               => 'Mojolicious (Perl)';
has websocket_timeout  => 300;

# Make sure we leave a clean ioloop behind
sub DESTROY { shift->_cleanup }

# "Ah, alcohol and night-swimming. It's a winning combination."
sub build_form_tx {
  my $self = shift;
  my $url  = shift;

  # Callback
  my $cb = pop @_ if ref $_[-1] && ref $_[-1] eq 'CODE';

  # Form
  my $encoding = shift;
  my $form = ref $encoding ? $encoding : shift;
  $encoding = undef if ref $encoding;

  # Parameters
  my $params = Mojo::Parameters->new;
  $params->charset($encoding) if defined $encoding;
  my $multipart;
  for my $name (sort keys %$form) {

    # Array
    if (ref $form->{$name} eq 'ARRAY') {
      $params->append($name, $_) for @{$form->{$name}};
    }

    # Hash
    elsif (ref $form->{$name} eq 'HASH') {
      my $hash = $form->{$name};

      # Enforce "multipart/form-data"
      $multipart = 1;

      # File
      if (my $file = $hash->{file}) {

        # Upgrade
        $file = $hash->{file} = Mojo::Asset::File->new(path => $file)
          unless ref $file;

        # Filename
        $hash->{filename} ||= $file->path if $file->can('path');
      }

      # Memory
      elsif (defined(my $content = delete $hash->{content})) {
        $hash->{file} = Mojo::Asset::Memory->new->add_chunk($content);
      }

      $hash->{'Content-Type'} ||= 'application/octet-stream';
      push @{$params->params}, $name, $hash;
    }

    # Single value
    else { $params->append($name, $form->{$name}) }
  }

  # New transaction
  my $tx      = $self->build_tx(POST => $url);
  my $req     = $tx->req;
  my $headers = $req->headers;
  $headers->from_hash(ref $_[0] eq 'HASH' ? $_[0] : {@_});

  # Multipart
  $headers->content_type('multipart/form-data') if $multipart;
  my $type = $headers->content_type || '';
  if ($type eq 'multipart/form-data') {
    my $form = $params->to_hash;

    # Parts
    my @parts;
    foreach my $name (sort keys %$form) {
      my $part = Mojo::Content::Single->new;
      my $h    = $part->headers;
      my $f    = $form->{$name};

      # File
      my $filename;
      if (ref $f eq 'HASH') {
        $filename = delete $f->{filename} || $name;
        encode $encoding, $filename if $encoding;
        url_escape $filename, $Mojo::URL::UNRESERVED;
        $part->asset(delete $f->{file});
        $h->from_hash($f);
        push @parts, $part;
      }

      # Fields
      else {
        my $type = 'text/plain';
        $type .= qq/;charset=$encoding/ if $encoding;
        $h->content_type($type);

        # Values
        for my $value (ref $f ? @$f : ($f)) {
          $part = Mojo::Content::Single->new(headers => $h);
          encode $encoding, $value if $encoding;
          $part->asset->add_chunk($value);
          push @parts, $part;
        }
      }

      # Content-Disposition
      encode $encoding, $name if $encoding;
      url_escape $name, $Mojo::URL::UNRESERVED;
      my $disposition = qq/form-data; name="$name"/;
      $disposition .= qq/; filename="$filename"/ if $filename;
      $h->content_disposition($disposition);
    }

    # Multipart content
    my $content = Mojo::Content::MultiPart->new;
    $headers->content_type('multipart/form-data');
    $content->headers($headers);
    $content->parts(\@parts);

    # Add content to transaction
    $req->content($content);
  }

  # Urlencoded
  else {
    $headers->content_type('application/x-www-form-urlencoded');
    $req->body($params->to_string);
  }

  return $tx unless wantarray;
  $tx, $cb;
}

# "Homer, it's easy to criticize.
#  Fun, too."
sub build_tx {
  my $self = shift;

  # New transaction
  my $tx  = Mojo::Transaction::HTTP->new;
  my $req = $tx->req;
  $req->method(shift);
  my $url = shift;
  $url = "http://$url" unless $url =~ /^\/|\:\/\//;
  $req->url->parse($url);

  # Callback
  my $cb = pop @_ if ref $_[-1] && ref $_[-1] eq 'CODE';

  # Body
  $req->body(pop @_)
    if (@_ & 1) == 1 && ref $_[0] ne 'HASH' || ref $_[-2] eq 'HASH';

  # Headers
  $req->headers->from_hash(ref $_[0] eq 'HASH' ? $_[0] : {@_});

  return $tx unless wantarray;
  $tx, $cb;
}

sub build_websocket_tx {
  my $self = shift;

  # New WebSocket
  my ($tx, $cb) = $self->build_tx(GET => @_);
  my $req = $tx->req;
  my $url = $req->url;
  my $abs = $url->to_abs;
  if (my $scheme = $abs->scheme) {
    $scheme = $scheme eq 'wss' ? 'https' : 'http';
    $req->url($abs->scheme($scheme));
  }

  # Handshake
  Mojo::Transaction::WebSocket->new(handshake => $tx, masked => 1)
    ->client_handshake;

  return $tx unless wantarray;
  $tx, $cb;
}

# "The only thing I asked you to do for this party was put on clothes,
#  and you didn't do it."
sub delete {
  my $self = shift;
  $self->start($self->build_tx('DELETE', @_));
}

sub detect_proxy {
  my $self = shift;

  # Uppercase gets priority
  $self->http_proxy($ENV{HTTP_PROXY}   || $ENV{http_proxy});
  $self->https_proxy($ENV{HTTPS_PROXY} || $ENV{https_proxy});
  if (my $no = $ENV{NO_PROXY} || $ENV{no_proxy}) {
    $self->no_proxy([split /,/, $no]);
  }

  $self;
}

# "'What are you lookin at?' - the innocent words of a drunken child."
sub get {
  my $self = shift;
  $self->start($self->build_tx('GET', @_));
}

sub head {
  my $self = shift;
  $self->start($self->build_tx('HEAD', @_));
}

sub need_proxy {
  my ($self, $host) = @_;

  # No proxy list
  return 1 unless my $no = $self->no_proxy;

  # No proxy needed
  $host =~ /\Q$_\E$/ and return for @$no;

  # Proxy needed
  1;
}

# "Olive oil? Asparagus? If your mother wasn't so fancy,
#  we could just shop at the gas station like normal people."
sub post {
  my $self = shift;
  $self->start($self->build_tx('POST', @_));
}

sub post_form {
  my $self = shift;
  $self->start($self->build_form_tx(@_));
}

# "And I gave that man directions, even though I didn't know the way,
#  because that's the kind of guy I am this week."
sub put {
  my $self = shift;
  $self->start($self->build_tx('PUT', @_));
}

# "Wow, Barney. You brought a whole beer keg.
#  Yeah... where do I fill it up?"
sub start {
  my ($self, $tx, $cb) = @_;

  # Blocking loop
  my $loop = $self->{_loop} ||= $self->ioloop;

  # Non-blocking
  if ($cb) {

    # Switch to non-blocking
    warn "NEW NON-BLOCKING REQUEST\n" if DEBUG;
    $self->_switch_non_blocking unless $self->{_nb};

    # Start
    return $self->_start_tx($tx, $cb);
  }

  # Switch to blocking
  warn "NEW BLOCKING REQUEST\n" if DEBUG;
  $self->_switch_blocking if $self->{_nb};

  # Quick start
  $self->_start_tx($tx, sub { $tx = $_[1] });

  # Start loop
  $loop->start;

  # Cleanup
  $loop->one_tick(0);

  $tx;
}

# "It's like my dad always said: eventually, everybody gets shot."
sub test_server {
  my ($self, $protocol) = @_;

  # Start test server
  unless ($self->{_port}) {
    my $loop = $self->{_loop} || $self->ioloop;
    my $server = $self->{_server} =
      Mojo::Server::Daemon->new(ioloop => $loop, silent => 1);
    my $port = $self->{_port} = $loop->generate_port;
    die "Couldn't find a free TCP port for testing.\n" unless $port;
    $self->{_protocol} = $protocol ||= 'http';
    $server->listen(["$protocol://*:$port"]);
    $server->prepare_ioloop;
  }

  # Prepare application for testing
  my $server = $self->{_server};
  delete $server->{app};
  my $app = $self->app;
  ref $app ? $server->app($app) : $server->app_class($app);
  $self->log($server->app->log);

  $self->{_port};
}

# "Are we there yet?
#  No
#  Are we there yet?
#  No
#  Are we there yet?
#  No
#  ...Where are we going?"
sub websocket {
  my $self = shift;
  $self->start($self->build_websocket_tx(@_));
}

sub _cache {
  my ($self, $name, $id) = @_;

  # Enqueue
  my $cache = $self->{_cache} ||= [];
  if ($id) {

    # Limit keep alive connections
    my $max = $self->max_connections;
    while (@$cache > $max) {
      my $cached = shift @$cache;
      $self->_drop($cached->[1]);
    }

    push @$cache, [$name, $id] if $max;
    return $self;
  }

  # Dequeue
  my $loop = $self->{_loop};
  my $result;
  my @cache;
  for my $cached (@$cache) {

    # Search for name or id
    if (!$result && ($cached->[1] eq $name || $cached->[0] eq $name)) {
      my $id = $cached->[1];

      # Test connection
      if ($loop->test($id)) { $result = $id }

      # Drop corrupted connection
      else { $loop->drop($id) }
    }

    # Cache again
    else { push @cache, $cached }
  }
  $self->{_cache} = \@cache;

  $result;
}

sub _cleanup {
  my $self = shift;
  return unless my $loop = $self->{_loop};

  # Stop server
  delete $self->{_port};
  delete $self->{_server};

  # Cleanup active connections
  warn "DROPPING ALL CONNECTIONS\n" if DEBUG;
  my $cs = $self->{_cs} || {};
  $loop->drop($_) for keys %$cs;

  # Cleanup keep alive connections
  my $cache = $self->{_cache} || [];
  for my $cached (@$cache) {
    $loop->drop($cached->[1]);
  }
}

sub _close { shift->_handle(pop, 1) }

# "Where on my badge does it say anything about protecting people?
#  Uh, second word, chief."
sub _connect {
  my ($self, $tx, $cb) = @_;

  # Keep alive connection
  weaken $self;
  my $loop = $self->{_loop};
  my $id   = $tx->connection;
  my ($scheme, $address, $port) = $self->_tx_info($tx);
  $id ||= $self->_cache("$scheme:$address:$port");
  if ($id && !ref $id) {
    warn "KEEP ALIVE CONNECTION ($scheme:$address:$port)\n" if DEBUG;
    $self->{_cs}->{$id} = {cb => $cb, tx => $tx};
    $tx->kept_alive(1);
    $self->_connected($id);
  }

  # New connection
  else {

    # TLS/WebSocket proxy
    unless (($tx->req->method || '') eq 'CONNECT') {

      # CONNECT request to proxy required
      return if $self->_connect_proxy($tx, $cb);
    }

    # Connect
    warn "NEW CONNECTION ($scheme:$address:$port)\n" if DEBUG;
    $id = $loop->connect(
      address  => $address,
      port     => $port,
      handle   => $id,
      tls      => $scheme eq 'https' ? 1 : 0,
      tls_cert => $self->cert,
      tls_key  => $self->key,
      on_connect => sub { $self->_connected($_[1]) }
    );
    $self->{_cs}->{$id} = {cb => $cb, tx => $tx};
  }

  # Callbacks
  $loop->on_close($id => sub { $self->_close(@_) });
  $loop->on_error($id => sub { $self->_error(@_) });
  $loop->on_read($id => sub { $self->_read(@_) });

  $id;
}

# "Hey, Weener Boy... where do you think you're going?"
sub _connect_proxy {
  my ($self, $old, $cb) = @_;

  # No proxy
  my $req = $old->req;
  return unless my $proxy = $req->proxy;

  # WebSocket and/or HTTPS
  my $url = $req->url;
  return
    unless ($req->headers->upgrade || '') eq 'websocket'
    || ($url->scheme || '') eq 'https';

  # CONNECT request
  my $new = $self->build_tx(CONNECT => $url->clone);
  $new->req->proxy($proxy);

  # Start CONNECT request
  $self->_start_tx(
    $new => sub {
      my ($self, $tx) = @_;

      # CONNECT failed
      unless (($tx->res->code || '') eq '200') {
        $old->req->error('Proxy connection failed.');
        $self->_finish_tx($old, $cb);
        return;
      }

      # TLS upgrade
      if ($tx->req->url->scheme eq 'https') {

        # Connection from keep alive cache
        return unless my $old_id = $tx->connection;

        # Start TLS
        my $new_id = $self->{_loop}->start_tls($old_id);

        # Cleanup
        $old->req->proxy(undef);
        delete $self->{_cs}->{$old_id};
        $tx->connection($new_id);
      }

      # Share connection
      $old->connection($tx->connection);

      # Start real transaction
      $self->_start_tx($old, $cb);
    }
  );

  1;
}

# "I don't mind being called a liar when I'm lying, or about to lie,
#  or just finished lying, but NOT WHEN I'M TELLING THE TRUTH."
sub _connected {
  my ($self, $id) = @_;

  # Store connection information in transaction
  my $loop = $self->{_loop};
  my $tx   = $self->{_cs}->{$id}->{tx};
  $tx->connection($id);
  my $local = $loop->local_info($id);
  $tx->local_address($local->{address});
  $tx->local_port($local->{port});
  my $remote = $loop->remote_info($id);
  $tx->remote_address($remote->{address});
  $tx->remote_port($remote->{port});

  # Keep alive timeout
  $loop->connection_timeout($id => $self->keep_alive_timeout);

  # Write
  $self->_write($id);
}

# "Mrs. Simpson, bathroom is not for customers.
#  Please use the crack house across the street."
sub _drop {
  my ($self, $id, $close) = @_;

  # Drop connection
  my $c = delete $self->{_cs}->{$id};

  # Transaction
  my $tx = $c->{tx};
  if (!$close && $tx && $tx->keep_alive && !$tx->error) {

    # Keep non-CONNECTed connection alive
    $self->_cache(join(':', $self->_tx_info($tx)), $id)
      unless (($tx->req->method || '') =~ /^connect$/i
      && ($tx->res->code || '') eq '200');

    # Still active
    return;
  }

  # Connection close
  $self->_cache($id);
  $self->{_loop}->drop($id);
}

sub _error {
  my ($self, $loop, $id, $error) = @_;

  # Store error in response
  if (my $tx = $self->{_cs}->{$id}->{tx}) { $tx->res->error($error) }

  # Log error
  $self->log->error($error);

  # Finish connection
  $self->_handle($id, $error);
}

# "Oh, I'm in no condition to drive. Wait a minute.
#  I don't have to listen to myself. I'm drunk."
sub _finish_tx {
  my ($self, $tx, $cb) = @_;

  # 400/500
  my $res = $tx->res;
  $res->error($res->message, $res->code)
    if $res->is_status_class(400) || $res->is_status_class(500);

  # Callback
  return unless $cb;
  $self->$cb($tx);
}

# "No children have ever meddled with the Republican Party and lived to tell
#  about it."
sub _handle {
  my ($self, $id, $close) = @_;

  # WebSocket
  my $c   = $self->{_cs}->{$id};
  my $old = $c->{tx};
  if ($old && $old->is_websocket) {

    # Finish transaction
    $old->client_close;
    $self->{_processing} -= 1;
    delete $self->{_cs}->{$id};
    $self->_drop($id, $close);
  }

  # Upgrade connection to WebSocket
  elsif ($old && (my $new = $self->_upgrade($id))) {

    # Finish transaction
    $self->_finish_tx($new, $c->{cb});

    # Parse leftovers
    $new->client_read($old->res->leftovers);
  }

  # Normal connection
  else {

    # Cleanup connection
    $self->_drop($id, $close);

    # Idle connection
    return unless $old;

    # Extract cookies
    if (my $jar = $self->cookie_jar) { $jar->extract($old) }

    # Finished transaction
    $self->{_processing} -= 1;

    # Redirect or callback
    $self->_finish_tx($new || $old, $c->{cb})
      unless $self->_redirect($c, $old);
  }

  # Stop loop
  $self->{_loop}->stop if !$self->{_nb} && !$self->{_processing};
}

# "Have you ever seen that Blue Man Group? Total ripoff of the Smurfs.
#  And the Smurfs, well, they SUCK."
sub _read {
  my ($self, $loop, $id, $chunk) = @_;
  warn "< $chunk\n" if DEBUG;

  # Transaction
  return unless my $c = $self->{_cs}->{$id};
  if (my $tx = $c->{tx}) {

    # Read
    $tx->client_read($chunk);

    # Finish
    if ($tx->is_done) { $self->_handle($id) }

    # Writing
    elsif ($c->{tx}->is_writing) { $self->_write($id) }

    return;
  }

  # Corrupted connection
  $self->_drop($id);
}

sub _redirect {
  my ($self, $c, $old) = @_;

  # Code
  my $res = $old->res;
  return unless $res->is_status_class('300');
  return if $res->code == 305;

  # Location
  return unless my $location = $res->headers->location;
  $location = Mojo::URL->new($location);

  # Fix broken location without authority and/or scheme
  my $req = $old->req;
  my $url = $req->url;
  $location->authority($url->authority) unless $location->authority;
  $location->scheme($url->scheme)       unless $location->scheme;

  # Method
  my $method = $req->method;
  $method = 'GET' unless $method =~ /^(?:GET|HEAD)$/i;

  # Max redirects
  my $r = $c->{redirects} || 0;
  my $max = $self->max_redirects;
  return unless $r < $max;

  # New transaction
  my $new = Mojo::Transaction::HTTP->new;
  $new->req->method($method)->url($location);
  $new->previous($old);

  # Start redirected request
  return 1 unless my $new_id = $self->_start_tx($new, $c->{cb});

  # Create new connection
  $self->{_cs}->{$new_id}->{redirects} = $r + 1;

  # Redirecting
  1;
}

# "It's great! We can do *anything* now that Science has invented Magic."
sub _start_tx {
  my ($self, $tx, $cb) = @_;

  # Embedded server
  if ($self->app) {
    my $req = $tx->req;
    my $url = $req->url->to_abs;

    # Relative
    unless ($url->host) {
      $url->scheme($self->{_protocol});
      $url->host('localhost');
      $url->port($self->test_server);
      $req->url($url);
    }
  }

  # Detect proxy
  $self->detect_proxy if $ENV{MOJO_PROXY};

  # Proxy
  my $req    = $tx->req;
  my $url    = $req->url;
  my $scheme = $url->scheme || '';
  if ($self->need_proxy($url->host)) {

    # HTTP proxy
    if (my $proxy = $self->http_proxy) {
      $req->proxy($proxy) if !$req->proxy && $scheme eq 'http';
    }

    # HTTPS proxy
    if (my $proxy = $self->https_proxy) {
      $req->proxy($proxy) if !$req->proxy && $scheme eq 'https';
    }
  }

  # We identify ourselves
  my $headers = $req->headers;
  $headers->user_agent($self->name) unless $headers->user_agent;

  # Inject cookies
  if (my $jar = $self->cookie_jar) { $jar->inject($tx) }

  # Start
  if (my $start = $self->on_start) { $self->$start($tx) }

  # Connect
  return unless my $id = $self->_connect($tx, $cb);

  # Resume callback
  weaken $self;
  $tx->on_resume(sub { $self->_write($id) });

  # Counter
  $self->{_processing} ||= 0;
  $self->{_processing} += 1;

  $id;
}

sub _switch_blocking {
  my $self = shift;

  # Can't switch while processing non-blocking requests
  croak 'Non-blocking requests in progress' if $self->{_processing};
  warn "SWITCHING TO BLOCKING MODE\n" if DEBUG;

  # Normal loop
  $self->_cleanup;
  $self->{_loop} = $self->ioloop;
  $self->{_nb}   = 0;
}

sub _switch_non_blocking {
  my $self = shift;

  # Can't switch while processing blocking requests
  croak 'Blocking request in progress' if $self->{_processing};
  warn "SWITCHING TO NON-BLOCKING MODE\n" if DEBUG;

  # Global loop
  $self->_cleanup;
  $self->{_loop} = Mojo::IOLoop->singleton;
  $self->{_nb}   = 1;
}

sub _tx_info {
  my ($self, $tx) = @_;

  # Proxy info
  my $req    = $tx->req;
  my $url    = $req->url;
  my $scheme = $url->scheme || 'http';
  my $host   = $url->ihost;
  my $port   = $url->port;
  if (my $proxy = $req->proxy) {
    $scheme = $proxy->scheme;
    $host   = $proxy->ihost;
    $port   = $proxy->port;
  }

  # Default port
  $port ||= $scheme eq 'https' ? 443 : 80;

  ($scheme, $host, $port);
}

# "Once the government approves something, it's no longer immoral!"
sub _upgrade {
  my ($self, $id) = @_;

  # No upgrade request
  my $c   = $self->{_cs}->{$id};
  my $old = $c->{tx};
  return unless $old->req->headers->upgrade;

  # Handshake failed
  my $res = $old->res;
  return unless ($res->code || '') eq '101';

  # Upgrade to WebSocket transaction
  my $new = Mojo::Transaction::WebSocket->new(handshake => $old, masked => 1);
  $new->kept_alive($old->kept_alive);

  # WebSocket challenge
  $res->error('WebSocket challenge failed.') and return
    unless $new->client_challenge;
  $c->{tx} = $new;

  # Upgrade connection timeout
  $self->{_loop}->connection_timeout($id, $self->websocket_timeout);

  # Resume callback
  weaken $self;
  $new->on_resume(sub { $self->_write($id) });

  $new;
}

# "Oh well. At least we'll die doing what we love: inhaling molten rock."
sub _write {
  my ($self, $id) = @_;

  # Writing
  return unless my $c  = $self->{_cs}->{$id};
  return unless my $tx = $c->{tx};
  return unless $tx->is_writing;

  # Get chunk
  my $chunk = $c->{tx}->client_write;

  # Still writing
  my $cb;
  if ($tx->is_writing) {
    weaken $self;
    $cb = sub { $self->_write($id) };
  }

  # Write
  $self->{_loop}->write($id, $chunk, $cb);

  # Finish
  $self->_handle($id) if $tx->is_done;
  warn "> $chunk\n"   if DEBUG;
}

1;
__END__