#!/usr/bin/python
#
# Copyright (C) 2008 Red Hat, Inc.
# Author: Andreas Thienemann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Library General Public License as published by
# the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU Library General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
# Copyright 2004, 2005 Red Hat, Inc.
#
# AUTHOR: Andreas Thienemann
#

import gamin
import time
import optparse
import sys
import traceback
from signal import signal, SIGTERM
import atexit
from logging import *
import os
from common import *
import pprint
import pickle
import IPy

mon = None
db = None
cursor = None


# The callback handler for the gamin watch. Will be executed on changes
# in the watched directory.
# This is the main function doing all the work.
def callback(path, event):
    #
    # The types of events provided in the callbacks:
    #
    # GAMChanged=1
    # GAMDeleted=2
    # GAMStartExecuting=3
    # GAMStopExecuting=4
    # GAMCreated=5
    # GAMMoved=6
    # GAMAcknowledge=7
    # GAMExists=8
    # GAMEndExist=9

    # Pull in needed vars
    global db, cursor

    # We're only interested in files ending in .pkl, not the .tmp files.
    if not path.endswith('.pkl'):
        return False

    # We need a bit of sleep to let the filesystem settle
    # time.sleep(1)

    debug('Got callback: %s, %s' % (path, event))
    if event in (1, 8, 9):
        pkl_file = os.path.join(SPOOLDIR, path)
        if os.path.isfile(pkl_file):
            data_file = open(pkl_file, 'rb')
            try:
                data = pickle.load(data_file)
                data_file.close()
                os.unlink(pkl_file)
            except:
                trace = traceback.format_exc()
                print trace
                warning(trace)
                return False

            # Insert each flow into the database
            unix_starttime = ''
            sourceid = ''
            for flowset in data:
                if type(flowset) is dict:
                    # The header:
                    # save the uptime and the unixtime
                    unix_starttime = flowset['unixtime'] - (flowset['uptime'] / 1000)
                    # Transform the sourceid to decimal notation
                    sourceid = int(IPy.IP(flowset['sourceIP']).strDec())
                    # Workaround for MySQL not correctly working with triggers
                    if DATABASE == 'mysql':
                        cursor.execute("""START TRANSACTION;""")
                        cursor.execute("""SELECT FlowSrc FROM FlowSources WHERE FlowSrc = %s""",
                                       (sourceid,))
                        if int(cursor.rowcount) == 0:
                            # Insert the key
                            cursor.execute("""INSERT INTO FlowSources (FlowSrc) VALUES (%s);""",
                                           (sourceid,))
                        cursor.execute("""COMMIT;""")
                elif type(flowset) is list:
                    # The flowset
                    cursor.execute("""START TRANSACTION;""")
                    for flow in flowset:
                        # General workaround for flowprobes which do not send the expected data:
                        # simply create the lookup key with the value None, which makes the SQL
                        # insert succeed.
                        for key in ('FIRST_SWITCHED', 'LAST_SWITCHED', 'PROTOCOL',
                                    'IPV4_SRC_ADDR', 'IPV4_DST_ADDR', 'INPUT_SNMP',
                                    'OUTPUT_SNMP', 'L4_SRC_PORT', 'L4_DST_PORT',
                                    'SRC_MASK', 'DST_MASK'):
                            if key not in flow:
                                debug('key>flow: %s > %s' % (pprint.pformat(key), pprint.pformat(flow)))
                                flow[key] = None

                        # Workaround for flowprobes which do not send a direction keyword:
                        # we can make an educated guess on the flow direction based on the
                        # existence of IN_ or OUT_ keys.
                        if 'DIRECTION' not in flow:
                            flow['DIRECTION'] = None
                            if 'IN_BYTES' in flow:
                                flow['DIRECTION'] = 0
                            if 'OUT_BYTES' in flow:
                                flow['DIRECTION'] = 1

                        if flow['DIRECTION'] == 0:
                            # Ingress flow
                            bytes = flow['IN_BYTES']
                            pkts = flow['IN_PKTS']
                        elif flow['DIRECTION'] == 1:
                            # Egress flow
                            bytes = flow['OUT_BYTES']
                            pkts = flow['OUT_PKTS']
                        else:
                            # Unknown direction, no counters available
                            bytes = None
                            pkts = None

                        cursor.execute("""INSERT INTO Flows (FlowSrc, TimeStart, TimeStop, IPProto,
                                          IPSrc, IPDst, IntIn, IntOut, PortSrc, PortDst, MaskSrc,
                                          MaskDst, Bytes, Pakets)
                                          VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);""",
                                       (sourceid,
                                        unix_starttime + (flow['FIRST_SWITCHED'] / 1000),
                                        unix_starttime + (flow['LAST_SWITCHED'] / 1000),
                                        flow['PROTOCOL'],
                                        IPy.IP(flow['IPV4_SRC_ADDR']).strDec(),
                                        IPy.IP(flow['IPV4_DST_ADDR']).strDec(),
                                        flow['INPUT_SNMP'], flow['OUTPUT_SNMP'],
                                        flow['L4_SRC_PORT'], flow['L4_DST_PORT'],
                                        flow['SRC_MASK'], flow['DST_MASK'],
                                        bytes, pkts))
                        debug("Flow ready to insert: %s:%s -> %s:%s" % (flow['IPV4_SRC_ADDR'], flow['L4_SRC_PORT'],
                                                                        flow['IPV4_DST_ADDR'], flow['L4_DST_PORT']))
                    cursor.execute("""COMMIT;""")


