1 module amqp_producer;
2 import std.stdio;
3 import std.string;
4 import std.exception;
5 import kaleidic.api.rabbitmq;
6 import kaleidic.api.rabbitmq.utils;
7 import std.conv:to;
8 
/// Interval, in microseconds, between progress summaries printed while sending (one second).
enum SUMMARY_EVERY_US = 1_000_000;
10 
/**
 * Publishes `messageCount` copies of a fixed 256-byte payload on channel 1 to
 * the "amq.direct" exchange, using `queueName` as the routing key, and prints
 * throughput statistics to stdout.
 *
 * A progress line is printed whenever the time elapsed since the start passes
 * the next `SUMMARY_EVERY_US` boundary, and a final summary is printed once
 * all messages have been sent.
 *
 * Params:
 *   conn         = connection to publish on
 *   queueName    = routing key passed to `amqp_basic_publish`
 *   rateLimit    = maximum average publish rate in messages per second; a
 *                  value of zero or less disables throttling (it would
 *                  otherwise make the throttle loop spin forever)
 *   messageCount = number of messages to publish; nothing is published when
 *                  it is zero or negative
 */
void sendBatch(amqp_connection_state_t conn, string queueName, int rateLimit, int messageCount)
{
	immutable ulong startTime = now_microseconds();
	int sent = 0;
	int previousSent = 0;
	ulong previousReportTime = startTime;
	ulong nextSummaryTime = startTime + SUMMARY_EVERY_US;

	// Payload: the byte values 0, 1, ..., 255.
	char[256] message;
	foreach (idx, ref c; message)
	{
		c = cast(char)(idx & 0xff);
	}

	amqp_bytes_t message_bytes;
	message_bytes.len = message.length;
	message_bytes.bytes = message.ptr;

	foreach (i; 0 .. messageCount)
	{
		ulong now = now_microseconds();
		die_on_error(amqp_basic_publish(conn,
				    1,
				    amqp_string("amq.direct"),
				    amqp_string(queueName),
				    0,
				    0,
				    null,
				    message_bytes),
		 "Publishing");
		sent++;

		if (now > nextSummaryTime)
		{
			immutable int countOverInterval = sent - previousSent;
			immutable double intervalRate = countOverInterval / ((now - previousReportTime) / 1000000.0);
			writefln("%s ms: Sent %s - %s since last report (%s Hz)",
				(now - startTime) / 1000, sent, countOverInterval, intervalRate);

			previousSent = sent;
			previousReportTime = now;
			nextSummaryTime += SUMMARY_EVERY_US;
		}

		// Throttle: sleep in 2 ms steps until the average rate since the
		// start drops to the limit. With a non-positive limit the condition
		// could never become false, so throttling is skipped entirely.
		if (rateLimit > 0)
		{
			while (((i * 1000000.0) / (now - startTime)) > rateLimit)
			{
				microsleep(2000);
				now = now_microseconds();
			}
		}
	}

	immutable ulong stopTime = now_microseconds();
	// Keep the elapsed time as ulong: narrowing it to int throws once the
	// run lasts longer than about 35.8 minutes (2^31 microseconds).
	immutable ulong totalDelta = stopTime - startTime;

	writefln("PRODUCER - Message count: %s", messageCount);
	writefln("Total time, milliseconds: %s", totalDelta / 1000);
	writefln("Overall messages-per-second: %s", (messageCount / (totalDelta / 1000000.0)));
}
69 
/// Settings for the producer, filled in by `main` from the command line.
struct Options
{
	/// Broker host name handed to `amqp_socket_open`.
	string hostname;
	/// Broker TCP port handed to `amqp_socket_open`.
	ushort port;
	/// Rate limit forwarded to `sendBatch`.
	int rateLimit;
	/// Number of messages forwarded to `sendBatch`.
	int messageCount;
	/// CA certificate file path; not configured when empty.
	string caCert;
	/// Whether peer verification is switched on for the SSL socket.
	bool verifyPeer;
	/// Whether hostname verification is switched on for the SSL socket.
	bool verifyHostname;
	/// Client private key file path; must be given together with `certFile`.
	string keyFile;
	/// Client certificate file path; must be given together with `keyFile`.
	string certFile;
}
82 
/**
 * Entry point: parses the command line, opens an SSL/TLS connection to the
 * broker, logs in as "guest" on vhost "/", opens channel 1, publishes a batch
 * of messages with `sendBatch`, and closes everything down again.
 *
 * Host, port, rate limit and message count may be given either as four
 * positional arguments or through `--hostname`, `--port`, `--rate-limit` and
 * `--message-count`. When all four positionals are present they take
 * precedence over the named options. TLS settings are always given as named
 * options.
 *
 * Returns: 0 on success, -1 when help was requested, 1 on a usage error.
 * Throws: Exception (via `enforce`) when a socket or TLS setup step fails.
 */
