Changeset 854 for trunk/Monitoring/CNClient/send_AMQP_msg.c
- Timestamp:
- 04/26/13 16:57:16 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/Monitoring/CNClient/send_AMQP_msg.c
r852 r854 16 16 17 17 #define SUMMARY_EVERY_US 1000000 18 19 18 20 19 … … 117 116 } 118 117 119 // note that "amq.direct" is a special exchange120 // (The empty exchange name is an alias for amq.direct)118 // note that "amq.direct" is a special exchange 119 // (The empty exchange name is an alias for amq.direct) 121 120 //die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"), amqp_cstring_bytes(queue_name), 0, 0, NULL, message_bytes), "Publishing"); 122 121 … … 158 157 amqp_connection_state_t conn; 159 158 160 amqp_bytes_t reply_to_queue; 161 162 if (argc < 5) { 163 fprintf(stderr, "Usage: amqp_producer host port rate_limit message_count\n"); 164 return 1; 165 } 159 //amqp_bytes_t reply_to_queue; 160 166 161 167 162 hostname = argv[1]; … … 204 199 } 205 200 206 207 //int main_complex(int argc, char const * const *argv) { 208 int main(int argc, char const * const *argv) { 201 int read_file(char *f,char **source) { 202 FILE *fp = fopen(f, "r"); 203 if (fp != NULL) { 204 /* Go to the end of the file. */ 205 if (fseek(fp, 0L, SEEK_END) == 0) { 206 /* Get the size of the file. */ 207 long bufsize = ftell(fp); 208 if (bufsize == -1) { /* Error */ } 209 210 /* Allocate our buffer to that size. */ 211 *source = malloc(sizeof(char) * (bufsize + 1)); 212 213 /* Go back to the start of the file. */ 214 if (fseek(fp, 0L, SEEK_SET) == 0) { /* Error */ } 215 216 /* Read the entire file into memory. */ 217 size_t newLen = fread(*source, sizeof(char), bufsize, fp); 218 if (newLen == 0) { 219 fputs("Error reading file\n", stderr); 220 return 1; 221 } else { 222 //source[++newLen] = '\0'; /* Just to be safe. */ 223 } 224 } 225 fclose(fp); 226 } else { 227 fputs("File not found\n", stderr); 228 return 1; 229 } 230 231 return 0; 232 } 233 234 int main(int argc, char * const *argv) { 209 235 char const *hostname; 210 236 int port; 211 char const *exchange; 212 char const *routingkey; 213 char const *messagebody; 237 //char const *exchange; 238 //char const *routingkey; 239 char *body_tmp; 240 char const *body; 241 char *body_final = NULL; 242 char *buf = NULL; 243 int c; 244 char *filepath = NULL; 245 int file_flag=0; 214 246 215 247 … … 218 250 amqp_connection_state_t conn; 219 251 220 if (argc < 4) { 221 fprintf(stderr, "Usage: amqp_sendstring host port messagebody\n"); 222 return 1; 223 } 224 225 hostname = argv[1]; 226 port = atoi(argv[2]); 227 //exchange = argv[3]; 228 //routingkey = argv[4]; 229 messagebody = argv[5]; 252 //parsing args 253 opterr = 0; 254 while ((c = getopt (argc, argv, "b:ef:h:p:")) != -1) 255 switch (c) 256 { 257 case 'b': 258 body_tmp = optarg; 259 break; 260 case 'e': 261 fprintf(stderr, "Usage: amqp_producer host port rate_limit message_count\n"); 262 fprintf(stderr, "Example 1: ./sendAMQPMsg -h localhost -p 5672 -b '{\"jobid\":\"toto\"}'\n"); 263 fprintf(stderr, "Example 2: ./sendAMQPMsg -h localhost -p 5672 -f /home/foobar/config.card -b '{\"jobid\":\"toto\"}'\n"); 264 exit(0); 265 case 'f': 266 filepath = optarg; 267 file_flag = 1; 268 break; 269 case 'h': 270 hostname = optarg; 271 break; 272 case 'p': 273 port = atoi(optarg); 274 break; 275 case '?': 276 fprintf (stderr, "ERR001: incorrect argument '-%c'.\n", optopt); 277 exit(EXIT_FAILURE); 278 default: 279 fprintf (stderr, "ERR002: incorrect argument\n"); 280 exit(EXIT_FAILURE); 281 } 282 283 //retrieve non-option argument 284 /* 285 int index; 286 for (index = optind; index < argc; index++) 287 printf ("Non-option argument %s\n", argv[index]); 288 */ 289 290 // add checks here 291 // (for example, body_tmp is mandatory) 292 293 if(file_flag==1) { 294 // retrieve file contents 295 296 if( access( filepath, F_OK ) != -1 ) { 297 // file exists 298 299 ; 300 } else { 301 // file doesn't exist 302 303 fprintf(stderr, "File not found (%s)\n",filepath); 304 305 exit(EXIT_FAILURE); 306 } 307 308 int res = read_file(filepath,&buf); 309 if (res != 0) { 310 exit(EXIT_FAILURE); 311 } 312 313 body_final=malloc(strlen(body_tmp) + 6 + strlen(buf) + 1); 314 //strcpy(body_final,"\0"); 315 strcat(body_final,body_tmp); 316 strcat(body_final,",file="); 317 strcat(body_final,buf); 318 319 body=body_final; 320 321 //debug 322 //fprintf(stderr, "hostname=%s, port=%d, body=%s, filepath=%s\n", hostname, port, body, filepath); 323 324 } else { 325 // retrieve msg body from argument 326 327 body=body_tmp; 328 329 } 330 331 //exit(0); 230 332 231 333 conn = amqp_new_connection(); … … 237 339 die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); 238 340 239 240 241 242 //JRA243 341 amqp_exchange_declare(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes("fanout"), 0, 0, amqp_empty_table); 244 342 die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange"); 245 343 246 //JRA 247 amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_cstring_bytes("myqueue"), 0, 1, 0, 0, amqp_empty_table); // durable && no auto-delete 344 amqp_queue_declare(conn, 1, amqp_cstring_bytes("myqueue"), 0, 1, 0, 0, amqp_empty_table); // durable && no auto-delete 248 345 die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); 249 346 250 347 amqp_queue_bind(conn, 1, amqp_cstring_bytes("myqueue"), amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), amqp_empty_table); //no need for binding key as we use the fanout exchange type 251 348 die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding"); 252 253 254 349 255 350 { … … 258 353 props.content_type = amqp_cstring_bytes("text/plain"); 259 354 props.delivery_mode = 2; /* persistent delivery mode */ 260 die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), 0, 0, &props, amqp_cstring_bytes( messagebody)), "Publishing");355 die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), 0, 0, &props, amqp_cstring_bytes(body)), "Publishing"); 261 356 } 262 357 … … 264 359 die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); 265 360 die_on_error(amqp_destroy_connection(conn), "Ending connection"); 361 362 if(file_flag==1) 363 free(buf); 364 266 365 return 0; 267 366 } … … 347 446 } 348 447 448 /* vi: set et ts=2 sw=2: */
Note: See TracChangeset
for help on using the changeset viewer.