1 module rabbitmq.examples.listenq;
2 import std.stdio;
3 import std.string;
4 import std.exception;
5 import std.conv:to;
6 import std.getopt;
7 
8 import symmetry.api.rabbitmq;
9 
10 struct Options
11 {
12 	string hostname;
13 	ushort port;
14 	string queuename;
15 	string caCert;
16 	bool verifyPeer;
17 	bool verifyHostname;
18 	string keyFile;
19 	string certFile;
20 }
21 
22 int main(string[] args)
23 {
24 	Options options;
25 	int status;
26 	amqp_socket_t *socket;
27 	amqp_connection_state_t conn;
28 
29 	auto helpInformation = getopt(	args,
30 					"hostname",	&options.hostname,
31 					"port",		&options.port,
32 					"queuename",	&options.queuename,
33 					"cacert",	&options.caCert,
34 					"verify-peer",	&options.verifyPeer,
35 					"verify-hostname", &options.verifyHostname,
36 					"keyFile",	&options.keyFile,
37 					"certFile",	&options.certFile,
38 	);
39 	
40 	if (helpInformation.helpWanted)
41 	{
42 		defaultGetoptPrinter("listenq", helpInformation.options);
43 		return -1;
44 	}
45 
46 	conn = amqp_new_connection();
47 	socket = amqp_ssl_socket_new(conn);
48 	enforce(socket !is null, "creating SSL/TLS socket");
49 
50 	amqp_ssl_socket_set_verify_peer(socket, options.verifyPeer ? 1 : 0);
51 	amqp_ssl_socket_set_verify_hostname(socket, options.verifyHostname ? 1: 0);
52 
53 	if (options.caCert.length > 0)
54 	{
55 		enforce (amqp_ssl_socket_set_cacert(socket, options.caCert.toStringz) == 0, "setting CA certificate");
56 	}
57 	
58 	if (options.keyFile.length >0)
59 	{
60 		enforce(options.certFile.length > 0, "must specify a cert-file if you specify a key-file");
61 		enforce(amqp_ssl_socket_set_key(socket, options.certFile.toStringz, options.keyFile.toStringz) ==0, "setting client cert");
62 	}
63 
64 	enforce(amqp_socket_open(socket, options.hostname.toStringz, options.port) == 0, "opening SSL/TLS connection");
65 
66 	die_on_amqp_error(amqp_login(conn, "/".ptr, 0, 131072, 0, SaslMethod.plain, "guest".ptr, "guest".ptr), "Logging in");
67 	amqp_channel_open(conn, 1);
68 	die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
69 
70 	amqp_basic_consume(conn, 1, amqp_string(options.queuename), cast(amqp_bytes_t)amqp_empty_bytes, 0, 0, 0, cast(amqp_table_t)amqp_empty_table);
71 	die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
72 
73 	{
74 		for (;;) {
75 			amqp_rpc_reply_t res;
76 			amqp_envelope_t envelope;
77 
78 			amqp_maybe_release_buffers(conn);
79 
80 			res = amqp_consume_message(conn, &envelope, null, 0);
81 
82 			if (AMQP_RESPONSE_NORMAL != res.reply_type) {
83 				break;
84 			}
85 
86 			writefln("Delivery %s, exchange %s routingkey %s",
87 			     envelope.delivery_tag,
88 			     cast(char[]) envelope.exchange.bytes[0..envelope.exchange.len],
89 			     cast(char[]) envelope.routing_key.bytes[0.. envelope.routing_key.len]);
90 
91 			if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
92 				writefln("Content-type: %s",
93 			        cast(char[]) envelope.message.properties.content_type.bytes
94 		       				[0..envelope.message.properties.content_type.len]);
95 			}
96 			writefln("----");
97 
98 			amqp_dump(cast(ubyte[]) envelope.message.body_.bytes[0.. envelope.message.body_.len]);
99 
100 			amqp_destroy_envelope(&envelope);
101 		}
102 	}
103 
104 	die_on_amqp_error(amqp_channel_close(conn, 1, ReplySuccess), "Closing channel");
105 	die_on_amqp_error(amqp_connection_close(conn, ReplySuccess), "Closing connection");
106 	die_on_error(amqp_destroy_connection(conn), "Ending connection");
107 
108 	return 0;
109 }