source: trunk/Monitoring/Watch/watch @ 876

Last change on this file since 876 was 876, checked in by jripsl, 11 years ago

Timeout test impl.
Add timestamp.

  • Property svn:executable set to *
File size: 6.8 KB
RevLine 
[854]1#!/usr/bin/env python
[840]2# -*- coding: ISO-8859-1 -*-
3
4##################################
5#  @program        smon
6#  @description    simulation monitor
7#  @copyright      Copyright “(c)2009 Centre National de la Recherche Scientifique CNRS.
8#                             All Rights Reserved”
9#  @svn_file       $Id: watcher 2545 2013-02-01 09:58:10Z jripsl $
10#  @version        $Rev: 2545 $
11#  @lastrevision   $Date: 2013-02-01 10:58:10 +0100 (Fri, 01 Feb 2013) $
12#  @license        CeCILL (http://dods.ipsl.jussieu.fr/jripsl/smon/LICENSE)
13##################################
14
[854]15import pika
16import base64
17import json
18import sys
[857]19import signal
[854]20import traceback
21import smtplib
22from email.mime.text import MIMEText
[857]23from datetime import datetime
[840]24
[865]25# line below is to include "smon" package in the search path
26sys.path.append("/home/jripsl/snapshot/Monitoring")
[857]27
[866]28import smon.repo_io as repo_io
29import smon.types
[857]30
[875]31
[859]32class Mail():
[857]33
[840]34        @classmethod
[854]35        def mail_example(cls):
36                me="jripsl@ipsl.jussieu.fr"
37                you="jripsl@ipsl.jussieu.fr"
38                body="Alarm"
39                object="Supervisor"
40
41                cls.mail(me,you,object,body)
42
43        @classmethod
44        def send_mail(cls,me,you,object,body):
45                msg = MIMEText(body)
46                msg['Subject'] = object
47                msg['From'] = me
48                msg['To'] = you
49
50                # Send the message via our own SMTP server, but don't include the # envelope header.
51                s = smtplib.SMTP('localhost')
52                s.sendmail(me,[you], msg.as_string())
53                s.quit()
54
[859]55class Actions():
56
[854]57        @classmethod
[866]58        def store_msg(cls,message):
59                repo_io.create_message(message)
[859]60
61        @classmethod
[875]62        def cleanup(cls,message):
63                repo_io.cleanup() # truncate/delete everything
64
65        @classmethod
[866]66        def set_sim_status_to_error(cls,message):
[859]67
[875]68                s=repo_io.retrieve_simulation(message.simuid)
69
70                s.status="error"
71
72                repo_io.update_simulation_status(s)
73
[859]74        @classmethod
[875]75        def set_sim_status_to_complete(cls,message):
76
77                s=repo_io.retrieve_simulation(message.simuid)
78
79                s.status="complete"
80
81                repo_io.update_simulation_status(s)
82
83        @classmethod
[866]84        def crea_sim(cls,message):
85
[859]86                #repo_io.retrieve_simulation(name)
87                #repo_io.delete_simulation(name)
88
[875]89                simulation=smon.types.Simulation(name=message.simuid,status="running")
[866]90
91                repo_io.create_simulation(simulation)
92
[859]93        @classmethod
94        def mail(cls):
95                cls.mail_example()
96
97        @classmethod
[857]98        def print_stdout(cls,message):
99                # used for debug
100
[875]101                if message.file is not None:
[876]102                        print "%s %s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp,message.file)
[857]103                else:
[876]104                        print "%s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp)
[857]105
106        @classmethod
[854]107        def log(cls,message):
108                with open("/home/jripsl/supervisor/log/supervisor.log", "a") as log_file:
[876]109                        log_file.write("%s %s %s %s %s\n"%(datetime.now().strftime('%Y%m%d_%H%M%S'), message.code,message.jobid,message.timestamp,message.command))
[854]110
111        @classmethod
112        def execActions(cls,message):
113
[866]114                message_code=message.code
[854]115
[859]116                for action in MessageActionsMapping.mapping[message_code]:
[854]117                        proc_name=action
118
119                        try:
[859]120                                getattr(Actions, proc_name)(message)
[854]121                        except Exception,e:
122                                traceback.print_exc()
123
124                                raise Exception("WATCH-ERR002","procedure error (%s,%s)"%(proc_name,str(e)))
125
[859]126class MessageActionsMapping():
127
[875]128        # debug
[876]129        mapping = { "0000":["log", "store_msg", "print_stdout", "crea_sim"],
130                                "0100":["log", "store_msg", "print_stdout", "set_sim_status_to_complete"],
131                                "1000":["log", "store_msg", "print_stdout"],
132                                "1100":["log", "store_msg", "print_stdout"],
133                                "2000":["log", "store_msg", "print_stdout"],
134                                "3000":["log", "store_msg", "print_stdout"],
[875]135                                "8888":["cleanup"],
[876]136                                "9000":["log", "store_msg", "print_stdout"],
137                                "9999":["log", "store_msg", "print_stdout", "set_sim_status_to_error"] }
[875]138
139        # prod
140        """
[859]141        mapping = { "0000":["log", "store_msg", "crea_sim"],
[875]142                                "0100":["log", "store_msg", "set_sim_status_to_complete"],
[859]143                                "1000":["log", "store_msg"],
[875]144                                "1100":["log", "store_msg"],
[859]145                                "2000":["log", "store_msg"],
146                                "3000":["log", "store_msg"],
[875]147                                "8888":["cleanup"],
[859]148                                "9000":["log", "store_msg", "mail"],
149                                "9999":["log", "store_msg", "set_sim_status_to_error", "mail"] }
[875]150        """
[859]151
152class Watcher():
153
[857]154        @classmethod
[859]155        def start(cls):
[871]156                repo_io.init() # open DB connection
[859]157
158        @classmethod
159        def stop(cls):
[871]160                repo_io.free() # close DB connection
[859]161
162        @classmethod
[857]163        def main(self):
[840]164
[857]165                """
166                # parse args
167                parser = argparse.ArgumentParser(prog='watcher')
168                parser.add_argument('-v', dest='verbose',required=False,action='store_true')
169                args = parser.parse_args()
[840]170
[857]171                # check
172                if not os.path.exists(SMON.smon_home):
173                        sys.exit(1)
[840]174
[857]175                SMON.init_singleton()
176                """
[840]177
[857]178                connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
179                self.channel = connection.channel()
[840]180
[857]181                #self.channel.queue_declare(queue='myqueue')
[854]182
[857]183                print ' [*] Waiting for messages. To exit press CTRL+C'
[854]184
[857]185                def callback(ch, method, properties, raw_msg):
[854]186
[866]187                        # msg fmt: body:base64,file:base64 (no JSON here !!!)
[854]188
[866]189
190
191
192                        # first deserialization (no JSON here !!!)
193
194                        fields=raw_msg.split(",")
195
196                        l__tmp_dic={}
197
198                        for field in fields:
199
200                                # debug
[871]201                                #print " [x] Received %s" % field
[866]202
203                                splitted_field=field.split(":")
204
205                                key=splitted_field[0]
206                                val=splitted_field[1]
207
208                                l__tmp_dic[key]=val
209
210
[857]211                        # debug
[871]212                        #print " [x] Received %s (encoded)" % l__tmp_dic["body"]
[866]213
214                       
215                        # base64 decode body
216                        base64_decoded_msg=base64.b64decode(l__tmp_dic["body"])
217
218
219                        # debug
[857]220                        #print " [x] Received %s" % raw_msg
221                        #print " [x] Received %s (uudecoded)" % base64_decoded_msg
[871]222                        #print " [x] Received %s (uudecoded)" % base64_decoded_msg
[854]223
[866]224
[871]225                        # message deserialization
[857]226                        message=None
227                        try:
[866]228                                JSON_msg=json.loads(base64_decoded_msg)
229                                message=smon.types.Message(JSON_msg)      # all JSON object members will be available in smon.types.Message object
[854]230
[866]231
232
[871]233                        except Exception,e:
234                                print "ERR009 - exception occurs (exception=%s,msg=%s)"%(str(e),base64_decoded_msg)
[866]235
[875]236                                traceback.print_exc()
237                                raise
[871]238
239
240
241                        # manage config-card file which is attached to the "0000" type message (this file is base64 encoded and need to be unencoded)
242                        #
243                        if "file" in l__tmp_dic:
244
245                                # base64 decode file
246                                base64_decoded_file=base64.b64decode(l__tmp_dic["file"])
247
248                                # add as msg attribute
249                                message.file=base64_decoded_file
250
251
252
253                        # execute actions
254                        try:
[857]255                                # message code based action
[875]256                                Actions.execActions(message)
257
[857]258                        except Exception,e:
[871]259                                print "ERR019 - exception occurs (exception=%s,msg=%s)"%(str(e),base64_decoded_msg)
[854]260
[871]261                                traceback.print_exc()
[866]262
263                                raise
264
[871]265
[875]266                        # slow down consumer
267                        #time.sleep(0.5)
[871]268
[875]269
[857]270                self.channel.basic_consume(callback, queue='myqueue', no_ack=True)
[854]271
[857]272                self.channel.start_consuming()
[840]273
[857]274
275                """
276                SMON.free_singleton()
277                """
278
279def signal_handler(signal, frame):
280                print 'You pressed Ctrl+C!'
281                Watcher.channel.stop_consuming()
[875]282                Watcher.stop()
[857]283                sys.exit(0)
284
[840]285if __name__ == '__main__':
[857]286
287        signal.signal(signal.SIGINT, signal_handler)
288
[840]289        try:
[875]290
291                Watcher.start()
292
[857]293                Watcher.main()
[840]294
[875]295                Watcher.stop()
296
[840]297                sys.exit(0)
298
299        except Exception, e:
300
[875]301                traceback.print_exc()
[840]302
303                sys.exit(1)
Note: See TracBrowser for help on using the repository browser.