POE::Component::Client::NNTP::Tail


POE-Component-Client-NNTP-Tail documentation Contained in the POE-Component-Client-NNTP-Tail distribution.

Index


Code Index:


POE-Component-Client-NNTP-Tail documentation Contained in the POE-Component-Client-NNTP-Tail distribution.

# Copyright (c) 2008 by David Golden. All rights reserved.
# Licensed under Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License was distributed with this file or you may obtain a 
# copy of the License from http://www.apache.org/licenses/LICENSE-2.0

package POE::Component::Client::NNTP::Tail;
use 5.006;
use strict;
use warnings;

our $VERSION = '0.01';
$VERSION = eval $VERSION; ## no critic

use Carp::POE;
use Params::Validate;
use POE qw(Component::Client::NNTP);

my %spawn_args = (
  # required
  Group         => 1,
  NNTPServer    => 1,
  # optional with defaults
  Interval      => { default => 60 },
  # purely optional
  Port          => 0,
  LocalAddr     => 0,
  Alias         => 0,
  Debug         => 0,
);

sub spawn {
  my $class = shift;
  my %opts = validate( @_, \%spawn_args );

  POE::Session->create(
    heap => \%opts, 
    package_states => [
      # nntp component events
      $class => { 
        nntp_connected    => '_nntp_connected',
        nntp_registered   => '_nntp_registered', 
        nntp_socketerr    => '_nntp_socketerr',
        nntp_disconnected => '_nntp_disconnected',
        nntp_200	        => '_nntp_server_ready',
        nntp_201          => '_nntp_server_ready',
        nntp_211	        => '_nntp_group_selected',
        nntp_220	        => '_nntp_got_article',
        nntp_221	        => '_nntp_got_head',
        nntp_411          => '_nntp_no_group',
        nntp_423          => '_nntp_no_article',
        nntp_430          => '_nntp_no_article',
      },
      # session events
      $class => [ qw( _start _stop _child  ) ],
      # internal events
      $class => [ qw( _poll _reconnect ) ],
      # API events
      $class => [ qw( register unregister get_article shutdown ) ],
    ],
  );
}
  
