AnyEvent::MP::Transport - actual transport protocol handler


AnyEvent-MP documentation Contained in the AnyEvent-MP distribution.

Index


Code Index:

NAME

Top

AnyEvent::MP::Transport - actual transport protocol handler

SYNOPSIS

Top

   use AnyEvent::MP::Transport;

DESCRIPTION

Top

This module implements (and documents) the actual transport protocol for AEMP.

See the "PROTOCOL" section below if you want to write another client for this protocol.

FUNCTIONS/METHODS

Top

$listener = mp_listener $host, $port, <constructor-args>

Creates a listener on the given host/port using AnyEvent::Socket::tcp_server.

See new, below, for constructor arguments.

Defaults for peerhost, peerport and fh are provided.

$guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)
new AnyEvent::MP::Transport
   # immediately starts negotiation
   my $transport = new AnyEvent::MP::Transport
      # mandatory
      fh       => $filehandle,
      local_id => $identifier,
      on_recv  => sub { receive-callback },
      on_error => sub { error-callback },

      # optional
      on_eof   => sub { clean-close-callback },
      on_connect => sub { successful-connect-callback },
      greeting => { key => value },

      # tls support
      tls_ctx  => AnyEvent::TLS,
      peername => $peername, # for verification
   ;

PROTOCOL

Top

The AEMP protocol is comparatively simple, and consists of three phases which are symmetrical for both sides: greeting (followed by optionally switching to TLS mode), authentication and packet exchange.

The protocol is designed to allow both full-text and binary streams.

The greeting consists of two text lines that are ended by either an ASCII CR LF pair, or a single ASCII LF (recommended).

GREETING

All the lines until after authentication must not exceed 4kb in length, including line delimiter. Afterwards there is no limit on the packet size that can be received.

First Greeting Line

Example:

   aemp;0;rain;tls_md6_64_256,hmac_md6_64_256,tls_anon,cleartext;json,storable;timeout=12;peeraddr=10.0.0.1:48082

The first line contains strings separated (not ended) by ; characters. The first five strings are fixed by the protocol, the remaining strings are KEY=VALUE pairs. None of them may contain ; characters themselves (when escaping is needed, use %3b to represent ; and %25 to represent %)-

The fixed strings are:

protocol identification

The constant aemp to identify this protocol.

protocol version

The protocol version supported by this end, currently 1. If the versions don't match then no communication is possible. Minor extensions are supposed to be handled through additional key-value pairs.

the node ID

This is the node ID of the connecting node.

the acceptable authentication methods

A comma-separated list of authentication methods supported by the node. Note that AnyEvent::MP supports a hex_secret authentication method that accepts a clear-text password (hex-encoded), but will not use this authentication method itself.

The receiving side should choose the first authentication method it supports.

the acceptable framing formats

A comma-separated list of packet encoding/framing formats understood. The receiving side should choose the first framing format it supports for sending packets (which might be different from the format it has to accept).

The remaining arguments are KEY=VALUE pairs. The following key-value pairs are known at this time:

provider=<module-version>

The software provider for this implementation. For AnyEvent::MP, this is AE-0.0 or whatever version it currently is at.

peeraddr=<host>:<port>

The peer address (socket address of the other side) as seen locally.

tls=<major>.<minor>

Indicates that the other side supports TLS (version should be 1.0) and wishes to do a TLS handshake.

Second Greeting Line

After this greeting line there will be a second line containing a cryptographic nonce, i.e. random data of high quality. To keep the protocol text-only, these are usually 32 base64-encoded octets, but it could be anything that doesn't contain any ASCII CR or ASCII LF characters.

The two nonces must be different, and an aemp implementation must check and fail when they are identical.

