source: trunk/Monitoring/Watch/watch @ 876

Last change on this file since 876 was 876, checked in by jripsl, 11 years ago

Timeout test impl.
Add timestamp.

  • Property svn:executable set to *
File size: 6.8 KB
Line 
1#!/usr/bin/env python
2# -*- coding: ISO-8859-1 -*-
3
4##################################
5#  @program        smon
6#  @description    simulation monitor
7#  @copyright      Copyright “(c)2009 Centre National de la Recherche Scientifique CNRS.
8#                             All Rights Reserved”
9#  @svn_file       $Id: watcher 2545 2013-02-01 09:58:10Z jripsl $
10#  @version        $Rev: 2545 $
11#  @lastrevision   $Date: 2013-02-01 10:58:10 +0100 (Fri, 01 Feb 2013) $
12#  @license        CeCILL (http://dods.ipsl.jussieu.fr/jripsl/smon/LICENSE)
13##################################
14
15import pika
16import base64
17import json
18import sys
19import signal
20import traceback
21import smtplib
22from email.mime.text import MIMEText
23from datetime import datetime
24
25# line below is to include "smon" package in the search path
26sys.path.append("/home/jripsl/snapshot/Monitoring")
27
28import smon.repo_io as repo_io
29import smon.types
30
31
32class Mail():
33
34        @classmethod
35        def mail_example(cls):
36                me="jripsl@ipsl.jussieu.fr"
37                you="jripsl@ipsl.jussieu.fr"
38                body="Alarm"
39                object="Supervisor"
40
41                cls.mail(me,you,object,body)
42
43        @classmethod
44        def send_mail(cls,me,you,object,body):
45                msg = MIMEText(body)
46                msg['Subject'] = object
47                msg['From'] = me
48                msg['To'] = you
49
50                # Send the message via our own SMTP server, but don't include the # envelope header.
51                s = smtplib.SMTP('localhost')
52                s.sendmail(me,[you], msg.as_string())
53                s.quit()
54
55class Actions():
56
57        @classmethod
58        def store_msg(cls,message):
59                repo_io.create_message(message)
60
61        @classmethod
62        def cleanup(cls,message):
63                repo_io.cleanup() # truncate/delete everything
64
65        @classmethod
66        def set_sim_status_to_error(cls,message):
67
68                s=repo_io.retrieve_simulation(message.simuid)
69
70                s.status="error"
71
72                repo_io.update_simulation_status(s)
73
74        @classmethod
75        def set_sim_status_to_complete(cls,message):
76
77                s=repo_io.retrieve_simulation(message.simuid)
78
79                s.status="complete"
80
81                repo_io.update_simulation_status(s)
82
83        @classmethod
84        def crea_sim(cls,message):
85
86                #repo_io.retrieve_simulation(name)
87                #repo_io.delete_simulation(name)
88
89                simulation=smon.types.Simulation(name=message.simuid,status="running")
90
91                repo_io.create_simulation(simulation)
92
93        @classmethod
94        def mail(cls):
95                cls.mail_example()
96
97        @classmethod
98        def print_stdout(cls,message):
99                # used for debug
100
101                if message.file is not None:
102                        print "%s %s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp,message.file)
103                else:
104                        print "%s %s %s %s\n"%(message.code,message.jobid,message.command,message.timestamp)
105
106        @classmethod
107        def log(cls,message):
108                with open("/home/jripsl/supervisor/log/supervisor.log", "a") as log_file:
109                        log_file.write("%s %s %s %s %s\n"%(datetime.now().strftime('%Y%m%d_%H%M%S'), message.code,message.jobid,message.timestamp,message.command))
110
111        @classmethod
112        def execActions(cls,message):
113
114                message_code=message.code
115
116                for action in MessageActionsMapping.mapping[message_code]:
117                        proc_name=action
118
119                        try:
120                                getattr(Actions, proc_name)(message)
121                        except Exception,e:
122                                traceback.print_exc()
123
124                                raise Exception("WATCH-ERR002","procedure error (%s,%s)"%(proc_name,str(e)))
125
126class MessageActionsMapping():
127
128        # debug
129        mapping = { "0000":["log", "store_msg", "print_stdout", "crea_sim"],
130                                "0100":["log", "store_msg", "print_stdout", "set_sim_status_to_complete"],
131                                "1000":["log", "store_msg", "print_stdout"],
132                                "1100":["log", "store_msg", "print_stdout"],
133                                "2000":["log", "store_msg", "print_stdout"],
134                                "3000":["log", "store_msg", "print_stdout"],
135                                "8888":["cleanup"],
136                                "9000":["log", "store_msg", "print_stdout"],
137                                "9999":["log", "store_msg", "print_stdout", "set_sim_status_to_error"] }
138
139        # prod
140        """
141        mapping = { "0000":["log", "store_msg", "crea_sim"],
142                                "0100":["log", "store_msg", "set_sim_status_to_complete"],
143                                "1000":["log", "store_msg"],
144                                "1100":["log", "store_msg"],
145                                "2000":["log", "store_msg"],
146                                "3000":["log", "store_msg"],
147                                "8888":["cleanup"],
148                                "9000":["log", "store_msg", "mail"],
149                                "9999":["log", "store_msg", "set_sim_status_to_error", "mail"] }
150        """
151
152class Watcher():
153
154        @classmethod
155        def start(cls):
156                repo_io.init() # open DB connection
157
158        @classmethod
159        def stop(cls):
160                repo_io.free() # close DB connection
161
162        @classmethod
163        def main(self):
164
165                """
166                # parse args
167                parser = argparse.ArgumentParser(prog='watcher')
168                parser.add_argument('-v', dest='verbose',required=False,action='store_true')
169                args = parser.parse_args()
170
171                # check
172                if not os.path.exists(SMON.smon_home):
173                        sys.exit(1)
174
175                SMON.init_singleton()
176                """
177
178                connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
179                self.channel = connection.channel()
180
181                #self.channel.queue_declare(queue='myqueue')
182
183                print ' [*] Waiting for messages. To exit press CTRL+C'
184
185                def callback(ch, method, properties, raw_msg):
186
187                        # msg fmt: body:base64,file:base64 (no JSON here !!!)
188
189
190
191
192                        # first deserialization (no JSON here !!!)
193
194                        fields=raw_msg.split(",")
195
196                        l__tmp_dic={}
197
198                        for field in fields:
199
200                                # debug
201                                #print " [x] Received %s" % field
202
203                                splitted_field=field.split(":")
204
205                                key=splitted_field[0]
206                                val=splitted_field[1]
207
208                                l__tmp_dic[key]=val
209
210
211                        # debug
212                        #print " [x] Received %s (encoded)" % l__tmp_dic["body"]
213
214                       
215                        # base64 decode body
216                        base64_decoded_msg=base64.b64decode(l__tmp_dic["body"])
217
218
219                        # debug
220                        #print " [x] Received %s" % raw_msg
221                        #print " [x] Received %s (uudecoded)" % base64_decoded_msg
222                        #print " [x] Received %s (uudecoded)" % base64_decoded_msg
223
224
225                        # message deserialization
226                        message=None
227                        try:
228                                JSON_msg=json.loads(base64_decoded_msg)
229                                message=smon.types.Message(JSON_msg)      # all JSON object members will be available in smon.types.Message object
230
231
232
233                        except Exception,e:
234                                print "ERR009 - exception occurs (exception=%s,msg=%s)"%(str(e),base64_decoded_msg)
235
236                                traceback.print_exc()
237                                raise
238
239
240
241                        # manage config-card file which is attached to the "0000" type message (this file is base64 encoded and need to be unencoded)
242                        #
243                        if "file" in l__tmp_dic:
244
245                                # base64 decode file
246                                base64_decoded_file=base64.b64decode(l__tmp_dic["file"])
247
248                                # add as msg attribute
249                                message.file=base64_decoded_file
250
251
252
253                        # execute actions
254                        try:
255                                # message code based action
256                                Actions.execActions(message)
257
258                        except Exception,e:
259                                print "ERR019 - exception occurs (exception=%s,msg=%s)"%(str(e),base64_decoded_msg)
260
261                                traceback.print_exc()
262
263                                raise
264
265
266                        # slow down consumer
267                        #time.sleep(0.5)
268
269
270                self.channel.basic_consume(callback, queue='myqueue', no_ack=True)
271
272                self.channel.start_consuming()
273
274
275                """
276                SMON.free_singleton()
277                """
278
279def signal_handler(signal, frame):
280                print 'You pressed Ctrl+C!'
281                Watcher.channel.stop_consuming()
282                Watcher.stop()
283                sys.exit(0)
284
285if __name__ == '__main__':
286
287        signal.signal(signal.SIGINT, signal_handler)
288
289        try:
290
291                Watcher.start()
292
293                Watcher.main()
294
295                Watcher.stop()
296
297                sys.exit(0)
298
299        except Exception, e:
300
301                traceback.print_exc()
302
303                sys.exit(1)
Note: See TracBrowser for help on using the repository browser.