1 module symmetry.api.rabbitmq.wrap;
2 import std.exception;
3 
4 import symmetry.api.rabbitmq.bindings;
5 import symmetry.api.rabbitmq.enums;
6 import symmetry.api.rabbitmq.utils;
7 import symmetry.api.rabbitmq.platform_utils;
8 
/**
 * Self-importing template.
 *
 * `from!"some.module".Symbol` resolves `Symbol` inside `some.module` by
 * mixing in a renamed import, so a declaration (for example a default
 * argument in a function signature) can name a type without a
 * module-level import.
 */
private template from(string moduleName)
{
	// The renamed import carries the template's own name, which makes the
	// template eponymous: instantiating it yields the imported module.
	private enum importDeclaration = "import from = " ~ moduleName ~ ";";
	mixin(importDeclaration);
}
13 
/**
 * Converts a D string into an `amqp_bytes_t` for the C API.
 *
 * The string is passed through `toStringz` to obtain a NUL-terminated
 * pointer, which is then handed to `amqp_cstring_bytes`.
 *
 * NOTE(review): `amqp_cstring_bytes` presumably measures the text as a C
 * string, so an embedded NUL in `s` would truncate it - confirm against the
 * binding if such input is possible.
 *
 * Params:
 *   s = text to convert
 * Returns: the value produced by `amqp_cstring_bytes` for the terminated text
 */
amqp_bytes_t amqp_string(string s)
{
	// Fixed: the module path was written as `std..string`, which is not valid D.
	import std.string : toStringz;
	return s.toStringz.amqp_cstring_bytes;
}
19 
/**
 * Wraps a D byte slice in an `amqp_bytes_t` without copying.
 *
 * The result refers to the same memory as `buf`.
 *
 * Params:
 *   buf = bytes to expose to the C API
 * Returns: an `amqp_bytes_t` whose `bytes`/`len` mirror `buf.ptr`/`buf.length`
 */
amqp_bytes_t asRabbit(ubyte[] buf)
{
	amqp_bytes_t wrapped;
	wrapped.bytes = cast(void*) buf.ptr;
	wrapped.len = buf.length;
	return wrapped;
}
27 
/**
 * Copies the contents of an `amqp_bytes_t` into a D string.
 *
 * The buffer is length-delimited: exactly `buf.len` bytes are copied and no
 * NUL terminator is looked for.
 *
 * A zero-length buffer yields an empty string even when `buf.bytes` is null,
 * because `{len: 0, bytes: null}` is the shape of the empty value this module
 * passes around as `amqp_empty_bytes`.
 *
 * Params:
 *   buf = buffer to copy
 * Returns: an immutable copy of the buffer's bytes
 * Throws: `Exception` (via `enforce`) when `buf.len > 0` but `buf.bytes` is null
 */
private string asString(amqp_bytes_t buf)
{
	import std.exception : enforce;

	// Empty is a legitimate value, not an error.
	if (buf.len == 0)
		return "";

	enforce(buf.bytes !is null);
	auto chars = cast(const(char)*) buf.bytes;
	return chars[0 .. buf.len].idup;
}
36 
37 
/// User name and password pair used when logging in to the broker.
struct RabbitCredentials
{
	/// Login name.
	string user;

	/// Login password, held in plain text.
	string pass;
}
43 
/**
 * Settings consumed by `RabbitClient` when connecting and publishing.
 *
 * Field order is part of the interface (positional struct construction),
 * so it must not be rearranged.
 */
struct ConnectionParams
{
	/// Broker host name handed to the socket open call.
	string hostname;

	/// Broker port.
	ushort port = 5672;

	/// User/password used for login.
	RabbitCredentials credentials;

	/// Exchange that messages are published to.
	string exchange;

	/// Channel number opened after login.
	ushort channel = 1;

	/// NOTE(review): not referenced by the code visible in this module.
	int connectionAttempts = 10;

	/// NOTE(review): not referenced by the code visible in this module; unit unknown.
	int socketTimeout = 1;

	/// Open an SSL socket instead of a plain TCP one.
	bool useSSL = false;

	/// CA certificate passed to the SSL socket when non-empty.
	string caCert;

	/// Forwarded to the SSL socket's verify-peer setting.
	bool verifyPeer = false;

	/// Forwarded to the SSL socket's verify-hostname setting.
	bool verifyHostname = false;

	/// Client key file; when set, `certFile` must be set as well.
	string keyFile;

