/**
 * Example AMQP consumer: connects to a broker, declares a queue, binds it to
 * the "amq.direct" exchange with the binding key "test queue", and consumes
 * messages while printing a throughput summary roughly once per second.
 *
 * Usage: amqp_consumer host port
 */
module amqp_consumer;

import std.stdio;
import std.string;
import std.conv : ConvException, to;

import kaleidic.api.rabbitmq;
import kaleidic.api.rabbitmq.utils;


/// Interval between throughput summaries, in microseconds.
enum SUMMARY_EVERY_US = 1000000;

/**
 * Consume messages on `conn` until an unrecoverable condition occurs.
 *
 * Each successfully consumed message is destroyed immediately and counted.
 * Whenever more than SUMMARY_EVERY_US microseconds have elapsed since the last
 * scheduled summary, a line with the total count, the count over the interval
 * and the rate is written to stdout.
 *
 * The function returns when:
 *   - waiting for an out-of-band frame fails,
 *   - a returned message cannot be read,
 *   - a channel.close, connection.close or unexpected method arrives,
 *   - the consume call fails with anything other than
 *     AMQP_STATUS_UNEXPECTED_STATE.
 *
 * Params:
 *   conn = connection on which a consumer has already been started
 */
void run(amqp_connection_state_t conn)
{
    immutable ulong start_time = now_microseconds();
    int received = 0;
    int previous_received = 0;
    ulong previous_report_time = start_time;
    ulong next_summary_time = start_time + SUMMARY_EVERY_US;

    amqp_frame_t frame;

    while (true)
    {
        amqp_rpc_reply_t ret;
        amqp_envelope_t envelope;

        immutable ulong now = now_microseconds();
        if (now > next_summary_time) {
            immutable int countOverInterval = received - previous_received;
            immutable double intervalRate =
                countOverInterval / ((now - previous_report_time) / 1000000.0);

            // Divide the 64-bit microsecond delta first: narrowing it to int
            // before the division overflows after roughly 35 minutes.
            writefln("%d ms: Received %d - %d since last report (%d Hz)",
                     (now - start_time) / 1000, received, countOverInterval,
                     cast(int) intervalRate);

            previous_received = received;
            previous_report_time = now;
            next_summary_time += SUMMARY_EVERY_US;
        }

        amqp_maybe_release_buffers(conn);
        ret = amqp_consume_message(conn, &envelope, null, 0);

        if (AMQP_RESPONSE_NORMAL == ret.reply_type) {
            // Only real deliveries count towards the statistics.
            amqp_destroy_envelope(&envelope);
            received++;
            continue;
        }

        if (AMQP_RESPONSE_LIBRARY_EXCEPTION != ret.reply_type ||
            AMQP_STATUS_UNEXPECTED_STATE != ret.library_error) {
            // Any other failure is not handled here; retrying would spin
            // forever on the same error, so report it and stop.
            stderr.writefln("Consuming failed (reply type %s, library error %s)",
                            ret.reply_type, ret.library_error);
            return;
        }

        // UNEXPECTED_STATE: a frame other than a delivery is pending on the
        // connection; fetch it and decide what to do.
        if (AMQP_STATUS_OK != amqp_simple_wait_frame(conn, &frame)) {
            return;
        }

        if (AMQP_FRAME_METHOD != frame.frame_type) {
            continue;
        }

        switch (frame.payload.method.id) {
            case AMQP_BASIC_ACK_METHOD:
                /* if we've turned publisher confirms on, and we've published a message
                 * here is a message being confirmed
                 */
                break;

            case AMQP_BASIC_RETURN_METHOD:
                /* if a published message couldn't be routed and the mandatory flag was set
                 * this is what would be returned. The message then needs to be read.
                 */
                {
                    amqp_message_t message;
                    ret = amqp_read_message(conn, frame.channel, &message, 0);
                    if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
                        return;
                    }

                    amqp_destroy_message(&message);
                }
                break;

            case AMQP_CHANNEL_CLOSE_METHOD:
                /* a channel.close method happens when a channel exception occurs, this
                 * can happen by publishing to an exchange that doesn't exist for example
                 *
                 * In this case you would need to open another channel redeclare any queues
                 * that were declared auto-delete, and restart any consumers that were attached
                 * to the previous channel
                 */
                return;

            case AMQP_CONNECTION_CLOSE_METHOD:
                /* a connection.close method happens when a connection exception occurs,
                 * this can happen by trying to use a channel that isn't open for example.
                 *
                 * In this case the whole connection must be restarted.
                 */
                return;

            default:
                stderr.writefln("An unexpected method was received: %s", frame.payload.method.id);
                return;
        }
    }
}

