/**
 * D convenience layer over the rabbitmq-c bindings: string/byte conversion
 * helpers, a connection wrapper (`AmqpConnection`) and a small RPC-style
 * client (`RabbitClient`).
 */
module symmetry.api.rabbitmq.wrap;
import std.exception;

import symmetry.api.rabbitmq.bindings;
import symmetry.api.rabbitmq.enums;
import symmetry.api.rabbitmq.utils;
import symmetry.api.rabbitmq.platform_utils;

/// Inline-import helper: `from!"core.time".Duration` names a symbol without a
/// module-level import.
private template from(string moduleName)
{
    mixin("import from = " ~ moduleName ~ ";");
}

/// Converts a D string to an `amqp_bytes_t` by making a NUL-terminated copy
/// and handing it to `amqp_cstring_bytes`.
amqp_bytes_t amqp_string(string s)
{
    import std.string: toStringz;
    return s.toStringz.amqp_cstring_bytes;
}

/// Wraps `buf` in an `amqp_bytes_t` without copying; the result points at the
/// caller's buffer.
amqp_bytes_t asRabbit(ubyte[] buf)
{
    amqp_bytes_t ret;
    ret.len = buf.length;
    ret.bytes = cast(void*) buf.ptr;
    return ret;
}

/// Copies the `buf.len` bytes referenced by `buf` into a new D string.
/// Throws: Exception if `buf.bytes` is null.
private string asString(amqp_bytes_t buf)
{
    import std.conv : to;
    import std.exception : enforce;
    enforce(buf.bytes !is null);
    auto p = cast(char*)buf.bytes;
    return p[0..buf.len].to!string;
}


/// User name and password used for login.
struct RabbitCredentials
{
    string user;
    string pass;
}

/// Settings consumed by `RabbitClient`.
struct ConnectionParams
{
    string hostname;
    ushort port = 5672;
    RabbitCredentials credentials;
    string exchange;
    ushort channel = 1;
    // NOTE(review): connectionAttempts, socketTimeout and rpcQueue are not
    // read anywhere in this module - confirm whether other code uses them.
    int connectionAttempts = 10;
    int socketTimeout = 1;
    bool useSSL = false;
    string caCert;
    bool verifyPeer = false;
    bool verifyHostname = false;
    string keyFile;
    string certFile;
    string rpcQueue;
}



/// Builds an `amqp_basic_properties_t` carrying the given flags, content
/// type, delivery mode, reply-to queue and correlation id.
amqp_basic_properties_t basicProperties(RabbitBasicFlag flags, string contentType, RabbitDeliveryMode deliveryMode, string replyTo, string correlationID)
{
    import std.experimental.logger: tracef;
    tracef("basic properties: %s",flags);
    amqp_basic_properties_t props = {
        _flags: flags,
        content_type: amqp_string(contentType),
        delivery_mode: deliveryMode,
        reply_to: amqp_string(replyTo),
        correlation_id: amqp_string(correlationID),
    };
    return props;
}

/// Holds a raw connection handle together with the channel opened on it.
struct AmqpConnectionState
{
    amqp_connection_state_t conn;
    alias conn this;

    ushort channel;
    bool isChannelOpen = false;

    this(amqp_connection_state_t conn)
    {
        this.conn = conn;
    }

    /// Opens `channel` on the connection and remembers it for teardown.
    void openChannel(ushort channel)
    {
        this.channel = channel;
        conn.amqp_channel_open(channel);
        die_on_amqp_error(amqp_get_rpc_reply(conn),"opening channel");
        this.isChannelOpen = true;
    }

    /// Closes the opened channel and the connection, then destroys the handle.
    ~this()
    {
        // NOTE(review): a handle whose channel was never opened is not
        // destroyed here. By-value copies of this struct share the same
        // handle, so tearing down unconditionally could free it twice -
        // confirm ownership before widening this condition.
        if (conn is null || !isChannelOpen)
            return;

        // Close the channel that was actually opened (not a fixed channel 1).
        amqp_channel_close(conn,channel,ReplySuccess);
        amqp_connection_close(conn,ReplySuccess);
        amqp_destroy_connection(conn);
        conn = null;
        isChannelOpen = false;
    }
}

/// Reference-counted connection exposing the socket, login, queue, publish
/// and frame-reading operations used by `RabbitClient`.
final class AmqpConnection
{
    import std.typecons: RefCounted;
    RefCounted!AmqpConnectionState conn;
    alias conn this;

    this(AmqpConnectionState conn)
    {
        this.conn = conn;
    }