	/// Client certificate file.
	string certFile;

	/// NOTE(review): not referenced by the code visible in this module.
	string rpcQueue;
}
61 
62 
63 
/**
 * Builds an `amqp_basic_properties_t` for an outgoing message.
 *
 * All fields not listed below keep their default-initialised values.
 *
 * Params:
 *   flags         = stored in `_flags`
 *   contentType   = stored in `content_type`
 *   deliveryMode  = stored in `delivery_mode`
 *   replyTo       = stored in `reply_to`
 *   correlationID = stored in `correlation_id`
 * Returns: the populated properties structure
 */
amqp_basic_properties_t basicProperties(RabbitBasicFlag flags, string contentType, RabbitDeliveryMode deliveryMode, string replyTo, string correlationID)
{
	import std.experimental.logger : tracef;

	tracef("basic properties: %s",flags);

	amqp_basic_properties_t result;
	result._flags = flags;
	result.content_type = amqp_string(contentType);
	result.delivery_mode = deliveryMode;
	result.reply_to = amqp_string(replyTo);
	result.correlation_id = amqp_string(correlationID);
	return result;
}
77 
/**
 * Owns a raw `amqp_connection_state_t` together with the channel opened on it.
 *
 * The raw handle is exposed through `alias this`, so an instance can be
 * passed wherever the C API expects a connection state.
 *
 * NOTE(review): the destructor only tears the connection down when a channel
 * was opened. A connection on which `openChannel` never succeeded is not
 * destroyed here. Copies of this struct are not prevented either, so
 * destroying unconditionally would be unsafe for by-value copies that still
 * hold the handle.
 */
struct AmqpConnectionState
{
	/// Raw connection handle from the C library.
	amqp_connection_state_t conn;
	alias conn this;

	/// Channel number most recently passed to `openChannel`.
	ushort channel;

	/// True once `openChannel` has completed without error.
	bool isChannelOpen = false;

	/// Wraps an existing raw connection handle.
	this(amqp_connection_state_t conn)
	{
		this.conn = conn;
	}

	/**
	 * Opens `channel` on the connection and records it.
	 *
	 * The RPC reply is passed to `die_on_amqp_error`; `isChannelOpen` is
	 * only set after that call returns.
	 */
	void openChannel(ushort channel)
	{
		this.channel = channel;
		conn.amqp_channel_open(channel);
		die_on_amqp_error(amqp_get_rpc_reply(conn),"opening channel");
		this.isChannelOpen = true;
	}

	/// Closes the opened channel and the connection, then destroys the handle.
	~this()
	{
		if (conn is null || !isChannelOpen)
			return;

		// Fixed: close the channel that was actually opened rather than a
		// hard-coded channel 1.
		amqp_channel_close(conn, channel, ReplySuccess);
		amqp_connection_close(conn, ReplySuccess);
		amqp_destroy_connection(conn);
		conn = null;
	}
}
113 
/**
 * Reference-counted wrapper around an `AmqpConnectionState` offering the
 * socket, login, queue, publish and frame-reading operations used by
 * `RabbitClient`.
 *
 * `alias conn this` forwards to the wrapped state, which in turn forwards to
 * the raw connection handle.
 */
final class AmqpConnection
{
	import std.typecons: RefCounted;
	RefCounted!AmqpConnectionState conn;
	alias conn this;

	/// Takes ownership of `conn`.
	this(AmqpConnectionState conn)
	{
		import std.algorithm.mutation : move;
		// Move rather than copy: the by-value parameter is destroyed when
		// this constructor returns, and it must not still hold the handle.
		this.conn = move(conn);
	}

	/// Channel number recorded by the last `openChannel` call.
	ushort channel()
	{
		return conn.channel;
	}

	/// Opens `channel` on the underlying connection.
	void openChannel(ushort channel)
	{
		conn.openChannel(channel);
	}

