/usr/local/CPAN/POE-Component-Server-FTP/POE/Component/Server/FTP/DataSession.pm


package POE::Component::Server::FTP::DataSession;

###########################################################################
### POE::Component::Server::FTP::DataSession
### L.M.Orchard (deus_x@pobox.com)
### David Davis (xantus@cpan.org)
###
### TODO:
### -- get rid of *_limit and use params instead
###
### Copyright (c) 2001 Leslie Michael Orchard.  All Rights Reserved.
### This module is free software; you can redistribute it and/or
### modify it under the same terms as Perl itself.
###
### Changes Copyright (c) 2003-2004 David Davis and Teknikill Software
###########################################################################

use strict;
use IO::Socket::INET;
use IO::Scalar;
use POE qw(Session Wheel::ReadWrite Filter::Stream Driver::SysRW Wheel::SocketFactory);
use Time::HiRes qw(time);

use Data::Dumper;

# Create a new DataSession

# Constructor: spawns the POE session that drives one FTP data connection.
#
# Arguments:
#   $type - class name the handler object is blessed into
#   $para - passed to _start as ARG0 (copied into the heap as 'params')
#   $opt  - passed to _start as ARG1 (per-connection options)
#
# Returns the ID of the newly created session, not the blessed object.
sub new {
	my ($type, $para, $opt) = @_;
	my $self = bless { }, $type;

	# Every event is handled by the method carrying the same name.
	my @events = qw(
		_start _stop
		_drop
		start_LIST start_NLST start_STOR start_RETR
		execute data_send
		data_receive data_flushed data_error data_throttle data_resume
		stop_socket
		_sock_up _sock_down
		send_stats
	);
	my %handler_for = map { ($_ => $_) } @events;

	my $ses = POE::Session->create(
		#options => { trace => 1 },
		args          => [ $para, $opt ],
		object_states => [ $self => \%handler_for ],
	);

	return $ses->ID;
}

# POE _start handler: initialises the heap and opens the data socket.
#
# ARG0 ($para) - hashref copied into $heap->{params}
# ARG1 ($opt)  - hashref of options. When 'data_port' is set (the
#                comma separated "h1,h2,h3,h4,p1,p2" PORT argument) we
#                connect out to the client; otherwise we listen on the
#                port built from 'port1'/'port2' (PASV).
#
# The session that created us (SENDER) is remembered as the control
# session and is used as the target for _write_log calls.
sub _start {
	my ($kernel, $heap, $para, $opt) = @_[KERNEL, HEAP, ARG0, ARG1];

	# All counters and flags begin at zero.
	$heap->{$_} = 0 for qw(send_recv_okay listening rest total_bytes bps send_done);

	$heap->{type} = 'dl'; # default to download
	$heap->{c_session} = $_[SENDER]->ID;
	%{$heap->{params}} = %{$para};

	if ($opt->{data_port}) {
		# PORT command: we connect to the address the client gave us.
		$kernel->call($heap->{c_session}, '_write_log', 4, "starting a PORT data session");

		my ($o1, $o2, $o3, $o4, $port_hi, $port_lo) = split(',', $opt->{data_port});
		my $peer_addr = join('.', $o1, $o2, $o3, $o4);

		# The port arrives as two bytes, high byte first.
		$heap->{port} = ($port_hi << 8) + $port_lo;
		$heap->{remote_ip} = $peer_addr;

		$heap->{data} = POE::Wheel::SocketFactory->new(
			SocketDomain   => AF_INET,
			SocketType     => SOCK_STREAM,
			SocketProtocol => 'tcp',
			RemoteAddress  => $peer_addr,
			RemotePort     => $heap->{port},
			SuccessEvent   => '_sock_up',
			FailureEvent   => '_sock_down',
		);

		# In PORT mode the command to run is already known.
		$heap->{cmd} = $opt->{cmd};
		if ($opt->{rest}) {
			$heap->{rest} = $opt->{rest};
		}
		$heap->{filename} = $opt->{filename};
		$heap->{file_path} = $opt->{fs}->{file_path};
	}
	else {
		# PASV command: we listen and wait for the client to connect.
		$kernel->call($heap->{c_session}, '_write_log', 4, "starting a PASV data session");

		$heap->{port} = ($opt->{port1} << 8) + $opt->{port2};

		$heap->{data} = POE::Wheel::SocketFactory->new(
			SocketDomain   => AF_INET,        # socket() domain
			SocketType     => SOCK_STREAM,    # socket() type
			SocketProtocol => 'tcp',          # socket() protocol
			BindAddress    => INADDR_ANY,     # bind() address
			BindPort       => $heap->{port},  # bind() port
			Reuse          => 'off',          # address reuse switched off
			SuccessEvent   => '_sock_up',     # emitted upon accept()
			FailureEvent   => '_sock_down',   # emitted upon error
		);

		$heap->{listening} = 1;
		# the command is issued on the next call via
		# a direct post to our session
	}

	$heap->{filesystem} = $opt->{fs};
	$heap->{block_size} = 8 * 1024;
	$heap->{opt} = $opt->{opt};
}

