1 module rabbitmq.examples.listen;
2 import std.stdio;
3 import std.string;
4 import std.exception;
5 import std.conv:to;
6 import std.getopt;
7 
8 import symmetry.api.rabbitmq;
9 
10 struct Options
11 {
12 	string hostname;
13 	int port;
14 	string exchange;
15 	string bindingKey;
16 	bool useSSL = false;
17 	string caCert;
18 	bool verifyPeer = false;
19 	bool verifyHostname = false;
20 	string keyFile;
21 	string certFile;
22 }
23 
24 int main(string[] args)
25 {
26 	Options options;
27 	int status;
28 	amqp_socket_t *socket;
29 	amqp_connection_state_t conn;
30 
31 	amqp_bytes_t queuename;
32 
33 	auto helpInformation = getopt(	args,
34 					std.getopt.config.required,
35 					"hostname",	&options.hostname,
36 					std.getopt.config.required,
37 					"port",		&options.port,
38 					std.getopt.config.required,
39 					"exchange",	&options.exchange,
40 					std.getopt.config.required,
41 					"use-ssl",	&options.useSSL,
42 					"binding-key",	&options.bindingKey,
43 					"cacert",	&options.caCert,
44 					"verify-peer",	&options.verifyPeer,
45 					"verify-hostname", &options.verifyHostname,
46 					"key-file",	&options.keyFile,
47 					"cert-file",	&options.certFile,
48 	);
49 
50 	if (helpInformation.helpWanted)
51 	{
52 		defaultGetoptPrinter("listen",helpInformation.options);
53 		return -1;
54 	}
55 
56 	conn = amqp_new_connection();
57 	socket = options.useSSL ? amqp_ssl_socket_new(conn) : amqp_tcp_socket_new(conn);
58 	enforce(socket !is null, options.useSSL? "creating SSL/TLS socket" : "creating TCP socket");
59 
60 	if (options.useSSL)
61 	{
62 		amqp_ssl_socket_set_verify_peer(socket, options.verifyPeer ? 1: 0);
63 		amqp_ssl_socket_set_verify_hostname(socket, options.verifyHostname ? 1 : 0);
64 
65 		if (options.caCert.length > 0)
66 		{
67 			enforce(amqp_ssl_socket_set_cacert(socket, options.caCert.toStringz) == 0, "setting CA certificate");
68 		}
69 
70 		if (options.keyFile.length > 0)
71 		{
72 			enforce (options.certFile.length > 0, "if you specify key-file you must specify cert-file");
73 			enforce( amqp_ssl_socket_set_key(socket, options.certFile.toStringz, options.keyFile.toStringz) == 0, "setting client cert");
74 		}
75 		else
76 		{
77 			enforce(options.certFile.length == 0, "you cannot specify cert-file if you do not specify key-file");
78 		}
79 	}
80 
81 	status = amqp_socket_open(socket, options.hostname.toStringz, options.port);
82 	enforce(status	== 0, "opening socket: " ~ amqp_error_string2(status).fromStringz);
83 
84 	die_on_amqp_error(amqp_login(conn, "/".ptr, 0, 131072, 0, SaslMethod.plain, "guest".ptr, "guest".ptr), "Logging in");
85 	amqp_channel_open(conn, 1);
86 	die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
87 
88 	{
89 		amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, cast(amqp_bytes_t)amqp_empty_bytes, 0, 0, 0, 1, cast(amqp_table_t) amqp_empty_table);
90 		die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
91 		queuename = amqp_bytes_malloc_dup(r.queue);
92 		enforce(queuename.bytes !is null, "Out of memory while copying queue name");
93 	}
94 
95 	amqp_queue_bind(conn, 1, queuename, amqp_string(options.exchange), amqp_string(options.bindingKey), cast(amqp_table_t) amqp_empty_table);
96 	die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");
97 
98 	amqp_basic_consume(conn, 1, queuename, cast(amqp_bytes_t) amqp_empty_bytes, 0, 1, 0, cast(amqp_table_t) amqp_empty_table);
99 	die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
100 
101 	{
102 		for (;;)
103 		{
104 			amqp_rpc_reply_t res;
105 			amqp_envelope_t envelope;
106 
107 			amqp_maybe_release_buffers(conn);
108 
109 			res = amqp_consume_message(conn, &envelope, null, 0);
110 
111 			if (AMQP_RESPONSE_NORMAL != res.reply_type) {
112 				break;
113 			}
114 
115 			writefln("Delivery %s, exchange %s routingkey %s",
116 			     envelope.delivery_tag,
117 			     fromBytes(envelope.exchange.bytes,envelope.exchange.len),
118 			     fromBytes(envelope.routing_key.bytes, envelope.routing_key.len));
119 
120 			if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
121 				writefln("Content-type: %s", fromBytes(envelope.message.properties.content_type.bytes,
122 						envelope.message.properties.content_type.len));
123 			}
124 			writef("----\n");
125 			amqp_dump(cast(ubyte[])fromBytes(envelope.message.body_.bytes,envelope.message.body_.len));
126 			amqp_destroy_envelope(&envelope);
127 		}
128 	}
129 
130 	die_on_amqp_error(amqp_channel_close(conn, 1, ReplySuccess), "Closing channel");
131 	die_on_amqp_error(amqp_connection_close(conn, ReplySuccess), "Closing connection");
132 	die_on_error(amqp_destroy_connection(conn), "Ending connection");
133 
134 	return 0;
135 }
136 
137 private char[] fromBytes(void* ptr, ulong len)
138 {
139 	return (cast(char*)ptr)[0..len].dup;
140 	/+
141 	import std.array:Appender;
142 	Appender!(char[]) ret;
143 	auto cPtr = cast(char*) ptr;
144 	foreach(i; 0 .. len)
145 		ret.put(cPtr[len]);
146 	writefln("fromBytes: %s, %s, %s ",len, (cast(char*)ptr)[0..len],ret.data);
147 	return ret.data;
148 	+/
149 }