	/**
	 * Creates a TCP or SSL socket for this connection and opens it.
	 *
	 * SSL is only available when built with `version(OpenSSL)`.
	 *
	 * Params:
	 *   hostname       = host to connect to
	 *   port           = port to connect to
	 *   useSSL         = create an SSL socket instead of a TCP socket
	 *   verifyPeer     = forwarded to `amqp_ssl_socket_set_verify_peer`
	 *   verifyHostname = forwarded to `amqp_ssl_socket_set_verify_hostname`
	 *   caCert         = CA certificate, applied when non-empty
	 *   certFile       = client certificate, required when `keyFile` is set
	 *   keyFile        = client key, applied when non-empty
	 * Returns: the opened socket
	 * Throws: `Exception` (via `enforce`) when any step fails
	 */
	amqp_socket_t* openSocket(string hostname, ushort port, bool useSSL, bool verifyPeer, bool verifyHostname, string caCert, string certFile, string keyFile)
	{
		import std.experimental.logger : tracef;
		import std.string: toStringz;
		import std.exception : enforce;
		import std.format: format;
		tracef("opening new socket: hostname(%s), useSSL (%s), verifyPeer(%s), verifyHostName(%s), caCert(%s), certFile(%s), keyFile(%s)",
				hostname, useSSL,verifyPeer,verifyHostname,caCert,certFile, keyFile);
		enforce(conn !is null, "connection is null");
		version(OpenSSL)
			amqp_socket_t* socket = useSSL ? amqp_ssl_socket_new(conn) : amqp_tcp_socket_new(conn);
		else
		{
			enforce(!useSSL, "must build rabbitmq-d with OpenSSL version enabled");
			amqp_socket_t* socket = amqp_tcp_socket_new(conn);
		}
		enforce(socket !is null, "creating rabbit socket");
		version(OpenSSL)
		{
			if (useSSL)
			{
				amqp_ssl_socket_set_verify_peer(socket, verifyPeer ? 1:0);
				amqp_ssl_socket_set_verify_hostname(socket, verifyHostname ? 1: 0);
				enforce((caCert.length == 0) || (amqp_ssl_socket_set_cacert(socket, caCert.toStringz) == 0), "setting CA certificate");
				enforce((keyFile.length ==0 || certFile.length > 0), "if you specify key-file you must also specify cert-file");
				enforce((keyFile.length ==0) ||
					(amqp_ssl_socket_set_key(socket, certFile.toStringz, keyFile.toStringz) == 0),
					"setting client cert");
			}
		}
		enforce(amqp_socket_open(socket,hostname.toStringz, port)==0,
					format!"opening rabbit connection: useSSL %s, verifyPeer %s, verifyHostName %s, caCert %s, keyFile %s"
					(useSSL,verifyPeer,verifyHostname,caCert,keyFile)
		);
		return socket;
	}

	/**
	 * Logs in to the broker.
	 *
	 * The password is masked in the log line. The reply from `amqp_login`
	 * is passed to `die_on_amqp_error`.
	 */
	void login(string vhost, int channelMax, int frameMax, int heartBeat, SaslMethod saslMethod, string user, string pass)
	{
		import std.experimental.logger:infof;
		import std.range:repeat;
		import std.exception : enforce;
		import std.string: toStringz;
		infof("logging in as user %s, pass %s",user,'*'.repeat(pass.length));
		enforce(conn !is null, "connection is null");
		const(char)* pVhost = vhost.toStringz;
		const(char)* pUser = user.toStringz;
		const(char)* pPass = pass.toStringz;
		die_on_amqp_error(amqp_login(conn,pVhost,channelMax,frameMax,heartBeat,saslMethod,pUser,pPass),"Logging in");
	}

	/**
	 * Starts consuming from `queueName` on the current channel.
	 *
	 * Throws: `Exception` (via `enforce`) when no reply or an empty consumer
	 *         tag comes back
	 */
	void consumeQueue(string queueName, bool noLocal, bool noAck, bool exclusive)
	{
		import std.experimental.logger;
		import std.exception : enforce;
		tracef("consuming queue: %s", queueName);
		enforce(conn !is null, "connection is null");
		auto consumerTag = amqp_empty_bytes;
		auto amqpArguments = amqp_empty_table;

		auto tag = conn.amqp_basic_consume(channel,amqp_string(queueName),consumerTag,noLocal?1:0,noAck?1:0,exclusive?1:0,amqpArguments);
		// Fixed: the two conditions used to share one enforce whose message
		// dereferenced `tag`, so a null reply crashed while building the
		// error text instead of raising it.
		enforce(tag !is null, "basic consume of response to rpc message - null reply");
		enforce(tag.consumer_tag.len > 0, "basic consume of response to rpc message - empty consumer tag");
		die_on_amqp_error(amqp_get_rpc_reply(conn),"consuming");
		tracef("consumer tag: %s", tag.consumer_tag.asString);
	}