Example of a nonce line (yes, it's random-looking because it is random data):

   2XYhdG7/O6epFa4wuP0ujAEx1rXYWRcOypjUYK7eF6yWAQr7gwIN9m/2+mVvBrTPXz5GJDgfGm9d8QRABAbmAP/s

TLS handshake

If, after the handshake, both sides indicate interest in TLS, then the connection must use TLS, or fail to continue.

Both sides compare their nonces, and the side who sent the lower nonce value ("string" comparison on the raw octet values) becomes the client, and the one with the higher nonce the server.

AUTHENTICATION PHASE

After the greeting is received (and the optional TLS handshake), the authentication phase begins, which consists of sending a single ;-separated line with three fixed strings and any number of KEY=VALUE pairs.

The three fixed strings are:

the authentication method chosen

This must be one of the methods offered by the other side in the greeting.

Note that all methods starting with tls_ are only valid iff TLS was successfully handshaked (and to be secure the implementation must enforce this).

The currently supported authentication methods are:

cleartext

This is simply the shared secret, lowercase-hex-encoded. This method is of course very insecure if TLS is not used (and not completely secure even if TLS is used), which is why this module will accept, but not generate, cleartext auth replies.

hmac_md6_64_256

This method uses an MD6 HMAC with 64 bit blocksize and 256 bit hash, and requires a shared secret. It is the preferred auth method when a shared secret is available.

First, the shared secret is hashed with MD6:

   key = MD6 (secret)

This secret is then used to generate the "local auth reply", by taking the two local greeting lines and the two remote greeting lines (without line endings), appending \012 to all of them, concatenating them and calculating the MD6 HMAC with the key:

   lauth = HMAC_MD6 key, "lgreeting1\012lgreeting2\012rgreeting1\012rgreeting2\012"

This authentication token is then lowercase-hex-encoded and sent to the other side.

Then the remote auth reply is generated using the same method, but local and remote greeting lines swapped:

   rauth = HMAC_MD6 key, "rgreeting1\012rgreeting2\012lgreeting1\012lgreeting2\012"

This is the token that is expected from the other side.

tls_anon

This type is only valid iff TLS was enabled and the TLS handshake was successful. It has no authentication data, as the server/client certificate was successfully verified.

This authentication type is somewhat insecure, as it allows a man-in-the-middle attacker to change some of the connection parameters (such as the framing format), although there is no known attack that exploits this in a way that is worse than just denying the service.

By default, this implementation accepts but never generates this auth reply.

tls_md6_64_256

This type is only valid iff TLS was enabled and the TLS handshake was successful.

This authentication type simply calculates:

   lauth = MD6 "rgreeting1\012rgreeting2\012lgreeting1\012lgreeting2\012"

and lowercase-hex encodes the result and sends it as authentication data. No shared secret is required (authentication is done by TLS). The checksum exists only to make tinkering with the greeting hard.

the authentication data

The authentication data itself, usually base64 or hex-encoded data, see above.

the framing protocol chosen

This must be one of the framing protocols offered by the other side in the greeting. Each side must accept the choice of the other side, and generate packets in the format it chose itself.

Example of an authentication reply:

   hmac_md6_64_256;363d5175df38bd9eaddd3f6ca18aa1c0c4aa22f0da245ac638d048398c26b8d3;json

DATA PHASE

After this, packets get exchanged using the chosen framing protocol. It is quite possible that both sides use a different framing protocol.

FULL EXAMPLE

This is an actual protocol dump of a handshake, followed by a single data packet. The greater than/less than lines indicate the direction of the transfer only.

   > aemp;0;anon/57Cs1CggVJjzYaQp13XXg4;tls_md6_64_256,hmac_md6_64_256,tls_anon,cleartext;json,storable;provider=AE-0.8;timeout=12;peeraddr=10.0.0.17:4040
   > yLgdG1ov/02shVkVQer3wzeuywZK+oraTdEQBmIqWHaegxSGDG4g+HqogLQbvdypFOsoDWJ1Sh4ImV4DMhvUBwTK

   < aemp;0;ruth;tls_md6_64_256,hmac_md6_64_256,tls_anon,cleartext;json,storable;provider=AE-0.8;timeout=12;peeraddr=10.0.0.1:37108
   < +xMQXP8ElfNmuvEhsmcp+s2wCJOuQAsPxSg3d2Ewhs6gBnJz+ypVdWJ/wAVrXqlIJfLeVS/CBy4gEGkyWHSuVb1L

   > hmac_md6_64_256;5ad913855742ae5a03a5aeb7eafa4c78629de136bed6acd73eea36c9e98df44a;json

   < hmac_md6_64_256;84cd590976f794914c2ca26dac3a207a57a6798b9171289c114de07cf0c20401;json
   < ["","AnyEvent::MP::_spawn","57Cs1CggVJjzYaQp13XXg4.c","AnyEvent::MP::Global::connect",0,"anon/57Cs1CggVJjzYaQp13XXg4"]
   ...

The shared secret in use was 8ugxrtw6H5tKnfPWfaSr4HGhE8MoJXmzTT1BWq7sLutNcD0IbXprQlZjIbl7MBKoeklG3IEfY9GlJthC0pENzk.

SIMPLE HANDSHAKE FOR NON-PERL NODES

Implementing the full set of options for handshaking can be a daunting task.

If security is not so important (because you only connect locally and control the host, a common case), and you want to interface with an AEMP node from another programming language, then you can also implement a simplified handshake.

For example, in a simple implementation you could decide to simply not check the authenticity of the other side and use cleartext authentication yourself. The the handshake is as simple as sending three lines of text, reading three lines of text, and then you can exchange JSON-formatted messages:

   aemp;1;<nodename>;hmac_md6_64_256;json
   <nonce>
   cleartext;<hexencoded secret>;json

The nodename should be unique within the network, preferably unique with every connection, the <nonce> could be empty or some random data, and the hexencoded secret would be the shared secret, in lowercase hex (e.g. if the secret is "geheim", the hex-encoded version would be "67656865696d").

Note that apart from the low-level handshake and framing protocol, there is a high-level protocol, e.g. for monitoring, building the mesh or spawning. All these messages are sent to the node port (the empty string) and can safely be ignored if you do not need the relevant functionality.

USEFUL HINTS

Since taking part in the global protocol to find port groups is nontrivial, hardcoding port names should be considered as well, i.e. the non-Perl node could simply listen to messages for a few well-known ports.

Alternatively, the non-Perl node could call a (already loaded) function in the Perl node by sending it a special message:

   ["", "Some::Function::name", "myownport", 1, 2, 3]

This would call the function Some::Function::name with the string myownport and some additional arguments.

MONITORING

Monitoring the connection itself is transport-specific. For TCP, all connection monitoring is currently left to TCP retransmit time-outs on a busy link, and TCP keepalive (which should be enabled) for idle connections.

This is not sufficient for listener-less nodes, however: they need to regularly send data (30 seconds, or the monitoring interval, is recommended), so TCP actively probes.

Future implementations of AnyEvent::Transport might query the kernel TCP buffer after a write timeout occurs, and if it is non-empty, shut down the connections, but this is an area of future research :)

