module kaleidic.api.rabbitmq.utils;
import kaleidic.api.rabbitmq;
public import kaleidic.api.rabbitmq.platform_utils;
import std.stdio:writeln,writef;
import std.ascii;
import core.stdc.stdarg;
import core.stdc.stdio:stderr,printf,vfprintf;
import std.exception;
import std.format:format;
import std.string:fromStringz;

/**
 * Writes a printf-style message to the C `stderr` stream, then throws.
 *
 * Params:
 *   fmt = C format string handed to `vfprintf`; the variadic arguments must
 *         match its conversion specifiers.
 *
 * Throws: `Exception` (with an empty message) on every call; the diagnostic
 *         text itself goes to `stderr`, not into the exception.
 */
void die(const char *fmt, ...)
{
    va_list ap;
    va_start(ap, fmt);
    vfprintf(stderr, fmt, ap);
    va_end(ap);
    throw new Exception("");
}

/**
 * Throws when `x` is a negative status code.
 *
 * Params:
 *   x       = status code; any value `>= 0` is treated as success.
 *   context = NUL-terminated C string naming the operation that produced `x`.
 *
 * Throws: `Exception` whose message is "<context>: <error text>\n", where the
 *         error text comes from `amqp_error_string2(x)`.
 */
void die_on_error(int x, const(char) *context)
{
    // std.format prints a raw char* as a pointer value, so both C strings are
    // converted to slices first.
    // NOTE(review): assumes amqp_error_string2 returns a NUL-terminated C
    // string, as in the C library - confirm against the binding.
    enforce(x >= 0,
            format!"%s: %s\n"(fromStringz(context), fromStringz(amqp_error_string2(x))));
}

/// Views `len` bytes starting at `bytes` as text; a null pointer yields an empty slice.
private const(char)[] amqpBytesToText(const(void)* bytes, size_t len)
{
    return bytes is null ? null : (cast(const(char)*) bytes)[0 .. len];
}

/**
 * Inspects an RPC reply and throws unless its type is `ResponseType.normal`.
 *
 * Params:
 *   x       = reply to inspect.
 *   context = description of the operation, used as the message prefix.
 *
 * Throws: `Exception` describing the failure:
 *         a missing reply type, a library error (text from
 *         `amqp_error_string2`), a server connection/channel close (reply code
 *         and reply text), an unknown server method id, or an unknown
 *         response type.
 */
void die_on_amqp_error(amqp_rpc_reply_t x, string context)
{
    switch (x.reply_type) with (ResponseType)
    {
        case normal:
            return;

        case none:
            throw new Exception(format!"%s: missing RPC reply type!"(context));

        case libraryException:
            // Convert the C string; a raw char* would be formatted as an address.
            throw new Exception(format!"%s: %s"(
                context, fromStringz(amqp_error_string2(x.library_error))));

        case serverException:
            switch (x.reply.id)
            {
                case AMQP_CONNECTION_CLOSE_METHOD:
                    auto m = cast(amqp_connection_close_t*) x.reply.decoded;
                    // reply_text is a (length, pointer) pair, not NUL-terminated:
                    // slice it rather than relying on the C-only "%.*s" idiom.
                    throw new Exception(format!"%s: server connection error %s, message: %s"(
                        context,
                        m.reply_code,
                        amqpBytesToText(m.reply_text.bytes, m.reply_text.len)));

                case AMQP_CHANNEL_CLOSE_METHOD:
                    auto m = cast(amqp_channel_close_t*) x.reply.decoded;
                    throw new Exception(format!"%s: server channel error %s, message: %s"(
                        context,
                        m.reply_code,
                        amqpBytesToText(m.reply_text.bytes, m.reply_text.len)));

                default:
                    throw new Exception(format!"%s: unknown server error, method id 0x%08X"(
                        context, x.reply.id));
            }

        default:
            break;
    }

    throw new Exception(format!"unknown Response Type: %s"(x.reply_type));
}

/**
 * Prints one row of a hex dump to stdout.
 *
 * The row starts with the offset `count - numinrow` as eight hex digits,
 * followed by up to 16 byte values in hex (with a " :" divider before the
 * ninth column) and then the same bytes as characters. Only ASCII letters and
 * digits are echoed; every other value is shown as '.'.
 *
 * Params:
 *   count    = number of bytes consumed so far, including this row.
 *   numinrow = number of valid entries in `chs`; with 0 only the offset is printed.
 *   chs      = the byte values of this row.
 */
void dump_row(long count, int numinrow, int *chs)
{
    // D `long` is always 64-bit, so it needs the `ll` length modifier; plain
    // `l` is only 32-bit on some C ABIs.
    printf("%08llX:", cast(ulong)(count - numinrow));

    if (numinrow > 0)
    {
        foreach (i; 0 .. numinrow)
        {
            if (i == 8)
                printf(" :");
            printf(" %02X", cast(uint) chs[i]);
        }
        // Pad absent columns with the width of " XX" so the text column of a
        // short row lines up with the full rows above it.
        foreach (i; numinrow .. 16)
        {
            if (i == 8)
                printf(" :");
            printf("   ");
        }
        printf(" ");
        foreach (i; 0 .. numinrow)
        {
            if (isAlphaNum(cast(dchar) chs[i]))
                printf("%c", chs[i]);
            else
                printf(".");
        }
    }
    printf("\n");
}

/**
 * Compares two 16-element rows.
 *
 * Params:
 *   a = first row; 16 ints are read.
 *   b = second row; 16 ints are read.
 *
 * Returns: 1 when all 16 elements are equal, 0 otherwise.
 */
int rows_eq(int *a, int *b)
{
    return a[0 .. 16] == b[0 .. 16] ? 1 : 0;
}

/**
 * Prints a hex dump of `buffer` to stdout, 16 bytes per row.
 *
 * A full row identical to the full row before it is not printed; the first
 * row of such a run is replaced by a single line of ".." markers. The last
 * row is always printed, and when it is a partial or full non-empty row a
 * final line with the total byte count (in hex) follows.
 *
 * Params:
 *   buffer = bytes to dump; may be empty.
 */
void amqp_dump(ubyte[] buffer)
{
    long count = 0;
    int numinrow = 0;
    int[16] chs;
    int[16] oldchs;
    bool showedDots = false;
    // Without this flag the first row would be compared against the
    // zero-initialised `oldchs` and a leading all-zero row would be elided.
    bool havePrevious = false;

    foreach (b; buffer)
    {
        if (numinrow == 16)
        {
            if (havePrevious && rows_eq(oldchs.ptr, chs.ptr))
            {
                if (!showedDots)
                {
                    showedDots = true;
                    // Indented past the 9-character offset prefix so the
                    // markers sit under the hex columns.
                    printf("          .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n");
                }
            }
            else
            {
                showedDots = false;
                dump_row(count, numinrow, chs.ptr);
            }

            oldchs[] = chs[];
            havePrevious = true;
            numinrow = 0;
        }

        count++;
        chs[numinrow++] = b;
    }

    dump_row(count, numinrow, chs.ptr);

    if (numinrow != 0)
        printf("%08llX:\n", cast(ulong) count);
}