root/trunk/flowmon-dbspooler.py

Revision 2, 8.5 kB (checked in by ixs, 16 years ago)

Added direction tracking to the database in order to work around flowprobes that omit this value

  • Property svn:executable set to *
Line 
1 #!/usr/bin/python
2 #
3 # Copyright (C) 2008 Red Hat, Inc.
4 # Author: Andreas Thienemann <athienem@redhat.com>
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU Library General Public License as published by
8 # the Free Software Foundation; version 2 only
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Library General Public License for more details.
14 #
15 # You should have received a copy of the GNU Library General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 # Copyright 2004, 2005 Red Hat, Inc.
19 #
20 # AUTHOR: Andreas Thienemann <athienem@redhat.com>
21 #
22
import atexit
import optparse
import os
import pickle
import pprint
import sys
import time
import traceback
from logging import *
from signal import signal, SIGTERM

import gamin
import IPy

from common import *
34
35 mon = None
36 db = None
37 cursor = None
38
39 # The callback handler for the gamin transaction. Will be executed on changes
40 # in the watched directory
41 # This is the main function doing all the work
42 def callback(path, event):
43     #
44     # the type of events provided in the callbacks.
45     #
46     # GAMChanged=1
47     # GAMDeleted=2
48     # GAMStartExecuting=3
49     # GAMStopExecuting=4
50     # GAMCreated=5
51     # GAMMoved=6
52     # GAMAcknowledge=7
53     # GAMExists=8
54     # GAMEndExist=9
55
56     # Pull in needed vars
57     global db, cursor
58
59     # We're only interested in files ending .pkl, not the .tmp files.
60     if not path.endswith('.pkl'):
61         return False
62
63     # We need a bit of sleep to let the filesystem settle
64     # time.sleep(1)
65     debug('Got callback: %s, %s' % (path, event))
66     if event in (1, 8, 9):
67         pkl_file = os.path.join(SPOOLDIR, path)
68
69         if os.path.isfile(pkl_file) is True:
70             data_file = open(pkl_file, 'rb')
71             try:
72                 data = pickle.load(data_file)
73                 data_file.close()
74                 os.unlink(pkl_file)
75             except:
76                 trace = traceback.format_exc()
77                 print trace
78                 warning(trace)
79                 return False
80
81         # Insert each flow into the databse
82         unix_starttime = ''
83         sourceid = ''
84         for flowset in data:
85             if type(flowset) is dict:
86                 # The header
87                 # save the uptime and the unixtime
88                 unix_starttime = flowset['unixtime'] - ( flowset['uptime'] / 1000)
89                 # Transform the sourceid to the decimal notation
90                 sourceid = int(IPy.IP(flowset['sourceIP']).strDec())
91
92                 # Workaround for MySQL not correctly working with triggers
93                 if DATABASE is 'mysql':
94                     cursor.execute("""START TRANSACTION;""")
95                     cursor.execute("""SELECT FlowSrc FROM FlowSources WHERE FlowSrc = %s""", sourceid)
96                     if int(cursor.rowcount) == 0:
97                         # Insert the key
98                         cursor.execute("""INSERT INTO FlowSources (FlowSrc) VALUES (%s);""", sourceid)
99                     cursor.execute("""COMMIT;""")
100
101             elif type(flowset) is list:
102                 # flowset
103                 cursor.execute("""START TRANSACTION;""")
104                 for flow in flowset:
105                     # flow
106
107                     # General Workaround for flowprobes which do not send the expected data
108                     # Simply create the lookup key with the value None, this will make the SQL insert succeed
109                     for key in ('FIRST_SWITCHED', 'LAST_SWITCHED', 'PROTOCOL', 'IPV4_SRC_ADDR', 'IPV4_DST_ADDR',
110                                 'INPUT_SNMP', 'OUTPUT_SNMP', 'L4_SRC_PORT', 'L4_DST_PORT', 'SRC_MASK', 'DST_MASK'):
111                         if key not in flow:
112                             debug('key>flow: %s > %s' % (pprint.pformat(key), pprint.pformat(flow)))
113                             flow[key] = None
114
115                     # Workaround for flowprobes which do not send a direction keyword
116                     # We can do an educated guess on the flow-direction based on the existance
117                     # of IN_ or OUT_ keys
118                     if 'DIRECTION' not in flow:
119                         flow['DIRECTION'] = None
120                         if 'IN_BYTES' in flow:
121                             flow['DIRECTION'] = 0
122                         if 'OUT_BYTES' in flow:
123                             flow['DIRECTION'] = 1
124
125                     if flow['DIRECTION'] is 0:
126                         # Ingress flow
127                         bytes = flow['IN_BYTES']
128                         pkts = flow['IN_PKTS']
129                     elif flow['DIRECTION'] is 1:
130                         # Egress flow
131                         bytes = flow['OUT_BYTES']
132                         pkts = flow['OUT_PKTS']
133
134                     cursor.execute("""INSERT INTO Flows (FlowSrc, TimeStart, TimeStop, IPProto, IPSrc, IPDst, IntIn,
135                                       IntOut, PortSrc, PortDst, MaskSrc, MaskDst, Direction, Bytes, Pakets)
136                                       VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);""",
137                                       (sourceid, unix_starttime + ( flow['FIRST_SWITCHED'] / 1000 ),
138                                       unix_starttime + ( flow['LAST_SWITCHED'] / 1000), flow['PROTOCOL'], IPy.IP(flow['IPV4_SRC_ADDR']).strDec(),
139                                       IPy.IP(flow['IPV4_DST_ADDR']).strDec(), flow['INPUT_SNMP'], flow['OUTPUT_SNMP'],
140                                       flow['L4_SRC_PORT'], flow['L4_DST_PORT'], flow['SRC_MASK'], flow['DST_MASK'],
141                                       flow['DIRECTION'], bytes, pkts))
142
143                 debug("Flow ready to insert: %s:%s -> %s:%s" % (flow['IPV4_SRC_ADDR'], flow['L4_SRC_PORT'], flow['IPV4_DST_ADDR'], flow['L4_DST_PORT']))
144
145                 cursor.execute("""COMMIT;""")
146            
147
148 """Cleanup handler, called when the program is stopped"""
149 def cleanup():
150     global mon
151     global db, cursor
152
153     # Flush the database
154     cursor.execute("""COMMIT""")
155
156     # Remove the gamin monitor
157     mon.stop_watch(SPOOLDIR)
158     del mon
159     info('DB Spooler shutdown')
160
161 def main():
162     global db, cursor
163     # CLI Options
164     usage = "usage: %prog [options] arg"
165     parser = optparse.OptionParser(usage)
166     parser.add_option("-c", "--config", dest="configfile", default="config.py",
167                       help="Use FILENAME to read initial configuration from")
168     parser.add_option("-D", "--daemon",
169                       action="store_true", dest="daemon",
170                       help="Run as daemon")
171     parser.add_option("-d", "--loglevel=debug",
172                       action="store_const", const="DEBUG", dest="loglevel",
173                       help="DEBUG loglevel", default="INFO")
174     (options, args) = parser.parse_args()
175
176
177     # Read in configuration
178     execfile(options.configfile, globals())
179
180     # Set up logging
181     basicConfig(level=eval(options.loglevel),
182          format='%(asctime)s %(levelname)s %(module)s %(message)s',
183          filename=LOGFILE_DBSPOOLER, filemode='a+')
184
185     if options.daemon:
186         daemonize()
187
188     # Initialize databse
189     if DATABASE is 'postgresql':
190         import psycopg2
191         try:
192             db = psycodb2.connect(host=DB_HOST, user=DB_USER, password=DB_PASS, dbname=DB_NAME)
193             # Create the database cursor we work with
194             cursor = db.cursor()
195         except psycopg2.Error, e:
196             print "Error %d: %s" % (e.args[0], e.args[1])
197             sys.exit(1)
198     elif DATABASE is 'mysql':
199         import MySQLdb
200         try:
201             db = MySQLdb.connect(host=DB_HOST, user=DB_USER, passwd=DB_PASS, db=DB_NAME)
202             # Create the database cursor we work with
203             cursor = db.cursor()
204         except MySQLdb.Error, e:
205             print "Error %d: %s" % (e.args[0], e.args[1])
206             sys.exit(1)
207     else:
208         print("Unknown database driver requested, exiting.")
209         sys.exit(1)
210
211
212
213     try:
214         atexit.register(cleanup)
215         signal(SIGTERM, lambda signum, stack_frame: exit(1))
216
217         global mon
218         mon = gamin.WatchMonitor()
219         mon.watch_directory(SPOOLDIR, callback)
220         time.sleep(1)
221         info('DB Spooler startup success')
222         while True:
223             time.sleep(10)
224             ret = mon.event_pending()
225             if ret > 0:
226                 ret = mon.handle_one_event()
227                 ret = mon.handle_events()
228
229     except (KeyboardInterrupt, SystemExit):
230         cleanup
231         pass
232     except:
233         raise
234    
235 if __name__ == "__main__":
236     main()
237
Note: See TracBrowser for help on using the browser.