source: trunk/Monitoring/Watch/watch @ 877

Last change on this file since 877 was 877, checked in by jripsl, 11 years ago

Fix bug (incorrect simulation_id was used in message table).

  • Property svn:executable set to *
File size: 7.1 KB
Line 
1#!/usr/bin/env python
2# -*- coding: ISO-8859-1 -*-
3
4##################################
5#  @program        smon
6#  @description    simulation monitor
7#  @copyright      Copyright “(c)2009 Centre National de la Recherche Scientifique CNRS.
8#                             All Rights Reserved”
9#  @svn_file       $Id: watcher 2545 2013-02-01 09:58:10Z jripsl $
10#  @version        $Rev: 2545 $
11#  @lastrevision   $Date: 2013-02-01 10:58:10 +0100 (Fri, 01 Feb 2013) $
12#  @license        CeCILL (http://dods.ipsl.jussieu.fr/jripsl/smon/LICENSE)
13##################################
14
15import pika
16import base64
17import json
18import sys
19import signal
20import traceback
21import smtplib
22from email.mime.text import MIMEText
23from datetime import datetime
24
25# line below is to include "smon" package in the search path
26sys.path.append("/home/jripsl/snapshot/Monitoring")
27
28import smon.repo_io as repo_io
29import smon.types
30
31
32class Mail():
33
34        @classmethod
35        def mail_example(cls):
36                me="jripsl@ipsl.jussieu.fr"
37                you="jripsl@ipsl.jussieu.fr"
38                body="Alarm"
39                object="Supervisor"
40
41                cls.mail(me,you,object,body)
42
43        @classmethod
44        def send_mail(cls,me,you,object,body):
45                msg = MIMEText(body)
46                msg['Subject'] = object
47                msg['From'] = me
48                msg['To'] = you
49
50                # Send the message via our own SMTP server, but don't include the # envelope header.
51                s = smtplib.SMTP('localhost')
52                s.sendmail(me,[you], msg.as_string())
53                s.quit()
54
55class Actions():
56
57        @classmethod
58        def store_msg(cls,message):
59
60                # the simu exists when we are here (see TAG0001 tag)
61                s=repo_io.retrieve_simulation(message.simuid)
62
63                repo_io.create_message(message,s)
64
65        @classmethod
66        def cleanup(cls,message):
67                repo_io.cleanup() # truncate/delete everything
68
69        @classmethod
70        def set_sim_status_to_error(cls,message):
71
72                s=repo_io.retrieve_simulation(message.simuid)
73
74                s.status="error"
75
76                repo_io.update_simulation_status(s)
77
78        @classmethod
79        def set_sim_status_to_complete(cls,message):
80
81                s=repo_io.retrieve_simulation(message.simuid)
82
83                s.status="complete"
84
85                repo_io.update_simulation_status(s)
86
87        @classmethod
88        def crea_sim(cls,message):
89
90                #repo_io.retrieve_simulation(name)
91                #repo_io.delete_simulation(name)
92
93                simulation=smon.types.Simulation(name=message.simuid,status="running")
94
95                repo_io.create_simulation(simulation)
96
97        @classmethod
98        def mail(cls):
99                cls.mail_example()
100
101        @classmethod
102        def print_stdout(cls,message):
103                # used for debug
104
105                if message.file is not None:
106                        print "%s %s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp,message.file)
107                else:
108                        print "%s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp)
109
110        @classmethod
111        def log(cls,message):
112                with open("/home/jripsl/supervisor/log/supervisor.log", "a") as log_file:
113                        log_file.write("%s %s %s %s %s\n"%(datetime.now().strftime('%Y%m%d_%H%M%S'), message.code,message.jobid,message.timestamp,message.command))
114
115        @classmethod
116        def execActions(cls,message):
117
118                message_code=message.code
119
120                for action in MessageActionsMapping.mapping[message_code]:
121                        proc_name=action
122
123                        try:
124                                getattr(Actions, proc_name)(message)
125                        except Exception,e:
126                                traceback.print_exc()
127
128                                raise Exception("WATCH-ERR002","procedure error (%s,%s)"%(proc_name,str(e)))
129
130class MessageActionsMapping():
131
132        # debug
133        #
134        # TAG0001: note that crea_sim must be BEFORE store_msg in the list (because when we insert the msg, we need the simu_id)
135        #
136        mapping = { "0000":["crea_sim", "log", "store_msg", "print_stdout"],
137                                "0100":["log", "store_msg", "print_stdout", "set_sim_status_to_complete"],
138                                "1000":["log", "store_msg", "print_stdout"],
139                                "1100":["log", "store_msg", "print_stdout"],
140                                "2000":["log", "store_msg", "print_stdout"],
141                                "3000":["log", "store_msg", "print_stdout"],
142                                "8888":["cleanup"],
143                                "9000":["log", "store_msg", "print_stdout"],
144                                "9999":["log", "store_msg", "print_stdout", "set_sim_status_to_error"] }
145
146        # prod
147        #
148        # TAG0001: note that crea_sim must be BEFORE store_msg in the list (because when we insert the msg, we need the simu_id)
149        #
150        """
151        mapping = { "0000":["crea_sim", "log", "store_msg"],
152                                "0100":["log", "store_msg", "set_sim_status_to_complete"],
153                                "1000":["log", "store_msg"],
154                                "1100":["log", "store_msg"],
155                                "2000":["log", "store_msg"],
156                                "3000":["log", "store_msg"],
157                                "8888":["cleanup"],
158                                "9000":["log", "store_msg", "mail"],
159                                "9999":["log", "store_msg", "set_sim_status_to_error", "mail"] }
160        """
161
162class Watcher():
163
164        @classmethod
165        def start(cls):
166                repo_io.init() # open DB connection
167
168        @classmethod
169        def stop(cls):
170                repo_io.free() # close DB connection
171
172        @classmethod
173        def main(self):
174
175                """
176                # parse args
177                parser = argparse.ArgumentParser(prog='watcher')
178                parser.add_argument('-v', dest='verbose',required=False,action='store_true')
179                args = parser.parse_args()
180
181                # check
182                if not os.path.exists(SMON.smon_home):
183                        sys.exit(1)
184
185                SMON.init_singleton()
186                """
187
188                connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
189                self.channel = connection.channel()
190
191                #self.channel.queue_declare(queue='myqueue')
192
193                print ' [*] Waiting for messages. To exit press CTRL+C'
194
195                def callback(ch, method, properties, raw_msg):
196
197                        # msg fmt: body:base64,file:base64 (no JSON here !!!)
198
199
200
201
202                        # first deserialization (no JSON here !!!)
203
204                        fields=raw_msg.split(",")
205
206                        l__tmp_dic={}
207
208                        for field in fields:
209
210                                # debug
211                                #print " [x] Received %s" % field
212
213                                splitted_field=field.split(":")
214
215                                key=splitted_field[0]
216                                val=splitted_field[1]
217
218                                l__tmp_dic[key]=val
219
220
221                        # debug
222                        #print " [x] Received %s (encoded)" % l__tmp_dic["body"]
223
224                       
225                        # base64 decode body
226                        base64_decoded_msg=base64.b64decode(l__tmp_dic["body"])
227
228
229                        # debug
230                        #print " [x] Received %s" % raw_msg
231                        #print " [x] Received %s (uudecoded)" % base64_decoded_msg
232                        #print " [x] Received %s (uudecoded)" % base64_decoded_msg
233
234
235                        # message deserialization
236                        message=None
237                        try:
238                                JSON_msg=json.loads(base64_decoded_msg)
239                                message=smon.types.Message(JSON_msg)      # all JSON object members will be available in smon.types.Message object
240
241
242
243                        except Exception,e:
244                                print "ERR009 - exception occurs (exception=%s,msg=%s)"%(str(e),base64_decoded_msg)
245
246                                traceback.print_exc()
247                                raise
248
249
250
251                        # manage config-card file which is attached to the "0000" type message (this file is base64 encoded and need to be unencoded)
252                        #
253                        if "file" in l__tmp_dic:
254
255                                # base64 decode file
256                                base64_decoded_file=base64.b64decode(l__tmp_dic["file"])
257
258                                # add as msg attribute
259                                message.file=base64_decoded_file
260
261
262
263                        # execute actions
264                        try:
265                                # message code based action
266                                Actions.execActions(message)
267
268                        except Exception,e:
269                                print "ERR019 - exception occurs (exception=%s,msg=%s)"%(str(e),base64_decoded_msg)
270
271                                traceback.print_exc()
272
273                                raise
274
275
276                        # slow down consumer
277                        #time.sleep(0.5)
278
279
280                self.channel.basic_consume(callback, queue='myqueue', no_ack=True)
281
282                self.channel.start_consuming()
283
284
285                """
286                SMON.free_singleton()
287                """
288
289def signal_handler(signal, frame):
290                print 'You pressed Ctrl+C!'
291                Watcher.channel.stop_consuming()
292                Watcher.stop()
293                sys.exit(0)
294
295if __name__ == '__main__':
296
297        signal.signal(signal.SIGINT, signal_handler)
298
299        try:
300
301                Watcher.start()
302
303                Watcher.main()
304
305                Watcher.stop()
306
307                sys.exit(0)
308
309        except Exception, e:
310
311                traceback.print_exc()
312
313                sys.exit(1)
Note: See TracBrowser for help on using the repository browser.