1 module kaleidic.api.rabbitmq.utils;
2 import kaleidic.api.rabbitmq;
3 public import kaleidic.api.rabbitmq.platform_utils;
4 import std.stdio:writeln,writef;
5 import std.ascii;
6 import core.stdc.stdarg;
7 import core.stdc.stdio:stderr,printf,vfprintf;
8 import std.exception;
9 import std.format:format;
10 
/**
 * Writes a printf-style message to the C `stderr` stream and then throws.
 *
 * The formatted text is only written to `stderr`; the thrown `Exception`
 * carries an empty message.
 *
 * Params:
 *   fmt = C format string, followed by the arguments it consumes.
 *
 * Throws: `Exception` (with an empty message) on every call.
 */
void die(const char *fmt, ...)
{
	import core.stdc.stdio : stderr;

	// NOTE(review): this function has D linkage but reads its arguments with
	// the C `va_start` machinery -- confirm this is intended on all targets.
	va_list args;
	va_start(args, fmt);
	scope (exit) va_end(args);

	vfprintf(stderr, fmt, args);

	throw new Exception("");
}
21 
/**
 * Throws when `x` is a negative status code; does nothing otherwise.
 *
 * The exception message has the form "<context>: <error text>\n", where the
 * error text is obtained from `amqp_error_string2(x)`.
 *
 * Params:
 *   x       = status code to check; values below zero are treated as errors.
 *   context = NUL-terminated C string describing the operation that was
 *             attempted (a null pointer is rendered as an empty string).
 *
 * Throws: `Exception` (via `enforce`) when `x < 0`.
 */
void die_on_error(int x, const(char) *context)
{
	import std.string : fromStringz;

	// std.format renders a `char*` passed to %s as a pointer value, not as
	// text, so both C strings are turned into D slices first. The message
	// argument of `enforce` is lazy, so nothing is formatted on success.
	// NOTE(review): assumes amqp_error_string2 returns a NUL-terminated
	// `const(char)*` -- confirm against the binding.
	enforce(x >= 0,
		format!"%s: %s\n"(fromStringz(context), fromStringz(amqp_error_string2(x))));
}
26 
/**
 * Inspects an RPC reply and throws unless it reports a normal response.
 *
 * Params:
 *   x       = the reply to inspect.
 *   context = description of the operation, used as the message prefix.
 *
 * Throws: `Exception` for every reply type other than `ResponseType.normal`:
 *   - `none`: "missing RPC reply type!"
 *   - `libraryException`: the text from `amqp_error_string2(x.library_error)`
 *   - `serverException`: the reply code and reply text of a connection-close
 *     or channel-close method, or the raw method id for anything else
 *   - any other value: "unknown Response Type: ..."
 */
void die_on_amqp_error(amqp_rpc_reply_t x, string context)
{
	import std.string : fromStringz;

	// Views a (pointer, length) byte buffer as text. std.format does not
	// implement C's "%.*s" for pointers (it would print the pointer value),
	// so the reply text has to be sliced explicitly.
	// NOTE(review): assumes `reply_text` exposes a byte pointer plus a length
	// that converts to size_t -- confirm against the binding.
	static const(char)[] textOf(const(void)* bytes, size_t len)
	{
		return bytes is null ? null : (cast(const(char)*) bytes)[0 .. len];
	}

	switch (x.reply_type) with (ResponseType)
	{
		case normal:
			return;

		case none:
			throw new Exception(format!"%s: missing RPC reply type!"(context));

		case libraryException:
			// The error string is a C string; convert it so %s prints text
			// instead of a pointer value.
			throw new Exception(format!"%s: %s"(
				context, fromStringz(amqp_error_string2(x.library_error))));

		case serverException:
			switch (x.reply.id)
			{
				case AMQP_CONNECTION_CLOSE_METHOD:
					auto m = cast(amqp_connection_close_t*) x.reply.decoded;
					throw new Exception(format!"%s: server connection error %s, message: %s"(
						context, m.reply_code,
						textOf(m.reply_text.bytes, m.reply_text.len)));

				case AMQP_CHANNEL_CLOSE_METHOD:
					auto m = cast(amqp_channel_close_t*) x.reply.decoded;
					// Uses %s like the connection branch; the previous "%uh"
					// printed a stray literal 'h' after the reply code.
					throw new Exception(format!"%s: server channel error %s, message: %s"(
						context, m.reply_code,
						textOf(m.reply_text.bytes, m.reply_text.len)));

				default:
					throw new Exception(format!"%s: unknown server error, method id 0x%08X"(
						context, x.reply.id));
			}

		default:
			break;
	}

	throw new Exception(format!"unknown Response Type: %s"(x.reply_type));
}
66 
/**
 * Prints one row of a hex dump to stdout.
 *
 * The row starts with the offset `count - numinrow` as at least eight hex
 * digits, followed by up to sixteen values in hex (with a " :" separator
 * before the ninth column), padding for missing columns, and finally a text
 * column in which every value that is not an ASCII letter or digit is shown
 * as '.'. When `numinrow` is zero only the offset and a newline are printed.
 *
 * Params:
 *   count    = number of bytes consumed so far, including this row.
 *   numinrow = number of valid entries in `chs`.
 *   chs      = pointer to the row's values; the first `numinrow` are read.
 */
