module kaleidic.api.rabbitmq.utils;
import kaleidic.api.rabbitmq;
public import kaleidic.api.rabbitmq.platform_utils;
import std.stdio:writeln,writef,stderr;
import std.ascii;
import core.stdc.stdarg;
import core.stdc.stdio:stderr,printf,vfprintf;
import std.exception;
import std.format:format;

/+
void die(T...)(string formatString,T args)
{
    stderr.writefln(formatString,args);
    throw new Exception("");
}

void die(const char *fmt, ...)
{
    import core.stdc.stdio:stderr;
    va_list ap;
    va_start(ap, fmt);
    vfprintf(stderr, fmt, ap);
    va_end(ap);
    throw new Exception("");
}
void die_on_error(int x, const(char) *context)
{
    enforce(x>=0, format!"%s: %s\n"(context, amqp_error_string2(x)));
}
+/

/**
 * Throws when the status code `x` is negative.
 *
 * Params:
 *   x       = status code to check; any value >= 0 is accepted silently
 *   context = text placed in front of the error description
 *
 * Throws: the exception raised by `enforce`, whose message is
 *         `context` ~ ":" ~ the string returned by `amqp_error_string2(x)`.
 */
void die_on_error(int x, string context)
{
    import std.string:fromStringz;
    // enforce takes its message lazily, so the string is only built on failure.
    enforce(x>=0, format!"%s:%s"(context,amqp_error_string2(x).fromStringz));
}

/**
 * Throws unless the RPC reply `x` has reply type `ResponseType.normal`.
 *
 * Params:
 *   x       = reply to inspect; only `x.reply_type` is read
 *   context = text placed in front of every error message
 *
 * Throws: Exception for every reply type other than `normal`. Reply types
 *         without a dedicated case are reported by name together with
 *         `context`, so the failing operation can be identified.
 */
void die_on_amqp_error(amqp_rpc_reply_t x, string context)
{
    switch (x.reply_type) with (ResponseType)
    {
        case normal:
            return;

        case none:
            throw new Exception(format!"%s: missing RPC reply type!"(context));

        /+ case libraryException:
            throw new Exception(format!"%s: %s"(context, amqp_error_string2(x.library_error)));

        case serverException:
            switch (x.reply.id) {
                case AMQP_CONNECTION_CLOSE_METHOD:
                    amqp_connection_close_t *m = cast (amqp_connection_close_t *) x.reply.decoded;
                    throw new Exception(format!"%s: server connection error %s, message: %.*s"(
                        context, m.reply_code,
                        cast(int) m.reply_text.len, cast(char *) m.reply_text.bytes));

                case AMQP_CHANNEL_CLOSE_METHOD:
                    amqp_channel_close_t *m = cast(amqp_channel_close_t *) x.reply.decoded;
                    throw new Exception(format!"%s: server channel error %uh, message: %.*s"(
                        context,
                        m.reply_code,
                        cast(int) m.reply_text.len, cast(char *) m.reply_text.bytes));
                default:
                    throw new Exception(format!"%s: unknown server error, method id 0x%08X"(context, x.reply.id));
            }
        +/
        default:
            break;
    }
    // Keep the caller's context in the fallback as well: with the cases above
    // disabled, this is the path every non-normal, non-none reply ends up on.
    throw new Exception(format!"%s: unknown Response Type: %s"(context, x.reply_type));
}

/**
 * Prints one hex-dump row to stdout: offset, up to 16 hex bytes split 8 : 8,
 * then the same bytes as text.
 *
 * Params:
 *   count    = number of bytes consumed so far, including this row; the
 *              printed offset is `count - numinrow`
 *   numinrow = number of valid entries in `chs`
 *   chs      = pointer to the byte values of the row (at least `numinrow` ints)
 *
 * When `numinrow` is 0 only the offset and a newline are printed.
 */
void dump_row(long count, int numinrow, int *chs)
{
    // D's long is always 64 bits while C's long is not (32 bits on Windows),
    // so the matching printf length modifier is "ll", not "l".
    printf("%08llX:", cast(ulong)(count - numinrow));

    if (numinrow > 0) {
        foreach (i; 0 .. numinrow) {
            if (i == 8) {
                printf(" :");
            }
            printf(" %02X", chs[i]);
        }
        // Pad absent bytes with the width of one " XX" cell so the text
        // column starts at the same position on short rows.
        foreach (i; numinrow .. 16) {
            if (i == 8) {
                printf(" :");
            }
            printf("   ");
        }
        printf("  ");
        foreach (i; 0 .. numinrow) {
            // Show every printable ASCII character (including space and
            // punctuation); everything else becomes '.'.
            if (isPrintable(cast(dchar) chs[i])) {
                printf("%c", chs[i]);
            } else {
                printf(".");
            }
        }
    }
    printf("\n");
}

/**
 * Compares two rows of 16 ints.
 *
 * Params:
 *   a = pointer to the first row (16 ints are read)
 *   b = pointer to the second row (16 ints are read)
 *
 * Returns: 1 when all 16 entries are equal, otherwise 0.
 */
int rows_eq(int *a, int *b)
{
    return a[0 .. 16] == b[0 .. 16] ? 1 : 0;
}

/**
 * Writes a hex dump of `buffer` to stdout, 16 bytes per row.
 *
 * A full row that is identical to the full row before it is not printed;
 * the first such row in a run is replaced by a single line of "..". The
 * final (possibly partial) row is always printed, followed by a line holding
 * only the total length when that final row is non-empty.
 *
 * Params:
 *   buffer = bytes to dump; an empty buffer prints just the offset 00000000
 */
void amqp_dump(ubyte[] buffer)
{
    long count = 0;
    int numinrow = 0;
    int[16] chs;
    int[16] oldchs;
    bool showedDots = false;
    // False until one full row has been handled: the elision marker means
    // "same as the row above", so it must never stand in for the first row.
    bool havePrevious = false;

    foreach (b; buffer) {
        if (numinrow == 16) {
            if (havePrevious && rows_eq(oldchs.ptr, chs.ptr)) {
                if (!showedDots) {
                    showedDots = true;
                    // Indented past the 9-character offset column so the dots
                    // sit under the hex cells.
                    printf("          .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n");
                }
            } else {
                showedDots = false;
                dump_row(count, numinrow, chs.ptr);
            }

            oldchs = chs;
            havePrevious = true;
            numinrow = 0;
        }

        count++;
        chs[numinrow++] = b;
    }

    dump_row(count, numinrow, chs.ptr);

    if (numinrow != 0) {
        printf("%08llX:\n", cast(ulong) count);
    }
}