source: trunk/Monitoring/Watch/watch @ 972

Last change on this file since 972 was 972, checked in by jripsl, 11 years ago
  • add message acknowledgement.
  • Property svn:executable set to *
File size: 8.4 KB
RevLine 
[854]1#!/usr/bin/env python
[840]2# -*- coding: ISO-8859-1 -*-
3
4##################################
5#  @program        smon
6#  @description    simulation monitor
7#  @copyright      Copyright “(c)2009 Centre National de la Recherche Scientifique CNRS.
8#                             All Rights Reserved”
9#  @svn_file       $Id: watcher 2545 2013-02-01 09:58:10Z jripsl $
10#  @version        $Rev: 2545 $
11#  @lastrevision   $Date: 2013-02-01 10:58:10 +0100 (Fri, 01 Feb 2013) $
12#  @license        CeCILL (http://dods.ipsl.jussieu.fr/jripsl/smon/LICENSE)
13##################################
14
[854]15import pika
16import base64
17import json
18import sys
[857]19import signal
[854]20import smtplib
21from email.mime.text import MIMEText
[879]22import datetime
[940]23import logging
[840]24
[865]25# line below is to include "smon" package in the search path
[937]26sys.path.append("/opt/supervisor/Monitoring")
[857]27
[866]28import smon.repo_io as repo_io
29import smon.types
[857]30
[937]31CSTE_BROKER_HOST='cstest-broker.ipsl.jussieu.fr' # cstest
32#CSTE_BROKER_HOST='localhost' # vesg4
[940]33CSTE_LOG_DIR='/var/log/cssupervisor'
34CSTE_LOG_FILENAME_MAIN='supervisor.log'
35CSTE_LOG_FILENAME_DEBUG='debug.log'
36CSTE_LOG_FILENAME_MSG='message.log' # log AMQP msgs
37CSTE_LOG_FILE_MAIN="%s/%s"%(CSTE_LOG_DIR,CSTE_LOG_FILENAME_MAIN)
[966]38CSTE_LOG_FILE_MSG="%s/%s"%(CSTE_LOG_DIR,CSTE_LOG_FILENAME_MSG)
[875]39
[966]40def create_logger(name,fullpath_filename):
[940]41
[966]42        # new logger instance
43        logger = logging.getLogger(name)
44        logger.setLevel(logging.DEBUG)
[940]45
[966]46        # create associated file
47        fh = logging.FileHandler(fullpath_filename)
48        fh.setLevel(logging.DEBUG)
49
50        # create formatter
51        formatter = logging.Formatter('%(asctime)-15s - %(message)s')
52        fh.setFormatter(formatter)
53
54        # binding
55        logger.addHandler(fh)
56
57        return logger
58
59
60
61
62# loggers init.
63logger=create_logger(CSTE_LOG_FILENAME_MAIN,CSTE_LOG_FILE_MAIN)
64msg_logger=create_logger(CSTE_LOG_FILENAME_MSG,CSTE_LOG_FILE_MSG)
65
[859]66class Mail():
[857]67
[840]68        @classmethod
[854]69        def mail_example(cls):
70                me="jripsl@ipsl.jussieu.fr"
71                you="jripsl@ipsl.jussieu.fr"
72                body="Alarm"
73                object="Supervisor"
74
75                cls.mail(me,you,object,body)
76
77        @classmethod
78        def send_mail(cls,me,you,object,body):
79                msg = MIMEText(body)
80                msg['Subject'] = object
81                msg['From'] = me
82                msg['To'] = you
83
84                # Send the message via our own SMTP server, but don't include the # envelope header.
85                s = smtplib.SMTP('localhost')
86                s.sendmail(me,[you], msg.as_string())
87                s.quit()
88
[859]89class Actions():
90
[854]91        @classmethod
[866]92        def store_msg(cls,message):
[859]93
[879]94                try:
[877]95
[879]96                        # the simu exists when we are here (see TAG0001 tag)
97                        s=repo_io.retrieve_simulation(message.simuid)
[877]98
[935]99                        if s is None:
100                                raise Exception("WATCH-ERR102","simulation not found")
101
[879]102                        repo_io.create_message(message,s)
103
104                except:
105                        raise
106
[859]107        @classmethod
[875]108        def cleanup(cls,message):
109                repo_io.cleanup() # truncate/delete everything
110
111        @classmethod
[866]112        def set_sim_status_to_error(cls,message):
[859]113
[879]114                try:
[875]115
[879]116                        s=repo_io.retrieve_simulation(message.simuid)
[875]117
[879]118                        s.status="error"
[875]119
[879]120                        repo_io.update_simulation_status(s)
121
122                except:
123                        raise
124
[859]125        @classmethod
[875]126        def set_sim_status_to_complete(cls,message):
127
128                s=repo_io.retrieve_simulation(message.simuid)
129
130                s.status="complete"
131
132                repo_io.update_simulation_status(s)
133
134        @classmethod
[866]135        def crea_sim(cls,message):
136
[935]137                s=repo_io.retrieve_simulation(message.simuid)
[859]138
[935]139                if s is not None:
140                        #repo_io.delete_simulation(name)
[866]141
[935]142                        s.status="running"
143                        repo_io.update_simulation_status(s)
[866]144
[935]145                else:
146                        simulation=smon.types.Simulation(name=message.simuid,status="running")
147
148                        repo_io.create_simulation(simulation)
149
[859]150        @classmethod
151        def mail(cls):
152                cls.mail_example()
153
154        @classmethod
[940]155        def log_debug(cls,line):
156                cls.log(CSTE_LOG_FILENAME_DEBUG,line)
[857]157
[940]158        @classmethod
159        def log(cls,filename,line):
160                with open("%s/%s"%(CSTE_LOG_DIR,filename), "a") as log_file:
161                        log_file.write("%s %s\n"%(datetime.datetime.now().strftime('%Y%m%d_%H%M%S'), line))
162
163        @classmethod
164        def log_msg(cls,message):
165                line="%s %s %s %s"%(message.code,message.jobid,message.timestamp,message.command)
[966]166                msg_logger.info(line)
[940]167
[879]168                """
[875]169                if message.file is not None:
[940]170                        "%s %s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp,message.file)
[857]171                else:
[940]172                "%s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp)
[879]173                """
[857]174
175        @classmethod
[854]176        def execActions(cls,message):
177
[866]178                message_code=message.code
[854]179
[859]180                for action in MessageActionsMapping.mapping[message_code]:
[854]181                        proc_name=action
182
183                        try:
[859]184                                getattr(Actions, proc_name)(message)
[854]185                        except Exception,e:
[966]186                                logger.exception("ERR909 - exception occurs")
[854]187
[966]188                                raise Exception("WATCH-ERR002","procedure error (%s)"%(proc_name,))
[854]189
[859]190class MessageActionsMapping():
191
[877]192        # TAG0001: note that crea_sim must be BEFORE store_msg in the list (because when we insert the msg, we need the simu_id)
193        #
[940]194        mapping = { "0000":["crea_sim", "log_msg", "store_msg"],
195                                "0100":["log_msg", "store_msg", "set_sim_status_to_complete"],
196                                "1000":["log_msg", "store_msg"],
197                                "1100":["log_msg", "store_msg"],
198                                "2000":["log_msg", "store_msg"],
199                                "3000":["log_msg", "store_msg"],
[966]200                                "7000":["log_msg"],
[875]201                                "8888":["cleanup"],
[940]202                                "9000":["log_msg", "store_msg"],
203                                "9999":["log_msg", "store_msg", "set_sim_status_to_error"] }
[875]204
[966]205        # prod (mail added for some action)
[875]206        """
[940]207                                "9000":["log_msg", "store_msg", "mail"],
208                                "9999":["log_msg", "store_msg", "set_sim_status_to_error", "mail"] }
[875]209        """
[859]210
211class Watcher():
212
[857]213        @classmethod
[859]214        def start(cls):
[871]215                repo_io.init() # open DB connection
[859]216
217        @classmethod
218        def stop(cls):
[871]219                repo_io.free() # close DB connection
[859]220
221        @classmethod
[857]222        def main(self):
[840]223
[857]224                """
225                # parse args
226                parser = argparse.ArgumentParser(prog='watcher')
227                parser.add_argument('-v', dest='verbose',required=False,action='store_true')
228                args = parser.parse_args()
[840]229
[857]230                # check
231                if not os.path.exists(SMON.smon_home):
232                        sys.exit(1)
[840]233
[857]234                SMON.init_singleton()
235                """
[840]236
[937]237                connection = pika.BlockingConnection(pika.ConnectionParameters(host=CSTE_BROKER_HOST))
[857]238                self.channel = connection.channel()
[840]239
[966]240                logger.info("[*] Waiting for messages")
[854]241
[857]242                def callback(ch, method, properties, raw_msg):
[854]243
[866]244                        # msg fmt: body:base64,file:base64 (no JSON here !!!)
[854]245
[866]246
247
248
249                        # first deserialization (no JSON here !!!)
250
251                        fields=raw_msg.split(",")
252
253                        l__tmp_dic={}
254
255                        for field in fields:
256
257                                # debug
[966]258                                #logger.debug(" [x] Received %s"%field)
[866]259
260                                splitted_field=field.split(":")
261
262                                key=splitted_field[0]
263                                val=splitted_field[1]
264
265                                l__tmp_dic[key]=val
266
267
[857]268                        # debug
[966]269                        #logger.debug(" [x] Received %s (encoded)" % l__tmp_dic["body"])
[866]270
271                       
272                        # base64 decode body
273                        base64_decoded_msg=base64.b64decode(l__tmp_dic["body"])
274
275
276                        # debug
[966]277                        #logger.debug(" [x] Received %s" % raw_msg)
278                        #logger.debug(" [x] Received %s (uudecoded)" % base64_decoded_msg )
279                        #logger.debug(" [x] Received %s (uudecoded)" % base64_decoded_msg )
[854]280
[866]281
[871]282                        # message deserialization
[857]283                        message=None
284                        try:
[866]285                                JSON_msg=json.loads(base64_decoded_msg)
286                                message=smon.types.Message(JSON_msg)      # all JSON object members will be available in smon.types.Message object
[854]287
[879]288                                # non working
[966]289                                #logger.debug("DEB003 - %s"%message.type)
[866]290
[879]291                                # working
[966]292                                #logger.debug("DEB009 - %s"%message.code)
[866]293
[879]294
295
[871]296                        except Exception,e:
[866]297
[966]298                                logger.exception("ERR009 - exception occurs")
[940]299
300                                Actions.log_debug("DEB021 - %s"%base64_decoded_msg) 
301
[875]302                                raise
[871]303
304
305
[879]306                        # manage config-card file which is attached to the "0000" code message (this file is base64 encoded and need to be unencoded)
[871]307                        #
308                        if "file" in l__tmp_dic:
309
310                                # base64 decode file
311                                base64_decoded_file=base64.b64decode(l__tmp_dic["file"])
312
313                                # add as msg attribute
314                                message.file=base64_decoded_file
315
316
317
318                        # execute actions
319                        try:
[857]320                                # message code based action
[875]321                                Actions.execActions(message)
322
[972]323
324                                self.channel.basic_ack(delivery_tag = method.delivery_tag) # "msg acknowledgment" stuff
325
326
[857]327                        except Exception,e:
[966]328                                logger.exception("ERR019 - exception occurs")
[854]329
[940]330                                Actions.log_debug("DEB020 - %s"%base64_decoded_msg) 
[866]331
332                                raise
333
[871]334
[875]335                        # slow down consumer
336                        #time.sleep(0.5)
[871]337
[966]338                self.channel.queue_declare(queue='myqueue',durable=True)
[875]339
[972]340                self.channel.basic_consume(callback, queue='myqueue') # do not set "no_ack=True" anymore here so to enable "msg acknowledgment" (i.e. no_ack default is false)
[854]341
[857]342                self.channel.start_consuming()
[840]343
[857]344
345                """
346                SMON.free_singleton()
347                """
348
349def signal_handler(signal, frame):
[966]350                logger.info("TERM signal received: exiting.")
[857]351                Watcher.channel.stop_consuming()
[875]352                Watcher.stop()
[857]353                sys.exit(0)
354
[840]355if __name__ == '__main__':
[857]356
[940]357        signal.signal(signal.SIGTERM, signal_handler)
[857]358        signal.signal(signal.SIGINT, signal_handler)
359
[840]360        try:
[875]361
362                Watcher.start()
363
[857]364                Watcher.main()
[840]365
[875]366                Watcher.stop()
367
[840]368                sys.exit(0)
369
370        except Exception, e:
371
[966]372                logger.exception("ERR904 - exception occurred")
[840]373
374                sys.exit(1)
[940]375# vim: set ts=4 sw=4 :
Note: See TracBrowser for help on using the repository browser.