1 ///
2 module kaleidc.api.dstatsd;
3 
4 import std.format : formattedWrite;
5 
6 import vibe.core.core;
7 import vibe.core.net;
8 
9 import fixedsizearray;
10 
11 ///
/// StatsD counter metric: a named value changed by `change`, optionally
/// annotated with a sample rate.
struct Counter
{
	const string name; /// metric name, written directly after the prefix
	const long change; /// value written after the ':'
	const double sampleRate; /// NaN means no "|@rate" suffix is written

	/// Builds a counter. `change` defaults to 1 and `sampleRate` to NaN,
	/// which makes `toString` omit the sample-rate suffix.
	this(const string name, const long change = 1,
			const double sampleRate = double.nan)
			@safe pure nothrow @nogc
	{
		this.sampleRate = sampleRate;
		this.change = change;
		this.name = name;
	}

	/// Writes the counter into the output range `buf` as
	/// `<prefix><name>:<change>|c`, followed by `|@<sampleRate>` when a
	/// sample rate was given.
	void toString(Buf)(Buf buf, const string prefix) const {
		import std.math : isNaN;

		// A real sample rate is present: emit the long form and stop.
		if(!isNaN(sampleRate)) {
			formattedWrite(buf, "%s%s:%s|c|@%f", prefix, name, change,
					sampleRate);
			return;
		}

		formattedWrite(buf, "%s%s:%s|c", prefix, name, change);
	}
}
40 
41 ///
/// StatsD gauge metric: a named unsigned value, serialised with the
/// type tag "g".
struct Gauge
{
	const string name; /// metric name, written directly after the prefix
	const ulong value; /// value written after the ':'

	/// Stores the metric name and its value.
	this(const string name, const ulong value)
			@safe pure nothrow @nogc
	{
		this.value = value;
		this.name = name;
	}

	/// Writes `<prefix><name>:<value>|g` into the output range `buf`.
	void toString(Buf)(Buf buf, const string prefix) const
	{
		formattedWrite(buf, "%s%s:%s|g", prefix, name, value);
	}
}
57 
58 ///
/// StatsD timing metric: a named unsigned duration, serialised with the
/// type tag "ms".
struct Timer
{
	const string name; /// metric name, written directly after the prefix
	const ulong time; /// value written after the ':'

	/// Stores the metric name and the measured time.
	this(const string name, const ulong time)
			@safe pure nothrow @nogc
	{
		this.time = time;
		this.name = name;
	}

	/// Writes `<prefix><name>:<time>|ms` into the output range `buf`.
	void toString(Buf)(Buf buf, const string prefix) const
	{
		formattedWrite(buf, "%s%s:%s|ms", prefix, name, time);
	}
}
74 
75 ///
/// StatsD histogram metric: a named unsigned value, serialised with the
/// type tag "h".
struct Histogram
{
	const string name; /// metric name, written directly after the prefix
	const ulong value; /// value written after the ':'

	/// Stores the metric name and its value.
	this(const string name, const ulong value)
			@safe pure nothrow @nogc
	{
		this.value = value;
		this.name = name;
	}

	/// Writes `<prefix><name>:<value>|h` into the output range `buf`.
	void toString(Buf)(Buf buf, const string prefix) const
	{
		formattedWrite(buf, "%s%s:%s|h", prefix, name, value);
	}
}
92 
93 ///
/// StatsD meter metric: a named unsigned increment, serialised with the
/// type tag "m".
struct Meter
{
	const string name; /// metric name, written directly after the prefix
	const ulong increment; /// value written after the ':'

	/// Stores the metric name and the increment.
	this(const string name, const ulong increment)
			@safe pure nothrow @nogc
	{
		this.increment = increment;
		this.name = name;
	}

	/// Writes `<prefix><name>:<increment>|m` into the output range `buf`.
	void toString(Buf)(Buf buf, const string prefix) const
	{
		formattedWrite(buf, "%s%s:%s|m", prefix, name, increment);
	}
}
110 
111 ///
/// StatsD set metric: a named signed value, serialised with the type
/// tag "s".
struct Set
{
	const string name; /// metric name, written directly after the prefix
	const long value; /// value written after the ':'

	/// Stores the metric name and its value.
	this(const string name, const long value)
			@safe pure nothrow @nogc
	{
		this.value = value;
		this.name = name;
	}

	/// Writes `<prefix><name>:<value>|s` into the output range `buf`.
	void toString(Buf)(Buf buf, const string prefix) const
	{
		formattedWrite(buf, "%s%s:%s|s", prefix, name, value);
	}
}
128 
129 ///
/// Scope guard that reports how long it was alive as a `Timer` metric.
///
/// The start time is taken in the constructor; the destructor sends
/// `Timer(name, elapsed milliseconds)` through `service`. Every copy of
/// this struct runs its own destructor and therefore reports once more.
struct ScopeTimer
{
	import core.time;

	string name; /// metric name used for the reported `Timer`
	StatsD service; /// client the timing is sent through
	MonoTime begin; /// monotonic timestamp taken at construction

	/// Remembers the metric name and client and starts the clock.
	this(string name, StatsD service) {
		this.name = name;
		this.service = service;
		this.begin = MonoTime.currTime;
	}

