module listenq;
import std.stdio;
import std.string;
import std.exception;
import std.conv:to;
import std.getopt;

import kaleidic.api.rabbitmq;
import kaleidic.api.rabbitmq.utils;

/// Command-line settings for the listener, filled in by `getopt` in `main`.
struct Options
{
	string hostname;      /// host passed to `amqp_socket_open`
	ushort port;          /// port passed to `amqp_socket_open`
	string queuename;     /// queue passed to `amqp_basic_consume`
	string caCert;        /// CA certificate path; only applied when non-empty
	bool verifyPeer;      /// forwarded to `amqp_ssl_socket_set_verify_peer`
	bool verifyHostname;  /// forwarded to `amqp_ssl_socket_set_verify_hostname`
	string keyFile;       /// client private key path; must be paired with `certFile`
	string certFile;      /// client certificate path; must be paired with `keyFile`
}

/**
 * Connects to an AMQP broker over SSL/TLS, consumes from the requested queue
 * and prints every delivery until `amqp_consume_message` returns something
 * other than a normal reply.
 *
 * Params:
 *   args = process arguments, parsed with `std.getopt`
 *
 * Returns: 0 after the channel and connection have been closed, -1 when the
 *          help text was requested.
 *
 * Throws: `Exception` (from `enforce`) when the client certificate options are
 *         inconsistent or when creating/configuring/opening the socket fails.
 *         `getopt` throws on malformed arguments.
 */
int main(string[] args)
{
	Options options;
	amqp_socket_t* socket;
	amqp_connection_state_t conn;

	auto helpInformation = getopt(args,
		"hostname", "broker host to connect to", &options.hostname,
		"port", "broker port to connect to", &options.port,
		"queuename", "queue to consume from", &options.queuename,
		"cacert", "CA certificate file", &options.caCert,
		"verify-peer", "enable peer verification", &options.verifyPeer,
		"verify-hostname", "enable hostname verification", &options.verifyHostname,
		"keyFile", "client private key file (requires --certFile)", &options.keyFile,
		"certFile", "client certificate file (requires --keyFile)", &options.certFile);

	if (helpInformation.helpWanted)
	{
		defaultGetoptPrinter("listenq", helpInformation.options);
		return -1;
	}

	// A client certificate is only usable together with its key. Reject either
	// half on its own up front, before anything is allocated, instead of
	// silently connecting without the client certificate.
	immutable haveKey = options.keyFile.length > 0;
	immutable haveCert = options.certFile.length > 0;
	enforce(!haveKey || haveCert, "must specify --certFile if you specify --keyFile");
	enforce(!haveCert || haveKey, "must specify --keyFile if you specify --certFile");

	conn = amqp_new_connection();
	enforce(conn !is null, "creating connection");

	socket = amqp_ssl_socket_new(conn);
	enforce(socket !is null, "creating SSL/TLS socket");

	amqp_ssl_socket_set_verify_peer(socket, options.verifyPeer ? 1 : 0);
	amqp_ssl_socket_set_verify_hostname(socket, options.verifyHostname ? 1 : 0);

	if (options.caCert.length > 0)
	{
		enforce(amqp_ssl_socket_set_cacert(socket, options.caCert.toStringz) == 0, "setting CA certificate");
	}

	if (haveKey)
	{
		// Argument order is (socket, certificate, key).
		enforce(amqp_ssl_socket_set_key(socket, options.certFile.toStringz, options.keyFile.toStringz) == 0, "setting client cert");
	}

	enforce(amqp_socket_open(socket, options.hostname.toStringz, options.port) == 0, "opening SSL/TLS connection");

	// Logs in on vhost "/" with the hard-coded guest/guest credentials.
	die_on_amqp_error(amqp_login(conn, "/".ptr, 0, 131072, 0, SaslMethod.plain, "guest".ptr, "guest".ptr), "Logging in");
	amqp_channel_open(conn, 1);
	die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

	// NOTE(review): the three flag arguments are all 0 and this program never
	// sends an acknowledgement -- confirm that leaving deliveries unacked is
	// intended.
	amqp_basic_consume(conn, 1, amqp_string(options.queuename), cast(amqp_bytes_t)amqp_empty_bytes, 0, 0, 0, cast(amqp_table_t)amqp_empty_table);
	die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");

	for (;;)
	{
		amqp_rpc_reply_t res;
		amqp_envelope_t envelope;

		amqp_maybe_release_buffers(conn);

		res = amqp_consume_message(conn, &envelope, null, 0);

		// Any non-normal reply ends the listen loop and falls through to the
		// orderly shutdown below.
		if (AMQP_RESPONSE_NORMAL != res.reply_type)
		{
			break;
		}

		writefln("Delivery %s, exchange %s routingkey %s",
			envelope.delivery_tag,
			cast(char[]) envelope.exchange.bytes[0 .. envelope.exchange.len],
			cast(char[]) envelope.routing_key.bytes[0 .. envelope.routing_key.len]);

		if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG)
		{
			writefln("Content-type: %s",
				cast(char[]) envelope.message.properties.content_type.bytes
					[0 .. envelope.message.properties.content_type.len]);
		}
		writefln("----");

		amqp_dump(cast(ubyte[]) envelope.message.body_.bytes[0 .. envelope.message.body_.len]);

		amqp_destroy_envelope(&envelope);
	}

	die_on_amqp_error(amqp_channel_close(conn, 1, ReplySuccess), "Closing channel");
	die_on_amqp_error(amqp_connection_close(conn, ReplySuccess), "Closing connection");
	die_on_error(amqp_destroy_connection(conn), "Ending connection");

	return 0;
}