Package haizea :: Package core :: Module accounting
[hide private]
[frames] | [no frames]

Source Code for Module haizea.core.accounting

  1  # -------------------------------------------------------------------------- # 
  2  # Copyright 2006-2009, University of Chicago                                 # 
  3  # Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   # 
  4  # Complutense de Madrid (dsa-research.org)                                   # 
  5  #                                                                            # 
  6  # Licensed under the Apache License, Version 2.0 (the "License"); you may    # 
  7  # not use this file except in compliance with the License. You may obtain    # 
  8  # a copy of the License at                                                   # 
  9  #                                                                            # 
 10  # http://www.apache.org/licenses/LICENSE-2.0                                 # 
 11  #                                                                            # 
 12  # Unless required by applicable law or agreed to in writing, software        # 
 13  # distributed under the License is distributed on an "AS IS" BASIS,          # 
 14  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   # 
 15  # See the License for the specific language governing permissions and        # 
 16  # limitations under the License.                                             # 
 17  # -------------------------------------------------------------------------- # 
 18   
 19  """Classes used to collect data""" 
 20   
 21  import os 
 22  import os.path 
 23  from haizea.common.utils import pickle, get_clock 
 24  from errno import EEXIST 
 25   
class AccountingData(object):
    """Holds every piece of accounting data gathered during a Haizea run.

    Saving accounting data to disk amounts to pickling one instance of
    this class.
    """

    def __init__(self):
        """Create an empty container: no counters, stats, leases or attributes."""
        # Run-level metadata
        self.attrs = {}
        self.starttime = None

        # Counters (time series) and the kind of running average each one uses
        self.counters, self.counter_avg_type = {}, {}

        # Names of the per-lease and per-run stats that have been declared
        self.lease_stats_names, self.stats_names = [], []

        # Values of per-lease stats and per-run stats ("statistics")
        self.lease_stats, self.stats = {}, {}

        # Leases saved alongside the accounting data
        self.leases = {}