def cleanup():
    """Cleanup handler, called when the program is stopped"""
    global mon
    global db, cursor

    # Flush the database
    cursor.execute("""COMMIT""")

    # Remove the gamin monitor
    mon.stop_watch(SPOOLDIR)
    del mon
    info('DB Spooler shutdown')


def main():
    global db, cursor

    # CLI Options
    usage = "usage: %prog [options] arg"
    parser = optparse.OptionParser(usage)
    parser.add_option("-c", "--config", dest="configfile", default="config.py",
                      help="Use FILENAME to read initial configuration from")
    parser.add_option("-D", "--daemon", action="store_true", dest="daemon",
                      help="Run as daemon")
    parser.add_option("-d", "--debug", action="store_const", const="DEBUG",
                      dest="loglevel", default="INFO", help="DEBUG loglevel")
    (options, args) = parser.parse_args()

    # Read in configuration
    execfile(options.configfile, globals())

    # Set up logging
    basicConfig(level=eval(options.loglevel),
                format='%(asctime)s %(levelname)s %(module)s %(message)s',
                filename=LOGFILE_DBSPOOLER,
                filemode='a+')

    if options.daemon:
        daemonize()

    # Initialize the database connection
    if DATABASE == 'postgresql':
        import psycopg2
        try:
            db = psycopg2.connect(host=DB_HOST, user=DB_USER, password=DB_PASS, dbname=DB_NAME)
            # Create the database cursor we work with
            cursor = db.cursor()
        except psycopg2.Error, e:
            print "Error %d: %s" % (e.args[0], e.args[1])
            sys.exit(1)
    elif DATABASE == 'mysql':
        import MySQLdb
        try:
            db = MySQLdb.connect(host=DB_HOST, user=DB_USER, passwd=DB_PASS, db=DB_NAME)
            # Create the database cursor we work with
            cursor = db.cursor()
        except MySQLdb.Error, e:
            print "Error %d: %s" % (e.args[0], e.args[1])
            sys.exit(1)
    else:
        print("Unknown database driver requested, exiting.")
        sys.exit(1)

    try:
        atexit.register(cleanup)
        signal(SIGTERM, lambda signum, stack_frame: sys.exit(1))

        global mon
        mon = gamin.WatchMonitor()
        mon.watch_directory(SPOOLDIR, callback)
        time.sleep(1)
        info('DB Spooler startup success')
        while True:
            time.sleep(10)
            ret = mon.event_pending()
            if ret > 0:
                ret = mon.handle_one_event()
                ret = mon.handle_events()
    except (KeyboardInterrupt, SystemExit):
        # Cleanup is handled by the atexit handler registered above
        pass
    except:
        raise


if __name__ == "__main__":
    main()