# /usr/local/CPAN/MogileFS-Client/MogileFS/Backend.pm


package MogileFS::Backend;

use strict;
no strict 'refs';

use Carp;
use IO::Socket::INET;
use Socket qw( MSG_NOSIGNAL PF_INET IPPROTO_TCP SOCK_STREAM );
use Errno qw( EINPROGRESS EWOULDBLOCK EISCONN );
use POSIX ();
use MogileFS::Client;

# Declared object fields (see `perldoc fields`); objects are created with
# fields::new() in new().
use fields ('hosts',        # arrayref of "$host:$port" of mogilefsd servers
            'host_dead',    # "$host:$port" -> $time  (of last connect failure)
            'lasterr',      # string: \w+ error code from the tracker's last ERR reply
            'lasterrstr',   # string: unescaped message text of that error, or undef
            'sock_cache',   # cached socket to mogilefsd tracker
            'pref_ip',      # hashref; { ip => preferred ip }, or undef (see set_pref_ip)
            'timeout',      # time in seconds to wait for a tracker reply to become readable (default 3)
            'last_host_connected',  # "ip:port" of last host connected to (may be a preferred ip)
            'hooks',        # hash: hookname -> coderef (see add_hook / run_hook)
            );

# $FLAG_NOSIGNAL: flags passed to send(). Holds MSG_NOSIGNAL when it can be
#   evaluated on this platform; otherwise it stays undef and do_request
#   ignores SIGPIPE instead.
# $PROTO_TCP: cached getprotobyname('tcp') result, filled in lazily by
#   _sock_to_host.
use vars qw($FLAG_NOSIGNAL $PROTO_TCP);
# NOTE(review): presumably evaluating MSG_NOSIGNAL dies where the platform
# lacks it; the eval swallows that and leaves $FLAG_NOSIGNAL undef.
eval { $FLAG_NOSIGNAL = MSG_NOSIGNAL; };

# Constructor. Takes the key/value arguments understood by _init
# (hosts, timeout) and returns the initialized object.
# When invoked on an existing object rather than a class name, that object
# is re-initialized in place instead of allocating a new one.
sub new {
    my MogileFS::Backend $self = shift;
    if (!ref $self) {
        # class-method call: allocate a fields-restricted object
        $self = fields::new($self);
    }
    my @init_args = @_;
    return $self->_init(@init_args);
}

# Re-run _init on an existing object with a new set of arguments
# (hosts, timeout). Returns the object, or undef when called without one.
sub reload {
    my MogileFS::Backend $self = shift;
    return $self ? $self->_init(@_) : undef;
}

# Validate constructor/reload arguments and (re)initialize the object.
#
# Arguments (key/value list):
#   hosts   - required arrayref of "host:port" tracker addresses
#   timeout - optional reply timeout in seconds; a non-negative number,
#             fractions allowed. A missing or zero value means 3.
#
# Croaks (via _fail) on invalid arguments. Returns $self.
sub _init {
    my MogileFS::Backend $self = shift;

    my %args = @_;

    # FIXME: add actual validation
    {
        $self->{hosts} = $args{hosts} or
            _fail("constructor requires parameter 'hosts'");

        _fail("'hosts' argument must be an arrayref")
            unless ref $self->{hosts} eq 'ARRAY';

        _fail("'hosts' argument must be of form: 'host:port'")
            if grep(! /:\d+$/, @{$self->{hosts}});

        # Accept fractional timeouts (select() supports them). Values that
        # are numerically zero (e.g. "00") fall back to the default rather
        # than producing an immediate-timeout configuration.
        my $timeout = $args{timeout};
        _fail("'timeout' argument must be a number")
            if $timeout && $timeout !~ /^\d+(?:\.\d+)?$/;
        $self->{timeout} = ($timeout && $timeout > 0) ? $timeout : 3;
    }

    $self->{host_dead} = {};

    # reload() may have changed the host list; drop any socket cached
    # against the previous configuration so the next request reconnects.
    undef $self->{sock_cache};

    return $self;
}

