1 module listenq;
2 import std.stdio;
3 import std.string;
4 import std.exception;
5 import std.conv:to;
6 import std.getopt;
7 
8 import kaleidic.api.rabbitmq;
9 import kaleidic.api.rabbitmq.utils;
10 
11 struct Options
12 {
13 	string hostname;
14 	ushort port;
15 	string queuename;
16 	string caCert;
17 	bool verifyPeer;
18 	bool verifyHostname;
19 	string keyFile;
20 	string certFile;
21 }
22 
23 int main(string[] args)
24 {
25 	Options options;
26 	int status;
27 	amqp_socket_t *socket;
28 	amqp_connection_state_t conn;
29 
30 	auto helpInformation = getopt(	args,
31 					"hostname",	&options.hostname,
32 					"port",		&options.port,
33 					"queuename",	&options.queuename,
34 					"cacert",	&options.caCert,
35 					"verify-peer",	&options.verifyPeer,
36 					"verify-hostname", &options.verifyHostname,
37 					"keyFile",	&options.keyFile,
38 					"certFile",	&options.certFile,
39 	);
40 	
41 	if (helpInformation.helpWanted)
42 	{
43 		defaultGetoptPrinter("listenq", helpInformation.options);
44 		return -1;
45 	}
46 
47 	conn = amqp_new_connection();
48 	socket = amqp_ssl_socket_new(conn);
49 	enforce(socket !is null, "creating SSL/TLS socket");
50 
51 	amqp_ssl_socket_set_verify_peer(socket, options.verifyPeer ? 1 : 0);
52 	amqp_ssl_socket_set_verify_hostname(socket, options.verifyHostname ? 1: 0);
53 
54 	if (options.caCert.length > 0)
55 	{
56 		enforce (amqp_ssl_socket_set_cacert(socket, options.caCert.toStringz) == 0, "setting CA certificate");
57 	}
58 	
59 	if (options.keyFile.length >0)
60 	{
61 		enforce(options.certFile.length > 0, "must specify a cert-file if you specify a key-file");
62 		enforce(amqp_ssl_socket_set_key(socket, options.certFile.toStringz, options.keyFile.toStringz) ==0, "setting client cert");
63 	}
64 
65 	enforce(amqp_socket_open(socket, options.hostname.toStringz, options.port) == 0, "opening SSL/TLS connection");
66 
67 	die_on_amqp_error(amqp_login(conn, "/".ptr, 0, 131072, 0, SaslMethod.plain, "guest".ptr, "guest".ptr), "Logging in");
68 	amqp_channel_open(conn, 1);
69 	die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
70 
71 	amqp_basic_consume(conn, 1, amqp_string(options.queuename), cast(amqp_bytes_t)amqp_empty_bytes, 0, 0, 0, cast(amqp_table_t)amqp_empty_table);
72 	die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
73 
74 	{
75 		for (;;) {
76 			amqp_rpc_reply_t res;
77 			amqp_envelope_t envelope;
78 
79 			amqp_maybe_release_buffers(conn);
80 
81 			res = amqp_consume_message(conn, &envelope, null, 0);
82 
83 			if (AMQP_RESPONSE_NORMAL != res.reply_type) {
84 				break;
85 			}
86 
87 			writefln("Delivery %s, exchange %s routingkey %s",
88 			     envelope.delivery_tag,
89 			     cast(char[]) envelope.exchange.bytes[0..envelope.exchange.len],
90 			     cast(char[]) envelope.routing_key.bytes[0.. envelope.routing_key.len]);
91 
92 			if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
93 				writefln("Content-type: %s",
94 			        cast(char[]) envelope.message.properties.content_type.bytes
95 		       				[0..envelope.message.properties.content_type.len]);
96 			}
97 			writefln("----");
98 
99 			amqp_dump(cast(ubyte[]) envelope.message.body_.bytes[0.. envelope.message.body_.len]);
100 
101 			amqp_destroy_envelope(&envelope);
102 		}
103 	}
104 
105 	die_on_amqp_error(amqp_channel_close(conn, 1, ReplySuccess), "Closing channel");
106 	die_on_amqp_error(amqp_connection_close(conn, ReplySuccess), "Closing connection");
107 	die_on_error(amqp_destroy_connection(conn), "Ending connection");
108 
109 	return 0;
110 }