Show
Ignore:
Timestamp:
12/05/08 12:57:15 (16 years ago)
Author:
ixs
Message:

Added better debugging

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/flowmon-dbspooler.py

    r2 r5  
    7272                data = pickle.load(data_file) 
    7373                data_file.close() 
    74                 os.unlink(pkl_file) 
     74#                os.unlink(pkl_file) 
    7575            except: 
    7676                trace = traceback.format_exc() 
     
    8080 
    8181        # Insert each flow into the databse 
    82         unix_starttime = '' 
    83         sourceid = '' 
    84         for flowset in data: 
    85             if type(flowset) is dict: 
    86                 # The header 
    87                 # save the uptime and the unixtime 
    88                 unix_starttime = flowset['unixtime'] - ( flowset['uptime'] / 1000) 
    89                 # Transform the sourceid to the decimal notation 
    90                 sourceid = int(IPy.IP(flowset['sourceIP']).strDec()) 
    91  
    92                 # Workaround for MySQL not correctly working with triggers 
    93                 if DATABASE is 'mysql': 
    94                     cursor.execute("""START TRANSACTION;""") 
    95                     cursor.execute("""SELECT FlowSrc FROM FlowSources WHERE FlowSrc = %s""", sourceid) 
    96                     if int(cursor.rowcount) == 0: 
    97                         # Insert the key 
    98                         cursor.execute("""INSERT INTO FlowSources (FlowSrc) VALUES (%s);""", sourceid) 
    99                     cursor.execute("""COMMIT;""") 
    100  
    101             elif type(flowset) is list: 
    102                 # flowset 
    103                 cursor.execute("""START TRANSACTION;""") 
    104                 for flow in flowset: 
    105                     # flow 
    106  
    107                     # General Workaround for flowprobes which do not send the expected data 
    108                     # Simply create the lookup key with the value None, this will make the SQL insert succeed 
    109                     for key in ('FIRST_SWITCHED', 'LAST_SWITCHED', 'PROTOCOL', 'IPV4_SRC_ADDR', 'IPV4_DST_ADDR', 
    110                                 'INPUT_SNMP', 'OUTPUT_SNMP', 'L4_SRC_PORT', 'L4_DST_PORT', 'SRC_MASK', 'DST_MASK'): 
    111                         if key not in flow: 
    112                             debug('key>flow: %s > %s' % (pprint.pformat(key), pprint.pformat(flow))) 
    113                             flow[key] = None 
    114  
    115                     # Workaround for flowprobes which do not send a direction keyword 
    116                     # We can do an educated guess on the flow-direction based on the existance 
    117                     # of IN_ or OUT_ keys 
    118                     if 'DIRECTION' not in flow: 
    119                         flow['DIRECTION'] = None 
    120                         if 'IN_BYTES' in flow: 
    121                             flow['DIRECTION'] = 0 
    122                         if 'OUT_BYTES' in flow: 
    123                             flow['DIRECTION'] = 1 
    124  
    125                     if flow['DIRECTION'] is 0: 
    126                         # Ingress flow 
    127                         bytes = flow['IN_BYTES'] 
    128                         pkts = flow['IN_PKTS'] 
    129                     elif flow['DIRECTION'] is 1: 
    130                         # Egress flow 
    131                         bytes = flow['OUT_BYTES'] 
    132                         pkts = flow['OUT_PKTS'] 
    133  
    134                     cursor.execute("""INSERT INTO Flows (FlowSrc, TimeStart, TimeStop, IPProto, IPSrc, IPDst, IntIn, 
    135                                       IntOut, PortSrc, PortDst, MaskSrc, MaskDst, Direction, Bytes, Pakets) 
    136                                       VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);""", 
    137                                       (sourceid, unix_starttime + ( flow['FIRST_SWITCHED'] / 1000 ), 
    138                                       unix_starttime + ( flow['LAST_SWITCHED'] / 1000), flow['PROTOCOL'], IPy.IP(flow['IPV4_SRC_ADDR']).strDec(), 
    139                                       IPy.IP(flow['IPV4_DST_ADDR']).strDec(), flow['INPUT_SNMP'], flow['OUTPUT_SNMP'], 
    140                                       flow['L4_SRC_PORT'], flow['L4_DST_PORT'], flow['SRC_MASK'], flow['DST_MASK'], 
    141                                       flow['DIRECTION'], bytes, pkts)) 
    142  
    143                 debug("Flow ready to insert: %s:%s -> %s:%s" % (flow['IPV4_SRC_ADDR'], flow['L4_SRC_PORT'], flow['IPV4_DST_ADDR'], flow['L4_DST_PORT'])) 
    144  
    145                 cursor.execute("""COMMIT;""") 
     82        try:  
     83            unix_starttime = '' 
     84            sourceid = '' 
     85            for flowset in data: 
     86                if type(flowset) is dict: 
     87                    # The header 
     88                    # save the uptime and the unixtime 
     89                    unix_starttime = flowset['unixtime'] - ( flowset['uptime'] / 1000) 
     90                    # Transform the sourceid to the decimal notation 
     91                    sourceid = int(IPy.IP(flowset['sourceIP']).strDec()) 
     92 
     93                    # Workaround for MySQL not correctly working with triggers 
     94                    if DATABASE is 'mysql': 
     95                        cursor.execute("START TRANSACTION;") 
     96                        cursor.execute("SELECT FlowSrc FROM FlowSources WHERE FlowSrc = %s", sourceid) 
     97                        if int(cursor.rowcount) == 0: 
     98                            # Insert the key 
     99                            cursor.execute("INSERT INTO FlowSources (FlowSrc) VALUES (%s);", sourceid) 
     100                        cursor.execute("COMMIT;") 
     101 
     102                elif type(flowset) is list: 
     103                    # flowset 
     104                    cursor.execute("START TRANSACTION;") 
     105                    for flow in flowset: 
     106                        # flow 
     107 
     108                        # General Workaround for flowprobes which do not send the expected data 
     109                        # Simply create the lookup key with the value None, this will make the SQL insert succeed 
     110                        for key in ('FIRST_SWITCHED', 'LAST_SWITCHED', 'PROTOCOL', 'IPV4_SRC_ADDR', 'IPV4_DST_ADDR', 
     111                                    'INPUT_SNMP', 'OUTPUT_SNMP', 'L4_SRC_PORT', 'L4_DST_PORT', 'SRC_MASK', 'DST_MASK'): 
     112                            if key not in flow: 
     113                                try: 
     114                                    flow[key] = None 
     115                                except: 
     116                                    debug('key>flow: %s > %s' % (pprint.pformat(key), pprint.pformat(flow))) 
     117                                    print key, type(flow) 
     118 
     119                        # Workaround for flowprobes which do not send a direction keyword 
     120                        # We can do an educated guess on the flow-direction based on the existance 
     121                        # of IN_ or OUT_ keys 
     122                        if 'DIRECTION' not in flow: 
     123                            flow['DIRECTION'] = None 
     124                            if 'IN_BYTES' in flow: 
     125                                flow['DIRECTION'] = 0 
     126                            if 'OUT_BYTES' in flow: 
     127                                flow['DIRECTION'] = 1 
     128 
     129                        if flow['DIRECTION'] is 0: 
     130                            # Ingress flow 
     131                            bytes = flow['IN_BYTES'] 
     132                            pkts = flow['IN_PKTS'] 
     133                        elif flow['DIRECTION'] is 1: 
     134                            # Egress flow 
     135                            bytes = flow['OUT_BYTES'] 
     136                            pkts = flow['OUT_PKTS'] 
     137 
     138                        cursor.execute("""INSERT INTO Flows (FlowSrc, TimeStart, TimeStop, IPProto, IPSrc, IPDst, IntIn, 
     139                                          IntOut, PortSrc, PortDst, MaskSrc, MaskDst, Direction, Bytes, Pakets) 
     140                                          VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);""", 
     141                                          (sourceid, unix_starttime + ( flow['FIRST_SWITCHED'] / 1000 ), 
     142                                          unix_starttime + ( flow['LAST_SWITCHED'] / 1000), flow['PROTOCOL'], IPy.IP(flow['IPV4_SRC_ADDR']).strDec(), 
     143                                          IPy.IP(flow['IPV4_DST_ADDR']).strDec(), flow['INPUT_SNMP'], flow['OUTPUT_SNMP'], 
     144                                          flow['L4_SRC_PORT'], flow['L4_DST_PORT'], flow['SRC_MASK'], flow['DST_MASK'], 
     145                                          flow['DIRECTION'], bytes, pkts)) 
     146 
     147                    debug("Flow ready to insert: %s:%s -> %s:%s" % (flow['IPV4_SRC_ADDR'], flow['L4_SRC_PORT'], flow['IPV4_DST_ADDR'], flow['L4_DST_PORT'])) 
     148 
     149                    cursor.execute("COMMIT;") 
     150            os.unlink(pkl_file) 
     151        except: 
     152            trace = traceback.format_exc() 
     153            print 'error in', pkl_file 
     154            print trace 
     155            warning(trace) 
     156            return False 
    146157             
    147158 
     
    152163 
    153164    # Flush the database 
    154     cursor.execute("""COMMIT""") 
     165    cursor.execute("COMMIT") 
    155166 
    156167    # Remove the gamin monitor 
     
    229240    except (KeyboardInterrupt, SystemExit): 
    230241        cleanup 
    231         pass 
     242        sys.exit(0) 
    232243    except: 
    233244        raise