1 module sendstring;
2 import std.stdio;
3 import std.string;
4 import std.getopt;
5 import std.exception;
6 
7 import kaleidic.api.rabbitmq;
8 import kaleidic.api.rabbitmq.utils;
9 
/// Command-line settings collected by `main` through `getopt`.
/// Every field starts at its type's default value until an option overrides it.
struct Options
{
	// --- broker endpoint ---

	/// Host passed to `amqp_socket_open` (`--hostname`).
	string hostname = null;
	/// Port passed to `amqp_socket_open` (`--port`).
	ushort port = 0;

	// --- message to publish ---

	/// Exchange the message is published to (`--exchange`).
	string exchange = null;
	/// Routing key used for the publish (`--routing-key`).
	string routingKey = null;
	/// Payload of the published message (`--message-body`).
	string messageBody = null;

	// --- SSL/TLS settings, only consulted when useSSL is set ---

	/// Selects an SSL/TLS socket instead of a plain TCP socket (`--use-ssl`).
	bool useSSL = false;
	/// CA certificate handed to `amqp_ssl_socket_set_cacert` (`--cacert`).
	string caCert = null;
	/// Forwarded to `amqp_ssl_socket_set_verify_peer` (`--verify-peer`).
	bool verifyPeer = false;
	/// Forwarded to `amqp_ssl_socket_set_verify_hostname` (`--verify-hostname`).
	bool verifyHostname = false;
	/// Client key handed to `amqp_ssl_socket_set_key` (`--key-file`).
	string keyFile = null;
	/// Client certificate handed to `amqp_ssl_socket_set_key` (`--cert-file`).
	string certFile = null;
}
24 
/**
 * Publishes one persistent `text/plain` message to a RabbitMQ exchange.
 *
 * Settings come from the command line (see the `getopt` call below). The
 * connection uses a plain TCP socket, or an SSL/TLS socket when `--use-ssl`
 * is given, and logs in to vhost "/" with the guest/guest credentials.
 *
 * Params:
 *   args = program arguments as passed by the runtime; parsed by `getopt`.
 *
 * Returns: 0 after a successful publish, and also after printing the help text.
 *
 * Throws: `Exception` (via `enforce`) when a required option is missing, when
 *   the connection or socket cannot be created, when an SSL setting cannot be
 *   applied, or when the socket cannot be opened. `getopt` itself throws on
 *   malformed command-line options.
 */
int main(string[] args)
{
	Options options;

	auto helpInformation = getopt(	args,
					"hostname",	"broker host name (required)",			&options.hostname,
					"port",		"broker port (required)",			&options.port,
					"exchange",	"exchange to publish to",			&options.exchange,
					"routing-key",	"routing key for the message",			&options.routingKey,
					"message-body",	"text of the message to publish",		&options.messageBody,
					"use-ssl",	"connect over SSL/TLS instead of plain TCP",	&options.useSSL,
					"cacert",	"CA certificate file (with --use-ssl)",		&options.caCert,
					"verify-peer",	"verify the peer certificate (with --use-ssl)",	&options.verifyPeer,
					"verify-hostname", "verify the peer host name (with --use-ssl)", &options.verifyHostname,
					"key-file",	"client key file (with --use-ssl, needs --cert-file)", &options.keyFile,
					"cert-file",	"client certificate file (with --use-ssl, needs --key-file)", &options.certFile
	);

	if (helpInformation.helpWanted)
	{
		defaultGetoptPrinter("sendstring", helpInformation.options);
		// Asking for help is not a failure, so report success to the caller.
		return 0;
	}

	// Reject a missing endpoint up front; otherwise the only symptom would be
	// the generic "opening connection" failure further down.
	enforce(options.hostname.length > 0, "you must specify hostname");
	enforce(options.port != 0, "you must specify port");

	amqp_connection_state_t conn = amqp_new_connection();
	// Do not hand a null connection to the socket constructors below.
	enforce(conn !is null, "creating connection");

	amqp_socket_t* socket = options.useSSL ? amqp_ssl_socket_new(conn) : amqp_tcp_socket_new(conn);
	enforce(socket !is null, options.useSSL? "creating ssl/tls socket" : "creating tcp socket");

	if (options.useSSL)
	{
		amqp_ssl_socket_set_verify_peer(socket, options.verifyPeer ? 1 : 0);
		amqp_ssl_socket_set_verify_hostname(socket, options.verifyHostname ? 1 : 0);

		if (options.caCert.length > 0)
		{
			enforce(amqp_ssl_socket_set_cacert(socket, options.caCert.toStringz) == 0, "setting CA certificate");
		}

		// The client key and certificate are only usable as a pair, so reject
		// either one given without the other instead of silently ignoring it.
		if (options.keyFile.length > 0)
		{
			enforce(options.certFile.length > 0, "if you specify key-file you must also specify cert-file");
			enforce(amqp_ssl_socket_set_key(socket, options.certFile.toStringz, options.keyFile.toStringz) == 0, "setting client cert");
		}
		else
		{
			enforce(options.certFile.length == 0, "if you specify cert-file you must also specify key-file");
		}
	}

	enforce(amqp_socket_open(socket, options.hostname.toStringz, options.port) == 0, "opening connection");

	// amqp_login arguments after conn: vhost "/", channel_max 0, frame_max 131072,
	// heartbeat 0, then the SASL method and its user/password pair.
	die_on_amqp_error(amqp_login(conn, "/".ptr, 0, 131072, 0, SaslMethod.plain, "guest".ptr, "guest".ptr), "Logging in");
	// The return value is not inspected here; the outcome is read back via amqp_get_rpc_reply.
	amqp_channel_open(conn, 1);
	die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

	{
		amqp_basic_properties_t props;
		// Only the fields flagged here are set on the message properties below.
		props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
		props.content_type = amqp_string("text/plain");
		props.delivery_mode = 2; /* persistent delivery mode */
		die_on_error(amqp_basic_publish(conn,
					    1,
					    amqp_string(options.exchange),
					    amqp_string(options.routingKey),
					    0,
					    0,
					    &props,
					    amqp_string(options.messageBody)),
			 "Publishing");
	}

	// Tear down in reverse order of setup: channel, connection, then the state object.
	die_on_amqp_error(amqp_channel_close(conn, 1, ReplySuccess), "Closing channel");
	die_on_amqp_error(amqp_connection_close(conn, ReplySuccess), "Closing connection");
	die_on_error(amqp_destroy_connection(conn), "Ending connection");
	return 0;
}