# POE::Component::Client::NNTP::Tail
# Copyright (c) 2008 by David Golden. All rights reserved.
# Licensed under Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License was distributed with this file or you may obtain a
# copy of the License from http://www.apache.org/licenses/LICENSE-2.0
package POE::Component::Client::NNTP::Tail;
use 5.006;
use strict;
use warnings;
our $VERSION = '0.01';
$VERSION = eval $VERSION; ## no critic
use Carp::POE;
use Params::Validate;
use POE qw(Component::Client::NNTP);
# Params::Validate specification for the named arguments accepted by spawn().
# A true value marks an argument as mandatory, a false value marks it as
# optional, and a hash reference carries further options such as a default.
# The validated arguments become the session heap, so these keys are also
# the heap keys read by the event handlers.
my %spawn_args = (
# required
Group => 1,
NNTPServer => 1,
# optional with defaults
# Interval is handed to $kernel->delay() as the wait between polls and
# before a reconnection attempt
Interval => { default => 60 },
# purely optional
# Port and LocalAddr are forwarded to POE::Component::Client::NNTP->spawn;
# Alias falls back to the Group name in _start; Debug turns on the
# _debug trace emitted by every handler
Port => 0,
LocalAddr => 0,
Alias => 0,
Debug => 0,
);
#--------------------------------------------------------------------------#
# spawn -- Group => NAME, NNTPServer => HOST, [ Interval => SECONDS,
#          Port => N, LocalAddr => ADDR, Alias => NAME, Debug => BOOL ]
#
# Class method.  Validates the named arguments against %spawn_args and
# creates the POE session that tails the newsgroup; the validated arguments
# become that session's heap.  Returns the value of POE::Session->create.
#--------------------------------------------------------------------------#
sub spawn {
    my ( $class, @args ) = @_;
    my %heap = validate( @args, \%spawn_args );

    # replies from the NNTP component, mapped onto our private handlers
    my %nntp_handler_for = (
        nntp_connected    => '_nntp_connected',
        nntp_registered   => '_nntp_registered',
        nntp_socketerr    => '_nntp_socketerr',
        nntp_disconnected => '_nntp_disconnected',
        nntp_200          => '_nntp_server_ready',
        nntp_201          => '_nntp_server_ready',
        nntp_211          => '_nntp_group_selected',
        nntp_220          => '_nntp_got_article',
        nntp_221          => '_nntp_got_head',
        nntp_411          => '_nntp_no_group',
        nntp_423          => '_nntp_no_article',
        nntp_430          => '_nntp_no_article',
    );

    # events that are handled by a sub of the same name
    my @session_events  = qw( _start _stop _child );
    my @internal_events = qw( _poll _reconnect );
    my @api_events      = qw( register unregister get_article shutdown );

    return POE::Session->create(
        heap           => \%heap,
        package_states => [
            $class => \%nntp_handler_for,
            $class => \@session_events,
            $class => \@internal_events,
            $class => \@api_events,
        ],
    );
}
# Trace helper: warns with the (abbreviated) name of the calling handler and
# its event arguments.  Handlers invoke it as "&_debug" so that it sees the
# handler's own @_; everything from ARG0 onwards is printed, with undefined
# values shown as 'undef'.
sub _debug {
    ( my $where = ( caller(1) )[3] ) =~ s{.*::}{P::C::C::N::T::};
    my @quoted = map { defined $_ ? "'$_'" : "'undef'" } @_[ ARG0 .. $#_ ];
    # pad the argument list with a blank on each side when there is one
    my $args = @quoted ? join( " ", "", @quoted, "" ) : "";
    warn "$where->($args)\n";
    return;
}
#--------------------------------------------------------------------------#
# session events
#--------------------------------------------------------------------------#
# Session start-up: claim our alias, spawn the NNTP client component and
# kick off the first connection attempt.
sub _start {
    my ( $kernel, $session, $heap ) = @_[ KERNEL, SESSION, HEAP ];
    &_debug if $heap->{Debug};

    # alias defaults to group name if not otherwise set; an explicit undef
    # counts as "not set" so we never try to register an undefined alias
    $heap->{Alias} = $heap->{Group} unless defined $heap->{Alias};
    $kernel->alias_set( $heap->{Alias} );

    # setup NNTP including optional args, but only those actually defined
    my %nntp_args;
    for my $k (qw/NNTPServer Port LocalAddr/) {
        $nntp_args{$k} = $heap->{$k} if defined $heap->{$k};
    }

    # the component's alias embeds our session ID
    my $alias = "NNTP-Client-" . $session->ID;
    $heap->{nntp}    = POE::Component::Client::NNTP->spawn( $alias, \%nntp_args );
    $heap->{nntp_id} = $heap->{nntp}->session_id;

    # start NNTP connection
    $kernel->yield('_reconnect');
    return;
}
# ignore these
# Child-session notifications need no action; the handler only emits the
# optional debug trace.
sub _child {
    my $heap = $_[HEAP];
    &_debug if $heap->{Debug};
}
# Session tear-down: if the NNTP component is still held in the heap, run our
# own 'shutdown' handler synchronously so that it is released.
sub _stop {
    my ( $kernel, $session, $heap ) = @_[ KERNEL, SESSION, HEAP ];
    &_debug if $heap->{Debug};
    if ( $heap->{nntp} ) {
        $kernel->call( $session, 'shutdown' );
    }
    return;
}
#--------------------------------------------------------------------------#
# events from our clients
#--------------------------------------------------------------------------#
#--------------------------------------------------------------------------#
# register -- [EVENT]
#
# EVENT - event to dispatch to the registered session on receipt of new
# headers; defaults to "new_header"
#--------------------------------------------------------------------------#
# Registers the sending session as a listener.  ARG0 is the event to send it
# for each new header (default 'new_header').  Registering again simply
# replaces the event name.
sub register {
    my ( $kernel, $heap, $sender ) = @_[ KERNEL, HEAP, SENDER ];
    &_debug if $heap->{Debug};
    my $event     = $_[ARG0] || 'new_header';
    my $sender_id = $sender->ID;

    # Hold exactly one reference per listener.  unregister/shutdown release
    # only one, so incrementing on every repeat registration would keep the
    # listener session alive forever.
    $kernel->refcount_increment( $sender_id, __PACKAGE__ )
        unless exists $heap->{listeners}{$sender_id};

    $heap->{listeners}{$sender_id} = $event;
    return;
}
#--------------------------------------------------------------------------#
# unregister --
#
# removes sender registration
#--------------------------------------------------------------------------#
# Removes the sending session from the listener table and releases the
# reference taken on it by 'register'.  A sender that is not registered is
# ignored.
sub unregister {
    my ( $kernel, $heap, $sender ) = @_[ KERNEL, HEAP, SENDER ];
    &_debug if $heap->{Debug};
    my $sender_id = $sender->ID;

    # only release a reference we actually hold; decrementing a counter that
    # was never incremented is an error as far as the kernel is concerned
    if ( exists $heap->{listeners}{$sender_id} ) {
        delete $heap->{listeners}{$sender_id};
        $kernel->refcount_decrement( $sender_id, __PACKAGE__ );
    }
    return;
}
#--------------------------------------------------------------------------#
# get_article -- ARTICLE_ID, EVENT
#
# request that ARTICLE_ID be retrieved and returned via EVENT, or via
# 'got_article' if EVENT is not specified
#--------------------------------------------------------------------------#
# Queues a request for an article on behalf of the sending session and asks
# the NNTP component to fetch it.  ARG0 is the article ID; ARG1 is the event
# used to deliver the article (default 'got_article').
sub get_article {
    my ( $kernel, $heap, $sender ) = @_[ KERNEL, HEAP, SENDER ];
    &_debug if $heap->{Debug};
    my ( $article_id, $return_event ) = @_[ ARG0, ARG1 ];
    $return_event ||= 'got_article';

    # without an ID there is nothing to fetch and no key to queue under
    unless ( defined $article_id ) {
        warn "get_article called without an article ID\n";
        return;
    }

    # store requesting session and desired return event; keep the session ID
    # rather than the session object so the heap does not hold a reference
    # to the requester
    push @{ $heap->{requests}{$article_id} }, [ $sender->ID, $return_event ];
    $kernel->post( $heap->{nntp_id} => article => $article_id );
    return;
}
#--------------------------------------------------------------------------#
# shutdown
#--------------------------------------------------------------------------#
# Tears the component down: releases all listeners, cancels pending timers,
# shuts down the NNTP client component and removes our aliases.  Safe to
# call more than once.
sub shutdown {
    my ( $kernel, $heap ) = @_[ KERNEL, HEAP ];
    &_debug if $heap->{Debug};

    # unregister anyone that didn't do it themselves
    for my $listener ( keys %{ $heap->{listeners} } ) {
        $kernel->refcount_decrement( $listener, __PACKAGE__ );
        delete $heap->{listeners}{$listener};
    }

    # article requests can no longer be answered
    delete $heap->{requests};

    # cancel any pending _poll/_reconnect delay; otherwise the timer keeps
    # this session alive and later fires against a dead NNTP component
    $kernel->alarm_remove_all();
    $heap->{connected} = 0;

    # release the NNTP component only once (nntp is deleted afterwards, so a
    # repeated shutdown skips this)
    if ( $heap->{nntp} ) {
        $kernel->call( $heap->{nntp_id} => 'unregister' => 'all' );
        $kernel->call( $heap->{nntp_id} => 'shutdown' );
        delete $heap->{nntp};
    }

    $kernel->alias_remove($_) for $kernel->alias_list;
    return;
}
#--------------------------------------------------------------------------#
# our internal events
#--------------------------------------------------------------------------#
# if connected, check for new messages, otherwise reconnect
# One polling step: without a live connection ask for a reconnect, otherwise
# send a 'group' request for our newsgroup to the NNTP component.
sub _poll {
    my ( $kernel, $heap ) = @_[ KERNEL, HEAP ];
    &_debug if $heap->{Debug};

    unless ( $heap->{connected} ) {
        $kernel->yield('_reconnect');
        return;
    }

    $kernel->post( $heap->{nntp_id}, 'group', $heap->{Group} );
    return;
}
# connect to NNTP server
# Asks the NNTP component to open (or re-open) its server connection.
sub _reconnect {
    my ( $kernel, $heap ) = @_[ KERNEL, HEAP ];
    &_debug if $heap->{Debug};
    my $nntp_session = $heap->{nntp_id};
    $kernel->post( $nntp_session, 'connect' );
    return;
}
#--------------------------------------------------------------------------#
# events from NNTP client
#--------------------------------------------------------------------------#
# ignore event
# The 'nntp_registered' event needs no action; only the optional debug trace
# is produced.
sub _nntp_registered {
    my $heap = $_[HEAP];
    &_debug if $heap->{Debug};
    return;
}
# ignore event
# The 'nntp_connected' event needs no action; polling starts from the
# nntp_200/nntp_201 handler instead.  Only the optional debug trace is
# produced here.
sub _nntp_connected {
    my $heap = $_[HEAP];
    &_debug if $heap->{Debug};
    return;
}
# if connection can't be made, wait for next poll period to try again
# Socket failure: report the error (ARG0), mark the connection as down and
# schedule another connection attempt after Interval.
sub _nntp_socketerr {
    my ( $kernel, $heap, $error ) = @_[ KERNEL, HEAP, ARG0 ];
    &_debug if $heap->{Debug};
    warn "Socket error: $error\n";
    $heap->{connected} = 0;
    my $wait = $heap->{Interval};
    $kernel->delay( '_reconnect', $wait );
    return;
}
# if we time-out, just note it and wait for next poll to reconnect
# Connection lost: mark it as down and make sure a _poll is scheduled so the
# polling loop notices and reconnects.
sub _nntp_disconnected {
    my ( $kernel, $heap ) = @_[ KERNEL, HEAP ];
    &_debug if $heap->{Debug};
    $heap->{connected} = 0;

    # The next _poll is normally scheduled by the 211 (group selected)
    # handler.  If the connection drops while a 'group' request is still
    # outstanding, presumably no 211 arrives and nothing would ever poll
    # again, so (re)arm the timer here.  Skip it once shutdown has released
    # the NNTP component.
    $kernel->delay( '_poll' => $heap->{Interval} ) if $heap->{nntp};
    return;
}
# once connected, start polling loop
# Handles the nntp_200/nntp_201 replies: the connection is usable, so record
# that and start the polling loop straight away.
sub _nntp_server_ready {
    my ( $kernel, $heap ) = @_[ KERNEL, HEAP ];
    &_debug if $heap->{Debug};
    $heap->{connected} = 1;
    $kernel->yield('_poll');
    # bare return, like every other handler in this package
    return;
}
# if the group doesn't exist, then we shut ourselves down
# Handles the nntp_411 reply: the configured newsgroup does not exist on the
# server, so warn and shut this component down.
sub _nntp_no_group {
    my ( $kernel, $heap ) = @_[ KERNEL, HEAP ];
    &_debug if $heap->{Debug};
    # trailing newline keeps Perl from appending this module's file and line,
    # matching the other warnings issued by this package
    warn "No such newsgroup $heap->{Group}\n";
    $kernel->yield('shutdown');
    return;
}
# if the article doesn't exist, warn about it
# Handles the nntp_423/nntp_430 replies: a requested article was not found.
# Nothing is retried; a warning naming the newsgroup is emitted.
sub _nntp_no_article {
    my $heap = $_[HEAP];
    &_debug if $heap->{Debug};
    my $group = $heap->{Group};
    warn "Couldnt find article in $group\n";
    return;
}
# if there are new articles, request their headers
# also schedules the next check
# Handles the nntp_211 reply to our 'group' request.  Requests the headers of
# every article numbered above the last one seen (only when somebody is
# listening), remembers the new high-water mark and schedules the next poll.
sub _nntp_group_selected {
    my ( $kernel, $heap, $sender ) = @_[ KERNEL, HEAP, SENDER ];
    &_debug if $heap->{Debug};

    # reply text is "ESTIMATE FIRST LAST GROUP"; only LAST is needed.
    # split " " (as in the other handlers) also tolerates leading whitespace.
    my $reply = defined $_[ARG0] ? $_[ARG0] : "";
    my ( undef, undef, $last ) = split " ", $reply;

    if ( defined $last and $last =~ /\A\d+\z/ ) {
        # first time, we won't know last_article, so skip to the end;
        # fetch new headers only if people are listening (checked once,
        # not once per article)
        if ( exists $heap->{last_article}
            and scalar keys %{ $heap->{listeners} } )
        {
            for my $article_id ( $heap->{last_article} + 1 .. $last ) {
                $kernel->post( $sender => head => $article_id );
            }
        }
        $heap->{last_article} = $last;
    }
    else {
        # keep the previous high-water mark rather than storing garbage
        warn "Unexpected group response for $heap->{Group}: '$reply'\n";
    }

    # always schedule the next check, even after a malformed reply
    $kernel->delay( '_poll' => $heap->{Interval} );
    return;
}
# notify listeners of new header
# Handles the nntp_221 reply: ARG0 is the response text, whose first word is
# taken as the article ID, and ARG1 is passed on as received.  Each
# registered listener is sent its chosen event with (article ID, ARG1).
sub _nntp_got_head {
    my ( $kernel, $heap, $response, $lines ) = @_[ KERNEL, HEAP, ARG0, ARG1 ];
    &_debug if $heap->{Debug};

    my $article_id   = ( split " ", $response )[0];
    my @listener_ids = keys %{ $heap->{listeners} };

    $kernel->post( $_, $heap->{listeners}{$_}, $article_id, $lines )
        for @listener_ids;
    return;
}
# deliver article to every session that requested it
# Handles the nntp_220 reply: ARG0 is the response text, whose first word is
# taken as the article ID, and ARG1 is passed on as received.  Every request
# queued for that ID by get_article is answered with (article ID, ARG1), and
# the queue for that ID is then discarded.
sub _nntp_got_article {
    my ( $kernel, $heap, $response, $lines ) = @_[ KERNEL, HEAP, ARG0, ARG1 ];
    &_debug if $heap->{Debug};

    my $article_id = ( split " ", $response )[0];

    # an article nobody asked for simply has an empty queue
    my $waiting = ( $heap->{requests}{$article_id} ||= [] );
    for my $entry ( @{$waiting} ) {
        my ( $requester, $return_event ) = @{$entry};
        $kernel->post( $requester, $return_event, $article_id, $lines );
    }

    # clear the request queue
    delete $heap->{requests}{$article_id};
    return;
}
1;
__END__