1 module rabbitmq.examples.producer; 2 import std.stdio; 3 import std.string; 4 import std.exception; 5 import symmetry.api.rabbitmq; 6 import std.conv:to; 7 8 enum SUMMARY_EVERY_US = 1000000; 9 10 void sendBatch(amqp_connection_state_t conn, string queueName, int rateLimit, int messageCount) 11 { 12 ulong startTime = now_microseconds(); 13 int sent = 0; 14 int previousSent = 0; 15 ulong previousReportTime = startTime; 16 ulong nextSummaryTime = startTime + SUMMARY_EVERY_US; 17 18 char[256] message; 19 amqp_bytes_t message_bytes; 20 21 for(int i = 0; i < message.length; i++) 22 { 23 message[i] = i & 0xff; 24 } 25 26 message_bytes.len = message.length; 27 message_bytes.bytes = message.ptr; 28 29 for (int i = 0; i < messageCount; i++) 30 { 31 ulong now = now_microseconds(); 32 die_on_error(amqp_basic_publish(conn, 33 1, 34 amqp_string("amq.direct"), 35 amqp_string(queueName), 36 0, 37 0, 38 null, 39 message_bytes), 40 "Publishing"); 41 sent++; 42 43 if (now > nextSummaryTime) 44 { 45 int countOverInterval = sent - previousSent; 46 double intervalRate = countOverInterval / ((now - previousReportTime) / 1000000.0); 47 writefln("%s ms: Sent %s - %s since last report (%s Hz)", 48 (now - startTime) / 1000, sent, countOverInterval, intervalRate); 49 50 previousSent = sent; 51 previousReportTime = now; 52 nextSummaryTime += SUMMARY_EVERY_US; 53 } 54 55 while (((i * 1000000.0) / (now - startTime)) > rateLimit) { 56 microsleep(2000); 57 now = now_microseconds(); 58 } 59 } 60 61 ulong stopTime = now_microseconds(); 62 int totalDelta = (stopTime - startTime).to!int; 63 64 writefln("PRODUCER - Message count: %s", messageCount); 65 writefln("Total time, milliseconds: %s", totalDelta / 1000); 66 writefln("Overall messages-per-second: %s", (messageCount / (totalDelta / 1000000.0))); 67 } 68 69 struct Options 70 { 71 string hostname; 72 ushort port; 73 int rateLimit; 74 int messageCount; 75 string caCert; 76 bool verifyPeer; 77 bool verifyHostname; 78 string keyFile; 79 string certFile; 80 } 81 82 int main(string[] args) 83 { 84 import std.stdio: stderr, writef, writefln; 85 import std.conv:to; 86 import std.getopt; 87 88 int status; 89 Options options; 90 amqp_socket_t *socket; 91 amqp_connection_state_t conn; 92 93 auto helpInformation = getopt( args, 94 "hostname", &options.hostname, 95 "port", &options.port, 96 "rate-limit", &options.rateLimit, 97 "message-count", &options.messageCount, 98 "cacert", &options.caCert, 99 "verify-peer", &options.verifyPeer, 100 "verify-hostname", &options.verifyHostname, 101 "key-file", &options.keyFile, 102 "cert-file", &options.certFile, 103 ); 104 105 if (helpInformation.helpWanted) 106 { 107 defaultGetoptPrinter("amqp_producer", helpInformation.options); 108 return -1; 109 } 110 111 if (args.length < 5) { 112 stderr.writef("Usage: amqp_producer host port rate_limit message_count " ~ 113 "[cacert.pem [verifypeer] [verifyhostname] [key.pem cert.pem]]\n"); 114 return 1; 115 } 116 117 options.hostname = args[1]; 118 options.port = args[2].to!ushort; 119 options.rateLimit = args[3].to!int; 120 options.messageCount = args[4].to!int; 121 122 conn = amqp_new_connection(); 123 124 socket = amqp_ssl_socket_new(conn); 125 enforce(socket !is null, "creating SSL/TLS socket"); 126 127 amqp_ssl_socket_set_verify_peer(socket, options.verifyPeer? 1: 0); 128 amqp_ssl_socket_set_verify_hostname(socket, options.verifyHostname ? 1: 0); 129 130 if(options.caCert.length >0) 131 { 132 status = amqp_ssl_socket_set_cacert(socket, options.caCert.toStringz); 133 enforce(status == 0, "setting CA certificate"); 134 } 135 136 if (options.keyFile.length > 0) 137 { 138 enforce(options.certFile.length > 0, "must specify a cert-file if you specify a key-file"); 139 status = amqp_ssl_socket_set_key(socket, options.certFile.toStringz, options.keyFile.toStringz); 140 enforce(status ==0, "setting client cert"); 141 } 142 else 143 { 144 enforce(options.certFile.length ==0, "must specify a key-file if you specify a cert-file"); 145 } 146 147 status = amqp_socket_open(socket, options.hostname.toStringz, options.port); 148 enforce(status == 0, "opening SSL/TLS connection"); 149 150 die_on_amqp_error(amqp_login(conn, "/".ptr, 0, 131072, 0, SaslMethod.plain, "guest".ptr, "guest".ptr), "Logging in"); 151 amqp_channel_open(conn, 1); 152 die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); 153 154 sendBatch(conn, "test queue", options.rateLimit, options.messageCount); 155 156 die_on_amqp_error(amqp_channel_close(conn, 1, ReplySuccess), "Closing channel"); 157 die_on_amqp_error(amqp_connection_close(conn, ReplySuccess), "Closing connection"); 158 die_on_error(amqp_destroy_connection(conn), "Ending connection"); 159 return 0; 160 }