Changeset 866 for trunk/Monitoring/Watch/watch
- Timestamp:
- 06/09/13 23:52:36 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/Monitoring/Watch/watch
r865 r866 26 26 sys.path.append("/home/jripsl/snapshot/Monitoring") 27 27 28 import smon.repo_io 29 28 import smon.repo_io as repo_io 29 import smon.types 30 30 31 31 """ … … 66 66 67 67 @classmethod 68 def store_msg(cls ):69 repo_io. store_messages(name)70 71 @classmethod 72 def set_sim_status_to_error(cls ):68 def store_msg(cls,message): 69 repo_io.create_message(message) 70 71 @classmethod 72 def set_sim_status_to_error(cls,message): 73 73 repo_io.update_simulation_status() 74 74 75 75 @classmethod 76 def crea_sim(cls): 76 def crea_sim(cls,message): 77 77 78 #repo_io.retrieve_simulation(name) 78 79 #repo_io.delete_simulation(name) 79 repo_io.create_simulation() 80 81 simulation=smon.types.Simulation(name=message.simuid) 82 83 repo_io.create_simulation(simulation) 80 84 81 85 @classmethod … … 88 92 89 93 if "file" in message: 90 print "%s %s %s\n"%(message ["code"],message["jobid"],message["file"])94 print "%s %s %s\n"%(message.code,message.jobid,message.file) 91 95 else: 92 print "%s %s\n"%(message ["code"],message["jobid"])96 print "%s %s\n"%(message.code,message.jobid) 93 97 94 98 @classmethod 95 99 def log(cls,message): 96 100 with open("/home/jripsl/supervisor/log/supervisor.log", "a") as log_file: 97 log_file.write("%s %s %s\n"%(datetime.now().strftime('%Y%m%d_%H%M%S'), message ["code"],message["jobid"]))101 log_file.write("%s %s %s\n"%(datetime.now().strftime('%Y%m%d_%H%M%S'), message.code,message.jobid)) 98 102 99 103 @classmethod 100 104 def execActions(cls,message): 101 105 102 message_code=message ["code"]106 message_code=message.code 103 107 104 108 for action in MessageActionsMapping.mapping[message_code]: … … 155 159 156 160 def callback(ch, method, properties, raw_msg): 157 # 158 #return 159 160 base64_decoded_msg=base64.b64decode(raw_msg) 161 162 # msg fmt: body:base64,file:base64 (no JSON here !!!) 163 164 165 166 167 # first deserialization (no JSON here !!!) 168 169 fields=raw_msg.split(",") 170 171 l__tmp_dic={} 172 173 for field in fields: 174 175 # debug 176 print " [x] Received %s" % field 177 178 splitted_field=field.split(":") 179 180 key=splitted_field[0] 181 val=splitted_field[1] 182 183 l__tmp_dic[key]=val 184 185 186 # debug 187 print " [x] Received %s (encoded)" % l__tmp_dic["body"] 188 189 190 # base64 decode body 191 base64_decoded_msg=base64.b64decode(l__tmp_dic["body"]) 192 161 193 162 194 # debug 163 195 #print " [x] Received %s" % raw_msg 196 print " [x] Received %s (uudecoded)" % base64_decoded_msg 164 197 #print " [x] Received %s (uudecoded)" % base64_decoded_msg 198 165 199 166 200 message=None 167 201 try: 168 message=json.loads(base64_decoded_msg) 202 # body deserialization 203 JSON_msg=json.loads(base64_decoded_msg) 204 message=smon.types.Message(JSON_msg) # all JSON object members will be available in smon.types.Message object 205 206 if "file" in l__tmp_dic: 207 208 # base64 decode file 209 base64_decoded_file=base64.b64decode(l__tmp_dic["file"]) 210 211 # add into msg 212 message.file=base64_decoded_file 169 213 170 214 # message code based action 171 Watcher.execActions(message) 215 Actions.execActions(message) 216 172 217 except Exception,e: 173 218 print "Exception occurs (exception=%s,msg=%s)"%(str(e),base64_decoded_msg) 219 220 #traceback.print_exc() 221 222 raise 174 223 175 224 self.channel.basic_consume(callback, queue='myqueue', no_ack=True) … … 198 247 except Exception, e: 199 248 200 traceback.print_exc()249 #traceback.print_exc() 201 250 202 251 sys.exit(1)
Note: See TracChangeset
for help on using the changeset viewer.