source: trunk/Monitoring/Watch/watch @ 879

Last change on this file since 879 was 879, checked in by jripsl, 11 years ago

Fix heartbeat test.

  • Property svn:executable set to *
File size: 7.4 KB
Line 
1#!/usr/bin/env python
2# -*- coding: ISO-8859-1 -*-
3
4##################################
5#  @program        smon
6#  @description    simulation monitor
7#  @copyright      Copyright “(c)2009 Centre National de la Recherche Scientifique CNRS.
8#                             All Rights Reserved”
9#  @svn_file       $Id: watcher 2545 2013-02-01 09:58:10Z jripsl $
10#  @version        $Rev: 2545 $
11#  @lastrevision   $Date: 2013-02-01 10:58:10 +0100 (Fri, 01 Feb 2013) $
12#  @license        CeCILL (http://dods.ipsl.jussieu.fr/jripsl/smon/LICENSE)
13##################################
14
15import pika
16import base64
17import json
18import sys
19import signal
20import traceback
21import smtplib
22from email.mime.text import MIMEText
23import datetime
24
25# line below is to include "smon" package in the search path
26sys.path.append("/home/jripsl/snapshot/Monitoring")
27
28import smon.repo_io as repo_io
29import smon.types
30
31
32class Mail():
33
34        @classmethod
35        def mail_example(cls):
36                me="jripsl@ipsl.jussieu.fr"
37                you="jripsl@ipsl.jussieu.fr"
38                body="Alarm"
39                object="Supervisor"
40
41                cls.mail(me,you,object,body)
42
43        @classmethod
44        def send_mail(cls,me,you,object,body):
45                msg = MIMEText(body)
46                msg['Subject'] = object
47                msg['From'] = me
48                msg['To'] = you
49
50                # Send the message via our own SMTP server, but don't include the # envelope header.
51                s = smtplib.SMTP('localhost')
52                s.sendmail(me,[you], msg.as_string())
53                s.quit()
54
55class Actions():
56
57        @classmethod
58        def store_msg(cls,message):
59
60                try:
61
62                        # the simu exists when we are here (see TAG0001 tag)
63                        s=repo_io.retrieve_simulation(message.simuid)
64
65                        repo_io.create_message(message,s)
66
67                        sys.exit()
68
69                except:
70                        traceback.print_exc()
71                        raise
72
73        @classmethod
74        def cleanup(cls,message):
75                repo_io.cleanup() # truncate/delete everything
76
77        @classmethod
78        def set_sim_status_to_error(cls,message):
79
80                try:
81
82                        s=repo_io.retrieve_simulation(message.simuid)
83
84                        s.status="error"
85
86                        repo_io.update_simulation_status(s)
87
88                except:
89                        traceback.print_exc()
90                        raise
91
92        @classmethod
93        def set_sim_status_to_complete(cls,message):
94
95                s=repo_io.retrieve_simulation(message.simuid)
96
97                s.status="complete"
98
99                repo_io.update_simulation_status(s)
100
101        @classmethod
102        def crea_sim(cls,message):
103
104                #repo_io.retrieve_simulation(name)
105                #repo_io.delete_simulation(name)
106
107                simulation=smon.types.Simulation(name=message.simuid,status="running")
108
109                repo_io.create_simulation(simulation)
110
111        @classmethod
112        def mail(cls):
113                cls.mail_example()
114
115        @classmethod
116        def print_stdout(cls,message):
117                # used for debug
118
119                """
120                if message.file is not None:
121                        print "%s %s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp,message.file)
122                else:
123                        print "%s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp)
124                """
125
126                print "%s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp)
127
128        @classmethod
129        def log(cls,message):
130                with open("/home/jripsl/supervisor/log/supervisor.log", "a") as log_file:
131                        log_file.write("%s %s %s %s %s\n"%(datetime.datetime.now().strftime('%Y%m%d_%H%M%S'), message.code,message.jobid,message.timestamp,message.command))
132
133        @classmethod
134        def execActions(cls,message):
135
136                message_code=message.code
137
138                for action in MessageActionsMapping.mapping[message_code]:
139                        proc_name=action
140
141                        try:
142                                getattr(Actions, proc_name)(message)
143                        except Exception,e:
144                                traceback.print_exc()
145
146                                raise Exception("WATCH-ERR002","procedure error (%s,%s)"%(proc_name,str(e)))
147
148class MessageActionsMapping():
149
150        # debug
151        #
152        # TAG0001: note that crea_sim must be BEFORE store_msg in the list (because when we insert the msg, we need the simu_id)
153        #
154        mapping = { "0000":["crea_sim", "log", "store_msg", "print_stdout"],
155                                "0100":["log", "store_msg", "print_stdout", "set_sim_status_to_complete"],
156                                "1000":["log", "store_msg", "print_stdout"],
157                                "1100":["log", "store_msg", "print_stdout"],
158                                "2000":["log", "store_msg", "print_stdout"],
159                                "3000":["log", "store_msg", "print_stdout"],
160                                "8888":["cleanup"],
161                                "9000":["log", "store_msg", "print_stdout"],
162                                "9999":["log", "store_msg", "print_stdout", "set_sim_status_to_error"] }
163
164        # prod
165        #
166        # TAG0001: note that crea_sim must be BEFORE store_msg in the list (because when we insert the msg, we need the simu_id)
167        #
168        """
169        mapping = { "0000":["crea_sim", "log", "store_msg"],
170                                "0100":["log", "store_msg", "set_sim_status_to_complete"],
171                                "1000":["log", "store_msg"],
172                                "1100":["log", "store_msg"],
173                                "2000":["log", "store_msg"],
174                                "3000":["log", "store_msg"],
175                                "8888":["cleanup"],
176                                "9000":["log", "store_msg", "mail"],
177                                "9999":["log", "store_msg", "set_sim_status_to_error", "mail"] }
178        """
179
180class Watcher():
181
182        @classmethod
183        def start(cls):
184                repo_io.init() # open DB connection
185
186        @classmethod
187        def stop(cls):
188                repo_io.free() # close DB connection
189
190        @classmethod
191        def main(self):
192
193                """
194                # parse args
195                parser = argparse.ArgumentParser(prog='watcher')
196                parser.add_argument('-v', dest='verbose',required=False,action='store_true')
197                args = parser.parse_args()
198
199                # check
200                if not os.path.exists(SMON.smon_home):
201                        sys.exit(1)
202
203                SMON.init_singleton()
204                """
205
206                connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
207                self.channel = connection.channel()
208
209                #self.channel.queue_declare(queue='myqueue')
210
211                print ' [*] Waiting for messages. To exit press CTRL+C'
212
213                def callback(ch, method, properties, raw_msg):
214
215                        # msg fmt: body:base64,file:base64 (no JSON here !!!)
216
217
218
219
220                        # first deserialization (no JSON here !!!)
221
222                        fields=raw_msg.split(",")
223
224                        l__tmp_dic={}
225
226                        for field in fields:
227
228                                # debug
229                                #print " [x] Received %s" % field
230
231                                splitted_field=field.split(":")
232
233                                key=splitted_field[0]
234                                val=splitted_field[1]
235
236                                l__tmp_dic[key]=val
237
238
239                        # debug
240                        #print " [x] Received %s (encoded)" % l__tmp_dic["body"]
241
242                       
243                        # base64 decode body
244                        base64_decoded_msg=base64.b64decode(l__tmp_dic["body"])
245
246
247                        # debug
248                        #print " [x] Received %s" % raw_msg
249                        #print " [x] Received %s (uudecoded)" % base64_decoded_msg
250                        #print " [x] Received %s (uudecoded)" % base64_decoded_msg
251
252
253                        # message deserialization
254                        message=None
255                        try:
256                                JSON_msg=json.loads(base64_decoded_msg)
257                                message=smon.types.Message(JSON_msg)      # all JSON object members will be available in smon.types.Message object
258
259                                # non working
260                                #print message.type
261
262                                # working
263                                #print message.code
264
265
266
267                        except Exception,e:
268                                print "ERR009 - exception occurs (exception=%s,msg=%s)"%(str(e),base64_decoded_msg)
269
270                                traceback.print_exc()
271                                raise
272
273
274
275                        # manage config-card file which is attached to the "0000" code message (this file is base64 encoded and need to be unencoded)
276                        #
277                        if "file" in l__tmp_dic:
278
279                                # base64 decode file
280                                base64_decoded_file=base64.b64decode(l__tmp_dic["file"])
281
282                                # add as msg attribute
283                                message.file=base64_decoded_file
284
285
286
287                        # execute actions
288                        try:
289                                # message code based action
290                                Actions.execActions(message)
291
292                        except Exception,e:
293                                print "ERR019 - exception occurs (exception=%s,msg=%s)"%(str(e),base64_decoded_msg)
294
295                                traceback.print_exc()
296
297                                raise
298
299
300                        # slow down consumer
301                        #time.sleep(0.5)
302
303
304                self.channel.basic_consume(callback, queue='myqueue', no_ack=True)
305
306                self.channel.start_consuming()
307
308
309                """
310                SMON.free_singleton()
311                """
312
313def signal_handler(signal, frame):
314                print 'You pressed Ctrl+C!'
315                Watcher.channel.stop_consuming()
316                Watcher.stop()
317                sys.exit(0)
318
319if __name__ == '__main__':
320
321        signal.signal(signal.SIGINT, signal_handler)
322
323        try:
324
325                Watcher.start()
326
327                Watcher.main()
328
329                Watcher.stop()
330
331                sys.exit(0)
332
333        except Exception, e:
334
335                traceback.print_exc()
336
337                sys.exit(1)
Note: See TracBrowser for help on using the repository browser.