1 module rpc_client_sendstring;
2 import std.stdio;
3 import std.string;
4 import std.getopt;
5 import std.exception;
6 
7 import kaleidic.api.rabbitmq;
8 import kaleidic.api.rabbitmq.utils;
9 
10 struct Options
11 {
12 	string hostname;
13 	ushort port;
14 	string exchange;
15 	string routingKey;
16 	string messageBody;
17 	bool useSSL = false;
18 	string caCert;
19 	bool verifyPeer;
20 	bool verifyHostname;
21 	string keyFile;
22 	string certFile;
23 }
24 
25 int main(string[] args)
26 {
27 	Options options;
28 
29 	int status;
30 	amqp_socket_t *socket;
31 	amqp_connection_state_t conn;
32 	amqp_bytes_t reply_to_queue;
33 
34 	auto helpInformation = getopt(	args,
35 					"hostname",	&options.hostname,
36 					"port",		&options.port,
37 					"exchange",	&options.exchange,
38 					"routing-key",	&options.routingKey,
39 					"message-body",	&options.messageBody,
40 					"use-ssl",	&options.useSSL,
41 					"cacert",	&options.caCert,
42 					"verify-peer",	&options.verifyPeer,
43 					"verify-hostname", &options.verifyHostname,
44 					"key-file",	&options.keyFile,
45 					"cert-file",	&options.certFile
46 	);
47 
48 	if (helpInformation.helpWanted)
49 	{
50 		defaultGetoptPrinter("rpc_client_sendstring",helpInformation.options);
51 		return -1;
52 	}
53 
54 	conn = amqp_new_connection();
55 	socket = options.useSSL ? amqp_ssl_socket_new(conn): amqp_tcp_socket_new(conn);
56 	enforce(socket !is null, options.useSSL? "creating ssl/tls socket" : "creating tcp socket");
57 
58 	if(options.useSSL)
59 	{
60 		amqp_ssl_socket_set_verify_peer(socket, options.verifyPeer ? 1:0);
61 		amqp_ssl_socket_set_verify_hostname(socket, options.verifyHostname ? 1: 0);
62 
63 		if(options.caCert.length > 0)
64 		{
65 			enforce(amqp_ssl_socket_set_cacert(socket, options.caCert.toStringz) == 0, "setting CA certificate");
66 		}
67 
68 		if (options.keyFile.length > 0)
69 		{
70 			enforce(options.certFile.length > 0, "if you specify key-file you must also specify cert-file");
71 			enforce(amqp_ssl_socket_set_key(socket, options.certFile.toStringz, options.keyFile.toStringz) == 0, "setting client cert");
72 		}
73 	}
74 
75 	enforce(amqp_socket_open(socket, options.hostname.toStringz, options.port) == 0, "opening connection");
76 
77 	die_on_amqp_error(amqp_login(conn, "/".ptr, 0, 131072, 0, SaslMethod.plain, "guest".ptr, "guest".ptr), "Logging in");
78 	amqp_channel_open(conn, 1);
79 	die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
80 	
81 	// create private reply_to queue
82 	{
83 		amqp_queue_declare_ok_t* r = amqp_queue_declare(conn,1,amqp_empty_bytes, 0,0,0,1,amqp_empty_table);
84 		die_on_amqp_error(amqp_get_rpc_reply(conn),"Declaring queue");
85 		reply_to_queue = amqp_bytes_malloc_dup(r.queue);
86 		enforce(reply_to_queue.bytes !is null, "out of memory whilst copying queue name");
87 	}
88 
89 	// send the message
90 	{
91 		amqp_basic_properties_t props;
92 		props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_REPLY_TO_FLAG |
93 				AMQP_BASIC_CORRELATION_ID_FLAG;
94 		props.content_type = amqp_string("text/plain");
95 		props.delivery_mode = 2; /* persistent delivery mode */
96 		props.reply_to = amqp_bytes_malloc_dup(reply_to_queue);
97 		enforce(props.reply_to.bytes !is null, "out of memory whilst copying queue name");
98 		props.correlation_id = amqp_string("1");
99 
100 		// publish
101 		die_on_error(amqp_basic_publish(conn,
102 					    1,
103 					    amqp_string(options.exchange),
104 					    amqp_string(options.routingKey),
105 					    0,
106 					    0,
107 					    &props,
108 					    amqp_string(options.messageBody)),
109 			 "Publishing");
110 		amqp_bytes_free(props.reply_to);
111 	}
112 	
113 	// wait for an answer
114 	{
115 		amqp_basic_consume(conn,1,reply_to_queue,amqp_empty_bytes,0,1,0,amqp_empty_table);
116 		die_on_amqp_error(amqp_get_rpc_reply(conn),"Consuming");
117 		amqp_bytes_free(reply_to_queue);
118 		{
119 			amqp_frame_t frame;
120 			int result;
121 
122 			amqp_basic_deliver_t* d;
123 			amqp_basic_properties_t* p;
124 			size_t body_target;
125 			size_t body_received;
126 			for (;;)
127 			{
128 				amqp_maybe_release_buffers(conn);
129 				result = amqp_simple_wait_frame(conn,&frame);
130 				writefln("Result: %s",result);
131 				if(result < 0)
132 					break;
133 
134 				writefln("Frame type: %s channel: %s", frame.frame_type,frame.channel);
135 				if (frame.frame_type != AMQP_FRAME_METHOD)
136 					continue;
137 				writefln("Method: %s",amqp_method_name(frame.payload.method.id).fromStringz);
138 				if (frame.payload.method.id !=AMQP_BASIC_DELIVER_METHOD)
139 					continue;
140 
141 				d= cast(amqp_basic_deliver_t*) frame.payload.method.decoded;
142 
143 				writefln("Delivery: %s exchange: %s routingKey: %s", d.delivery_tag,
144 					asString(d.exchange), asString(d.routing_key));
145 
146 				result = amqp_simple_wait_frame(conn,&frame);
147 				if (result < 0)
148 					break;
149 				enforce(frame.frame_type == AMQP_FRAME_HEADER, "Expected header");
150 				p = cast(amqp_basic_properties_t*) frame.payload.properties.decoded;
151 				if (p._flags & AMQP_BASIC_CONTENT_TYPE_FLAG)
152 				{
153 					writefln("Content-type: %s", p.content_type.asString);
154 				}
155 				writefln("----");
156 
157 				body_target = cast(size_t) frame.payload.properties.body_size;
158 				body_received = 0;
159 
160 				while(body_received < body_target)
161 				{
162 					result = amqp_simple_wait_frame(conn,&frame);
163 					if (result <0)
164 						break;
165 					enforce(frame.frame_type == AMQP_FRAME_BODY, "expected body");
166 					body_received += frame.payload.body_fragment.len;
167 					enforce(body_received <=body_target);
168 					amqp_dump((cast(ubyte*)frame.payload.body_fragment.bytes)[0..frame.payload.body_fragment.len]);
169 				}
170 				if (body_received !=body_target)
171 					break;
172 				break;
173 			}
174 		}
175 
176 	}
177 	die_on_amqp_error(amqp_channel_close(conn, 1, ReplySuccess), "Closing channel");
178 	die_on_amqp_error(amqp_connection_close(conn, ReplySuccess), "Closing connection");
179 	die_on_error(amqp_destroy_connection(conn), "Ending connection");
180 	return 0;
181 }
182 
183 string asString(amqp_bytes_t bytes)
184 {
185 	return (cast(char*)bytes.bytes)[0..bytes.len].idup;
186 }