source: trunk/Monitoring/CNClient/send_AMQP_msg.c @ 851

Last change on this file since 851 was 851, checked in by jripsl, 11 years ago
  • add skeleton and documentation.
File size: 4.4 KB
Line 
1#include <stdlib.h>
2#include <stdio.h>
3#include <string.h>
4#include <stdint.h>
5#include <sys/time.h>
6#include <unistd.h>
7
8
9#include <ctype.h>
10
11#include <amqp.h>
12#include <amqp_framing.h>
13
14#include "utils.h"
15
16
17#define SUMMARY_EVERY_US 1000000
18
19
20
21
22uint64_t now_microseconds(void)
23{
24  struct timeval tv;
25  gettimeofday(&tv, NULL);
26  return (uint64_t) tv.tv_sec * 1000000 + (uint64_t) tv.tv_usec;
27}
28
29void microsleep(int usec)
30{
31  usleep(usec);
32}
33
34void die_on_error(int x, char const *context) {
35  if (x < 0) {
36    char *errstr = amqp_error_string(-x);
37    fprintf(stderr, "%s: %s\n", context, errstr);
38    free(errstr);
39    exit(1);
40  }
41}
42
43void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) {
44  switch (x.reply_type) {
45    case AMQP_RESPONSE_NORMAL:
46      return;
47
48    case AMQP_RESPONSE_NONE:
49      fprintf(stderr, "%s: missing RPC reply type!\n", context);
50      break;
51
52    case AMQP_RESPONSE_LIBRARY_EXCEPTION:
53      fprintf(stderr, "%s: %s\n", context, amqp_error_string(x.library_error));
54      break;
55
56    case AMQP_RESPONSE_SERVER_EXCEPTION:
57      switch (x.reply.id) {
58        case AMQP_CONNECTION_CLOSE_METHOD: {
59          amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
60          fprintf(stderr, "%s: server connection error %d, message: %.*s\n",
61                  context,
62                  m->reply_code,
63                  (int) m->reply_text.len, (char *) m->reply_text.bytes);
64          break;
65        }
66        case AMQP_CHANNEL_CLOSE_METHOD: {
67          amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
68          fprintf(stderr, "%s: server channel error %d, message: %.*s\n",
69                  context,
70                  m->reply_code,
71                  (int) m->reply_text.len, (char *) m->reply_text.bytes);
72          break;
73        }
74        default:
75          fprintf(stderr, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id);
76          break;
77      }
78      break;
79  }
80
81  exit(1);
82}
83
84static void send_batch(amqp_connection_state_t conn, char const *queue_name, int rate_limit, int message_count)
85{
86  uint64_t start_time = now_microseconds();
87  int i;
88  int sent = 0;
89  int previous_sent = 0;
90  uint64_t previous_report_time = start_time;
91  uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
92
93  char message[256];
94  amqp_bytes_t message_bytes;
95
96  for (i = 0; i < (int)sizeof(message); i++) {
97    message[i] = i & 0xff;
98  }
99
100  message_bytes.len = sizeof(message);
101  message_bytes.bytes = message;
102
103  for (i = 0; i < message_count; i++) {
104    uint64_t now = now_microseconds();
105
106    die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"), amqp_cstring_bytes(queue_name), 0, 0, NULL, message_bytes), "Publishing");
107    sent++;
108    if (now > next_summary_time) {
109      int countOverInterval = sent - previous_sent;
110      double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);
111      printf("%d ms: Sent %d - %d since last report (%d Hz)\n", (int)(now - start_time) / 1000, sent, countOverInterval, (int) intervalRate);
112
113      previous_sent = sent;
114      previous_report_time = now;
115      next_summary_time += SUMMARY_EVERY_US;
116    }
117
118    while (((i * 1000000.0) / (now - start_time)) > rate_limit) {
119      microsleep(2000);
120      now = now_microseconds();
121    }
122  }
123
124  {
125    uint64_t stop_time = now_microseconds();
126    int total_delta = stop_time - start_time;
127
128    printf("PRODUCER - Message count: %d\n", message_count);
129    printf("Total time, milliseconds: %d\n", total_delta / 1000);
130    printf("Overall messages-per-second: %g\n", (message_count / (total_delta / 1000000.0)));
131  }
132}
133
134int main(int argc, char const * const *argv) {
135  char const *hostname;
136  int port;
137  int rate_limit;
138  int message_count;
139  int sockfd;
140  amqp_connection_state_t conn;
141
142  if (argc < 5) {
143    fprintf(stderr, "Usage: amqp_producer host port rate_limit message_count\n");
144    return 1;
145  }
146
147  hostname = argv[1];
148  port = atoi(argv[2]);
149  rate_limit = atoi(argv[3]);
150  message_count = atoi(argv[4]);
151
152  conn = amqp_new_connection();
153
154  die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
155
156  amqp_set_sockfd(conn, sockfd);
157
158  die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in");
159
160  amqp_channel_open(conn, 1);
161
162  die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
163
164  send_batch(conn, "test queue", rate_limit, message_count);
165
166  die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
167  die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
168  die_on_error(amqp_destroy_connection(conn), "Ending connection");
169
170  return 0;
171}
Note: See TracBrowser for help on using the repository browser.