class AccountingDataCollection(object):
    """Accounting data collection

    This class provides a framework to collect data while Haizea is running.
    It is designed so that the code that collects the data is placed in
    separate classes (the "probes"; a probe must be a child class of
    AccountingProbe). Accounting probes can collect three types of data:

     - Per-lease data: Data attributable to individual leases or derived
       from how each lease was scheduled.
     - Per-run data: Data from a single run of Haizea.
     - Counters: A counter is a time-ordered list showing how some metric
       varied throughout a single run of Haizea.

    The AccountingDataCollection class takes care of calling these
    probes at three points when Haizea is running:
    (1) at every time step, (2) when a lease is requested, and (3) when a
    lease is done.
    """

    # Running-average modes accepted by create_counter()
    AVERAGE_NONE = 0
    AVERAGE_NORMAL = 1
    AVERAGE_TIMEWEIGHTED = 2

    def __init__(self, datafile, attrs):
        """Constructor

        @param datafile: Path to file where accounting data will be saved
        @type datafile: C{str}
        @param attrs: Run attributes
        @type attrs: C{dict}
        """
        self.__data = AccountingData()
        self.__datafile = datafile
        self.__probes = []

        self.__data.attrs = attrs

    def add_probe(self, probe):
        """Adds a new accounting probe

        @param probe: Probe to add
        @type probe: L{AccountingProbe}
        """
        self.__probes.append(probe)

    def create_counter(self, counter_id, avgtype):
        """Adds a new counter.

        Counters can store not just the value of the counter throughout
        time, but also a running average. This is specified with the
        avgtype parameter, which can be equal to:

         - AccountingDataCollection.AVERAGE_NONE: Don't compute an average
         - AccountingDataCollection.AVERAGE_NORMAL: For each entry, compute
           the average of all the values including and preceding that entry.
         - AccountingDataCollection.AVERAGE_TIMEWEIGHTED: For each entry,
           compute the average of all the values including and preceding
           that entry, weighing the average according to the time between
           each entry.

        @param counter_id: Name of the counter
        @type counter_id: C{str}
        @param avgtype: Type of average to compute
        @type avgtype: C{int}
        """
        self.__data.counters[counter_id] = []
        self.__data.counter_avg_type[counter_id] = avgtype

    def create_lease_stat(self, stat_id):
        """Adds a new per-lease type of data ("stat").

        @param stat_id: Name of the stat
        @type stat_id: C{str}
        """
        self.__data.lease_stats_names.append(stat_id)

    def create_stat(self, stat_id):
        """Adds a new per-run type of data ("stat").

        @param stat_id: Name of the stat
        @type stat_id: C{str}
        """
        self.__data.stats_names.append(stat_id)

    def incr_counter(self, counter_id, lease_id=None):
        """Increment a counter by one.

        The counter must already have at least one entry (see start()).

        @param counter_id: Name of the counter
        @type counter_id: C{str}
        @param lease_id: Optionally, the ID of the lease that caused this increment.
        @type lease_id: C{int}
        """
        self.append_to_counter(counter_id,
                               self.get_last_counter_value(counter_id) + 1,
                               lease_id)

    def decr_counter(self, counter_id, lease_id=None):
        """Decrement a counter by one.

        The counter must already have at least one entry (see start()).

        @param counter_id: Name of the counter
        @type counter_id: C{str}
        @param lease_id: Optionally, the ID of the lease that caused this decrement.
        @type lease_id: C{int}
        """
        self.append_to_counter(counter_id,
                               self.get_last_counter_value(counter_id) - 1,
                               lease_id)

    def append_to_counter(self, counter_id, value, lease_id=None):
        """Append a value to a counter

        Entries are stored as mutable C{[time, lease_id, value]} lists, with
        C{time} taken from the clock. If the last entry has the same
        timestamp, its value is overwritten instead of adding a new entry.
        A new entry is also skipped when neither the lease ID nor the value
        changed.

        @param counter_id: Name of the counter
        @type counter_id: C{str}
        @param value: Value to append
        @type value: C{int} or C{float}
        @param lease_id: Optionally, the ID of the lease that caused this change.
        @type lease_id: C{int}
        """
        time = get_clock().get_time()
        entries = self.__data.counters[counter_id]
        if entries:
            last = entries[-1]
            prevtime, prevlease, prevval = last[0], last[1], last[2]
            if time == prevtime:
                # Several updates in the same instant: keep only the latest
                # value (the lease ID of the existing entry is left as is).
                last[2] = value
            elif prevlease != lease_id or prevval != value:
                entries.append([time, lease_id, value])
        else:
            entries.append([time, lease_id, value])

    def get_last_counter_time(self, counter_id):
        """Get the time of the last entry in a counter

        @param counter_id: Name of the counter
        @type counter_id: C{str}
        """
        return self.__data.counters[counter_id][-1][0]

    def get_last_counter_value(self, counter_id):
        """Get the value of the last entry in a counter

        @param counter_id: Name of the counter
        @type counter_id: C{str}
        """
        return self.__data.counters[counter_id][-1][2]

    def set_lease_stat(self, stat_id, lease_id, value):
        """Set the value of a per-lease datum

        @param stat_id: Name of the stat
        @type stat_id: C{str}
        @param lease_id: The ID of the lease the value is associated to
        @type lease_id: C{int}
        @param value: Value of the stat
        @type value: C{int} or C{float}
        """
        self.__data.lease_stats.setdefault(lease_id, {})[stat_id] = value

    def set_stat(self, stat_id, value):
        """Set the value of a per-run datum

        @param stat_id: Name of the stat
        @type stat_id: C{str}
        @param value: Value of the stat
        @type value: C{int} or C{float}
        """
        self.__data.stats[stat_id] = value

    def start(self, time):
        """Start collecting data

        Records C{time} as the start of the run (later used to normalize
        counter timestamps) and gives every counter an initial value of 0.

        @param time: Time at which data started being collected
        @type time: L{mx.DateTime}
        """
        self.__data.starttime = time

        # Start the counters.
        # NOTE(review): the initial entries are timestamped with the clock's
        # current time (inside append_to_counter), not with `time`;
        # presumably both coincide when this is called -- confirm in callers.
        for counter_id in self.__data.counters:
            self.append_to_counter(counter_id, 0)

    def stop(self):
        """Stop collecting data

        Closes every counter and rewrites each one as a list of
        C{(seconds_since_start, lease_id, value, average)} tuples, according
        to its averaging mode. Then lets every probe finalize its data.
        """
        # Close each counter by repeating its last value at the current time,
        # so the final interval is accounted for. Counters with no entries
        # (e.g., start() was never called) are skipped instead of raising.
        for counter_id, entries in self.__data.counters.items():
            if entries:
                self.append_to_counter(counter_id, entries[-1][2])

        # Add the averages
        for counter_id in list(self.__data.counters):
            normalized = self.__normalize_times(self.__data.counters[counter_id])
            avgtype = self.__data.counter_avg_type[counter_id]
            if avgtype == AccountingDataCollection.AVERAGE_NONE:
                self.__data.counters[counter_id] = self.__add_no_average(normalized)
            elif avgtype == AccountingDataCollection.AVERAGE_NORMAL:
                self.__data.counters[counter_id] = self.__add_average(normalized)
            elif avgtype == AccountingDataCollection.AVERAGE_TIMEWEIGHTED:
                self.__data.counters[counter_id] = self.__add_timeweighted_average(normalized)
            # Any other avgtype leaves the counter's raw entries untouched.

        for probe in self.__probes:
            probe.finalize_accounting()

    def save_to_disk(self, leases):
        """Save accounting data to disk.

        Creates the data file's directory if needed. Strips data the
        reporting tools do not need from each lease, then pickles the
        collected data to the data file.

        @param leases: Mapping whose values are the leases to be saved
            (each is stored under its C{id})
        @type leases: C{dict} of L{Lease}s
        """
        dirname = os.path.dirname(self.__datafile)
        # A bare filename has no directory part (dirname == ""), and
        # os.makedirs("") would fail, so only create a directory if one is named.
        if dirname and not os.path.exists(dirname):
            try:
                os.makedirs(dirname)
            except OSError as e:
                # Tolerate the directory having been created concurrently.
                if e.errno != EEXIST:
                    raise

        # Add lease data
        # Remove some data that won't be necessary in the reporting tools
        for l in leases.values():
            l.clear_rrs()
            l.logger = None
            self.__data.leases[l.id] = l

        # Save data
        pickle(self.__data, self.__datafile)

    def at_timestep(self, lease_scheduler):
        """Invoke the probes' at_timestep handlers.

        @param lease_scheduler: Lease Scheduler
        @type lease_scheduler: L{LeaseScheduler}
        """
        for probe in self.__probes:
            probe.at_timestep(lease_scheduler)

    def at_lease_request(self, lease):
        """Invoke the probes' at_lease_request handlers.

        @param lease: Requested lease
        @type lease: L{Lease}
        """
        for probe in self.__probes:
            probe.at_lease_request(lease)

    def at_lease_done(self, lease):
        """Invoke the probes' at_lease_done handlers.

        @param lease: Lease that was completed
        @type lease: L{Lease}
        """
        for probe in self.__probes:
            probe.at_lease_done(lease)

    def __normalize_times(self, counter):
        """Return counter entries as C{(seconds_since_start, lease_id, value)} tuples."""
        # NOTE(review): relies on the time difference exposing the *total*
        # elapsed seconds via `.seconds` (as mx.DateTime deltas do); a
        # datetime.timedelta's `.seconds` would drop whole days.
        return [((v[0] - self.__data.starttime).seconds, v[1], v[2]) for v in counter]

    def __add_no_average(self, counter):
        """Append a C{None} average to every normalized counter entry."""
        return [(v[0], v[1], v[2], None) for v in counter]

    def __add_timeweighted_average(self, counter):
        """Append a time-weighted running average to every normalized entry.

        Each value is weighted by how long it was held (until the next
        entry), and the sum is divided by the time elapsed since the start
        of the run. The first entry's average is its own value.
        """
        accum = 0
        prev_time = None
        prev_value = None
        stats = []
        for time, lease_id, value in counter:
            if prev_time is not None:
                accum += prev_value * (time - prev_time)
                # float() avoids truncating integer division under Python 2.
                avg = float(accum) / time
            else:
                avg = value
            stats.append((time, lease_id, value, avg))
            prev_time = time
            prev_value = value

        return stats

    def __add_average(self, counter):
        """Append the plain running average of all values so far to every normalized entry."""
        accum = 0
        stats = []
        for count, v in enumerate(counter, 1):
            value = v[2]
            accum += value
            # float() avoids truncating integer division under Python 2.
            avg = float(accum) / count
            stats.append((v[0], v[1], value, avg))

        return stats
