Package haizea :: Package core :: Package scheduler :: Module slottable
[hide private]
[frames] | no frames]

Source Code for Module haizea.core.scheduler.slottable

   1  # -------------------------------------------------------------------------- # 
   2  # Copyright 2006-2009, University of Chicago                                 # 
   3  # Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   # 
   4  # Complutense de Madrid (dsa-research.org)                                   # 
   5  #                                                                            # 
   6  # Licensed under the Apache License, Version 2.0 (the "License"); you may    # 
   7  # not use this file except in compliance with the License. You may obtain    # 
   8  # a copy of the License at                                                   # 
   9  #                                                                            # 
  10  # http://www.apache.org/licenses/LICENSE-2.0                                 # 
  11  #                                                                            # 
  12  # Unless required by applicable law or agreed to in writing, software        # 
  13  # distributed under the License is distributed on an "AS IS" BASIS,          # 
  14  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   # 
  15  # See the License for the specific language governing permissions and        # 
  16  # limitations under the License.                                             # 
  17  # -------------------------------------------------------------------------- # 
  18   
  19  import haizea.common.constants as constants 
  20  from haizea.common.utils import xmlrpc_marshall_singlevalue 
  21  import bisect 
  22  import logging 
  23  from operator import attrgetter 
  24   
  25  """This module provides an in-memory slot table data structure.  
  26   
  27  A slot table is essentially just a collection of resource reservations, and is 
  28  implemented using the classes ResourceTuple, ResourceReservation, Node, and 
  29  KeyValueWrapper, and SlotTable. See the documentation in these classes for 
  30  additional details. 
  31   
  32  This module also provides an "availability window" implementation, which provides 
  33  easier access to the contents of a slot table (by determining the availability 
  34  in each node starting at a given time). The availability window is implemented 
  35  in classes ChangepointAvail, ChangepointNodeAvail, AvailEntry, OngoingAvailability, 
  36  and AvailabilityWindow. 
  37   
  38  """ 
