1 module listen;
2 import std.stdio;
3 import std.string;
4 import std.exception;
5 import std.conv:to;
6 import std.getopt;
7 
8 import kaleidic.api.rabbitmq;
9 import kaleidic.api.rabbitmq.utils;
10 
/// Command-line settings for the listener, filled in by `getopt` in `main`.
struct Options
{
	/// Broker host name handed to `amqp_socket_open`.
	string hostname;
	/// Broker port handed to `amqp_socket_open`.
	int port;
	/// Exchange the freshly declared queue is bound to.
	string exchange;
	/// Binding key used when binding the queue to `exchange`.
	string bindingKey;
	/// Selects an SSL/TLS socket instead of a plain TCP socket (off by default).
	bool useSSL;
	/// CA certificate path; only applied when non-empty and `useSSL` is set.
	string caCert;
	/// Forwarded to `amqp_ssl_socket_set_verify_peer` (off by default).
	bool verifyPeer;
	/// Forwarded to `amqp_ssl_socket_set_verify_hostname` (off by default).
	bool verifyHostname;
	/// Client private-key path; must be accompanied by `certFile`.
	string keyFile;
	/// Client certificate path; must be accompanied by `keyFile`.
	string certFile;
}
24 
/**
 * Connects to an AMQP broker, declares a server-named queue, binds it to the
 * requested exchange and prints every message delivered to it until
 * `amqp_consume_message` reports a non-normal reply.
 *
 * Command-line options:
 *   --hostname, --port, --exchange   (required)
 *   --binding-key                    (optional)
 *   --use-ssl                        (optional, defaults to plain TCP)
 *   --cacert, --verify-peer, --verify-hostname, --key-file, --cert-file
 *                                    (optional, only consulted with --use-ssl)
 *
 * Params:
 *   args = program arguments as passed by the runtime
 *
 * Returns: 0 after a clean shutdown, -1 when help was requested.
 *
 * Throws: `Exception` (via `enforce`) when socket creation, SSL setup or
 *         socket opening fails or the queue name cannot be copied;
 *         `getopt` throws when a required option is missing.
 */