NODE PROTOCOL

The transport simply transfers messages, but to implement a full node, a special node port must exist that understands a number of requests.

If you are interested in implementing this, drop us a note so we finish the documentation.

SEE ALSO

Top

AnyEvent::MP.

AUTHOR

Top

 Marc Lehmann <schmorp@schmorp.de>
 http://home.schmorp.de/


AnyEvent-MP documentation Contained in the AnyEvent-MP distribution.
package AnyEvent::MP::Transport;

use common::sense;

use Scalar::Util ();
use List::Util ();
use MIME::Base64 ();
use Storable ();
use JSON::XS ();

use Digest::MD6 ();
use Digest::HMAC_MD6 ();

use AE ();
use AnyEvent::Socket ();
use AnyEvent::Handle 4.92 ();

use AnyEvent::MP::Config ();

our $PROTOCOL_VERSION = 1;

our @HOOK_CONNECT;   # called at connect/accept time
our @HOOK_GREETING;  # called at greeting1 time
our @HOOK_CONNECTED; # called at data phase
our @HOOK_DESTROY;   # called at destroy time
our %HOOK_PROTOCOL = (
   "aemp-dataconn" => sub {
      require AnyEvent::MP::DataConn;
      &AnyEvent::MP::DataConn::_inject;
   },
);

