POE::Component::Server::Bayeux::Request - A single Bayeux request


POE-Component-Server-Bayeux documentation Contained in the POE-Component-Server-Bayeux distribution.

Index


Code Index:

NAME

Top

POE::Component::Server::Bayeux::Request - A single Bayeux request

DESCRIPTION

Top

Objects in this class represent a single Bayeux request made to a Bayeux server. Requests are instantiated with an HTTP::Request and HTTP::Response object. This class is responsible for parsing the request content into a JSON object, creating one or more POE::Component::Server::Bayeux::Message objects that represent the possible message types of the Bayeux protocol, and handling each one in turn.

CLASS METHODS

Top

new ()

Requires 'request' (POE::Component::Server::HTTP::Request), 'response' (POE::Component::Server::HTTP::Response), and 'server_heap'. Returns init()'ed class object.

OBJECT METHODS

Top

handle ()

Call after creating the request. Calls the pre_handle(), handle() methods on each message, possibly completing the request.

complete ()

Completes the request, calling the post_handle() method on the messages that need it.

PRIVATE METHODS

Top

These methods are mainly called by messages during their handle() phase.

client ($id)

Returns a POE::Component::Server::Bayeux::Client object with the given id.

add_response ($response)

Adds a message response onto the stack of responses.

clear_stack ()

Clears all messages and responses.

add_post_handle ($message)

Adds a message to be handled in the post_handle() code.

init ()

Parses the POE::Component::Server::HTTP::Request object, extracting the JSON payload, creating a stack of POE::Component::Server::Bayeux::Message messages.

error ($message)

Convienence method to throw an error, returning to the client.

form_response (@messages)

Encodes the messages into the payload of the response

logger ()

Returns the server's logger.

subscribe ($client_id, $channel)

Passthru to the POE server's subscribe state

unsubscribe ($client_id, $channel)

Passthru to the POE server's unsubscribe state

publish ($client_id, $channel, $data)

Passthru to the POE server's publish state

COPYRIGHT

Top

AUTHOR

Top

Eric Waters <ewaters@uarc.com>


POE-Component-Server-Bayeux documentation Contained in the POE-Component-Server-Bayeux distribution.
package POE::Component::Server::Bayeux::Request;

use strict;
use warnings;
use HTTP::Status; # for RC_OK
use HTTP::Request::Common;
use CGI::Simple::Util qw(unescape);
use JSON::Any qw(XS);
use Data::UUID;
use Params::Validate;

use POE qw(Component::Server::Bayeux::Message::Factory);

use base qw(Class::Accessor);
__PACKAGE__->mk_accessors(qw(
    id
    is_complete
    is_error
    ip
    http_request
    http_response
    json_response
    messages
    responses
    heap
    delay
    post_handle
    json_comment_filtered
));

## Class Globals ###

my $json = JSON::Any->new();
my $uuid = Data::UUID->new();

## Class Methods ###

sub new {
    my $class = shift;

    my %args = validate(@_, {
        request => 1,
        response => 1,
        server_heap => 1,
        ip => 0,
    });

    my %self = (
        http_request => $args{request},
        http_response => $args{response},
        heap => $args{server_heap},
        messages => [],
        responses => [],
        id => $uuid->create_str,
        ip => $args{ip},
    );

    my $self = bless \%self, $class;
    $self->init();
    return $self;
}

## Object Methods, Public ###

sub handle {
    my ($self) = @_;

    my $heap = $self->heap;

    # Some messages (/meta/connect, for example) need to be handled in a specific
    # order.  Allow each message to affect the queueing.
    foreach my $message (@{ $self->messages }) {
        $message->pre_handle();
    }

    # Starting at the beginning of the message array, process each message in
    # turn.  Messages will interact with the Request $self object, adding responses
    # and in some cases affecting other messages still in the stack.

    while (my $message = shift @{ $self->messages }) {
        $message->handle();
    }

    if ($self->delay) {
        $poe_kernel->post($heap->{manager}, 'delay_request', $self->id, $self->delay);
        $self->delay(0);
        $self->is_complete(0);
        $self->http_response->streaming(1);
    }
    else {
        $self->complete();
    }
}

sub complete {
    my ($self) = @_;

    $self->form_response( @{ $self->responses } );
    $self->is_complete(1);
    if ($self->http_response->streaming) {
        $self->http_response->send( $self->http_response );
        $self->http_response->close();
    }

    # Ensure no KeepAlive
    $self->http_request->header(Connection => 'close');

    if ($self->post_handle) {
        while (my $message = shift @{ $self->post_handle }) {
            $message->post_handle();
        }
    }
}

## Object Methods, Private ###

