source: trunk/Monitoring/Watch/watch @ 879

Last change on this file since 879 was 879, checked in by jripsl, 11 years ago

Fix heartbeat test.

  • Property svn:executable set to *
File size: 7.4 KB
RevLine 
[854]1#!/usr/bin/env python
[840]2# -*- coding: ISO-8859-1 -*-
3
4##################################
5#  @program        smon
6#  @description    simulation monitor
7#  @copyright      Copyright “(c)2009 Centre National de la Recherche Scientifique CNRS.
8#                             All Rights Reserved”
9#  @svn_file       $Id: watcher 2545 2013-02-01 09:58:10Z jripsl $
10#  @version        $Rev: 2545 $
11#  @lastrevision   $Date: 2013-02-01 10:58:10 +0100 (Fri, 01 Feb 2013) $
12#  @license        CeCILL (http://dods.ipsl.jussieu.fr/jripsl/smon/LICENSE)
13##################################
14
[854]15import pika
16import base64
17import json
18import sys
[857]19import signal
[854]20import traceback
21import smtplib
22from email.mime.text import MIMEText
[879]23import datetime
[840]24
[865]25# line below is to include "smon" package in the search path
26sys.path.append("/home/jripsl/snapshot/Monitoring")
[857]27
[866]28import smon.repo_io as repo_io
29import smon.types
[857]30
[875]31
[859]32class Mail():
[857]33
[840]34        @classmethod
[854]35        def mail_example(cls):
36                me="jripsl@ipsl.jussieu.fr"
37                you="jripsl@ipsl.jussieu.fr"
38                body="Alarm"
39                object="Supervisor"
40
41                cls.mail(me,you,object,body)
42
43        @classmethod
44        def send_mail(cls,me,you,object,body):
45                msg = MIMEText(body)
46                msg['Subject'] = object
47                msg['From'] = me
48                msg['To'] = you
49
50                # Send the message via our own SMTP server, but don't include the # envelope header.
51                s = smtplib.SMTP('localhost')
52                s.sendmail(me,[you], msg.as_string())
53                s.quit()
54
[859]55class Actions():
56
[854]57        @classmethod
[866]58        def store_msg(cls,message):
[859]59
[879]60                try:
[877]61
[879]62                        # the simu exists when we are here (see TAG0001 tag)
63                        s=repo_io.retrieve_simulation(message.simuid)
[877]64
[879]65                        repo_io.create_message(message,s)
66
67                        sys.exit()
68
69                except:
70                        traceback.print_exc()
71                        raise
72
[859]73        @classmethod
[875]74        def cleanup(cls,message):
75                repo_io.cleanup() # truncate/delete everything
76
77        @classmethod
[866]78        def set_sim_status_to_error(cls,message):
[859]79
[879]80                try:
[875]81
[879]82                        s=repo_io.retrieve_simulation(message.simuid)
[875]83
[879]84                        s.status="error"
[875]85
[879]86                        repo_io.update_simulation_status(s)
87
88                except:
89                        traceback.print_exc()
90                        raise
91
[859]92        @classmethod
[875]93        def set_sim_status_to_complete(cls,message):
94
95                s=repo_io.retrieve_simulation(message.simuid)
96
97                s.status="complete"
98
99                repo_io.update_simulation_status(s)
100
101        @classmethod
[866]102        def crea_sim(cls,message):
103
[859]104                #repo_io.retrieve_simulation(name)
105                #repo_io.delete_simulation(name)
106
[875]107                simulation=smon.types.Simulation(name=message.simuid,status="running")
[866]108
109                repo_io.create_simulation(simulation)
110
[859]111        @classmethod
112        def mail(cls):
113                cls.mail_example()
114
115        @classmethod
[857]116        def print_stdout(cls,message):
117                # used for debug
118
[879]119                """
[875]120                if message.file is not None:
[876]121                        print "%s %s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp,message.file)
[857]122                else:
[876]123                        print "%s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp)
[879]124                """
[857]125
[879]126                print "%s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp)
127
[857]128        @classmethod
[854]129        def log(cls,message):
130                with open("/home/jripsl/supervisor/log/supervisor.log", "a") as log_file:
[879]131                        log_file.write("%s %s %s %s %s\n"%(datetime.datetime.now().strftime('%Y%m%d_%H%M%S'), message.code,message.jobid,message.timestamp,message.command))
[854]132
133        @classmethod
134        def execActions(cls,message):
135
[866]136                message_code=message.code
[854]137
[859]138                for action in MessageActionsMapping.mapping[message_code]:
[854]139                        proc_name=action
140
141                        try:
[859]142                                getattr(Actions, proc_name)(message)
[854]143                        except Exception,e:
144                                traceback.print_exc()
145
146                                raise Exception("WATCH-ERR002","procedure error (%s,%s)"%(proc_name,str(e)))
147
[859]148class MessageActionsMapping():
149
[875]150        # debug
[877]151        #
152        # TAG0001: note that crea_sim must be BEFORE store_msg in the list (because when we insert the msg, we need the simu_id)
153        #
154        mapping = { "0000":["crea_sim", "log", "store_msg", "print_stdout"],
[876]155                                "0100":["log", "store_msg", "print_stdout", "set_sim_status_to_complete"],
156                                "1000":["log", "store_msg", "print_stdout"],
157                                "1100":["log", "store_msg", "print_stdout"],
158                                "2000":["log", "store_msg", "print_stdout"],
159                                "3000":["log", "store_msg", "print_stdout"],
[875]160                                "8888":["cleanup"],
[876]161                                "9000":["log", "store_msg", "print_stdout"],
162                                "9999":["log", "store_msg", "print_stdout", "set_sim_status_to_error"] }
[875]163
164        # prod
[877]165        #
166        # TAG0001: note that crea_sim must be BEFORE store_msg in the list (because when we insert the msg, we need the simu_id)
167        #
[875]168        """
[877]169        mapping = { "0000":["crea_sim", "log", "store_msg"],
[875]170                                "0100":["log", "store_msg", "set_sim_status_to_complete"],
[859]171                                "1000":["log", "store_msg"],
[875]172                                "1100":["log", "store_msg"],
[859]173                                "2000":["log", "store_msg"],
174                                "3000":["log", "store_msg"],
[875]175                                "8888":["cleanup"],
[859]176                                "9000":["log", "store_msg", "mail"],
177                                "9999":["log", "store_msg", "set_sim_status_to_error", "mail"] }
[875]178        """
[859]179
180class Watcher():
181
[857]182        @classmethod
[859]183        def start(cls):
[871]184                repo_io.init() # open DB connection
[859]185
186        @classmethod
187        def stop(cls):
[871]188                repo_io.free() # close DB connection
[859]189
190        @classmethod
[857]191        def main(self):
[840]192
[857]193                """
194                # parse args
195                parser = argparse.ArgumentParser(prog='watcher')
196                parser.add_argument('-v', dest='verbose',required=False,action='store_true')
197                args = parser.parse_args()
[840]198
[857]199                # check
200                if not os.path.exists(SMON.smon_home):
201                        sys.exit(1)
[840]202
[857]203                SMON.init_singleton()
204                """
[840]205
[857]206                connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
207                self.channel = connection.channel()
[840]208
[857]209                #self.channel.queue_declare(queue='myqueue')
[854]210
[857]211                print ' [*] Waiting for messages. To exit press CTRL+C'
[854]212
[857]213                def callback(ch, method, properties, raw_msg):
[854]214
[866]215                        # msg fmt: body:base64,file:base64 (no JSON here !!!)
[854]216
[866]217
218
219
220                        # first deserialization (no JSON here !!!)
221
222                        fields=raw_msg.split(",")
223
224                        l__tmp_dic={}
225
226                        for field in fields:
227
228                                # debug
[871]229                                #print " [x] Received %s" % field
[866]230
231                                splitted_field=field.split(":")
232
233                                key=splitted_field[0]
234                                val=splitted_field[1]
235
236                                l__tmp_dic[key]=val
237
238
[857]239                        # debug
[871]240                        #print " [x] Received %s (encoded)" % l__tmp_dic["body"]
[866]241
242                       
243                        # base64 decode body
244                        base64_decoded_msg=base64.b64decode(l__tmp_dic["body"])
245
246
247                        # debug
[857]248                        #print " [x] Received %s" % raw_msg
249                        #print " [x] Received %s (uudecoded)" % base64_decoded_msg
[871]250                        #print " [x] Received %s (uudecoded)" % base64_decoded_msg
[854]251
[866]252
[871]253                        # message deserialization
[857]254                        message=None
255                        try:
[866]256                                JSON_msg=json.loads(base64_decoded_msg)
257                                message=smon.types.Message(JSON_msg)      # all JSON object members will be available in smon.types.Message object
[854]258
[879]259                                # non working
260                                #print message.type
[866]261
[879]262                                # working
263                                #print message.code
[866]264
[879]265
266
[871]267                        except Exception,e:
268                                print "ERR009 - exception occurs (exception=%s,msg=%s)"%(str(e),base64_decoded_msg)
[866]269
[875]270                                traceback.print_exc()
271                                raise
[871]272
273
274
[879]275                        # manage config-card file which is attached to the "0000" code message (this file is base64 encoded and need to be unencoded)
[871]276                        #
277                        if "file" in l__tmp_dic:
278
279                                # base64 decode file
280                                base64_decoded_file=base64.b64decode(l__tmp_dic["file"])
281
282                                # add as msg attribute
283                                message.file=base64_decoded_file
284
285
286
287                        # execute actions
288                        try:
[857]289                                # message code based action
[875]290                                Actions.execActions(message)
291
[857]292                        except Exception,e:
[871]293                                print "ERR019 - exception occurs (exception=%s,msg=%s)"%(str(e),base64_decoded_msg)
[854]294
[871]295                                traceback.print_exc()
[866]296
297                                raise
298
[871]299
[875]300                        # slow down consumer
301                        #time.sleep(0.5)
[871]302
[875]303
[857]304                self.channel.basic_consume(callback, queue='myqueue', no_ack=True)
[854]305
[857]306                self.channel.start_consuming()
[840]307
[857]308
309                """
310                SMON.free_singleton()
311                """
312
313def signal_handler(signal, frame):
314                print 'You pressed Ctrl+C!'
315                Watcher.channel.stop_consuming()
[875]316                Watcher.stop()
[857]317                sys.exit(0)
318
[840]319if __name__ == '__main__':
[857]320
321        signal.signal(signal.SIGINT, signal_handler)
322
[840]323        try:
[875]324
325                Watcher.start()
326
[857]327                Watcher.main()
[840]328
[875]329                Watcher.stop()
330
[840]331                sys.exit(0)
332
333        except Exception, e:
334
[875]335                traceback.print_exc()
[840]336
337                sys.exit(1)
Note: See TracBrowser for help on using the repository browser.