1 module rabbitmq.examples.producer;
2 import std.stdio;
3 import std.string;
4 import std.exception;
5 import symmetry.api.rabbitmq;
6 import std.conv:to;
7 
8 enum SUMMARY_EVERY_US = 1000000;
9 
10 void sendBatch(amqp_connection_state_t conn, string queueName, int rateLimit, int messageCount)
11 {
12 	ulong startTime = now_microseconds();
13 	int sent = 0;
14 	int previousSent = 0;
15 	ulong previousReportTime = startTime;
16 	ulong nextSummaryTime = startTime + SUMMARY_EVERY_US;
17 
18 	char[256] message;
19 	amqp_bytes_t message_bytes;
20 
21 	for(int i = 0; i < message.length; i++)
22 	{
23 		message[i] = i & 0xff;
24 	}
25 
26 	message_bytes.len = message.length;
27 	message_bytes.bytes = message.ptr;
28 
29 	for (int i = 0; i < messageCount; i++)
30 	{
31 		ulong now = now_microseconds();
32 		die_on_error(amqp_basic_publish(conn,
33 				    1,
34 				    amqp_string("amq.direct"),
35 				    amqp_string(queueName),
36 				    0,
37 				    0,
38 				    null,
39 				    message_bytes),
40 		 "Publishing");
41 		sent++;
42 
43 		if (now > nextSummaryTime)
44 		{
45 			int countOverInterval = sent - previousSent;
46 			double intervalRate = countOverInterval / ((now - previousReportTime) / 1000000.0);
47 			writefln("%s ms: Sent %s - %s since last report (%s Hz)",
48 				(now - startTime) / 1000, sent, countOverInterval, intervalRate);
49 
50 			previousSent = sent;
51 			previousReportTime = now;
52 			nextSummaryTime += SUMMARY_EVERY_US;
53 		}
54 
55 		while (((i * 1000000.0) / (now - startTime)) > rateLimit) {
56 			microsleep(2000);
57 			now = now_microseconds();
58 		}
59 	}
60 
61 	ulong stopTime = now_microseconds();
62 	int totalDelta = (stopTime - startTime).to!int;
63 
64 	writefln("PRODUCER - Message count: %s", messageCount);
65 	writefln("Total time, milliseconds: %s", totalDelta / 1000);
66 	writefln("Overall messages-per-second: %s", (messageCount / (totalDelta / 1000000.0)));
67 }
68 
69 struct Options
70 {
71 	string hostname;
72 	ushort port;
73 	int rateLimit;
74 	int messageCount;
75 	string caCert;
76 	bool verifyPeer;
77 	bool verifyHostname;
78 	string keyFile;
79 	string certFile;
80 }
81 
82 int main(string[] args)
83 {
84 	import std.stdio: stderr, writef, writefln;
85 	import std.conv:to;
86 	import std.getopt;
87 
88 	int status;
89 	Options options;
90 	amqp_socket_t *socket;
91 	amqp_connection_state_t conn;
92 
93 	auto helpInformation = getopt(	args,
94 					"hostname",		&options.hostname,
95 					"port",			&options.port,
96 					"rate-limit",		&options.rateLimit,
97 					"message-count",	&options.messageCount,
98 					"cacert",		&options.caCert,
99 					"verify-peer",		&options.verifyPeer,
100 					"verify-hostname",	&options.verifyHostname,
101 					"key-file",		&options.keyFile,
102 					"cert-file",		&options.certFile,
103 	);
104 
105 	if (helpInformation.helpWanted)
106 	{
107 		defaultGetoptPrinter("amqp_producer", helpInformation.options);
108 		return -1;
109 	}
110 
111 	if (args.length < 5) {
112 		stderr.writef("Usage: amqp_producer host port rate_limit message_count " ~
113 		"[cacert.pem [verifypeer] [verifyhostname] [key.pem cert.pem]]\n");
114 		return 1;
115 	}
116 
117 	options.hostname = args[1];
118 	options.port = args[2].to!ushort;
119 	options.rateLimit = args[3].to!int;
120 	options.messageCount = args[4].to!int;
121 
122 	conn = amqp_new_connection();
123 
124 	socket = amqp_ssl_socket_new(conn);
125 	enforce(socket !is null, "creating SSL/TLS socket");
126 
127 	amqp_ssl_socket_set_verify_peer(socket, options.verifyPeer? 1: 0);
128 	amqp_ssl_socket_set_verify_hostname(socket, options.verifyHostname ? 1: 0);
129 	
130 	if(options.caCert.length >0)
131 	{
132 		status = amqp_ssl_socket_set_cacert(socket, options.caCert.toStringz);
133 		enforce(status == 0, "setting CA certificate");
134 	}
135 
136 	if (options.keyFile.length > 0)
137 	{
138 		enforce(options.certFile.length > 0, "must specify a cert-file if you specify a key-file");
139 		status = amqp_ssl_socket_set_key(socket, options.certFile.toStringz, options.keyFile.toStringz);
140 		enforce(status ==0, "setting client cert");
141 	}
142 	else
143 	{
144 		enforce(options.certFile.length ==0, "must specify a key-file if you specify a cert-file");
145 	}
146 
147 	status = amqp_socket_open(socket, options.hostname.toStringz, options.port);
148 	enforce(status == 0, "opening SSL/TLS connection");
149 
150 	die_on_amqp_error(amqp_login(conn, "/".ptr, 0, 131072, 0, SaslMethod.plain, "guest".ptr, "guest".ptr), "Logging in");
151 	amqp_channel_open(conn, 1);
152 	die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
153 
154 	sendBatch(conn, "test queue", options.rateLimit, options.messageCount);
155 
156 	die_on_amqp_error(amqp_channel_close(conn, 1, ReplySuccess), "Closing channel");
157 	die_on_amqp_error(amqp_connection_close(conn, ReplySuccess), "Closing connection");
158 	die_on_error(amqp_destroy_connection(conn), "Ending connection");
159 	return 0;
160 }