source: trunk/Monitoring/Watch/watch @ 859

Last change on this file since 859 was 859, checked in by jripsl, 11 years ago
  • add killed/segfault detection.
  • database communication implementation (suite).
  • trap BREAK signal to stop consumer before exiting.
  • Property svn:executable set to *
File size: 4.3 KB
Line 
1#!/usr/bin/env python
2# -*- coding: ISO-8859-1 -*-
3
4##################################
5#  @program        smon
6#  @description    simulation monitor
7#  @copyright      Copyright “(c)2009 Centre National de la Recherche Scientifique CNRS.
8#                             All Rights Reserved”
9#  @svn_file       $Id: watcher 2545 2013-02-01 09:58:10Z jripsl $
10#  @version        $Rev: 2545 $
11#  @lastrevision   $Date: 2013-02-01 10:58:10 +0100 (Fri, 01 Feb 2013) $
12#  @license        CeCILL (http://dods.ipsl.jussieu.fr/jripsl/smon/LICENSE)
13##################################
14
15import pika
16import base64
17import json
18import sys
19import signal
20import traceback
21import smtplib
22from email.mime.text import MIMEText
23from datetime import datetime
24
25
26from smon import repo_io
27
28
29"""
30Code list reminder
31
320000 (la simulation démarre)
331000 (le job d'une simulation démarre)
342000 (PushStack)
353000 (PopStack OK)
369000 (PopStack NOK)
379999 (FATAL)
38"""
39
40class Mail():
41
42        @classmethod
43        def mail_example(cls):
44                me="jripsl@ipsl.jussieu.fr"
45                you="jripsl@ipsl.jussieu.fr"
46                body="Alarm"
47                object="Supervisor"
48
49                cls.mail(me,you,object,body)
50
51        @classmethod
52        def send_mail(cls,me,you,object,body):
53                msg = MIMEText(body)
54                msg['Subject'] = object
55                msg['From'] = me
56                msg['To'] = you
57
58                # Send the message via our own SMTP server, but don't include the # envelope header.
59                s = smtplib.SMTP('localhost')
60                s.sendmail(me,[you], msg.as_string())
61                s.quit()
62
63class Actions():
64
65        @classmethod
66        def store_msg(cls):
67                repo_io.store_messages(name)
68
69        @classmethod
70        def set_sim_status_to_error(cls):
71                repo_io.update_simulation_status()
72
73        @classmethod
74        def crea_sim(cls):
75                #repo_io.retrieve_simulation(name)
76                #repo_io.delete_simulation(name)
77                repo_io.create_simulation()
78
79        @classmethod
80        def mail(cls):
81                cls.mail_example()
82
83        @classmethod
84        def print_stdout(cls,message):
85                # used for debug
86
87                if "file" in message:
88                        print "%s %s %s\n"%(message["code"],message["jobid"],message["file"])
89                else:
90                        print "%s %s\n"%(message["code"],message["jobid"])
91
92        @classmethod
93        def log(cls,message):
94                with open("/home/jripsl/supervisor/log/supervisor.log", "a") as log_file:
95                        log_file.write("%s %s %s\n"%(datetime.now().strftime('%Y%m%d_%H%M%S'), message["code"],message["jobid"]))
96
97        @classmethod
98        def execActions(cls,message):
99
100                message_code=message["code"]
101
102                for action in MessageActionsMapping.mapping[message_code]:
103                        proc_name=action
104
105                        try:
106                                getattr(Actions, proc_name)(message)
107                        except Exception,e:
108                                traceback.print_exc()
109
110                                raise Exception("WATCH-ERR002","procedure error (%s,%s)"%(proc_name,str(e)))
111
112class MessageActionsMapping():
113
114        mapping = { "0000":["log", "store_msg", "crea_sim"],
115                                "1000":["log", "store_msg"],
116                                "2000":["log", "store_msg"],
117                                "3000":["log", "store_msg"],
118                                "9000":["log", "store_msg", "mail"],
119                                "9999":["log", "store_msg", "set_sim_status_to_error", "mail"] }
120
121class Watcher():
122
123        @classmethod
124        def start(cls):
125                pass
126
127        @classmethod
128        def stop(cls):
129                pass
130
131        @classmethod
132        def main(self):
133
134                """
135                # parse args
136                parser = argparse.ArgumentParser(prog='watcher')
137                parser.add_argument('-v', dest='verbose',required=False,action='store_true')
138                args = parser.parse_args()
139
140                # check
141                if not os.path.exists(SMON.smon_home):
142                        sys.exit(1)
143
144                SMON.init_singleton()
145                """
146
147                connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
148                self.channel = connection.channel()
149
150                #self.channel.queue_declare(queue='myqueue')
151
152                print ' [*] Waiting for messages. To exit press CTRL+C'
153
154                def callback(ch, method, properties, raw_msg):
155                        #
156                        #return
157
158                        base64_decoded_msg=base64.b64decode(raw_msg)
159
160                        # debug
161                        #print " [x] Received %s" % raw_msg
162                        #print " [x] Received %s (uudecoded)" % base64_decoded_msg
163
164                        message=None
165                        try:
166                                message=json.loads(base64_decoded_msg)
167
168                                # message code based action
169                                Watcher.execActions(message)
170                        except Exception,e:
171                                print "Exception occurs (exception=%s,msg=%s)"%(str(e),base64_decoded_msg)
172
173                self.channel.basic_consume(callback, queue='myqueue', no_ack=True)
174
175                self.channel.start_consuming()
176
177
178                """
179                SMON.free_singleton()
180                """
181
182def signal_handler(signal, frame):
183                print 'You pressed Ctrl+C!'
184                Watcher.channel.stop_consuming()
185                sys.exit(0)
186
187if __name__ == '__main__':
188
189        signal.signal(signal.SIGINT, signal_handler)
190
191        try:
192                Watcher.main()
193
194                sys.exit(0)
195
196        except Exception, e:
197
198                traceback.print_exc()
199
200                sys.exit(1)
Note: See TracBrowser for help on using the repository browser.