1 module rpc_client_sendstring; 2 import std.stdio; 3 import std.string; 4 import std.getopt; 5 import std.exception; 6 7 import kaleidic.api.rabbitmq; 8 import kaleidic.api.rabbitmq.utils; 9 10 struct Options 11 { 12 string hostname; 13 ushort port; 14 string exchange; 15 string routingKey; 16 string messageBody; 17 bool useSSL = false; 18 string caCert; 19 bool verifyPeer; 20 bool verifyHostname; 21 string keyFile; 22 string certFile; 23 } 24 25 int main(string[] args) 26 { 27 Options options; 28 29 int status; 30 amqp_socket_t *socket; 31 amqp_connection_state_t conn; 32 amqp_bytes_t reply_to_queue; 33 34 auto helpInformation = getopt( args, 35 "hostname", &options.hostname, 36 "port", &options.port, 37 "exchange", &options.exchange, 38 "routing-key", &options.routingKey, 39 "message-body", &options.messageBody, 40 "use-ssl", &options.useSSL, 41 "cacert", &options.caCert, 42 "verify-peer", &options.verifyPeer, 43 "verify-hostname", &options.verifyHostname, 44 "key-file", &options.keyFile, 45 "cert-file", &options.certFile 46 ); 47 48 if (helpInformation.helpWanted) 49 { 50 defaultGetoptPrinter("rpc_client_sendstring",helpInformation.options); 51 return -1; 52 } 53 54 conn = amqp_new_connection(); 55 socket = options.useSSL ? amqp_ssl_socket_new(conn): amqp_tcp_socket_new(conn); 56 enforce(socket !is null, options.useSSL? "creating ssl/tls socket" : "creating tcp socket"); 57 58 if(options.useSSL) 59 { 60 amqp_ssl_socket_set_verify_peer(socket, options.verifyPeer ? 1:0); 61 amqp_ssl_socket_set_verify_hostname(socket, options.verifyHostname ? 1: 0); 62 63 if(options.caCert.length > 0) 64 { 65 enforce(amqp_ssl_socket_set_cacert(socket, options.caCert.toStringz) == 0, "setting CA certificate"); 66 } 67 68 if (options.keyFile.length > 0) 69 { 70 enforce(options.certFile.length > 0, "if you specify key-file you must also specify cert-file"); 71 enforce(amqp_ssl_socket_set_key(socket, options.certFile.toStringz, options.keyFile.toStringz) == 0, "setting client cert"); 72 } 73 } 74 75 enforce(amqp_socket_open(socket, options.hostname.toStringz, options.port) == 0, "opening connection"); 76 77 die_on_amqp_error(amqp_login(conn, "/".ptr, 0, 131072, 0, SaslMethod.plain, "guest".ptr, "guest".ptr), "Logging in"); 78 amqp_channel_open(conn, 1); 79 die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); 80 81 // create private reply_to queue 82 { 83 amqp_queue_declare_ok_t* r = amqp_queue_declare(conn,1,amqp_empty_bytes, 0,0,0,1,amqp_empty_table); 84 die_on_amqp_error(amqp_get_rpc_reply(conn),"Declaring queue"); 85 reply_to_queue = amqp_bytes_malloc_dup(r.queue); 86 enforce(reply_to_queue.bytes !is null, "out of memory whilst copying queue name"); 87 } 88 89 // send the message 90 { 91 amqp_basic_properties_t props; 92 props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_REPLY_TO_FLAG | 93 AMQP_BASIC_CORRELATION_ID_FLAG; 94 props.content_type = amqp_string("text/plain"); 95 props.delivery_mode = 2; /* persistent delivery mode */ 96 props.reply_to = amqp_bytes_malloc_dup(reply_to_queue); 97 enforce(props.reply_to.bytes !is null, "out of memory whilst copying queue name"); 98 props.correlation_id = amqp_string("1"); 99 100 // publish 101 die_on_error(amqp_basic_publish(conn, 102 1, 103 amqp_string(options.exchange), 104 amqp_string(options.routingKey), 105 0, 106 0, 107 &props, 108 amqp_string(options.messageBody)), 109 "Publishing"); 110 amqp_bytes_free(props.reply_to); 111 } 112 113 // wait for an answer 114 { 115 amqp_basic_consume(conn,1,reply_to_queue,amqp_empty_bytes,0,1,0,amqp_empty_table); 116 die_on_amqp_error(amqp_get_rpc_reply(conn),"Consuming"); 117 amqp_bytes_free(reply_to_queue); 118 { 119 amqp_frame_t frame; 120 int result; 121 122 amqp_basic_deliver_t* d; 123 amqp_basic_properties_t* p; 124 size_t body_target; 125 size_t body_received; 126 for (;;) 127 { 128 amqp_maybe_release_buffers(conn); 129 result = amqp_simple_wait_frame(conn,&frame); 130 writefln("Result: %s",result); 131 if(result < 0) 132 break; 133 134 writefln("Frame type: %s channel: %s", frame.frame_type,frame.channel); 135 if (frame.frame_type != AMQP_FRAME_METHOD) 136 continue; 137 writefln("Method: %s",amqp_method_name(frame.payload.method.id).fromStringz); 138 if (frame.payload.method.id !=AMQP_BASIC_DELIVER_METHOD) 139 continue; 140 141 d= cast(amqp_basic_deliver_t*) frame.payload.method.decoded; 142 143 writefln("Delivery: %s exchange: %s routingKey: %s", d.delivery_tag, 144 asString(d.exchange), asString(d.routing_key)); 145 146 result = amqp_simple_wait_frame(conn,&frame); 147 if (result < 0) 148 break; 149 enforce(frame.frame_type == AMQP_FRAME_HEADER, "Expected header"); 150 p = cast(amqp_basic_properties_t*) frame.payload.properties.decoded; 151 if (p._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) 152 { 153 writefln("Content-type: %s", p.content_type.asString); 154 } 155 writefln("----"); 156 157 body_target = cast(size_t) frame.payload.properties.body_size; 158 body_received = 0; 159 160 while(body_received < body_target) 161 { 162 result = amqp_simple_wait_frame(conn,&frame); 163 if (result <0) 164 break; 165 enforce(frame.frame_type == AMQP_FRAME_BODY, "expected body"); 166 body_received += frame.payload.body_fragment.len; 167 enforce(body_received <=body_target); 168 amqp_dump((cast(ubyte*)frame.payload.body_fragment.bytes)[0..frame.payload.body_fragment.len]); 169 } 170 if (body_received !=body_target) 171 break; 172 break; 173 } 174 } 175 176 } 177 die_on_amqp_error(amqp_channel_close(conn, 1, ReplySuccess), "Closing channel"); 178 die_on_amqp_error(amqp_connection_close(conn, ReplySuccess), "Closing connection"); 179 die_on_error(amqp_destroy_connection(conn), "Ending connection"); 180 return 0; 181 } 182 183 string asString(amqp_bytes_t bytes) 184 { 185 return (cast(char*)bytes.bytes)[0..bytes.len].idup; 186 }