39 40 41 -class ResourceTuple(object):
42 """A resource tuple 43 44 This is an internal data structure used by the slot table. To 45 manipulate "quantities of resources" in Haizea, use L{Capacity} 46 instead. 47 48 A resource tuple represents a quantity of resources. For example, 49 "50% of a CPU and 512 MB of memory" is a resource tuple with two 50 components (CPU and memory). The purpose of having a class for this 51 (instead of a simpler structure, like a list or dictionary) is to 52 be able to perform certain basic operations, like determining whether 53 one tuple "fits" in another (e.g., the previous tuple fits in 54 "100% of CPU and 1024 MB of memory", but in "100% of CPU and 256 MB 55 of memory". 56 57 A resource tuple is tightly coupled to a particular slot table. So, 58 if a slot table defines that each node has "CPUs, memory, and disk space", 59 the resource tuples will depend on this definition (the specification 60 of valid resources is not repeated in each resource tuple object). 61 62 Resources in a resource tuple can be of two types: single instance and 63 multi instance. Memory is an example of a single instance resource: there 64 is only "one" memory in a node (with some capacity). CPUs are an example 65 of a multi instance resource: there can be multiple CPUs in a single node, 66 and each CPU can be used to satisfy a requirement for a CPU. 67 68 """ 69 SINGLE_INSTANCE = 1 70 MULTI_INSTANCE = 2 71
72 - def __init__(self, slottable, single_instance, multi_instance = None):
73 """Constructor. Should only be called from SlotTable. 74 75 The constructor is not meant to be called directly and should only 76 be called from SlotTable. 77 78 The res parameter is a list with the quantities of each resource. 79 The list starts with the single-instance resources, followed 80 by the multi-instance resources. The slottable contains information 81 about the layout of this list: 82 83 - The mapping of resource to position in the list is contained in attribute 84 rtuple_restype2pos of the slottable. 85 - For single-instance resources, the position returned by this mapping contains 86 the quantity. 87 - For multi-instance resources, the position returns the 88 quantity of the first instance. The number of instances of a given resource 89 is contained in attribute rtuple_nres of the slottable. 90 - The number of single-instance resources is contained in attribute rtuple_len of 91 the slottable. 92 93 @param slottable: Slot table 94 @type slottable: L{SlotTable} 95 @param single_instance: Quantities of single instance resources 96 @type single_instance: C{list} 97 @param multi_instance: Quantities of multi instance resources 98 @type multi_instance: C{dict} 99 """ 100 101 self.slottable = slottable 102 """ 103 Slot table 104 @type: SlotTable 105 """ 106 107 self._single_instance = single_instance 108 """ 109 Resource quantities 110 @type: list 111 """ 112 113 self._multi_instance = multi_instance 114 """ 115 Resource quantities 116 @type: dict 117 """
118 119 @classmethod
120 - def copy(cls, rt):
121 """Creates a deep copy of a resource tuple 122 123 @param rt: Resource tuple to copy 124 @type rt: L{ResourceTuple} 125 @return: Copy of resource tuple 126 @rtype: L{ResourceTuple} 127 """ 128 return cls(rt.slottable, rt._single_instance[:], dict([(pos,l[:]) for pos, l in rt._multi_instance.items()]))
129 130
131 - def fits_in(self, rt):
132 """Determines if this resource tuple fits in a given resource tuple 133 134 @param rt: Resource tuple 135 @type rt: L{ResourceTuple} 136 @return: True if this resource tuple fits in rt. False otherwise. 137 @rtype: bool 138 """ 139 140 # For single-instance resources, this is simple: just check if all the values 141 # in this resource tuple are smaller that the corresponding value in the 142 # given resource tuple 143 for i in xrange(self.slottable.rtuple_nsingle): 144 if self._single_instance[i] > rt._single_instance[i]: 145 return False 146 147 # For multi-instance resources this is a bit more hairy, since there 148 # are potentially multiple fittings. For example, if we have four CPUs 149 # and one is 50% used, an additional 50% could be fit in any of the four 150 # CPUs. Here we use a simple first-fit algorithm. 151 if self.slottable.rtuple_has_multiinst: 152 # Create copy of resource tuple's multi-instance resources. We'll 153 # subtract from this to determine if there is a fir. 154 _multi_instance2 = dict([(i, l[:]) for (i,l) in rt._multi_instance.items()]) 155 for (pos, l) in self._multi_instance.items(): 156 insts = _multi_instance2[pos] 157 for quantity in l: 158 fits = False 159 for i in range(len(insts)): 160 if quantity <= insts[i]: 161 fits = True 162 insts[i] -= quantity 163 break 164 if fits == False: 165 return False 166 return True
167 168
169 - def incr(self, rt):
170 """Increases the resource tuple with the amounts in a given resource tuple 171 172 @param rt: Resource tuple 173 @type rt: L{ResourceTuple} 174 """ 175 for slottype in xrange(self.slottable.rtuple_nsingle): 176 self._single_instance[slottype] += rt._single_instance[slottype] 177 if self.slottable.rtuple_has_multiinst: 178 for (pos, l) in rt._multi_instance.items(): 179 self._multi_instance[pos] += l[:]
180
181 - def decr(self, rt):
182 """Decreases the resource tuple with the amounts in a given resource tuple 183 184 Precondition: rt must be known to fit in the resource tuple (via fits_in) 185 186 @param rt: Resource tuple 187 @type rt: L{ResourceTuple} 188 """ 189 for slottype in xrange(self.slottable.rtuple_nsingle): 190 self._single_instance[slottype] -= rt._single_instance[slottype] 191 192 # Decreasing is trickier than increasing because instead of simply adding 193 # more instances, we essentially have to fit the multi-instance resources 194 # from the given resource tuple into the resource tuple. For consistency, 195 # we use the same first-fit algorithm as in fits_in 196 if self.slottable.rtuple_has_multiinst: 197 for (pos, l) in rt._multi_instance.items(): 198 insts = self._multi_instance[pos] 199 for quantity in l: 200 fits = False 201 for i in range(len(insts)): 202 if quantity <= insts[i]: 203 fits = True 204 insts[i] -= quantity 205 break 206 if fits == False: 207 # If the precondition is met, this shouldn't happen 208 raise Exception, "Can't decrease"
209 210 211
212 - def get_by_type(self, restype):
213 """Gets the amount of a given resource type. 214 215 @param restype: Resource type 216 @type restype: C{str} 217 @return: For single-instance resources, returns the amount. For multi-instance 218 resources, returns the sum of all the instances. 219 @rtype: int 220 """ 221 pos = self.slottable.rtuple_restype2pos[restype] 222 if pos < self.slottable.rtuple_nsingle: 223 return self._single_instance[pos] 224 else: 225 return sum(self._multi_instance[pos])
226 227
228 - def any_less(self, rt):
229 """Determines if any amount of a resource is less than that in a given resource tuple 230 231 In the case of multi-instance resources, this method will only work when both 232 resource tuples have the same number of instances, and makes the comparison 233 instance by instance. For example, if a CPU resource has two instances A and B: 234 ___A__B_ 235 R1|75 50 236 R2|50 75 237 238 R2.any_less(R1) returns True. However: 239 240 ___A__B_ 241 R1|75 50 242 R2|75 50 243 244 R2.any_less(R1) returns False, even though one instance (R2.B) is less than another (R1.A) 245 246 @param rt: Resource tuple 247 @type rt: L{ResourceTuple} 248 @return: True if these is any resource such that its amount is less than that in rt. 249 @rtype: int 250 """ 251 for i in xrange(self.slottable.rtuple_nsingle): 252 if self._single_instance[i] < rt._single_instance[i]: 253 return True 254 255 if self.slottable.rtuple_has_multiinst: 256 for (pos, l) in self._multi_instance.items(): 257 for i, x in l: 258 if l[i] < rt._multi_instance[pos][i]: 259 return True 260 261 return False
262
263 - def min(self, rt):
264 """Modifies the resource amounts to the minimum of the current amount and that in the given resource tuple 265 266 As in any_less, for multi-instance resources this method will only work when both 267 resource tuples have the same number of instances, and makes the change 268 instance by instance. 269 270 @param rt: Resource tuple 271 @type rt: L{ResourceTuple} 272 """ 273 for i in xrange(self.slottable.rtuple_nsingle): 274 self._single_instance[i] = min(self._single_instance[i], rt._single_instance[i]) 275 276 if self.slottable.rtuple_has_multiinst: 277 for (pos, l) in self._multi_instance.items(): 278 for i, x in l: 279 l[i] = min(l[i], rt._multi_instance[pos][i])
280 281
282 - def __repr__(self):
283 """Creates a string representation of the resource tuple 284 285 @return: String representation 286 @rtype: C{str} 287 """ 288 r="" 289 for i, x in enumerate(self._single_instance): 290 r += "%s:%i " % (i, x) 291 if self.slottable.rtuple_has_multiinst: 292 r+= `self._multi_instance` 293 return r
294
295 - def __eq__(self, rt):
296 """Determines if the resource tuple is equal to a given resource tuple 297 298 @return: True if they equal, False otherwise 299 @rtype: C{str} 300 """ 301 return self._single_instance == rt._single_instance and self._multi_instance == rt._multi_instance
302
303 - def __getstate__(self):
304 """Returns state necessary to unpickle a ResourceTuple object 305 306 """ 307 return (self._single_instance, self._multi_instance)
308
309 - def __setstate__(self, state):
310 """Restores state when unpickling a ResourceTuple object 311 312 After unpickling, the object still has to be bound to a slottable. 313 """ 314 self.slottable = None 315 self._single_instance = state[0] 316 self._multi_instance = state[1]
317
318 319 -class ResourceReservation(object):
320 """A resource reservation 321 322 A resource reservation (or RR) is a data structure representing resources 323 (represented as a ResourceTuple) reserved across multiple physical nodes. 324 (each node can have a different resource tuple; e.g., 1 CPU and 325 512 MB of memory in node 1 and 2 CPUs and 1024 MB of memory in node 2). An RR 326 has a specific start and end time for all the nodes. Thus, if some nodes are 327 reserved for an interval of time, and other nodes are reserved for a different 328 interval (even if these reservations are for the same lease), two separate RRs 329 would have to be added to the slot table. 330 331 This class isn't used by itself but rather serves as the base class for 332 VM reservations, image transfer reservations, etc. 333 """ 334 335 # Resource reservation states 336 STATE_SCHEDULED = 0 337 STATE_ACTIVE = 1 338 STATE_DONE = 2 339 340 # Mapping from state to a descriptive string 341 state_str = {STATE_SCHEDULED : "Scheduled", 342 STATE_ACTIVE : "Active", 343 STATE_DONE : "Done"} 344
345 - def __init__(self, lease, start, end, res):
346 """Constructor 347 348 @param lease: Lease this resource reservation belongs to 349 @type lease: L{Lease} 350 @param start: Starting time of the reservation 351 @type start: L{DateTime} 352 @param end: Ending time of the reservation 353 @type end: L{DateTime} 354 @param res: A dictionary mapping physical node ids to ResourceTuple objects 355 @type res: C{dict} 356 """ 357 self.lease = lease 358 self.start = start 359 self.end = end 360 self.state = None 361 self.resources_in_pnode = res # pnode -> ResourceTuple
362
363 - def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
364 """Prints the contents of the RR to the log 365 366 @param loglevel: Log level 367 @type loglevel: C{str} 368 """ 369 logger = logging.getLogger("LEASES") 370 logger.log(loglevel, "Start : %s" % self.start) 371 logger.log(loglevel, "End : %s" % self.end) 372 logger.log(loglevel, "State : %s" % ResourceReservation.state_str[self.state]) 373 logger.log(loglevel, "Resources : \n %s" % "\n ".join(["N%i: %s" %(i, x) for i, x in self.resources_in_pnode.items()]))
374
375 376 -class SlotTable(object):
377 """Slot table 378 379 The slot table is one of the main data structures in Haizea (if not *the* main one). 380 It tracks the capacity of the physical nodes on which leases can be scheduled, 381 contains the resource reservations of all the leases, and allows efficient access 382 to them. 383 384 However, the information in the slot table is stored in a somewhat 'raw' format 385 (a collection of L{ResourceReservation}s) which can be hard to use directly. So, 386 one of the responsabilities of the slot table is to efficiently generate "availability 387 windows", which are a more convenient abstraction over available resources. See 388 AvailabilityWindow for more details. When writing a custom mapper, most read-only 389 interactions with the slot table should be through availability windows, which can 390 be obtained through the get_availability_window method of SlotTable. 391 392 The slot table also depends on classes L{Node} and L{KeyValueWrapper}. 393 394 Since querying resource reservations is the most frequent operation in Haizea, the 395 slot table tries to optimize access to them as much as possible. In particular, 396 we will often need to quickly access reservations starting or ending at a specific 397 time (or in an interval of time). The current slot table implementation stores the RRs 398 in two ordered lists: one by starting time and another by ending time. Access is done by 399 binary search in O(log n) time using the C{bisect} module. Insertion and removal 400 require O(n) time, since lists are implemented internally as arrays in CPython. 401 We could improve these times in the future by using a tree structure (which Python 402 doesn't have natively, so we'd have to include our own tree implementation), although 403 slot table accesses far outweight insertion and removal operations. 404 405 """ 406
407 - def __init__(self, resource_types):
408 """Constructor 409 410 The slot table will be initially empty, without any physical nodes. These have to be added 411 with add_node. 412 413 @param resource_types: A dictionary mapping resource types to ResourceTuple.SINGLE_INSTANCE or 414 ResourceTuple.MULTI_INSTANCE (depending on whether the resource is single- or multi-instance) 415 @type resource_types: C{dict} 416 """ 417 self.logger = logging.getLogger("SLOT") 418 self.nodes = {} 419 self.reservations_by_start = [] 420 self.reservations_by_end = [] 421 self.resource_types = resource_types 422 self.availabilitycache = {} 423 self.awcache_time = None 424 self.awcache = None 425 self.__dirty() 426 427 # Resource tuple fields 428 res_singleinstance = [rt for rt,ninst in resource_types if ninst == ResourceTuple.SINGLE_INSTANCE] 429 res_multiinstance = [(rt,ninst) for rt,ninst in resource_types if ninst == ResourceTuple.MULTI_INSTANCE] 430 self.rtuple_nsingle = len(res_singleinstance) 431 self.rtuple_nmultiple = len(res_multiinstance) 432 self.rtuple_has_multiinst = self.rtuple_nmultiple > 0 433 self.rtuple_restype2pos = dict([(rt,i) for (i,rt) in enumerate(res_singleinstance)]) 434 pos = self.rtuple_nsingle 435 for rt, ninst in res_multiinstance: 436 self.rtuple_restype2pos[rt] = pos 437 pos = pos + 1
438
439 - def add_node(self, node_id, resourcetuple):
440 """Add a new physical node to the slot table 441 442 @param node_id: Resource type 443 @type node_id: C{int} 444 @param resourcetuple: Resource type 445 @type resourcetuple: L{ResourceTuple} 446 """ 447 self.nodes[node_id] = Node(resourcetuple)
448
450 """Create an empty resource tuple 451 452 @return: Empty resource tuple, single-instance resources set to zero, multi-instance resources 453 set to zero instances. 454 @rtype: L{ResourceTuple} 455 """ 456 return ResourceTuple(self, [0] * self.rtuple_nsingle, dict((pos,[]) for pos in xrange(self.rtuple_nsingle, self.rtuple_nsingle+self.rtuple_nmultiple)))
457
458 - def create_resource_tuple_from_capacity(self, capacity):
459 """Converts a L{Capacity} object to a L{ResourceTuple} 460 461 @param capacity: Resource capacity 462 @type capacity: L{Capacity} 463 @return: Resource tuple 464 @rtype: L{ResourceTuple} 465 """ 466 single_instance = [0] * self.rtuple_nsingle 467 multi_instance = {} 468 for restype in capacity.get_resource_types(): 469 pos = self.rtuple_restype2pos[restype] 470 ninst = capacity.ninstances[restype] 471 if pos < self.rtuple_nsingle: 472 single_instance[pos] = capacity.get_quantity(restype) 473 else: 474 multi_instance[pos] = [] 475 for i in range(ninst): 476 multi_instance[pos].append(capacity.get_quantity_instance(restype, i)) 477 478 rt = ResourceTuple(self, single_instance, multi_instance) 479 480 return rt
481
482 - def get_availability_window(self, start):
483 """Creates an availability window starting at a given time. 484 485 @param start: Start of availability window. 486 @type start: L{DateTime} 487 @return: Availability window 488 @rtype: L{AvailabilityWindow} 489 """ 490 491 # If possible, we try to use the cached availability window, so we don't have to 492 # recompute an entire availability window from scratch. 493 # The way availability windows are currently implemented (see AvailabilityWindow 494 # for details), an existing availability window can be used if the requested start 495 # time is after the existing start time *and* the requested start time is one of 496 # the changepoints covered by the availability window. 497 if self.awcache == None or start < self.awcache_time or (start >= self.awcache_time and not self.awcache.changepoints.has_key(start)): 498 # If the cached version doesn't work, recompute the availability window 499 self.__get_aw_cache_miss(start) 500 return self.awcache
501
502 - def get_availability(self, time, min_capacity=None):
503 """Computes the available resources on all nodes at a given time. 504 505 @param time: Time at which to determine availability. 506 @type time: L{DateTime} 507 @param min_capacity: If not None, only include the nodes that have at least 508 this minimum capacity. 509 @type min_capacity: L{ResourceTuple} 510 @return: A dictionary mapping physical node id to a L{Node} object (which 511 contains the available capacity of that physical node at the specified time) 512 @rtype: C{dict} 513 """ 514 if not self.availabilitycache.has_key(time): 515 self.__get_availability_cache_miss(time) 516 # Cache miss 517 518 nodes = self.availabilitycache[time] 519 520 # Keep only those nodes with enough resources 521 if min_capacity != None: 522 newnodes = {} 523 for n, node in nodes.items(): 524 if min_capacity.fits_in(node.capacity): 525 newnodes[n]=node 526 else: 527 pass 528 nodes = newnodes 529 530 return nodes
531
532 - def is_empty(self):
533 """Determines if the slot table is empty (has no reservations) 534 535 @return: True if there are no reservations, False otherwise. 536 @rtype: C{bool} 537 """ 538 return (len(self.reservations_by_start) == 0)
539
540 - def is_full(self, time, restype):
541 """Determines if a resource type is "full" at a specified time. 542 543 A resource type is considered to be "full" if its available capacity is zero 544 in all the physical nodes in the slot table. 545 546 @param time: time at which to check for fullness. 547 @type time: L{DateTime} 548 @param restype: Resource type 549 @type restype: C{str} 550 @return: True if the resource type is full, False otherwise. 551 @rtype: C{bool} 552 """ 553 nodes = self.get_availability(time) 554 avail = sum([node.capacity.get_by_type(restype) for node in nodes.values()]) 555 return (avail == 0)
556
557 - def get_total_capacity(self, restype):
558 """Determines the aggregate capacity of a given resource type across all nodes. 559 560 @param restype: Resource type 561 @type restype: C{str} 562 @return: Total capacity 563 @rtype: C{int} 564 """ 565 return sum([n.capacity.get_by_type(restype) for n in self.nodes.values()])
566
567 - def add_reservation(self, rr):
568 """Adds a L{ResourceReservation} to the slot table. 569 570 @param rr: Resource reservation 571 @type rr: L{ResourceReservation} 572 """ 573 startitem = KeyValueWrapper(rr.start, rr) 574 enditem = KeyValueWrapper(rr.end, rr) 575 bisect.insort(self.reservations_by_start, startitem) 576 bisect.insort(self.reservations_by_end, enditem) 577 self.__dirty()
578 579
580 - def update_reservation(self, rr, old_start, old_end):
581 """Update a L{ResourceReservation} to the slot table. 582 583 Since the start and end time are used to index the reservations, 584 the old times have to be provided so we can find the old reservation 585 and make the changes. 586 587 @param rr: Resource reservation with updated values (including potentially new start and/or end times) 588 @type rr: L{ResourceReservation} 589 @param old_start: Start time of reservation before update. 590 @type old_start: L{DateTime} 591 @param old_end: End time of reservation before update. 592 @type old_end: L{DateTime} 593 """ 594 # TODO: Might be more efficient to resort lists 595 self.__remove_reservation(rr, old_start, old_end) 596 self.add_reservation(rr) 597 self.__dirty()
598 599
600 - def remove_reservation(self, rr):
601 """Remove a L{ResourceReservation} from the slot table. 602 603 @param rr: Resource reservation 604 @type rr: L{ResourceReservation} 605 """ 606 self.__remove_reservation(rr, rr.start, rr.end)
607 608
609 - def get_reservations_at(self, time):
610 """Get all reservations at a specified time 611 612 @param time: Time 613 @type time: L{DateTime} 614 @return: Resource reservations 615 @rtype: C{list} of L{ResourceReservation}s 616 """ 617 item = KeyValueWrapper(time, None) 618 startpos = bisect.bisect_right(self.reservations_by_start, item) 619 bystart = set([x.value for x in self.reservations_by_start[:startpos]]) 620 endpos = bisect.bisect_right(self.reservations_by_end, item) 621 byend = set([x.value for x in self.reservations_by_end[endpos:]]) 622 res = bystart & byend 623 return list(res)
624
625 - def get_reservations_starting_between(self, start, end):
626 """Get all reservations starting in a specified interval. 627 628 The interval is closed: it includes the starting time and the ending time. 629 630 @param start: Start of interval 631 @type start: L{DateTime} 632 @param end: End of interval 633 @type end: L{DateTime} 634 @return: Resource reservations 635 @rtype: C{list} of L{ResourceReservation}s 636 """ 637 startitem = KeyValueWrapper(start, None) 638 enditem = KeyValueWrapper(end, None) 639 startpos = bisect.bisect_left(self.reservations_by_start, startitem) 640 endpos = bisect.bisect_right(self.reservations_by_start, enditem) 641 res = [x.value for x in self.reservations_by_start[startpos:endpos]] 642 return res
643
644 - def get_reservations_ending_between(self, start, end):
645 """Get all reservations ending in a specified interval. 646 647 The interval is closed: it includes the starting time and the ending time. 648 649 @param start: Start of interval 650 @type start: L{DateTime} 651 @param end: End of interval 652 @type end: L{DateTime} 653 @return: Resource reservations 654 @rtype: C{list} of L{ResourceReservation}s 655 """ 656 startitem = KeyValueWrapper(start, None) 657 enditem = KeyValueWrapper(end, None) 658 startpos = bisect.bisect_left(self.reservations_by_end, startitem) 659 endpos = bisect.bisect_right(self.reservations_by_end, enditem) 660 res = [x.value for x in self.reservations_by_end[startpos:endpos]] 661 return res
662
663 - def get_reservations_starting_after(self, start):
664 """Get all reservations starting after (but not on) a specified time 665 666 @param start: Time 667 @type start: L{DateTime} 668 @return: Resource reservations 669 @rtype: C{list} of L{ResourceReservation}s 670 """ 671 startitem = KeyValueWrapper(start, None) 672 startpos = bisect.bisect_right(self.reservations_by_start, startitem) 673 res = [x.value for x in self.reservations_by_start[startpos:]] 674 return res
675
676 - def get_reservations_ending_after(self, end):
677 """Get all reservations ending after (but not on) a specified time 678 679 @param end: Time 680 @type end: L{DateTime} 681 @return: Resource reservations 682 @rtype: C{list} of L{ResourceReservation}s 683 """ 684 startitem = KeyValueWrapper(end, None) 685 startpos = bisect.bisect_right(self.reservations_by_end, startitem) 686 res = [x.value for x in self.reservations_by_end[startpos:]] 687 return res
688
689 - def get_reservations_starting_on_or_after(self, start):
690 """Get all reservations starting on or after a specified time 691 692 @param start: Time 693 @type start: L{DateTime} 694 @return: Resource reservations 695 @rtype: C{list} of L{ResourceReservation}s 696 """ 697 startitem = KeyValueWrapper(start, None) 698 startpos = bisect.bisect_left(self.reservations_by_start, startitem) 699 res = [x.value for x in self.reservations_by_start[startpos:]] 700 return res
701
702 - def get_reservations_ending_on_or_after(self, end):
703 """Get all reservations ending on or after a specified time 704 705 @param end: Time 706 @type end: L{DateTime} 707 @return: Resource reservations 708 @rtype: C{list} of L{ResourceReservation}s 709 """ 710 startitem = KeyValueWrapper(end, None) 711 startpos = bisect.bisect_left(self.reservations_by_end, startitem) 712 res = [x.value for x in self.reservations_by_end[startpos:]] 713 return res
714 715
716 - def get_reservations_starting_at(self, time):
717 """Get all reservations starting at a specified time 718 719 @param time: Time 720 @type time: L{DateTime} 721 @return: Resource reservations 722 @rtype: C{list} of L{ResourceReservation}s 723 """ 724 return self.get_reservations_starting_between(time, time)
725
726 - def get_reservations_ending_at(self, time):
727 """Get all reservations ending at a specified time 728 729 @param time: Time 730 @type time: L{DateTime} 731 @return: Resource reservations 732 @rtype: C{list} of L{ResourceReservation}s 733 """ 734 return self.get_reservations_ending_between(time, time)
735
736 - def get_reservations_after(self, time):
737 """Get all reservations that take place after (but not on) a 738 specified time. i.e., all reservations starting or ending after that time. 739 740 @param time: Time 741 @type time: L{DateTime} 742 @return: Resource reservations 743 @rtype: C{list} of L{ResourceReservation}s 744 """ 745 bystart = set(self.get_reservations_starting_after(time)) 746 byend = set(self.get_reservations_ending_after(time)) 747 return list(bystart | byend)
748
749 - def get_reservations_on_or_after(self, time):
750 """Get all reservations that take place on or after a 751 specified time. i.e., all reservations starting or ending after that time. 752 753 @param time: Time 754 @type time: L{DateTime} 755 @return: Resource reservations 756 @rtype: C{list} of L{ResourceReservation}s 757 """ 758 bystart = set(self.get_reservations_starting_on_or_after(time)) 759 byend = set(self.get_reservations_ending_on_or_after(time)) 760 return list(bystart | byend)
761
762 - def get_changepoints_after(self, after, until=None, nodes=None):
763 """Get all the changepoints after a given time. 764 765 A changepoint is any time anything is scheduled to change in the 766 slottable (a reservation starting or ending). 767 768 @param after: Time 769 @type after: L{DateTime} 770 @param until: If not None, only include changepoints until this time. 771 @type until: L{DateTime} 772 @param nodes: If not None, only include changepoints affecting these nodes. 773 @type nodes: C{list} of C{int}s 774 @return: Changepoints 775 @rtype: C{list} of L{DateTime}s 776 """ 777 changepoints = set() 778 res = self.get_reservations_after(after) 779 for rr in res: 780 if nodes == None or (nodes != None and len(set(rr.resources_in_pnode.keys()) & set(nodes)) > 0): 781 if rr.start > after: 782 changepoints.add(rr.start) 783 if rr.end > after: 784 changepoints.add(rr.end) 785 changepoints = list(changepoints) 786 if until != None: 787 changepoints = [c for c in changepoints if c < until] 788 changepoints.sort() 789 return changepoints
790
791 - def get_next_changepoint(self, time):
792 """Get the first changepoint after a given time. 793 794 @param time: Time 795 @type time: L{DateTime} 796 @return: Changepoints 797 @rtype: L{DateTime} 798 """ 799 item = KeyValueWrapper(time, None) 800 801 startpos = bisect.bisect_right(self.reservations_by_start, item) 802 if startpos == len(self.reservations_by_start): 803 time1 = None 804 else: 805 time1 = self.reservations_by_start[startpos].value.start 806 807 endpos = bisect.bisect_right(self.reservations_by_end, item) 808 if endpos == len(self.reservations_by_end): 809 time2 = None 810 else: 811 time2 = self.reservations_by_end[endpos].value.end 812 813 if time1==None and time2==None: 814 return None 815 elif time1==None: 816 return time2 817 elif time2==None: 818 return time1 819 else: 820 return min(time1, time2)
821 822 823
824 - def get_next_premature_end(self, after):
825 """Get the first premature end time after a given time. ONLY FOR SIMULATION. 826 827 In simulation, some reservations can end prematurely, and this information 828 is stored in the slot table (in real life, this information is not 829 known a priori). 830 831 @param after: Time 832 @type after: L{DateTime} 833 @return: Next premature end 834 @rtype: L{DateTime} 835 """ 836 from haizea.core.scheduler.vm_scheduler import VMResourceReservation 837 # Inefficient, but ok since this query seldom happens 838 res = [i.value for i in self.reservations_by_end if isinstance(i.value, VMResourceReservation) and i.value.prematureend > after] 839 if len(res) > 0: 840 prematureends = [r.prematureend for r in res] 841 prematureends.sort() 842 return prematureends[0] 843 else: 844 return None
845 846
847 - def get_prematurely_ending_res(self, time):
848 """Gets all the L{ResourceReservation}s that are set to end prematurely at a given time. ONLY FOR SIMULATION 849 850 @param time: Time 851 @type time: L{DateTime} 852 @return: Resource reservations 853 @rtype: C{list} of L{ResourceReservation}s 854 """ 855 from haizea.core.scheduler.vm_scheduler import VMResourceReservation 856 return [i.value for i in self.reservations_by_end if isinstance(i.value, VMResourceReservation) and i.value.prematureend == time]
857 858
859 - def __remove_reservation(self, rr, start=None, end=None):
860 """Remove a L{ResourceReservation} from the slot table. 861 862 @param rr: Resource reservation 863 @type rr: L{ResourceReservation} 864 @param start: Start time under which the reservation is indexed, in cases where the RR 865 has changed (this parameter is only used when calling this method from update_reservation) 866 @type start: L{DateTime} 867 @param end: Same as start, but for the end time for the RR. 868 @type end: L{DateTime} 869 """ 870 if start == None: 871 start = rr.start 872 if end == None: 873 end = rr.end 874 posstart = self.__get_reservation_index(self.reservations_by_start, rr, start) 875 posend = self.__get_reservation_index(self.reservations_by_end, rr, end) 876 self.reservations_by_start.pop(posstart) 877 self.reservations_by_end.pop(posend) 878 self.__dirty()
879 880
881 - def __get_availability_cache_miss(self, time):
882 """Computes availability at a given time, and caches it. 883 884 Called when get_availability can't use availabilities in the cache. 885 886 @param time: Time at which to determine availability. 887 @type time: L{DateTime} 888 """ 889 allnodes = set(self.nodes.keys()) 890 nodes = {} 891 reservations = self.get_reservations_at(time) 892 893 # Find how much resources are available on each node 894 for r in reservations: 895 for node in r.resources_in_pnode: 896 if not nodes.has_key(node): 897 n = self.nodes[node] 898 nodes[node] = Node(n.capacity) 899 nodes[node].capacity.decr(r.resources_in_pnode[node]) 900 901 # For the remaining nodes, use a reference to the original node, not a copy 902 missing = allnodes - set(nodes.keys()) 903 for node in missing: 904 nodes[node] = self.nodes[node] 905 906 self.availabilitycache[time] = nodes
907
908 - def __get_aw_cache_miss(self, time):
909 """Computes availability window at a given time, and caches it. 910 911 Called when get_availability_window can't use the cached availability window. 912 913 @param time: Start of availability window. 914 @type time: L{DateTime} 915 """ 916 self.awcache = AvailabilityWindow(self, time) 917 self.awcache_time = time
918
919 - def __dirty(self):
920 """Empties the caches. 921 922 Should be called whenever the caches become dirty (e.g., when a reservation 923 is added to the slot table). 924 925 """ 926 # You're a dirty, dirty slot table and you should be 927 # ashamed of having outdated caches! 928 self.availabilitycache = {} 929 self.awcache_time = None 930 self.awcache = None
931
932 - def __get_reservation_index(self, rlist, rr, time):
933 """Find the index of a resource reservation in one of the internal reservation lists 934 935 @param rlist: Resource reservation 936 @type rlist: C{list} of L{ResourceReservation}s 937 @param rr: Resource reservation to look up 938 @type rr: L{ResourceReservation} 939 @param time: time the reservation is indexed under 940 @type time: L{DateTime} 941 """ 942 item = KeyValueWrapper(time, None) 943 pos = bisect.bisect_left(rlist, item) 944 found = False 945 while not found: 946 if rlist[pos].value == rr: 947 found = True 948 else: 949 pos += 1 950 return pos
951
952 - def sanity_check(self):
953 """Verifies the slot table is consistent. Used by unit tests. 954 955 @return: Returns a tuple, the first item being True if the slot table 956 is in a consistent state, and False otherwise. If the slot table is not 957 in a consistent state, the remaining values in the tuple are the 958 offending node, the offending changepoint, and the available resources 959 in the node at the changepoint. 960 @rtype: (C{bool}, 961 """ 962 # Get checkpoints 963 changepoints = set() 964 for rr in [x.value for x in self.reservations_by_start]: 965 changepoints.add(rr.start) 966 changepoints.add(rr.end) 967 changepoints = list(changepoints) 968 changepoints.sort() 969 970 for cp in changepoints: 971 avail = self.get_availability(cp) 972 for node in avail: 973 for resource in avail[node].capacity._single_instance: 974 if resource < 0: 975 return False, node, cp, avail[node].capacity 976 977 return True, None, None, None
978
979 # TODO: We don't need a class for this anymore, but removing it requires making a lot of 980 # changes in SlotTable. 981 -class Node(object):
982 """A physical node in the slot table.""" 983
984 - def __init__(self, capacity):
985 """Constructor 986 987 @param capacity: Capacity of the node 988 @type capacity: L{ResourceTuple} 989 """ 990 self.capacity = ResourceTuple.copy(capacity)
991
992 993 -class KeyValueWrapper(object):
994 """A wrapper around L{ResourceReservations} so we can use the bisect module 995 to manage ordered lists of reservations.""" 996
997 - def __init__(self, key, value):
998 """Constructor 999 1000 @param key: Time under which the reservation should be indexed 1001 @type key: L{DateTime} 1002 @param value: Resource reservation 1003 @type value: L{ResourceReservation} 1004 """ 1005 self.key = key 1006 self.value = value
1007
1008 - def __cmp__(self, other):
1009 return cmp(self.key, other.key)
1010
1011 1012 -class AvailabilityWindow(object):
1013 """An availability window 1014 1015 A particularly important operation with the slot table is determining the 1016 "availability window" of resources starting at a given time. In a nutshell, 1017 an availability window provides a convenient abstraction over the slot table, 1018 with methods to answer questions like "If I want to start a least at time T, 1019 are there enough resources available to start the lease?" "Will those resources 1020 be available until time T+t?" "If not, what's the longest period of time those 1021 resources will be available?" etc. 1022 1023 AvailabilityWindow objects are not meant to be created directly, and should be 1024 created through the SlotTable's get_availability_window method. 1025 1026 """
1027 - def __init__(self, slottable, time):
1028 """Constructor 1029 1030 An availability window starts at a specific time, provided to the constructor. 1031 1032 @param slottable: Slot table the availability window is based upon. 1033 @type slottable: L{SlotTable} 1034 @param time: Starting time of the availability window. 1035 @type time: L{DateTime} 1036 """ 1037 self.slottable = slottable 1038 self.logger = logging.getLogger("SLOTTABLE.WIN") 1039 self.time = time 1040 self.leases = set() 1041 1042 self.cp_list = [self.time] + self.slottable.get_changepoints_after(time) 1043 1044 # The availability window is stored using a sparse data structure that 1045 # allows quick access to information related to a specific changepoint in 1046 # the slottable. 1047 # 1048 # All this information is contained in the 'changepoints' attribute: 1049 # - The 'changepoints' attribute is a dictionary mapping changepoints 1050 # to ChangepointAvail objects. 1051 # - A ChangepointAvail contains information about availability in a 1052 # changepoint. More specifically, it contains ChangepointNodeAvail 1053 # 1054 # We also have an ordered list of changepoints in cp_list 1055 1056 # Create initial changepoint dictionary 1057 self.changepoints = dict([(cp,ChangepointAvail()) for cp in self.cp_list]) 1058 1059 # Add the nodes to each ChangepointAvail object 1060 for cp in self.changepoints.values(): 1061 for node_id, node in self.slottable.nodes.items(): 1062 cp.add_node(node_id, node.capacity) 1063 1064 # Get reservations that will affect the availability window. 1065 rrs = self.slottable.get_reservations_after(time) 1066 rrs.sort(key=attrgetter("start")) 1067 1068 # This is an index into cp_list. We start at the first changepoint. 1069 pos = 0 1070 1071 # Fill in rest of the availability window. 1072 # For each reservation, we go through each changepoint the reservation 1073 # passes through, and we reduce the availability at that changepoint. 1074 # Note that the RRs are ordered by starting time. 1075 for rr in rrs: 1076 # Ignore nil-duration reservations 1077 if rr.start == rr.end: 1078 continue 1079 1080 # Advance pos to the changepoint corresponding to the RR's starting time. 1081 while rr.start >= self.time and self.cp_list[pos] != rr.start: 1082 pos += 1 1083 1084 # Add the lease to the set of leases included in the availability window 1085 lease = rr.lease 1086 self.leases.add(lease) 1087 1088 # Get the ChangepointAvail object for the starting changepoint. Note 1089 # that the RRs starting time might be before the start of the availability 1090 # window, in which case we just take the first ChangepointAvail. 1091 if rr.start >= self.time: 1092 start_cp = self.changepoints[rr.start] 1093 else: 1094 start_cp = self.changepoints[self.time] 1095 1096 # Add the RR's lease to the ChangepointAvail object 1097 start_cp.leases.add(lease) 1098 1099 # Decrease the availability at each node 1100 for node in rr.resources_in_pnode: 1101 start_cp.nodes[node].decr(rr.resources_in_pnode[node]) 1102 start_cp.nodes[node].add_lease(lease, rr.resources_in_pnode[node]) 1103 1104 # Process the other changepoints covered by this RR. 1105 pos2 = pos + 1 1106 1107 while self.cp_list[pos2] < rr.end: 1108 cp = self.changepoints[self.cp_list[pos2]] 1109 cp.leases.add(lease) 1110 for node in rr.resources_in_pnode: 1111 cp.nodes[node].decr(rr.resources_in_pnode[node]) 1112 cp.nodes[node].add_lease(lease, rr.resources_in_pnode[node]) 1113 1114 pos2 += 1 1115 1116 1117 # We link the ChangepointNodeAvail objects so each includes a 'pointer' to 1118 # the next changepoint in that node and the corresponding next 1119 # ChangepointNodeAvail 1120 1121 prev_nodeavail = {} 1122 for node_id, node in self.changepoints[self.time].nodes.items(): 1123 prev_nodeavail[node_id] = [node] 1124 1125 for cp in self.cp_list[1:]: 1126 for node_id, node in self.changepoints[cp].nodes.items(): 1127 prev_nodes = prev_nodeavail[node_id] 1128 if prev_nodes[-1].available == node.available and prev_nodes[-1].leases == node.leases: 1129 prev_nodes.append(node) 1130 else: 1131 for prev_node in prev_nodes: 1132 prev_node.next_cp = cp 1133 prev_node.next_nodeavail = node 1134 prev_nodeavail[node_id] = [node]
1135 1136
1137 - def get_availability(self, time, node):
1138 """Determines the available capacity at a given time and node 1139 1140 @param time: Time 1141 @type time: L{DateTime} 1142 @param node: Node id 1143 @type node: C{int} 1144 @return: Available capacity 1145 @rtype: L{ResourceTuple} 1146 """ 1147 return self.changepoints[time].nodes[node].available
1148 1149
1150 - def get_ongoing_availability(self, time, node, preempted_leases = []):
1151 """Determines the available capacity from a given time onwards. 1152 1153 This method returns an L{OngoingAvailability} object (see that class's 1154 documentation for more details) 1155 1156 @param time: Time 1157 @type time: L{DateTime} 1158 @param node: Node id 1159 @type node: C{int} 1160 @param preempted_leases: List of leases that can be preempted. 1161 @type preempted_leases: C{list} of L{Lease}s 1162 @return: Ongoing availability (see L{OngoingAvailability} documentation for more details) 1163 @rtype: L{OngoingAvailability} 1164 """ 1165 return OngoingAvailability(self.changepoints[time].nodes[node], preempted_leases)
1166 1167
1168 - def get_nodes_at(self, time):
1169 """Get all the nodes at a given time. 1170 1171 @param time: Time 1172 @type time: L{DateTime} 1173 @return: Node ids 1174 @rtype: C{list} of C{int} 1175 """ 1176 return self.changepoints[time].nodes.keys()
1177
1178 - def get_leases_at(self, node, time):
1179 """Get leases scheduled on a node at a given time. 1180 1181 @param node: Node id 1182 @type node: C{int} 1183 @param time: Time 1184 @type time: L{DateTime} 1185 """ 1186 return self.changepoints[time].nodes[node].leases
1187
1188 - def get_leases_between(self, from_time, until_time):
1189 """Get all the leases scheduled in an interval. 1190 1191 This interval is semi-closed: It includes the start time but not the 1192 end time of the interval. 1193 1194 @param from_time: Start of interval 1195 @type from_time: L{DateTime} 1196 @param until_time: End of interval 1197 @type until_time: L{DateTime} 1198 @return: Leases 1199 @rtype: C{list} of L{Lease}s 1200 """ 1201 leases = set() 1202 for cp in self.cp_list: 1203 if cp < from_time: 1204 continue 1205 if cp >= until_time: 1206 break 1207 leases.update(self.changepoints[cp].leases) 1208 return list(leases)
1209
1210 - def get_capacity_duration(self, node, time):
1211 """Determine how much longer the capacity in a node will 1212 last, starting at a given time. 1213 1214 @param node: Node id 1215 @type node: C{int} 1216 @param time: Time 1217 @type time: L{DateTime} 1218 @return: Duration the capacity will last. If it will last indefinitely, 1219 None is returned. 1220 @rtype: L{DateTimeDelta} 1221 """ 1222 next_cp = self.changepoints[time].nodes[node].next_cp 1223 if next_cp == None: 1224 return None 1225 else: 1226 return next_cp - time
1227
1228 1229 -class OngoingAvailability(object):
1230 """Information about ongoing availability in a node 1231 1232 An OngoingAvailability object contains information not just about 1233 the availability starting at a given time, but also how that availability 1234 diminishes over time. Thus, it the object to use when determining 1235 if, starting at a given time, it is possible to fit some capacity 1236 up to a certain time (with or without preempting other leases). 1237 1238 Typically, you will want to create an OngoingAvailability object using 1239 the get_ongoing_availability method in L{AvailabilityWindow} 1240 """ 1241
1242 - def __init__(self, node, preempted_leases):
1243 """Constructor 1244 1245 @param node: Node and time from which to start determing availability, represented 1246 by a valid L{ChangepointNodeAvail} object from the L{AvailabilityWindow}. 1247 @type node: L{ChangepointNodeAvail} 1248 @param preempted_leases: List of leases that can be preempted. 1249 @type preempted_leases: C{list} of L{Lease}s 1250 """ 1251 avails = [] 1252 prev_avail = None 1253 prev_node = None 1254 1255 # Traverse the list of ChangepointNodeAvails 1256 while node != None: 1257 if len(preempted_leases) == 0: 1258 available = ResourceTuple.copy(node.available) 1259 else: 1260 available = node.get_avail_withpreemption(preempted_leases) 1261 1262 if prev_avail != None and available.any_less(prev_avail.available): 1263 available.min(prev_avail.available) 1264 availentry = AvailEntry(available, None) 1265 avails.append(availentry) 1266 prev_avail.until = prev_node.next_cp 1267 prev_avail = availentry 1268 elif prev_avail == None: 1269 availentry = AvailEntry(available, None) 1270 avails.append(availentry) 1271 prev_avail = availentry 1272 1273 prev_node = node 1274 node = node.next_nodeavail 1275 1276 self.avail_list = avails
1277 1278
1279 - def fits(self, capacity, until):
1280 """Determine if there is enough capacity until a given time. 1281 1282 @param capacity: Capacity 1283 @type capacity: L{ResourceTuple} 1284 @param until: Time 1285 @type until: L{DateTime} 1286 @return: True if the given capacity can fit until the given time. False otherwise. 1287 @rtype: C{bool} 1288 """ 1289 for avail in self.avail_list: 1290 if avail.until == None or avail.until >= until: 1291 return capacity.fits_in(avail.available)
1292
1293 - def latest_fit(self, capacity):
1294 """Determine for how long we can fit a given capacity. 1295 1296 @param capacity: Capacity 1297 @type capacity: L{ResourceTuple} 1298 @return: The latest time at which the given capacity fits in the node. 1299 @rtype: L{DateTime} 1300 """ 1301 prev = None 1302 for avail in self.avail_list: 1303 if not capacity.fits_in(avail.available): 1304 return prev 1305 else: 1306 prev = avail.until
1307
1308 1309 # TODO: Document these classes too. These are pretty simple and only used internally by 1310 # Haizea (there's no good reason why someone writing a mapper or a policy would have to 1311 # use them), so for now they should be pretty self-explanatory. 1312 1313 -class AvailEntry(object):
1314 - def __init__(self, available, until):
1315 self.available = available 1316 self.until = until
1317
1318 -class ChangepointAvail(object):
1319 - def __init__(self):
1320 self.nodes = {} 1321 self.leases = set()
1322
1323 - def add_node(self, node, capacity):
1324 self.nodes[node] = ChangepointNodeAvail(capacity)
1325
1326 -class ChangepointNodeAvail(object):
1327 - def __init__(self, capacity):
1328 self.capacity = capacity 1329 self.available = ResourceTuple.copy(capacity) 1330 self.leases = set() 1331 self.available_if_preempting = {} 1332 self.next_cp = None 1333 self.next_nodeavail = None
1334
1335 - def decr(self, capacity):
1336 self.available.decr(capacity)
1337
1338 - def add_lease(self, lease, capacity):
1339 if not lease in self.leases: 1340 self.leases.add(lease) 1341 self.available_if_preempting[lease] = ResourceTuple.copy(capacity) 1342 else: 1343 self.available_if_preempting[lease].incr(capacity)
1344
1345 - def get_avail_withpreemption(self, leases):
1346 avail = ResourceTuple.copy(self.capacity) 1347 for l in self.available_if_preempting: 1348 if not l in leases: 1349 avail.decr(self.available_if_preempting[l]) 1350 return avail
1351