root/trunk/FlowParser/NetFlow9.py

Revision 1, 13.6 kB (checked in by ixs, 16 years ago)

initial checkin of RH revision

  • Property svn:executable set to *
Line 
1 #
2 # Copyright (C) 2007-2008 Red Hat, Inc.
3 # Author: Andreas Thienemann <athienem@redhat.com>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU Library General Public License as published by
7 # the Free Software Foundation; version 2 only
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Library General Public License for more details.
13 #
14 # You should have received a copy of the GNU Library General Public License
15 # along with this program; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 # Copyright 2004, 2005 Red Hat, Inc.
18 #
19 # AUTHOR: Andreas Thienemann <athienem@redhat.com>
20 #
21
22 import sys
23 import struct
24 import IPy
25 import binascii
26 import FlowHandler
27 import pprint
28 from logging import *
29 from common import *
30 import traceback
31
# Announce at import time that the NetFlow v9 parser module is available
info('NetFlow9 protocol dissector loaded')
33
class Netflow9Parser(FlowHandler.AbstractFlowParser):
    """Decoder for NetFlow version 9 export packets (RFC 3954).

    parse() decodes self.pkgdata, sent by the exporter identified by
    self.flowsrc, into self.flowpkg_decode: a packet header dict followed by
    one list of flow dicts per decoded data FlowSet.  Data FlowSets whose
    template has not been seen yet are queued and handed to self.writeout()
    as soon as the template arrives.
    """
    name = 'netflow9'

    # Flowparser template cache:
    #   {flowsrc: {template_id: [(field_id, field_length), ...]}}
    # This is a class attribute mutated in place, so it is shared by every
    # instance and templates survive from one packet to the next.
    # NOTE(review): RFC 3954 scopes templates by exporter *and* Source ID;
    # this cache is keyed by flowsrc only.
    template_cache = {}

    # Queue for flowsets that arrived before their template (also shared):
    #   {flowsrc: {template_id: [(flow_hdr, flowset_hdr, flowset_count, flowset_data), ...]}}
    flowset_queue = {}

    # NetFlow template structure.  Each entry is:
    #   (name, field ID, default length (a template may announce another),
    #    special unpack string (not consulted by the decoder in this class),
    #    optional value type used for post-processing)
    flowtmpl_struct = [
        ('IN_BYTES', 1, 4, ''),
        ('IN_PKTS', 2, 4, ''),
        ('FLOWS', 3, 4, ''),
        ('PROTOCOL', 4, 1, ''),
        ('TOS', 5, 1, 'c'),
        ('TCP_FLAGS', 6, 1, 'c'),
        ('L4_SRC_PORT', 7, 2, ''),
        ('IPV4_SRC_ADDR', 8, 4, '', 'ipv4address'),
        ('SRC_MASK', 9, 1, ''),
        ('INPUT_SNMP', 10, 2, ''),
        ('L4_DST_PORT', 11, 2, ''),
        ('IPV4_DST_ADDR', 12, 4, '', 'ipv4address'),
        ('DST_MASK', 13, 1, ''),
        ('OUTPUT_SNMP', 14, 2, ''),
        ('IPV4_NEXT_HOP', 15, 4, '', 'ipv4address'),
        ('SRC_AS', 16, 2, ''),
        ('DST_AS', 17, 2, ''),
        ('BGP_IPV4_NEXT_HOP', 18, 4, '', 'ipv4address'),
        ('MUL_DST_PKTS', 19, 4, ''),
        ('MUL_DST_BYTES', 20, 4, ''),
        ('LAST_SWITCHED', 21, 4, ''),
        ('FIRST_SWITCHED', 22, 4, ''),
        ('OUT_BYTES', 23, 4, ''),
        ('OUT_PKTS', 24, 4, ''),
        ('IPV6_SRC_ADDR', 27, 16, ''),
        ('IPV6_DST_ADDR', 28, 16, ''),
        ('IPV6_SRC_MASK', 29, 1, ''),
        ('IPV6_DST_MASK', 30, 1, ''),
        ('IPV6_FLOW_LABEL', 31, 3, ''),
        ('ICMP_TYPE', 32, 2, ''),
        ('MUL_IGMP_TYPE', 33, 1, ''),
        ('SAMPLING_INTERVAL', 34, 4, ''),
        ('SAMPLING_ALGORITHM', 35, 1, ''),
        ('FLOW_ACTIVE_TIMEOUT', 36, 2, ''),
        ('FLOW_INACTIVE_TIMEOUT', 37, 2, ''),
        ('ENGINE_TYPE', 38, 1, ''),
        ('ENGINE_ID', 39, 1, ''),
        ('TOTAL_BYTES_EXP', 40, 4, ''),
        ('TOTAL_PKTS_EXP', 41, 4, ''),
        ('TOTAL_FLOWS_EXP', 42, 4, ''),
        ('VENDOR_PROPRIETARY_1', 43, 0, ''),
        ('MPLS_TOP_LABEL_TYPE', 46, 1, ''),
        ('MPLS_TOP_LABEL_IP_ADDR', 47, 4, ''),
        ('FLOW_SAMPLER_ID', 48, 1, ''),
        ('FLOW_SAMPLER_MODE', 49, 1, ''),
        ('FLOW_SAMPLER_RANDOM_INTERVAL', 50, 4, ''),
        ('VENDOR_PROPRIETARY_2', 51, 0, ''),
        ('DST_TOS', 55, 1, ''),
        ('SRC_MAC', 56, 6, ''),
        ('DST_MAC', 57, 6, ''),
        ('SRC_VLAN', 58, 2, ''),
        ('DST_VLAN', 59, 2, ''),
        ('IP_PROTOCOL_VERSION', 60, 1, ''),
        ('DIRECTION', 61, 1, ''),
        ('IPV6_NEXT_HOP', 62, 16, ''),
        ('BGP_IPV6_NEXT_HOP', 63, 16, ''),
        ('IPV6_OPTION_HEADERS', 64, 4, ''),
        ('VENDOR_PROPRIETARY_3', 65, 0, ''),
        ('VENDOR_PROPRIETARY_4', 66, 0, ''),
        ('VENDOR_PROPRIETARY_5', 67, 0, ''),
        ('VENDOR_PROPRIETARY_6', 68, 0, ''),
        ('VENDOR_PROPRIETARY_7', 69, 0, ''),
        ('MPLS_LABEL_1', 70, 3, ''),
        ('MPLS_LABEL_2', 71, 3, ''),
        ('MPLS_LABEL_3', 72, 3, ''),
        ('MPLS_LABEL_4', 73, 3, ''),
        ('MPLS_LABEL_5', 74, 3, ''),
        ('MPLS_LABEL_6', 75, 3, ''),
        ('MPLS_LABEL_7', 76, 3, ''),
        ('MPLS_LABEL_8', 77, 3, ''),
        ('MPLS_LABEL_9', 78, 3, ''),
        ('MPLS_LABEL_10', 79, 3, '')
    ]

    # Field length -> struct format for lengths that decode to an unsigned
    # integer.  Any other non-zero length is returned as a raw byte string.
    flowtmpl_struct_len = {
        0 : '',
        1 : 'B',
        2 : 'H',
        4 : 'I',
        8 : 'Q'
    }

    # Lookup tables derived from flowtmpl_struct for faster lookups
    flowtmpl_struct_id2name = dict((t[1], t[0]) for t in flowtmpl_struct)
    flowtmpl_struct_name2id = dict((t[0], t[1]) for t in flowtmpl_struct)
    flowtmpl_struct_name2eval = dict((t[0], t[3]) for t in flowtmpl_struct)
    flowtmpl_struct_id2pos = dict((t[1], pos) for pos, t in enumerate(flowtmpl_struct))
    flowtmpl_struct_name2pos = dict((t[0], pos) for pos, t in enumerate(flowtmpl_struct))

    # Keys of the dict built from the 20-byte packet header, in wire order
    _FLOW_HDR_FIELDS = ('version', 'count', 'uptime', 'unixtime', 'sequence', 'sourceID')

    def _header_dict(self, flow_hdr):
        """Return the unpacked packet header as a dict tagged with 'sourceIP'."""
        hdr = dict(zip(self._FLOW_HDR_FIELDS, flow_hdr))
        hdr['sourceIP'] = self.flowsrc
        return hdr

    def _compile_template(self, template):
        """Translate a cached template into (struct format, field list).

        The format describes one complete data record in network byte order.
        The field list holds one (name, is_ipv4) pair for each value that
        struct.unpack() returns for that format, in the same order.  Fields
        with an unknown ID are consumed as pad bytes, so the fields after
        them are still read from the correct offset.  Zero-length fields are
        ignored.
        """
        fmt = ['!']
        fields = []
        for rec_id, rec_len in template:
            if rec_len == 0:
                continue
            pos = self.flowtmpl_struct_id2pos.get(rec_id)
            if pos is None:
                debug('No key "%i" found for tmpl_record id' % rec_id)
                fmt.append('%dx' % rec_len)
                continue
            entry = self.flowtmpl_struct[pos]
            fmt.append(self.flowtmpl_struct_len.get(rec_len, '%ds' % rec_len))
            # Only a 4-byte address decodes to an integer IPy can render
            is_ipv4 = len(entry) > 4 and entry[4] == 'ipv4address' and rec_len == 4
            fields.append((entry[0], is_ipv4))
        return ''.join(fmt), fields

    def flowset_iterate(self, flow_hdr, flowset_hdr, flowset_count, flowset_data, return_data=False):
        """Decode every data record in one data FlowSet.

        flow_hdr is the unpacked packet header and flowset_count the FlowSet's
        position in the packet; both are only logged here.  flowset_hdr is
        the (flowset_id, length) tuple and flowset_data the raw FlowSet,
        including its 4-byte header.  The template for flowset_id must already
        be in template_cache[self.flowsrc].

        The result is a list with one dict per record, mapping field names to
        decoded values.  If return_data is true, [that list] is returned.
        Otherwise the list is appended to self.flowpkg_decode and None is
        returned.  Trailing bytes too short to hold a whole record (FlowSet
        padding) are ignored.
        """
        debug('flowset_iterate called: %s, %s, %s, %s' % (str(flow_hdr), str(flowset_hdr), str(flowset_count), str(len(flowset_data))))
        debug('flowset header: %s' % (str(flowset_hdr)))

        # Every record in a FlowSet has the same layout, so build it only once
        fmt, fields = self._compile_template(self.template_cache[self.flowsrc][flowset_hdr[0]])
        record_len = struct.calcsize(fmt)

        flowset_decode = []
        # Never read beyond the bytes actually received
        end = min(flowset_hdr[1], len(flowset_data))
        pos = 4  # skip the FlowSet header
        if record_len == 0:
            # An empty record would never advance pos
            warning('Template %i from %s describes an empty record, skipping flowset'
                    % (flowset_hdr[0], self.flowsrc))
        else:
            while pos + record_len <= end:
                flow_data = flowset_data[pos:pos + record_len]
                debug('flow_data: %i, %i, %s, %s' % (pos, pos + record_len, str(fmt), toHex(flow_data)))
                pos = pos + record_len

                # flow_data is exactly calcsize(fmt) long, so unpack cannot fail
                decode = struct.unpack(fmt, flow_data)
                flow_decode = {}
                for (rec_name, is_ipv4), value in zip(fields, decode):
                    if is_ipv4:
                        value = IPy.IP(value).strNormal()
                    flow_decode[rec_name] = value
                flowset_decode.append(flow_decode)
                debug('flow from %s decoded: %s' % (self.flowsrc, str(flow_decode)))

        if return_data:
            return [flowset_decode]
        self.flowpkg_decode.append(flowset_decode)

    def _check_field_length(self, tmpl_id, field):
        """Log template fields whose length differs from the default length.

        field is a (field_id, length) tuple.  Fields whose default length is 0
        (vendor proprietary) are not checked, and unknown field IDs are only
        logged at debug level.
        """
        rec_id, rec_len = field
        pos = self.flowtmpl_struct_id2pos.get(rec_id)
        if pos is None:
            debug('Unknown field id %i in template %i from %s' % (rec_id, tmpl_id, self.flowsrc))
            return
        expected = self.flowtmpl_struct[pos][2]
        if expected != 0 and rec_len != expected:
            warning('Length mismatch: router: %s, tmpl id: %i, record %s, expected %i, actual %i'
                    % (self.flowsrc, tmpl_id, self.flowtmpl_struct_id2name[rec_id], expected, rec_len))

    def _parse_template_flowset(self, flowset_data):
        """Cache every template carried in a template FlowSet (ID 0).

        A template FlowSet may hold several templates.  Each one is a
        (template_id, field_count) header followed by field_count
        (field_id, length) pairs.  Each template replaces any cached template
        with the same ID, and flowsets queued for it are decoded right away.
        """
        debug("template received from %s" % (self.flowsrc,))
        end = len(flowset_data)
        pos = 4  # skip the FlowSet header
        while pos + 4 <= end:
            tmpl_id, field_count = struct.unpack('!HH', flowset_data[pos:pos + 4])
            if field_count == 0:
                # Zero padding at the end of the FlowSet, not a template
                break
            pos = pos + 4
            fields_end = pos + 4 * field_count
            if fields_end > end:
                warning('Truncated template %i from %s' % (tmpl_id, self.flowsrc))
                break
            fields = [struct.unpack('!HH', flowset_data[p:p + 4]) for p in range(pos, fields_end, 4)]
            pos = fields_end
            for field in fields:
                self._check_field_length(tmpl_id, field)
            self.template_cache[self.flowsrc][tmpl_id] = fields
            self._drain_queue(tmpl_id)

    def _drain_queue(self, tmpl_id):
        """Decode and write out the flowsets from self.flowsrc queued for tmpl_id."""
        src_queue = self.flowset_queue.get(self.flowsrc)
        if not src_queue or tmpl_id not in src_queue:
            return
        debug("Queue match from %s for template %i" % (self.flowsrc, tmpl_id))
        pending = src_queue[tmpl_id]
        while pending:
            queue_flow_hdr, queue_flowset_hdr, queue_flowset_count, queue_flowset_data = pending.pop(0)
            debug('queue: queue_flowset_data_len: %i' % len(queue_flowset_data))
            queue_flowset_decode = [self._header_dict(queue_flow_hdr)]
            # NOTE(review): flowset_iterate(..., True) returns [flows], which is
            # one list level deeper than what parse() stores in
            # self.flowpkg_decode.  Kept unchanged because writeout() is defined
            # elsewhere; confirm which shape it expects.
            queue_flowset_decode.append(self.flowset_iterate(queue_flow_hdr, queue_flowset_hdr, queue_flowset_count, queue_flowset_data, True))
            self.writeout(queue_flowset_decode)
        # The queue is now empty.  Remove it, and the exporter's entry as well
        # once no other template has queued flowsets.
        del src_queue[tmpl_id]
        if not src_queue:
            del self.flowset_queue[self.flowsrc]

    def _queue_flowset(self, flow_hdr, flowset_hdr, flowset_count, flowset_data):
        """Hold a data flowset until its template arrives (see _drain_queue)."""
        debug("No Template found from %s, queuing flow" % (self.flowsrc,))
        src_queue = self.flowset_queue.setdefault(self.flowsrc, {})
        src_queue.setdefault(flowset_hdr[0], []).append((flow_hdr, flowset_hdr, flowset_count, flowset_data))

    def parse(self):
        """Parse the NetFlow v9 packet in self.pkgdata.

        Returns False if the packet is shorter than the 20-byte packet header
        or is not version 9.  Otherwise the decoded header and data FlowSets
        are appended to self.flowpkg_decode and None is returned.  Template
        FlowSets update template_cache.  Data FlowSets without a known
        template are queued.
        """
        if self.flowsrc not in self.template_cache:
            # Create template dictionary for this FlowProbe
            self.template_cache[self.flowsrc] = {}

        if len(self.pkgdata) < 20:
            warning('Truncated NetFlow v9 packet from %s' % (self.flowsrc,))
            return False

        # The packet header: Version Number (short), Count (short),
        # sysUpTime, UNIX Secs, Sequence Number, Source ID (ints)
        flow_hdr = struct.unpack('!HHIIII', self.pkgdata[0:20])
        debug('Flow Header: (Version, Count, sysUpTime, Unixtime, Sequence No, Source Id) %s' % str(flow_hdr))

        # Sanity check: the first two bytes carry the NetFlow version
        if flow_hdr[0] != 9:
            warning("Not a NetFlow v9 flow")
            return False

        self.flowpkg_decode.append(self._header_dict(flow_hdr))

        flowdata = self.pkgdata[20:]
        flowlen = len(flowdata)
        templates = self.template_cache[self.flowsrc]
        pos = 0
        flowset_count = 0

        # Walk the FlowSets; stop once fewer bytes remain than a FlowSet header
        while pos + 4 <= flowlen:
            flowset_count = flowset_count + 1
            flowset_hdr = struct.unpack('!HH', flowdata[pos:pos + 4])
            if flowset_hdr[1] < 4:
                # A length below the header size would never advance pos
                warning('Invalid flowset length %i from %s, ignoring rest of packet'
                        % (flowset_hdr[1], self.flowsrc))
                break
            flowset_data = flowdata[pos:pos + flowset_hdr[1]]

            if flowset_hdr[0] == 0:
                self._parse_template_flowset(flowset_data)
            elif flowset_hdr[0] in templates:
                debug("Template known")
                self.flowset_iterate(flow_hdr, flowset_hdr, flowset_count, flowset_data)
            elif flowset_hdr[0] < 256:
                # IDs 1-255 are options templates or reserved (RFC 3954), never
                # data FlowSets; queuing them would keep them in memory forever
                debug('Skipping unsupported flowset id %i from %s' % (flowset_hdr[0], self.flowsrc))
            else:
                self._queue_flowset(flow_hdr, flowset_hdr, flowset_count, flowset_data)

            pos = pos + flowset_hdr[1]
Note: See TracBrowser for help on using the browser.