	/**
	 * Sends a queue.declare request on the current channel.
	 *
	 * An empty `queue` name is sent as `amqp_empty_bytes`.
	 *
	 * Returns: the decoded reply pointer from `amqp_simple_rpc_decoded`
	 *          (callers in this file check it for null)
	 */
	amqp_queue_declare_ok_t*
	declareQueue(string queue, bool isPassive, bool isDurable, bool isExclusive, bool autoDelete, bool noWait, amqp_table_t arguments)
	{
		import std.exception : enforce;
		enforce(conn !is null, "connection is null");
		amqp_queue_declare_t req;
		req.ticket = 0;
		req.queue = (queue.length ==0 ) ? amqp_empty_bytes : amqp_string(queue);
		req.passive = isPassive ? 1:0;
		req.durable = isDurable ? 1:0;
		req.exclusive = isExclusive ? 1:0;
		req.auto_delete = autoDelete ? 1:0;
		req.nowait = noWait ? 1:0;
		req.arguments = arguments;
		return cast(amqp_queue_declare_ok_t*) conn.amqp_simple_rpc_decoded(channel,AMQP_QUEUE_DECLARE_METHOD,AMQP_QUEUE_DECLARE_OK_METHOD,&req);
	}

	/**
	 * Declares an exclusive, non-durable queue with an empty name and
	 * returns the name found in the reply.
	 *
	 * Throws: `Exception` (via `enforce`) when the reply or its queue name
	 *         is missing
	 */
	string declareReplyToQueue()
	{
		import std.exception : enforce;
		enforce(conn !is null, "connection is null");
		enum isPassive = false;
		enum isDurable = false;
		enum isExclusive = true;
		enum autoDelete = false;
		enum noWait = false;

		auto r  = declareQueue(null,isPassive,isDurable,isExclusive,autoDelete,noWait,amqp_empty_table);
		enforce(r !is null, "declaring queue");
		die_on_amqp_error(amqp_get_rpc_reply(conn),"declaring queue");
		enforce(r.queue.bytes !is null, "reading queue name");
		// Fixed: the name is length-delimited, not NUL-terminated. Reading
		// it with fromStringz could run past `r.queue.len`.
		return r.queue.asString;
	}

	/**
	 * Publishes `messageBody` to `exchange` with `routingKey` on the current
	 * channel (mandatory and immediate are both off).
	 *
	 * An empty `exchange` is sent as `amqp_empty_bytes`.
	 */
	void basicPublish(string exchange, string routingKey, ref amqp_basic_properties_t props, string messageBody)
	{
		import std.exception : enforce;
		import std.experimental.logger: tracef;
		import std.string: toStringz;
		tracef("basicPublish - channel: %s,  exchange: %s, routingKey: %s, messageBody: %s",channel,exchange,routingKey,messageBody);
		enforce(conn !is null, "connection is null");
		enum mandatory = false;
		enum immediate = false;
		amqp_bytes_t message;
		message.bytes = cast(void*)messageBody.toStringz;
		// NOTE(review): the +1 puts the trailing NUL on the wire as part of
		// the body. Left unchanged because the consuming side may depend on
		// it - confirm whether this is intended.
		message.len = messageBody.length+1;
		auto pExchange = (exchange.length ==0) ? amqp_empty_bytes : amqp_string(exchange);
		die_on_error(amqp_basic_publish(conn,channel,pExchange,amqp_string(routingKey),mandatory?1:0,immediate?1:0,&props,message), "publishing message");
		die_on_amqp_error(amqp_get_rpc_reply(conn),"publishing message");
	}

	/// Allocates a new raw connection and wraps it.
	static AmqpConnection newConnection()
	{
		import std.exception : enforce;
		auto c = amqp_new_connection();
		enforce(c!is null, "opening connection");
		return new AmqpConnection(AmqpConnectionState(c));
	}

	/// Forwards to `amqp_maybe_release_buffers` for this connection.
	void maybeReleaseBuffers()
	{
		import std.exception : enforce;
		enforce(conn !is null, "connection is null");
		amqp_maybe_release_buffers(conn);
	}


