1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 """Classes used to collect data"""
20
21 import os
22 import os.path
23 from haizea.common.utils import pickle, get_clock
24 from errno import EEXIST
25
27 """A container for all the accounting data. When Haizea saves
28 accounting data, it does so by pickling an object of this class.
29 """
30
32 """Initializes all the counters and data to empty values"""
33
34 self.counters = {}
35 self.counter_avg_type = {}
36
37
38 self.lease_stats_names = []
39 self.lease_stats = {}
40
41
42 self.stats_names = []
43 self.stats = {}
44
45
46 self.leases = {}
47
48
49 self.attrs = {}
50
51 self.starttime = None
52
53
55 """Accounting data collection
56
57 This class provides a framework to collect data while Haizea is running.
58 It is designed so that the code that collects the data is placed in
59 separate classes (the "probes"; a probe must be a child class of
60 AccountingProbe). Probes can collect three types of data:
61
62 Accounting probes can collect three types of data:
63
64 - Per-lease data: Data attributable to individual leases or derived
65 from how each lease was scheduled.
66 - Per-run data: Data from a single run of Haizea.
67 - Counters: A counter is a time-ordered list showing how some metric
68 varied throughout a single run of Haizea.
69
70 The the AccountingDataCollection class takes care of calling these
71 probes at three points when Haizea is running:
72 (1) at every time step, (2) when a lease is requested, and (3) when a
73 lease is done.
74
75 """
76
77 AVERAGE_NONE=0
78 AVERAGE_NORMAL=1
79 AVERAGE_TIMEWEIGHTED=2
80
81
83 """Constructor
84
85 @param datafile: Path to file where accounting data will be saved
86 @type datafile: C{str}
87 @param attrs: Run attributes
88 @type attrs: C{dict}
89 """
90 self.__data = AccountingData()
91 self.__datafile = datafile
92 self.__probes = []
93
94 self.__data.attrs = attrs
95
96
98 """Adds a new accounting probe
99
100 @param probe: Probe to add
101 @type probe: L{AccountingProbe}
102 """
103 self.__probes.append(probe)
104
105
107 """Adds a new counter.
108
109 Counters can store not just the value of the counter throughout
110 time, but also a running average. This is specified with the
111 avgtype parameter, which can be equal to:
112
113 - AccountingDataCollection.AVERAGE_NONE: Don't compute an average
114 - AccountingDataCollection.AVERAGE_NORMAL: For each entry, compute
115 the average of all the values including and preceding that entry.
116 - AccountingDataCollection.AVERAGE_TIMEWEIGHTED: For each entry,
117 compute the average of all the values including and preceding
118 that entry, weighing the average according to the time between
119 each entry.
120
121 @param counter_id: Name of the counter
122 @type counter_id: C{str}
123 @param avgtype: Type of average to compute
124 @type avgtype: C{int}
125 """
126 self.__data.counters[counter_id] = []
127 self.__data.counter_avg_type[counter_id] = avgtype
128
129
131 """Adds a new per-lease type of data ("stat").
132
133 @param stat_id: Name of the stat
134 @type stat_id: C{str}
135 """
136 self.__data.lease_stats_names.append(stat_id)
137
138
140 """Adds a new per-run type of data ("stat").
141
142 @param stat_id: Name of the stat
143 @type stat_id: C{str}
144 """
145 self.__data.stats_names.append(stat_id)
146
147
149 """Increment a counter
150
151 @param counter_id: Name of the counter
152 @type counter_id: C{str}
153 @param lease_id: Optionally, the lease that caused this increment.
154 @type lease_id: C{int}
155 """
156 time = get_clock().get_time()
157 self.append_to_counter(counter_id, self.__data.counters[counter_id][-1][2] + 1, lease_id)
158
159
161 """Decrement a counter
162
163 @param counter_id: Name of the counter
164 @type counter_id: C{str}
165 @param lease_id: Optionally, the ID of the lease that caused this increment.
166 @type lease_id: C{int}
167 """
168 time = get_clock().get_time()
169 self.append_to_counter(counter_id, self.__data.counters[counter_id][-1][2] - 1, lease_id)
170
171
173 """Append a value to a counter
174
175 @param counter_id: Name of the counter
176 @type counter_id: C{str}
177 @param value: Value to append
178 @type value: C{int} or C{float}
179 @param lease_id: Optionally, the ID of the lease that caused this increment.
180 @type lease_id: C{int}
181 """
182 time = get_clock().get_time()
183 if len(self.__data.counters[counter_id]) > 0:
184 prevtime = self.__data.counters[counter_id][-1][0]
185 prevlease = self.__data.counters[counter_id][-1][1]
186 prevval = self.__data.counters[counter_id][-1][2]
187 if time == prevtime:
188 self.__data.counters[counter_id][-1][2] = value
189 else:
190 if prevlease != lease_id or prevval != value:
191 self.__data.counters[counter_id].append([time, lease_id, value])
192 else:
193 self.__data.counters[counter_id].append([time, lease_id, value])
194
195
196
198 """Get the time of the last entry in a counter
199
200 """
201 return self.__data.counters[counter_id][-1][0]
202
203
205 """Get the value of the last entry in a counter
206
207 """
208 return self.__data.counters[counter_id][-1][2]
209
210
212 """Set the value of a per-lease datum
213
214 @param stat_id: Name of the stat
215 @type stat_id: C{str}
216 @param lease_id: The ID of the lease the value is associated to
217 @type lease_id: C{int}
218 @param value: Value of the stat
219 @type value: C{int} or C{float}
220 """
221 self.__data.lease_stats.setdefault(lease_id, {})[stat_id] = value
222
223
225 """Set the value of a per-run datum
226
227 @param stat_id: Name of the stat
228 @type stat_id: C{str}
229 @param value: Value of the stat
230 @type value: C{int} or C{float}
231 """
232 self.__data.stats[stat_id] = value
233
234
236 """Start collecting data
237
238 @param time: Time at which data started being collected
239 @type time: L{mx.DateTime}
240 """
241 self.__data.starttime = time
242
243
244 for counter_id in self.__data.counters:
245 self.append_to_counter(counter_id, 0)
246
247
271
272
274 """Save accounting data to disk.
275
276 @param leases: List of leases to be saved to disk
277 @type leases: List of L{Lease}s
278 """
279 try:
280 dirname = os.path.dirname(self.__datafile)
281 if not os.path.exists(dirname):
282 os.makedirs(dirname)
283 except OSError, e:
284 if e.errno != EEXIST:
285 raise e
286
287
288
289 for l in leases.values():
290 l.clear_rrs()
291 l.logger = None
292 self.__data.leases[l.id] = l
293
294
295 pickle(self.__data, self.__datafile)
296
297
299 """Invoke the probes' at_timestep handlers.
300
301 @param lease_scheduler: Lease Scheduler
302 @type lease_scheduler: L{LeaseScheduler}
303 """
304 for probe in self.__probes:
305 probe.at_timestep(lease_scheduler)
306
307
309 """Invoke the probes' at_lease_request handlers.
310
311 @param lease: Requested lease
312 @type lease: L{Lease}
313 """
314 for probe in self.__probes:
315 probe.at_lease_request(lease)
316
317
319 """Invoke the probes' at_lease_done handlers.
320
321 @param lease: Lease that was completed
322 @type lease: L{Lease}
323 """
324 for probe in self.__probes:
325 probe.at_lease_done(lease)
326
327
329 return [((v[0] - self.__data.starttime).seconds, v[1], v[2]) for v in counter]
330
331
333 return [(v[0], v[1], v[2], None) for v in counter]
334
335
337 accum = 0
338 prev_time = None
339 prev_value = None
340 stats = []
341 for v in counter:
342 time = v[0]
343 lease_id = v[1]
344 value = v[2]
345 if prev_time != None:
346 timediff = time - prev_time
347 weighted_value = prev_value*timediff
348 accum += weighted_value
349 avg = accum/time
350 else:
351 avg = value
352 stats.append((time, lease_id, value, avg))
353 prev_time = time
354 prev_value = value
355
356 return stats
357
358
360 accum = 0
361 count = 0
362 stats = []
363 for v in counter:
364 value = v[2]
365 accum += value
366 count += 1
367 avg = accum/count
368 stats.append((v[0], v[1], value, avg))
369
370 return stats
371
372
374 """Base class for accounting probes
375
376 Accounting probes must extend this class, and can override some of
377 the methods to make sure the accounting framework runs the probe
378 at certain points (see method documentation for details on when
379 to override a method).
380
381 """
383 """Constructor
384
385 Child classes must use their constructors to create counters
386 (with AccountingDataCollection.create_counter) and specify
387 per-lease data (with AccountingDataCollection.create_lease_stat)
388 and per-run data (with AccountingDataCollection.create_stat).
389 """
390 self.accounting = accounting
391
393 """Finalize data collection.
394
395 Override this method to perform any actions when data collection
396 stops. This is usually where per-run data is computed.
397 """
398 pass
399
401 """Collect data at a timestep.
402
403 Override this method to perform any actions every time the
404 Haizea scheduler wakes up.
405
406 @param lease_scheduler: Lease Scheduler
407 @type lease_scheduler: L{LeaseScheduler}
408 """
409 pass
410
412 """Collect data after a lease request.
413
414 Override this method to perform any actions after a lease
415 has been requested.
416
417 @param lease: Requested lease
418 @type lease: L{Lease}
419 """
420 pass
421
423 """Collect data when a lease is done (this includes successful
424 completion and rejected/cancelled/failed leases).
425
426 @param lease: Lease that was completed
427 @type lease: L{Lease}
428 """
429 pass
430
432 """Convenience function that sets the value of a per-run
433 stat with the last value of a counter.
434
435 @param stat_id: Name of per-run stat
436 @type stat_id: C{str}
437 @param counter_id: Name of counter
438 @type counter_id: C{str}
439 """
440 value = self.accounting.get_last_counter_value(counter_id)
441 self.accounting.set_stat(stat_id, value)
442