sub mp_server($$;%) {
   my ($host, $port, %arg) = @_;

   AnyEvent::Socket::tcp_server $host, $port, sub {
      my ($fh, $host, $port) = @_;

      my $tp = new AnyEvent::MP::Transport
         fh       => $fh,
         peerhost => $host,
         peerport => $port,
         %arg,
      ;
      $tp->{keepalive} = $tp;
   }, delete $arg{prepare}
}

sub mp_connect {
   my $release = pop;
   my ($host, $port, @args) = @_;

   new AnyEvent::MP::Transport
      connect  => [$host, $port],
      peerhost => $host,
      peerport => $port,
      release  => $release,
      @args,
   ;
}

sub new {
   my ($class, %arg) = @_;

   my $self = bless \%arg, $class;

   {
      Scalar::Util::weaken (my $self = $self);

      my $config = $AnyEvent::MP::Kernel::CONFIG;

      my $timeout  = $config->{monitor_timeout};
      my $lframing = $config->{framing_format};
      my $auth_snd = $config->{auth_offer};
      my $auth_rcv = $config->{auth_accept};

      $self->{secret} = $config->{secret}
         unless exists $self->{secret};

      my $secret = $self->{secret};

      if (exists $config->{cert}) {
         $self->{tls_ctx} = {
            sslv2   => 0,
            sslv3   => 0,
            tlsv1   => 1,
            verify  => 1,
            cert    => $config->{cert},
            ca_cert => $config->{cert},
            verify_require_client_cert => 1,
         };
      }

      $self->{hdl} = new AnyEvent::Handle
         +($self->{fh} ? (fh => $self->{fh}) : (connect => $self->{connect})),
         autocork  => $config->{autocork},
         no_delay  => exists $config->{nodelay} ? $config->{nodelay} : 1,
         keepalive => 1,
         on_error  => sub {
            $self->error ($_[2]);
         },
         rtimeout  => $timeout,
      ;

      my $greeting_kv = $self->{local_greeting} ||= {};

      $greeting_kv->{tls}      = "1.0" if $self->{tls_ctx};
      $greeting_kv->{provider} = "AE-$AnyEvent::MP::VERSION"; # MP.pm might not be loaded, so best effort :(
      $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport};

      my $protocol = $self->{protocol} || "aemp";

      # can modify greeting_kv
      $_->($self) for $protocol eq "aemp" ? @HOOK_CONNECT : ();

      # send greeting
      my $lgreeting1 = "$protocol;$PROTOCOL_VERSION"
                     . ";$AnyEvent::MP::Kernel::NODE"
                     . ";" . (join ",", @$auth_rcv)
                     . ";" . (join ",", @$lframing)
                     . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);

      my $lgreeting2 = MIME::Base64::encode_base64 AnyEvent::MP::Kernel::nonce (66), "";

      $self->{hdl}->push_write ("$lgreeting1\012$lgreeting2\012");

      # expect greeting
      $self->{hdl}->rbuf_max (4 * 1024);
      $self->{hdl}->push_read (line => sub {
         my $rgreeting1 = $_[1];

         my ($aemp, $version, $rnode, $auths, $framings, @kv) = split /;/, $rgreeting1;

         $self->{remote_node} = $rnode;

         $self->{remote_greeting} = {
            map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
               @kv
         };

         # maybe upgrade the protocol
         if ($protocol eq "aemp" and $aemp =~ /^aemp-\w+$/) {
            # maybe check for existence of the protocol handler?
            $self->{protocol} = $protocol = $aemp;
         }

         $_->($self) for $protocol eq "aemp" ? @HOOK_GREETING : ();

         if ($aemp ne $protocol and $aemp ne "aemp") {
            return $self->error ("unparsable greeting, expected '$protocol', got '$aemp'");
         } elsif ($version != $PROTOCOL_VERSION) {
            return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version)");
         } elsif ($protocol eq "aemp") {
            if ($rnode eq $AnyEvent::MP::Kernel::NODE) {
               return $self->error ("I refuse to talk to myself");
            } elsif ($AnyEvent::MP::Kernel::NODE{$rnode} && $AnyEvent::MP::Kernel::NODE{$rnode}{transport}) {
               return $self->error ("$rnode already connected, not connecting again.");
            }
         }

         # read nonce
         $self->{hdl}->push_read (line => sub {
            my $rgreeting2 = $_[1];

            "$lgreeting1\012$lgreeting2" ne "$rgreeting1\012$rgreeting2" # echo attack?
               or return $self->error ("authentication error, echo attack?");

            my $tls = $self->{tls_ctx} && 1 == int $self->{remote_greeting}{tls};

            my $s_auth;
            for my $auth_ (split /,/, $auths) {
               if (grep $auth_ eq $_, @$auth_snd and ($auth_ !~ /^tls_/ or $tls)) {
                  $s_auth = $auth_;
                  last;
               }
            }

            defined $s_auth
               or return $self->error ("$auths: no common auth type supported");

            my $s_framing;
            for my $framing_ (split /,/, $framings) {
               if (grep $framing_ eq $_, @$lframing) {
                  $s_framing = $framing_;
                  last;
               }
            }

            defined $s_framing
               or return $self->error ("$framings: no common framing method supported");

            my $key;
            my $lauth;

            if ($tls) {
               $self->{tls} = $lgreeting2 lt $rgreeting2 ? "connect" : "accept";
               $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});
               return unless $self->{hdl}; # starttls might destruct us

               $lauth =
                  $s_auth eq "tls_anon"       ? ""
                : $s_auth eq "tls_md6_64_256" ? Digest::MD6::md6_hex "$lgreeting1\012$lgreeting2\012$rgreeting1\012$rgreeting2\012"
                : return $self->error ("$s_auth: fatal, selected unsupported snd auth method");

            } elsif (length $secret) {
               return $self->error ("$s_auth: fatal, selected unsupported snd auth method")
                  unless $s_auth eq "hmac_md6_64_256"; # hardcoded atm.

               $key = Digest::MD6::md6 $secret;
               # we currently only support hmac_md6_64_256
               $lauth = Digest::HMAC_MD6::hmac_md6_hex $key, "$lgreeting1\012$lgreeting2\012$rgreeting1\012$rgreeting2\012", 64, 256;

            } else {
               return $self->error ("unable to handshake TLS and no shared secret configured");
            }

            $self->{hdl}->push_write ("$s_auth;$lauth;$s_framing\012");

            # read the authentication response
            $self->{hdl}->push_read (line => sub {
               my ($hdl, $rline) = @_;

               my ($auth_method, $rauth2, $r_framing) = split /;/, $rline;

               my $rauth =
                  $auth_method eq "hmac_md6_64_256" ? Digest::HMAC_MD6::hmac_md6_hex $key, "$rgreeting1\012$rgreeting2\012$lgreeting1\012$lgreeting2\012", 64, 256
                : $auth_method eq "cleartext"       ? unpack "H*", $secret
                : $auth_method eq "tls_anon"        ? ($tls ? "" : "\012\012") # \012\012 never matches
                : $auth_method eq "tls_md6_64_256"  ? ($tls ? Digest::MD6::md6_hex "$rgreeting1\012$rgreeting2\012$lgreeting1\012$lgreeting2\012" : "\012\012")
                : return $self->error ("$auth_method: fatal, selected unsupported rcv auth method");

               if ($rauth2 ne $rauth) {
                  return $self->error ("authentication failure/shared secret mismatch");
               }

               $self->{s_framing} = $s_framing;

               $hdl->rbuf_max (undef);

               # we rely on TCP retransmit timeouts and keepalives
               $self->{hdl}->rtimeout (undef);

               $self->{remote_greeting}{untrusted} = 1
                  if $auth_method eq "tls_anon";

               if ($protocol eq "aemp" and $self->{hdl}) {
                  # listener-less node need to continuously probe
                  unless (@$AnyEvent::MP::Kernel::LISTENER) {
                     $self->{hdl}->wtimeout ($timeout);
                     $self->{hdl}->on_wtimeout (sub { $self->{send}->([]) });
                  }

                  # receive handling

                  my $push_write = $hdl->can ("push_write");
                  my $push_read  = $hdl->can ("push_read");

                  if ($s_framing eq "json") {
                     $self->{send} = sub {
                        $push_write->($hdl, JSON::XS::encode_json $_[0]);
                     };
                  } else {
                     $self->{send} = sub {
                        $push_write->($hdl, $s_framing => $_[0]);
                     };
                  }

                  if ($r_framing eq "json") {
                     my $coder = JSON::XS->new->utf8;

                     $hdl->on_read (sub {
                        local $AnyEvent::MP::Kernel::SRCNODE = $self->{node};

                        AnyEvent::MP::Kernel::_inject (@$_)
                           for $coder->incr_parse (delete $_[0]{rbuf});

                        ()
                     });
                  } else {
                     my $rmsg; $rmsg = $self->{rmsg} = sub {
                        $push_read->($_[0], $r_framing => $rmsg);

                        local $AnyEvent::MP::Kernel::SRCNODE = $self->{node};
                        AnyEvent::MP::Kernel::_inject (@{ $_[1] });
                     };
                     eval {
                        $push_read->($_[0], $r_framing => $rmsg);
                     };
                     Scalar::Util::weaken $rmsg;
                     return $self->error ("$r_framing: unusable remote framing")
                        if $@;
                  }
               }

               $self->connected;
            });
         });
      });
   }

   $self
}