# Invoke the hook registered under $hookname, passing it the remaining
# arguments. Does nothing when no name is given or no hook is registered.
# Any exception thrown by the hook is caught and reported with warn(); it
# never propagates to the caller.
sub run_hook {
    my MogileFS::Backend $self = shift;
    my $hookname = shift or return;

    my $callback = $self->{hooks}{$hookname} or return;

    eval { $callback->(@_) };
    if (my $err = $@) {
        warn "MogileFS::Backend hook '$hookname' threw error: $err\n";
    }
}

# Register or remove a named hook.
#   add_hook($name, $coderef)  - store $coderef under $name
#   add_hook($name)            - remove whatever is stored under $name
# Does nothing when no name is given.
sub add_hook {
    my MogileFS::Backend $self = shift;
    my $hookname = shift or return;

    if (!@_) {
        return delete $self->{hooks}{$hookname};
    }
    return $self->{hooks}{$hookname} = shift;
}

# Install a map of { tracker ip => preferred ip }, which _sock_to_host tries
# first. Anything that is not a true, unblessed HASH reference clears the
# preference map instead.
sub set_pref_ip {
    my MogileFS::Backend $self = shift;
    my $ip_map = shift;
    my $usable = $ip_map && ref $ip_map eq 'HASH';
    $self->{pref_ip} = $usable ? $ip_map : undef;
}

# Wait until file descriptor $fileno is readable, for at most $timeout
# seconds (fractions allowed).
#
# Returns 1 if the descriptor became readable, 0 on timeout, on a select()
# error, or when $fileno is undef or $timeout is false.
#
# A select() interrupted by a signal (EINTR) is resumed with whatever time
# remains, so signals or ptrace attaches do not cut the wait short. A
# negative select() result is treated as failure, never as readable.
sub _wait_for_readability {
    my ($fileno, $timeout) = @_;
    return 0 unless defined $fileno && $timeout;

    require Time::HiRes;
    my $deadline = Time::HiRes::time() + $timeout;

    my $rin = '';
    vec($rin, $fileno, 1) = 1;

    while (1) {
        my $remaining = $deadline - Time::HiRes::time();
        return 0 if $remaining <= 0;

        # select() overwrites its bit-vector argument, so pass a fresh copy.
        my $rout = $rin;
        my $nfound = select($rout, undef, undef, $remaining);
        return 1 if $nfound > 0;
        return 0 if $nfound == 0;

        # $nfound < 0: select() failed. Retry only if a signal interrupted it.
        return 0 unless $!{EINTR};
    }
}

