root/trunk/flowmon-dbspooler.py

Revision 5, 9.1 kB (checked in by ixs, 16 years ago)

Added better debugging

  • Property svn:executable set to *
Line 
1 #!/usr/bin/python
2 #
3 # Copyright (C) 2008 Red Hat, Inc.
4 # Author: Andreas Thienemann <athienem@redhat.com>
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU Library General Public License as published by
8 # the Free Software Foundation; version 2 only
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Library General Public License for more details.
14 #
15 # You should have received a copy of the GNU Library General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 # Copyright 2004, 2005 Red Hat, Inc.
19 #
20 # AUTHOR: Andreas Thienemann <athienem@redhat.com>
21 #
22
23 import gamin
24 import time
25 import optparse
26 from signal import signal, SIGTERM
27 import atexit
28 from logging import *
29 import os
30 from common import *
31 import pprint
32 import pickle
33 import IPy
34
35 mon = None
36 db = None
37 cursor = None
38
39 # The callback handler for the gamin transaction. Will be executed on changes
40 # in the watched directory
41 # This is the main function doing all the work
42 def callback(path, event):
43     #
44     # the type of events provided in the callbacks.
45     #
46     # GAMChanged=1
47     # GAMDeleted=2
48     # GAMStartExecuting=3
49     # GAMStopExecuting=4
50     # GAMCreated=5
51     # GAMMoved=6
52     # GAMAcknowledge=7
53     # GAMExists=8
54     # GAMEndExist=9
55
56     # Pull in needed vars
57     global db, cursor
58
59     # We're only interested in files ending .pkl, not the .tmp files.
60     if not path.endswith('.pkl'):
61         return False
62
63     # We need a bit of sleep to let the filesystem settle
64     # time.sleep(1)
65     debug('Got callback: %s, %s' % (path, event))
66     if event in (1, 8, 9):
67         pkl_file = os.path.join(SPOOLDIR, path)
68
69         if os.path.isfile(pkl_file) is True:
70             data_file = open(pkl_file, 'rb')
71             try:
72                 data = pickle.load(data_file)
73                 data_file.close()
74 #                os.unlink(pkl_file)
75             except:
76                 trace = traceback.format_exc()
77                 print trace
78                 warning(trace)
79                 return False
80
81         # Insert each flow into the databse
82         try:
83             unix_starttime = ''
84             sourceid = ''
85             for flowset in data:
86                 if type(flowset) is dict:
87                     # The header
88                     # save the uptime and the unixtime
89                     unix_starttime = flowset['unixtime'] - ( flowset['uptime'] / 1000)
90                     # Transform the sourceid to the decimal notation
91                     sourceid = int(IPy.IP(flowset['sourceIP']).strDec())
92
93                     # Workaround for MySQL not correctly working with triggers
94                     if DATABASE is 'mysql':
95                         cursor.execute("START TRANSACTION;")
96                         cursor.execute("SELECT FlowSrc FROM FlowSources WHERE FlowSrc = %s", sourceid)
97                         if int(cursor.rowcount) == 0:
98                             # Insert the key
99                             cursor.execute("INSERT INTO FlowSources (FlowSrc) VALUES (%s);", sourceid)
100                         cursor.execute("COMMIT;")
101
102                 elif type(flowset) is list:
103                     # flowset
104                     cursor.execute("START TRANSACTION;")
105                     for flow in flowset:
106                         # flow
107
108                         # General Workaround for flowprobes which do not send the expected data
109                         # Simply create the lookup key with the value None, this will make the SQL insert succeed
110                         for key in ('FIRST_SWITCHED', 'LAST_SWITCHED', 'PROTOCOL', 'IPV4_SRC_ADDR', 'IPV4_DST_ADDR',
111                                     'INPUT_SNMP', 'OUTPUT_SNMP', 'L4_SRC_PORT', 'L4_DST_PORT', 'SRC_MASK', 'DST_MASK'):
112                             if key not in flow:
113                                 try:
114                                     flow[key] = None
115                                 except:
116                                     debug('key>flow: %s > %s' % (pprint.pformat(key), pprint.pformat(flow)))
117                                     print key, type(flow)
118
119                         # Workaround for flowprobes which do not send a direction keyword
120                         # We can do an educated guess on the flow-direction based on the existance
121                         # of IN_ or OUT_ keys
122                         if 'DIRECTION' not in flow:
123                             flow['DIRECTION'] = None
124                             if 'IN_BYTES' in flow:
125                                 flow['DIRECTION'] = 0
126                             if 'OUT_BYTES' in flow:
127                                 flow['DIRECTION'] = 1
128
129                         if flow['DIRECTION'] is 0:
130                             # Ingress flow
131                             bytes = flow['IN_BYTES']
132                             pkts = flow['IN_PKTS']
133                         elif flow['DIRECTION'] is 1:
134                             # Egress flow
135                             bytes = flow['OUT_BYTES']
136                             pkts = flow['OUT_PKTS']
137
138                         cursor.execute("""INSERT INTO Flows (FlowSrc, TimeStart, TimeStop, IPProto, IPSrc, IPDst, IntIn,
139                                           IntOut, PortSrc, PortDst, MaskSrc, MaskDst, Direction, Bytes, Pakets)
140                                           VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);""",
141                                           (sourceid, unix_starttime + ( flow['FIRST_SWITCHED'] / 1000 ),
142                                           unix_starttime + ( flow['LAST_SWITCHED'] / 1000), flow['PROTOCOL'], IPy.IP(flow['IPV4_SRC_ADDR']).strDec(),
143                                           IPy.IP(flow['IPV4_DST_ADDR']).strDec(), flow['INPUT_SNMP'], flow['OUTPUT_SNMP'],
144                                           flow['L4_SRC_PORT'], flow['L4_DST_PORT'], flow['SRC_MASK'], flow['DST_MASK'],
145                                           flow['DIRECTION'], bytes, pkts))
146
147                     debug("Flow ready to insert: %s:%s -> %s:%s" % (flow['IPV4_SRC_ADDR'], flow['L4_SRC_PORT'], flow['IPV4_DST_ADDR'], flow['L4_DST_PORT']))
148
149                     cursor.execute("COMMIT;")
150             os.unlink(pkl_file)
151         except:
152             trace = traceback.format_exc()
153             print 'error in', pkl_file
154             print trace
155             warning(trace)
156             return False
157            
158
159 """Cleanup handler, called when the program is stopped"""
160 def cleanup():
161     global mon
162     global db, cursor
163
164     # Flush the database
165     cursor.execute("COMMIT")
166
167     # Remove the gamin monitor
168     mon.stop_watch(SPOOLDIR)
169     del mon
170     info('DB Spooler shutdown')
171
172 def main():
173     global db, cursor
174     # CLI Options
175     usage = "usage: %prog [options] arg"
176     parser = optparse.OptionParser(usage)
177     parser.add_option("-c", "--config", dest="configfile", default="config.py",
178                       help="Use FILENAME to read initial configuration from")
179     parser.add_option("-D", "--daemon",
180                       action="store_true", dest="daemon",
181                       help="Run as daemon")
182     parser.add_option("-d", "--loglevel=debug",
183                       action="store_const", const="DEBUG", dest="loglevel",
184                       help="DEBUG loglevel", default="INFO")
185     (options, args) = parser.parse_args()
186
187
188     # Read in configuration
189     execfile(options.configfile, globals())
190
191     # Set up logging
192     basicConfig(level=eval(options.loglevel),
193          format='%(asctime)s %(levelname)s %(module)s %(message)s',
194          filename=LOGFILE_DBSPOOLER, filemode='a+')
195
196     if options.daemon:
197         daemonize()
198
199     # Initialize databse
200     if DATABASE is 'postgresql':
201         import psycopg2
202         try:
203             db = psycodb2.connect(host=DB_HOST, user=DB_USER, password=DB_PASS, dbname=DB_NAME)
204             # Create the database cursor we work with
205             cursor = db.cursor()
206         except psycopg2.Error, e:
207             print "Error %d: %s" % (e.args[0], e.args[1])
208             sys.exit(1)
209     elif DATABASE is 'mysql':
210         import MySQLdb
211         try:
212             db = MySQLdb.connect(host=DB_HOST, user=DB_USER, passwd=DB_PASS, db=DB_NAME)
213             # Create the database cursor we work with
214             cursor = db.cursor()
215         except MySQLdb.Error, e:
216             print "Error %d: %s" % (e.args[0], e.args[1])
217             sys.exit(1)
218     else:
219         print("Unknown database driver requested, exiting.")
220         sys.exit(1)
221
222
223
224     try:
225         atexit.register(cleanup)
226         signal(SIGTERM, lambda signum, stack_frame: exit(1))
227
228         global mon
229         mon = gamin.WatchMonitor()
230         mon.watch_directory(SPOOLDIR, callback)
231         time.sleep(1)
232         info('DB Spooler startup success')
233         while True:
234             time.sleep(10)
235             ret = mon.event_pending()
236             if ret > 0:
237                 ret = mon.handle_one_event()
238                 ret = mon.handle_events()
239
240     except (KeyboardInterrupt, SystemExit):
241         cleanup
242         sys.exit(0)
243     except:
244         raise
245    
246 if __name__ == "__main__":
247     main()
248
Note: See TracBrowser for help on using the browser.