#include #include #include #include #include #include #include #include #include #include #include "utils.h" #define SUMMARY_EVERY_US 1000000 uint64_t now_microseconds(void) { struct timeval tv; gettimeofday(&tv, NULL); return (uint64_t) tv.tv_sec * 1000000 + (uint64_t) tv.tv_usec; } void microsleep(int usec) { usleep(usec); } void die_on_error(int x, char const *context) { if (x < 0) { char *errstr = amqp_error_string(-x); fprintf(stderr, "%s: %s\n", context, errstr); free(errstr); exit(1); } } void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) { switch (x.reply_type) { case AMQP_RESPONSE_NORMAL: return; case AMQP_RESPONSE_NONE: fprintf(stderr, "%s: missing RPC reply type!\n", context); break; case AMQP_RESPONSE_LIBRARY_EXCEPTION: fprintf(stderr, "%s: %s\n", context, amqp_error_string(x.library_error)); break; case AMQP_RESPONSE_SERVER_EXCEPTION: switch (x.reply.id) { case AMQP_CONNECTION_CLOSE_METHOD: { amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded; fprintf(stderr, "%s: server connection error %d, message: %.*s\n", context, m->reply_code, (int) m->reply_text.len, (char *) m->reply_text.bytes); break; } case AMQP_CHANNEL_CLOSE_METHOD: { amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded; fprintf(stderr, "%s: server channel error %d, message: %.*s\n", context, m->reply_code, (int) m->reply_text.len, (char *) m->reply_text.bytes); break; } default: fprintf(stderr, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id); break; } break; } exit(1); } static void dump_row(long count, int numinrow, int *chs) { int i; printf("%08lX:", count - numinrow); if (numinrow > 0) { for (i = 0; i < numinrow; i++) { if (i == 8) printf(" :"); printf(" %02X", chs[i]); } for (i = numinrow; i < 16; i++) { if (i == 8) printf(" :"); printf(" "); } printf(" "); for (i = 0; i < numinrow; i++) { if (isprint(chs[i])) printf("%c", chs[i]); else printf("."); } } printf("\n"); } static int rows_eq(int *a, int *b) { int i; for (i=0; i<16; i++) if (a[i] != b[i]) return 0; return 1; } void amqp_dump(void const *buffer, size_t len) { unsigned char *buf = (unsigned char *) buffer; long count = 0; int numinrow = 0; int chs[16]; int oldchs[16] = {0}; int showed_dots = 0; size_t i; for (i = 0; i < len; i++) { int ch = buf[i]; if (numinrow == 16) { int i; if (rows_eq(oldchs, chs)) { if (!showed_dots) { showed_dots = 1; printf(" .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n"); } } else { showed_dots = 0; dump_row(count, numinrow, chs); } for (i=0; i<16; i++) oldchs[i] = chs[i]; numinrow = 0; } count++; chs[numinrow++] = ch; } dump_row(count, numinrow, chs); if (numinrow != 0) printf("%08lX:\n", count); } static void run(amqp_connection_state_t conn) { uint64_t start_time = now_microseconds(); int received = 0; int previous_received = 0; uint64_t previous_report_time = start_time; uint64_t next_summary_time = start_time + SUMMARY_EVERY_US; amqp_frame_t frame; int result; size_t body_received; size_t body_target; uint64_t now; while (1) { now = now_microseconds(); if (now > next_summary_time) { int countOverInterval = received - previous_received; double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0); printf("%d ms: Received %d - %d since last report (%d Hz)\n", (int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate); previous_received = received; previous_report_time = now; next_summary_time += SUMMARY_EVERY_US; } amqp_maybe_release_buffers(conn); result = amqp_simple_wait_frame(conn, &frame); if (result < 0) return; if (frame.frame_type != AMQP_FRAME_METHOD) continue; if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) continue; result = amqp_simple_wait_frame(conn, &frame); if (result < 0) return; if (frame.frame_type != AMQP_FRAME_HEADER) { fprintf(stderr, "Expected header!"); abort(); } body_target = frame.payload.properties.body_size; body_received = 0; while (body_received < body_target) { result = amqp_simple_wait_frame(conn, &frame); if (result < 0) return; if (frame.frame_type != AMQP_FRAME_BODY) { fprintf(stderr, "Expected body!"); abort(); } body_received += frame.payload.body_fragment.len; assert(body_received <= body_target); } received++; } } int main(int argc, char const * const *argv) { char const *hostname; int port; char const *exchange; char const *bindingkey; int sockfd; amqp_connection_state_t conn; amqp_bytes_t queuename; if (argc < 3) { fprintf(stderr, "Usage: amqp_consumer host port\n"); return 1; } hostname = argv[1]; port = atoi(argv[2]); exchange = "amq.direct"; /* argv[3]; */ bindingkey = "test queue"; /* argv[4]; */ conn = amqp_new_connection(); die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); amqp_set_sockfd(conn, sockfd); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); /* { amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); queuename = amqp_bytes_malloc_dup(r->queue); if (queuename.bytes == NULL) { fprintf(stderr, "Out of memory while copying queue name"); return 1; } } amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue"); amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); */ amqp_basic_consume(conn, 1, amqp_cstring_bytes("test queue"), amqp_empty_bytes, 0, 1, 0, amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); run(conn); die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); die_on_error(amqp_destroy_connection(conn), "Ending connection"); return 0; }