1 module rabbitmq.examples.rpc_client_sendstring;
2 import std.stdio;
3 import std.string;
4 import std.getopt;
5 import std.exception;
6 
7 import symmetry.api.rabbitmq;
8 
/// Command-line settings for the RPC client example; every field is filled in by getopt in main.
struct Options
{
	/// Host name handed to amqp_socket_open (--hostname).
	string hostname;
	/// Port handed to amqp_socket_open (--port).
	ushort port;
	/// Exchange the request is published to (--exchange).
	string exchange;
	/// Routing key used for the published request (--routing-key).
	string routingKey;
	/// Payload of the published request (--message-body).
	string messageBody;
	/// Selects an SSL/TLS socket instead of a plain TCP socket (--use-ssl); off unless requested.
	bool useSSL;
	/// CA certificate passed to amqp_ssl_socket_set_cacert when non-empty (--cacert).
	string caCert;
	/// Forwarded to amqp_ssl_socket_set_verify_peer (--verify-peer).
	bool verifyPeer;
	/// Forwarded to amqp_ssl_socket_set_verify_hostname (--verify-hostname).
	bool verifyHostname;
	/// Client key passed to amqp_ssl_socket_set_key (--key-file); requires certFile.
	string keyFile;
	/// Client certificate passed to amqp_ssl_socket_set_key (--cert-file).
	string certFile;
}
23 
/**
 * RPC client example: publishes one text message with a private reply-to
 * queue and then waits for a single reply, dumping its body.
 *
 * Steps: parse the command line, open a TCP or SSL/TLS socket, log in,
 * open channel 1, declare a server-named reply queue, publish the request,
 * consume from the reply queue until one complete delivery has been read,
 * then close the channel and the connection.
 *
 * Params:
 *   args = process arguments, parsed with std.getopt (see Options).
 *
 * Returns:
 *   -1 when help was requested, 0 when a complete reply was received,
 *   1 when waiting for the reply failed before a complete message arrived.
 *
 * Throws:
 *   Exception (via enforce) on socket setup/open failures, out-of-memory
 *   while copying the queue name, or an unexpected frame sequence.
 */
