1 /// 2 module kaleidc.api.dstatsd; 3 4 import std.format : formattedWrite; 5 6 import vibe.core.core; 7 import vibe.core.net; 8 9 import fixedsizearray; 10 11 /// 12 struct Counter 13 { 14 const string name; 15 const long change; 16 const double sampleRate; 17 18 /// 19 this(const string name, const long change = 1, 20 const double sampleRate = double.nan) 21 @safe pure nothrow @nogc 22 { 23 this.name = name; 24 this.change = change; 25 this.sampleRate = sampleRate; 26 } 27 28 /// 29 void toString(Buf)(Buf buf, const string prefix) const { 30 import std.math : isNaN; 31 if(this.sampleRate.isNaN()) { 32 formattedWrite(buf, "%s%s:%s|c", prefix, this.name, this.change); 33 } else { 34 formattedWrite(buf, "%s%s:%s|c|@%f", prefix, this.name, 35 this.change, this.sampleRate 36 ); 37 } 38 } 39 } 40 41 /// 42 struct Gauge { 43 const string name; 44 const ulong value; 45 46 this(const string name, const ulong value) 47 @safe pure nothrow @nogc 48 { 49 this.name = name; 50 this.value = value; 51 } 52 53 void toString(Buf)(Buf buf, const string prefix) const { 54 formattedWrite(buf, "%s%s:%s|g", prefix, this.name, this.value); 55 } 56 } 57 58 /// 59 struct Timer { 60 const string name; 61 const ulong time; 62 63 this(const string name, const ulong time) 64 @safe pure nothrow @nogc 65 { 66 this.name = name; 67 this.time = time; 68 } 69 70 void toString(Buf)(Buf buf, const string prefix) const { 71 formattedWrite(buf, "%s%s:%s|ms", prefix, this.name, this.time); 72 } 73 } 74 75 /// 76 struct Histogram 77 { 78 const string name; 79 const ulong value; 80 81 this(const string name, const ulong value) 82 @safe pure nothrow @nogc 83 { 84 this.name = name; 85 this.value = value; 86 } 87 88 void toString(Buf)(Buf buf, const string prefix) const { 89 formattedWrite(buf, "%s%s:%s|h", prefix, this.name, this.value); 90 } 91 } 92 93 /// 94 struct Meter 95 { 96 const string name; 97 const ulong increment; 98 99 this(const string name, const ulong increment) 100 @safe pure nothrow @nogc 101 { 102 this.name = name; 103 this.increment = increment; 104 } 105 106 void toString(Buf)(Buf buf, const string prefix) const { 107 formattedWrite(buf, "%s%s:%s|m", prefix, this.name, this.increment); 108 } 109 } 110 111 /// 112 struct Set 113 { 114 const string name; 115 const long value; 116 117 this(const string name, const long value) 118 @safe pure nothrow @nogc 119 { 120 this.name = name; 121 this.value = value; 122 } 123 124 void toString(Buf)(Buf buf, const string prefix) const { 125 formattedWrite(buf, "%s%s:%s|s", prefix, this.name, this.value); 126 } 127 } 128 129 /// 130 struct ScopeTimer 131 { 132 import core.time; 133 134 string name; 135 StatsD service; 136 MonoTime begin; 137 138 this(string name, StatsD service) { 139 this.name = name; 140 this.service = service; 141 this.begin = MonoTime.currTime; 142 } 143 144 ~this() { 145 this.service(Timer(this.name, 146 (MonoTime.currTime - this.begin).total!"msecs"()) 147 ); 148 } 149 150 } 151 152 /// 153 class StatsD 154 { 155 private string address; 156 private ushort port; 157 private string prefix; 158 private UDPConnection connection; 159 160 this(string address, ushort port, string prefix) { 161 import std.array : back, empty; 162 this.address = address; 163 this.port = port; 164 if(!prefix.empty && prefix.back != '.') { 165 this.prefix = prefix ~ "."; 166 } else { 167 this.prefix = prefix; 168 } 169 this.connection = listenUDP(0); 170 this.connection.connect(address, port); 171 } 172 173 void handleException(Exception e, const string f = __FILE__, 174 const int l = __LINE__) 175 { 176 import std.stdio : writefln; 177 writefln("%s:%s | %s", f, l, e.toString()); 178 } 179 180 void opCall(Values...)(Values values) { 181 if(values.length == 0) { 182 return; 183 } 184 185 FixedSizeArray!(char,256 * values.length) buf; 186 187 values[0].toString(buf[], this.prefix); 188 189 foreach(ref it; values[1 .. $]) { 190 buf.insertBack('\n'); 191 it.toString(buf[], this.prefix); 192 } 193 194 try { 195 this.connection.send(cast(ubyte[256*values.length])(buf.store)); 196 } catch(Exception e) { 197 this.handleException(e); 198 } 199 } 200 201 final void inc(const string name, const long value = 1, 202 const double sampleRate = double.nan) 203 { 204 this.opCall(Counter(name, value, sampleRate)); 205 } 206 207 final void dec(const string name, const long value = -1, 208 const double sampleRate = double.nan) 209 { 210 this.inc(name, value); 211 } 212 213 final void set(const string name, const long value) { 214 this.opCall(Set(name, value)); 215 } 216 217 final void meter(const string name, const ulong value) { 218 this.opCall(Set(name, value)); 219 } 220 221 final void histo(const string name, const ulong value) { 222 this.opCall(Histogram(name, value)); 223 } 224 225 final void time(const string name, const ulong time) { 226 this.opCall(Timer(name, time)); 227 } 228 229 final void gauge(const string name, const ulong value) { 230 this.opCall(Gauge(name, value)); 231 } 232 233 } 234 235 /// 236 unittest 237 { 238 { 239 FixedSizeArray!(char,128) textbuf; 240 string h = "Hello World"; 241 formattedWrite(textbuf[], h); 242 assert(cast(string)textbuf == h, cast(string)textbuf); 243 } 244 { 245 FixedSizeArray!(char,128) textbuf; 246 string h = "Hello World %s"; 247 string t = "Hello World 10"; 248 formattedWrite(textbuf[], h, 10); 249 assert(cast(string)textbuf == t, cast(string)textbuf); 250 } 251 } 252 253 /// 254 int main(string[] args) 255 { 256 import std.stdio; 257 import std.random; 258 import core.time; 259 runTask({ 260 auto udp_listener = listenUDP(1234); 261 while(true) { 262 auto pack = udp_listener.recv(); 263 writefln("Got packet: %s", cast(string)pack); 264 assert((cast(string)pack).length > 0); 265 } 266 }); 267 sleep(dur!"msecs"(100)); 268 269 auto s = new StatsD("127.0.0.1", 1234, ""); 270 foreach(i; 0 .. 20000) { 271 s(Counter("Foo"), 272 Counter("Bar", uniform(-10,10)), 273 Timer("Time", uniform(12,260)) 274 ); 275 //sleep(dur!"msecs"(2)); 276 } 277 { 278 auto a = ScopeTimer("args", s); 279 sleep(dur!"msecs"(10)); 280 } 281 sleep(dur!"msecs"(100)); 282 return 0; 283 }