sub error {
   my ($self, $msg) = @_;

   delete $self->{keepalive};

   if ($self->{protocol}) {
      $HOOK_PROTOCOL{$self->{protocol}}->($self, $msg);
   } else {
      $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} $msg");#d#

      $self->{node}->transport_error (transport_error => $self->{node}{id}, $msg)
         if $self->{node} && $self->{node}{transport} == $self;
   }

   (delete $self->{release})->()
      if exists $self->{release};
   
#   $AnyEvent::MP::Kernel::WARN->(7, "$self->{peerhost}:$self->{peerport}: $msg");
   $self->destroy;
}

sub connected {
   my ($self) = @_;

   delete $self->{keepalive};

   if ($self->{protocol}) {
      $self->{hdl}->on_error (undef);
      $HOOK_PROTOCOL{$self->{protocol}}->($self, undef);
   } else {
      $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} connected as $self->{remote_node}");

      my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node});
      Scalar::Util::weaken ($self->{node} = $node);
      $node->transport_connect ($self);

      $_->($self) for @HOOK_CONNECTED;
   }

   (delete $self->{release})->()
      if exists $self->{release};
}

sub destroy {
   my ($self) = @_;

   (delete $self->{release})->()
      if exists $self->{release};

   $self->{hdl}->destroy
      if $self->{hdl};

   $_->($self) for $self->{protocol} ? () : @HOOK_DESTROY;
}

sub DESTROY {
   my ($self) = @_;

   $self->destroy;
}

1