1 module rabbitmq.examples.rpc_client_sendstring; 2 import std.stdio; 3 import std.string; 4 import std.getopt; 5 import std.exception; 6 7 import symmetry.api.rabbitmq; 8 9 struct Options 10 { 11 string hostname; 12 ushort port; 13 string exchange; 14 string routingKey; 15 string messageBody; 16 bool useSSL = false; 17 string caCert; 18 bool verifyPeer; 19 bool verifyHostname; 20 string keyFile; 21 string certFile; 22 } 23 24 int main(string[] args) 25 { 26 Options options; 27 28 int status; 29 amqp_socket_t *socket; 30 amqp_connection_state_t conn; 31 amqp_bytes_t reply_to_queue; 32 33 auto helpInformation = getopt( args, 34 "hostname", &options.hostname, 35 "port", &options.port, 36 "exchange", &options.exchange, 37 "routing-key", &options.routingKey, 38 "message-body", &options.messageBody, 39 "use-ssl", &options.useSSL, 40 "cacert", &options.caCert, 41 "verify-peer", &options.verifyPeer, 42 "verify-hostname", &options.verifyHostname, 43 "key-file", &options.keyFile, 44 "cert-file", &options.certFile 45 ); 46 47 if (helpInformation.helpWanted) 48 { 49 defaultGetoptPrinter("rpc_client_sendstring",helpInformation.options); 50 return -1; 51 } 52 53 conn = amqp_new_connection(); 54 socket = options.useSSL ? amqp_ssl_socket_new(conn): amqp_tcp_socket_new(conn); 55 enforce(socket !is null, options.useSSL? "creating ssl/tls socket" : "creating tcp socket"); 56 57 if(options.useSSL) 58 { 59 amqp_ssl_socket_set_verify_peer(socket, options.verifyPeer ? 1:0); 60 amqp_ssl_socket_set_verify_hostname(socket, options.verifyHostname ? 1: 0); 61 62 if(options.caCert.length > 0) 63 { 64 enforce(amqp_ssl_socket_set_cacert(socket, options.caCert.toStringz) == 0, "setting CA certificate"); 65 } 66 67 if (options.keyFile.length > 0) 68 { 69 enforce(options.certFile.length > 0, "if you specify key-file you must also specify cert-file"); 70 enforce(amqp_ssl_socket_set_key(socket, options.certFile.toStringz, options.keyFile.toStringz) == 0, "setting client cert"); 71 } 72 } 73 74 enforce(amqp_socket_open(socket, options.hostname.toStringz, options.port) == 0, "opening connection"); 75 76 die_on_amqp_error(amqp_login(conn, "/".ptr, 0, 131072, 0, SaslMethod.plain, "guest".ptr, "guest".ptr), "Logging in"); 77 amqp_channel_open(conn, 1); 78 die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); 79 80 // create private reply_to queue 81 { 82 amqp_queue_declare_ok_t* r = amqp_queue_declare(conn,1,amqp_empty_bytes, 0,0,0,1,amqp_empty_table); 83 die_on_amqp_error(amqp_get_rpc_reply(conn),"Declaring queue"); 84 reply_to_queue = amqp_bytes_malloc_dup(r.queue); 85 enforce(reply_to_queue.bytes !is null, "out of memory whilst copying queue name"); 86 } 87 88 // send the message 89 { 90 amqp_basic_properties_t props; 91 props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_REPLY_TO_FLAG | 92 AMQP_BASIC_CORRELATION_ID_FLAG; 93 props.content_type = amqp_string("text/plain"); 94 props.delivery_mode = 2; /* persistent delivery mode */ 95 props.reply_to = amqp_bytes_malloc_dup(reply_to_queue); 96 enforce(props.reply_to.bytes !is null, "out of memory whilst copying queue name"); 97 props.correlation_id = amqp_string("1"); 98 99 // publish 100 die_on_error(amqp_basic_publish(conn, 101 1, 102 amqp_string(options.exchange), 103 amqp_string(options.routingKey), 104 0, 105 0, 106 &props, 107 amqp_string(options.messageBody)), 108 "Publishing"); 109 amqp_bytes_free(props.reply_to); 110 } 111 112 // wait for an answer 113 { 114 amqp_basic_consume(conn,1,reply_to_queue,amqp_empty_bytes,0,1,0,amqp_empty_table); 115 die_on_amqp_error(amqp_get_rpc_reply(conn),"Consuming"); 116 amqp_bytes_free(reply_to_queue); 117 { 118 amqp_frame_t frame; 119 int result; 120 121 amqp_basic_deliver_t* d; 122 amqp_basic_properties_t* p; 123 size_t body_target; 124 size_t body_received; 125 for (;;) 126 { 127 amqp_maybe_release_buffers(conn); 128 result = amqp_simple_wait_frame(conn,&frame); 129 writefln("Result: %s",result); 130 if(result < 0) 131 break; 132 133 writefln("Frame type: %s channel: %s", frame.frame_type,frame.channel); 134 if (frame.frame_type != AMQP_FRAME_METHOD) 135 continue; 136 writefln("Method: %s",amqp_method_name(frame.payload.method.id).fromStringz); 137 if (frame.payload.method.id !=AMQP_BASIC_DELIVER_METHOD) 138 continue; 139 140 d= cast(amqp_basic_deliver_t*) frame.payload.method.decoded; 141 142 writefln("Delivery: %s exchange: %s routingKey: %s", d.delivery_tag, 143 asString(d.exchange), asString(d.routing_key)); 144 145 result = amqp_simple_wait_frame(conn,&frame); 146 if (result < 0) 147 break; 148 enforce(frame.frame_type == AMQP_FRAME_HEADER, "Expected header"); 149 p = cast(amqp_basic_properties_t*) frame.payload.properties.decoded; 150 if (p._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) 151 { 152 writefln("Content-type: %s", p.content_type.asString); 153 } 154 writefln("----"); 155 156 body_target = cast(size_t) frame.payload.properties.body_size; 157 body_received = 0; 158 159 while(body_received < body_target) 160 { 161 result = amqp_simple_wait_frame(conn,&frame); 162 if (result <0) 163 break; 164 enforce(frame.frame_type == AMQP_FRAME_BODY, "expected body"); 165 body_received += frame.payload.body_fragment.len; 166 enforce(body_received <=body_target); 167 amqp_dump((cast(ubyte*)frame.payload.body_fragment.bytes)[0..frame.payload.body_fragment.len]); 168 } 169 if (body_received !=body_target) 170 break; 171 break; 172 } 173 } 174 175 } 176 die_on_amqp_error(amqp_channel_close(conn, 1, ReplySuccess), "Closing channel"); 177 die_on_amqp_error(amqp_connection_close(conn, ReplySuccess), "Closing connection"); 178 die_on_error(amqp_destroy_connection(conn), "Ending connection"); 179 return 0; 180 } 181 182 string asString(amqp_bytes_t bytes) 183 { 184 return (cast(char*)bytes.bytes)[0..bytes.len].idup; 185 }