/**
 * Program entry point.
 *
 * Params:
 *   args = command line; args[1] is the broker host name and args[2] the
 *          TCP port (must fit in a ushort)
 *
 * Returns: 0 after a clean shutdown, 1 on a usage error, an invalid port or
 *          when the queue name cannot be copied. Failures reported through
 *          die / die_on_amqp_error / die_on_error are handled by those helpers
 *          (defined in kaleidic.api.rabbitmq.utils, not visible here;
 *          presumably they terminate the process - TODO confirm).
 */
int main(string[] args)
{
    // The single channel used for every operation in this example.
    enum ushort channel = 1;

    int status;
    string exchange, bindingKey;
    amqp_socket_t* socket = null;
    amqp_connection_state_t conn;

    amqp_bytes_t queuename;

    if (args.length < 3) {
        stderr.writeln("Usage: amqp_consumer host port\n");
        return 1;
    }

    auto hostname = args[1];

    // Report a malformed or out-of-range port instead of letting the
    // conversion exception escape from main.
    ushort port;
    try {
        port = args[2].to!ushort;
    } catch (ConvException e) {
        stderr.writefln("Invalid port '%s': %s", args[2], e.msg);
        return 1;
    }

    exchange = "amq.direct"; /* argv[3]; */
    bindingKey = "test queue"; /* argv[4]; */

    conn = amqp_new_connection();

    socket = amqp_tcp_socket_new(conn);
    if (!socket) {
        die("creating TCP socket");
    }

    status = amqp_socket_open(socket, hostname.toStringz, port);
    if (status) {
        die("opening TCP socket");
    }

    die_on_amqp_error(amqp_login(conn, "/".toStringz, 0, 131072, 0, SaslMethod.plain, "guest".toStringz, "guest".toStringz),
                      "Logging in".toStringz);
    amqp_channel_open(conn, channel);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

    {
        amqp_queue_declare_ok_t* r = amqp_queue_declare(conn, channel, cast(amqp_bytes_t) amqp_empty_bytes, 0, 0, 0, 1,
                                                        cast(amqp_table_t) amqp_empty_table);
        die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
        // Copy the name returned in the declare reply so it can be used in
        // the bind and consume calls below.
        queuename = amqp_bytes_malloc_dup(r.queue);
        if (queuename.bytes is null) {
            stderr.writeln("Out of memory while copying queue name");
            return 1;
        }
    }
    // Release the duplicated queue name on every normal exit path from here.
    scope (exit) amqp_bytes_free(queuename);

    amqp_queue_bind(conn, channel, queuename, amqp_cstring_bytes(exchange.toStringz), amqp_cstring_bytes(bindingKey.toStringz),
                    cast(amqp_table_t) amqp_empty_table);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");

    amqp_basic_consume(conn, channel, queuename, cast(amqp_bytes_t) amqp_empty_bytes, 0, 1, 0, cast(amqp_table_t) amqp_empty_table);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");

    run(conn);

    die_on_amqp_error(amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS), "Closing channel");
    die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
    die_on_error(amqp_destroy_connection(conn), "Ending connection");

    return 0;
}