# SocketFactory success handler: the data connection is established
# (outgoing connect finished in PORT mode, or a client was accepted in
# PASV mode).
#
# ARG0 ($socket) - the connected socket handle
#
# Replaces the SocketFactory in $heap->{data} with a ReadWrite wheel,
# records the peer's address, notifies the session named by the 'Alias'
# parameter, and - when not listening (PORT mode) - starts the command
# stored in the heap.
sub _sock_up {
	my ($kernel, $heap, $session, $socket) = @_[KERNEL, HEAP, SESSION, ARG0];

	my $buffer_max = 4 * 1024;
	my $buffer_min = 128;

	$heap->{data} = POE::Wheel::ReadWrite->new(
		Handle			=> $socket,
		Driver			=> POE::Driver::SysRW->new(),
		Filter			=> POE::Filter::Stream->new(),
		InputEvent		=> 'data_receive',
		ErrorEvent		=> 'data_error',
		FlushedEvent	=> 'data_flushed',
		HighMark		=> $buffer_max,
		LowMark			=> $buffer_min,
		HighEvent		=> 'data_throttle',
		LowEvent		=> 'data_resume',
	);

	# getpeername() describes the client's end of the connection.
	# getsockname() would return our own local address, which must not
	# be reported as remote_ip/remote_port.
	my $peer = getpeername($socket);
	if (defined $peer) {
		my ($port, $ip) = sockaddr_in($peer);
		$heap->{remote_ip} = inet_ntoa($ip);
		$heap->{remote_port} = $port;
	}
	# If the peer is already gone we keep whatever _start recorded
	# instead of dying inside sockaddr_in().

	$kernel->call($heap->{params}{'Alias'}, notify => ftpd_dcon_connected => {
		dcon_session => $session->ID,
		con_session => $heap->{c_session},
		remote_ip => $heap->{remote_ip},
		port => $heap->{remote_port},
	});

	if ($heap->{listening} == 0) {
		$kernel->call($heap->{c_session} => _write_log => 4 => "data session started for $heap->{cmd} ($heap->{opt})");
		$kernel->yield('start_'.(uc $heap->{cmd}), $heap->{opt});
	} else {
		# TODO check if correct IP connected if that option is on
		$kernel->call($heap->{c_session} => _write_log => 4 => "received connection from $heap->{remote_ip}");
	}
}

# SocketFactory failure handler: logs the event to the control session
# and discards whatever wheel is stored in $heap->{data}.
sub _sock_down {
	my $kernel = $_[KERNEL];
	my $heap   = $_[HEAP];

	$kernel->call($heap->{c_session}, '_write_log', 4, "socket down");

	delete $heap->{data};
}

# Reports the current transfer statistics to the session named by the
# 'Alias' parameter (event 'notify', type 'ftpd_bps_stats') and
# re-schedules itself every 2 seconds until send_done is 1.
sub send_stats {
	my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];

	# Fields copied from the heap under the same name.
	my @copied = qw(
		type bps remote_ip remote_port xfer_time total_bytes
		send_done rest file_size file_stat filename file_path
	);

	my %stats = (
		(map { ($_ => $heap->{$_}) } @copied),
		session     => $session->ID,
		con_session => $heap->{c_session},
		time        => time(),
	);

	$kernel->call($heap->{params}{'Alias'}, notify => ftpd_bps_stats => \%stats);

	# Keep reporting while the transfer is still running.
	if ($heap->{send_done} != 1) {
		$kernel->delay_set(send_stats => 2);
	}
}

# LIST: renders the filesystem's detailed listing for ARG0 ($dirfile)
# as CRLF terminated lines, exposes it as an in-memory input handle and
# schedules 'execute' to begin sending.
sub start_LIST {
	my ($kernel, $heap, $dirfile) = @_[KERNEL, HEAP, ARG0];

	my $listing = join('', map { "$_\r\n" } $heap->{filesystem}->list_details($dirfile));

	$heap->{input_fh} = IO::Scalar->new(\$listing);
	$heap->{send_done} = 0;
	$heap->{send_recv_okay} = 1;

	$kernel->yield('execute');
}

