1 module symmetry.api.rabbitmq.utils; 2 import symmetry.api.rabbitmq.bindings; 3 import symmetry.api.rabbitmq.enums; 4 public import symmetry.api.rabbitmq.platform_utils; 5 import std.stdio:writeln,writef,stderr; 6 import std.ascii; 7 import core.stdc.stdarg; 8 import core.stdc.stdio:stderr,printf,vfprintf; 9 import std.exception; 10 import std.format:format; 11 12 /+ 13 void die(T...)(string formatString,T args) 14 { 15 stderr.writefln(formatString,args); 16 throw new Exception(""); 17 } 18 19 void die(const char *fmt, ...) 20 { 21 import core.stdc.stdio:stderr; 22 va_list ap; 23 va_start(ap, fmt); 24 vfprintf(stderr, fmt, ap); 25 va_end(ap); 26 throw new Exception(""); 27 } 28 void die_on_error(int x, const(char) *context) 29 { 30 enforce(x>=0, format!"%s: %s\n"(context, amqp_error_string2(x))); 31 } 32 +/ 33 void die_on_error(int x, string context) 34 { 35 import std.string:fromStringz; 36 enforce(x>=0, format!"%s:%s"(context,amqp_error_string2(x).fromStringz)); 37 } 38 void die_on_amqp_error(amqp_rpc_reply_t x, string context) 39 { 40 import std.stdio:stderr; 41 switch (x.reply_type) with (RabbitResponseType) 42 { 43 case normal: 44 return; 45 46 case none: 47 throw new Exception(format!"%s: missing RPC reply type!"(context)); 48 49 /+ case libraryException: 50 throw new Exception(format!"%s: %s"(context, amqp_error_string2(x.library_error))); 51 52 case serverException: 53 switch (x.reply.id) { 54 case AMQP_CONNECTION_CLOSE_METHOD: 55 amqp_connection_close_t *m = cast (amqp_connection_close_t *) x.reply.decoded; 56 throw new Exception(format!"%s: server connection error %s, message: %.*s"( 57 context, m.reply_code, 58 cast(int) m.reply_text.len, cast(char *) m.reply_text.bytes)); 59 60 case AMQP_CHANNEL_CLOSE_METHOD: 61 amqp_channel_close_t *m = cast(amqp_channel_close_t *) x.reply.decoded; 62 throw new Exception(format!"%s: server channel error %uh, message: %.*s"( 63 context, 64 m.reply_code, 65 cast(int) m.reply_text.len, cast(char *) m.reply_text.bytes)); 66 default: 67 throw new Exception(format!"%s: unknown server error, method id 0x%08X"(context, x.reply.id)); 68 } 69 +/ 70 default: 71 break; 72 } 73 throw new Exception(format!"unknown Response Type: %s"(x.reply_type)); 74 } 75 void dump_row(long count, int numinrow, int *chs) 76 { 77 int i; 78 79 printf("%08lX:", count - numinrow); 80 81 if (numinrow > 0) { 82 for (i = 0; i < numinrow; i++) { 83 if (i == 8) { 84 printf(" :"); 85 } 86 printf(" %02X", chs[i]); 87 } 88 for (i = numinrow; i < 16; i++) { 89 if (i == 8) { 90 printf(" :"); 91 } 92 printf(" "); 93 } 94 printf(" "); 95 for (i = 0; i < numinrow; i++) { 96 if (isAlphaNum(chs[i])) { 97 printf("%c", chs[i]); 98 } else { 99 printf("."); 100 } 101 } 102 } 103 printf("\n"); 104 } 105 106 int rows_eq(int *a, int *b) 107 { 108 int i; 109 110 for (i=0; i<16; i++) 111 if (a[i] != b[i]) { 112 return 0; 113 } 114 115 return 1; 116 } 117 118 void amqp_dump(ubyte[] buffer) 119 { 120 char *buf = cast(char *) buffer.ptr; 121 size_t len = buffer.length; 122 long count = 0; 123 int numinrow = 0; 124 int[16] chs; 125 int[16] oldchs; 126 int showed_dots; 127 size_t i; 128 129 for (i = 0; i < len; i++) { 130 int ch = buf[i]; 131 132 if (numinrow == 16) { 133 int j; 134 135 if (rows_eq(oldchs.ptr, chs.ptr)) { 136 if (!showed_dots) { 137 showed_dots = 1; 138 printf(" .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n"); 139 } 140 } else { 141 showed_dots = 0; 142 dump_row(count, numinrow, chs.ptr); 143 } 144 145 for (j=0; j<16; j++) { 146 oldchs[j] = chs[j]; 147 } 148 149 numinrow = 0; 150 } 151 152 count++; 153 chs[numinrow++] = ch; 154 } 155 156 dump_row(count, numinrow, chs.ptr); 157 158 if (numinrow != 0) { 159 printf("%08lX:\n", count); 160 } 161 }