source: trunk/Monitoring/Watch/watch @ 859

Last change on this file since 859 was 859, checked in by jripsl, 11 years ago
  • add killed/segfault detection.
  • database communication implementation (suite).
  • trap BREAK signal to stop consumer before exiting.
  • Property svn:executable set to *
File size: 4.3 KB
RevLine 
[854]1#!/usr/bin/env python
[840]2# -*- coding: ISO-8859-1 -*-
3
4##################################
5#  @program        smon
6#  @description    simulation monitor
7#  @copyright      Copyright “(c)2009 Centre National de la Recherche Scientifique CNRS.
8#                             All Rights Reserved”
9#  @svn_file       $Id: watcher 2545 2013-02-01 09:58:10Z jripsl $
10#  @version        $Rev: 2545 $
11#  @lastrevision   $Date: 2013-02-01 10:58:10 +0100 (Fri, 01 Feb 2013) $
12#  @license        CeCILL (http://dods.ipsl.jussieu.fr/jripsl/smon/LICENSE)
13##################################
14
[854]15import pika
16import base64
17import json
18import sys
[857]19import signal
[854]20import traceback
21import smtplib
22from email.mime.text import MIMEText
[857]23from datetime import datetime
[840]24
[857]25
[859]26from smon import repo_io
[857]27
28
29"""
[859]30Code list reminder
[857]31
[859]320000 (la simulation démarre)
331000 (le job d'une simulation démarre)
342000 (PushStack)
353000 (PopStack OK)
369000 (PopStack NOK)
379999 (FATAL)
[857]38"""
39
[859]40class Mail():
[857]41
[840]42        @classmethod
[854]43        def mail_example(cls):
44                me="jripsl@ipsl.jussieu.fr"
45                you="jripsl@ipsl.jussieu.fr"
46                body="Alarm"
47                object="Supervisor"
48
49                cls.mail(me,you,object,body)
50
51        @classmethod
52        def send_mail(cls,me,you,object,body):
53                msg = MIMEText(body)
54                msg['Subject'] = object
55                msg['From'] = me
56                msg['To'] = you
57
58                # Send the message via our own SMTP server, but don't include the # envelope header.
59                s = smtplib.SMTP('localhost')
60                s.sendmail(me,[you], msg.as_string())
61                s.quit()
62
[859]63class Actions():
64
[854]65        @classmethod
[859]66        def store_msg(cls):
67                repo_io.store_messages(name)
68
69        @classmethod
70        def set_sim_status_to_error(cls):
71                repo_io.update_simulation_status()
72
73        @classmethod
74        def crea_sim(cls):
75                #repo_io.retrieve_simulation(name)
76                #repo_io.delete_simulation(name)
77                repo_io.create_simulation()
78
79        @classmethod
80        def mail(cls):
81                cls.mail_example()
82
83        @classmethod
[857]84        def print_stdout(cls,message):
85                # used for debug
86
87                if "file" in message:
88                        print "%s %s %s\n"%(message["code"],message["jobid"],message["file"])
89                else:
90                        print "%s %s\n"%(message["code"],message["jobid"])
91
92        @classmethod
[854]93        def log(cls,message):
94                with open("/home/jripsl/supervisor/log/supervisor.log", "a") as log_file:
[857]95                        log_file.write("%s %s %s\n"%(datetime.now().strftime('%Y%m%d_%H%M%S'), message["code"],message["jobid"]))
[854]96
97        @classmethod
98        def execActions(cls,message):
99
100                message_code=message["code"]
101
[859]102                for action in MessageActionsMapping.mapping[message_code]:
[854]103                        proc_name=action
104
105                        try:
[859]106                                getattr(Actions, proc_name)(message)
[854]107                        except Exception,e:
108                                traceback.print_exc()
109
110                                raise Exception("WATCH-ERR002","procedure error (%s,%s)"%(proc_name,str(e)))
111
[859]112class MessageActionsMapping():
113
114        mapping = { "0000":["log", "store_msg", "crea_sim"],
115                                "1000":["log", "store_msg"],
116                                "2000":["log", "store_msg"],
117                                "3000":["log", "store_msg"],
118                                "9000":["log", "store_msg", "mail"],
119                                "9999":["log", "store_msg", "set_sim_status_to_error", "mail"] }
120
121class Watcher():
122
[857]123        @classmethod
[859]124        def start(cls):
125                pass
126
127        @classmethod
128        def stop(cls):
129                pass
130
131        @classmethod
[857]132        def main(self):
[840]133
[857]134                """
135                # parse args
136                parser = argparse.ArgumentParser(prog='watcher')
137                parser.add_argument('-v', dest='verbose',required=False,action='store_true')
138                args = parser.parse_args()
[840]139
[857]140                # check
141                if not os.path.exists(SMON.smon_home):
142                        sys.exit(1)
[840]143
[857]144                SMON.init_singleton()
145                """
[840]146
[857]147                connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
148                self.channel = connection.channel()
[840]149
[857]150                #self.channel.queue_declare(queue='myqueue')
[854]151
[857]152                print ' [*] Waiting for messages. To exit press CTRL+C'
[854]153
[857]154                def callback(ch, method, properties, raw_msg):
155                        #
156                        #return
[854]157
[857]158                        base64_decoded_msg=base64.b64decode(raw_msg)
[854]159
[857]160                        # debug
161                        #print " [x] Received %s" % raw_msg
162                        #print " [x] Received %s (uudecoded)" % base64_decoded_msg
[854]163
[857]164                        message=None
165                        try:
166                                message=json.loads(base64_decoded_msg)
[854]167
[857]168                                # message code based action
169                                Watcher.execActions(message)
170                        except Exception,e:
171                                print "Exception occurs (exception=%s,msg=%s)"%(str(e),base64_decoded_msg)
[854]172
[857]173                self.channel.basic_consume(callback, queue='myqueue', no_ack=True)
[854]174
[857]175                self.channel.start_consuming()
[840]176
[857]177
178                """
179                SMON.free_singleton()
180                """
181
182def signal_handler(signal, frame):
183                print 'You pressed Ctrl+C!'
184                Watcher.channel.stop_consuming()
185                sys.exit(0)
186
[840]187if __name__ == '__main__':
[857]188
189        signal.signal(signal.SIGINT, signal_handler)
190
[840]191        try:
[857]192                Watcher.main()
[840]193
194                sys.exit(0)
195
196        except Exception, e:
197
[854]198                traceback.print_exc()
[840]199
200                sys.exit(1)
Note: See TracBrowser for help on using the repository browser.