1 |
# |
---|
2 |
# Copyright (C) 2007-2008 Red Hat, Inc. |
---|
3 |
# Author: Andreas Thienemann <athienem@redhat.com> |
---|
4 |
# |
---|
5 |
# This program is free software; you can redistribute it and/or modify |
---|
6 |
# it under the terms of the GNU Library General Public License as published by |
---|
7 |
# the Free Software Foundation; version 2 only |
---|
8 |
# |
---|
9 |
# This program is distributed in the hope that it will be useful, |
---|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
---|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
---|
12 |
# GNU Library General Public License for more details. |
---|
13 |
# |
---|
14 |
# You should have received a copy of the GNU Library General Public License |
---|
15 |
# along with this program; if not, write to the Free Software |
---|
16 |
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. |
---|
17 |
# Copyright 2004, 2005 Red Hat, Inc. |
---|
18 |
# |
---|
19 |
# AUTHOR: Andreas Thienemann <athienem@redhat.com> |
---|
20 |
# |
---|
21 |
|
---|
22 |
import sys |
---|
23 |
import struct |
---|
24 |
import IPy |
---|
25 |
import binascii |
---|
26 |
import FlowHandler |
---|
27 |
import pprint |
---|
28 |
from logging import * |
---|
29 |
from common import * |
---|
30 |
import traceback |
---|
31 |
|
---|
32 |
# Top-level statement: runs once when this module is imported and logs that the
# NetFlow v9 parser module has been loaded.
# NOTE(review): `info` is presumably logging.info, pulled in by the
# `from logging import *` above -- confirm it is not overridden by `common`.
info('NetFlow9 protocol disector loaded')
---|
33 |
|
---|
34 |
class Netflow9Parser(FlowHandler.AbstractFlowParser):
    """Parser for NetFlow version 9 export packets.

    The parser reads the raw packet from ``self.pkgdata`` and the exporter
    identity from ``self.flowsrc``, appends decoded data to
    ``self.flowpkg_decode`` and hands late-decoded flowsets to
    ``self.writeout()``.
    NOTE(review): these four members are not defined in this class; they are
    presumably provided by FlowHandler.AbstractFlowParser -- confirm.
    """

    name = 'netflow9'

    # Flowparser template cache:
    #   {flow source: {template id: [(field id, field length), ...]}}
    # Defined on the class, so it is shared by all parser instances.
    template_cache = {}

    # Queue for flowsets without a template:
    #   {flow source: {template id: [(flow_hdr, flowset_hdr, flowset_count, flowset_data), ...]}}
    # Defined on the class, so it is shared by all parser instances.
    flowset_queue = {}

    # NetFlow Template structure
    # Name, ID, length (default length, can be redefined in a template), special unpack string, type
    flowtmpl_struct = [
        ('IN_BYTES', 1, 4, ''),
        ('IN_PKTS', 2, 4, ''),
        ('FLOWS', 3, 4, ''),
        ('PROTOCOL', 4, 1, ''),
        ('TOS', 5, 1, 'c'),
        ('TCP_FLAGS', 6, 1, 'c'),
        ('L4_SRC_PORT', 7, 2, ''),
        ('IPV4_SRC_ADDR', 8, 4, '', 'ipv4address'),
        ('SRC_MASK', 9, 1, ''),
        ('INPUT_SNMP', 10, 2, ''),
        ('L4_DST_PORT', 11, 2, ''),
        ('IPV4_DST_ADDR', 12, 4, '', 'ipv4address'),
        ('DST_MASK', 13, 1, ''),
        ('OUTPUT_SNMP', 14, 2, ''),
        ('IPV4_NEXT_HOP', 15, 4, '', 'ipv4address'),
        ('SRC_AS', 16, 2, ''),
        ('DST_AS', 17, 2, ''),
        ('BGP_IPV4_NEXT_HOP', 18, 4, '', 'ipv4address'),
        ('MUL_DST_PKTS', 19, 4, ''),
        ('MUL_DST_BYTES', 20, 4, ''),
        ('LAST_SWITCHED', 21, 4, ''),
        ('FIRST_SWITCHED', 22, 4, ''),
        ('OUT_BYTES', 23, 4, ''),
        ('OUT_PKTS', 24, 4, ''),
        ('IPV6_SRC_ADDR', 27, 16, ''),
        ('IPV6_DST_ADDR', 28, 16, ''),
        ('IPV6_SRC_MASK', 29, 1, ''),
        ('IPV6_DST_MASK', 30, 1, ''),
        ('IPV6_FLOW_LABEL', 31, 3, ''),
        ('ICMP_TYPE', 32, 2, ''),
        ('MUL_IGMP_TYPE', 33, 1, ''),
        ('SAMPLING_INTERVAL', 34, 4, ''),
        ('SAMPLING_ALGORITHM', 35, 1, ''),
        ('FLOW_ACTIVE_TIMEOUT', 36, 2, ''),
        ('FLOW_INACTIVE_TIMEOUT', 37, 2, ''),
        ('ENGINE_TYPE', 38, 1, ''),
        ('ENGINE_ID', 39, 1, ''),
        ('TOTAL_BYTES_EXP', 40, 4, ''),
        ('TOTAL_PKTS_EXP', 41, 4, ''),
        ('TOTAL_FLOWS_EXP', 42, 4, ''),
        ('VENDOR_PROPRIETARY_1', 43, 0, ''),
        ('MPLS_TOP_LABEL_TYPE', 46, 1, ''),
        ('MPLS_TOP_LABEL_IP_ADDR', 47, 4, ''),
        ('FLOW_SAMPLER_ID', 48, 1, ''),
        ('FLOW_SAMPLER_MODE', 49, 1, ''),
        ('FLOW_SAMPLER_RANDOM_INTERVAL', 50, 4, ''),
        ('VENDOR_PROPRIETARY_2', 51, 0, ''),
        ('DST_TOS', 55, 1, ''),
        ('SRC_MAC', 56, 6, ''),
        ('DST_MAC', 57, 6, ''),
        ('SRC_VLAN', 58, 2, ''),
        ('DST_VLAN', 59, 2, ''),
        ('IP_PROTOCOL_VERSION', 60, 1, ''),
        ('DIRECTION', 61, 1, ''),
        ('IPV6_NEXT_HOP', 62, 16, ''),
        ('BGP_IPV6_NEXT_HOP', 63, 16, ''),
        ('IPV6_OPTION_HEADERS', 64, 4, ''),
        ('VENDOR_PROPRIETARY_3', 65, 0, ''),
        ('VENDOR_PROPRIETARY_4', 66, 0, ''),
        ('VENDOR_PROPRIETARY_5', 67, 0, ''),
        ('VENDOR_PROPRIETARY_6', 68, 0, ''),
        ('VENDOR_PROPRIETARY_7', 69, 0, ''),
        ('MPLS_LABEL_1', 70, 3, ''),
        ('MPLS_LABEL_2', 71, 3, ''),
        ('MPLS_LABEL_3', 72, 3, ''),
        ('MPLS_LABEL_4', 73, 3, ''),
        ('MPLS_LABEL_5', 74, 3, ''),
        ('MPLS_LABEL_6', 75, 3, ''),
        ('MPLS_LABEL_7', 76, 3, ''),
        ('MPLS_LABEL_8', 77, 3, ''),
        ('MPLS_LABEL_9', 78, 3, ''),
        ('MPLS_LABEL_10', 79, 3, ''),
    ]

    # Length to unpack()-format lookup table.
    # Lengths that are missing here (3, 6, 16, ...) are unpacked as raw byte
    # strings by _record_layout(); 8 was added so that 64 bit counters decode
    # to integers like their 32 bit variants.
    flowtmpl_struct_len = {
        0: '',
        1: 'B',
        2: 'H',
        4: 'I',
        8: 'Q',
    }

    # Build some dictionaries for faster lookups
    flowtmpl_struct_id2name = {}
    flowtmpl_struct_name2id = {}
    flowtmpl_struct_name2eval = {}
    flowtmpl_struct_id2pos = {}
    flowtmpl_struct_name2pos = {}
    for i, tupel in enumerate(flowtmpl_struct):
        flowtmpl_struct_id2name[tupel[1]] = tupel[0]
        flowtmpl_struct_name2id[tupel[0]] = tupel[1]
        flowtmpl_struct_name2eval[tupel[0]] = tupel[3]
        flowtmpl_struct_id2pos[tupel[1]] = i
        flowtmpl_struct_name2pos[tupel[0]] = i

    def _record_layout(self, template):
        """Translate a cached template into a struct format for one record.

        Args:
            template: list of (field id, field length) tuples as stored in
                template_cache.

        Returns:
            (unpack_string, fields): unpack_string is a network byte order
            struct format covering one complete data record; fields is a list
            of (name, length, transform) tuples, one per value the format
            yields, in the same order.
        """
        unpack_string = '!'
        fields = []
        for rec_id, rec_len in template:
            if rec_len == 0:
                # Occupies no bytes in the record and yields no value.
                continue

            struct_pos = self.flowtmpl_struct_id2pos.get(rec_id)
            if struct_pos is None:
                debug('No key "%i" found for tmpl_record id' % rec_id)
                # Unknown field: its bytes are still part of the record, so
                # skip them with pad bytes to keep later fields aligned.
                unpack_string += '%ix' % rec_len
                continue

            spec = self.flowtmpl_struct[struct_pos]
            code = self.flowtmpl_struct_len.get(rec_len)
            if code is None:
                # No integer format for this width: keep the raw bytes.
                code = '%is' % rec_len

            # The ipv4address transform needs the 4 byte integer form; if a
            # template redefines the length the value stays untransformed.
            transform = None
            if len(spec) > 4 and rec_len == 4:
                transform = spec[4]

            fields.append((spec[0], rec_len, transform))
            unpack_string += code
        return unpack_string, fields

    # Iterate through the flows in the FlowSet and decode all flows included in a FlowSet
    # Can either save the data in the flowcache or return the data as the return value
    def flowset_iterate(self, flow_hdr, flowset_hdr, flowset_count, flowset_data, return_data=False):
        """Decode every flow record of one data flowset.

        Args:
            flow_hdr: decoded packet header tuple (only logged here).
            flowset_hdr: (flowset id, flowset length) tuple; the id selects
                the template in template_cache[self.flowsrc].
            flowset_count: running number of the flowset (only logged here).
            flowset_data: raw flowset bytes including the 4 byte header.
            return_data: if true, return the result instead of appending it
                to self.flowpkg_decode.

        Returns:
            [flowset_decode] (a one element list holding the list of decoded
            flow dictionaries) if return_data is true, otherwise None.

        Raises:
            KeyError: if no template for this source / flowset id is cached.
        """
        debug('flowset_iterate called: %s, %s, %s, %s' % (str(flow_hdr), str(flowset_hdr), str(flowset_count), str(len(flowset_data))))
        debug('flowset header: %s' % (str(flowset_hdr)))

        template = self.template_cache[self.flowsrc][flowset_hdr[0]]

        # The layout is identical for every record, so build it only once.
        unpack_string, fields = self._record_layout(template)
        record_len = struct.calcsize(unpack_string)

        data_end = flowset_hdr[1]
        if data_end > len(flowset_data):
            warning('Announced length is not the same as the actual length. PROBLEM!')
            data_end = len(flowset_data)

        if record_len == 0:
            # Nothing to step over; iterating would never terminate.
            warning('Template %i from %s describes no data, skipping flowset' % (flowset_hdr[0], self.flowsrc))

        flowset_decode = []
        start = 4  # Offset for the header

        # Walk the flows included in the passed flowset. Trailing bytes that
        # are too short for a complete record are treated as padding.
        while record_len > 0 and start + record_len <= data_end:
            stop = start + record_len
            flow_data = flowset_data[start:stop]
            debug('flow_data: %i, %i, %s, %s, %s' % (start, stop, str(fields), str(unpack_string), toHex(flow_data)))
            start = stop

            # Try to decode the data
            try:
                debug('decode: %s, %i' % (str(unpack_string), len(flow_data)))
                decode = struct.unpack(unpack_string, flow_data)
            except struct.error:
                warning(traceback.format_exc())
                continue

            # Build the lookup dictionary of the decoded flow
            flow_decode = {}
            for (rec_name, rec_len, transform), value in zip(fields, decode):
                # Do we need some special transformation?
                if transform == 'ipv4address':
                    value = IPy.IP(value).strNormal()
                flow_decode[rec_name] = value

            flowset_decode.append(flow_decode)
            debug('flow from %s decoded: %s' % (self.flowsrc, str(flow_decode)))

        # Should we return the data or write it out to the flow_decode storage?
        if return_data:
            return [flowset_decode]
        self.flowpkg_decode.append(flowset_decode)

    def _store_templates(self, flowset_data):
        """Parse a template flowset and cache every template it contains.

        Args:
            flowset_data: raw template flowset bytes including the 4 byte
                flowset header.

        Returns:
            List of the template ids that were stored, in packet order.
        """
        src_templates = self.template_cache.setdefault(self.flowsrc, {})
        stored = []
        end = len(flowset_data)
        offset = 4

        while offset + 4 <= end:
            flowtmpl_hdr = struct.unpack('!HH', flowset_data[offset:offset + 4])
            tmpl_id, field_count = flowtmpl_hdr
            offset += 4
            if field_count == 0:
                # Padding or an empty template; nothing usable follows.
                break

            fields_end = offset + 4 * field_count
            if fields_end > end:
                warning('Truncated template %i from %s, ignoring it' % (tmpl_id, self.flowsrc))
                break

            records = []
            for field_pos in range(offset, fields_end, 4):
                decode = struct.unpack('!HH', flowset_data[field_pos:field_pos + 4])
                struct_pos = self.flowtmpl_struct_id2pos.get(decode[0])
                if struct_pos is None:
                    debug('No key "%i" found for tmpl_record id' % decode[0])
                else:
                    # Do we claim to export more data/longer data than we expect according to the RFC?
                    expected = self.flowtmpl_struct[struct_pos][2]
                    if expected != 0 and decode[1] != expected:
                        warning('Length mismatch: router: %s, tmpl id: %i, record %s, expected %i, actual %i'
                                % (self.flowsrc, tmpl_id, self.flowtmpl_struct_id2name[decode[0]],
                                   expected, decode[1]))
                records.append(decode)

            # Store the finished list in one step, overwriting any existing
            # template with the same ID.
            src_templates[tmpl_id] = records
            stored.append(tmpl_id)
            offset = fields_end

        return stored

    def _flush_queue(self, tmpl_id):
        """Decode and write out all flowsets that were waiting for tmpl_id."""
        src_queue = self.flowset_queue.get(self.flowsrc)
        if not src_queue or tmpl_id not in src_queue:
            return

        debug("Queue match from %s for template %i" % (self.flowsrc, tmpl_id))
        pending = src_queue[tmpl_id]

        # Entries are removed one at a time so that the remaining ones stay
        # queued if decoding or writeout() raises.
        while pending:
            queue_flow_hdr, queue_flowset_hdr, queue_flowset_count, queue_flowset_data = pending.pop(0)
            queue_flowset_decode = []
            queue_flowset_decode.append(dict(zip(('version', 'count', 'uptime', 'unixtime', 'sequence', 'sourceID'), queue_flow_hdr)))
            queue_flowset_decode[0]['sourceIP'] = self.flowsrc
            debug('queue: queue_flowset_data_len: %i' % len(queue_flowset_data))
            # NOTE(review): flowset_iterate(..., True) returns [flowset_decode],
            # so the flows are nested one list deeper here than in the entries
            # parse() appends to self.flowpkg_decode -- kept as in the
            # original; confirm that writeout() expects this.
            queue_flowset_decode.append(self.flowset_iterate(queue_flow_hdr, queue_flowset_hdr, queue_flowset_count, queue_flowset_data, True))
            self.writeout(queue_flowset_decode)

        # Delete the now empty queues, we don't need them anymore
        src_queue.pop(tmpl_id, None)
        if not src_queue:
            self.flowset_queue.pop(self.flowsrc, None)

    # Main function to parse the NetFlow9 data packet
    def parse(self):
        """Parse the NetFlow v9 packet held in self.pkgdata.

        Template flowsets are stored in template_cache; data flowsets are
        decoded into self.flowpkg_decode if their template is known and
        queued in flowset_queue otherwise.

        Returns:
            False if the packet is too short or is not NetFlow version 9,
            otherwise None.
        """
        if self.flowsrc not in self.template_cache:
            # Create template dictionary for this FlowProbe
            self.template_cache[self.flowsrc] = {}

        # The flow header
        # Version Number (short), Count (short), sysUpTime (int),
        # UNIX Secs (int), Sequence Number (int), Source ID (int)
        if len(self.pkgdata) < 20:
            warning("Not a NetFlow v9 flow")
            return False
        flow_hdr = struct.unpack('!HHIIII', self.pkgdata[0:20])

        debug('Flow Header: (Version, Count, sysUpTime, Unixtime, Sequence No, Source Id) %s' % str(flow_hdr))

        # SanityCheck, see if the first two bytes in the packet are 09, our netflow version
        if flow_hdr[0] != 9:
            warning("Not a NetFlow v9 flow")
            return False

        self.flowpkg_decode.append(dict(zip(('version', 'count', 'uptime', 'unixtime', 'sequence', 'sourceID'), flow_hdr)))
        self.flowpkg_decode[0]['sourceIP'] = self.flowsrc

        # The flow data
        flowdata = self.pkgdata[20:]
        flowlen = len(flowdata)

        pos = 0
        flowset_count = 0

        # Loop through the flowsets if we have more than one per packet
        while pos + 4 <= flowlen:
            flowset_count = flowset_count + 1
            flowset_hdr = struct.unpack('!HH', flowdata[pos:pos + 4])
            flowset_id, flowset_len = flowset_hdr
            if flowset_len < 4:
                # The length includes the 4 byte header; anything smaller
                # would stop pos from advancing and loop forever.
                warning('Invalid flowset length %i from %s, discarding rest of packet' % (flowset_len, self.flowsrc))
                break
            flowset_data = flowdata[pos:pos + flowset_len]

            if flowset_id == 0:
                # We have templates, add them to the template cache and
                # process any flowsets that were waiting for them
                debug("template received from %s" % str(self.flowsrc))
                for tmpl_id in self._store_templates(flowset_data):
                    self._flush_queue(tmpl_id)

            elif flowset_id in self.template_cache[self.flowsrc]:
                debug("Template known")
                self.flowset_iterate(flow_hdr, flowset_hdr, flowset_count, flowset_data)

            elif flowset_id < 256:
                # RFC 3954: id 1 is an options template, 2-255 are reserved.
                # Nothing ever defines a template for them, so queuing would
                # only grow the queue.
                debug("Skipping unsupported flowset id %i from %s" % (flowset_id, self.flowsrc))

            else:
                debug("No Template found from %s, queuing flow" % self.flowsrc)
                # Add the flowset and necessary headers to the queue for later parsing
                src_queue = self.flowset_queue.setdefault(self.flowsrc, {})
                src_queue.setdefault(flowset_id, []).append((flow_hdr, flowset_hdr, flowset_count, flowset_data))

            pos = flowset_len + pos
---|