    /// Channel number recorded by the last `openChannel` call.
    ushort channel()
    {
        return conn.channel;
    }

    /// Opens `channel` on the underlying connection.
    void openChannel(ushort channel)
    {
        conn.openChannel(channel);
    }

    /**
     * Creates a TCP (or, when built with version `OpenSSL` and `useSSL` is
     * set, SSL) socket for this connection and opens it to `hostname:port`.
     *
     * Throws: Exception if the socket cannot be created, configured or opened,
     * or if SSL is requested in a build without the `OpenSSL` version.
     */
    amqp_socket_t* openSocket(string hostname, ushort port, bool useSSL, bool verifyPeer, bool verifyHostname, string caCert, string certFile, string keyFile)
    {
        import std.experimental.logger : tracef;
        import std.string: toStringz;
        import std.exception : enforce;
        import std.format: format;
        tracef("opening new socket: hostname(%s), useSSL (%s), verifyPeer(%s), verifyHostName(%s), caCert(%s), certFile(%s), keyFile(%s)",
                hostname, useSSL,verifyPeer,verifyHostname,caCert,certFile, keyFile);
        enforce(conn !is null, "connection is null");
        version(OpenSSL)
            amqp_socket_t* socket = useSSL ? amqp_ssl_socket_new(conn) : amqp_tcp_socket_new(conn);
        else
        {
            enforce(!useSSL, "must build rabbitmq-d with OpenSSL version enabled");
            amqp_socket_t* socket = amqp_tcp_socket_new(conn);
        }
        enforce(socket !is null, "creating rabbit socket");
        version(OpenSSL)
        {
            if (useSSL)
            {
                amqp_ssl_socket_set_verify_peer(socket, verifyPeer ? 1:0);
                amqp_ssl_socket_set_verify_hostname(socket, verifyHostname ? 1: 0);
                enforce((caCert.length == 0) || (amqp_ssl_socket_set_cacert(socket, caCert.toStringz) == 0), "setting CA certificate");
                enforce((keyFile.length ==0 || certFile.length > 0), "if you specify key-file you must also specify cert-file");
                enforce((keyFile.length ==0) ||
                        (amqp_ssl_socket_set_key(socket, certFile.toStringz, keyFile.toStringz) == 0),
                        "setting client cert");
            }
        }
        enforce(amqp_socket_open(socket,hostname.toStringz, port)==0,
                format!"opening rabbit connection: useSSL %s, verifyPeer %s, verifyHostName %s, caCert %s, keyFile %s"
                (useSSL,verifyPeer,verifyHostname,caCert,keyFile)
        );
        return socket;
    }

    /// Logs in to `vhost` with the given limits and credentials. The password
    /// is masked in the log line.
    void login(string vhost, int channelMax, int frameMax, int heartBeat, SaslMethod saslMethod, string user, string pass)
    {
        import std.experimental.logger:infof;
        import std.range:repeat;
        import std.exception : enforce;
        import std.string: toStringz;
        infof("logging in as user %s, pass %s",user,'*'.repeat(pass.length));
        enforce(conn !is null, "connection is null");
        const(char)* pVhost = vhost.toStringz;
        const(char)* pUser = user.toStringz;
        const(char)* pPass = pass.toStringz;
        die_on_amqp_error(amqp_login(conn,pVhost,channelMax,frameMax,heartBeat,saslMethod,pUser,pPass),"Logging in");
    }

    /**
     * Starts consuming `queueName` on the current channel.
     *
     * Throws: Exception if the broker returns no reply or an empty consumer tag.
     */
    void consumeQueue(string queueName, bool noLocal, bool noAck, bool exclusive)
    {
        import std.experimental.logger;
        import std.exception : enforce;
        tracef("consuming queue: %s", queueName);
        enforce(conn !is null, "connection is null");
        auto consumerTag = amqp_empty_bytes;
        auto amqpArguments = amqp_empty_table;

        auto tag = conn.amqp_basic_consume(channel,amqp_string(queueName),consumerTag,noLocal?1:0,noAck?1:0,exclusive?1:0,amqpArguments);
        // enforce's message is evaluated lazily, i.e. only on failure, so it
        // must not dereference `tag` or convert an empty consumer tag.
        enforce(tag !is null, "basic consume of response to rpc message - no reply");
        enforce(tag.consumer_tag.len > 0, "basic consume of response to rpc message - empty consumer tag");
        die_on_amqp_error(amqp_get_rpc_reply(conn),"consuming");
        tracef("consumer tag: %s", tag.consumer_tag.asString);
    }

