source: trunk/Monitoring/CNClient/send_AMQP_msg.c @ 852

Last change on this file since 852 was 852, checked in by jripsl, 11 years ago
  • AMQP C-client implementation.
File size: 9.3 KB
Line 
1#include <stdlib.h>
2#include <stdio.h>
3#include <string.h>
4#include <stdint.h>
5#include <sys/time.h>
6#include <unistd.h>
7
8
9#include <ctype.h>
10
11#include <amqp.h>
12#include <amqp_framing.h>
13
14#include "utils.h"
15
16
17#define SUMMARY_EVERY_US 1000000
18
19
20
21
22uint64_t now_microseconds(void)
23{
24  struct timeval tv;
25  gettimeofday(&tv, NULL);
26  return (uint64_t) tv.tv_sec * 1000000 + (uint64_t) tv.tv_usec;
27}
28
29void microsleep(int usec)
30{
31  usleep(usec);
32}
33
34void die_on_error(int x, char const *context) {
35  if (x < 0) {
36    char *errstr = amqp_error_string(-x);
37    fprintf(stderr, "%s: %s\n", context, errstr);
38    free(errstr);
39    exit(1);
40  }
41}
42
43void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) {
44  switch (x.reply_type) {
45    case AMQP_RESPONSE_NORMAL:
46      return;
47
48    case AMQP_RESPONSE_NONE:
49      fprintf(stderr, "%s: missing RPC reply type!\n", context);
50      break;
51
52    case AMQP_RESPONSE_LIBRARY_EXCEPTION:
53      fprintf(stderr, "%s: %s\n", context, amqp_error_string(x.library_error));
54      break;
55
56    case AMQP_RESPONSE_SERVER_EXCEPTION:
57      switch (x.reply.id) {
58        case AMQP_CONNECTION_CLOSE_METHOD: {
59          amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
60          fprintf(stderr, "%s: server connection error %d, message: %.*s\n",
61                  context,
62                  m->reply_code,
63                  (int) m->reply_text.len, (char *) m->reply_text.bytes);
64          break;
65        }
66        case AMQP_CHANNEL_CLOSE_METHOD: {
67          amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
68          fprintf(stderr, "%s: server channel error %d, message: %.*s\n",
69                  context,
70                  m->reply_code,
71                  (int) m->reply_text.len, (char *) m->reply_text.bytes);
72          break;
73        }
74        default:
75          fprintf(stderr, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id);
76          break;
77      }
78      break;
79  }
80
81  exit(1);
82}
83
84static void send_batch(amqp_connection_state_t conn, char const *queue_name, int rate_limit, int message_count)
85{
86  uint64_t start_time = now_microseconds();
87  int i;
88  int sent = 0;
89  int previous_sent = 0;
90  uint64_t previous_report_time = start_time;
91  uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
92
93  char message[256];
94  amqp_bytes_t message_bytes;
95
96  for (i = 0; i < (int)sizeof(message); i++) {
97    message[i] = i & 0xff;
98  }
99
100  message_bytes.len = sizeof(message);
101  message_bytes.bytes = message;
102
103  for (i = 0; i < message_count; i++) {
104    uint64_t now = now_microseconds();
105
106    { 
107      amqp_basic_properties_t props;
108      props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
109      props.content_type = amqp_cstring_bytes("text/plain");
110      props.delivery_mode = 2; /* persistent delivery mode */
111
112      // works ! (queue get populated even if no consumer)
113      die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes(""), amqp_cstring_bytes(queue_name), 0, 0, &props, message_bytes), "Publishing");
114
115      // don't works ! (queue get populated only if consumer up)
116      //die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"), amqp_cstring_bytes(queue_name), 0, 0, &props, message_bytes), "Publishing");
117    }
118
119        // note that "amq.direct" is a special exchange
120        // (The empty exchange name is an alias for amq.direct)
121    //die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"), amqp_cstring_bytes(queue_name), 0, 0, NULL, message_bytes), "Publishing");
122
123
124    sent++;
125    if (now > next_summary_time) {
126      int countOverInterval = sent - previous_sent;
127      double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);
128      printf("%d ms: Sent %d - %d since last report (%d Hz)\n", (int)(now - start_time) / 1000, sent, countOverInterval, (int) intervalRate);
129
130      previous_sent = sent;
131      previous_report_time = now;
132      next_summary_time += SUMMARY_EVERY_US;
133    }
134
135    while (((i * 1000000.0) / (now - start_time)) > rate_limit) {
136      microsleep(2000);
137      now = now_microseconds();
138    }
139  }
140
141  {
142    uint64_t stop_time = now_microseconds();
143    int total_delta = stop_time - start_time;
144
145    printf("PRODUCER - Message count: %d\n", message_count);
146    printf("Total time, milliseconds: %d\n", total_delta / 1000);
147    printf("Overall messages-per-second: %g\n", (message_count / (total_delta / 1000000.0)));
148  }
149}
150
151//int main(int argc, char const * const *argv) {
152int main_light(int argc, char const * const *argv) {
153  char const *hostname;
154  int port;
155  int rate_limit;
156  int message_count;
157  int sockfd;
158  amqp_connection_state_t conn;
159
160  amqp_bytes_t reply_to_queue;
161
162  if (argc < 5) {
163    fprintf(stderr, "Usage: amqp_producer host port rate_limit message_count\n");
164    return 1;
165  }
166
167  hostname = argv[1];
168  port = atoi(argv[2]);
169  rate_limit = atoi(argv[3]);
170  message_count = atoi(argv[4]);
171
172  conn = amqp_new_connection();
173
174  die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
175
176  amqp_set_sockfd(conn, sockfd);
177
178  die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in");
179
180  amqp_channel_open(conn, 1);
181
182  die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
183
184  //JRA
185  /*
186  {
187    amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_cstring_bytes("test queue"), 0, 1, 0, 0, amqp_empty_table); // durable && no auto-delete
188    die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
189    reply_to_queue = amqp_bytes_malloc_dup(r->queue);
190    if (reply_to_queue.bytes == NULL) {
191      fprintf(stderr, "Out of memory while copying queue name");
192      return 1;
193    }
194  }
195  */
196
197  send_batch(conn, "test queue", rate_limit, message_count); // note that "test queue" here is used as the routing key
198
199  die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
200  die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
201  die_on_error(amqp_destroy_connection(conn), "Ending connection");
202
203  return 0;
204}
205
206
207//int main_complex(int argc, char const * const *argv) {
208int main(int argc, char const * const *argv) {
209  char const *hostname;
210  int port;
211  char const *exchange;
212  char const *routingkey;
213  char const *messagebody;
214
215
216 
217  int sockfd;
218  amqp_connection_state_t conn;
219
220  if (argc < 4) {
221    fprintf(stderr, "Usage: amqp_sendstring host port messagebody\n");
222    return 1;
223  }
224
225  hostname = argv[1];
226  port = atoi(argv[2]);
227  //exchange = argv[3];
228  //routingkey = argv[4];
229  messagebody = argv[5];
230
231  conn = amqp_new_connection();
232
233  die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
234  amqp_set_sockfd(conn, sockfd);
235  die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in");
236  amqp_channel_open(conn, 1);
237  die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
238
239
240
241
242  //JRA
243  amqp_exchange_declare(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes("fanout"), 0, 0, amqp_empty_table);
244  die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange");
245
246  //JRA
247  amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_cstring_bytes("myqueue"), 0, 1, 0, 0, amqp_empty_table); // durable && no auto-delete
248  die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
249
250  amqp_queue_bind(conn, 1, amqp_cstring_bytes("myqueue"), amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), amqp_empty_table); //no need for binding key as we use the fanout exchange type
251  die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding");
252
253
254
255  { 
256    amqp_basic_properties_t props;
257    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
258    props.content_type = amqp_cstring_bytes("text/plain");
259    props.delivery_mode = 2; /* persistent delivery mode */
260    die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("myexchange"), amqp_cstring_bytes(""), 0, 0, &props, amqp_cstring_bytes(messagebody)), "Publishing");
261  }
262
263  die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
264  die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
265  die_on_error(amqp_destroy_connection(conn), "Ending connection");
266  return 0;
267}
268
269// -- not used -- //
270
271static void dump_row(long count, int numinrow, int *chs) {
272  int i;
273
274  printf("%08lX:", count - numinrow);
275
276  if (numinrow > 0) {
277    for (i = 0; i < numinrow; i++) {
278      if (i == 8)
279    printf(" :");
280      printf(" %02X", chs[i]);
281    }
282    for (i = numinrow; i < 16; i++) {
283      if (i == 8)
284    printf(" :");
285      printf("   ");
286    }
287    printf("  ");
288    for (i = 0; i < numinrow; i++) {
289      if (isprint(chs[i]))
290    printf("%c", chs[i]);
291      else
292    printf(".");
293    }
294  }
295  printf("\n");
296}
297
298static int rows_eq(int *a, int *b) {
299  int i;
300
301  for (i=0; i<16; i++)
302    if (a[i] != b[i])
303      return 0;
304
305  return 1;
306}
307
308void amqp_dump(void const *buffer, size_t len) {
309  unsigned char *buf = (unsigned char *) buffer;
310  long count = 0;
311  int numinrow = 0;
312  int chs[16];
313  int oldchs[16] = {0};
314  int showed_dots = 0;
315  size_t i;
316
317  for (i = 0; i < len; i++) {
318    int ch = buf[i];
319
320    if (numinrow == 16) {
321      int i;
322
323      if (rows_eq(oldchs, chs)) {
324    if (!showed_dots) {
325      showed_dots = 1;
326      printf("          .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n");
327    } 
328      } else {
329    showed_dots = 0;
330    dump_row(count, numinrow, chs);
331      }
332
333      for (i=0; i<16; i++)
334    oldchs[i] = chs[i];
335
336      numinrow = 0;
337    }
338
339    count++;
340    chs[numinrow++] = ch;
341  }
342
343  dump_row(count, numinrow, chs);
344
345  if (numinrow != 0)
346    printf("%08lX:\n", count);
347}
348
Note: See TracBrowser for help on using the repository browser.