Index: trunk/flowmon-dbspooler.py =================================================================== --- trunk/flowmon-dbspooler.py (revision 2) +++ trunk/flowmon-dbspooler.py (revision 5) @@ -72,5 +72,5 @@ data = pickle.load(data_file) data_file.close() - os.unlink(pkl_file) +# os.unlink(pkl_file) except: trace = traceback.format_exc() @@ -80,68 +80,79 @@ # Insert each flow into the databse - unix_starttime = '' - sourceid = '' - for flowset in data: - if type(flowset) is dict: - # The header - # save the uptime and the unixtime - unix_starttime = flowset['unixtime'] - ( flowset['uptime'] / 1000) - # Transform the sourceid to the decimal notation - sourceid = int(IPy.IP(flowset['sourceIP']).strDec()) - - # Workaround for MySQL not correctly working with triggers - if DATABASE is 'mysql': - cursor.execute("""START TRANSACTION;""") - cursor.execute("""SELECT FlowSrc FROM FlowSources WHERE FlowSrc = %s""", sourceid) - if int(cursor.rowcount) == 0: - # Insert the key - cursor.execute("""INSERT INTO FlowSources (FlowSrc) VALUES (%s);""", sourceid) - cursor.execute("""COMMIT;""") - - elif type(flowset) is list: - # flowset - cursor.execute("""START TRANSACTION;""") - for flow in flowset: - # flow - - # General Workaround for flowprobes which do not send the expected data - # Simply create the lookup key with the value None, this will make the SQL insert succeed - for key in ('FIRST_SWITCHED', 'LAST_SWITCHED', 'PROTOCOL', 'IPV4_SRC_ADDR', 'IPV4_DST_ADDR', - 'INPUT_SNMP', 'OUTPUT_SNMP', 'L4_SRC_PORT', 'L4_DST_PORT', 'SRC_MASK', 'DST_MASK'): - if key not in flow: - debug('key>flow: %s > %s' % (pprint.pformat(key), pprint.pformat(flow))) - flow[key] = None - - # Workaround for flowprobes which do not send a direction keyword - # We can do an educated guess on the flow-direction based on the existance - # of IN_ or OUT_ keys - if 'DIRECTION' not in flow: - flow['DIRECTION'] = None - if 'IN_BYTES' in flow: - flow['DIRECTION'] = 0 - if 'OUT_BYTES' in flow: - flow['DIRECTION'] = 1 - - if flow['DIRECTION'] is 0: - # Ingress flow - bytes = flow['IN_BYTES'] - pkts = flow['IN_PKTS'] - elif flow['DIRECTION'] is 1: - # Egress flow - bytes = flow['OUT_BYTES'] - pkts = flow['OUT_PKTS'] - - cursor.execute("""INSERT INTO Flows (FlowSrc, TimeStart, TimeStop, IPProto, IPSrc, IPDst, IntIn, - IntOut, PortSrc, PortDst, MaskSrc, MaskDst, Direction, Bytes, Pakets) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);""", - (sourceid, unix_starttime + ( flow['FIRST_SWITCHED'] / 1000 ), - unix_starttime + ( flow['LAST_SWITCHED'] / 1000), flow['PROTOCOL'], IPy.IP(flow['IPV4_SRC_ADDR']).strDec(), - IPy.IP(flow['IPV4_DST_ADDR']).strDec(), flow['INPUT_SNMP'], flow['OUTPUT_SNMP'], - flow['L4_SRC_PORT'], flow['L4_DST_PORT'], flow['SRC_MASK'], flow['DST_MASK'], - flow['DIRECTION'], bytes, pkts)) - - debug("Flow ready to insert: %s:%s -> %s:%s" % (flow['IPV4_SRC_ADDR'], flow['L4_SRC_PORT'], flow['IPV4_DST_ADDR'], flow['L4_DST_PORT'])) - - cursor.execute("""COMMIT;""") + try: + unix_starttime = '' + sourceid = '' + for flowset in data: + if type(flowset) is dict: + # The header + # save the uptime and the unixtime + unix_starttime = flowset['unixtime'] - ( flowset['uptime'] / 1000) + # Transform the sourceid to the decimal notation + sourceid = int(IPy.IP(flowset['sourceIP']).strDec()) + + # Workaround for MySQL not correctly working with triggers + if DATABASE is 'mysql': + cursor.execute("START TRANSACTION;") + cursor.execute("SELECT FlowSrc FROM FlowSources WHERE FlowSrc = %s", sourceid) + if int(cursor.rowcount) == 0: + # Insert the key + cursor.execute("INSERT INTO FlowSources (FlowSrc) VALUES (%s);", sourceid) + cursor.execute("COMMIT;") + + elif type(flowset) is list: + # flowset + cursor.execute("START TRANSACTION;") + for flow in flowset: + # flow + + # General Workaround for flowprobes which do not send the expected data + # Simply create the lookup key with the value None, this will make the SQL insert succeed + for key in ('FIRST_SWITCHED', 'LAST_SWITCHED', 'PROTOCOL', 'IPV4_SRC_ADDR', 'IPV4_DST_ADDR', + 'INPUT_SNMP', 'OUTPUT_SNMP', 'L4_SRC_PORT', 'L4_DST_PORT', 'SRC_MASK', 'DST_MASK'): + if key not in flow: + try: + flow[key] = None + except: + debug('key>flow: %s > %s' % (pprint.pformat(key), pprint.pformat(flow))) + print key, type(flow) + + # Workaround for flowprobes which do not send a direction keyword + # We can do an educated guess on the flow-direction based on the existance + # of IN_ or OUT_ keys + if 'DIRECTION' not in flow: + flow['DIRECTION'] = None + if 'IN_BYTES' in flow: + flow['DIRECTION'] = 0 + if 'OUT_BYTES' in flow: + flow['DIRECTION'] = 1 + + if flow['DIRECTION'] is 0: + # Ingress flow + bytes = flow['IN_BYTES'] + pkts = flow['IN_PKTS'] + elif flow['DIRECTION'] is 1: + # Egress flow + bytes = flow['OUT_BYTES'] + pkts = flow['OUT_PKTS'] + + cursor.execute("""INSERT INTO Flows (FlowSrc, TimeStart, TimeStop, IPProto, IPSrc, IPDst, IntIn, + IntOut, PortSrc, PortDst, MaskSrc, MaskDst, Direction, Bytes, Pakets) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);""", + (sourceid, unix_starttime + ( flow['FIRST_SWITCHED'] / 1000 ), + unix_starttime + ( flow['LAST_SWITCHED'] / 1000), flow['PROTOCOL'], IPy.IP(flow['IPV4_SRC_ADDR']).strDec(), + IPy.IP(flow['IPV4_DST_ADDR']).strDec(), flow['INPUT_SNMP'], flow['OUTPUT_SNMP'], + flow['L4_SRC_PORT'], flow['L4_DST_PORT'], flow['SRC_MASK'], flow['DST_MASK'], + flow['DIRECTION'], bytes, pkts)) + + debug("Flow ready to insert: %s:%s -> %s:%s" % (flow['IPV4_SRC_ADDR'], flow['L4_SRC_PORT'], flow['IPV4_DST_ADDR'], flow['L4_DST_PORT'])) + + cursor.execute("COMMIT;") + os.unlink(pkl_file) + except: + trace = traceback.format_exc() + print 'error in', pkl_file + print trace + warning(trace) + return False @@ -152,5 +163,5 @@ # Flush the database - cursor.execute("""COMMIT""") + cursor.execute("COMMIT") # Remove the gamin monitor @@ -229,5 +240,5 @@ except (KeyboardInterrupt, SystemExit): cleanup - pass + sys.exit(0) except: raise