	/**
	 * Waits for one frame.
	 *
	 * Params:
	 *   duration = how long to wait; `Duration.init` selects the call
	 *              without a timeout (`amqp_simple_wait_frame`)
	 * Returns: the library's status code together with the frame
	 */
	FrameResult simpleWaitFrame(from!"core.time".Duration duration = from!"core.time".Duration.init)
	{
		import std.exception : enforce;
		import core.time: Duration;
		import std.conv:to;
		import std.experimental.logger:trace,tracef;
		FrameResult ret;
		timeval timeout;

		if(duration != Duration.init)
		{
			auto splitDuration = duration.split!("seconds","usecs");
			timeout.tv_sec = splitDuration.seconds.to!int;
			timeout.tv_usec = splitDuration.usecs.to!int;
		}

		version(TraceFrame) tracef("waiting for frame");
		enforce(conn !is null, "connection is null");
		ret.result = (duration == Duration.init) ? 	amqp_simple_wait_frame(conn,&ret.frame) :
								amqp_simple_wait_frame_noblock(conn,&ret.frame,&timeout);
		version(TraceFrame) tracef("received frame - result: %s",ret.result);
		version(TraceFrame) tracef("Frame type: %s channel: %s", (ret.result < 0) ? '-' : ret.frame.frame_type,(ret.result<0)? 0: ret.frame.channel);
		return ret;
	}

	/**
	 * Polls for a frame in 100 ms slices until one arrives, an error other
	 * than a timeout occurs, or `timeout` has elapsed.
	 *
	 * Params:
	 *   timeout = overall limit; `Duration.init` means keep polling
	 * Returns: the last `FrameResult` obtained
	 */
	FrameResult getFrame(from!"core.time".Duration timeout = from!"core.time".Duration.init)
	{
		version(HaveVibe) import vibe.core.core:yield;
		import std.experimental.logger:trace,tracef;
		import core.time:dur, Duration, MonoTime;
		import std.exception : enforce;
		enforce(conn !is null, "connection is null");
		FrameResult frame;
		enum pollInterval = dur!"msecs"(100);
		// Fixed: elapsed time is measured on the monotonic clock, so a
		// wall-clock adjustment cannot shorten or lengthen the timeout.
		immutable startTime = MonoTime.currTime;
		Duration elapsedTime;
		do
		{
			version(HaveVibe) yield();
			frame = simpleWaitFrame(pollInterval);
			elapsedTime = MonoTime.currTime - startTime;
		} while(frame.result == AMQP_STATUS_TIMEOUT && ((timeout == Duration.init)||(elapsedTime<timeout)));
		if (timeout != Duration.init && elapsedTime >= timeout)
			return frame;
		if (frame.result >= 0)
		{
			if (frame.frame.isBasicDeliverMethod)
			{
				amqp_basic_deliver_t* d= basicDeliver(frame.frame);
				tracef("Delivery: %s exchange: %s routingKey: %s", d.delivery_tag, d.getExchange, d.routingKey);
			}
			else if (frame.frame.isFrameHeader)
			{
				// Fixed: decoded properties belong to header frames. The
				// previous code read `payload.properties` on the deliver
				// method frame, where that union member is not populated.
				auto p = basicProperties(frame.frame);
				if(p.hasBasicContent)
					tracef("Content-type: %s",  p.contentType);
			}
		}
		return frame;
	}

	/**
	 * Reads body frames until `bodyTarget` bytes have been collected.
	 *
	 * Params:
	 *   bodyTarget = total number of body bytes expected
	 *   timeout    = passed to `getFrame` for every frame
	 * Returns: the concatenated body fragments
	 * Throws: `Exception` (via `enforce`) when a non-body frame arrives, or
	 *         when the collected length does not equal `bodyTarget`
	 */
	string getBody(size_t bodyTarget,from!"core.time".Duration timeout = from!"core.time".Duration.init)
	{
		import std.experimental.logger:trace,tracef;
		import std.array:Appender;
		import std.exception : enforce;
		import std.format: format;
		Appender!string ret;

		enforce(conn !is null, "connection is null");
		size_t bodyReceived = 0;
		FrameResult frame;
		do
		{
			frame = getFrame(timeout);
			if (frame.result <0)
			{
				tracef("result code %s when receiving frame body", frame.result);
			}
			else
			{
				enforce(frame.frame.isBodyFrame, "expected body frame");
				bodyReceived += frame.frame.bodyFragmentLength;
				ret.put(frame.frame.bodyFragment);
				enforce(bodyReceived <=bodyTarget);
			}
		}
		while(frame.result >=0 && (bodyReceived < bodyTarget));
		enforce(bodyReceived == bodyTarget, format!"bodyReceived (%s) != bodyTarget(%s)"(bodyReceived,bodyTarget));
		return ret.data;
	}
}
357 
/// Outcome of waiting for a frame: the library status code plus the frame.
struct FrameResult
{
	/// Status returned by the wait call; code in this module treats negative values as failure.
	int result;