void dump_row(long count, int numinrow, int *chs)
{
  // A D `long` is always 64 bits, whereas C's "%lX" expects a C `long`
  // (32 bits on Windows and 32-bit targets). "%llX" with an explicit 64-bit
  // argument matches on every platform.
  printf("%08llX:", cast(ulong) (count - numinrow));

  if (numinrow > 0) {
    // Hex column for the values that are present.
    foreach (i; 0 .. numinrow) {
      if (i == 8) {
        printf(" :");
      }
      printf(" %02X", chs[i]);
    }
    // Blank padding so the text column lines up on short rows.
    foreach (i; numinrow .. 16) {
      if (i == 8) {
        printf(" :");
      }
      printf("   ");
    }
    printf("  ");
    // Text column. The conversion to dchar is made explicit; values outside
    // the ASCII range are never alphanumeric and are shown as '.'.
    foreach (i; 0 .. numinrow) {
      if (isAlphaNum(cast(dchar) chs[i])) {
        printf("%c", chs[i]);
      } else {
        printf(".");
      }
    }
  }
  printf("\n");
}
97 
/**
 * Compares two rows of sixteen `int` values.
 *
 * Params:
 *   a = pointer to the first row; sixteen elements are read.
 *   b = pointer to the second row; sixteen elements are read.
 *
 * Returns: 1 when all sixteen elements are pairwise equal, 0 otherwise.
 */
int rows_eq(int *a, int *b)
{
  enum rowWidth = 16;

  // Slice equality compares the two rows element by element.
  return a[0 .. rowWidth] == b[0 .. rowWidth] ? 1 : 0;
}
109 
/**
 * Prints a hex dump of `buffer` to stdout, sixteen bytes per row.
 *
 * A full row identical to the full row before it is not printed; the first
 * such row in a run is replaced by a single ".." marker line and the rest
 * of the run is skipped silently. The last row (full or partial) is always
 * printed, and when it is partial a final line with the total byte count
 * follows. An empty buffer prints a single offset line.
 *
 * Params:
 *   buffer = the bytes to dump.
 */
void amqp_dump(ubyte[] buffer)
{
  long count = 0;
  int numinrow = 0;
  int[16] chs;
  int[16] oldchs;
  // Without this flag the very first row would be compared against the
  // zero-initialised `oldchs`, so a buffer starting with sixteen zero bytes
  // would have its first row elided although nothing precedes it.
  bool havePrevious = false;
  bool showedDots = false;

  foreach (ubyte octet; buffer) {
    if (numinrow == 16) {
      // A full row has been collected: print it or elide it as a repeat.
      if (havePrevious && rows_eq(oldchs.ptr, chs.ptr)) {
        if (!showedDots) {
          showedDots = true;
          printf("          .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n");
        }
      } else {
        showedDots = false;
        dump_row(count, numinrow, chs.ptr);
      }

      oldchs[] = chs[];
      havePrevious = true;
      numinrow = 0;
    }

    count++;
    chs[numinrow++] = octet;
  }

  // The trailing row is printed unconditionally.
  dump_row(count, numinrow, chs.ptr);

  if (numinrow != 0) {
    // "%llX" with a 64-bit argument: D `long` is 64 bits on every target,
    // whereas C's "%lX" expects a C `long`, which may be 32 bits.
    printf("%08llX:\n", cast(ulong) count);
  }
}