source: trunk/Monitoring/smon/repo_io.py @ 937

Last change on this file since 937 was 937, checked in by jripsl, 11 years ago

Create AMQP queue on supervisor side if not exists already.

File size: 9.2 KB
Line 
1# -*- coding: ISO-8859-1 -*-
2
3##################################
4#  @program        smon
5#  @description    simulation monitor
6#  @copyright      Copyright “(c)2009 Centre National de la Recherche Scientifique CNRS.
7#                             All Rights Reserved”
8#  @svn_file       $Id: repo_io.py 2599 2013-03-24 19:01:23Z jripsl $
9#  @version        $Rev: 2599 $
10#  @lastrevision   $Date: 2013-03-24 20:01:23 +0100 (Sun, 24 Mar 2013) $
11#  @license        CeCILL (http://dods.ipsl.jussieu.fr/jripsl/smon/LICENSE)
12##################################
13
14"""
15This module contains repository I/O code
16"""
17
18import sys
19import datetime
20
21# line below is to include Prodiguer database I/O library in the search path
22sys.path.append("/opt/supervisor/prodiguer_lib/src")
23
24
25import types
26
27
28# --- module static initialization --- #
29
30CSTE_MODE_LOCAL_REPO="local_repo"
31CSTE_MODE_REMOTE_REPO="remote_repo"
32CSTE_MODE_REMOTE_REPO_STUB="remote_repo_stub"
33
34
35
36
37
38
39
40
41
42# set mode
43#mode=CSTE_MODE_REMOTE_REPO
44mode=CSTE_MODE_LOCAL_REPO
45
46# mode specific init. (e.g. repository driver)
47if mode==CSTE_MODE_REMOTE_REPO_STUB:
48        raise Exception()
49
50elif mode==CSTE_MODE_REMOTE_REPO:
51
52        # import Prodiguer database I/O library
53        import prodiguer_shared.mq.hooks as repo
54
55
56        # HACK
57        # (uncomment those two lines if use prodiguer repo)
58        import prodiguer_shared.repo.session as repo_session
59        import prodiguer_shared.models as models
60
61
62        # Test constants.
63        _SIM_ACTIVITY = 'IPSL'
64        _SIM_COMPUTE_NODE = 'TGCC'
65        _SIM_COMPUTE_NODE_LOGIN = 'p86denv'
66        _SIM_COMPUTE_NODE_MACHINE = 'TGCC - Curie'
67        _SIM_EXECUTION_START_DATE = datetime.datetime.now()
68        _SIM_EXECUTION_STATE = models.EXECUTION_STATE_RUNNING
69        _SIM_EXPERIMENT = '1pctCO2'
70        _SIM_MODEL_ENGINE = 'IPSL-CM5A-LR'
71        _SIM_SPACE = models.SIMULATION_SPACE_TEST
72        _MSG_TYPE = "0000"
73        _MSG_CONTENT1 = "12345690"
74        _MSG_CONTENT2 = "12345690"
75
76elif mode==CSTE_MODE_LOCAL_REPO:
77
78        import local_repo as repo
79
80
81else:
82        raise Exception("ERR001 - incorrect mode")
83
84
85
86
87
88
89
90# -- methods -- #
91
92def init():
93        if mode==CSTE_MODE_LOCAL_REPO:
94                repo.connect()
95        elif mode==CSTE_MODE_REMOTE_REPO:
96                _CONNECTION = "postgresql://postgres:Silence107!@ib-pp-db-dev.ipslnet:5432/pdgr_1_0b5"
97                repo_session.start(_CONNECTION)
98
99        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
100                pass
101        else:
102                raise Exception("ERR004 - incorrect mode")
103
104def free():
105        if mode==CSTE_MODE_LOCAL_REPO:
106                repo.free()
107        elif mode==CSTE_MODE_REMOTE_REPO:
108                repo_session.end()
109        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
110                pass
111        else:
112                raise Exception("ERR009 - incorrect mode")
113
114def populate_tables_with_sample():
115        """
116        used only by "repo-state" pgm
117        """
118       
119        repo.populate_tables_with_sample()
120
121def retrieve_simulations():
122        """
123        used by get_running_simulations
124        """
125        simulations=None
126
127        # debug
128        #print "bla"
129
130
131        if mode==CSTE_MODE_LOCAL_REPO:
132                simulations=repo.retrieve_simulations()
133        elif mode==CSTE_MODE_REMOTE_REPO:
134
135                # prepare
136                simulations=[]
137
138                # execute
139                sims=repo.retrieve_simulations()
140
141                # process return values
142
143                for s in sims:
144
145                        status=get_repo_execution_state(s.ExecutionState_ID)
146
147                        # HACK
148                        if status=="queued":
149                                status="waiting"
150
151                       
152
153                        simulations.append(types.Simulation(id=s.ID,name=s.Name,status=status.lower()))
154
155
156        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
157                pass
158        else:
159                raise Exception("ERR115 - incorrect mode")
160
161
162        # debug
163        """
164        if len(simulations)<1:
165                raise Exception("ERR915 - debug")
166        else:
167                print "%d"%len(simulations)
168        """
169
170
171        return simulations
172
173def test():
174        """
175        not used
176        """
177
178        repo.create_message("test2", 2, "bla2")
179        commit()
180
181        repo.update_simulation_status('1pctCO22', 'ERROR')
182        commit()
183
184        repo.create_message("test3", 3, "bla3")
185        rollback()
186
187def cleanup():
188        if mode==CSTE_MODE_LOCAL_REPO:
189                repo.cleanup()
190        elif mode==CSTE_MODE_REMOTE_REPO:
191
192
193                simulations_to_delete=["BIGBRO.clim.TEST.LMDZOR.p86denv.TGCC.CURIE",
194                                                                "BIGBRO.clim.TEST.LMDZOR.p86denv.TGCC.CURIE",
195                                                                "v5cf.amipMR1.amip.PROD.LMDZOR.p86denv.TGCC.CURIE",
196                                                                "v5cf.amipMR2.amip.PROD.LMDZOR.p86denv.TGCC.CURIE",
197                                                                "v5cf.amipMR3.amip.PROD.LMDZOR.p86denv.TGCC.CURIE"]
198
199
200                for s in simulations_to_delete:
201                        repo.delete_messages(s)
202                        repo.delete_simulation(s)
203
204                        commit()
205
206
207
208        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
209                pass
210        else:
211                raise Exception("ERR007 - incorrect mode")
212
213def commit():
214        if mode==CSTE_MODE_LOCAL_REPO:
215                repo.commit()
216        elif mode==CSTE_MODE_REMOTE_REPO:
217                repo_session.commit()
218        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
219                pass
220        else:
221                raise Exception("ERR002 - incorrect mode")
222
223def rollback():
224        if mode==CSTE_MODE_LOCAL_REPO:
225                repo.rollback()
226        elif mode==CSTE_MODE_REMOTE_REPO:
227                repo_session.commit()
228        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
229                pass
230        else:
231                raise Exception("ERR003 - incorrect mode")
232
233def get_repo_execution_state(state_id):
234    for key, value in models.EXECUTION_STATE_ID_SET.items():
235        if value == state_id:
236            return key
237    return None
238
239def retrieve_simulation(name):
240        simulation=None
241
242        if mode==CSTE_MODE_LOCAL_REPO:
243
244                try:
245                        simulation=repo.retrieve_simulation(name)
246                except:
247                        traceback.print_exc()
248
249        elif mode==CSTE_MODE_REMOTE_REPO:
250
251                # prepare args
252                # ..
253
254                # execute
255                s=repo.retrieve_simulation(name)
256
257                if s is None:
258                        #raise Exception("RG543534")
259                        return None
260
261
262
263                # process return values
264
265                status=get_repo_execution_state(s.ExecutionState_ID)
266
267                # HACK
268                if status=="queued":
269                        status="waiting"
270
271                simulation=types.Simulation(id=s.ID,name=s.Name,status=status) 
272
273
274
275        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
276                pass
277        else:
278                raise Exception("ERR014 - incorrect mode")
279
280        return simulation
281
282def delete_simulation(name):
283        if mode==CSTE_MODE_LOCAL_REPO:
284                repo.delete_simulation(name)
285        elif mode==CSTE_MODE_REMOTE_REPO:
286
287                # prepare args
288                # ..
289
290                # execute
291                repo.delete_simulation(name)
292
293                # process return values
294                # ..
295
296        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
297                pass
298        else:
299                raise Exception("ERR015 - incorrect mode")
300
301def create_simulation(simulation):
302        if mode==CSTE_MODE_LOCAL_REPO:
303                repo.create_simulation(simulation)
304        elif mode==CSTE_MODE_REMOTE_REPO:
305
306                # prepare args
307                model_engine=None
308                space=None
309                exp=None
310
311                if "BIGBRO" in simulation.name:
312
313                        model_engine="IPSL-CM5A-LR"
314                        space="TEST"
315                        exp="sstClim"
316
317                elif "v5cf.amipMR" in simulation.name:
318
319                        model_engine="IPSL-CM5A-MR"
320                        space="PROD"
321                        exp="amip"
322
323                else:
324
325                        model_engine=_SIM_MODEL_ENGINE
326                        space=_SIM_SPACE
327                        exp=_SIM_EXPERIMENT
328
329
330                # execute
331                repo.create_simulation(_SIM_ACTIVITY,
332                                                                _SIM_COMPUTE_NODE,
333                                                                _SIM_COMPUTE_NODE_LOGIN,
334                                                                _SIM_COMPUTE_NODE_MACHINE,
335                                                                _SIM_EXECUTION_START_DATE,
336                                                                _SIM_EXECUTION_STATE,
337                                                                exp,
338                                                                model_engine,
339                                                                simulation.name,
340                                                                space,
341                                                                parent_simulation_name=None)
342
343
344                # process return values
345                # ..
346
347        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
348                pass
349        else:
350                raise Exception("ERR016 - incorrect mode")
351
352def update_simulation_status(simulation):
353        if mode==CSTE_MODE_LOCAL_REPO:
354
355                try:
356                        repo.update_simulation_status(simulation)
357                except:
358                        traceback.print_exc()
359
360        elif mode==CSTE_MODE_REMOTE_REPO:
361
362                # prepare args
363                # ..
364
365
366                # HACK
367                prodiguer_status=simulation.status
368                if simulation.status == "waiting":
369                        prodiguer_status="queued"
370
371
372                # execute
373                repo.update_simulation_status(simulation.name, prodiguer_status.upper())
374
375                # process return values
376                # ..
377
378                commit()
379
380        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
381                pass
382        else:
383                raise Exception("ERR017 - incorrect mode")
384
385def retrieve_messages(simulation):
386        message=None
387
388        if mode==CSTE_MODE_LOCAL_REPO:
389                message=repo.retrieve_messages(simulation)
390        elif mode==CSTE_MODE_REMOTE_REPO:
391
392                # prepare args
393                # ..
394
395                # execute
396                repo.retrieve_messages(name)
397
398                # process return values
399                # ..
400
401        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
402                pass
403        else:
404                raise Exception("ERR018 - incorrect mode")
405
406        return message
407
408def delete_messages(simulation):
409        if mode==CSTE_MODE_LOCAL_REPO:
410                repo.delete_messages(name)
411        elif mode==CSTE_MODE_REMOTE_REPO:
412
413                # prepare args
414                # ..
415
416                # execute
417                repo.delete_messages(name)
418
419                # process return values
420                # ..
421
422        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
423                pass
424        else:
425                raise Exception("ERR019 - incorrect mode")
426
427def create_message(message,simulation):
428        if mode==CSTE_MODE_LOCAL_REPO:
429                repo.create_message(message,simulation)
430        elif mode==CSTE_MODE_REMOTE_REPO:
431
432                # prepare args
433                # ..
434
435                # debug
436                #print "%s %s %s"%(simulation.name,message.code,"message.body")
437
438                # execute
439                try:
440                        repo.create_message(simulation.name,message.code,"message.body")
441                except ValueError as i:
442                        print "bla (%s)"%str(i)
443
444                commit()
445
446
447                # process return values
448                # ..
449
450        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
451                pass
452        else:
453                raise Exception("ERR020 - incorrect mode")
454
455        commit()
456
457
458def retrieve_last_message(simulation):
459        message=None
460
461        if mode==CSTE_MODE_LOCAL_REPO:
462                message=repo.retrieve_last_message(simulation)
463        elif mode==CSTE_MODE_REMOTE_REPO:
464
465                # execute
466                message=repo.retrieve_last_message(simulation.name)
467
468                if message is None:
469                        raise Exception("ERR221 - null value")
470
471                # process return values
472                di={}
473                di["crea_date"]=message.CreateDate
474                message=types.Message(di)
475
476        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
477                pass
478        else:
479                raise Exception("ERR021 - incorrect mode")
480
481        return message
482
483
484# --- higher level methods --- #
485
486def get_running_simulations():
487        running_simulation=[]
488
489        for s in retrieve_simulations():
490
491                if s.status=="running":
492                        running_simulation.append(s)
493                       
494        return running_simulation
495
496
Note: See TracBrowser for help on using the repository browser.