	/// Frame filled in by the wait call.
	amqp_frame_t frame;
}
363 
/// Typed wrapper around the correlation-id string attached to a message.
struct CorrelationID
{
	/// The identifier text; empty means "no particular id".
	string value;
}
368 
/**
 * Request/reply client: connects using `ConnectionParams`, declares a
 * reply-to queue, publishes messages tagged with a random correlation id
 * and optionally waits for the matching reply.
 */
struct RabbitClient
{
	/// Active connection; null until `connect` succeeds.
	AmqpConnection conn;

	/// Socket returned when the connection was opened; null when not connected.
	amqp_socket_t* socket;

	/// Settings supplied at construction.
	ConnectionParams params;

	/// Name of the queue replies are consumed from, set by `connect`.
	string replyToQueue;

	/// Stores `params`; no I/O happens until `connect`.
	this(ConnectionParams params)
	{
		this.params = params;
	}

	/// Creates a fresh connection and opens its socket from `params`.
	private void openSocket()
	{
		import std.exception : enforce;
		import std.experimental.logger: tracef;
		// Fixed: the whole `params` struct used to be logged here, which
		// wrote the plain-text password to the trace log.
		tracef("opening new socket: hostname(%s), port(%s), useSSL(%s)", params.hostname, params.port, params.useSSL);
		enforce(socket is null, "trying to openSocket when it is already open");
		conn = AmqpConnection.newConnection();
		socket = conn.openSocket(params.hostname,params.port,params.useSSL,params.verifyPeer,params.verifyHostname,params.caCert,params.certFile,params.keyFile);
	}

	/// Logs in on vhost "/" with SASL plain and a 131072 frame-max argument.
	private void login(string user, string pass)
	{
		conn.login("/",0,131_072,0,SaslMethod.plain,user,pass);
	}

	/// Opens `channel` on the connection.
	private void openChannel(ushort channel)
	{
		conn.openChannel(channel);
	}

	/// Declares the reply-to queue and starts consuming from it.
	private void createReplyToQueue()
	{
		import std.experimental.logger;
		enum noLocal = true;	// do not send messages to connection that published them
		enum noAck = true;	// do not expect acknowledgements to messages
		enum exclusive = true;	// only this consumer can access the queue

		tracef("creating replyToQueue");
		tracef("first declaring replyToQueue");
		replyToQueue = conn.declareReplyToQueue();
		tracef("declare succeeded - reply to queue:%s",replyToQueue);
		tracef("consuming queue");
		conn.consumeQueue(replyToQueue,noLocal,noAck,exclusive);
	}

	/**
	 * Publishes `messageBody` to `params.exchange` with a new random
	 * correlation id and `replyToQueue` as the reply address.
	 *
	 * Returns: the correlation id attached to the message
	 */
	CorrelationID sendMessage(string messageBody, string routingKey, string contentType = "text/plain")
	{
		import std.experimental.logger;
		import std.uuid: randomUUID;

		auto correlationID = randomUUID().toString();
		tracef("sending message: correlationID=%s,replyTo=%s,routingKey=%s",correlationID,replyToQueue,routingKey);

		enum deliveryMode = RabbitDeliveryMode.nonPersistent;
		enum property = RabbitBasicFlag.replyTo | RabbitBasicFlag.correlationID;
		static assert( property == 0x600, "properties do not line up with discovered property using wireshark");
		// NB I thought we needed to set basicContent and basicDeliveryMode too but apparently not

		auto props = basicProperties( property, contentType, deliveryMode, replyToQueue, correlationID);
		conn.basicPublish(params.exchange,routingKey,props, messageBody);
		tracef("sent message: %s",correlationID);
		return CorrelationID(correlationID);
	}