class AccountingProbe(object):
    """Base class for accounting probes

    Every accounting probe derives from this class. Subclasses override
    whichever hook methods below correspond to the moments at which they
    want the accounting framework to run them; the default hooks do nothing.
    """

    def __init__(self, accounting):
        """Constructor

        Subclass constructors are where counters are created (with
        AccountingDataCollection.create_counter). They are also where
        per-lease data (AccountingDataCollection.create_lease_stat) and
        per-run data (AccountingDataCollection.create_stat) are declared.
        """
        self.accounting = accounting

    def finalize_accounting(self):
        """Finalize data collection.

        Called when data collection stops; override to act at that point.
        Per-run data is typically computed here.
        """

    def at_timestep(self, lease_scheduler):
        """Collect data at a timestep.

        Override to act every time the Haizea scheduler wakes up.

        @param lease_scheduler: Lease Scheduler
        @type lease_scheduler: L{LeaseScheduler}
        """

    def at_lease_request(self, lease):
        """Collect data after a lease request.

        Override to act right after a lease has been requested.

        @param lease: Requested lease
        @type lease: L{Lease}
        """

    def at_lease_done(self, lease):
        """Collect data when a lease is done (this includes successful
        completion and rejected/cancelled/failed leases).

        @param lease: Lease that was completed
        @type lease: L{Lease}
        """

    def _set_stat_from_counter(self, stat_id, counter_id):
        """Helper: copy the last value of a counter into a per-run stat.

        @param stat_id: Name of per-run stat
        @type stat_id: C{str}
        @param counter_id: Name of counter
        @type counter_id: C{str}
        """
        last_value = self.accounting.get_last_counter_value(counter_id)
        self.accounting.set_stat(stat_id, last_value)