| Net-Netfilter-NetFlow documentation | Contained in the Net-Netfilter-NetFlow distribution. |
Oliver Gorwits <oliver.gorwits@oucs.ox.ac.uk>
Copyright (c) The University of Oxford 2009.
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
| Net-Netfilter-NetFlow documentation | Contained in the Net-Netfilter-NetFlow distribution. |
package Net::Netfilter::NetFlow::Process;

use strict;
use warnings FATAL => 'all';

use base 'Exporter';
our @EXPORT = qw(
    conntrack_init
    ct2ft
    ptee
);

use POSIX; # core
use Time::HiRes 'gettimeofday'; # core
use IPC::Run 'run';
use Log::Dispatch::Config;
use Log::Dispatch::Configurator::Any;
use Net::Netfilter::NetFlow::Utils;
use Net::Netfilter::NetFlow::ConntrackFormat;

# Poke the conntrack kernel hooks into waking up (bug?).
#
# Takes the configuration hashref. Locates the program named by
# $config->{conntrack}->{progname} and runs it once with the 'init_'
# arguments from the conntrack config section, discarding its output.
# Dies if the program cannot be found. The exit status of the run is
# deliberately ignored: this is a best-effort nudge.
sub conntrack_init {
    my $config = shift;

    my $conntrack = can_run($config->{conntrack}->{progname})
        or die "Failed to find a local copy of conntrack in the path\n";

    run [$conntrack, format_args($config->{conntrack}, 'init_')],
        '>', '/dev/null', '2>&1';
}

# Convert the conntrack output to flow-tools CSV input format.
#
# Takes the configuration hashref. Reads conntrack event lines from <>
# (files named in @ARGV, else STDIN). NEW events are remembered, keyed by
# the fields listed in %ct_new_key; when the matching DESTROY event arrives
# (keyed by %ct_destroy_key) three CSV records are printed to STDOUT, one
# for each of the private_src, public_src and dst field masks in
# %ct_mask_fields.
#
# Config keys read:
#   ct2ft.ttl           - seconds to keep an unmatched NEW event
#                         (default seven days)
#   flow_send.args[0]   - exporter address written in each record
#                         (default 127.0.0.1)
sub ct2ft {
    my $config = shift;
    my $got_alrm = 0;
    my $tracker = {};

    # respond to SIGALRM (thanks go to perlipc man page)
    my $alrm_handler = sub { ++$got_alrm };
    # POSIX unmasks the sigprocmask properly
    my $action = POSIX::SigAction->new($alrm_handler);
    POSIX::sigaction(POSIX::SIGALRM(), $action);

    my $ttl = $config->{ct2ft}->{ttl} || 60 * 60 * 24 * 7; # seven days
    alarm $ttl;

    # loop-invariant, so look it up once
    my $exporter = $config->{flow_send}->{args}->[0] || '127.0.0.1';

    # XXX the alarm flag is only acted upon once we have input to process
    while (<>) {
        # purge tracked connections older than TTL seconds
        if ($got_alrm) {
            alarm 0;

            # Compare against the current time. Using the process start
            # time ($^T) here would mean nothing is ever old enough to be
            # purged, as every tracked timestamp is later than $^T.
            my $cutoff = time() - $ttl;

            foreach my $p (keys %{$tracker}) {
                foreach my $k (keys %{$tracker->{$p}}) {
                    delete $tracker->{$p}->{$k}
                        if $tracker->{$p}->{$k} < $cutoff;
                }
            }
            $got_alrm = 0;
            alarm $ttl;
        }

        chomp;
        # keep only whitespace, digits, dots and upper-case letters
        s/[^\s\d.A-Z]//g;
        next if m/^\s+$/;

        my @fields = split /\s+/, $_;
        next unless scalar @fields > 12;

        next unless $fields[1] =~ m/^(NEW|DESTROY)$/;
        my $mode = $1;
        next unless $fields[2] =~ m/^(1|6|17)$/; # ICMP, TCP, UDP
        my $proto = $1;
        next if $proto == 1 and
            (($fields[5] ne '8') and ($fields[6] ne '8')); # only interested in ECHO

        if ($mode eq 'NEW') {
            # remember when this connection started
            my $key = join ',', @fields[ @{$ct_new_key{$proto}} ];
            $tracker->{$proto}->{$key} = $fields[0];
            next;
        }

        my $key = join ',', @fields[ @{$ct_destroy_key{$proto}} ];
        next unless exists $tracker->{$proto}->{$key};

        # The connection is finished, so drop it from the tracker now
        # rather than holding it until the TTL purge.
        my $started = delete $tracker->{$proto}->{$key};

        my ($start_secs, $start_micsecs) = split /\./, $started;
        my ($end_secs, $end_micsecs)     = split /\./, $fields[0];

        # a timestamp without a fractional part must not trip the
        # fatal "uninitialized" warning below
        $start_micsecs = 0 unless defined $start_micsecs;
        $end_micsecs   = 0 unless defined $end_micsecs;

        # secs and nanosecs (^9) since 1970
        my ($unix_secs, $micsecs) = gettimeofday;
        my $unix_nsecs = $micsecs * 1_000;

        # millisecs (^3) since "boot" (this process starting, $^T)
        my $sysuptime = (($unix_secs - $^T) * 1_000) + int ($micsecs / 1_000);

        # flow start/end in millisecs since "boot"
        my $first = (($start_secs - $^T) * 1_000) + int ($start_micsecs / 1_000);
        my $last  = (($end_secs - $^T) * 1_000) + int ($end_micsecs / 1_000);

        for my $dir (qw( private_src public_src dst )) {
            my ($dpkts, $doctets, $srcaddr, $dstaddr, $srcport, $dstport)
                = @fields[ @{$ct_mask_fields{$proto}{$dir}} ];

            print join ',',
                $unix_secs, $unix_nsecs, $sysuptime,
                $exporter,
                $dpkts, $doctets, $first, $last,
                $srcaddr, $dstaddr,
                '0.0.0.0', # NEXTHOP
                0, # INPUT (SNMP idx)
                0, # OUTPUT (SNMP idx)
                $srcport || 0, # might be ICMP
                $dstport || 0, # might be ICMP
                $proto,
                0, # TOS
                0; # TCP_FLAGS
            print "\n";
        }
    } # while (<>)
}

# Set up output tee to local syslog, and next process in pipe.
#
# Takes the configuration hashref. Configures Log::Dispatch::Config from
# $config->{ptee}->{conf}, then passes every line read from <> to the
# dispatcher at "notice" level.
sub ptee {
    my $config = shift;

    Log::Dispatch::Config->configure_and_watch(
        Log::Dispatch::Configurator::Any->new($config->{ptee}->{conf}) );
    my $dispatcher = Log::Dispatch::Config->instance;

    while (<>) {
        $dispatcher->notice($_);
    }
}

1;

__END__