#!/usr/bin/env python # -*- coding: ISO-8859-1 -*- ################################## # @program smon # @description simulation monitor # @copyright Copyright “(c)2009 Centre National de la Recherche Scientifique CNRS. # All Rights Reserved” # @svn_file $Id: watcher 2545 2013-02-01 09:58:10Z jripsl $ # @version $Rev: 2545 $ # @lastrevision $Date: 2013-02-01 10:58:10 +0100 (Fri, 01 Feb 2013) $ # @license CeCILL (http://dods.ipsl.jussieu.fr/jripsl/smon/LICENSE) ################################## import pika import base64 import json import sys import signal import smtplib from email.mime.text import MIMEText import datetime import logging # line below is to include "smon" package in the search path sys.path.append("/opt/supervisor/Monitoring") import smon.repo_io as repo_io import smon.types CSTE_BROKER_HOST='cstest-broker.ipsl.jussieu.fr' # cstest #CSTE_BROKER_HOST='localhost' # vesg4 CSTE_LOG_DIR='/var/log/cssupervisor' CSTE_LOG_FILENAME_MAIN='supervisor.log' CSTE_LOG_FILENAME_DEBUG='debug.log' CSTE_LOG_FILENAME_MSG='message.log' # log AMQP msgs CSTE_LOG_FILE_MAIN="%s/%s"%(CSTE_LOG_DIR,CSTE_LOG_FILENAME_MAIN) CSTE_LOG_FILE_MSG="%s/%s"%(CSTE_LOG_DIR,CSTE_LOG_FILENAME_MSG) def create_logger(name,fullpath_filename): # new logger instance logger = logging.getLogger(name) logger.setLevel(logging.DEBUG) # create associated file fh = logging.FileHandler(fullpath_filename) fh.setLevel(logging.DEBUG) # create formatter formatter = logging.Formatter('%(asctime)-15s - %(message)s') fh.setFormatter(formatter) # binding logger.addHandler(fh) return logger # loggers init. logger=create_logger(CSTE_LOG_FILENAME_MAIN,CSTE_LOG_FILE_MAIN) msg_logger=create_logger(CSTE_LOG_FILENAME_MSG,CSTE_LOG_FILE_MSG) class Mail(): @classmethod def mail_example(cls): me="jripsl@ipsl.jussieu.fr" you="jripsl@ipsl.jussieu.fr" body="Alarm" object="Supervisor" cls.mail(me,you,object,body) @classmethod def send_mail(cls,me,you,object,body): msg = MIMEText(body) msg['Subject'] = object msg['From'] = me msg['To'] = you # Send the message via our own SMTP server, but don't include the # envelope header. s = smtplib.SMTP('localhost') s.sendmail(me,[you], msg.as_string()) s.quit() class Actions(): @classmethod def store_msg(cls,message): try: # the simu exists when we are here (see TAG0001 tag) s=repo_io.retrieve_simulation(message.simuid) if s is None: raise Exception("WATCH-ERR102","simulation not found") repo_io.create_message(message,s) except: raise @classmethod def cleanup(cls,message): repo_io.cleanup() # truncate/delete everything @classmethod def set_sim_status_to_error(cls,message): try: s=repo_io.retrieve_simulation(message.simuid) s.status="error" repo_io.update_simulation_status(s) except: raise @classmethod def set_sim_status_to_complete(cls,message): s=repo_io.retrieve_simulation(message.simuid) s.status="complete" repo_io.update_simulation_status(s) @classmethod def crea_sim(cls,message): s=repo_io.retrieve_simulation(message.simuid) if s is not None: #repo_io.delete_simulation(name) s.status="running" repo_io.update_simulation_status(s) else: simulation=smon.types.Simulation(name=message.simuid,status="running") repo_io.create_simulation(simulation) @classmethod def mail(cls): cls.mail_example() @classmethod def log_debug(cls,line): cls.log(CSTE_LOG_FILENAME_DEBUG,line) @classmethod def log(cls,filename,line): with open("%s/%s"%(CSTE_LOG_DIR,filename), "a") as log_file: log_file.write("%s %s\n"%(datetime.datetime.now().strftime('%Y%m%d_%H%M%S'), line)) @classmethod def log_msg(cls,message): line="%s %s %s %s"%(message.code,message.jobid,message.timestamp,message.command) msg_logger.info(line) """ if message.file is not None: "%s %s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp,message.file) else: "%s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp) """ @classmethod def execActions(cls,message): message_code=message.code for action in MessageActionsMapping.mapping[message_code]: proc_name=action try: getattr(Actions, proc_name)(message) except Exception,e: logger.exception("ERR909 - exception occurs") raise Exception("WATCH-ERR002","procedure error (%s)"%(proc_name,)) class MessageActionsMapping(): # TAG0001: note that crea_sim must be BEFORE store_msg in the list (because when we insert the msg, we need the simu_id) # mapping = { "0000":["crea_sim", "log_msg", "store_msg"], "0100":["log_msg", "store_msg", "set_sim_status_to_complete"], "1000":["log_msg", "store_msg"], "1100":["log_msg", "store_msg"], "2000":["log_msg", "store_msg"], "3000":["log_msg", "store_msg"], "7000":["log_msg"], "8888":["cleanup"], "9000":["log_msg", "store_msg"], "9999":["log_msg", "store_msg", "set_sim_status_to_error"] } # prod (mail added for some action) """ "9000":["log_msg", "store_msg", "mail"], "9999":["log_msg", "store_msg", "set_sim_status_to_error", "mail"] } """ class Watcher(): @classmethod def start(cls): repo_io.init() # open DB connection @classmethod def stop(cls): repo_io.free() # close DB connection @classmethod def main(self): """ # parse args parser = argparse.ArgumentParser(prog='watcher') parser.add_argument('-v', dest='verbose',required=False,action='store_true') args = parser.parse_args() # check if not os.path.exists(SMON.smon_home): sys.exit(1) SMON.init_singleton() """ connection = pika.BlockingConnection(pika.ConnectionParameters(host=CSTE_BROKER_HOST)) self.channel = connection.channel() logger.info("[*] Waiting for messages") def callback(ch, method, properties, raw_msg): # msg fmt: body:base64,file:base64 (no JSON here !!!) # first deserialization (no JSON here !!!) fields=raw_msg.split(",") l__tmp_dic={} for field in fields: # debug #logger.debug(" [x] Received %s"%field) splitted_field=field.split(":") key=splitted_field[0] val=splitted_field[1] l__tmp_dic[key]=val # debug #logger.debug(" [x] Received %s (encoded)" % l__tmp_dic["body"]) # base64 decode body base64_decoded_msg=base64.b64decode(l__tmp_dic["body"]) # debug #logger.debug(" [x] Received %s" % raw_msg) #logger.debug(" [x] Received %s (uudecoded)" % base64_decoded_msg ) #logger.debug(" [x] Received %s (uudecoded)" % base64_decoded_msg ) # message deserialization message=None try: JSON_msg=json.loads(base64_decoded_msg) message=smon.types.Message(JSON_msg) # all JSON object members will be available in smon.types.Message object # non working #logger.debug("DEB003 - %s"%message.type) # working #logger.debug("DEB009 - %s"%message.code) except Exception,e: logger.exception("ERR009 - exception occurs") Actions.log_debug("DEB021 - %s"%base64_decoded_msg) raise # manage config-card file which is attached to the "0000" code message (this file is base64 encoded and need to be unencoded) # if "file" in l__tmp_dic: # base64 decode file base64_decoded_file=base64.b64decode(l__tmp_dic["file"]) # add as msg attribute message.file=base64_decoded_file # execute actions try: # message code based action Actions.execActions(message) self.channel.basic_ack(delivery_tag = method.delivery_tag) # "msg acknowledgment" stuff except Exception,e: logger.exception("ERR019 - exception occurs") Actions.log_debug("DEB020 - %s"%base64_decoded_msg) raise # slow down consumer #time.sleep(0.5) self.channel.queue_declare(queue='myqueue',durable=True) self.channel.basic_consume(callback, queue='myqueue') # do not set "no_ack=True" anymore here so to enable "msg acknowledgment" (i.e. no_ack default is false) self.channel.start_consuming() """ SMON.free_singleton() """ def signal_handler(signal, frame): logger.info("TERM signal received: exiting.") Watcher.channel.stop_consuming() Watcher.stop() sys.exit(0) if __name__ == '__main__': signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) try: Watcher.start() Watcher.main() Watcher.stop() sys.exit(0) except Exception, e: logger.exception("ERR904 - exception occurred") sys.exit(1) # vim: set ts=4 sw=4 :