	/// Sends the elapsed time in milliseconds, if a client is attached.
	~this() {
		// A default-initialised ScopeTimer (ScopeTimer.init) or one built
		// with a null client has nothing to report to; calling through the
		// null class reference would crash inside the destructor.
		if(this.service is null) {
			return;
		}

		this.service(Timer(this.name, 
					(MonoTime.currTime - this.begin).total!"msecs"())
		);
	}

}
151 
152 ///
/// Small StatsD client that sends metrics over a connected UDP socket.
///
/// `opCall` serialises every metric passed to it into a single datagram,
/// one metric per line, each metric name preceded by `prefix`. The named
/// convenience methods (`inc`, `dec`, `set`, ...) send one metric each.
class StatsD
{
	private string address;
	private ushort port;
	private string prefix;
	private UDPConnection connection;

	/// Opens a UDP socket on an ephemeral local port and connects it to
	/// `address`:`port`.
	///
	/// Params:
	///   address = host the metrics are sent to
	///   port    = UDP port the metrics are sent to
	///   prefix  = text put in front of every metric name; a trailing '.'
	///             is appended when the prefix is non-empty and lacks one
	this(string address, ushort port, string prefix) {
		import std.array : back, empty;
		this.address = address;
		this.port = port;
		if(!prefix.empty && prefix.back != '.') {
			this.prefix = prefix ~ ".";
		} else {
			this.prefix = prefix;
		}
		this.connection = listenUDP(0);
		this.connection.connect(address, port);
	}

	/// Called when sending a datagram throws; prints the location given
	/// by `f`/`l` and the exception text to stdout. Overridable.
	void handleException(Exception e, const string f = __FILE__, 
			const int l = __LINE__) 
	{
		import std.stdio : writefln;
		writefln("%s:%s | %s", f, l, e.toString());
	}

	/// Serialises all `values` (anything with a
	/// `toString(buf, prefix)` method) into one newline-separated
	/// datagram and sends it. Send failures are passed to
	/// `handleException` instead of propagating. A call without
	/// arguments does nothing.
	void opCall(Values...)(Values values) {
		// The emptiness check has to happen at compile time: with a
		// runtime `if` the `values[0]` access below would still be
		// compiled for an empty argument list and fail.
		static if(Values.length > 0) {
			// 256 bytes of buffer space are reserved per metric.
			enum capacity = 256 * Values.length;
			FixedSizeArray!(char,capacity) buf;

			values[0].toString(buf[], this.prefix);

			foreach(ref it; values[1 .. $]) {
				buf.insertBack('\n');
				it.toString(buf[], this.prefix);
			}

			try {
				// Send only the bytes that were written, not the unused
				// tail of the fixed-size store. Nothing is ever removed
				// from the front of `buf`, so the content is assumed to
				// start at offset 0 of the store.
				this.connection.send(
					cast(const(ubyte)[])(buf.store[0 .. buf.length])
				);
			} catch(Exception e) {
				this.handleException(e);
			}
		}
	}

	/// Sends a `Counter` with the given change (default +1) and optional
	/// sample rate.
	final void inc(const string name, const long value = 1, 
			const double sampleRate = double.nan)
	{
		this.opCall(Counter(name, value, sampleRate));
	}

	/// Sends a `Counter` with the given change (default -1) and optional
	/// sample rate.
	final void dec(const string name, const long value = -1, 
			const double sampleRate = double.nan) 
	{
		// Forward the sample rate as well; it used to be dropped here.
		this.inc(name, value, sampleRate);
	}

	/// Sends a `Set` metric.
	final void set(const string name, const long value) {
		this.opCall(Set(name, value));
	}

	/// Sends a `Meter` metric.
	final void meter(const string name, const ulong value) {
		this.opCall(Meter(name, value));
	}

	/// Sends a `Histogram` metric.
	final void histo(const string name, const ulong value) {
		this.opCall(Histogram(name, value));
	}

	/// Sends a `Timer` metric.
	final void time(const string name, const ulong time) {
		this.opCall(Timer(name, time));
	}

	/// Sends a `Gauge` metric.
	final void gauge(const string name, const ulong value) {
		this.opCall(Gauge(name, value));
	}

}
234 
235 ///
unittest
{
	// A format string without specifiers is written through unchanged.
	{
		FixedSizeArray!(char,128) sink;
		string expected = "Hello World";
		formattedWrite(sink[], expected);
		assert(cast(string)sink == expected, cast(string)sink);
	}

	// A "%s" specifier is replaced by the formatted argument.
	{
		FixedSizeArray!(char,128) sink;
		string fmt = "Hello World %s";
		string expected = "Hello World 10";
		formattedWrite(sink[], fmt, 10);
		assert(cast(string)sink == expected, cast(string)sink);
	}
}
252 
253 ///
/// Demo program: starts a task that prints every UDP packet received on
/// port 1234, then sends 20000 three-metric datagrams and one
/// `ScopeTimer` measurement to it through a `StatsD` client.
int main(string[] args)
{
	import std.stdio;
	import std.random;
	import core.time;

	// Receiver side: print each datagram and require it to be non-empty.
	runTask({
		auto receiver = listenUDP(1234);
		for(;;) {
			auto datagram = receiver.recv();
			writefln("Got packet: %s", cast(string)datagram);
			assert((cast(string)datagram).length > 0);
		}
	});
	sleep(100.msecs);

	auto s = new StatsD("127.0.0.1", 1234, "");
	foreach(round; 0 .. 20000) {
		// Draw the random values in the same order they are sent.
		auto barChange = uniform(-10,10);
		auto timeValue = uniform(12,260);
		s(Counter("Foo"), Counter("Bar", barChange), Timer("Time", timeValue));
		//sleep(dur!"msecs"(2));
	}

	// The timer reports when this inner scope ends.
	{
		auto a = ScopeTimer("args", s);
		sleep(10.msecs);
	}

	sleep(100.msecs);
	return 0;
}