1 module rabbitmq.examples.unbind; 2 import std.stdio; 3 import std.string; 4 import std.getopt; 5 import std.exception; 6 7 import symmetry.api.rabbitmq; 8 9 struct Options 10 { 11 string hostname; 12 ushort port; 13 string exchange; 14 string routingKey; 15 string messageBody; 16 bool useSSL = false; 17 string caCert; 18 bool verifyPeer; 19 bool verifyHostname; 20 string keyFile; 21 string certFile; 22 } 23 24 int main(string[] args) 25 { 26 Options options; 27 28 int status; 29 amqp_socket_t *socket; 30 amqp_connection_state_t conn; 31 32 auto helpInformation = getopt( args, 33 "hostname", &options.hostname, 34 "port", &options.port, 35 "exchange", &options.exchange, 36 "routing-key", &options.routingKey, 37 "message-body", &options.messageBody, 38 "use-ssl", &options.useSSL, 39 "cacert", &options.caCert, 40 "verify-peer", &options.verifyPeer, 41 "verify-hostname", &options.verifyHostname, 42 "key-file", &options.keyFile, 43 "cert-file", &options.certFile 44 ); 45 46 if (helpInformation.helpWanted) 47 { 48 defaultGetoptPrinter("sendstring",helpInformation.options); 49 return -1; 50 } 51 52 conn = amqp_new_connection(); 53 socket = options.useSSL ? amqp_ssl_socket_new(conn): amqp_tcp_socket_new(conn); 54 enforce(socket !is null, options.useSSL? "creating ssl/tls socket" : "creating tcp socket"); 55 56 if(options.useSSL) 57 { 58 amqp_ssl_socket_set_verify_peer(socket, options.verifyPeer ? 1:0); 59 amqp_ssl_socket_set_verify_hostname(socket, options.verifyHostname ? 1: 0); 60 61 if(options.caCert.length > 0) 62 { 63 enforce(amqp_ssl_socket_set_cacert(socket, options.caCert.toStringz) == 0, "setting CA certificate"); 64 } 65 66 if (options.keyFile.length > 0) 67 { 68 enforce(options.certFile.length > 0, "if you specify key-file you must also specify cert-file"); 69 enforce(amqp_ssl_socket_set_key(socket, options.certFile.toStringz, options.keyFile.toStringz) == 0, "setting client cert"); 70 } 71 } 72 73 enforce(amqp_socket_open(socket, options.hostname.toStringz, options.port) == 0, "opening connection"); 74 75 die_on_amqp_error(amqp_login(conn, "/".ptr, 0, 131072, 0, SaslMethod.plain, "guest".ptr, "guest".ptr), "Logging in"); 76 amqp_channel_open(conn, 1); 77 die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); 78 79 { 80 amqp_basic_properties_t props; 81 props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; 82 props.content_type = amqp_string("text/plain"); 83 props.delivery_mode = 2; /* persistent delivery mode */ 84 die_on_error(amqp_basic_publish(conn, 85 1, 86 amqp_string(options.exchange), 87 amqp_string(options.routingKey), 88 0, 89 0, 90 &props, 91 amqp_string(options.messageBody)), 92 "Publishing"); 93 } 94 95 die_on_amqp_error(amqp_channel_close(conn, 1, ReplySuccess), "Closing channel"); 96 die_on_amqp_error(amqp_connection_close(conn, ReplySuccess), "Closing connection"); 97 die_on_error(amqp_destroy_connection(conn), "Ending connection"); 98 return 0; 99 }