    /// Sends a queue.declare request on the current channel and returns the
    /// decoded reply pointer. An empty `queue` is sent as `amqp_empty_bytes`.
    amqp_queue_declare_ok_t*
    declareQueue(string queue, bool isPassive, bool isDurable, bool isExclusive, bool autoDelete, bool noWait, amqp_table_t arguments)
    {
        import std.exception : enforce;
        enforce(conn !is null, "connection is null");
        amqp_queue_declare_t req;
        req.ticket = 0;
        req.queue = (queue.length ==0 ) ? amqp_empty_bytes : amqp_string(queue);
        req.passive = isPassive ? 1:0;
        req.durable = isDurable ? 1:0;
        req.exclusive = isExclusive ? 1:0;
        req.auto_delete = autoDelete ? 1:0;
        req.nowait = noWait ? 1:0;
        req.arguments = arguments;
        return cast(amqp_queue_declare_ok_t*) conn.amqp_simple_rpc_decoded(channel,AMQP_QUEUE_DECLARE_METHOD,AMQP_QUEUE_DECLARE_OK_METHOD,&req);
    }

    /// Declares an exclusive, unnamed queue and returns the name from the reply.
    string declareReplyToQueue()
    {
        import std.exception : enforce;
        enforce(conn !is null, "connection is null");
        enum isPassive = false;
        enum isDurable = false;
        enum isExclusive = true;
        enum autoDelete = false;
        enum noWait = false;

        auto r = declareQueue(null,isPassive,isDurable,isExclusive,autoDelete,noWait,amqp_empty_table);
        enforce(r !is null, "declaring queue");
        die_on_amqp_error(amqp_get_rpc_reply(conn),"declaring queue");
        enforce(r.queue.bytes !is null, "reading queue name");
        // amqp_bytes_t is length-delimited, so copy exactly `len` bytes rather
        // than scanning for a NUL terminator.
        return r.queue.asString;
    }

    /// Publishes `messageBody` to `exchange` with `routingKey` on the current
    /// channel. An empty `exchange` is sent as `amqp_empty_bytes`.
    void basicPublish(string exchange, string routingKey, ref amqp_basic_properties_t props, string messageBody)
    {
        import std.exception : enforce;
        import std.experimental.logger: tracef;
        import std.string: toStringz;
        tracef("basicPublish - channel: %s, exchange: %s, routingKey: %s, messageBody: %s",channel,exchange,routingKey,messageBody);
        enforce(conn !is null, "connection is null");
        enum mandatory = false;
        enum immediate = false;
        amqp_bytes_t message;
        message.bytes = cast(void*)messageBody.toStringz;
        // NOTE(review): the length includes the NUL terminator appended by
        // toStringz, so one extra zero byte is published - confirm whether
        // consumers rely on this before changing it.
        message.len = messageBody.length+1;
        auto pExchange = (exchange.length ==0) ? amqp_empty_bytes : amqp_string(exchange);
        die_on_error(amqp_basic_publish(conn,channel,pExchange,amqp_string(routingKey),mandatory?1:0,immediate?1:0,&props,message), "publishing message");
        die_on_amqp_error(amqp_get_rpc_reply(conn),"publishing message");
    }

    /// Allocates a new connection handle and wraps it.
    /// Throws: Exception if the handle cannot be allocated.
    static AmqpConnection newConnection()
    {
        import std.exception : enforce;
        auto c = amqp_new_connection();
        enforce(c!is null, "opening connection");
        return new AmqpConnection(AmqpConnectionState(c));
    }

    /// Calls `amqp_maybe_release_buffers` on the connection.
    void maybeReleaseBuffers()
    {
        import std.exception : enforce;
        enforce(conn !is null, "connection is null");
        amqp_maybe_release_buffers(conn);
    }