sub client {
    my ($self, $id) = @_;

    return POE::Component::Server::Bayeux::Client->new(
        request => $self,
        id => $id,
        server_heap => $self->heap,
    );
}

sub add_response {
    my ($self, $response) = @_;

    push @{ $self->responses }, $response;
}

sub clear_stack {
    my ($self) = @_;

    $self->messages([]);
    $self->responses([]);
}

sub add_post_handle {
    my ($self, $message) = @_;

    push @{ $self->{post_handle} }, $message;
}

sub init {
    my ($self) = @_;

    my $request = $self->{http_request};
    my $response = $self->{http_response};

    ## Extract the JSON payload

    my $params;
    my $json_requests = [];

    my $payload;

    # Parse the content type string

    my $content_type = $request->content_type;

    # Support 'text/json; charset=UTF-8'
    my %content_type_opts;
    my @content_type_parts = split /\s*;\s*/, $content_type;
    if (int @content_type_parts > 1) {
        $content_type = shift @content_type_parts;
        foreach my $part (@content_type_parts) {
            my ($key, $value) = split /=/, $part;
            # May or may not be key/value pairs, and are case-sensitive
            $content_type_opts{$key} = $value;
        }
    }

    if ($content_type eq 'application/x-www-form-urlencoded') {
        # POST or GET
        if (my $content = $request->content) {
            $params = $content;
        }
        elsif ($request->uri =~ m!\?message=!) {
            ($params) = $request->uri =~ m/\?(.*)/;
        }

        if (! $params) {
            return $self->error("No content found in HTTP request (content type '$content_type')");
        }

        # Decode the urlencoded key-value pairs
        my %content;
        foreach my $pair (split /&/, $params) {
            my ($key, $value) = split /=/, $pair, 2;
            next unless $key && $value;
            $content{ unescape($key) } = unescape($value);
        }

        if (! $content{message}) {
            return $self->error("No 'message' key pair found in content");
        }
        $payload = $content{message};
    }
    elsif ($content_type eq 'text/json') {
        $payload = $request->content;
    }
    else {
        return $self->error("Unsupported connection content-type '$content_type'");
    }

    # Decode the payload
    eval {
        $json_requests = $json->decode($payload);
    };
    if ($@) {
        return $self->error("Failed to decode JSON payload: $@" );
    }
    if (! $json_requests || ! ref $json_requests) {
        return $self->error("Invalid JSON payload; must be array or object");
    }
    if (ref $json_requests eq 'HASH') {
        $json_requests = [ $json_requests ];
    }

    $self->logger->debug("New remote request from ".$self->ip.", id ".$self->id.":", $json_requests);

    foreach my $message (@$json_requests) {
        my $bayeux_message;
        eval {
            $bayeux_message = POE::Component::Server::Bayeux::Message::Factory->create(
                request => $self,
                data => $message,
            );
        };
        if ($@ || ! $bayeux_message
            || $bayeux_message->isa('POE::Component::Server::Bayeux::Message::Invalid')
        ) {
            $self->error("Invalid message found" . ($@ ? " ($@)" : ''));
            $self->logger->debug("Remote request was invalid");
            last;
        }
        else {
            push @{ $self->{messages} }, $bayeux_message;
        }
    }
}

sub error {
    my ($self, $error) = @_;

    return if $self->is_error;

    $self->form_response(
        {
            error => $error,
            successful => JSON::XS::false,
        }
    );

    $self->is_error(1);
    $self->is_complete(1);
}

sub form_response {
    my ($self, @message) = @_;

    my $response = $self->http_response;
    $self->json_response( \@message );
    my $encoded = $json->encode( \@message );

    my $type = 'text/json';
    if ($self->json_comment_filtered) {
        $encoded = "/*$encoded*/";
        $type = 'text/json-comment-filtered';
    }

    $response->header( 'Content-Type' => "$type; charset=utf-8" );
    $response->code(RC_OK);
    $response->content( $encoded );
}

sub logger {
    my ($self) = @_;

    return $self->heap->{logger};
}

## POE passthru methods

sub subscribe {
    my ($self, $client_id, $channel) = @_;
    $poe_kernel->post($self->heap->{manager}, 'subscribe', {
        client_id => $client_id,
        channel => $channel,
    });
}

sub unsubscribe {
    my ($self, $client_id, $channel) = @_;
    $poe_kernel->post($self->heap->{manager}, 'unsubscribe', {
        client_id => $client_id,
        channel => $channel,
    });
}

sub publish {
    my ($self, $client_id, $channel, $data) = @_;
    $poe_kernel->post($self->heap->{manager}, 'publish', {
        client_id => $client_id,
        channel => $channel,
        data => $data,
    });
}

1;