source: trunk/Monitoring/Watch/watch @ 857

Last change on this file since 857 was 857, checked in by jripsl, 11 years ago
  • start database communication implementation.
  • add usleep( 200 000 ) in CNClient to prevent overflow on the server side.
  • Property svn:executable set to *
File size: 5.3 KB
Line 
1#!/usr/bin/env python
2# -*- coding: ISO-8859-1 -*-
3
4##################################
5#  @program        smon
6#  @description    simulation monitor
7#  @copyright      Copyright “(c)2009 Centre National de la Recherche Scientifique CNRS.
8#                             All Rights Reserved”
9#  @svn_file       $Id: watcher 2545 2013-02-01 09:58:10Z jripsl $
10#  @version        $Rev: 2545 $
11#  @lastrevision   $Date: 2013-02-01 10:58:10 +0100 (Fri, 01 Feb 2013) $
12#  @license        CeCILL (http://dods.ipsl.jussieu.fr/jripsl/smon/LICENSE)
13##################################
14
15#from smon import dao
16import pika
17import base64
18import json
19import sys
20import signal
21import traceback
22import smtplib
23from email.mime.text import MIMEText
24from datetime import datetime
25
26
27
28
29sys.path.append("/home/jripsl/snapshot/src") 
30import elixir
31import prodiguer_repository
32
33#import prodiguer_repository.operations.mq_hooks_stub as mq_hooks
34import prodiguer_repository.operations.mq_hooks as mq_hooks
35
36
37
38"""
39mq_hooks.retrieve_simulation(name)
40mq_hooks.delete_simulation(name)
41mq_hooks.create_simulation(activity,
42mq_hooks.update_simulation_status(name,
43mq_hooks.retrieve_messages(name)
44mq_hooks.delete_messages(name)
45"""
46
47"""
48from prodiguer_repository.models import (
49        ExecutionState,
50        EXECUTION_STATE_RUNNING,
51        EXECUTION_STATE_SET,
52        Message,
53        Simulation,
54        SIMULATION_SPACE_TEST
55)
56"""
57
58
59
60"""
61scenarios from ticket 136
62
63libIGCM simulent des erreurs (killed, segfault, 1% d'erreur)
64le watcher temps réel change le status des erreurs détéctés
65le watcher temps différé détécte et change le statut des killed
66"""
67
68class Watcher():
69        # prod
70        #message_code_action_mapping = {"0000":["log","mail"],"1000":["log"],"2000":["log"],"3000":["log"],"9000":["log"],"9999":["log","mail"]}
71
72        # dev
73        message_code_action_mapping = {"0000":["log","print_stdout"],"1000":[],"2000":[],"3000":[],"9000":["log"],"9999":["log"]}
74        #message_code_action_mapping = {"0000":[],"1000":[],"2000":[],"3000":[],"9000":["log"],"9999":["log"]}
75
76        # debug
77        #message_code_action_mapping = {"0000":["print_stdout"],"1000":["log"],"2000":[],"3000":[],"9000":["log"],"9999":["log"]}
78
79        @classmethod
80        def get_fake_progress_messages(cls):
81                pass
82
83        @classmethod
84        def start(cls):
85                #dao.insert_progress_messages(cls.get_fake_progress_messages())
86                pass
87
88        @classmethod
89        def stop(cls):
90                pass
91
92        @classmethod
93        def add(cls,message):
94                pass
95
96        @classmethod
97        def mail_example(cls):
98                me="jripsl@ipsl.jussieu.fr"
99                you="jripsl@ipsl.jussieu.fr"
100                body="Alarm"
101                object="Supervisor"
102
103                cls.mail(me,you,object,body)
104
105        @classmethod
106        def mail(cls):
107                cls.mail_example()
108
109        @classmethod
110        def send_mail(cls,me,you,object,body):
111                msg = MIMEText(body)
112                msg['Subject'] = object
113                msg['From'] = me
114                msg['To'] = you
115
116                # Send the message via our own SMTP server, but don't include the # envelope header.
117                s = smtplib.SMTP('localhost')
118                s.sendmail(me,[you], msg.as_string())
119                s.quit()
120
121        @classmethod
122        def print_stdout(cls,message):
123                # used for debug
124
125                if "file" in message:
126                        print "%s %s %s\n"%(message["code"],message["jobid"],message["file"])
127                else:
128                        print "%s %s\n"%(message["code"],message["jobid"])
129
130        @classmethod
131        def log(cls,message):
132
133                with open("/home/jripsl/supervisor/log/supervisor.log", "a") as log_file:
134                        log_file.write("%s %s %s\n"%(datetime.now().strftime('%Y%m%d_%H%M%S'), message["code"],message["jobid"]))
135
136        @classmethod
137        def execActions(cls,message):
138
139                message_code=message["code"]
140
141                for action in cls.message_code_action_mapping[message_code]:
142                        proc_name=action
143
144                        try:
145                                getattr(cls, proc_name)(message)
146                        except Exception,e:
147                                traceback.print_exc()
148
149                                raise Exception("WATCH-ERR002","procedure error (%s,%s)"%(proc_name,str(e)))
150
151        @classmethod
152        def main(self):
153
154                """
155                _CONNECTION = "postgresql://postgres:Silence107!@localhost:5432/prodiguer" # Repo connection string.
156                prodiguer_repository.connect(_CONNECTION) # Connect to repo.
157                mq_hooks.create_message("test", 2, "bla")
158                elixir.session.commit()
159                #elixir.session.rollback()
160                """
161                mq_hooks.create_message()
162
163                """
164                # parse args
165                parser = argparse.ArgumentParser(prog='watcher')
166                parser.add_argument('-v', dest='verbose',required=False,action='store_true')
167                args = parser.parse_args()
168
169                # check
170                if not os.path.exists(SMON.smon_home):
171                        sys.exit(1)
172
173                SMON.init_singleton()
174                """
175
176                connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
177                self.channel = connection.channel()
178
179                #self.channel.queue_declare(queue='myqueue')
180
181                print ' [*] Waiting for messages. To exit press CTRL+C'
182
183                def callback(ch, method, properties, raw_msg):
184                        #
185                        #return
186
187                        base64_decoded_msg=base64.b64decode(raw_msg)
188
189                        # debug
190                        #print " [x] Received %s" % raw_msg
191                        #print " [x] Received %s (uudecoded)" % base64_decoded_msg
192
193                        message=None
194                        try:
195                                message=json.loads(base64_decoded_msg)
196
197                                # message code based action
198                                Watcher.execActions(message)
199                        except Exception,e:
200                                print "Exception occurs (exception=%s,msg=%s)"%(str(e),base64_decoded_msg)
201
202                self.channel.basic_consume(callback, queue='myqueue', no_ack=True)
203
204                self.channel.start_consuming()
205
206
207                """
208                SMON.free_singleton()
209                """
210
211def signal_handler(signal, frame):
212                print 'You pressed Ctrl+C!'
213                Watcher.channel.stop_consuming()
214                sys.exit(0)
215
216if __name__ == '__main__':
217
218        signal.signal(signal.SIGINT, signal_handler)
219        #print 'Press Ctrl+C'
220
221        try:
222                Watcher.main()
223
224                sys.exit(0)
225
226        except Exception, e:
227
228                traceback.print_exc()
229
230                sys.exit(1)
Note: See TracBrowser for help on using the repository browser.