    /**
     * Waits for a single frame. With the default `Duration.init` it uses
     * `amqp_simple_wait_frame`; otherwise `amqp_simple_wait_frame_noblock`
     * with `duration` as the timeout.
     *
     * Returns: the library status code together with the frame it filled in.
     */
    FrameResult simpleWaitFrame(from!"core.time".Duration duration = from!"core.time".Duration.init)
    {
        import std.exception : enforce;
        import core.time: Duration;
        import std.conv:to;
        import std.experimental.logger:trace,tracef;
        FrameResult ret;
        timeval timeout;

        if(duration != Duration.init)
        {
            auto splitDuration = duration.split!("seconds","usecs");
            timeout.tv_sec = splitDuration.seconds.to!int;
            timeout.tv_usec = splitDuration.usecs.to!int;
        }

        version(TraceFrame) tracef("waiting for frame");
        enforce(conn !is null, "connection is null");
        ret.result = (duration == Duration.init) ? amqp_simple_wait_frame(conn,&ret.frame) :
                        amqp_simple_wait_frame_noblock(conn,&ret.frame,&timeout);
        version(TraceFrame) tracef("received frame - result: %s",ret.result);
        version(TraceFrame) tracef("Frame type: %s channel: %s", (ret.result < 0) ? '-' : ret.frame.frame_type,(ret.result<0)? 0: ret.frame.channel);
        return ret;
    }

    /**
     * Polls for a frame in 100 ms slices until one arrives, a non-timeout
     * error occurs, or `timeout` elapses. `Duration.init` means no overall
     * timeout. When built with version `HaveVibe` it yields before each poll.
     */
    FrameResult getFrame(from!"core.time".Duration timeout = from!"core.time".Duration.init)
    {
        version(HaveVibe) import vibe.core.core:yield;
        import std.experimental.logger:trace,tracef;
        import core.time:dur, Duration, MonoTime;
        import std.exception : enforce;
        enforce(conn !is null, "connection is null");
        FrameResult frame;
        enum pollInterval = dur!"msecs"(100);
        // Monotonic clock: wall-clock adjustments must not stretch or shrink
        // the timeout.
        immutable startTime = MonoTime.currTime;
        Duration elapsedTime;
        do
        {
            version(HaveVibe) yield();
            frame = simpleWaitFrame(pollInterval);
            elapsedTime = MonoTime.currTime - startTime;
        } while(frame.result == AMQP_STATUS_TIMEOUT && ((timeout == Duration.init)||(elapsedTime<timeout)));
        if (timeout != Duration.init && elapsedTime >= timeout)
            return frame;
        if(frame.result >=0 && frame.frame.isBasicDeliverMethod)
        {
            amqp_basic_deliver_t* d= basicDeliver(frame.frame);
            tracef("Delivery: %s exchange: %s routingKey: %s", d.delivery_tag, d.getExchange, d.routingKey);
            // Content properties are not read here: on a method frame the
            // payload's `properties` member is not the active union member.
        }
        return frame;
    }

    /**
     * Reads body frames until `bodyTarget` bytes have been collected.
     *
     * Throws: Exception if a non-body frame arrives, more than `bodyTarget`
     * bytes are received, or reading stops before `bodyTarget` is reached.
     */
    string getBody(size_t bodyTarget,from!"core.time".Duration timeout = from!"core.time".Duration.init)
    {
        import std.experimental.logger:trace,tracef;
        import std.array:Appender;
        import std.exception : enforce;
        import std.format: format;
        Appender!string ret;

        enforce(conn !is null, "connection is null");
        size_t bodyReceived = 0;
        FrameResult frame;
        do
        {
            frame = getFrame(timeout);
            if (frame.result <0)
            {
                tracef("result code %s when receiving frame body", frame.result);
            }
            else
            {
                enforce(frame.frame.isBodyFrame, "expected body frame");
                bodyReceived += frame.frame.bodyFragmentLength;
                ret.put(frame.frame.bodyFragment);
                enforce(bodyReceived <=bodyTarget);
            }
        }
        while(frame.result >=0 && (bodyReceived < bodyTarget));
        enforce(bodyReceived == bodyTarget, format!"bodyReceived (%s) != bodyTarget(%s)"(bodyReceived,bodyTarget));
        return ret.data;
    }
}

/// Status code returned by a frame wait together with the frame it filled in.
struct FrameResult
{
    int result;
    amqp_frame_t frame;
}

/// Correlation id of a published message.
struct CorrelationID
{
    string value;
}

/// Request/reply client: connects, declares a private reply queue, publishes
/// messages and waits for the matching response.
struct RabbitClient
{
    AmqpConnection conn;
    amqp_socket_t* socket;
    ConnectionParams params;
    string replyToQueue;

    this(ConnectionParams params)
    {
        this.params = params;
    }

    /// Creates the connection and opens its socket using `params`.
    private void openSocket()
    {
        import std.exception : enforce;
        import std.experimental.logger: tracef;
        tracef("opening new socket: %s",params);
        enforce(socket is null, "trying to openSocket when it is already open");
        conn = AmqpConnection.newConnection();
        socket = conn.openSocket(params.hostname,params.port,params.useSSL,params.verifyPeer,params.verifyHostname,params.caCert,params.certFile,params.keyFile);
    }

