root/trunk/flowmon-dbspooler.py

Revision 1, 8.5 kB (checked in by ixs, 16 years ago)

initial checkin of RH revision

  • Property svn:executable set to *
Line 
1 #!/usr/bin/python
2 #
3 # Copyright (C) 2008 Red Hat, Inc.
4 # Author: Andreas Thienemann <athienem@redhat.com>
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU Library General Public License as published by
8 # the Free Software Foundation; version 2 only
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Library General Public License for more details.
14 #
15 # You should have received a copy of the GNU Library General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 # Copyright 2004, 2005 Red Hat, Inc.
19 #
20 # AUTHOR: Andreas Thienemann <athienem@redhat.com>
21 #
22
import atexit
import optparse
import os
import pickle
import pprint
import sys
import time
import traceback
from logging import *
from signal import signal, SIGTERM

import gamin
import IPy

from common import *
34
# Module-level state shared between callback(), cleanup() and main().
mon = None      # gamin.WatchMonitor instance watching SPOOLDIR, set in main()
db = None       # database connection (MySQLdb or psycopg2), set in main()
cursor = None   # cursor on 'db', used for all SQL statements
38
39 # The callback handler for the gamin transaction. Will be executed on changes
40 # in the watched directory
41 # This is the main function doing all the work
def callback(path, event):
    """Gamin callback executed on changes in the watched spool directory.

    This is the main function doing all the work: it loads a pickled flow
    dump from SPOOLDIR, removes the spool file and inserts every contained
    flow into the database.

    path  -- filename (relative to SPOOLDIR) the event refers to
    event -- gamin event code; only GAMChanged(1), GAMExists(8) and
             GAMEndExist(9) are acted upon
    """
    #
    # the type of events provided in the callbacks.
    #
    # GAMChanged=1
    # GAMDeleted=2
    # GAMStartExecuting=3
    # GAMStopExecuting=4
    # GAMCreated=5
    # GAMMoved=6
    # GAMAcknowledge=7
    # GAMExists=8
    # GAMEndExist=9

    # Pull in needed vars
    global db, cursor

    # We're only interested in files ending .pkl, not the .tmp files.
    if not path.endswith('.pkl'):
        return False

    debug('Got callback: %s, %s' % (path, event))
    if event not in (1, 8, 9):
        return False

    pkl_file = os.path.join(SPOOLDIR, path)

    # BUGFIX: the original fell through with an undefined 'data' when the
    # spool file had already disappeared, raising NameError below.
    if not os.path.isfile(pkl_file):
        return False

    try:
        data_file = open(pkl_file, 'rb')
        try:
            # NOTE(review): pickle.load is only safe because SPOOLDIR is
            # assumed to be written by a trusted local collector process.
            data = pickle.load(data_file)
        finally:
            data_file.close()
        os.unlink(pkl_file)
    except Exception:
        trace = traceback.format_exc()
        warning(trace)
        return False

    # Insert each flow into the database
    unix_starttime = ''
    sourceid = ''
    for flowset in data:
        if isinstance(flowset, dict):
            # The header: save the boot time derived from uptime/unixtime
            unix_starttime = flowset['unixtime'] - (flowset['uptime'] / 1000)
            # Transform the sourceid to the decimal notation
            sourceid = int(IPy.IP(flowset['sourceIP']).strDec())

            # Workaround for MySQL not correctly working with triggers.
            # BUGFIX: 'is' compared object identity; use '==' for strings.
            if DATABASE == 'mysql':
                cursor.execute("""START TRANSACTION;""")
                cursor.execute("""SELECT FlowSrc FROM FlowSources WHERE FlowSrc = %s""", sourceid)
                if int(cursor.rowcount) == 0:
                    # Insert the key
                    cursor.execute("""INSERT INTO FlowSources (FlowSrc) VALUES (%s);""", sourceid)
                cursor.execute("""COMMIT;""")

        elif isinstance(flowset, list):
            # A flowset: insert all contained flows in one transaction
            cursor.execute("""START TRANSACTION;""")
            for flow in flowset:
                # General workaround for flowprobes which do not send the
                # expected data: create the missing lookup keys with the
                # value None so the SQL insert below succeeds.
                for key in ('FIRST_SWITCHED', 'LAST_SWITCHED', 'PROTOCOL', 'IPV4_SRC_ADDR', 'IPV4_DST_ADDR',
                            'INPUT_SNMP', 'OUTPUT_SNMP', 'L4_SRC_PORT', 'L4_DST_PORT', 'SRC_MASK', 'DST_MASK'):
                    if key not in flow:
                        debug('key>flow: %s > %s' % (pprint.pformat(key), pprint.pformat(flow)))
                        flow[key] = None

                # Workaround for flowprobes which do not send a direction
                # keyword: make an educated guess on the flow direction
                # based on the existence of IN_ or OUT_ keys.
                if 'DIRECTION' not in flow:
                    flow['DIRECTION'] = None
                    if 'IN_BYTES' in flow:
                        flow['DIRECTION'] = 0
                    if 'OUT_BYTES' in flow:
                        flow['DIRECTION'] = 1

                # BUGFIX: compare with '==' instead of 'is' (int identity is
                # a CPython implementation detail) and handle an unknown
                # direction -- the counters were previously left undefined,
                # raising NameError on the insert.  Also renamed 'bytes' to
                # avoid shadowing the builtin.
                if flow['DIRECTION'] == 0:
                    # Ingress flow
                    n_bytes = flow['IN_BYTES']
                    n_pkts = flow['IN_PKTS']
                elif flow['DIRECTION'] == 1:
                    # Egress flow
                    n_bytes = flow['OUT_BYTES']
                    n_pkts = flow['OUT_PKTS']
                else:
                    n_bytes = None
                    n_pkts = None

                # BUGFIX: this debug line sat outside the loop and only ever
                # reported the last flow of the flowset.
                debug("Flow ready to insert: %s:%s -> %s:%s" % (flow['IPV4_SRC_ADDR'], flow['L4_SRC_PORT'], flow['IPV4_DST_ADDR'], flow['L4_DST_PORT']))

                cursor.execute("""INSERT INTO Flows (FlowSrc, TimeStart, TimeStop, IPProto, IPSrc, IPDst, IntIn,
                                  IntOut, PortSrc, PortDst, MaskSrc, MaskDst, Bytes, Pakets)
                                  VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);""",
                                  (sourceid, unix_starttime + (flow['FIRST_SWITCHED'] / 1000),
                                  unix_starttime + (flow['LAST_SWITCHED'] / 1000), flow['PROTOCOL'], IPy.IP(flow['IPV4_SRC_ADDR']).strDec(),
                                  IPy.IP(flow['IPV4_DST_ADDR']).strDec(), flow['INPUT_SNMP'], flow['OUTPUT_SNMP'],
                                  flow['L4_SRC_PORT'], flow['L4_DST_PORT'], flow['SRC_MASK'], flow['DST_MASK'], n_bytes, n_pkts))

            cursor.execute("""COMMIT;""")
