1 module amqp_producer; 2 import std.stdio; 3 import std.string; 4 import std.exception; 5 import kaleidic.api.rabbitmq; 6 import kaleidic.api.rabbitmq.utils; 7 import std.conv:to; 8 9 enum SUMMARY_EVERY_US = 1000000; 10 11 void sendBatch(amqp_connection_state_t conn, string queueName, int rateLimit, int messageCount) 12 { 13 ulong startTime = now_microseconds(); 14 int sent = 0; 15 int previousSent = 0; 16 ulong previousReportTime = startTime; 17 ulong nextSummaryTime = startTime + SUMMARY_EVERY_US; 18 19 char[256] message; 20 amqp_bytes_t message_bytes; 21 22 for(int i = 0; i < message.length; i++) 23 { 24 message[i] = i & 0xff; 25 } 26 27 message_bytes.len = message.length; 28 message_bytes.bytes = message.ptr; 29 30 for (int i = 0; i < messageCount; i++) 31 { 32 ulong now = now_microseconds(); 33 die_on_error(amqp_basic_publish(conn, 34 1, 35 amqp_string("amq.direct"), 36 amqp_string(queueName), 37 0, 38 0, 39 null, 40 message_bytes), 41 "Publishing"); 42 sent++; 43 44 if (now > nextSummaryTime) 45 { 46 int countOverInterval = sent - previousSent; 47 double intervalRate = countOverInterval / ((now - previousReportTime) / 1000000.0); 48 writefln("%s ms: Sent %s - %s since last report (%s Hz)", 49 (now - startTime) / 1000, sent, countOverInterval, intervalRate); 50 51 previousSent = sent; 52 previousReportTime = now; 53 nextSummaryTime += SUMMARY_EVERY_US; 54 } 55 56 while (((i * 1000000.0) / (now - startTime)) > rateLimit) { 57 microsleep(2000); 58 now = now_microseconds(); 59 } 60 } 61 62 ulong stopTime = now_microseconds(); 63 int totalDelta = (stopTime - startTime).to!int; 64 65 writefln("PRODUCER - Message count: %s", messageCount); 66 writefln("Total time, milliseconds: %s", totalDelta / 1000); 67 writefln("Overall messages-per-second: %s", (messageCount / (totalDelta / 1000000.0))); 68 } 69 70 struct Options 71 { 72 string hostname; 73 ushort port; 74 int rateLimit; 75 int messageCount; 76 string caCert; 77 bool verifyPeer; 78 bool verifyHostname; 79 string keyFile; 80 string certFile; 81 } 82 83 int main(string[] args) 84 { 85 import std.stdio: stderr, writef, writefln; 86 import std.conv:to; 87 import std.getopt; 88 89 int status; 90 Options options; 91 amqp_socket_t *socket; 92 amqp_connection_state_t conn; 93 94 auto helpInformation = getopt( args, 95 "hostname", &options.hostname, 96 "port", &options.port, 97 "rate-limit", &options.rateLimit, 98 "message-count", &options.messageCount, 99 "cacert", &options.caCert, 100 "verify-peer", &options.verifyPeer, 101 "verify-hostname", &options.verifyHostname, 102 "key-file", &options.keyFile, 103 "cert-file", &options.certFile, 104 ); 105 106 if (helpInformation.helpWanted) 107 { 108 defaultGetoptPrinter("amqp_producer", helpInformation.options); 109 return -1; 110 } 111 112 if (args.length < 5) { 113 stderr.writef("Usage: amqp_producer host port rate_limit message_count " ~ 114 "[cacert.pem [verifypeer] [verifyhostname] [key.pem cert.pem]]\n"); 115 return 1; 116 } 117 118 options.hostname = args[1]; 119 options.port = args[2].to!ushort; 120 options.rateLimit = args[3].to!int; 121 options.messageCount = args[4].to!int; 122 123 conn = amqp_new_connection(); 124 125 socket = amqp_ssl_socket_new(conn); 126 enforce(socket !is null, "creating SSL/TLS socket"); 127 128 amqp_ssl_socket_set_verify_peer(socket, options.verifyPeer? 1: 0); 129 amqp_ssl_socket_set_verify_hostname(socket, options.verifyHostname ? 1: 0); 130 131 if(options.caCert.length >0) 132 { 133 status = amqp_ssl_socket_set_cacert(socket, options.caCert.toStringz); 134 enforce(status == 0, "setting CA certificate"); 135 } 136 137 if (options.keyFile.length > 0) 138 { 139 enforce(options.certFile.length > 0, "must specify a cert-file if you specify a key-file"); 140 status = amqp_ssl_socket_set_key(socket, options.certFile.toStringz, options.keyFile.toStringz); 141 enforce(status ==0, "setting client cert"); 142 } 143 else 144 { 145 enforce(options.certFile.length ==0, "must specify a key-file if you specify a cert-file"); 146 } 147 148 status = amqp_socket_open(socket, options.hostname.toStringz, options.port); 149 enforce(status == 0, "opening SSL/TLS connection"); 150 151 die_on_amqp_error(amqp_login(conn, "/".ptr, 0, 131072, 0, SaslMethod.plain, "guest".ptr, "guest".ptr), "Logging in"); 152 amqp_channel_open(conn, 1); 153 die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); 154 155 sendBatch(conn, "test queue", options.rateLimit, options.messageCount); 156 157 die_on_amqp_error(amqp_channel_close(conn, 1, ReplySuccess), "Closing channel"); 158 die_on_amqp_error(amqp_connection_close(conn, ReplySuccess), "Closing connection"); 159 die_on_error(amqp_destroy_connection(conn), "Ending connection"); 160 return 0; 161 }