source: trunk/Monitoring/smon/repo_io.py @ 879

Last change on this file since 879 was 879, checked in by jripsl, 11 years ago

Fix heartbeat test.

File size: 9.1 KB
Line 
1# -*- coding: ISO-8859-1 -*-
2
3##################################
4#  @program        smon
5#  @description    simulation monitor
6#  @copyright      Copyright “(c)2009 Centre National de la Recherche Scientifique CNRS.
7#                             All Rights Reserved”
8#  @svn_file       $Id: repo_io.py 2599 2013-03-24 19:01:23Z jripsl $
9#  @version        $Rev: 2599 $
10#  @lastrevision   $Date: 2013-03-24 20:01:23 +0100 (Sun, 24 Mar 2013) $
11#  @license        CeCILL (http://dods.ipsl.jussieu.fr/jripsl/smon/LICENSE)
12##################################
13
14"""
15This module contains repository I/O code
16"""
17
18import sys
19import datetime
20
21# line below is to include Prodiguer database I/O library in the search path
22sys.path.append("/home/jripsl/snapshot/src/prodiguer_shared/src")
23
24
25import types
26
27
28# --- module static initialization --- #
29
30CSTE_MODE_LOCAL_REPO="local_repo"
31CSTE_MODE_REMOTE_REPO="remote_repo"
32CSTE_MODE_REMOTE_REPO_STUB="remote_repo_stub"
33
34
35# HACK
36import prodiguer_shared.repo.session as repo_session
37import prodiguer_shared.models as models
38
39
40# Test constants.
41_SIM_ACTIVITY = 'IPSL'
42_SIM_COMPUTE_NODE = 'TGCC'
43_SIM_COMPUTE_NODE_LOGIN = 'p86denv'
44_SIM_COMPUTE_NODE_MACHINE = 'TGCC - Curie'
45_SIM_EXECUTION_START_DATE = datetime.datetime.now()
46_SIM_EXECUTION_STATE = models.EXECUTION_STATE_RUNNING
47_SIM_EXPERIMENT = '1pctCO2'
48_SIM_MODEL_ENGINE = 'IPSL-CM5A-LR'
49_SIM_SPACE = models.SIMULATION_SPACE_TEST
50_MSG_TYPE = "0000"
51_MSG_CONTENT1 = "12345690"
52_MSG_CONTENT2 = "12345690"
53
54
55
56
57
58
59
60# set mode
61mode=CSTE_MODE_REMOTE_REPO
62#mode=CSTE_MODE_LOCAL_REPO
63
64# set repository driver
65if mode==CSTE_MODE_REMOTE_REPO_STUB:
66        raise Exception()
67
68elif mode==CSTE_MODE_REMOTE_REPO:
69
70        # import Prodiguer database I/O library
71        import prodiguer_shared.mq.hooks as repo
72
73
74elif mode==CSTE_MODE_LOCAL_REPO:
75        import local_repo as repo
76else:
77        raise Exception("ERR001 - incorrect mode")
78
79
80
81
82
83
84# -- methods -- #
85
86def init():
87        if mode==CSTE_MODE_LOCAL_REPO:
88                repo.connect()
89        elif mode==CSTE_MODE_REMOTE_REPO:
90                _CONNECTION = "postgresql://postgres:Silence107!@ib-pp-db-dev.ipslnet:5432/pdgr_1_0b5"
91                repo_session.start(_CONNECTION)
92
93        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
94                pass
95        else:
96                raise Exception("ERR004 - incorrect mode")
97
98def free():
99        if mode==CSTE_MODE_LOCAL_REPO:
100                repo.free()
101        elif mode==CSTE_MODE_REMOTE_REPO:
102                repo_session.end()
103        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
104                pass
105        else:
106                raise Exception("ERR009 - incorrect mode")
107
108def populate_tables_with_sample():
109        """
110        used only by "repo-state" pgm
111        """
112       
113        repo.populate_tables_with_sample()
114
115def retrieve_simulations():
116        """
117        used by get_running_simulations
118        """
119        simulations=None
120
121        # debug
122        #print "bla"
123
124
125        if mode==CSTE_MODE_LOCAL_REPO:
126                simulations=repo.retrieve_simulations()
127        elif mode==CSTE_MODE_REMOTE_REPO:
128
129                # prepare
130                simulations=[]
131
132                # execute
133                sims=repo.retrieve_simulations()
134
135                # process return values
136
137                for s in sims:
138
139                        status=get_repo_execution_state(s.ExecutionState_ID)
140
141                        # HACK
142                        if status=="queued":
143                                status="waiting"
144
145                       
146
147                        simulations.append(types.Simulation(id=s.ID,name=s.Name,status=status.lower()))
148
149
150        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
151                pass
152        else:
153                raise Exception("ERR115 - incorrect mode")
154
155
156        # debug
157        """
158        if len(simulations)<1:
159                raise Exception("ERR915 - debug")
160        else:
161                print "%d"%len(simulations)
162        """
163
164
165        return simulations
166
167def test():
168        """
169        not used
170        """
171
172        repo.create_message("test2", 2, "bla2")
173        commit()
174
175        repo.update_simulation_status('1pctCO22', 'ERROR')
176        commit()
177
178        repo.create_message("test3", 3, "bla3")
179        rollback()
180
181def cleanup():
182        if mode==CSTE_MODE_LOCAL_REPO:
183                repo.cleanup()
184        elif mode==CSTE_MODE_REMOTE_REPO:
185
186
187                simulations_to_delete=["BIGBRO.clim.TEST.LMDZOR.p86denv.TGCC.CURIE",
188                                                                "BIGBRO.clim.TEST.LMDZOR.p86denv.TGCC.CURIE",
189                                                                "v5cf.amipMR1.amip.PROD.LMDZOR.p86denv.TGCC.CURIE",
190                                                                "v5cf.amipMR2.amip.PROD.LMDZOR.p86denv.TGCC.CURIE",
191                                                                "v5cf.amipMR3.amip.PROD.LMDZOR.p86denv.TGCC.CURIE"]
192
193
194                for s in simulations_to_delete:
195                        repo.delete_messages(s)
196                        repo.delete_simulation(s)
197
198                        commit()
199
200
201
202        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
203                pass
204        else:
205                raise Exception("ERR007 - incorrect mode")
206
207def commit():
208        if mode==CSTE_MODE_LOCAL_REPO:
209                repo.commit()
210        elif mode==CSTE_MODE_REMOTE_REPO:
211                repo_session.commit()
212        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
213                pass
214        else:
215                raise Exception("ERR002 - incorrect mode")
216
217def rollback():
218        if mode==CSTE_MODE_LOCAL_REPO:
219                repo.rollback()
220        elif mode==CSTE_MODE_REMOTE_REPO:
221                repo_session.commit()
222        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
223                pass
224        else:
225                raise Exception("ERR003 - incorrect mode")
226
227def get_repo_execution_state(state_id):
228    for key, value in models.EXECUTION_STATE_ID_SET.items():
229        if value == state_id:
230            return key
231    return None
232
233def retrieve_simulation(name):
234        simulation=None
235
236        if mode==CSTE_MODE_LOCAL_REPO:
237
238                try:
239                        simulation=repo.retrieve_simulation(name)
240                except:
241                        traceback.print_exc()
242
243        elif mode==CSTE_MODE_REMOTE_REPO:
244
245                # prepare args
246                # ..
247
248                # execute
249                s=repo.retrieve_simulation(name)
250
251                if s is None:
252                        raise Exception("RG543534")
253
254
255
256                # process return values
257
258                status=get_repo_execution_state(s.ExecutionState_ID)
259
260                # HACK
261                if status=="queued":
262                        status="waiting"
263
264                simulation=types.Simulation(id=s.ID,name=s.Name,status=status) 
265
266
267
268        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
269                pass
270        else:
271                raise Exception("ERR014 - incorrect mode")
272
273        return simulation
274
275def delete_simulation(name):
276        if mode==CSTE_MODE_LOCAL_REPO:
277                repo.delete_simulation(name)
278        elif mode==CSTE_MODE_REMOTE_REPO:
279
280                # prepare args
281                # ..
282
283                # execute
284                repo.delete_simulation(name)
285
286                # process return values
287                # ..
288
289        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
290                pass
291        else:
292                raise Exception("ERR015 - incorrect mode")
293
294def create_simulation(simulation):
295        if mode==CSTE_MODE_LOCAL_REPO:
296                repo.create_simulation(simulation)
297        elif mode==CSTE_MODE_REMOTE_REPO:
298
299                # prepare args
300                model_engine=None
301                space=None
302                exp=None
303
304                if "BIGBRO" in simulation.name:
305
306                        model_engine="IPSL-CM5A-LR"
307                        space="TEST"
308                        exp="sstClim"
309
310                elif "v5cf.amipMR" in simulation.name:
311
312                        model_engine="IPSL-CM5A-MR"
313                        space="PROD"
314                        exp="amip"
315
316                else:
317
318                        model_engine=_SIM_MODEL_ENGINE
319                        space=_SIM_SPACE
320                        exp=_SIM_EXPERIMENT
321
322
323                # execute
324                repo.create_simulation(_SIM_ACTIVITY,
325                                                                _SIM_COMPUTE_NODE,
326                                                                _SIM_COMPUTE_NODE_LOGIN,
327                                                                _SIM_COMPUTE_NODE_MACHINE,
328                                                                _SIM_EXECUTION_START_DATE,
329                                                                _SIM_EXECUTION_STATE,
330                                                                exp,
331                                                                model_engine,
332                                                                simulation.name,
333                                                                space,
334                                                                parent_simulation_name=None)
335
336
337                # process return values
338                # ..
339
340        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
341                pass
342        else:
343                raise Exception("ERR016 - incorrect mode")
344
345def update_simulation_status(simulation):
346        if mode==CSTE_MODE_LOCAL_REPO:
347
348                try:
349                        repo.update_simulation_status(simulation)
350                except:
351                        traceback.print_exc()
352
353        elif mode==CSTE_MODE_REMOTE_REPO:
354
355                # prepare args
356                # ..
357
358
359                # HACK
360                prodiguer_status=simulation.status
361                if simulation.status == "waiting":
362                        prodiguer_status="queued"
363
364
365                # execute
366                repo.update_simulation_status(simulation.name, prodiguer_status.upper())
367
368                # process return values
369                # ..
370
371                commit()
372
373        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
374                pass
375        else:
376                raise Exception("ERR017 - incorrect mode")
377
378def retrieve_messages(simulation):
379        message=None
380
381        if mode==CSTE_MODE_LOCAL_REPO:
382                message=repo.retrieve_messages(simulation)
383        elif mode==CSTE_MODE_REMOTE_REPO:
384
385                # prepare args
386                # ..
387
388                # execute
389                repo.retrieve_messages(name)
390
391                # process return values
392                # ..
393
394        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
395                pass
396        else:
397                raise Exception("ERR018 - incorrect mode")
398
399        return message
400
401def delete_messages(simulation):
402        if mode==CSTE_MODE_LOCAL_REPO:
403                repo.delete_messages(name)
404        elif mode==CSTE_MODE_REMOTE_REPO:
405
406                # prepare args
407                # ..
408
409                # execute
410                repo.delete_messages(name)
411
412                # process return values
413                # ..
414
415        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
416                pass
417        else:
418                raise Exception("ERR019 - incorrect mode")
419
420def create_message(message,simulation):
421        if mode==CSTE_MODE_LOCAL_REPO:
422                repo.create_message(message,simulation)
423        elif mode==CSTE_MODE_REMOTE_REPO:
424
425                # prepare args
426                # ..
427
428                print "%s %s %s"%(simulation.name,message.code,"message.body")
429
430                # execute
431                try:
432                        repo.create_message(simulation.name,message.code,"message.body")
433                except ValueError as i:
434                        print "bla (%s)"%str(i)
435
436                commit()
437
438
439                # process return values
440                # ..
441
442        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
443                pass
444        else:
445                raise Exception("ERR020 - incorrect mode")
446
447        commit()
448
449
450def retrieve_last_message(simulation):
451        message=None
452
453        if mode==CSTE_MODE_LOCAL_REPO:
454                message=repo.retrieve_last_message(simulation)
455        elif mode==CSTE_MODE_REMOTE_REPO:
456
457                # execute
458                message=repo.retrieve_last_message(simulation.name)
459
460                if message is None:
461                        raise Exception("ERR221 - null value")
462
463                # process return values
464                di={}
465                di["crea_date"]=message.CreateDate
466                message=types.Message(di)
467
468        elif mode==CSTE_MODE_REMOTE_REPO_STUB:
469                pass
470        else:
471                raise Exception("ERR021 - incorrect mode")
472
473        return message
474
475
476# --- higher level methods --- #
477
478def get_running_simulations():
479        running_simulation=[]
480
481        for s in retrieve_simulations():
482
483                if s.status=="running":
484                        running_simulation.append(s)
485                       
486        return running_simulation
487
488
Note: See TracBrowser for help on using the repository browser.