/**
 * sendstring: publishes one text message to an AMQP exchange, over plain TCP
 * or SSL/TLS, using connection settings taken from the command line.
 */
module sendstring;

import std.stdio;
import std.string;
import std.getopt;
import std.exception;

import kaleidic.api.rabbitmq;
import kaleidic.api.rabbitmq.utils;

/// Command-line settings; every field is filled in by `getopt` in `main`.
struct Options
{
    string hostname;        /// --hostname: host handed to amqp_socket_open
    ushort port;            /// --port: port handed to amqp_socket_open
    string exchange;        /// --exchange: exchange the message is published to
    string routingKey;      /// --routing-key: routing key used for the publish
    string messageBody;     /// --message-body: payload of the message
    bool useSSL = false;    /// --use-ssl: create an SSL/TLS socket instead of a TCP one
    string caCert;          /// --cacert: CA certificate file (only read when useSSL is set)
    bool verifyPeer;        /// --verify-peer: forwarded to amqp_ssl_socket_set_verify_peer
    bool verifyHostname;    /// --verify-hostname: forwarded to amqp_ssl_socket_set_verify_hostname
    string keyFile;         /// --key-file: client private key (only read when useSSL is set)
    string certFile;        /// --cert-file: client certificate (only read when useSSL is set)
}

/**
 * Program entry point.
 *
 * Parses the options, opens a connection (TCP or SSL/TLS), logs in as
 * guest/guest on vhost "/", opens channel 1, publishes a single persistent
 * "text/plain" message and then closes the channel and the connection.
 *
 * Params:
 *   args = command-line arguments, parsed with std.getopt
 *
 * Returns: 0 after a successful publish, -1 when only help was requested.
 *
 * Throws: Exception (via `enforce`) when the connection or socket cannot be
 *         created, when the SSL options are inconsistent or rejected by the
 *         library, or when the socket cannot be opened. std.getopt may also
 *         throw on malformed arguments.
 */
int main(string[] args)
{
    Options options;

    auto helpInformation = getopt(args,
        "hostname",        &options.hostname,
        "port",            &options.port,
        "exchange",        &options.exchange,
        "routing-key",     &options.routingKey,
        "message-body",    &options.messageBody,
        "use-ssl",         &options.useSSL,
        "cacert",          &options.caCert,
        "verify-peer",     &options.verifyPeer,
        "verify-hostname", &options.verifyHostname,
        "key-file",        &options.keyFile,
        "cert-file",       &options.certFile
    );

    if (helpInformation.helpWanted)
    {
        defaultGetoptPrinter("sendstring", helpInformation.options);
        return -1;
    }

    amqp_connection_state_t conn = amqp_new_connection();
    // Fail with a clear message instead of handing a null connection to the
    // socket constructors below.
    enforce(conn !is null, "creating connection");

    amqp_socket_t* socket = options.useSSL
        ? amqp_ssl_socket_new(conn)
        : amqp_tcp_socket_new(conn);
    enforce(socket !is null,
        options.useSSL ? "creating ssl/tls socket" : "creating tcp socket");

    if (options.useSSL)
    {
        amqp_ssl_socket_set_verify_peer(socket, options.verifyPeer ? 1 : 0);
        amqp_ssl_socket_set_verify_hostname(socket, options.verifyHostname ? 1 : 0);

        if (options.caCert.length > 0)
        {
            enforce(amqp_ssl_socket_set_cacert(socket, options.caCert.toStringz) == 0,
                "setting CA certificate");
        }

        // A client identity needs both halves. Reject a lone cert-file as well
        // as a lone key-file, otherwise the certificate would be silently
        // dropped and the connection made without client authentication.
        if (options.keyFile.length > 0)
        {
            enforce(options.certFile.length > 0,
                "if you specify key-file you must also specify cert-file");
            // Argument order is (certificate, key).
            enforce(amqp_ssl_socket_set_key(socket,
                        options.certFile.toStringz,
                        options.keyFile.toStringz) == 0,
                "setting client cert");
        }
        else
        {
            enforce(options.certFile.length == 0,
                "if you specify cert-file you must also specify key-file");
        }
    }

    enforce(amqp_socket_open(socket, options.hostname.toStringz, options.port) == 0,
        "opening connection");

    // Arguments after conn follow the rabbitmq-c amqp_login signature:
    // vhost, channel_max, frame_max, heartbeat, SASL method, user, password.
    die_on_amqp_error(
        amqp_login(conn, "/".ptr, 0, 131072, 0, SaslMethod.plain, "guest".ptr, "guest".ptr),
        "Logging in");
    amqp_channel_open(conn, 1);
    // The result of the channel-open RPC is fetched separately.
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

    {
        amqp_basic_properties_t props;
        props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
        props.content_type = amqp_string("text/plain");
        props.delivery_mode = 2; /* persistent delivery mode */
        die_on_error(amqp_basic_publish(conn,
                                        1,
                                        amqp_string(options.exchange),
                                        amqp_string(options.routingKey),
                                        0,
                                        0,
                                        &props,
                                        amqp_string(options.messageBody)),
                     "Publishing");
    }

    die_on_amqp_error(amqp_channel_close(conn, 1, ReplySuccess), "Closing channel");
    die_on_amqp_error(amqp_connection_close(conn, ReplySuccess), "Closing connection");
    die_on_error(amqp_destroy_connection(conn), "Ending connection");
    return 0;
}