module rabbitmq.examples.listen;
import std.stdio;
import std.string;
import std.exception;
import std.conv : to;
import std.getopt;

import symmetry.api.rabbitmq;

/// Command-line settings for the listener example; filled in by `getopt` in `main`.
struct Options
{
    string hostname;             /// --hostname (required)
    int port;                    /// --port (required)
    string exchange;             /// --exchange (required)
    string bindingKey;           /// --binding-key
    bool useSSL = false;         /// --use-ssl: selects the SSL/TLS socket instead of plain TCP
    string caCert;               /// --cacert
    bool verifyPeer = false;     /// --verify-peer
    bool verifyHostname = false; /// --verify-hostname
    string keyFile;              /// --key-file (must be given together with --cert-file)
    string certFile;             /// --cert-file (must be given together with --key-file)
}

/**
 * Connects to a broker, declares a server-named queue, binds it to the
 * requested exchange and prints every delivery until the broker returns a
 * non-normal reply.
 *
 * Returns: 0 after a normal shutdown, -1 when usage help was requested.
 * Throws: Exception (via `enforce`) when socket creation, TLS configuration,
 *         opening the socket or copying the queue name fails.
 */
int main(string[] args)
{
    Options options;

    // `--use-ssl` is a plain on/off flag defaulting to false, so it must NOT
    // be marked `required`: doing so forced every plain-TCP invocation to
    // pass it explicitly.
    auto helpInformation = getopt(
        args,
        std.getopt.config.required,
        "hostname", &options.hostname,
        std.getopt.config.required,
        "port", &options.port,
        std.getopt.config.required,
        "exchange", &options.exchange,
        "use-ssl", &options.useSSL,
        "binding-key", &options.bindingKey,
        "cacert", &options.caCert,
        "verify-peer", &options.verifyPeer,
        "verify-hostname", &options.verifyHostname,
        "key-file", &options.keyFile,
        "cert-file", &options.certFile,
    );

    if (helpInformation.helpWanted)
    {
        defaultGetoptPrinter("listen", helpInformation.options);
        return -1;
    }

    amqp_connection_state_t conn = amqp_new_connection();
    amqp_socket_t* socket = options.useSSL ? amqp_ssl_socket_new(conn) : amqp_tcp_socket_new(conn);
    enforce(socket !is null, options.useSSL ? "creating SSL/TLS socket" : "creating TCP socket");

    if (options.useSSL)
        configureTls(socket, options);

    const int status = amqp_socket_open(socket, options.hostname.toStringz, options.port);
    enforce(status == 0, "opening socket: " ~ amqp_error_string2(status).fromStringz);

    // Argument order presumably mirrors rabbitmq-c's amqp_login: vhost,
    // channel_max, frame_max, heartbeat, SASL method, user, password.
    die_on_amqp_error(amqp_login(conn, "/".ptr, 0, 131072, 0, SaslMethod.plain, "guest".ptr, "guest".ptr), "Logging in");
    amqp_channel_open(conn, 1);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

    // An empty queue name is passed to the declare call; the name reported
    // back in the reply is copied so it can be used for bind and consume.
    amqp_bytes_t queuename;
    {
        amqp_queue_declare_ok_t* r = amqp_queue_declare(conn, 1, cast(amqp_bytes_t) amqp_empty_bytes, 0, 0, 0, 1, cast(amqp_table_t) amqp_empty_table);
        die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
        queuename = amqp_bytes_malloc_dup(r.queue);
        enforce(queuename.bytes !is null, "Out of memory while copying queue name");
    }
    // The duplicated name is owned by us and has to be released explicitly.
    scope (exit) amqp_bytes_free(queuename);

    amqp_queue_bind(conn, 1, queuename, amqp_string(options.exchange), amqp_string(options.bindingKey), cast(amqp_table_t) amqp_empty_table);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");

    amqp_basic_consume(conn, 1, queuename, cast(amqp_bytes_t) amqp_empty_bytes, 0, 1, 0, cast(amqp_table_t) amqp_empty_table);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");

    for (;;)
    {
        amqp_envelope_t envelope;

        amqp_maybe_release_buffers(conn);

        amqp_rpc_reply_t res = amqp_consume_message(conn, &envelope, null, 0);

        // Any non-normal reply ends the loop and falls through to shutdown.
        if (AMQP_RESPONSE_NORMAL != res.reply_type)
            break;

        printDelivery(envelope);
        amqp_destroy_envelope(&envelope);
    }

    die_on_amqp_error(amqp_channel_close(conn, 1, ReplySuccess), "Closing channel");
    die_on_amqp_error(amqp_connection_close(conn, ReplySuccess), "Closing connection");
    die_on_error(amqp_destroy_connection(conn), "Ending connection");

    return 0;
}

/// Applies the verification flags, CA certificate and client key/cert from
/// `options` to an SSL/TLS socket; throws via `enforce` on invalid combinations
/// or when the library rejects a setting.
private void configureTls(amqp_socket_t* socket, ref Options options)
{
    amqp_ssl_socket_set_verify_peer(socket, options.verifyPeer ? 1 : 0);
    amqp_ssl_socket_set_verify_hostname(socket, options.verifyHostname ? 1 : 0);

    if (options.caCert.length > 0)
    {
        enforce(amqp_ssl_socket_set_cacert(socket, options.caCert.toStringz) == 0, "setting CA certificate");
    }

    // The key file and the certificate file are only meaningful as a pair.
    if (options.keyFile.length > 0)
    {
        enforce(options.certFile.length > 0, "if you specify key-file you must specify cert-file");
        enforce(amqp_ssl_socket_set_key(socket, options.certFile.toStringz, options.keyFile.toStringz) == 0, "setting client cert");
    }
    else
    {
        enforce(options.certFile.length == 0, "you cannot specify cert-file if you do not specify key-file");
    }
}

/// Writes one delivery to stdout: delivery tag, exchange, routing key, the
/// content type when its property flag is set, then a dump of the body.
private void printDelivery(ref amqp_envelope_t envelope)
{
    writefln("Delivery %s, exchange %s routingkey %s",
        envelope.delivery_tag,
        fromBytes(envelope.exchange.bytes, envelope.exchange.len),
        fromBytes(envelope.routing_key.bytes, envelope.routing_key.len));

    if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG)
    {
        writefln("Content-type: %s", fromBytes(envelope.message.properties.content_type.bytes,
            envelope.message.properties.content_type.len));
    }
    writef("----\n");
    amqp_dump(cast(ubyte[]) fromBytes(envelope.message.body_.bytes, envelope.message.body_.len));
}

/**
 * Copies `len` bytes starting at `ptr` into a freshly allocated `char[]`.
 *
 * Params:
 *   ptr = start of the byte range; may be null
 *   len = number of bytes to copy
 * Returns: a GC-owned copy of the bytes, or `null` when `ptr` is null or
 *          `len` is zero.
 */
private char[] fromBytes(void* ptr, ulong len)
{
    if (ptr is null || len == 0)
        return null;
    // Slice bounds are size_t; the explicit cast keeps this compiling on
    // targets where size_t is narrower than ulong.
    return (cast(char*) ptr)[0 .. cast(size_t) len].dup;
}