	/**
	 * Waits for a delivery whose header carries `correlationID` and returns
	 * its body.
	 *
	 * Params:
	 *   correlationID = id to match; an empty value matches the first delivery
	 *   timeout       = per-frame limit passed to the frame reads, including
	 *                   the body frames; `Duration.init` means no limit
	 * Returns: the message body, or "" when a frame read fails or the
	 *          announced body length is zero
	 */
	string getResponseBlocking(CorrelationID correlationID = CorrelationID.init,
							from!"core.time".Duration timeout = from!"core.time".Duration.init)
	{
		version(HaveVibe) import vibe.core.core:yield;
		import std.exception : enforce;
		import std.experimental.logger:trace,tracef;
		bool exhausted = false, correlationIDMatch = false;
		FrameResult frameResult, frameHeader;
		do
		{
			trace("releasing buffers");
			conn.maybeReleaseBuffers();
			frameResult = conn.getFrame(timeout);
			exhausted = (frameResult.result < 0);
			if(!exhausted && frameResult.frame.isBasicDeliverMethod)
			{
				// A delivery is followed by its header frame, which holds
				// the properties and the total body size.
				frameHeader = conn.getFrame(timeout);
				exhausted = exhausted || (frameHeader.result < 0);
				enforce(exhausted || frameHeader.frame.isFrameHeader, "Expected header");
				correlationIDMatch = !exhausted && (correlationID.value.length ==0 || frameHeader.frame.correlationID == correlationID.value);
			}
			version(HaveVibe) yield();
		} while (!exhausted && !correlationIDMatch);

		if (frameResult.result < 0 || frameHeader.result < 0)
			return "";

		size_t bodyTarget = frameHeader.frame.fullBodyLength();
		if (bodyTarget ==0)
		{
			tracef("bodyTarget is zero");
			return "";
		}
		// Fixed: the timeout was not forwarded, so reading the body could
		// block without limit even when the caller asked for one.
		return conn.getBody(bodyTarget, timeout);
	}

	/**
	 * Opens the socket, logs in, opens the channel and sets up the reply-to
	 * queue.
	 *
	 * Throws: whatever the individual steps throw; on failure `conn` and
	 *         `socket` are reset so that `connect` can be called again
	 */
	void connect()
	{
		import std.exception : enforce;
		enforce(conn is null, "trying to connect to an open connection");
		// Registered after the guard above so that a rejected second
		// connect() leaves an existing open connection untouched.
		scope(failure)
		{
			conn = null;
			socket = null;
		}
		openSocket();
		enforce(socket !is null, "could not open socket");
		login(params.credentials.user,params.credentials.pass);
		openChannel(params.channel);
		createReplyToQueue();
	}


	/**
	 * Publishes a raw byte payload and, depending on `waitForResponse`,
	 * either waits for the reply or returns immediately.
	 *
	 * Params:
	 *   waitForResponse = compile-time switch selecting the return value
	 *   message         = payload bytes
	 *   routingKey      = routing key to publish with
	 *   timeout         = passed to `getResponseBlocking` when waiting
	 *   contentType     = content type handed to the string overload
	 * Returns: the reply body (`string`) when waiting, otherwise the
	 *          `CorrelationID` of the sent message
	 */
	auto sendMessage(bool waitForResponse = true)(const(ubyte)[] message, string routingKey,
					from!"core.time".Duration timeout = from!"core.time".Duration.init,
					string contentType = "text/plain")
	{
		import std.exception : enforce;
		enforce(conn !is null, "trying to send a message to a null connection");
		enforce(socket !is null, "sending a message to a null socket");
		// Fixed: reinterpret as const(char)[] so the cast does not discard
		// the parameter's const qualifier; idup still makes the copy.
		CorrelationID correlationID = sendMessage((cast(const(char)[])message).idup,routingKey, contentType);
		static if (waitForResponse)
			return getResponseBlocking(correlationID,timeout);
		else
			return correlationID;
	}
}
497 
498 
499 
/// Returns the correlation id found in the decoded properties of `frame`.
private string correlationID(const(amqp_frame_t) frame)
{
	return basicProperties(frame).correlationID;
}
505 
506 
/**
 * Returns the content type stored in `p`.
 *
 * Throws: `Exception` (via `enforce`) when `p` or its content-type bytes
 *         are null
 */
private string contentType(amqp_basic_properties_t* p)
{
	import std.exception : enforce;

	enforce(p !is null);
	auto raw = p.content_type;
	enforce(raw.bytes !is null);
	return asString(raw);
}
514 
/// True when `p` is non-null and has the content-type flag set in `_flags`.
private bool hasBasicContent(amqp_basic_properties_t* p)
{
	if (p is null)
		return false;
	return (p._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) == AMQP_BASIC_CONTENT_TYPE_FLAG;
}
519 
/**
 * Returns the correlation id stored in `p`, or "" when the message did not
 * carry one.
 *
 * The `correlation_id` field is only read when its flag is present in
 * `_flags`. The flag is tested with `RabbitBasicFlag.correlationID`, the
 * same value this module writes into `_flags` when publishing.
 *
 * NOTE(review): this assumes the decoder leaves `correlation_id` unset when
 * the property is absent - confirm against the rabbitmq-c decoder.
 *
 * Throws: `Exception` (via `enforce`) when `p` is null
 */