146
147 """Cleanup handler, called when the program is stopped"""
148 def cleanup():
149     global mon
150     global db, cursor
151
152     # Flush the database
153     cursor.execute("""COMMIT""")
154
155     # Remove the gamin monitor
156     mon.stop_watch(SPOOLDIR)
157     del mon
158     info('DB Spooler shutdown')
159
def main():
    """Entry point: parse CLI options, load the configuration, connect to
    the database and run the gamin spool-directory watch loop forever.
    """
    global db, cursor

    # CLI Options
    usage = "usage: %prog [options] arg"
    parser = optparse.OptionParser(usage)
    parser.add_option("-c", "--config", dest="configfile", default="config.py",
                      help="Use FILENAME to read initial configuration from")
    parser.add_option("-D", "--daemon",
                      action="store_true", dest="daemon",
                      help="Run as daemon")
    parser.add_option("-d", "--loglevel=debug",
                      action="store_const", const="DEBUG", dest="loglevel",
                      help="DEBUG loglevel", default="INFO")
    (options, args) = parser.parse_args()

    # Read in configuration.  NOTE(review): executing the config file grants
    # it full code-execution rights; it must be trusted.
    execfile(options.configfile, globals())

    # Set up logging.  BUGFIX: map the level name to its numeric value with
    # getLevelName() instead of eval()ing an option-supplied string.
    basicConfig(level=getLevelName(options.loglevel),
         format='%(asctime)s %(levelname)s %(module)s %(message)s',
         filename=LOGFILE_DBSPOOLER, filemode='a+')

    if options.daemon:
        daemonize()

    # Initialize database.
    # BUGFIX: 'DATABASE is ...' compared string identity; use '=='.
    if DATABASE == 'postgresql':
        import psycopg2
        try:
            # BUGFIX: the module name was misspelled 'psycodb2' here, so the
            # postgresql path always died with a NameError.
            db = psycopg2.connect(host=DB_HOST, user=DB_USER, password=DB_PASS, dbname=DB_NAME)
            # Create the database cursor we work with
            cursor = db.cursor()
        except psycopg2.Error as e:
            print("Error %d: %s" % (e.args[0], e.args[1]))
            sys.exit(1)
    elif DATABASE == 'mysql':
        import MySQLdb
        try:
            db = MySQLdb.connect(host=DB_HOST, user=DB_USER, passwd=DB_PASS, db=DB_NAME)
            # Create the database cursor we work with
            cursor = db.cursor()
        except MySQLdb.Error as e:
            print("Error %d: %s" % (e.args[0], e.args[1]))
            sys.exit(1)
    else:
        print("Unknown database driver requested, exiting.")
        sys.exit(1)

    try:
        # Ensure the database is flushed and the watch removed on any exit.
        atexit.register(cleanup)
        signal(SIGTERM, lambda signum, stack_frame: sys.exit(1))

        global mon
        mon = gamin.WatchMonitor()
        mon.watch_directory(SPOOLDIR, callback)
        time.sleep(1)
        info('DB Spooler startup success')
        while True:
            time.sleep(10)
            if mon.event_pending() > 0:
                mon.handle_one_event()
                mon.handle_events()

    except (KeyboardInterrupt, SystemExit):
        # BUGFIX: the original referenced 'cleanup' without calling it (a
        # no-op); the atexit handler registered above already cleans up.
        pass

if __name__ == "__main__":
    main()
236
Note: See TracBrowser for help on using the browser.