    /// Logs in to vhost "/" with SASL plain, frame max 131072, and channel max
    /// and heartbeat both 0.
    private void login(string user, string pass)
    {
        conn.login("/",0,131_072,0,SaslMethod.plain,user,pass);
    }

    private void openChannel(ushort channel)
    {
        conn.openChannel(channel);
    }

    /// Declares the private reply queue and starts consuming from it.
    private void createReplyToQueue()
    {
        import std.experimental.logger;
        enum noLocal = true;    // do not send messages to connection that published them
        enum noAck = true;      // do not expect acknowledgements to messages
        enum exclusive = true;  // only this consumer can access the queue

        tracef("creating replyToQueue");
        tracef("first declaring replyToQueue");
        replyToQueue = conn.declareReplyToQueue();
        tracef("declare succeeded - reply to queue:%s",replyToQueue);
        tracef("consuming queue");
        conn.consumeQueue(replyToQueue,noLocal,noAck,exclusive);
    }

    /// Publishes `messageBody` with a fresh random correlation id and the
    /// reply queue as reply-to; returns the correlation id.
    CorrelationID sendMessage(string messageBody, string routingKey, string contentType = "text/plain")
    {
        import std.experimental.logger;
        import std.uuid: randomUUID;

        auto correlationID = randomUUID().toString();
        tracef("sending message: correlationID=%s,replyTo=%s,routingKey=%s",correlationID,replyToQueue,routingKey);

        enum deliveryMode = RabbitDeliveryMode.nonPersistent;
        enum property = RabbitBasicFlag.replyTo | RabbitBasicFlag.correlationID;
        static assert( property == 0x600, "properties do not line up with discovered property using wireshark");
        // NB I thought we needed to set basicContent and basicDeliveryMode too but apparently not

        auto props = basicProperties( property, contentType, deliveryMode, replyToQueue, correlationID);
        conn.basicPublish(params.exchange,routingKey,props, messageBody);
        tracef("sent message: %s",correlationID);
        return CorrelationID(correlationID);
    }

    /**
     * Waits for a delivery whose header carries `correlationID` (any delivery
     * when the id is empty) and returns its body.
     *
     * Returns: the message body, or "" if a frame read failed or timed out or
     * the body length is zero.
     */
    string getResponseBlocking(CorrelationID correlationID = CorrelationID.init,
            from!"core.time".Duration timeout = from!"core.time".Duration.init)
    {
        version(HaveVibe) import vibe.core.core:yield;
        import std.exception : enforce;
        import std.experimental.logger:trace,tracef;
        bool exhausted = false, correlationIDMatch = false;
        FrameResult frameResult, frameHeader;
        do
        {
            trace("releasing buffers");
            conn.maybeReleaseBuffers();
            frameResult = conn.getFrame(timeout);
            exhausted = (frameResult.result < 0);
            if(!exhausted && frameResult.frame.isBasicDeliverMethod)
            {
                frameHeader = conn.getFrame(timeout);
                exhausted = exhausted || (frameHeader.result < 0);
                enforce(exhausted || frameHeader.frame.isFrameHeader, "Expected header");
                correlationIDMatch = !exhausted && (correlationID.value.length ==0 || frameHeader.frame.correlationID == correlationID.value);
            }
            version(HaveVibe) yield();
        } while (!exhausted && !correlationIDMatch);

        if (frameResult.result < 0 || frameHeader.result < 0)
            return "";

        size_t bodyTarget = frameHeader.frame.fullBodyLength();
        if (bodyTarget ==0)
        {
            tracef("bodyTarget is zero");
            return "";
        }
        // Apply the caller's timeout to the body frames too, so a stalled
        // body cannot block indefinitely when a timeout was requested.
        return conn.getBody(bodyTarget, timeout);
    }

    /// Opens the socket, logs in, opens the configured channel and creates the
    /// reply queue.
    /// Throws: Exception if a connection already exists.
    void connect()
    {
        import std.exception : enforce;
        enforce(conn is null, "trying to connect to an open connection");
        openSocket();
        enforce(socket !is null, "could not open socket");
        login(params.credentials.user,params.credentials.pass);
        openChannel(params.channel);
        createReplyToQueue();
    }


