1 module rabbitmq.examples.listenq; 2 import std.stdio; 3 import std.string; 4 import std.exception; 5 import std.conv:to; 6 import std.getopt; 7 8 import symmetry.api.rabbitmq; 9 10 struct Options 11 { 12 string hostname; 13 ushort port; 14 string queuename; 15 string caCert; 16 bool verifyPeer; 17 bool verifyHostname; 18 string keyFile; 19 string certFile; 20 } 21 22 int main(string[] args) 23 { 24 Options options; 25 int status; 26 amqp_socket_t *socket; 27 amqp_connection_state_t conn; 28 29 auto helpInformation = getopt( args, 30 "hostname", &options.hostname, 31 "port", &options.port, 32 "queuename", &options.queuename, 33 "cacert", &options.caCert, 34 "verify-peer", &options.verifyPeer, 35 "verify-hostname", &options.verifyHostname, 36 "keyFile", &options.keyFile, 37 "certFile", &options.certFile, 38 ); 39 40 if (helpInformation.helpWanted) 41 { 42 defaultGetoptPrinter("listenq", helpInformation.options); 43 return -1; 44 } 45 46 conn = amqp_new_connection(); 47 socket = amqp_ssl_socket_new(conn); 48 enforce(socket !is null, "creating SSL/TLS socket"); 49 50 amqp_ssl_socket_set_verify_peer(socket, options.verifyPeer ? 1 : 0); 51 amqp_ssl_socket_set_verify_hostname(socket, options.verifyHostname ? 1: 0); 52 53 if (options.caCert.length > 0) 54 { 55 enforce (amqp_ssl_socket_set_cacert(socket, options.caCert.toStringz) == 0, "setting CA certificate"); 56 } 57 58 if (options.keyFile.length >0) 59 { 60 enforce(options.certFile.length > 0, "must specify a cert-file if you specify a key-file"); 61 enforce(amqp_ssl_socket_set_key(socket, options.certFile.toStringz, options.keyFile.toStringz) ==0, "setting client cert"); 62 } 63 64 enforce(amqp_socket_open(socket, options.hostname.toStringz, options.port) == 0, "opening SSL/TLS connection"); 65 66 die_on_amqp_error(amqp_login(conn, "/".ptr, 0, 131072, 0, SaslMethod.plain, "guest".ptr, "guest".ptr), "Logging in"); 67 amqp_channel_open(conn, 1); 68 die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); 69 70 amqp_basic_consume(conn, 1, amqp_string(options.queuename), cast(amqp_bytes_t)amqp_empty_bytes, 0, 0, 0, cast(amqp_table_t)amqp_empty_table); 71 die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); 72 73 { 74 for (;;) { 75 amqp_rpc_reply_t res; 76 amqp_envelope_t envelope; 77 78 amqp_maybe_release_buffers(conn); 79 80 res = amqp_consume_message(conn, &envelope, null, 0); 81 82 if (AMQP_RESPONSE_NORMAL != res.reply_type) { 83 break; 84 } 85 86 writefln("Delivery %s, exchange %s routingkey %s", 87 envelope.delivery_tag, 88 cast(char[]) envelope.exchange.bytes[0..envelope.exchange.len], 89 cast(char[]) envelope.routing_key.bytes[0.. envelope.routing_key.len]); 90 91 if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { 92 writefln("Content-type: %s", 93 cast(char[]) envelope.message.properties.content_type.bytes 94 [0..envelope.message.properties.content_type.len]); 95 } 96 writefln("----"); 97 98 amqp_dump(cast(ubyte[]) envelope.message.body_.bytes[0.. envelope.message.body_.len]); 99 100 amqp_destroy_envelope(&envelope); 101 } 102 } 103 104 die_on_amqp_error(amqp_channel_close(conn, 1, ReplySuccess), "Closing channel"); 105 die_on_amqp_error(amqp_connection_close(conn, ReplySuccess), "Closing connection"); 106 die_on_error(amqp_destroy_connection(conn), "Ending connection"); 107 108 return 0; 109 }