# Send one command to a mogilefsd tracker and parse its single-line reply.
#
#   $cmd  - command name, sent as the first word of the request line
#   $args - hashref of arguments, URL-encoded into the request line
#
# The cached tracker socket is tried first. If sending on it fails, a new
# connection is opened via _get_sock. Hooks fire at request start, on send
# errors, short sends and read timeouts, and after the reply line is read.
#
# Returns a hashref of decoded arguments for an "OK" reply, or undef for an
# "ERR" reply (errcode()/errstr() then describe it). Croaks (via _fail) on
# bad arguments, connect/send failures, read timeout, a socket closed before
# a reply, or an unrecognized reply.
sub do_request {
    my MogileFS::Backend $self = shift;
    my ($cmd, $args) = @_;

    _fail("invalid arguments to do_request")
        unless $cmd && $args;

    # The recorded error describes only this request. Without this reset, an
    # earlier ERR reply would stay visible through err()/errstr() after
    # later successful requests.
    $self->{'lasterr'} = undef;
    $self->{'lasterrstr'} = undef;

    # Without MSG_NOSIGNAL, writing to a peer-closed socket would raise SIGPIPE.
    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;

    my $sock = $self->{sock_cache};
    my $argstr = _encode_url_string(%$args);
    my $req = "$cmd $argstr\r\n";
    my $reqlen = length($req);
    my $send_flags = $FLAG_NOSIGNAL || 0;
    my $rv = 0;

    if ($sock) {
        # try our cached one, but assume it might be bogus
        $self->run_hook('do_request_start', $cmd, $self->{last_host_connected});
        _debug("SOCK: cached = $sock, REQ: $req");
        # send() returns undef on failure. A successful send() does not reset
        # $!, so testing $! would mistake a stale errno from an earlier call
        # for a failure. Judge by the return value; $! is cleared first only
        # so that any error text reflects this call.
        $! = 0;
        $rv = send($sock, $req, $send_flags);
        if (! defined $rv) {
            $self->run_hook('do_request_send_error', $cmd, $self->{last_host_connected});
            undef $self->{sock_cache};
        } elsif ($rv != $reqlen) {
            $self->run_hook('do_request_length_mismatch', $cmd, $self->{last_host_connected});
            return _fail("send() didn't return expected length ($rv, not $reqlen)");
        }
    }

    # no cached socket, or sending on it failed: open a fresh connection
    unless ($rv) {
        $sock = $self->_get_sock
            or return _fail("couldn't connect to mogilefsd backend");
        $self->run_hook('do_request_start', $cmd, $self->{last_host_connected});
        _debug("SOCK: $sock, REQ: $req");
        # The non-blocking connect in _connect_sock can leave $! set (e.g.
        # EINPROGRESS/EISCONN) even on success, so only an undef return from
        # send() counts as an error here.
        $! = 0;
        $rv = send($sock, $req, $send_flags);
        if (! defined $rv) {
            $self->run_hook('do_request_send_error', $cmd, $self->{last_host_connected});
            return _fail("error talking to mogilefsd tracker: $!");
        } elsif ($rv != $reqlen) {
            $self->run_hook('do_request_length_mismatch', $cmd, $self->{last_host_connected});
            return _fail("send() didn't return expected length ($rv, not $reqlen)");
        }
        $self->{sock_cache} = $sock;
    }

    # wait up to $self->{timeout} seconds for the reply to become readable
    unless (_wait_for_readability(fileno($sock), $self->{timeout})) {
        close($sock);
        $self->run_hook('do_request_read_timeout', $cmd, $self->{last_host_connected});
        undef $self->{sock_cache};
        return _fail("timed out after $self->{timeout}s against $self->{last_host_connected} when sending command: [$req]");
    }

    # guard against externally-modified $/ changes.  patch from
    # Andreas J. Koenig.  in practice nobody should do this, though,
    # and this line should be unnecessary.
    local $/ = "\n";

    my $line = <$sock>;

    $self->run_hook('do_request_finished', $cmd, $self->{last_host_connected});

    _debug("RESPONSE: $line");

    unless (defined $line) {
        undef $self->{sock_cache};
        return _fail("socket closed on read");
    }

    # ERR <errcode> <errstr>
    if ($line =~ /^ERR\s+(\w+)\s*(\S*)/) {
        $self->{'lasterr'} = $1;
        $self->{'lasterrstr'} = $2 ? _unescape_url_string($2) : undef;
        _debug("LASTERR: $1 $2");
        return undef;
    }

    # OK <arg_len> <response>
    if ($line =~ /^OK\s+\d*\s*(\S*)/) {
        my $args = _decode_url_string($1);
        _debug("RETURN_VARS: ", $args);
        return $args;
    }

    undef $self->{sock_cache};
    return _fail("invalid response from server: [$line]");
}

# Describe the currently recorded tracker error as "<code> <message>".
# The message part is omitted, with no trailing space, when the tracker
# supplied none (lasterrstr undef). Returns an empty list / undef when no
# error code is recorded.
sub errstr {
    my MogileFS::Backend $self = shift;
    return unless $self->{'lasterr'};
    # Filter undef so a missing message yields "code", not "code " plus an
    # uninitialized-value warning.
    return join(" ", grep { defined } $self->{'lasterr'}, $self->{'lasterrstr'});
}

# Return the currently recorded tracker error code (the \w+ token that
# do_request captured from an ERR reply), or undef when none is recorded.
sub errcode {
    my MogileFS::Backend $self = shift;
    my $code = $self->{lasterr};
    return $code;
}

# Return the "ip:port" of the tracker most recently connected to, as
# recorded by _sock_to_host, or undef when no connection has been made.
sub last_tracker {
    my ($self) = @_;
    my $tracker = $self->{last_host_connected};
    return $tracker;
}

