1 module listen; 2 import std.stdio; 3 import std.string; 4 import std.exception; 5 import std.conv:to; 6 import std.getopt; 7 8 import kaleidic.api.rabbitmq; 9 import kaleidic.api.rabbitmq.utils; 10 11 struct Options 12 { 13 string hostname; 14 int port; 15 string exchange; 16 string bindingKey; 17 bool useSSL = false; 18 string caCert; 19 bool verifyPeer = false; 20 bool verifyHostname = false; 21 string keyFile; 22 string certFile; 23 } 24 25 int main(string[] args) 26 { 27 Options options; 28 int status; 29 amqp_socket_t *socket; 30 amqp_connection_state_t conn; 31 32 amqp_bytes_t queuename; 33 34 auto helpInformation = getopt( args, 35 std.getopt.config.required, 36 "hostname", &options.hostname, 37 std.getopt.config.required, 38 "port", &options.port, 39 std.getopt.config.required, 40 "exchange", &options.exchange, 41 std.getopt.config.required, 42 "use-ssl", &options.useSSL, 43 "binding-key", &options.bindingKey, 44 "cacert", &options.caCert, 45 "verify-peer", &options.verifyPeer, 46 "verify-hostname", &options.verifyHostname, 47 "key-file", &options.keyFile, 48 "cert-file", &options.certFile, 49 ); 50 51 if (helpInformation.helpWanted) 52 { 53 defaultGetoptPrinter("listen",helpInformation.options); 54 return -1; 55 } 56 57 conn = amqp_new_connection(); 58 socket = options.useSSL ? amqp_ssl_socket_new(conn) : amqp_tcp_socket_new(conn); 59 enforce(socket !is null, options.useSSL? "creating SSL/TLS socket" : "creating TCP socket"); 60 61 if (options.useSSL) 62 { 63 amqp_ssl_socket_set_verify_peer(socket, options.verifyPeer ? 1: 0); 64 amqp_ssl_socket_set_verify_hostname(socket, options.verifyHostname ? 1 : 0); 65 66 if (options.caCert.length > 0) 67 { 68 enforce(amqp_ssl_socket_set_cacert(socket, options.caCert.toStringz) == 0, "setting CA certificate"); 69 } 70 71 if (options.keyFile.length > 0) 72 { 73 enforce (options.certFile.length > 0, "if you specify key-file you must specify cert-file"); 74 enforce( amqp_ssl_socket_set_key(socket, options.certFile.toStringz, options.keyFile.toStringz) == 0, "setting client cert"); 75 } 76 else 77 { 78 enforce(options.certFile.length == 0, "you cannot specify cert-file if you do not specify key-file"); 79 } 80 } 81 82 status = amqp_socket_open(socket, options.hostname.toStringz, options.port); 83 enforce(status == 0, "opening socket: " ~ amqp_error_string2(status).fromStringz); 84 85 die_on_amqp_error(amqp_login(conn, "/".ptr, 0, 131072, 0, SaslMethod.plain, "guest".ptr, "guest".ptr), "Logging in"); 86 amqp_channel_open(conn, 1); 87 die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); 88 89 { 90 amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, cast(amqp_bytes_t)amqp_empty_bytes, 0, 0, 0, 1, cast(amqp_table_t) amqp_empty_table); 91 die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); 92 queuename = amqp_bytes_malloc_dup(r.queue); 93 enforce(queuename.bytes !is null, "Out of memory while copying queue name"); 94 } 95 96 amqp_queue_bind(conn, 1, queuename, amqp_string(options.exchange), amqp_string(options.bindingKey), cast(amqp_table_t) amqp_empty_table); 97 die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue"); 98 99 amqp_basic_consume(conn, 1, queuename, cast(amqp_bytes_t) amqp_empty_bytes, 0, 1, 0, cast(amqp_table_t) amqp_empty_table); 100 die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); 101 102 { 103 for (;;) 104 { 105 amqp_rpc_reply_t res; 106 amqp_envelope_t envelope; 107 108 amqp_maybe_release_buffers(conn); 109 110 res = amqp_consume_message(conn, &envelope, null, 0); 111 112 if (AMQP_RESPONSE_NORMAL != res.reply_type) { 113 break; 114 } 115 116 writefln("Delivery %s, exchange %s routingkey %s", 117 envelope.delivery_tag, 118 fromBytes(envelope.exchange.bytes,envelope.exchange.len), 119 fromBytes(envelope.routing_key.bytes, envelope.routing_key.len)); 120 121 if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { 122 writefln("Content-type: %s", fromBytes(envelope.message.properties.content_type.bytes, 123 envelope.message.properties.content_type.len)); 124 } 125 writef("----\n"); 126 amqp_dump(cast(ubyte[])fromBytes(envelope.message.body_.bytes,envelope.message.body_.len)); 127 amqp_destroy_envelope(&envelope); 128 } 129 } 130 131 die_on_amqp_error(amqp_channel_close(conn, 1, ReplySuccess), "Closing channel"); 132 die_on_amqp_error(amqp_connection_close(conn, ReplySuccess), "Closing connection"); 133 die_on_error(amqp_destroy_connection(conn), "Ending connection"); 134 135 return 0; 136 } 137 138 private char[] fromBytes(void* ptr, ulong len) 139 { 140 return (cast(char*)ptr)[0..len].dup; 141 /+ 142 import std.array:Appender; 143 Appender!(char[]) ret; 144 auto cPtr = cast(char*) ptr; 145 foreach(i; 0 .. len) 146 ret.put(cPtr[len]); 147 writefln("fromBytes: %s, %s, %s ",len, (cast(char*)ptr)[0..len],ret.data); 148 return ret.data; 149 +/ 150 } 151