sub _debug {
  my $where = (caller(1))[3];
  $where =~ s{.*::}{P::C::C::N::T::};
  my @args = @_[ARG0 .. $#_];
  for ( @args ) { 
    $_ = 'undef' if not defined $_;
  }
  my $args = @args ? join( " " => "", (map { "'$_'" } @args), "" ) : ""; 
  warn "$where->($args)\n";
  return;
}

#--------------------------------------------------------------------------#
# session events
#--------------------------------------------------------------------------#

sub _start {
  my ( $kernel, $session, $heap ) = @_[KERNEL, SESSION, HEAP];
  &_debug if $heap->{Debug};

  # alias defaults to group name if not otherwise set
  $heap->{Alias} = $heap->{Group} unless exists $heap->{Alias};
  $kernel->alias_set($heap->{Alias});

  # setup NNTP including optional args if defined;
  my %nntp_args;
  for my $k ( qw/NNTPServer Port LocalAddr/ ) {
    $nntp_args{$k} = $heap->{$k} if exists $heap->{$k};
  }
  my $alias = "NNTP-Client-" . $session->ID;
  $heap->{nntp} = POE::Component::Client::NNTP->spawn($alias,\%nntp_args);
  $heap->{nntp_id} = $heap->{nntp}->session_id;

  # start NNTP connection
  $kernel->yield( '_reconnect' );
  return;
}

# ignore these
sub _child {
  my ( $kernel, $heap ) = @_[KERNEL, HEAP];
  &_debug if $heap->{Debug};
}

sub _stop {
  my ( $kernel, $session, $heap ) = @_[KERNEL, SESSION, HEAP];
  &_debug if $heap->{Debug};
  $kernel->call( $session, 'shutdown' ) if $heap->{nntp};
  return;
}

#--------------------------------------------------------------------------#
# events from our clients
#--------------------------------------------------------------------------#

#--------------------------------------------------------------------------#
# register -- [EVENT]
#
# EVENT - event to dispatch to the registered session on receipt of new
#         headers; defaults to "new_header"
#--------------------------------------------------------------------------#

sub register {
  my ($kernel, $heap, $sender) = @_[KERNEL, HEAP, SENDER];
  &_debug if $heap->{Debug};
  my ($event) = $_[ARG0] || 'new_header';
  $kernel->refcount_increment( $sender->ID, __PACKAGE__ );
  $heap->{listeners}{$sender->ID} = $event;
  return;
}

#--------------------------------------------------------------------------#
# unregister -- 
#
# removes sender registration
#--------------------------------------------------------------------------#

sub unregister {
  my ($kernel, $heap, $sender) = @_[KERNEL, HEAP, SENDER];
  &_debug if $heap->{Debug};
  $kernel->refcount_decrement( $sender->ID, __PACKAGE__ );
  delete $heap->{listeners}{$sender->ID};
  return;
}

#--------------------------------------------------------------------------#
# get_article -- ARTICLE_ID, EVENT
#
# request ARTICLE_ID be retrieved and returned via EVENT or 'got_article
# if not specified
#--------------------------------------------------------------------------#

sub get_article {
  my ($kernel, $heap, $sender) = @_[KERNEL, HEAP, SENDER];
  &_debug if $heap->{Debug};
  my ($article_id, $return_event) = @_[ARG0, ARG1];
  $return_event ||= 'got_article';
  # store requesting session and desired return event
  push @{$heap->{requests}{$article_id}}, [$sender, $return_event];
  $kernel->post( $heap->{nntp_id} => article => $article_id );
  return;
}

#--------------------------------------------------------------------------#
# shudown
#--------------------------------------------------------------------------#

sub shutdown {
  my ( $kernel, $heap ) = @_[KERNEL, HEAP];
  &_debug if $heap->{Debug};
  # unregister anyone that didn't do it themselves
  for my $listener ( keys %{ $heap->{listeners} } ) {
    $kernel->refcount_decrement( $listener, __PACKAGE__ );
    delete $heap->{listeners}{$listener};
  }
  $kernel->call( $heap->{nntp_id} => 'unregister' => 'all' );
  $kernel->call( $heap->{nntp_id} => 'shutdown' );
  delete $heap->{nntp};
  $kernel->alias_remove($_) for $kernel->alias_list;
  return;
}

#--------------------------------------------------------------------------#
# our internal events
#--------------------------------------------------------------------------#

# if connected, check for new messages, otherwise reconnect
sub _poll {
  my ( $kernel, $heap ) = @_[KERNEL, HEAP];
  &_debug if $heap->{Debug};
  if ( $heap->{connected} ) {
    $kernel->post( $heap->{nntp_id} => group => $heap->{Group} );
  }
  else {
    $kernel->yield( '_reconnect' );
  }
  return;
}

# connect to NNTP server
sub _reconnect {
  my ( $kernel, $heap ) = @_[KERNEL, HEAP];
  &_debug if $heap->{Debug};
  $kernel->post( $heap->{nntp_id} => 'connect' );
  return;
}

#--------------------------------------------------------------------------#
# events from NNTP client
#--------------------------------------------------------------------------#

# ignore event
sub _nntp_registered {
  my ($kernel, $heap, $sender) = @_[KERNEL, HEAP, SENDER];
  &_debug if $heap->{Debug};
  return;
}

# ignore event
sub _nntp_connected {
  my ($kernel, $heap, $sender) = @_[KERNEL, HEAP, SENDER];
  &_debug if $heap->{Debug};
  return;
}

# if connection can't be made, wait for next poll period to try again
sub _nntp_socketerr {
  my ($kernel, $heap, $sender) = @_[KERNEL, HEAP, SENDER];
  &_debug if $heap->{Debug};
  my ($error) = $_[ARG0];
  warn "Socket error: $error\n";
  $heap->{connected} = 0;
  $kernel->delay( '_reconnect' => $heap->{Interval} );
  return;
}

# if we time-out, just note it and wait for next poll to reconnect
sub _nntp_disconnected {
  my ($kernel, $heap, $sender) = @_[KERNEL, HEAP, SENDER];
  &_debug if $heap->{Debug};
  $heap->{connected} = 0;
  return;
}

# once connected, start polling loop
sub _nntp_server_ready {
  my ($kernel, $heap, $sender) = @_[KERNEL, HEAP, SENDER];
  &_debug if $heap->{Debug};
  $heap->{connected} = 1;
  $kernel->yield( '_poll' );
  undef;
}

# if the group doesn't exist, then we shut ourselves down
sub _nntp_no_group {
  my ($kernel, $heap, $sender) = @_[KERNEL, HEAP, SENDER];
  &_debug if $heap->{Debug};
  warn "No such newsgroup $heap->{Group}";
  $kernel->yield( 'shutdown' );
  return;
}

# if the article doesn't exist, warn about it
sub _nntp_no_article {
  my ($kernel, $heap, $sender) = @_[KERNEL, HEAP, SENDER];
  &_debug if $heap->{Debug};
  warn "Couldnt find article in $heap->{Group}\n";
  return;
}

# if there are new articles, request their headers
# also schedules the next check
sub _nntp_group_selected {
  my ($kernel, $heap, $sender) = @_[KERNEL, HEAP, SENDER];
  &_debug if $heap->{Debug};
  my ($estimate,$first,$last,$group) = split( /\s+/, $_[ARG0] );

  # first time, we won't know last_article, so skip to the end
  if ( exists $heap->{last_article} ) {
    # fetch new headers or articles only if people are listening
    for my $article_id ( $heap->{last_article} + 1 .. $last ) {
      if ( scalar keys %{ $heap->{listeners} } ) {
        $kernel->post( $sender => head => $article_id );
      }
    }
  }
  $heap->{last_article} = $last;
  $kernel->delay( '_poll' => $heap->{Interval} );
  return;
}

# notify listeners of new header
sub _nntp_got_head {
  my ($kernel, $heap, $sender) = @_[KERNEL, HEAP, SENDER];
  &_debug if $heap->{Debug};
  my ($response, $lines) = @_[ARG0, ARG1];
  my ($article_id) = split " ", $response;
  for my $who ( keys %{ $heap->{listeners} } ) {
    $kernel->post( $who => $heap->{listeners}{$who} => $article_id, $lines );
  }
  return;
}

# return article to request queue
sub _nntp_got_article {
  my ($kernel, $heap, $sender) = @_[KERNEL, HEAP, SENDER];
  &_debug if $heap->{Debug};
  my ($response, $lines) = @_[ARG0, ARG1];
  my ($article_id) = split " ", $response;
  # dispatch for all entries in the request queue for this article
  for my $request ( @{$heap->{requests}{$article_id}} ) {
    my ($who, $event) = @$request;
    $kernel->post( $who, $event, $article_id, $lines );
  }
  # clear the request queue
  delete $heap->{requests}{$article_id};
  return;
}
 
1;

__END__