int main(string[] args)
{
	import std.stdio: stderr, writef, writefln;
	import std.conv: to, ConvException;
	import std.getopt;

	enum usage = "Usage: amqp_producer host port rate_limit message_count " ~
		"[--cacert=cacert.pem] [--verify-peer] [--verify-hostname] " ~
		"[--key-file=key.pem --cert-file=cert.pem]\n" ~
		"   or: amqp_producer --hostname=host --port=port --rate-limit=N " ~
		"--message-count=N [options]\n";

	int status;
	Options options;
	amqp_socket_t *socket;
	amqp_connection_state_t conn;

	GetoptResult helpInformation;
	try
	{
		helpInformation = getopt(	args,
					"hostname",		&options.hostname,
					"port",			&options.port,
					"rate-limit",		&options.rateLimit,
					"message-count",	&options.messageCount,
					"cacert",		&options.caCert,
					"verify-peer",		&options.verifyPeer,
					"verify-hostname",	&options.verifyHostname,
					"key-file",		&options.keyFile,
					"cert-file",		&options.certFile
		);
	}
	catch (Exception e)
	{
		// Unknown option or malformed option value: report it instead of
		// letting the exception escape with a stack trace.
		stderr.writefln("amqp_producer: %s", e.msg);
		stderr.write(usage);
		return 1;
	}

	if (helpInformation.helpWanted)
	{
		defaultGetoptPrinter("amqp_producer", helpInformation.options);
		return -1;
	}

	// getopt has removed the options it recognised, so whatever remains
	// after args[0] is positional.
	if (args.length >= 5)
	{
		try
		{
			options.hostname = args[1];
			options.port = args[2].to!ushort;
			options.rateLimit = args[3].to!int;
			options.messageCount = args[4].to!int;
		}
		catch (ConvException e)
		{
			stderr.writefln("amqp_producer: %s", e.msg);
			stderr.write(usage);
			return 1;
		}
	}
	else if (args.length > 1)
	{
		// Some, but not all four, positional arguments were supplied.
		stderr.write(usage);
		return 1;
	}

	// Whichever form was used, the connection parameters must be usable.
	// A non-positive rate limit is rejected because the sender cannot make
	// progress with it.
	if (options.hostname.length == 0 || options.port == 0 || options.rateLimit <= 0)
	{
		stderr.write(usage);
		return 1;
	}

	conn = amqp_new_connection();

	socket = amqp_ssl_socket_new(conn);
	enforce(socket !is null, "creating SSL/TLS socket");

	amqp_ssl_socket_set_verify_peer(socket, options.verifyPeer ? 1 : 0);
	amqp_ssl_socket_set_verify_hostname(socket, options.verifyHostname ? 1 : 0);

	if (options.caCert.length > 0)
	{
		status = amqp_ssl_socket_set_cacert(socket, options.caCert.toStringz);
		enforce(status == 0, "setting CA certificate");
	}

	// A client key and a client certificate only make sense as a pair.
	if (options.keyFile.length > 0)
	{
		enforce(options.certFile.length > 0, "must specify a cert-file if you specify a key-file");
		status = amqp_ssl_socket_set_key(socket, options.certFile.toStringz, options.keyFile.toStringz);
		enforce(status == 0, "setting client cert");
	}
	else
	{
		enforce(options.certFile.length == 0, "must specify a key-file if you specify a cert-file");
	}

	status = amqp_socket_open(socket, options.hostname.toStringz, options.port);
	enforce(status == 0, "opening SSL/TLS connection");

	die_on_amqp_error(amqp_login(conn, "/".ptr, 0, 131072, 0, SaslMethod.plain, "guest".ptr, "guest".ptr), "Logging in");
	amqp_channel_open(conn, 1);
	die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

	sendBatch(conn, "test queue", options.rateLimit, options.messageCount);

	die_on_amqp_error(amqp_channel_close(conn, 1, ReplySuccess), "Closing channel");
	die_on_amqp_error(amqp_connection_close(conn, ReplySuccess), "Closing connection");
	die_on_error(amqp_destroy_connection(conn), "Ending connection");
	return 0;
}