int main(string[] args)
{
	Options options;

	// Only hostname, port and exchange are mandatory. "use-ssl" is left
	// optional on purpose: `Options.useSSL` defaults to false, so omitting the
	// flag selects a plain TCP connection.
	auto helpInformation = getopt(	args,
					std.getopt.config.required,
					"hostname",	&options.hostname,
					std.getopt.config.required,
					"port",		&options.port,
					std.getopt.config.required,
					"exchange",	&options.exchange,
					"use-ssl",	&options.useSSL,
					"binding-key",	&options.bindingKey,
					"cacert",	&options.caCert,
					"verify-peer",	&options.verifyPeer,
					"verify-hostname", &options.verifyHostname,
					"key-file",	&options.keyFile,
					"cert-file",	&options.certFile,
	);

	if (helpInformation.helpWanted)
	{
		defaultGetoptPrinter("listen",helpInformation.options);
		return -1;
	}

	amqp_connection_state_t conn = amqp_new_connection();
	amqp_socket_t* socket = options.useSSL ? amqp_ssl_socket_new(conn) : amqp_tcp_socket_new(conn);
	enforce(socket !is null, options.useSSL? "creating SSL/TLS socket" : "creating TCP socket");

	if (options.useSSL)
	{
		amqp_ssl_socket_set_verify_peer(socket, options.verifyPeer ? 1: 0);
		amqp_ssl_socket_set_verify_hostname(socket, options.verifyHostname ? 1 : 0);

		if (options.caCert.length > 0)
		{
			enforce(amqp_ssl_socket_set_cacert(socket, options.caCert.toStringz) == 0, "setting CA certificate");
		}

		// The client key and certificate are only meaningful as a pair.
		if (options.keyFile.length > 0)
		{
			enforce (options.certFile.length > 0, "if you specify key-file you must specify cert-file");
			enforce( amqp_ssl_socket_set_key(socket, options.certFile.toStringz, options.keyFile.toStringz) == 0, "setting client cert");
		}
		else
		{
			enforce(options.certFile.length == 0, "you cannot specify cert-file if you do not specify key-file");
		}
	}

	const int status = amqp_socket_open(socket, options.hostname.toStringz, options.port);
	enforce(status	== 0, "opening socket: " ~ amqp_error_string2(status).fromStringz);

	// Log in to vhost "/" with the hard-coded guest/guest credentials.
	// Per the rabbitmq-c API the numeric arguments are channel_max (0),
	// frame_max (131072) and heartbeat (0).
	die_on_amqp_error(amqp_login(conn, "/".ptr, 0, 131072, 0, SaslMethod.plain, "guest".ptr, "guest".ptr), "Logging in");
	amqp_channel_open(conn, 1);
	die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

	// Declare a queue with an empty name so the broker picks one. Per the
	// rabbitmq-c API the flags are passive=0, durable=0, exclusive=0,
	// auto_delete=1. The reply is only dereferenced after the RPC reply has
	// been checked.
	amqp_queue_declare_ok_t* declared = amqp_queue_declare(conn, 1, cast(amqp_bytes_t)amqp_empty_bytes, 0, 0, 0, 1, cast(amqp_table_t) amqp_empty_table);
	die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
	// Keep a private copy of the broker-chosen name for the bind/consume calls.
	// NOTE(review): this copy is never released; harmless here because the
	// process exits right after the loop — confirm before reusing this code.
	amqp_bytes_t queuename = amqp_bytes_malloc_dup(declared.queue);
	enforce(queuename.bytes !is null, "Out of memory while copying queue name");

	amqp_queue_bind(conn, 1, queuename, amqp_string(options.exchange), amqp_string(options.bindingKey), cast(amqp_table_t) amqp_empty_table);
	die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");

	// Per the rabbitmq-c API the flags are no_local=0, no_ack=1, exclusive=0.
	amqp_basic_consume(conn, 1, queuename, cast(amqp_bytes_t) amqp_empty_bytes, 0, 1, 0, cast(amqp_table_t) amqp_empty_table);
	die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");

	for (;;)
	{
		amqp_envelope_t envelope;

		amqp_maybe_release_buffers(conn);

		// Null timeout pointer and flags 0, as in the original call.
		amqp_rpc_reply_t res = amqp_consume_message(conn, &envelope, null, 0);

		// Any non-normal reply ends the listen loop and triggers shutdown.
		if (AMQP_RESPONSE_NORMAL != res.reply_type)
			break;

		writefln("Delivery %s, exchange %s routingkey %s",
		     envelope.delivery_tag,
		     fromBytes(envelope.exchange.bytes,envelope.exchange.len),
		     fromBytes(envelope.routing_key.bytes, envelope.routing_key.len));

		// The content type is only printed when the properties flag says it is set.
		if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG)
		{
			writefln("Content-type: %s", fromBytes(envelope.message.properties.content_type.bytes,
					envelope.message.properties.content_type.len));
		}
		writef("----\n");
		amqp_dump(cast(ubyte[])fromBytes(envelope.message.body_.bytes,envelope.message.body_.len));
		amqp_destroy_envelope(&envelope);
	}

	die_on_amqp_error(amqp_channel_close(conn, 1, ReplySuccess), "Closing channel");
	die_on_amqp_error(amqp_connection_close(conn, ReplySuccess), "Closing connection");
	die_on_error(amqp_destroy_connection(conn), "Ending connection");

	return 0;
}
137 
/**
 * Copies `len` bytes starting at `ptr` into a newly allocated `char[]`.
 *
 * The result is an independent copy (`.dup`), so it does not alias the
 * memory behind `ptr`.
 *
 * Params:
 *   ptr = start of the byte buffer; may be null
 *   len = number of bytes to copy
 *
 * Returns: a copy of the bytes as `char[]`; an empty array when `ptr` is
 *          null or `len` is 0.
 */
private char[] fromBytes(void* ptr, ulong len)
{
	// Nothing to copy, and a null pointer must never be sliced and read.
	if (ptr is null || len == 0)
		return null;

	// Slice bounds are `size_t`; convert explicitly so this also compiles on
	// targets where `size_t` is narrower than `ulong`.
	const count = cast(size_t) len;
	return (cast(char*) ptr)[0 .. count].dup;
}
151