    /// Publishes `message`. With `waitForResponse` it returns the response
    /// body; otherwise it returns the `CorrelationID` of the published message.
    auto sendMessage(bool waitForResponse = true)(const(ubyte)[] message, string routingKey,
            from!"core.time".Duration timeout = from!"core.time".Duration.init,
            string contentType = "text/plain")
    {
        import std.exception : enforce;
        enforce(conn !is null, "trying to send a message to a null connection");
        enforce(socket !is null, "sending a message to a null socket");
        // Reinterpret the bytes as characters without casting away const.
        CorrelationID correlationID = sendMessage((cast(const(char)[])message).idup,routingKey, contentType);
        static if (waitForResponse)
            return getResponseBlocking(correlationID,timeout);
        else
            return correlationID;
    }
}



/// Correlation id from the decoded properties of a header frame.
private string correlationID(const(amqp_frame_t) frame)
{
    auto p = basicProperties(frame);
    return p.correlationID;
}


/// Content type from `p`. Throws if `p` or its content-type bytes are null.
private string contentType(amqp_basic_properties_t* p)
{
    import std.exception : enforce;
    enforce(p !is null);
    enforce(p.content_type.bytes !is null);
    return p.content_type.asString;
}

/// True when `p` is non-null and has the content-type flag set.
private bool hasBasicContent(amqp_basic_properties_t* p)
{
    return (p !is null && ((p._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) == AMQP_BASIC_CONTENT_TYPE_FLAG));
}

/// Correlation id from `p`. Throws if `p` or the correlation-id bytes are null.
private string correlationID(amqp_basic_properties_t* p)
{
    import std.exception : enforce;
    enforce(p !is null);
    return p.correlation_id.asString;
}

/// True when `frame` is a method frame carrying basic.deliver.
private bool isBasicDeliverMethod(amqp_frame_t frame)
{
    return (frame.frame_type == AMQP_FRAME_METHOD) && (frame.payload.method.id == AMQP_BASIC_DELIVER_METHOD);
}

/// True when `frame` is a content-header frame.
private bool isFrameHeader(amqp_frame_t frame)
{
    return frame.frame_type == AMQP_FRAME_HEADER;
}

/// Body size recorded in the frame's properties payload, as `size_t`.
private size_t fullBodyLength(amqp_frame_t frame)
{
    return cast(size_t) frame.payload.properties.body_size;
}

/// Copy of the frame's body fragment as a string.
/// Throws: Exception if the fragment pointer is null.
private string bodyFragment(amqp_frame_t frame)
{
    import std.exception : enforce;
    enforce(frame.payload.body_fragment.bytes !is null);
    return frame.payload.body_fragment.asString;
}

/// Length in bytes of the frame's body fragment.
private size_t bodyFragmentLength(amqp_frame_t frame)
{
    return frame.payload.body_fragment.len;
}

/// True when `frame` is a body frame.
private bool isBodyFrame(amqp_frame_t frame)
{
    return frame.frame_type == AMQP_FRAME_BODY;
}

/// Decoded properties pointer of `frame`; it is returned unchecked and may be
/// null.
private amqp_basic_properties_t* basicProperties(const(amqp_frame_t) frame)
{
    // enforce(frame.payload.properties.decoded !is null);
    return cast(amqp_basic_properties_t*) frame.payload.properties.decoded;
}

/// Routing key of a delivery.
/// Throws: Exception if `deliver` or the routing-key bytes are null.
private string routingKey(amqp_basic_deliver_t* deliver)
{
    import std.exception : enforce;
    enforce(deliver !is null);
    enforce(deliver.routing_key.bytes !is null);
    return deliver.routing_key.asString;
}

/// Exchange name of a delivery.
/// Throws: Exception if `deliver` or the exchange bytes are null.
private string getExchange(amqp_basic_deliver_t* deliver)
{
    import std.exception : enforce;
    enforce(deliver !is null);
    enforce(deliver.exchange.bytes !is null);
    return deliver.exchange.asString;
}

/// Decoded method payload of `frame`, viewed as a basic.deliver.
/// Throws: Exception if the decoded pointer is null.
private amqp_basic_deliver_t* basicDeliver(const(amqp_frame_t) frame)
{
    import std.exception : enforce;
    enforce(frame.payload.method.decoded !is null);
    return cast(amqp_basic_deliver_t*) frame.payload.method.decoded;
}

/// Name that `amqp_method_name` reports for the frame's method id.
/// Throws: Exception if no name is returned.
private string methodName(amqp_frame_t frame)
{
    import std.exception : enforce;
    import std.string : fromStringz;
    auto p = amqp_method_name(frame.payload.method.id);
    enforce(p !is null);
    return p.fromStringz.idup;
}
