82 | | unix_starttime = '' |
---|
83 | | sourceid = '' |
---|
84 | | for flowset in data: |
---|
85 | | if type(flowset) is dict: |
---|
86 | | # The header |
---|
87 | | # save the uptime and the unixtime |
---|
88 | | unix_starttime = flowset['unixtime'] - ( flowset['uptime'] / 1000) |
---|
89 | | # Transform the sourceid to the decimal notation |
---|
90 | | sourceid = int(IPy.IP(flowset['sourceIP']).strDec()) |
---|
91 | | |
---|
92 | | # Workaround for MySQL not correctly working with triggers |
---|
93 | | if DATABASE is 'mysql': |
---|
94 | | cursor.execute("""START TRANSACTION;""") |
---|
95 | | cursor.execute("""SELECT FlowSrc FROM FlowSources WHERE FlowSrc = %s""", sourceid) |
---|
96 | | if int(cursor.rowcount) == 0: |
---|
97 | | # Insert the key |
---|
98 | | cursor.execute("""INSERT INTO FlowSources (FlowSrc) VALUES (%s);""", sourceid) |
---|
99 | | cursor.execute("""COMMIT;""") |
---|
100 | | |
---|
101 | | elif type(flowset) is list: |
---|
102 | | # flowset |
---|
103 | | cursor.execute("""START TRANSACTION;""") |
---|
104 | | for flow in flowset: |
---|
105 | | # flow |
---|
106 | | |
---|
107 | | # General Workaround for flowprobes which do not send the expected data |
---|
108 | | # Simply create the lookup key with the value None, this will make the SQL insert succeed |
---|
109 | | for key in ('FIRST_SWITCHED', 'LAST_SWITCHED', 'PROTOCOL', 'IPV4_SRC_ADDR', 'IPV4_DST_ADDR', |
---|
110 | | 'INPUT_SNMP', 'OUTPUT_SNMP', 'L4_SRC_PORT', 'L4_DST_PORT', 'SRC_MASK', 'DST_MASK'): |
---|
111 | | if key not in flow: |
---|
112 | | debug('key>flow: %s > %s' % (pprint.pformat(key), pprint.pformat(flow))) |
---|
113 | | flow[key] = None |
---|
114 | | |
---|
115 | | # Workaround for flowprobes which do not send a direction keyword |
---|
116 | | # We can do an educated guess on the flow-direction based on the existance |
---|
117 | | # of IN_ or OUT_ keys |
---|
118 | | if 'DIRECTION' not in flow: |
---|
119 | | flow['DIRECTION'] = None |
---|
120 | | if 'IN_BYTES' in flow: |
---|
121 | | flow['DIRECTION'] = 0 |
---|
122 | | if 'OUT_BYTES' in flow: |
---|
123 | | flow['DIRECTION'] = 1 |
---|
124 | | |
---|
125 | | if flow['DIRECTION'] is 0: |
---|
126 | | # Ingress flow |
---|
127 | | bytes = flow['IN_BYTES'] |
---|
128 | | pkts = flow['IN_PKTS'] |
---|
129 | | elif flow['DIRECTION'] is 1: |
---|
130 | | # Egress flow |
---|
131 | | bytes = flow['OUT_BYTES'] |
---|
132 | | pkts = flow['OUT_PKTS'] |
---|
133 | | |
---|
134 | | cursor.execute("""INSERT INTO Flows (FlowSrc, TimeStart, TimeStop, IPProto, IPSrc, IPDst, IntIn, |
---|
135 | | IntOut, PortSrc, PortDst, MaskSrc, MaskDst, Direction, Bytes, Pakets) |
---|
136 | | VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);""", |
---|
137 | | (sourceid, unix_starttime + ( flow['FIRST_SWITCHED'] / 1000 ), |
---|
138 | | unix_starttime + ( flow['LAST_SWITCHED'] / 1000), flow['PROTOCOL'], IPy.IP(flow['IPV4_SRC_ADDR']).strDec(), |
---|
139 | | IPy.IP(flow['IPV4_DST_ADDR']).strDec(), flow['INPUT_SNMP'], flow['OUTPUT_SNMP'], |
---|
140 | | flow['L4_SRC_PORT'], flow['L4_DST_PORT'], flow['SRC_MASK'], flow['DST_MASK'], |
---|
141 | | flow['DIRECTION'], bytes, pkts)) |
---|
142 | | |
---|
143 | | debug("Flow ready to insert: %s:%s -> %s:%s" % (flow['IPV4_SRC_ADDR'], flow['L4_SRC_PORT'], flow['IPV4_DST_ADDR'], flow['L4_DST_PORT'])) |
---|
144 | | |
---|
145 | | cursor.execute("""COMMIT;""") |
---|
| 82 | try: |
---|
| 83 | unix_starttime = '' |
---|
| 84 | sourceid = '' |
---|
| 85 | for flowset in data: |
---|
| 86 | if type(flowset) is dict: |
---|
| 87 | # The header |
---|
| 88 | # save the uptime and the unixtime |
---|
| 89 | unix_starttime = flowset['unixtime'] - ( flowset['uptime'] / 1000) |
---|
| 90 | # Transform the sourceid to the decimal notation |
---|
| 91 | sourceid = int(IPy.IP(flowset['sourceIP']).strDec()) |
---|
| 92 | |
---|
| 93 | # Workaround for MySQL not correctly working with triggers |
---|
| 94 | if DATABASE is 'mysql': |
---|
| 95 | cursor.execute("START TRANSACTION;") |
---|
| 96 | cursor.execute("SELECT FlowSrc FROM FlowSources WHERE FlowSrc = %s", sourceid) |
---|
| 97 | if int(cursor.rowcount) == 0: |
---|
| 98 | # Insert the key |
---|
| 99 | cursor.execute("INSERT INTO FlowSources (FlowSrc) VALUES (%s);", sourceid) |
---|
| 100 | cursor.execute("COMMIT;") |
---|
| 101 | |
---|
| 102 | elif type(flowset) is list: |
---|
| 103 | # flowset |
---|
| 104 | cursor.execute("START TRANSACTION;") |
---|
| 105 | for flow in flowset: |
---|
| 106 | # flow |
---|
| 107 | |
---|
| 108 | # General Workaround for flowprobes which do not send the expected data |
---|
| 109 | # Simply create the lookup key with the value None, this will make the SQL insert succeed |
---|
| 110 | for key in ('FIRST_SWITCHED', 'LAST_SWITCHED', 'PROTOCOL', 'IPV4_SRC_ADDR', 'IPV4_DST_ADDR', |
---|
| 111 | 'INPUT_SNMP', 'OUTPUT_SNMP', 'L4_SRC_PORT', 'L4_DST_PORT', 'SRC_MASK', 'DST_MASK'): |
---|
| 112 | if key not in flow: |
---|
| 113 | try: |
---|
| 114 | flow[key] = None |
---|
| 115 | except: |
---|
| 116 | debug('key>flow: %s > %s' % (pprint.pformat(key), pprint.pformat(flow))) |
---|
| 117 | print key, type(flow) |
---|
| 118 | |
---|
| 119 | # Workaround for flowprobes which do not send a direction keyword |
---|
| 120 | # We can do an educated guess on the flow-direction based on the existance |
---|
| 121 | # of IN_ or OUT_ keys |
---|
| 122 | if 'DIRECTION' not in flow: |
---|
| 123 | flow['DIRECTION'] = None |
---|
| 124 | if 'IN_BYTES' in flow: |
---|
| 125 | flow['DIRECTION'] = 0 |
---|
| 126 | if 'OUT_BYTES' in flow: |
---|
| 127 | flow['DIRECTION'] = 1 |
---|
| 128 | |
---|
| 129 | if flow['DIRECTION'] is 0: |
---|
| 130 | # Ingress flow |
---|
| 131 | bytes = flow['IN_BYTES'] |
---|
| 132 | pkts = flow['IN_PKTS'] |
---|
| 133 | elif flow['DIRECTION'] is 1: |
---|
| 134 | # Egress flow |
---|
| 135 | bytes = flow['OUT_BYTES'] |
---|
| 136 | pkts = flow['OUT_PKTS'] |
---|
| 137 | |
---|
| 138 | cursor.execute("""INSERT INTO Flows (FlowSrc, TimeStart, TimeStop, IPProto, IPSrc, IPDst, IntIn, |
---|
| 139 | IntOut, PortSrc, PortDst, MaskSrc, MaskDst, Direction, Bytes, Pakets) |
---|
| 140 | VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);""", |
---|
| 141 | (sourceid, unix_starttime + ( flow['FIRST_SWITCHED'] / 1000 ), |
---|
| 142 | unix_starttime + ( flow['LAST_SWITCHED'] / 1000), flow['PROTOCOL'], IPy.IP(flow['IPV4_SRC_ADDR']).strDec(), |
---|
| 143 | IPy.IP(flow['IPV4_DST_ADDR']).strDec(), flow['INPUT_SNMP'], flow['OUTPUT_SNMP'], |
---|
| 144 | flow['L4_SRC_PORT'], flow['L4_DST_PORT'], flow['SRC_MASK'], flow['DST_MASK'], |
---|
| 145 | flow['DIRECTION'], bytes, pkts)) |
---|
| 146 | |
---|
| 147 | debug("Flow ready to insert: %s:%s -> %s:%s" % (flow['IPV4_SRC_ADDR'], flow['L4_SRC_PORT'], flow['IPV4_DST_ADDR'], flow['L4_DST_PORT'])) |
---|
| 148 | |
---|
| 149 | cursor.execute("COMMIT;") |
---|
| 150 | os.unlink(pkl_file) |
---|
| 151 | except: |
---|
| 152 | trace = traceback.format_exc() |
---|
| 153 | print 'error in', pkl_file |
---|
| 154 | print trace |
---|
| 155 | warning(trace) |
---|
| 156 | return False |
---|