Changeset 854 for trunk/Monitoring
- Timestamp:
- 04/26/13 16:57:16 (11 years ago)
- Location:
- trunk/Monitoring
- Files:
-
- 10 added
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/Monitoring/Broker/README
r852 r854 42 42 database dir: /var/lib/rabbitmq/mnesia/rabbit 43 43 - run 44 - to start the daemon, use command below 44 - to start the daemon, use command below as root 45 - cd /opt/rabbitmq-server-3.0.2/sbin 45 46 - ./rabbitmq-server -detached 46 47 - to stop the daemon, use this -
trunk/Monitoring/CNClient/README
r852 r854 1 1 - This program uses rabbitmq-c library (v0.3.0) 2 2 - https://github.com/alanxz/rabbitmq-c 3 - https://github.com/alanxz/rabbitmq-c/archive/rabbitmq-c-v0.3.0.zip4 3 - Compilation (static) 5 4 - Library installation … … 11 10 so better use installation from source 12 11 - from source 13 tar xzvf rabbitmq-c-v0.3.0.zip 14 cd rabbitmq-c-v0.3.0 15 autoreconf -i 16 ./configure --enable-static 17 make 18 make install 12 - retrieve source 13 - wget https://github.com/alanxz/rabbitmq-c/archive/rabbitmq-c-v0.3.0.zip -O rabbitmq-c-v0.3.0.zip 14 - unzip rabbitmq-c-v0.3.0.zip 15 - cd rabbitmq-c-v0.3.0 16 - compilation using autoconf (automake v1.9+, and libtool v2.2+) 17 - autoreconf -i 18 - if error occurs at this step, check requirement below 19 - we require autotools v2.63 or better to build. I think RHEL5 ships with v2.59 which lacks the AC_PROC_CC_C99 macro. 20 - You can do one of two things; 21 - Install a newer version of autotools 22 - Build using cmake (v2.6 or better). 23 - ./configure --enable-static 24 - make 25 - make install 26 - compilation using cmake (CMake v2.6+) 27 - mkdir build && cd build 28 - cmake -DBUILD_STATIC_LIBS=True -DBUILD_SHARED_LIBS=True .. 29 - cmake --build . 30 - make 31 - you got error AAA below, it's normal 32 make[2]: *** Pas de rÚgle pour fabriquer la cible « librabbitmq/librabbitmq.so.1.0.1 », nécessaire pour « examples/amqp_bind ». Arrêt. 33 make[1]: *** [examples/CMakeFiles/amqp_bind.dir/all] Erreur 2 34 make: *** [all] Erreur 2 35 - make 36 (because of the above error (AAA), you need to run make twice) 37 - make install 19 38 - gcc -static -I/usr/local/include -L/usr/local/lib -Wall -o sendAMQPMsg send_AMQP_msg.c -lrabbitmq 20 39 - we get warning below during compilation … … 27 46 - it means that you may need to be sure all computing node have the same glibc version !!!! 28 47 - also means that a different binary must be use in each computing center 29 - Run 30 - ./sendAMQPMsg localhost 5672 1 10 48 - usage 49 - to send a message in the queue, do 50 - ./sendAMQPMsg localhost 5672 test 51 - note 52 - to show how many messages are in the queue, do as root 53 - ./rabbitmqctl list_queues -
trunk/Monitoring/CNClient/send_AMQP_msg.c
r852 r854 16 16 17 17 #define SUMMARY_EVERY_US 1000000 18 19 18 20 19 … … 117 116 } 118 117 119 // note that "amq.direct" is a special exchange120 // (The empty exchange name is an alias for amq.direct)118 // note that "amq.direct" is a special exchange 119 // (The empty exchange name is an alias for amq.direct) 121 120 //die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"), amqp_cstring_bytes(queue_name), 0, 0, NULL, message_bytes), "Publishing"); 122 121 … … 158 157 amqp_connection_state_t conn; 159 158 160 amqp_bytes_t reply_to_queue; 161 162 if (argc < 5) { 163 fprintf(stderr, "Usage: amqp_producer host port rate_limit message_count\n"); 164 return 1; 165 } 159 //amqp_bytes_t reply_to_queue; 160 166 161 167 162 hostname = argv[1]; … … 204 199 } 205 200 206 207 //int main_complex(int argc, char const * const *argv) { 208 int main(int argc, char const * const *argv) { 201 int read_file(char *f,char **source) { 202 FILE *fp = fopen(f, "r"); 203 if (fp != NULL) { 204 /* Go to the end of the file. */ 205 if (fseek(fp, 0L, SEEK_END) == 0) { 206 /* Get the size of the file. */ 207 long bufsize = ftell(fp); 208 if (bufsize == -1) { /* Error */ } 209 210 /* Allocate our buffer to that size. */ 211 *source = malloc(sizeof(char) * (bufsize + 1)); 212 213 /* Go back to the start of the file. */ 214 if (fseek(fp, 0L, SEEK_SET) == 0) { /* Error */ } 215 216 /* Read the entire file into memory. */ 217 size_t newLen = fread(*source, sizeof(char), bufsize, fp); 218 if (newLen == 0) { 219 fputs("Error reading file\n", stderr); 220 return 1; 221 } else { 222 //source[++newLen] = '\0'; /* Just to be safe. */ 223 } 224 } 225 fclose(fp); 226 } else { 227 fputs("File not found\n", stderr); 228 return 1; 229 } 230 231 return 0; 232 } 233 234 int main(int argc, char * const *argv) { 209 235 char const *hostname; 210 236 int port; 211 char const *exchange; 212 char const *routingkey; 213 char const *messagebody; 237 //char const *exchange; 238 //char const *routingkey; 239 char *body_tmp; 240 char const *body; 241 char *body_final = NULL; 242 char *buf = NULL; 243 int c; 244 char *filepath = NULL; 245 int file_flag=0; 214 246 215 247 … … 218 250 amqp_connection_state_t conn; 219 251 220 if (argc < 4) { 221 fprintf(stderr, "Usage: amqp_sendstring host port messagebody\n"); 222 return 1; 223 } 224 225 hostname = argv[1]; 226 port = atoi(argv[2]); 227 //exchange = argv[3]; 228 //routingkey = argv[4]; 229 messagebody = argv[5]; 252 //parsing args 253 opterr = 0; 254 while ((c = getopt (argc, argv, "b:ef:h:p:")) != -1) 255 switch (c) 256 { 257 case 'b': 258 body_tmp = optarg; 259 break; 260 case 'e': 261 fprintf(stderr, "Usage: amqp_producer host port rate_limit message_count\n"); 262 fprintf(stderr, "Example 1: ./sendAMQPMsg -h localhost -p 5672 -b '{\"jobid\":\"toto\"}'\n"); 263 fprintf(stderr, "Example 2: ./sendAMQPMsg -h localhost -p 5672 -f /home/foobar/config.card -b '{\"jobid\":\"toto\"}'\n"); 264 exit(0); 265 case 'f': 266 filepath = optarg; 267 file_flag = 1; 268 break; 269 case 'h': 270 hostname = optarg; 271 break; 272 case 'p': 273 port = atoi(optarg); 274 break; 275 case '?': 276 fprintf (stderr, "ERR001: incorrect argument '-%c'.\n", optopt); 277 exit(EXIT_FAILURE); 278 default: 279 fprintf (stderr, "ERR002: incorrect argument\n"); 280 exit(EXIT_FAILURE); 281 } 282 283 //retrieve non-option argument 284 /* 285 int index; 286 for (index = optind; index < argc; index++) 287 printf ("Non-option argument %s\n", argv[index]); 288 */ 289 290 // add checks here 291 // (for example, body_tmp is mandatory) 292 293 if(file_flag==1) { 294 // retrieve file contents 295 296 if( access( filepath, F_OK ) != -1 ) { 297 // file exists 298 299 ; 300 } else { 301 // file doesn't exist 302 303 fprintf(stderr, "File not found (%s)\n",filepath); 304 305 exit(EXIT_FAILURE); 306 } 307 308 int res = read_file(filepath,&buf); 309 if (res != 0) { 310 exit(EXIT_FAILURE); 311 } 312 313 body_final=malloc(strlen(body_tmp) + 6 + strlen(buf) + 1); 314 //strcpy(body_final,"\0"); 315 strcat(body_final,body_tmp); 316 strcat(body_final,",file="); 317 strcat(body_final,buf); 318 319 body=body_final; 320 321 //debug 322 //fprintf(stderr, "hostname=%s, port=%d, body=%s, filepath=%s\n", hostname, port, body, filepath); 323 324 } else { 325 // retrieve msg body from argument 326 327 body=body_tmp; 328 329 } 330 331 //exit(0); 230 332 231 333 conn = amqp_new_connection(); … … 237 339 die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); 238 340 239 240 241 242 //JRA243 341 amqp_exchange_declare(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes("fanout"), 0, 0, amqp_empty_table); 244 342 die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange"); 245 343 246 //JRA 247 amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_cstring_bytes("myqueue"), 0, 1, 0, 0, amqp_empty_table); // durable && no auto-delete 344 amqp_queue_declare(conn, 1, amqp_cstring_bytes("myqueue"), 0, 1, 0, 0, amqp_empty_table); // durable && no auto-delete 248 345 die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); 249 346 250 347 amqp_queue_bind(conn, 1, amqp_cstring_bytes("myqueue"), amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), amqp_empty_table); //no need for binding key as we use the fanout exchange type 251 348 die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding"); 252 253 254 349 255 350 { … … 258 353 props.content_type = amqp_cstring_bytes("text/plain"); 259 354 props.delivery_mode = 2; /* persistent delivery mode */ 260 die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), 0, 0, &props, amqp_cstring_bytes( messagebody)), "Publishing");355 die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), 0, 0, &props, amqp_cstring_bytes(body)), "Publishing"); 261 356 } 262 357 … … 264 359 die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); 265 360 die_on_error(amqp_destroy_connection(conn), "Ending connection"); 361 362 if(file_flag==1) 363 free(buf); 364 266 365 return 0; 267 366 } … … 347 446 } 348 447 448 /* vi: set et ts=2 sw=2: */ -
trunk/Monitoring/Watch/watch
r841 r854 1 #!/usr/bin/ python -u1 #!/usr/bin/env python 2 2 # -*- coding: ISO-8859-1 -*- 3 3 … … 13 13 ################################## 14 14 15 from smon import dao 15 #from smon import dao 16 import pika 17 import base64 18 import json 19 import sys 20 import traceback 21 import smtplib 22 from email.mime.text import MIMEText 16 23 17 24 class Watcher(): 25 message_code_action_mapping = {"0000":["log","mail"],"1000":["log"],"2000":["log"],"3000":["log"],"9000":["log"],"9999":["log","mail"]} 18 26 19 27 @classmethod … … 23 31 @classmethod 24 32 def start(cls): 25 dao.insert_progress_messages(cls.get_fake_progress_messages()) 33 #dao.insert_progress_messages(cls.get_fake_progress_messages()) 34 pass 26 35 27 36 @classmethod 28 37 def stop(cls): 29 38 pass 30 39 40 @classmethod 41 def add(cls,message): 42 pass 43 44 @classmethod 45 def mail_example(cls): 46 me="jripsl@ipsl.jussieu.fr" 47 you="jripsl@ipsl.jussieu.fr" 48 body="Alarm" 49 object="Supervisor" 50 51 cls.mail(me,you,object,body) 52 53 @classmethod 54 def mail(cls): 55 cls.mail_example() 56 57 @classmethod 58 def send_mail(cls,me,you,object,body): 59 msg = MIMEText(body) 60 msg['Subject'] = object 61 msg['From'] = me 62 msg['To'] = you 63 64 # Send the message via our own SMTP server, but don't include the # envelope header. 65 s = smtplib.SMTP('localhost') 66 s.sendmail(me,[you], msg.as_string()) 67 s.quit() 68 69 @classmethod 70 def log(cls,message): 71 72 with open("/home/jripsl/supervisor/log/supervisor.log", "a") as log_file: 73 log_file.write("%s %s\n"%(message["code"],message["jobid"])) 74 75 @classmethod 76 def execActions(cls,message): 77 78 message_code=message["code"] 79 80 for action in cls.message_code_action_mapping[message_code]: 81 proc_name=action 82 83 try: 84 getattr(cls, proc_name)(message) 85 except Exception,e: 86 traceback.print_exc() 87 88 raise Exception("WATCH-ERR002","procedure error (%s,%s)"%(proc_name,str(e))) 89 31 90 def main(): 32 91 92 """ 33 93 # parse args 34 94 parser = argparse.ArgumentParser(prog='watcher') … … 41 101 42 102 SMON.init_singleton() 103 """ 104 105 connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) 106 channel = connection.channel() 107 108 #channel.queue_declare(queue='myqueue') 109 110 print ' [*] Waiting for messages. To exit press CTRL+C' 111 112 def callback(ch, method, properties, raw_msg): 113 # 114 #return 115 116 base64_decoded_msg=base64.b64decode(raw_msg) 117 118 # debug 119 #print " [x] Received %s" % raw_msg 120 #print " [x] Received %s (uudecoded)" % base64_decoded_msg 121 122 try: 123 message=json.loads(base64_decoded_msg) 124 except Exception,e: 125 print base64_decoded_msg 126 127 # message code based action 128 Watcher.execActions(message) 129 130 channel.basic_consume(callback, queue='myqueue', no_ack=True) 131 132 channel.start_consuming() 43 133 44 134 45 135 """ 46 136 SMON.free_singleton() 137 """ 47 138 48 139 if __name__ == '__main__': … … 54 145 except Exception, e: 55 146 56 #traceback.print_exc()147 traceback.print_exc() 57 148 58 149 sys.exit(1) -
trunk/Monitoring/script/libIGCM_mock.sh
r842 r854 12 12 ################################## 13 13 14 stack_file="../sample/stack_light" 14 stack_file=$1 15 send_msg="/home/jripsl/snapshot/Monitoring/CNClient/sendAMQPMsg" 16 17 if [ $# -lt 1 ]; then 18 echo "Usage $0 ../sample/stack_light" 19 exit 1 20 fi 15 21 16 22 IFS=$'\n' 17 23 for line in $(cat $stack_file); do 18 echo $line | awk -F" " '{print $4}' 24 #echo $line | awk -F" " '{print $4}' 25 callname=$(echo $line | awk -F" " '{print $4}' ) 26 $send_msg localhost 5672 string "$callname" 19 27 done -
trunk/Monitoring/test/README
r852 r854 1 test_twisted.py consumer/producer test showing how to use RabbitMQ, Twisted and txamqp together 2 recv_AMQP_msg.c basic consumer test 1 test_twisted.py consumer/producer test showing how to use RabbitMQ, Twisted and txamqp together 2 recv_AMQP_msg.c basic consumer test 3 librabbitmq-python consumer/producer using librabbitmq-Python library
Note: See TracChangeset
for help on using the changeset viewer.