# NLST: renders the filesystem's name listing for ARG0 ($dirfile) as
# CRLF terminated lines, exposes it as an in-memory input handle and
# schedules 'execute' to begin sending.
sub start_NLST {
	my ($kernel, $heap, $dirfile) = @_[KERNEL, HEAP, ARG0];

	my $listing = join('', map { "$_\r\n" } $heap->{filesystem}->list($dirfile));

	$heap->{input_fh} = IO::Scalar->new(\$listing);
	$heap->{send_done} = 0;
	$heap->{send_recv_okay} = 1;

	$kernel->yield('execute');
}

# RETR: prepares a download.
#
# ARG0 ($fh)  - handle of the file to send; becomes $heap->{input_fh}
# ARG1 ($opt) - hashref; 'rest' and 'filename' override the heap values
#               when present
#
# Seeks the handle to the restart offset, records its stat() data and
# size, then schedules 'execute'.
sub start_RETR {
	my ($kernel, $heap, $fh, $opt) = @_[KERNEL, HEAP, ARG0, ARG1];

	for my $key (qw( rest filename )) {
		next unless exists($opt->{$key});
		$heap->{$key} = $opt->{$key};
	}

	my $fs = $heap->{filesystem};
	$heap->{file_path} = $fs->{file_path};

	$heap->{input_fh} = $fh;
	$fs->seek($fh, $heap->{rest}, 0);

	@{$heap->{file_stat}} = $fh->stat();
	$heap->{file_size} = $heap->{file_stat}->[7]; # stat field 7 is the size

	$heap->{send_done} = 0;
	$heap->{send_recv_okay} = 1;

	$kernel->yield('execute');
}

# STOR: prepares an upload.
#
# ARG0 ($fh)  - handle to write to; becomes $heap->{output_fh}
# ARG1 ($opt) - hashref; 'rest' and 'filename' override the heap values
#               when present
#
# Seeks the handle to the restart offset, records its stat() data,
# marks the transfer as an upload ('ul'), stamps the start time and
# schedules 'execute'.
sub start_STOR {
	my ($kernel, $heap, $fh, $opt) = @_[KERNEL, HEAP, ARG0, ARG1];

	for my $key (qw( rest filename )) {
		next unless exists($opt->{$key});
		$heap->{$key} = $opt->{$key};
	}

	my $fs = $heap->{filesystem};
	$heap->{file_path} = $fs->{file_path};

	$heap->{output_fh} = $fh;
	$fs->seek($fh, $heap->{rest}, 0);

	@{$heap->{file_stat}} = $fh->stat();
	# Size of the target before any data arrives; possibly not useful.
	$heap->{file_size} = $heap->{file_stat}->[7];

	$heap->{type} = 'ul';
	$heap->{send_recv_okay} = 1;
	$heap->{xfer_time} = time();

	$kernel->yield('execute');
}

# POE _stop handler: nothing to clean up here.
sub _stop {
	return;
}

# Start the session's pending transfer

# Starts the pending transfer.
#
# Always kicks off statistics reporting. If an input handle is queued,
# the start time is stamped and sending begins. With neither an input
# nor an output handle, a non-listening (PORT) session is dropped.
sub execute {
	my ($kernel, $heap, $session) = @_[KERNEL, HEAP, SESSION];

	$kernel->yield("send_stats");

	# Something to send: start the clock and push the first block.
	if (defined $heap->{input_fh}) {
		$heap->{xfer_time} = time();
		$kernel->yield('data_send');
		return;
	}

	# An upload is driven by incoming data; nothing more to do here.
	return if defined $heap->{output_fh};

	# Neither direction is set up: only PORT sessions are dropped.
	return if $heap->{listening} != 0;

	$kernel->call($session->ID => '_drop');
}

# Connection timeout handler: forgets the stored timeout id and, if the
# data wheel is still a SocketFactory (i.e. no connection was ever
# made), discards it.
sub stop_socket {
	my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];

	delete $heap->{time_out};

	# Anything other than a factory means we are connected: leave it.
	return unless ref($heap->{data}) eq 'POE::Wheel::SocketFactory';

	# still a factory?! Time to drop connection
	delete $heap->{data};
}

# Send a block to the remote client

