1 module kaleidic.api.rabbitmq.examples.bind;
2 import std.stdio;
3 import std.string;
4 
5 import kaleidic.api.rabbitmq.rabbitmq;
6 import utils;
7 
8 enum SUMMARY_EVERY_US = 1000000;
9 
10 static void run(amqp_connection_state_t conn)
11 {
12   uint64_t start_time = now_microseconds();
13   int received = 0;
14   int previous_received = 0;
15   uint64_t previous_report_time = start_time;
16   uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
17 
18   amqp_frame_t frame;
19 
20   uint64_t now;
21 
22   for (;;) {
23     amqp_rpc_reply_t ret;
24     amqp_envelope_t envelope;
25 
26     now = now_microseconds();
27     if (now > next_summary_time) {
28       int countOverInterval = received - previous_received;
29       double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);
30       printf("%d ms: Received %d - %d since last report (%d Hz)\n",
31              (int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate);
32 
33       previous_received = received;
34       previous_report_time = now;
35       next_summary_time += SUMMARY_EVERY_US;
36     }
37 
38     amqp_maybe_release_buffers(conn);
39     ret = amqp_consume_message(conn, &envelope, NULL, 0);
40 
41     if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
42       if (AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type &&
43           AMQP_STATUS_UNEXPECTED_STATE == ret.library_error) {
44         if (AMQP_STATUS_OK != amqp_simple_wait_frame(conn, &frame)) {
45           return;
46         }
47 
48         if (AMQP_FRAME_METHOD == frame.frame_type) {
49           switch (frame.payload.method.id) {
50             case AMQP_BASIC_ACK_METHOD:
51               /* if we've turned publisher confirms on, and we've published a message
52                * here is a message being confirmed
53                */
54 
55               break;
56             case AMQP_BASIC_RETURN_METHOD:
57               /* if a published message couldn't be routed and the mandatory flag was set
58                * this is what would be returned. The message then needs to be read.
59                */
60               {
61                 amqp_message_t message;
62                 ret = amqp_read_message(conn, frame.channel, &message, 0);
63                 if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
64                   return;
65                 }
66 
67                 amqp_destroy_message(&message);
68               }
69 
70               break;
71 
72             case AMQP_CHANNEL_CLOSE_METHOD:
73               /* a channel.close method happens when a channel exception occurs, this
74                * can happen by publishing to an exchange that doesn't exist for example
75                *
76                * In this case you would need to open another channel redeclare any queues
77                * that were declared auto-delete, and restart any consumers that were attached
78                * to the previous channel
79                */
80               return;
81 
82             case AMQP_CONNECTION_CLOSE_METHOD:
83               /* a connection.close method happens when a connection exception occurs,
84                * this can happen by trying to use a channel that isn't open for example.
85                *
86                * In this case the whole connection must be restarted.
87                */
88               return;
89 
90             default:
91               fprintf(stderr ,"An unexpected method was received %u\n", frame.payload.method.id);
92               return;
93           }
94         }
95       }
96 
97     } else {
98       amqp_destroy_envelope(&envelope);
99     }
100 
101 
102     received++;
103   }
104 }
105 
106 int main(int argc, char const *const *argv)
107 {
108   char const *hostname;
109   int port, status;
110   char const *exchange;
111   char const *bindingkey;
112   amqp_socket_t *socket;
113   amqp_connection_state_t conn;
114   amqp_bytes_t queuename;
115 
116   if (argc < 3) {
117     fprintf(stderr, "Usage: amqps_consumer host port "
118             "[cacert.pem [verifypeer] [verifyhostname] [key.pem cert.pem]]\n");
119     return 1;
120   }
121 
122   hostname = argv[1];
123   port = atoi(argv[2]);
124   exchange = "amq.direct"; /* argv[3]; */
125   bindingkey = "test queue"; /* argv[4]; */
126 
127   conn = amqp_new_connection();
128 
129   socket = amqp_ssl_socket_new(conn);
130   if (!socket) {
131     die("creating SSL/TLS socket");
132   }
133 
134   amqp_ssl_socket_set_verify_peer(socket, 0);
135   amqp_ssl_socket_set_verify_hostname(socket, 0);
136 
137   if (argc > 3) {
138     int nextarg = 4;
139     status = amqp_ssl_socket_set_cacert(socket, argv[3]);
140     if (status) {
141       die("setting CA certificate");
142     }
143     if (argc > nextarg && !strcmp("verifypeer", argv[nextarg])) {
144       amqp_ssl_socket_set_verify_peer(socket, 1);
145       nextarg++;
146     }
147     if (argc > nextarg && !strcmp("verifyhostname", argv[nextarg])) {
148       amqp_ssl_socket_set_verify_hostname(socket, 1);
149       nextarg++;
150     }
151     if (argc > nextarg + 1) {
152       status =
153           amqp_ssl_socket_set_key(socket, argv[nextarg + 1], argv[nextarg]);
154       if (status) {
155         die("setting client key");
156       }
157     }
158   }
159 
160   status = amqp_socket_open(socket, hostname, port);
161   if (status) {
162     die("opening SSL/TLS connection");
163   }
164 
165   die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
166                     "Logging in");
167   amqp_channel_open(conn, 1);
168   die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
169 
170   {
171     amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1,
172                                  amqp_empty_table);
173     die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
174     queuename = amqp_bytes_malloc_dup(r->queue);
175     if (queuename.bytes == NULL) {
176       fprintf(stderr, "Out of memory while copying queue name");
177       return 1;
178     }
179   }
180 
181   amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),
182                   amqp_empty_table);
183   die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");
184 
185   amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
186   die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
187 
188   run(conn);
189 
190   die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
191   die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
192   die_on_error(amqp_destroy_connection(conn), "Ending connection");
193 
194   return 0;
195 }