private string correlationID(amqp_basic_properties_t* p)
{
	import std.exception : enforce;

	enforce(p !is null);
	if ((p._flags & RabbitBasicFlag.correlationID) == 0)
		return "";
	return p.correlation_id.asString;
}
526 
/// True when `frame` is a method frame whose method id is basic.deliver.
private bool isBasicDeliverMethod(amqp_frame_t frame)
{
	if (frame.frame_type != AMQP_FRAME_METHOD)
		return false;
	return frame.payload.method.id == AMQP_BASIC_DELIVER_METHOD;
}
531 
/// True when `frame` is a content-header frame.
private bool isFrameHeader(amqp_frame_t frame)
{
	immutable kind = frame.frame_type;
	return kind == AMQP_FRAME_HEADER;
}
536 
/// Returns the `body_size` recorded in the frame's properties payload, as `size_t`.
private size_t fullBodyLength(amqp_frame_t frame)
{
	auto announced = frame.payload.properties.body_size;
	return cast(size_t) announced;
}
541 
/**
 * Returns a copy of the body fragment carried by `frame`.
 *
 * Throws: `Exception` (via `enforce`) when the fragment's bytes are null
 */
private string bodyFragment(amqp_frame_t frame)
{
	import std.exception : enforce;

	auto fragment = frame.payload.body_fragment;
	enforce(fragment.bytes !is null);
	return asString(fragment);
}
548 
/// Returns the length in bytes of the body fragment carried by `frame`.
private size_t bodyFragmentLength(amqp_frame_t frame)
{
	auto fragment = frame.payload.body_fragment;
	return fragment.len;
}
553 
/// True when `frame` is a content-body frame.
private bool isBodyFrame(amqp_frame_t frame)
{
	immutable kind = frame.frame_type;
	return kind == AMQP_FRAME_BODY;
}
558 
/**
 * Returns the frame's decoded properties pointer, typed as basic properties.
 *
 * No null check is performed here; callers check the result themselves.
 */
private amqp_basic_properties_t* basicProperties(const(amqp_frame_t) frame)
{
	// enforce(frame.payload.properties.decoded !is null);
	auto decoded = frame.payload.properties.decoded;
	return cast(amqp_basic_properties_t*) decoded;
}
564 
/**
 * Returns the routing key recorded in a basic.deliver method.
 *
 * Throws: `Exception` (via `enforce`) when `deliver` or the key's bytes
 *         are null
 */
private string routingKey(amqp_basic_deliver_t* deliver)
{
	import std.exception : enforce;

	enforce(deliver !is null);
	auto key = deliver.routing_key;
	enforce(key.bytes !is null);
	return asString(key);
}
572 
/**
 * Returns the exchange name recorded in a basic.deliver method.
 *
 * Throws: `Exception` (via `enforce`) when `deliver` or the exchange's
 *         bytes are null
 */
private string getExchange(amqp_basic_deliver_t* deliver)
{
	import std.exception : enforce;

	enforce(deliver !is null);
	auto name = deliver.exchange;
	enforce(name.bytes !is null);
	return asString(name);
}
580 
/**
 * Returns the frame's decoded method pointer, typed as basic.deliver.
 *
 * Throws: `Exception` (via `enforce`) when the decoded pointer is null
 */
private amqp_basic_deliver_t* basicDeliver(const(amqp_frame_t) frame)
{
	import std.exception : enforce;

	auto decoded = frame.payload.method.decoded;
	enforce(decoded !is null);
	return cast(amqp_basic_deliver_t*) decoded;
}
587 
/**
 * Returns the library's name for the method id carried by `frame`.
 *
 * Throws: `Exception` (via `enforce`) when `amqp_method_name` returns null
 */
private string methodName(amqp_frame_t frame)
{
	import std.exception : enforce;
	// Fixed: the module path was written as `std..string`, which is not valid D.
	import std.string : fromStringz;

	auto p = amqp_method_name(frame.payload.method.id);
	enforce(p !is null);
	return p.fromStringz.idup;
}
596 
597