int main(string[] args)
{
	Options options;

	amqp_socket_t* socket;
	amqp_connection_state_t conn;
	amqp_bytes_t reply_to_queue;
	// Becomes true only once header and full body of one delivery were read.
	bool replyReceived = false;

	auto helpInformation = getopt(	args,
					"hostname",	&options.hostname,
					"port",		&options.port,
					"exchange",	&options.exchange,
					"routing-key",	&options.routingKey,
					"message-body",	&options.messageBody,
					"use-ssl",	&options.useSSL,
					"cacert",	&options.caCert,
					"verify-peer",	&options.verifyPeer,
					"verify-hostname", &options.verifyHostname,
					"key-file",	&options.keyFile,
					"cert-file",	&options.certFile
	);

	if (helpInformation.helpWanted)
	{
		defaultGetoptPrinter("rpc_client_sendstring", helpInformation.options);
		return -1;
	}

	conn = amqp_new_connection();
	socket = options.useSSL ? amqp_ssl_socket_new(conn) : amqp_tcp_socket_new(conn);
	enforce(socket !is null, options.useSSL ? "creating ssl/tls socket" : "creating tcp socket");

	if (options.useSSL)
	{
		amqp_ssl_socket_set_verify_peer(socket, options.verifyPeer ? 1 : 0);
		amqp_ssl_socket_set_verify_hostname(socket, options.verifyHostname ? 1 : 0);

		if (options.caCert.length > 0)
		{
			enforce(amqp_ssl_socket_set_cacert(socket, options.caCert.toStringz) == 0, "setting CA certificate");
		}

		if (options.keyFile.length > 0)
		{
			enforce(options.certFile.length > 0, "if you specify key-file you must also specify cert-file");
			enforce(amqp_ssl_socket_set_key(socket, options.certFile.toStringz, options.keyFile.toStringz) == 0, "setting client cert");
		}
	}

	enforce(amqp_socket_open(socket, options.hostname.toStringz, options.port) == 0, "opening connection");

	die_on_amqp_error(amqp_login(conn, "/".ptr, 0, 131072, 0, SaslMethod.plain, "guest".ptr, "guest".ptr), "Logging in");
	amqp_channel_open(conn, 1);
	die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

	// create private reply_to queue (empty name: the reply carries the queue name to use)
	amqp_queue_declare_ok_t* declared = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table);
	die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
	reply_to_queue = amqp_bytes_malloc_dup(declared.queue);
	enforce(reply_to_queue.bytes !is null, "out of memory whilst copying queue name");
	// Release our copy of the queue name on every exit path, including exceptions.
	scope(exit) amqp_bytes_free(reply_to_queue);

	// send the message
	{
		amqp_basic_properties_t props;
		props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_REPLY_TO_FLAG |
				AMQP_BASIC_CORRELATION_ID_FLAG;
		props.content_type = amqp_string("text/plain");
		props.delivery_mode = 2; /* persistent delivery mode */
		props.reply_to = amqp_bytes_malloc_dup(reply_to_queue);
		enforce(props.reply_to.bytes !is null, "out of memory whilst copying queue name");
		// Freed when this block ends, even if publishing throws.
		scope(exit) amqp_bytes_free(props.reply_to);
		props.correlation_id = amqp_string("1");

		// publish
		die_on_error(amqp_basic_publish(conn,
					    1,
					    amqp_string(options.exchange),
					    amqp_string(options.routingKey),
					    0,
					    0,
					    &props,
					    amqp_string(options.messageBody)),
			 "Publishing");
	}

	// wait for an answer
	{
		amqp_basic_consume(conn, 1, reply_to_queue, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
		die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");

		amqp_frame_t frame;
		for (;;)
		{
			amqp_maybe_release_buffers(conn);
			int result = amqp_simple_wait_frame(conn, &frame);
			writefln("Result: %s", result);
			if (result < 0)
				break;

			// Skip everything until a basic.deliver method frame shows up.
			writefln("Frame type: %s channel: %s", frame.frame_type, frame.channel);
			if (frame.frame_type != AMQP_FRAME_METHOD)
				continue;
			writefln("Method: %s", amqp_method_name(frame.payload.method.id).fromStringz);
			if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
				continue;

			auto d = cast(amqp_basic_deliver_t*) frame.payload.method.decoded;

			writefln("Delivery: %s exchange: %s routingKey: %s", d.delivery_tag,
				asString(d.exchange), asString(d.routing_key));

			// The content header must follow the deliver method.
			result = amqp_simple_wait_frame(conn, &frame);
			if (result < 0)
				break;
			enforce(frame.frame_type == AMQP_FRAME_HEADER, "Expected header");
			auto p = cast(amqp_basic_properties_t*) frame.payload.properties.decoded;
			if (p._flags & AMQP_BASIC_CONTENT_TYPE_FLAG)
			{
				writefln("Content-type: %s", p.content_type.asString);
			}
			writefln("----");

			const size_t body_target = cast(size_t) frame.payload.properties.body_size;
			size_t body_received = 0;

			// Body may arrive split over several frames; read until the announced size is reached.
			while (body_received < body_target)
			{
				result = amqp_simple_wait_frame(conn, &frame);
				if (result < 0)
					break;
				enforce(frame.frame_type == AMQP_FRAME_BODY, "expected body");
				body_received += frame.payload.body_fragment.len;
				enforce(body_received <= body_target, "received more body data than the header announced");
				amqp_dump((cast(ubyte*) frame.payload.body_fragment.bytes)[0 .. frame.payload.body_fragment.len]);
			}

			// A short body can only mean a frame wait failed; either way we stop
			// after the first delivery and go on to close the connection.
			replyReceived = (body_received == body_target);
			break;
		}
	}

	die_on_amqp_error(amqp_channel_close(conn, 1, ReplySuccess), "Closing channel");
	die_on_amqp_error(amqp_connection_close(conn, ReplySuccess), "Closing connection");
	die_on_error(amqp_destroy_connection(conn), "Ending connection");

	if (!replyReceived)
	{
		// Previously this path was indistinguishable from success for the caller.
		stderr.writeln("No complete reply received");
		return 1;
	}
	return 0;
}
181 
/**
 * Copies the contents of an amqp_bytes_t into a freshly allocated D string.
 *
 * Params:
 *   bytes = buffer whose `bytes` pointer and `len` describe the data to copy.
 *
 * Returns:
 *   An immutable copy of the first `bytes.len` chars starting at `bytes.bytes`.
 */
string asString(amqp_bytes_t bytes)
{
	auto start = cast(char*) bytes.bytes;
	char[] view = start[0 .. bytes.len];
	// idup so the result does not alias the library-owned buffer.
	return view.idup;
}