#
# Copyright (C) 2007-2008 Red Hat, Inc.
# Author: Andreas Thienemann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Library General Public License as published by
# the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU Library General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
# Copyright 2004, 2005 Red Hat, Inc.
#
# AUTHOR: Andreas Thienemann
#

import sys
import struct
import IPy
import binascii
import FlowHandler
import pprint
from logging import *
from common import *
import traceback

info('NetFlow9 protocol disector loaded')


class Netflow9Parser(FlowHandler.AbstractFlowParser):
    """Decoder for NetFlow version 9 export packets.

    Templates and data flowsets that arrived before their template are kept
    per exporter (keyed by self.flowsrc) in class-level dictionaries, so they
    are shared by every instance of this parser and survive across packets.
    self.flowsrc, self.pkgdata, self.flowpkg_decode and self.writeout are not
    defined here -- presumably provided by FlowHandler.AbstractFlowParser.
    """
    name = 'netflow9'

    # Flowparser template cache:
    #   {flowsrc: {template_id: [(field_type, field_length), ...]}}
    template_cache = {}

    # Queue for flowsets without a template:
    #   {flowsrc: {template_id: [(flow_hdr, flowset_hdr, flowset_count, flowset_data), ...]}}
    flowset_queue = {}

    # NetFlow Template structure
    # Name, ID, length (default length, can be redefined in a template; 0 means
    # no fixed length), special unpack string (not consulted by the decoder
    # below), optional type ('ipv4address' -> rendered as a dotted-quad string)
    flowtmpl_struct = [
        ('IN_BYTES', 1, 4, ''),
        ('IN_PKTS', 2, 4, ''),
        ('FLOWS', 3, 4, ''),
        ('PROTOCOL', 4, 1, ''),
        ('TOS', 5, 1, 'c'),
        ('TCP_FLAGS', 6, 1, 'c'),
        ('L4_SRC_PORT', 7, 2, ''),
        ('IPV4_SRC_ADDR', 8, 4, '', 'ipv4address'),
        ('SRC_MASK', 9, 1, ''),
        ('INPUT_SNMP', 10, 2, ''),
        ('L4_DST_PORT', 11, 2, ''),
        ('IPV4_DST_ADDR', 12, 4, '', 'ipv4address'),
        ('DST_MASK', 13, 1, ''),
        ('OUTPUT_SNMP', 14, 2, ''),
        ('IPV4_NEXT_HOP', 15, 4, '', 'ipv4address'),
        ('SRC_AS', 16, 2, ''),
        ('DST_AS', 17, 2, ''),
        ('BGP_IPV4_NEXT_HOP', 18, 4, '', 'ipv4address'),
        ('MUL_DST_PKTS', 19, 4, ''),
        ('MUL_DST_BYTES', 20, 4, ''),
        ('LAST_SWITCHED', 21, 4, ''),
        ('FIRST_SWITCHED', 22, 4, ''),
        ('OUT_BYTES', 23, 4, ''),
        ('OUT_PKTS', 24, 4, ''),
        ('IPV6_SRC_ADDR', 27, 16, ''),
        ('IPV6_DST_ADDR', 28, 16, ''),
        ('IPV6_SRC_MASK', 29, 1, ''),
        ('IPV6_DST_MASK', 30, 1, ''),
        ('IPV6_FLOW_LABEL', 31, 3, ''),
        ('ICMP_TYPE', 32, 2, ''),
        ('MUL_IGMP_TYPE', 33, 1, ''),
        ('SAMPLING_INTERVAL', 34, 4, ''),
        ('SAMPLING_ALGORITHM', 35, 1, ''),
        ('FLOW_ACTIVE_TIMEOUT', 36, 2, ''),
        ('FLOW_INACTIVE_TIMEOUT', 37, 2, ''),
        ('ENGINE_TYPE', 38, 1, ''),
        ('ENGINE_ID', 39, 1, ''),
        ('TOTAL_BYTES_EXP', 40, 4, ''),
        ('TOTAL_PKTS_EXP', 41, 4, ''),
        ('TOTAL_FLOWS_EXP', 42, 4, ''),
        ('VENDOR_PROPRIETARY_1', 43, 0, ''),
        ('MPLS_TOP_LABEL_TYPE', 46, 1, ''),
        ('MPLS_TOP_LABEL_IP_ADDR', 47, 4, ''),
        ('FLOW_SAMPLER_ID', 48, 1, ''),
        ('FLOW_SAMPLER_MODE', 49, 1, ''),
        ('FLOW_SAMPLER_RANDOM_INTERVAL', 50, 4, ''),
        ('VENDOR_PROPRIETARY_2', 51, 0, ''),
        ('DST_TOS', 55, 1, ''),
        ('SRC_MAC', 56, 6, ''),
        ('DST_MAC', 57, 6, ''),
        ('SRC_VLAN', 58, 2, ''),
        ('DST_VLAN', 59, 2, ''),
        ('IP_PROTOCOL_VERSION', 60, 1, ''),
        ('DIRECTION', 61, 1, ''),
        ('IPV6_NEXT_HOP', 62, 16, ''),
        ('BGP_IPV6_NEXT_HOP', 63, 16, ''),
        ('IPV6_OPTION_HEADERS', 64, 4, ''),
        ('VENDOR_PROPRIETARY_3', 65, 0, ''),
        ('VENDOR_PROPRIETARY_4', 66, 0, ''),
        ('VENDOR_PROPRIETARY_5', 67, 0, ''),
        ('VENDOR_PROPRIETARY_6', 68, 0, ''),
        ('VENDOR_PROPRIETARY_7', 69, 0, ''),
        ('MPLS_LABEL_1', 70, 3, ''),
        ('MPLS_LABEL_2', 71, 3, ''),
        ('MPLS_LABEL_3', 72, 3, ''),
        ('MPLS_LABEL_4', 73, 3, ''),
        ('MPLS_LABEL_5', 74, 3, ''),
        ('MPLS_LABEL_6', 75, 3, ''),
        ('MPLS_LABEL_7', 76, 3, ''),
        ('MPLS_LABEL_8', 77, 3, ''),
        ('MPLS_LABEL_9', 78, 3, ''),
        ('MPLS_LABEL_10', 79, 3, ''),
    ]

    # Length to unpack()-format lookup table.  Lengths that are missing here,
    # or mapped to '', are decoded as raw byte strings ('<n>s') instead, so an
    # unusual length never drops or shifts a value.
    flowtmpl_struct_len = {0: '', 1: 'B', 2: 'H', 4: 'I', 8: 'Q'}

    # Build some dictionaries for faster lookups
    flowtmpl_struct_id2name = {}
    flowtmpl_struct_name2id = {}
    flowtmpl_struct_name2eval = {}
    flowtmpl_struct_id2pos = {}
    flowtmpl_struct_name2pos = {}
    for i, tupel in enumerate(flowtmpl_struct):
        flowtmpl_struct_id2name[tupel[1]] = tupel[0]
        flowtmpl_struct_name2id[tupel[0]] = tupel[1]
        flowtmpl_struct_name2eval[tupel[0]] = tupel[3]
        flowtmpl_struct_id2pos[tupel[1]] = i
        flowtmpl_struct_name2pos[tupel[0]] = i

    def _flow_header_dict(self, flow_hdr):
        """Turn an unpacked 20-byte packet header into the header dict, tagged with the exporter."""
        header = dict(zip(('version', 'count', 'uptime', 'unixtime', 'sequence', 'sourceID'), flow_hdr))
        header['sourceIP'] = self.flowsrc
        return header

    def _template_layout(self, template):
        """Compile a cached template into (unpack format, [(name, length, is_ipv4), ...]).

        Fields with an unknown ID become pad bytes ('x') in the format: their
        bytes are skipped so later fields stay aligned, but they yield no value.
        """
        fmt_parts = ['!']
        fields = []
        for rec_id, rec_len in template:
            pos = self.flowtmpl_struct_id2pos.get(rec_id)
            if pos is None:
                debug('No key "%i" found for tmpl_record id' % rec_id)
                fmt_parts.append('%dx' % rec_len)
                continue
            entry = self.flowtmpl_struct[pos]
            # '' (length 0) and unmapped lengths (3, 6, 16, ...) -> raw bytes.
            fmt_parts.append(self.flowtmpl_struct_len.get(rec_len) or '%ds' % rec_len)
            # Only a 4-byte integer can be rendered as an IPv4 address.
            is_ipv4 = len(entry) > 4 and entry[4] == 'ipv4address' and rec_len == 4
            fields.append((entry[0], rec_len, is_ipv4))
        return ''.join(fmt_parts), fields

    # Iterate through the flows in the FlowSet and decode all flows included in a FlowSet
    # Can either save the data in the flowcache or return the data as the return value
    def flowset_iterate(self, flow_hdr, flowset_hdr, flowset_count, flowset_data, return_data=False):
        """Decode every data record of one data flowset.

        flow_hdr      -- unpacked packet header (only logged here).
        flowset_hdr   -- (template id, flowset length incl. the 4-byte flowset header).
        flowset_count -- ordinal of the flowset within its packet (only logged here).
        flowset_data  -- raw flowset bytes, starting at the flowset header.
        return_data   -- if false, the list of decoded flow dicts is appended to
                         self.flowpkg_decode and None is returned; if true, that
                         list is returned wrapped in a one-element list instead.

        Raises KeyError if no template is cached for flowset_hdr[0].
        """
        debug('flowset_iterate called: %s, %s, %s, %s' % (str(flow_hdr), str(flowset_hdr), str(flowset_count), str(len(flowset_data))))
        debug('flowset header: %s' % (str(flowset_hdr)))

        template = self.template_cache[self.flowsrc][flowset_hdr[0]]
        unpack_string, fields = self._template_layout(template)
        record_len = struct.calcsize(unpack_string)
        unpack_tmpl = [(name, length) for name, length, _ in fields]

        # Never read beyond the bytes actually received, even if the header
        # announces more.
        end = min(flowset_hdr[1], len(flowset_data))
        offset = 4  # Offset for the header

        flowset_decode = []
        if record_len == 0:
            # A zero-length record cannot advance the offset; decoding would never end.
            warning('Template %i from %s describes empty records, skipping flowset' % (flowset_hdr[0], self.flowsrc))
        else:
            # Only whole records are decoded; a shorter tail is padding.
            while offset + record_len <= end:
                flow_data = flowset_data[offset:offset + record_len]
                debug('flow_data: %i, %i, %s, %s, %s' % (offset, offset + record_len, str(unpack_tmpl), str(unpack_string), toHex(flow_data)))
                values = struct.unpack(unpack_string, flow_data)
                flow_decode = {}
                for (name, _length, is_ipv4), value in zip(fields, values):
                    # Do we need some special transformation?
                    if is_ipv4:
                        value = IPy.IP(value).strNormal()
                    flow_decode[name] = value
                flowset_decode.append(flow_decode)
                debug('flow from %s decoded: %s' % (self.flowsrc, str(flow_decode)))
                offset += record_len
            # Flowsets are padded to a 4-byte boundary (RFC 3954); a larger gap
            # between the announced length and the decoded data means a record
            # was cut short or the packet was truncated.
            if flowset_hdr[1] - offset >= 4:
                warning('Announced length is not the same as the actual length. PROBLEM!')

        # Should we return the data or write it out to the flow_decode storage?
        if return_data:
            return [flowset_decode]
        self.flowpkg_decode.append(flowset_decode)

    def _parse_template_flowset(self, flowset_data, flowset_len):
        """Cache every template in a template flowset and return their IDs in order.

        A template flowset may hold several templates, each a (template ID,
        field count) pair followed by that many (field type, field length)
        pairs. A cached template with the same ID is overwritten.
        """
        end = min(flowset_len, len(flowset_data))
        offset = 4  # skip the flowset header
        template_ids = []
        while offset + 4 <= end:
            template_id, field_count = struct.unpack('!HH', flowset_data[offset:offset + 4])
            if template_id < 256:
                # Template IDs start at 256 (RFC 3954); a lower value is trailing padding.
                break
            fields_start = offset + 4
            fields_end = fields_start + 4 * field_count
            if fields_end > end:
                warning('Truncated template %i from %s, ignoring it' % (template_id, self.flowsrc))
                break
            fields = []
            for field_pos in range(fields_start, fields_end, 4):
                rec_id, rec_len = struct.unpack('!HH', flowset_data[field_pos:field_pos + 4])
                pos = self.flowtmpl_struct_id2pos.get(rec_id)
                if pos is None:
                    debug('No key "%i" found for tmpl_record id' % rec_id)
                else:
                    # Do we claim to export more data/longer data then we expect according to the RFC?
                    expected = self.flowtmpl_struct[pos][2]
                    if expected != 0 and rec_len != expected:
                        warning('Length mismatch: router: %s, tmpl id: %i, record %s, expected %i, actual %i' % (self.flowsrc, template_id, self.flowtmpl_struct_id2name[rec_id], expected, rec_len))
                fields.append((rec_id, rec_len))
            self.template_cache[self.flowsrc][template_id] = fields
            template_ids.append(template_id)
            offset = fields_end
        return template_ids

    def _flush_queued_flowsets(self, template_id):
        """Decode and write out data flowsets queued while template_id was unknown."""
        queue_for_src = self.flowset_queue.get(self.flowsrc)
        if not queue_for_src or template_id not in queue_for_src:
            return
        debug("Queue match from %s for template %i" % (self.flowsrc, template_id))
        for queue_flow_hdr, queue_flowset_hdr, queue_flowset_count, queue_flowset_data in queue_for_src.pop(template_id):
            debug('queue: queue_flowset_data_len: %i' % len(queue_flowset_data))
            queue_flowset_decode = [self._flow_header_dict(queue_flow_hdr)]
            # extend (not append) so the result has the same [header, [flows]]
            # shape that the non-queued path builds in self.flowpkg_decode.
            queue_flowset_decode.extend(self.flowset_iterate(queue_flow_hdr, queue_flowset_hdr, queue_flowset_count, queue_flowset_data, True))
            self.writeout(queue_flowset_decode)
        # Drop the exporter's queue dict once nothing is pending for it.
        if not queue_for_src:
            del self.flowset_queue[self.flowsrc]

    # Main function to parse the NetFlow9 data paket
    def parse(self):
        """Parse the NetFlow v9 export packet held in self.pkgdata.

        Template flowsets update self.template_cache and flush queued data
        flowsets for those templates through self.writeout. Data flowsets with
        a known template are decoded into self.flowpkg_decode. Data flowsets
        without one are queued in self.flowset_queue. Options-template and
        reserved flowsets (IDs 1-255) are skipped.

        Returns False if the packet is too short or is not NetFlow v9,
        otherwise None.
        """
        if self.flowsrc not in self.template_cache:
            # Create template dictionary for this FlowProbe
            self.template_cache[self.flowsrc] = {}

        if len(self.pkgdata) < 20:
            warning("Truncated NetFlow v9 packet from %s" % str(self.flowsrc))
            return False

        # The flow header
        # Version Number (short), Count (short), sysUpTime (int),
        # UNIX Secs (int), Sequence Number (int), Source ID (int)
        flow_hdr = struct.unpack('!HHIIII', self.pkgdata[0:20])
        debug('Flow Header: (Version, Count, sysUpTime, Unixtime, Sequence No, Source Id) %s' % str(flow_hdr))

        # SanityCheck, see if the first two bytes in the packet are 09, our netflow version
        if flow_hdr[0] != 9:
            warning("Not a NetFlow v9 flow")
            return False

        self.flowpkg_decode.append(self._flow_header_dict(flow_hdr))

        # The flow data
        flowdata = self.pkgdata[20:]
        flowlen = len(flowdata)

        pos = 0
        flowset_count = 0
        # Loop through the flowsets; a tail shorter than a flowset header is ignored.
        while pos + 4 <= flowlen:
            flowset_hdr = struct.unpack('!HH', flowdata[pos:pos + 4])
            flowset_id, flowset_len = flowset_hdr
            if flowset_len < 4:
                # The length includes the 4-byte header; anything smaller would never advance pos.
                warning('Invalid flowset length %i from %s, discarding rest of packet' % (flowset_len, self.flowsrc))
                break
            flowset_count += 1
            flowset_data = flowdata[pos:pos + flowset_len]

            if flowset_id == 0:
                # Template flowset: (re)define templates, then decode anything that waited for them.
                debug("template received from %s" % str(self.flowsrc))
                for template_id in self._parse_template_flowset(flowset_data, flowset_len):
                    self._flush_queued_flowsets(template_id)
            elif flowset_id < 256:
                # Options templates (1) and reserved IDs never get a data template;
                # queueing them would grow the queue without bound.
                debug('Skipping options/reserved flowset %i from %s' % (flowset_id, self.flowsrc))
            elif flowset_id in self.template_cache[self.flowsrc]:
                debug("Template known")
                self.flowset_iterate(flow_hdr, flowset_hdr, flowset_count, flowset_data)
            else:
                debug("No Template found from %s, queuing flow" % self.flowsrc)
                # Add the flowset and necessary headers to the queue for later parsing
                self.flowset_queue.setdefault(self.flowsrc, {}).setdefault(flowset_id, []).append(
                    (flow_hdr, flowset_hdr, flowset_count, flowset_data))

            pos += flowset_len