# Boolean flag: 1 if an error code is currently recorded in lasterr,
# otherwise 0.
sub err {
    my MogileFS::Backend $self = shift;
    if ($self->{lasterr}) {
        return 1;
    }
    return 0;
}

################################################################################
# MogileFS::Backend internal helpers (plain functions and private methods)
#

# Throw an exception (via Carp::croak, so it is reported from the caller's
# perspective) whose text is prefixed with "MogileFS::Backend: ".
sub _fail {
    my ($message) = @_;
    croak("MogileFS::Backend: " . $message);
}

# Share MogileFS::Client's debug logger: this aliases the whole
# MogileFS::Backend::_debug glob to MogileFS::Client::_debug, so _debug(...)
# calls in this package run MogileFS::Client's routine.
*_debug = *MogileFS::Client::_debug;

# Connect $sock to the packed address $sin.
#
#   $timeout - seconds (fractions allowed) to wait for the connection.
#              undef/omitted: defaults to 0.25
#              true value:    non-blocking connect, abandoned after $timeout
#              0:             plain blocking connect()
#
# The socket is left in blocking mode on return in every case, since callers
# do blocking I/O on it. Returns true on success, false on failure.
sub _connect_sock { # sock, sin, timeout
    my ($sock, $sin, $timeout) = @_;
    # Default only an omitted timeout. An explicit 0 must stay 0 so the
    # blocking branch below is reachable.
    $timeout = 0.25 unless defined $timeout;

    # make the socket non-blocking for the connection if wanted, but
    # unconditionally set it back to blocking mode at the end
    IO::Handle::blocking($sock, $timeout ? 0 : 1);

    my $ret = connect($sock, $sin);

    # While a non-blocking handshake is still running, connect() reports
    # EINPROGRESS (EWOULDBLOCK on some platforms, e.g. Win32).
    if (!$ret && $timeout && ($! == EINPROGRESS || $! == EWOULDBLOCK)) {
        my $win = '';
        vec($win, fileno($sock), 1) = 1;

        if (select(undef, $win, undef, $timeout) > 0) {
            # Writable means the attempt finished; a second connect() reports
            # the outcome. EISCONN means connected & won't re-connect, so success.
            $ret = connect($sock, $sin);
            $ret = 1 if !$ret && $! == EISCONN;
        }
    }

    # turn blocking back on, as we expect to do blocking IO on our sockets
    IO::Handle::blocking($sock, 1) if $timeout;

    return $ret;
}

# Open a TCP connection to one tracker given as "host:port".
#
# If set_pref_ip() registered a preferred address for this host's ip, that
# address is tried first with a 0.1s connect timeout. On any failure there,
# the original address is tried with _connect_sock's default timeout.
#
# On success, records the address actually used in last_host_connected and
# returns the connected socket. Returns undef on failure.
sub _sock_to_host { # (host)
    my MogileFS::Backend $self = shift;
    my $host = shift;

    my ($ip, $port) = $host =~ /^(.*):(\d+)$/;
    my $proto = $PROTO_TCP ||= getprotobyname('tcp');
    my $sin;

    # A lexical handle, autovivified by socket(), gives each connection its
    # own filehandle. A symbolic glob name such as "Sock_$host" is a single
    # package-wide handle shared by every Backend object, so one object
    # connecting would silently close another object's cached socket to the
    # same tracker.
    my $sock;

    # try preferred ips
    if ($self->{pref_ip} && (my $prefip = $self->{pref_ip}->{$ip})) {
        _debug("using preferred ip $prefip over $ip");
        # inet_aton yields undef for an unresolvable name, and sockaddr_in
        # would croak on that, so check it before building the address.
        my $pref_addr = Socket::inet_aton($prefip);
        if ($pref_addr && socket($sock, PF_INET, SOCK_STREAM, $proto)) {
            $sin = Socket::sockaddr_in($port, $pref_addr);
            if (_connect_sock($sock, $sin, 0.1)) {
                $self->{last_host_connected} = "$prefip:$port";
                return $sock;
            }
            close $sock;
        }
        _debug("failed connect to preferred ip $prefip");
    }

    # now try the original ip
    my $aton_ip = Socket::inet_aton($ip)
        or return undef;
    socket($sock, PF_INET, SOCK_STREAM, $proto)
        or return undef;
    $sin = Socket::sockaddr_in($port, $aton_ip);
    return undef unless _connect_sock($sock, $sin);
    $self->{last_host_connected} = $host;

    return $sock;
}