# Sends the next block of $heap->{input_fh} to the client.
#
# Flow:
#  - no usable input handle: drop the session;
#  - throttled (send_recv_okay false) or no data wheel: do nothing;
#  - data wheel still a SocketFactory (client not connected yet): arm a
#    one-off 30 second 'stop_socket' timeout and retry in 2 seconds;
#  - over the 'DownloadLimit' ('LimitSceme' eq 'ip' asks the Alias
#    session via _bw_limit, otherwise bps is compared directly): retry;
#  - otherwise read block_size bytes, queue them and yield again; on
#    EOF close the handle and drop the session.
sub data_send {
	my ($kernel, $session, $heap) =	@_[KERNEL, SESSION, HEAP];

	if ( (!defined $heap->{input_fh}) || (! ref $heap->{input_fh} ) ) {
		$kernel->call($session->ID => '_drop');
	} elsif ($heap->{send_recv_okay} && (defined $heap->{data})) {

		# if we haven't connected yet, then data will still be a factory
		if (ref($heap->{data}) eq 'POE::Wheel::SocketFactory') {
			$kernel->call($heap->{c_session} => _write_log => 4 => "data is still a SocketFactory (not connected yet?)");
			# Arm the connection timeout once. The guard used to be
			# inverted, so the timeout was never scheduled at all.
			unless (defined $heap->{time_out}) {
				$heap->{time_out} = $kernel->delay_set(stop_socket => 30);
			}
			$kernel->delay_set('data_send' => 2);
			return;
		}

		# Connected: the pending timeout is no longer wanted.
		if (defined $heap->{time_out}) {
			$kernel->alarm_remove($heap->{time_out});
			delete $heap->{time_out};
		}

		# Guard against a zero (or negative) interval, which would
		# otherwise die with "Illegal division by zero".
		my $elapsed = time() - $heap->{xfer_time};
		$heap->{bps} = ($elapsed > 0) ? ($heap->{total_bytes} / $elapsed) : 0;

		if ($heap->{params}{'DownloadLimit'} > 0) {
			if ($heap->{params}{'LimitSceme'} eq 'ip') {
				if ($kernel->call($heap->{params}{'Alias'} => _bw_limit => 'dl' => $heap->{remote_ip} => $heap->{bps})) {
					$kernel->yield('data_send');
					return;
				}
			} else {
				if ($heap->{bps} > $heap->{params}{'DownloadLimit'}) {
					$kernel->yield('data_send');
					return;
				}
			}
		}

		### Read in a block from the file.
		my $buf;
		my $len = $heap->{input_fh}->read($buf, $heap->{block_size});

		### If something was read, queue it to be sent, and yield
		### back for another data_send.
		if ($len > 0) {
			$heap->{total_bytes} += $len;
			$heap->{data}->put($buf);
			$kernel->yield('data_send');
		} else {
			# If nothing was read, assume EOF, and shut everything down.
			my $fs = $heap->{filesystem};
			$fs->close_read($heap->{input_fh});
			delete $heap->{input_fh};

			$kernel->call($session->ID => '_drop');
		}
	}
}

# Receive a block from the remote client

# Handles a block of uploaded data (ReadWrite InputEvent). It is also
# re-entered without data (via yield) while the upload is throttled.
#
# ARG0 ($data) - the bytes received; may be undef on a throttle re-check
#
# Flow:
#  - no usable output handle: drop the session;
#  - send_recv_okay false or no data wheel: do nothing;
#  - data wheel still a SocketFactory: arm a one-off 30 second
#    'stop_socket' timeout and retry in 1 second with the same data;
#  - with an 'UploadLimit', pause input and re-check while over the
#    limit, resume input once under it;
#  - append any data to the output handle and count it.
sub data_receive {
	my ($kernel, $heap, $session, $data) = @_[KERNEL, HEAP, SESSION, ARG0];

	if ( (!defined $heap->{output_fh}) || (! ref $heap->{output_fh} ) ) {
		$kernel->call($session->ID => '_drop');
	} elsif ($heap->{send_recv_okay} && (defined $heap->{data})) {

		# if we haven't connected yet, then data will still be a factory
		if (ref($heap->{data}) eq 'POE::Wheel::SocketFactory') {
			$kernel->call($heap->{c_session} => _write_log => 4 => "data is still a SocketFactory (not connected yet?)");
			# Arm the connection timeout once. The guard used to be
			# inverted, so the timeout was never scheduled at all.
			unless (defined $heap->{time_out}) {
				$heap->{time_out} = $kernel->delay_set(stop_socket => 30);
			}
			$kernel->delay_set('data_receive' => 1, $data);
			return;
		}

		# Connected: the pending timeout is no longer wanted.
		if (defined $heap->{time_out}) {
			$kernel->alarm_remove($heap->{time_out});
			delete $heap->{time_out};
		}

		# Guard against a zero (or negative) interval, which would
		# otherwise die with "Illegal division by zero".
		my $elapsed = time() - $heap->{xfer_time};
		$heap->{bps} = ($elapsed > 0) ? ($heap->{total_bytes} / $elapsed) : 0;

		if ($heap->{params}{'UploadLimit'} > 0) {
			# Per-IP limiting is decided by the Alias session; otherwise
			# this transfer's own rate is compared with the limit.
			my $over_limit = ($heap->{params}{'LimitSceme'} eq 'ip')
				? $kernel->call($heap->{params}{'Alias'} => _bw_limit => 'ul' => $heap->{remote_ip} => $heap->{bps})
				: ($heap->{bps} > $heap->{params}{'UploadLimit'});

			if ($over_limit) {
				# Re-check on the next pass and stop reading meanwhile.
				$kernel->yield('data_receive');
				$heap->{data}->pause_input();
			} else {
				$heap->{data}->resume_input();
			}
		}

		if (defined $data) {
			$heap->{total_bytes} += length($data);

			$heap->{output_fh}->print($data);
		}
	}
}

