Ignore:
Timestamp:
04/26/13 16:57:16 (11 years ago)
Author:
jripsl
Message:

add JSON deserialization, mail notification, logging.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/Monitoring/CNClient/send_AMQP_msg.c

    r852 r854  
    1616 
    1717#define SUMMARY_EVERY_US 1000000 
    18  
    1918 
    2019 
     
    117116    } 
    118117 
    119         // note that "amq.direct" is a special exchange 
    120         // (The empty exchange name is an alias for amq.direct) 
     118          // note that "amq.direct" is a special exchange 
     119          // (The empty exchange name is an alias for amq.direct) 
    121120    //die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"), amqp_cstring_bytes(queue_name), 0, 0, NULL, message_bytes), "Publishing"); 
    122121 
     
    158157  amqp_connection_state_t conn; 
    159158 
    160   amqp_bytes_t reply_to_queue; 
    161  
    162   if (argc < 5) { 
    163     fprintf(stderr, "Usage: amqp_producer host port rate_limit message_count\n"); 
    164     return 1; 
    165   } 
     159  //amqp_bytes_t reply_to_queue; 
     160 
    166161 
    167162  hostname = argv[1]; 
     
    204199} 
    205200 
    206  
    207 //int main_complex(int argc, char const * const *argv) { 
    208 int main(int argc, char const * const *argv) { 
     201int read_file(char *f,char **source) { 
     202        FILE *fp = fopen(f, "r"); 
     203        if (fp != NULL) { 
     204                /* Go to the end of the file. */ 
     205                if (fseek(fp, 0L, SEEK_END) == 0) { 
     206                        /* Get the size of the file. */ 
     207                        long bufsize = ftell(fp); 
     208                        if (bufsize == -1) { /* Error */ } 
     209 
     210                        /* Allocate our buffer to that size. */ 
     211                        *source = malloc(sizeof(char) * (bufsize + 1)); 
     212 
     213                        /* Go back to the start of the file. */ 
     214                        if (fseek(fp, 0L, SEEK_SET) == 0) { /* Error */ } 
     215 
     216                        /* Read the entire file into memory. */ 
     217                        size_t newLen = fread(*source, sizeof(char), bufsize, fp); 
     218                        if (newLen == 0) { 
     219                                fputs("Error reading file\n", stderr); 
     220        return 1; 
     221                        } else { 
     222                                //source[++newLen] = '\0'; /* Just to be safe. */ 
     223                        } 
     224                } 
     225                fclose(fp); 
     226        } else { 
     227                fputs("File not found\n", stderr); 
     228                return 1; 
     229        } 
     230 
     231  return 0; 
     232} 
     233 
     234int main(int argc, char * const *argv) { 
    209235  char const *hostname; 
    210236  int port; 
    211   char const *exchange; 
    212   char const *routingkey; 
    213   char const *messagebody; 
     237  //char const *exchange; 
     238  //char const *routingkey; 
     239  char *body_tmp; 
     240  char const *body; 
     241  char *body_final = NULL; 
     242  char *buf = NULL; 
     243  int c; 
     244  char *filepath = NULL; 
     245  int file_flag=0; 
    214246 
    215247 
     
    218250  amqp_connection_state_t conn; 
    219251 
    220   if (argc < 4) { 
    221     fprintf(stderr, "Usage: amqp_sendstring host port messagebody\n"); 
    222     return 1; 
    223   } 
    224  
    225   hostname = argv[1]; 
    226   port = atoi(argv[2]); 
    227   //exchange = argv[3]; 
    228   //routingkey = argv[4]; 
    229   messagebody = argv[5]; 
     252  //parsing args 
     253  opterr = 0; 
     254  while ((c = getopt (argc, argv, "b:ef:h:p:")) != -1) 
     255    switch (c) 
     256      { 
     257      case 'b': 
     258        body_tmp = optarg; 
     259        break; 
     260      case 'e': 
     261        fprintf(stderr, "Usage: amqp_producer host port rate_limit message_count\n"); 
     262        fprintf(stderr, "Example 1: ./sendAMQPMsg -h localhost -p 5672 -b '{\"jobid\":\"toto\"}'\n"); 
     263        fprintf(stderr, "Example 2: ./sendAMQPMsg -h localhost -p 5672 -f /home/foobar/config.card -b '{\"jobid\":\"toto\"}'\n"); 
     264        exit(0); 
     265      case 'f': 
     266        filepath = optarg; 
     267        file_flag = 1; 
     268        break; 
     269      case 'h': 
     270        hostname = optarg; 
     271        break; 
     272      case 'p': 
     273        port = atoi(optarg); 
     274        break; 
     275      case '?': 
     276        fprintf (stderr, "ERR001: incorrect argument '-%c'.\n", optopt); 
     277        exit(EXIT_FAILURE); 
     278      default: 
     279        fprintf (stderr, "ERR002: incorrect argument\n"); 
     280        exit(EXIT_FAILURE); 
     281      } 
     282 
     283  //retrieve non-option argument 
     284  /* 
     285  int index; 
     286  for (index = optind; index < argc; index++) 
     287    printf ("Non-option argument %s\n", argv[index]); 
     288  */ 
     289 
     290  // add checks here 
     291  // (for example, body_tmp is mandatory) 
     292 
     293  if(file_flag==1) { 
     294    // retrieve file contents 
     295 
     296    if( access( filepath, F_OK ) != -1 ) { 
     297        // file exists 
     298         
     299        ; 
     300    } else { 
     301      // file doesn't exist 
     302 
     303      fprintf(stderr, "File not found (%s)\n",filepath); 
     304 
     305      exit(EXIT_FAILURE); 
     306    } 
     307 
     308    int res = read_file(filepath,&buf); 
     309    if (res != 0) { 
     310      exit(EXIT_FAILURE); 
     311    } 
     312 
     313    body_final=malloc(strlen(body_tmp) + 6 + strlen(buf) + 1); 
     314    //strcpy(body_final,"\0"); 
     315    strcat(body_final,body_tmp); 
     316    strcat(body_final,",file="); 
     317    strcat(body_final,buf); 
     318 
     319    body=body_final; 
     320 
     321    //debug 
     322    //fprintf(stderr, "hostname=%s, port=%d, body=%s, filepath=%s\n", hostname, port, body, filepath); 
     323 
     324  } else { 
     325    // retrieve msg body from argument 
     326 
     327    body=body_tmp; 
     328 
     329  } 
     330 
     331  //exit(0); 
    230332 
    231333  conn = amqp_new_connection(); 
     
    237339  die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); 
    238340 
    239  
    240  
    241  
    242   //JRA 
    243341  amqp_exchange_declare(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes("fanout"), 0, 0, amqp_empty_table); 
    244342  die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange"); 
    245343 
    246   //JRA 
    247   amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_cstring_bytes("myqueue"), 0, 1, 0, 0, amqp_empty_table); // durable && no auto-delete 
     344  amqp_queue_declare(conn, 1, amqp_cstring_bytes("myqueue"), 0, 1, 0, 0, amqp_empty_table); // durable && no auto-delete 
    248345  die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); 
    249346 
    250347  amqp_queue_bind(conn, 1, amqp_cstring_bytes("myqueue"), amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), amqp_empty_table); //no need for binding key as we use the fanout exchange type 
    251348  die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding"); 
    252  
    253  
    254349 
    255350  {  
     
    258353    props.content_type = amqp_cstring_bytes("text/plain"); 
    259354    props.delivery_mode = 2; /* persistent delivery mode */ 
    260     die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), 0, 0, &props, amqp_cstring_bytes(messagebody)), "Publishing"); 
     355    die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), 0, 0, &props, amqp_cstring_bytes(body)), "Publishing"); 
    261356  } 
    262357 
     
    264359  die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); 
    265360  die_on_error(amqp_destroy_connection(conn), "Ending connection"); 
     361 
     362  if(file_flag==1) 
     363    free(buf); 
     364 
    266365  return 0; 
    267366} 
     
    347446} 
    348447 
     448/* vi: set et ts=2 sw=2: */ 
Note: See TracChangeset for help on using the changeset viewer.