# Open a fresh connection to one of the configured trackers.
#
# Starts at a random position in the host list and walks forward, making at
# most 15 attempts. A host whose last connect failure was less than 5 seconds
# ago is skipped, and the skip still uses up an attempt. A host that fails
# now is stamped dead with the current time.
#
# Returns the connected socket, or undef if every attempt was skipped or
# failed.
sub _get_sock {
    my MogileFS::Backend $self = shift;
    return undef unless $self;

    my $hosts     = $self->{hosts};
    my $count     = scalar @$hosts;
    my $max_tries = $count < 15 ? $count : 15;
    my $start     = int(rand() * $count);
    my $now       = time();

    my $sock;
    for my $offset (0 .. $max_tries - 1) {
        my $host = $hosts->[($start + $offset) % $count];

        # recently-failed hosts get a 5 second cool-down before a retry
        my $failed_at = $self->{host_dead}{$host};
        next if $failed_at && $failed_at > $now - 5;

        $sock = $self->_sock_to_host($host);
        last if $sock;

        _debug("marking host dead: $host @ $now");
        $self->{host_dead}{$host} = $now;
    }

    return $sock;
}

# URL-escape one string for the tracker protocol.
#
# Letters, digits and the characters _ , - . / \ : and space pass through.
# Every other character becomes an uppercase %XX escape. Finally each space
# is turned into '+'.
sub _escape_url_string {
    my ($raw) = @_;
    (my $escaped = $raw) =~ s/([^a-zA-Z0-9_\,\-.\/\\\: ])/uc sprintf("%%%02x",ord($1))/eg;
    $escaped =~ tr/ /+/;
    return $escaped;
}

# Reverse URL escaping of a single string: '+' becomes a space, and each %XX
# hex escape becomes the byte it encodes.
#
# '+' must be translated BEFORE %XX decoding. An escaped literal plus ("%2B",
# as produced by _escape_url_string) decodes to '+', and translating
# afterwards would wrongly turn it into a space. _decode_url_string applies
# the same order.
sub _unescape_url_string {
    my $str = shift;
    $str =~ tr/+/ /;
    $str =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
    return $str;
}

# Build a query-string style argument list ("k1=v1&k2=v2") from a key/value
# list. Keys and values are escaped with _escape_url_string. Pairs whose value
# is undef are left out, pairs follow the hash's key order, and an empty
# argument list gives "".
sub _encode_url_string {
    my %args = @_;
    return "" unless %args;

    my @encoded;
    foreach my $key (keys %args) {
        my $value = $args{$key};
        next unless defined $value;
        push @encoded, _escape_url_string($key) . '=' . _escape_url_string($value);
    }
    return join("&", @encoded);
}

# Parse a query-string style argument list ("k1=v1&k2=v2") into a hashref.
#
# Accepts the string itself or a reference to it. In both names and values,
# '+' becomes a space and %XX escapes are decoded. A pair without '=' maps
# its name to "". Empty pairs (from "&&" or a leading "&") are skipped.
# When a name repeats, its values are joined with "\0".
sub _decode_url_string {
    my $arg = shift;
    my $buffer = ref $arg ? $arg : \$arg;
    my $hashref = {};  # output hash

    foreach my $pair (split(/&/, $$buffer)) {
        next unless length $pair;

        # Limit 2: only the first '=' separates name from value, so a value
        # containing '=' is kept whole instead of being truncated.
        my ($name, $value) = split(/=/, $pair, 2);
        $value = '' unless defined $value;

        for my $part ($name, $value) {
            $part =~ tr/+/ /;
            $part =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
        }

        # Test existence, not truth, so an earlier value of "0" or "" still
        # gets its "\0" separator.
        $hashref->{$name} = exists $hashref->{$name}
            ? "$hashref->{$name}\0$value"
            : $value;
    }

    return $hashref;
}

1;