# ReadWrite ErrorEvent handler.
#
# ARG0 ($operation) - name of the failed operation
# ARG1 ($errnum)    - numeric error; a false value is logged as a
#                     plain client disconnect
# ARG2 ($errstr)    - error text
#
# Logs the event, closes any open transfer handles through the
# filesystem object, sends a final statistics report, cancels all
# alarms and discards the data wheel.
sub data_error {
	my ($kernel, $heap, $session, $operation, $errnum, $errstr) = @_[KERNEL, HEAP, SESSION, ARG0 .. ARG2];

	my $peer = "$heap->{remote_ip} : $heap->{port}";
	my $message = $errnum
		? "session with $peer encountered $operation error $errnum: $errstr"
		: "client at $peer disconnected";
	$kernel->call($heap->{c_session}, '_write_log', 4, $message);

	# either way, stop this session
	my $fs = $heap->{filesystem};
	for my $pair ([ output_fh => 'close_write' ], [ input_fh => 'close_read' ]) {
		my ($slot, $closer) = @{$pair};
		next unless defined $heap->{$slot};
		$fs->$closer($heap->{$slot});
		delete $heap->{$slot};
	}

	# Final report; send_done stops send_stats from re-scheduling.
	$heap->{send_done} = 1;
	$kernel->call($session->ID, 'send_stats');
	$kernel->alarm_remove_all();

	delete $heap->{data};
}

# ReadWrite FlushedEvent handler: once the transfer has been marked
# done (send_done == 1), sends a final statistics report, cancels all
# alarms and discards the data wheel. Otherwise does nothing.
sub data_flushed {
	my ($kernel, $heap, $session) = @_[KERNEL, HEAP, SESSION];

	# More data may still be queued by data_send.
	return unless $heap->{send_done} == 1;

	$kernel->call($session->ID, 'send_stats');
	$kernel->alarm_remove_all();
	$kernel->call($heap->{c_session}, '_write_log', 4, "data flushed, dropping connection");

	delete $heap->{data};
}

# ReadWrite HighEvent handler: the output buffer passed the high mark,
# so clear the flag that lets data_send/data_receive do work.
sub data_throttle {
	my $heap = $_[HEAP];
	$heap->{send_recv_okay} = 0;
}

# ReadWrite LowEvent handler: the output buffer drained to the low
# mark, so allow transfers again and restart the send loop.
sub data_resume {
	my ($kernel, $heap) = @_[KERNEL, HEAP];

	$heap->{send_recv_okay} = 1;
	$kernel->yield('data_send');
}

# Ends the transfer: cancels all alarms and marks the session as done.
#
# If the data wheel is still a SocketFactory it is discarded (the
# client never connected). If it is a connected wheel with no pending
# output it is discarded right away; otherwise it is kept and
# data_flushed discards it once the output has been written.
sub _drop {
	my ($kernel, $heap, $session) = @_[KERNEL, HEAP, SESSION];

	$kernel->alarm_remove_all();

	# for send_stats, so it doesn't delay again; data_flushed also
	# relies on this flag to drop the connection later
	$heap->{send_done} = 1;

	return unless ($heap->{data});

	if (ref($heap->{data}) eq 'POE::Wheel::SocketFactory') {
		# never connected...
		for my $entry ([ 4 => "Still a SocketFactory in _drop" ], [ 3 => "Connection timed out" ]) {
			$kernel->call($heap->{c_session}, '_write_log', @{$entry});
		}
		delete $heap->{data};
		return;
	}

	# Output still queued: keep the wheel, data_flushed finishes the job.
	return if $heap->{data}->get_driver_out_octets() != 0;

	# if we are fully flushed, go ahead and disconnect
	$kernel->call($heap->{c_session}, '_write_log', 4, "data finished, dropping connection");
	delete $heap->{data};
}
1;