module kaleidic.api.rabbitmq.examples.bind;
import std.stdio;
import std.string;

import kaleidic.api.rabbitmq.rabbitmq;
import utils;

/* Interval, in microseconds, between two throughput reports printed by run(). */
enum SUMMARY_EVERY_US = 1000000;

/*
 * Consume messages from `conn` in a loop and print a throughput report
 * roughly every SUMMARY_EVERY_US microseconds.
 *
 * The function returns (without closing anything) when:
 *   - amqp_consume_message() fails with anything other than
 *     AMQP_STATUS_UNEXPECTED_STATE,
 *   - the follow-up amqp_simple_wait_frame() fails,
 *   - reading a returned message fails,
 *   - a channel.close or connection.close method arrives, or
 *   - an unrecognised method frame arrives.
 */
static void run(amqp_connection_state_t conn)
{
  uint64_t start_time = now_microseconds();
  int received = 0;
  int previous_received = 0;
  uint64_t previous_report_time = start_time;
  uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;

  amqp_frame_t frame;

  uint64_t now;

  for (;;) {
    amqp_rpc_reply_t ret;
    amqp_envelope_t envelope;

    now = now_microseconds();
    if (now > next_summary_time) {
      int countOverInterval = received - previous_received;
      double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);
      /* Convert to milliseconds BEFORE narrowing to int: casting the raw
       * microsecond delta first would overflow after about 2^31 us (~35 min).
       */
      printf("%d ms: Received %d - %d since last report (%d Hz)\n",
             (int)((now - start_time) / 1000), received, countOverInterval, (int) intervalRate);

      previous_received = received;
      previous_report_time = now;
      next_summary_time += SUMMARY_EVERY_US;
    }

    amqp_maybe_release_buffers(conn);
    ret = amqp_consume_message(conn, &envelope, NULL, 0);

    if (AMQP_RESPONSE_NORMAL == ret.reply_type) {
      /* Only an actual delivery counts towards the statistics. */
      received++;
      amqp_destroy_envelope(&envelope);
      continue;
    }

    if (AMQP_RESPONSE_LIBRARY_EXCEPTION != ret.reply_type ||
        AMQP_STATUS_UNEXPECTED_STATE != ret.library_error) {
      /* Any other failure (socket error, closed connection, ...) is not
       * something this loop handles; retrying immediately would fail the same
       * way and spin forever, so hand control back to the caller.
       */
      return;
    }

    /* A frame other than a delivery is pending: read it and act on it. */
    if (AMQP_STATUS_OK != amqp_simple_wait_frame(conn, &frame)) {
      return;
    }

    if (AMQP_FRAME_METHOD == frame.frame_type) {
      switch (frame.payload.method.id) {
        case AMQP_BASIC_ACK_METHOD:
          /* if we've turned publisher confirms on, and we've published a message
           * here is a message being confirmed
           */

          break;
        case AMQP_BASIC_RETURN_METHOD:
          /* if a published message couldn't be routed and the mandatory flag was set
           * this is what would be returned. The message then needs to be read.
           */
          {
            amqp_message_t message;
            ret = amqp_read_message(conn, frame.channel, &message, 0);
            if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
              return;
            }

            amqp_destroy_message(&message);
          }

          break;

        case AMQP_CHANNEL_CLOSE_METHOD:
          /* a channel.close method happens when a channel exception occurs, this
           * can happen by publishing to an exchange that doesn't exist for example
           *
           * In this case you would need to open another channel redeclare any queues
           * that were declared auto-delete, and restart any consumers that were attached
           * to the previous channel
           */
          return;

        case AMQP_CONNECTION_CLOSE_METHOD:
          /* a connection.close method happens when a connection exception occurs,
           * this can happen by trying to use a channel that isn't open for example.
           *
           * In this case the whole connection must be restarted.
           */
          return;

        default:
          fprintf(stderr, "An unexpected method was received %u\n", frame.payload.method.id);
          return;
      }
    }
  }
}

/*
 * Entry point: connect to an AMQP broker over SSL/TLS, declare a queue, bind
 * it to the hard-coded exchange "amq.direct" with binding key "test queue",
 * start a consumer and hand over to run().
 *
 * Command line (see the usage string below):
 *   host port [cacert.pem [verifypeer] [verifyhostname] [key.pem cert.pem]]
 *
 * Returns 1 on a usage error or when the queue name cannot be copied, and 0
 * after a clean shutdown. Every other failure goes through die() /
 * die_on_amqp_error() / die_on_error() from the utils module.
 */
int main(int argc, char const *const *argv)
{
  char const *hostname;
  int port, status;
  char const *exchange;
  char const *bindingkey;
  amqp_socket_t *socket;
  amqp_connection_state_t conn;
  amqp_bytes_t queuename;

  if (argc < 3) {
    fprintf(stderr, "Usage: amqps_consumer host port "
            "[cacert.pem [verifypeer] [verifyhostname] [key.pem cert.pem]]\n");
    return 1;
  }

  hostname = argv[1];
  port = atoi(argv[2]);
  exchange = "amq.direct"; /* argv[3]; */
  bindingkey = "test queue"; /* argv[4]; */

  conn = amqp_new_connection();

  socket = amqp_ssl_socket_new(conn);
  if (!socket) {
    die("creating SSL/TLS socket");
  }

  /* Peer and hostname verification start disabled; the optional
   * "verifypeer" / "verifyhostname" arguments below switch them on.
   */
  amqp_ssl_socket_set_verify_peer(socket, 0);
  amqp_ssl_socket_set_verify_hostname(socket, 0);

  if (argc > 3) {
    /* argv[3] is the CA certificate; nextarg walks the optional arguments
     * that may follow it.
     */
    int nextarg = 4;
    status = amqp_ssl_socket_set_cacert(socket, argv[3]);
    if (status) {
      die("setting CA certificate");
    }
    if (argc > nextarg && !strcmp("verifypeer", argv[nextarg])) {
      amqp_ssl_socket_set_verify_peer(socket, 1);
      nextarg++;
    }
    if (argc > nextarg && !strcmp("verifyhostname", argv[nextarg])) {
      amqp_ssl_socket_set_verify_hostname(socket, 1);
      nextarg++;
    }
    if (argc > nextarg + 1) {
      /* The command line gives "key.pem cert.pem"; the indices are swapped on
       * purpose because amqp_ssl_socket_set_key() takes the certificate first.
       */
      status =
        amqp_ssl_socket_set_key(socket, argv[nextarg + 1], argv[nextarg]);
      if (status) {
        die("setting client key");
      }
    }
  }

  status = amqp_socket_open(socket, hostname, port);
  if (status) {
    die("opening SSL/TLS connection");
  }

  die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
                    "Logging in");
  amqp_channel_open(conn, 1);
  die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

  {
    /* Declare a queue with an empty name and keep a private copy of the name
     * found in the declare-ok reply for the bind and consume calls below.
     */
    amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1,
                                 amqp_empty_table);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
    queuename = amqp_bytes_malloc_dup(r->queue);
    if (queuename.bytes == NULL) {
      fprintf(stderr, "Out of memory while copying queue name");
      return 1;
    }
  }

  amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),
                  amqp_empty_table);
  die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");

  amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
  die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");

  run(conn);

  /* Orderly teardown: channel first, then the connection, then its state. */
  die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
  die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
  die_on_error(amqp_destroy_connection(conn), "Ending connection");

  return 0;
}