1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 import haizea.common.constants as constants
20 from haizea.common.utils import xmlrpc_marshall_singlevalue
21 import bisect
22 import logging
23 from operator import attrgetter
24
25 """This module provides an in-memory slot table data structure.
26
27 A slot table is essentially just a collection of resource reservations, and is
28 implemented using the classes ResourceTuple, ResourceReservation, Node, and
29 KeyValueWrapper, and SlotTable. See the documentation in these classes for
30 additional details.
31
32 This module also provides an "availability window" implementation, which provides
33 easier access to the contents of a slot table (by determining the availability
34 in each node starting at a given time). The availability window is implemented
35 in classes ChangepointAvail, ChangepointNodeAvail, AvailEntry, OngoingAvailability,
36 and AvailabilityWindow.
37
38 """
42 """A resource tuple
43
44 This is an internal data structure used by the slot table. To
45 manipulate "quantities of resources" in Haizea, use L{Capacity}
46 instead.
47
48 A resource tuple represents a quantity of resources. For example,
49 "50% of a CPU and 512 MB of memory" is a resource tuple with two
50 components (CPU and memory). The purpose of having a class for this
51 (instead of a simpler structure, like a list or dictionary) is to
52 be able to perform certain basic operations, like determining whether
53 one tuple "fits" in another (e.g., the previous tuple fits in
54 "100% of CPU and 1024 MB of memory", but in "100% of CPU and 256 MB
55 of memory".
56
57 A resource tuple is tightly coupled to a particular slot table. So,
58 if a slot table defines that each node has "CPUs, memory, and disk space",
59 the resource tuples will depend on this definition (the specification
60 of valid resources is not repeated in each resource tuple object).
61
62 Resources in a resource tuple can be of two types: single instance and
63 multi instance. Memory is an example of a single instance resource: there
64 is only "one" memory in a node (with some capacity). CPUs are an example
65 of a multi instance resource: there can be multiple CPUs in a single node,
66 and each CPU can be used to satisfy a requirement for a CPU.
67
68 """
69 SINGLE_INSTANCE = 1
70 MULTI_INSTANCE = 2
71
72 - def __init__(self, slottable, single_instance, multi_instance = None):
73 """Constructor. Should only be called from SlotTable.
74
75 The constructor is not meant to be called directly and should only
76 be called from SlotTable.
77
78 The res parameter is a list with the quantities of each resource.
79 The list starts with the single-instance resources, followed
80 by the multi-instance resources. The slottable contains information
81 about the layout of this list:
82
83 - The mapping of resource to position in the list is contained in attribute
84 rtuple_restype2pos of the slottable.
85 - For single-instance resources, the position returned by this mapping contains
86 the quantity.
87 - For multi-instance resources, the position returns the
88 quantity of the first instance. The number of instances of a given resource
89 is contained in attribute rtuple_nres of the slottable.
90 - The number of single-instance resources is contained in attribute rtuple_len of
91 the slottable.
92
93 @param slottable: Slot table
94 @type slottable: L{SlotTable}
95 @param single_instance: Quantities of single instance resources
96 @type single_instance: C{list}
97 @param multi_instance: Quantities of multi instance resources
98 @type multi_instance: C{dict}
99 """
100
101 self.slottable = slottable
102 """
103 Slot table
104 @type: SlotTable
105 """
106
107 self._single_instance = single_instance
108 """
109 Resource quantities
110 @type: list
111 """
112
113 self._multi_instance = multi_instance
114 """
115 Resource quantities
116 @type: dict
117 """
118
119 @classmethod
121 """Creates a deep copy of a resource tuple
122
123 @param rt: Resource tuple to copy
124 @type rt: L{ResourceTuple}
125 @return: Copy of resource tuple
126 @rtype: L{ResourceTuple}
127 """
128 return cls(rt.slottable, rt._single_instance[:], dict([(pos,l[:]) for pos, l in rt._multi_instance.items()]))
129
130
132 """Determines if this resource tuple fits in a given resource tuple
133
134 @param rt: Resource tuple
135 @type rt: L{ResourceTuple}
136 @return: True if this resource tuple fits in rt. False otherwise.
137 @rtype: bool
138 """
139
140
141
142
143 for i in xrange(self.slottable.rtuple_nsingle):
144 if self._single_instance[i] > rt._single_instance[i]:
145 return False
146
147
148
149
150
151 if self.slottable.rtuple_has_multiinst:
152
153
154 _multi_instance2 = dict([(i, l[:]) for (i,l) in rt._multi_instance.items()])
155 for (pos, l) in self._multi_instance.items():
156 insts = _multi_instance2[pos]
157 for quantity in l:
158 fits = False
159 for i in range(len(insts)):
160 if quantity <= insts[i]:
161 fits = True
162 insts[i] -= quantity
163 break
164 if fits == False:
165 return False
166 return True
167
168
169 - def incr(self, rt):
170 """Increases the resource tuple with the amounts in a given resource tuple
171
172 @param rt: Resource tuple
173 @type rt: L{ResourceTuple}
174 """
175 for slottype in xrange(self.slottable.rtuple_nsingle):
176 self._single_instance[slottype] += rt._single_instance[slottype]
177 if self.slottable.rtuple_has_multiinst:
178 for (pos, l) in rt._multi_instance.items():
179 self._multi_instance[pos] += l[:]
180
181 - def decr(self, rt):
182 """Decreases the resource tuple with the amounts in a given resource tuple
183
184 Precondition: rt must be known to fit in the resource tuple (via fits_in)
185
186 @param rt: Resource tuple
187 @type rt: L{ResourceTuple}
188 """
189 for slottype in xrange(self.slottable.rtuple_nsingle):
190 self._single_instance[slottype] -= rt._single_instance[slottype]
191
192
193
194
195
196 if self.slottable.rtuple_has_multiinst:
197 for (pos, l) in rt._multi_instance.items():
198 insts = self._multi_instance[pos]
199 for quantity in l:
200 fits = False
201 for i in range(len(insts)):
202 if quantity <= insts[i]:
203 fits = True
204 insts[i] -= quantity
205 break
206 if fits == False:
207
208 raise Exception, "Can't decrease"
209
210
211
213 """Gets the amount of a given resource type.
214
215 @param restype: Resource type
216 @type restype: C{str}
217 @return: For single-instance resources, returns the amount. For multi-instance
218 resources, returns the sum of all the instances.
219 @rtype: int
220 """
221 pos = self.slottable.rtuple_restype2pos[restype]
222 if pos < self.slottable.rtuple_nsingle:
223 return self._single_instance[pos]
224 else:
225 return sum(self._multi_instance[pos])
226
227
229 """Determines if any amount of a resource is less than that in a given resource tuple
230
231 In the case of multi-instance resources, this method will only work when both
232 resource tuples have the same number of instances, and makes the comparison
233 instance by instance. For example, if a CPU resource has two instances A and B:
234 ___A__B_
235 R1|75 50
236 R2|50 75
237
238 R2.any_less(R1) returns True. However:
239
240 ___A__B_
241 R1|75 50
242 R2|75 50
243
244 R2.any_less(R1) returns False, even though one instance (R2.B) is less than another (R1.A)
245
246 @param rt: Resource tuple
247 @type rt: L{ResourceTuple}
248 @return: True if these is any resource such that its amount is less than that in rt.
249 @rtype: int
250 """
251 for i in xrange(self.slottable.rtuple_nsingle):
252 if self._single_instance[i] < rt._single_instance[i]:
253 return True
254
255 if self.slottable.rtuple_has_multiinst:
256 for (pos, l) in self._multi_instance.items():
257 for i, x in l:
258 if l[i] < rt._multi_instance[pos][i]:
259 return True
260
261 return False
262
264 """Modifies the resource amounts to the minimum of the current amount and that in the given resource tuple
265
266 As in any_less, for multi-instance resources this method will only work when both
267 resource tuples have the same number of instances, and makes the change
268 instance by instance.
269
270 @param rt: Resource tuple
271 @type rt: L{ResourceTuple}
272 """
273 for i in xrange(self.slottable.rtuple_nsingle):
274 self._single_instance[i] = min(self._single_instance[i], rt._single_instance[i])
275
276 if self.slottable.rtuple_has_multiinst:
277 for (pos, l) in self._multi_instance.items():
278 for i, x in l:
279 l[i] = min(l[i], rt._multi_instance[pos][i])
280
281
283 """Creates a string representation of the resource tuple
284
285 @return: String representation
286 @rtype: C{str}
287 """
288 r=""
289 for i, x in enumerate(self._single_instance):
290 r += "%s:%i " % (i, x)
291 if self.slottable.rtuple_has_multiinst:
292 r+= `self._multi_instance`
293 return r
294
296 """Determines if the resource tuple is equal to a given resource tuple
297
298 @return: True if they equal, False otherwise
299 @rtype: C{str}
300 """
301 return self._single_instance == rt._single_instance and self._multi_instance == rt._multi_instance
302
304 """Returns state necessary to unpickle a ResourceTuple object
305
306 """
307 return (self._single_instance, self._multi_instance)
308
310 """Restores state when unpickling a ResourceTuple object
311
312 After unpickling, the object still has to be bound to a slottable.
313 """
314 self.slottable = None
315 self._single_instance = state[0]
316 self._multi_instance = state[1]
317
320 """A resource reservation
321
322 A resource reservation (or RR) is a data structure representing resources
323 (represented as a ResourceTuple) reserved across multiple physical nodes.
324 (each node can have a different resource tuple; e.g., 1 CPU and
325 512 MB of memory in node 1 and 2 CPUs and 1024 MB of memory in node 2). An RR
326 has a specific start and end time for all the nodes. Thus, if some nodes are
327 reserved for an interval of time, and other nodes are reserved for a different
328 interval (even if these reservations are for the same lease), two separate RRs
329 would have to be added to the slot table.
330
331 This class isn't used by itself but rather serves as the base class for
332 VM reservations, image transfer reservations, etc.
333 """
334
335
336 STATE_SCHEDULED = 0
337 STATE_ACTIVE = 1
338 STATE_DONE = 2
339
340
341 state_str = {STATE_SCHEDULED : "Scheduled",
342 STATE_ACTIVE : "Active",
343 STATE_DONE : "Done"}
344
345 - def __init__(self, lease, start, end, res):
346 """Constructor
347
348 @param lease: Lease this resource reservation belongs to
349 @type lease: L{Lease}
350 @param start: Starting time of the reservation
351 @type start: L{DateTime}
352 @param end: Ending time of the reservation
353 @type end: L{DateTime}
354 @param res: A dictionary mapping physical node ids to ResourceTuple objects
355 @type res: C{dict}
356 """
357 self.lease = lease
358 self.start = start
359 self.end = end
360 self.state = None
361 self.resources_in_pnode = res
362
364 """Prints the contents of the RR to the log
365
366 @param loglevel: Log level
367 @type loglevel: C{str}
368 """
369 logger = logging.getLogger("LEASES")
370 logger.log(loglevel, "Start : %s" % self.start)
371 logger.log(loglevel, "End : %s" % self.end)
372 logger.log(loglevel, "State : %s" % ResourceReservation.state_str[self.state])
373 logger.log(loglevel, "Resources : \n %s" % "\n ".join(["N%i: %s" %(i, x) for i, x in self.resources_in_pnode.items()]))
374
377 """Slot table
378
379 The slot table is one of the main data structures in Haizea (if not *the* main one).
380 It tracks the capacity of the physical nodes on which leases can be scheduled,
381 contains the resource reservations of all the leases, and allows efficient access
382 to them.
383
384 However, the information in the slot table is stored in a somewhat 'raw' format
385 (a collection of L{ResourceReservation}s) which can be hard to use directly. So,
386 one of the responsabilities of the slot table is to efficiently generate "availability
387 windows", which are a more convenient abstraction over available resources. See
388 AvailabilityWindow for more details. When writing a custom mapper, most read-only
389 interactions with the slot table should be through availability windows, which can
390 be obtained through the get_availability_window method of SlotTable.
391
392 The slot table also depends on classes L{Node} and L{KeyValueWrapper}.
393
394 Since querying resource reservations is the most frequent operation in Haizea, the
395 slot table tries to optimize access to them as much as possible. In particular,
396 we will often need to quickly access reservations starting or ending at a specific
397 time (or in an interval of time). The current slot table implementation stores the RRs
398 in two ordered lists: one by starting time and another by ending time. Access is done by
399 binary search in O(log n) time using the C{bisect} module. Insertion and removal
400 require O(n) time, since lists are implemented internally as arrays in CPython.
401 We could improve these times in the future by using a tree structure (which Python
402 doesn't have natively, so we'd have to include our own tree implementation), although
403 slot table accesses far outweight insertion and removal operations.
404
405 """
406
408 """Constructor
409
410 The slot table will be initially empty, without any physical nodes. These have to be added
411 with add_node.
412
413 @param resource_types: A dictionary mapping resource types to ResourceTuple.SINGLE_INSTANCE or
414 ResourceTuple.MULTI_INSTANCE (depending on whether the resource is single- or multi-instance)
415 @type resource_types: C{dict}
416 """
417 self.logger = logging.getLogger("SLOT")
418 self.nodes = {}
419 self.reservations_by_start = []
420 self.reservations_by_end = []
421 self.resource_types = resource_types
422 self.availabilitycache = {}
423 self.awcache_time = None
424 self.awcache = None
425 self.__dirty()
426
427
428 res_singleinstance = [rt for rt,ninst in resource_types if ninst == ResourceTuple.SINGLE_INSTANCE]
429 res_multiinstance = [(rt,ninst) for rt,ninst in resource_types if ninst == ResourceTuple.MULTI_INSTANCE]
430 self.rtuple_nsingle = len(res_singleinstance)
431 self.rtuple_nmultiple = len(res_multiinstance)
432 self.rtuple_has_multiinst = self.rtuple_nmultiple > 0
433 self.rtuple_restype2pos = dict([(rt,i) for (i,rt) in enumerate(res_singleinstance)])
434 pos = self.rtuple_nsingle
435 for rt, ninst in res_multiinstance:
436 self.rtuple_restype2pos[rt] = pos
437 pos = pos + 1
438
439 - def add_node(self, node_id, resourcetuple):
440 """Add a new physical node to the slot table
441
442 @param node_id: Resource type
443 @type node_id: C{int}
444 @param resourcetuple: Resource type
445 @type resourcetuple: L{ResourceTuple}
446 """
447 self.nodes[node_id] = Node(resourcetuple)
448
450 """Create an empty resource tuple
451
452 @return: Empty resource tuple, single-instance resources set to zero, multi-instance resources
453 set to zero instances.
454 @rtype: L{ResourceTuple}
455 """
456 return ResourceTuple(self, [0] * self.rtuple_nsingle, dict((pos,[]) for pos in xrange(self.rtuple_nsingle, self.rtuple_nsingle+self.rtuple_nmultiple)))
457
459 """Converts a L{Capacity} object to a L{ResourceTuple}
460
461 @param capacity: Resource capacity
462 @type capacity: L{Capacity}
463 @return: Resource tuple
464 @rtype: L{ResourceTuple}
465 """
466 single_instance = [0] * self.rtuple_nsingle
467 multi_instance = {}
468 for restype in capacity.get_resource_types():
469 pos = self.rtuple_restype2pos[restype]
470 ninst = capacity.ninstances[restype]
471 if pos < self.rtuple_nsingle:
472 single_instance[pos] = capacity.get_quantity(restype)
473 else:
474 multi_instance[pos] = []
475 for i in range(ninst):
476 multi_instance[pos].append(capacity.get_quantity_instance(restype, i))
477
478 rt = ResourceTuple(self, single_instance, multi_instance)
479
480 return rt
481
483 """Creates an availability window starting at a given time.
484
485 @param start: Start of availability window.
486 @type start: L{DateTime}
487 @return: Availability window
488 @rtype: L{AvailabilityWindow}
489 """
490
491
492
493
494
495
496
497 if self.awcache == None or start < self.awcache_time or (start >= self.awcache_time and not self.awcache.changepoints.has_key(start)):
498
499 self.__get_aw_cache_miss(start)
500 return self.awcache
501
503 """Computes the available resources on all nodes at a given time.
504
505 @param time: Time at which to determine availability.
506 @type time: L{DateTime}
507 @param min_capacity: If not None, only include the nodes that have at least
508 this minimum capacity.
509 @type min_capacity: L{ResourceTuple}
510 @return: A dictionary mapping physical node id to a L{Node} object (which
511 contains the available capacity of that physical node at the specified time)
512 @rtype: C{dict}
513 """
514 if not self.availabilitycache.has_key(time):
515 self.__get_availability_cache_miss(time)
516
517
518 nodes = self.availabilitycache[time]
519
520
521 if min_capacity != None:
522 newnodes = {}
523 for n, node in nodes.items():
524 if min_capacity.fits_in(node.capacity):
525 newnodes[n]=node
526 else:
527 pass
528 nodes = newnodes
529
530 return nodes
531
533 """Determines if the slot table is empty (has no reservations)
534
535 @return: True if there are no reservations, False otherwise.
536 @rtype: C{bool}
537 """
538 return (len(self.reservations_by_start) == 0)
539
541 """Determines if a resource type is "full" at a specified time.
542
543 A resource type is considered to be "full" if its available capacity is zero
544 in all the physical nodes in the slot table.
545
546 @param time: time at which to check for fullness.
547 @type time: L{DateTime}
548 @param restype: Resource type
549 @type restype: C{str}
550 @return: True if the resource type is full, False otherwise.
551 @rtype: C{bool}
552 """
553 nodes = self.get_availability(time)
554 avail = sum([node.capacity.get_by_type(restype) for node in nodes.values()])
555 return (avail == 0)
556
558 """Determines the aggregate capacity of a given resource type across all nodes.
559
560 @param restype: Resource type
561 @type restype: C{str}
562 @return: Total capacity
563 @rtype: C{int}
564 """
565 return sum([n.capacity.get_by_type(restype) for n in self.nodes.values()])
566
568 """Adds a L{ResourceReservation} to the slot table.
569
570 @param rr: Resource reservation
571 @type rr: L{ResourceReservation}
572 """
573 startitem = KeyValueWrapper(rr.start, rr)
574 enditem = KeyValueWrapper(rr.end, rr)
575 bisect.insort(self.reservations_by_start, startitem)
576 bisect.insort(self.reservations_by_end, enditem)
577 self.__dirty()
578
579
581 """Update a L{ResourceReservation} to the slot table.
582
583 Since the start and end time are used to index the reservations,
584 the old times have to be provided so we can find the old reservation
585 and make the changes.
586
587 @param rr: Resource reservation with updated values (including potentially new start and/or end times)
588 @type rr: L{ResourceReservation}
589 @param old_start: Start time of reservation before update.
590 @type old_start: L{DateTime}
591 @param old_end: End time of reservation before update.
592 @type old_end: L{DateTime}
593 """
594
595 self.__remove_reservation(rr, old_start, old_end)
596 self.add_reservation(rr)
597 self.__dirty()
598
599
601 """Remove a L{ResourceReservation} from the slot table.
602
603 @param rr: Resource reservation
604 @type rr: L{ResourceReservation}
605 """
606 self.__remove_reservation(rr, rr.start, rr.end)
607
608
610 """Get all reservations at a specified time
611
612 @param time: Time
613 @type time: L{DateTime}
614 @return: Resource reservations
615 @rtype: C{list} of L{ResourceReservation}s
616 """
617 item = KeyValueWrapper(time, None)
618 startpos = bisect.bisect_right(self.reservations_by_start, item)
619 bystart = set([x.value for x in self.reservations_by_start[:startpos]])
620 endpos = bisect.bisect_right(self.reservations_by_end, item)
621 byend = set([x.value for x in self.reservations_by_end[endpos:]])
622 res = bystart & byend
623 return list(res)
624
626 """Get all reservations starting in a specified interval.
627
628 The interval is closed: it includes the starting time and the ending time.
629
630 @param start: Start of interval
631 @type start: L{DateTime}
632 @param end: End of interval
633 @type end: L{DateTime}
634 @return: Resource reservations
635 @rtype: C{list} of L{ResourceReservation}s
636 """
637 startitem = KeyValueWrapper(start, None)
638 enditem = KeyValueWrapper(end, None)
639 startpos = bisect.bisect_left(self.reservations_by_start, startitem)
640 endpos = bisect.bisect_right(self.reservations_by_start, enditem)
641 res = [x.value for x in self.reservations_by_start[startpos:endpos]]
642 return res
643
645 """Get all reservations ending in a specified interval.
646
647 The interval is closed: it includes the starting time and the ending time.
648
649 @param start: Start of interval
650 @type start: L{DateTime}
651 @param end: End of interval
652 @type end: L{DateTime}
653 @return: Resource reservations
654 @rtype: C{list} of L{ResourceReservation}s
655 """
656 startitem = KeyValueWrapper(start, None)
657 enditem = KeyValueWrapper(end, None)
658 startpos = bisect.bisect_left(self.reservations_by_end, startitem)
659 endpos = bisect.bisect_right(self.reservations_by_end, enditem)
660 res = [x.value for x in self.reservations_by_end[startpos:endpos]]
661 return res
662
664 """Get all reservations starting after (but not on) a specified time
665
666 @param start: Time
667 @type start: L{DateTime}
668 @return: Resource reservations
669 @rtype: C{list} of L{ResourceReservation}s
670 """
671 startitem = KeyValueWrapper(start, None)
672 startpos = bisect.bisect_right(self.reservations_by_start, startitem)
673 res = [x.value for x in self.reservations_by_start[startpos:]]
674 return res
675
677 """Get all reservations ending after (but not on) a specified time
678
679 @param end: Time
680 @type end: L{DateTime}
681 @return: Resource reservations
682 @rtype: C{list} of L{ResourceReservation}s
683 """
684 startitem = KeyValueWrapper(end, None)
685 startpos = bisect.bisect_right(self.reservations_by_end, startitem)
686 res = [x.value for x in self.reservations_by_end[startpos:]]
687 return res
688
690 """Get all reservations starting on or after a specified time
691
692 @param start: Time
693 @type start: L{DateTime}
694 @return: Resource reservations
695 @rtype: C{list} of L{ResourceReservation}s
696 """
697 startitem = KeyValueWrapper(start, None)
698 startpos = bisect.bisect_left(self.reservations_by_start, startitem)
699 res = [x.value for x in self.reservations_by_start[startpos:]]
700 return res
701
703 """Get all reservations ending on or after a specified time
704
705 @param end: Time
706 @type end: L{DateTime}
707 @return: Resource reservations
708 @rtype: C{list} of L{ResourceReservation}s
709 """
710 startitem = KeyValueWrapper(end, None)
711 startpos = bisect.bisect_left(self.reservations_by_end, startitem)
712 res = [x.value for x in self.reservations_by_end[startpos:]]
713 return res
714
715
717 """Get all reservations starting at a specified time
718
719 @param time: Time
720 @type time: L{DateTime}
721 @return: Resource reservations
722 @rtype: C{list} of L{ResourceReservation}s
723 """
724 return self.get_reservations_starting_between(time, time)
725
727 """Get all reservations ending at a specified time
728
729 @param time: Time
730 @type time: L{DateTime}
731 @return: Resource reservations
732 @rtype: C{list} of L{ResourceReservation}s
733 """
734 return self.get_reservations_ending_between(time, time)
735
737 """Get all reservations that take place after (but not on) a
738 specified time. i.e., all reservations starting or ending after that time.
739
740 @param time: Time
741 @type time: L{DateTime}
742 @return: Resource reservations
743 @rtype: C{list} of L{ResourceReservation}s
744 """
745 bystart = set(self.get_reservations_starting_after(time))
746 byend = set(self.get_reservations_ending_after(time))
747 return list(bystart | byend)
748
750 """Get all reservations that take place on or after a
751 specified time. i.e., all reservations starting or ending after that time.
752
753 @param time: Time
754 @type time: L{DateTime}
755 @return: Resource reservations
756 @rtype: C{list} of L{ResourceReservation}s
757 """
758 bystart = set(self.get_reservations_starting_on_or_after(time))
759 byend = set(self.get_reservations_ending_on_or_after(time))
760 return list(bystart | byend)
761
763 """Get all the changepoints after a given time.
764
765 A changepoint is any time anything is scheduled to change in the
766 slottable (a reservation starting or ending).
767
768 @param after: Time
769 @type after: L{DateTime}
770 @param until: If not None, only include changepoints until this time.
771 @type until: L{DateTime}
772 @param nodes: If not None, only include changepoints affecting these nodes.
773 @type nodes: C{list} of C{int}s
774 @return: Changepoints
775 @rtype: C{list} of L{DateTime}s
776 """
777 changepoints = set()
778 res = self.get_reservations_after(after)
779 for rr in res:
780 if nodes == None or (nodes != None and len(set(rr.resources_in_pnode.keys()) & set(nodes)) > 0):
781 if rr.start > after:
782 changepoints.add(rr.start)
783 if rr.end > after:
784 changepoints.add(rr.end)
785 changepoints = list(changepoints)
786 if until != None:
787 changepoints = [c for c in changepoints if c < until]
788 changepoints.sort()
789 return changepoints
790
792 """Get the first changepoint after a given time.
793
794 @param time: Time
795 @type time: L{DateTime}
796 @return: Changepoints
797 @rtype: L{DateTime}
798 """
799 item = KeyValueWrapper(time, None)
800
801 startpos = bisect.bisect_right(self.reservations_by_start, item)
802 if startpos == len(self.reservations_by_start):
803 time1 = None
804 else:
805 time1 = self.reservations_by_start[startpos].value.start
806
807 endpos = bisect.bisect_right(self.reservations_by_end, item)
808 if endpos == len(self.reservations_by_end):
809 time2 = None
810 else:
811 time2 = self.reservations_by_end[endpos].value.end
812
813 if time1==None and time2==None:
814 return None
815 elif time1==None:
816 return time2
817 elif time2==None:
818 return time1
819 else:
820 return min(time1, time2)
821
822
823
825 """Get the first premature end time after a given time. ONLY FOR SIMULATION.
826
827 In simulation, some reservations can end prematurely, and this information
828 is stored in the slot table (in real life, this information is not
829 known a priori).
830
831 @param after: Time
832 @type after: L{DateTime}
833 @return: Next premature end
834 @rtype: L{DateTime}
835 """
836 from haizea.core.scheduler.vm_scheduler import VMResourceReservation
837
838 res = [i.value for i in self.reservations_by_end if isinstance(i.value, VMResourceReservation) and i.value.prematureend > after]
839 if len(res) > 0:
840 prematureends = [r.prematureend for r in res]
841 prematureends.sort()
842 return prematureends[0]
843 else:
844 return None
845
846
848 """Gets all the L{ResourceReservation}s that are set to end prematurely at a given time. ONLY FOR SIMULATION
849
850 @param time: Time
851 @type time: L{DateTime}
852 @return: Resource reservations
853 @rtype: C{list} of L{ResourceReservation}s
854 """
855 from haizea.core.scheduler.vm_scheduler import VMResourceReservation
856 return [i.value for i in self.reservations_by_end if isinstance(i.value, VMResourceReservation) and i.value.prematureend == time]
857
858
860 """Remove a L{ResourceReservation} from the slot table.
861
862 @param rr: Resource reservation
863 @type rr: L{ResourceReservation}
864 @param start: Start time under which the reservation is indexed, in cases where the RR
865 has changed (this parameter is only used when calling this method from update_reservation)
866 @type start: L{DateTime}
867 @param end: Same as start, but for the end time for the RR.
868 @type end: L{DateTime}
869 """
870 if start == None:
871 start = rr.start
872 if end == None:
873 end = rr.end
874 posstart = self.__get_reservation_index(self.reservations_by_start, rr, start)
875 posend = self.__get_reservation_index(self.reservations_by_end, rr, end)
876 self.reservations_by_start.pop(posstart)
877 self.reservations_by_end.pop(posend)
878 self.__dirty()
879
880
882 """Computes availability at a given time, and caches it.
883
884 Called when get_availability can't use availabilities in the cache.
885
886 @param time: Time at which to determine availability.
887 @type time: L{DateTime}
888 """
889 allnodes = set(self.nodes.keys())
890 nodes = {}
891 reservations = self.get_reservations_at(time)
892
893
894 for r in reservations:
895 for node in r.resources_in_pnode:
896 if not nodes.has_key(node):
897 n = self.nodes[node]
898 nodes[node] = Node(n.capacity)
899 nodes[node].capacity.decr(r.resources_in_pnode[node])
900
901
902 missing = allnodes - set(nodes.keys())
903 for node in missing:
904 nodes[node] = self.nodes[node]
905
906 self.availabilitycache[time] = nodes
907
909 """Computes availability window at a given time, and caches it.
910
911 Called when get_availability_window can't use the cached availability window.
912
913 @param time: Start of availability window.
914 @type time: L{DateTime}
915 """
916 self.awcache = AvailabilityWindow(self, time)
917 self.awcache_time = time
918
920 """Empties the caches.
921
922 Should be called whenever the caches become dirty (e.g., when a reservation
923 is added to the slot table).
924
925 """
926
927
928 self.availabilitycache = {}
929 self.awcache_time = None
930 self.awcache = None
931
933 """Find the index of a resource reservation in one of the internal reservation lists
934
935 @param rlist: Resource reservation
936 @type rlist: C{list} of L{ResourceReservation}s
937 @param rr: Resource reservation to look up
938 @type rr: L{ResourceReservation}
939 @param time: time the reservation is indexed under
940 @type time: L{DateTime}
941 """
942 item = KeyValueWrapper(time, None)
943 pos = bisect.bisect_left(rlist, item)
944 found = False
945 while not found:
946 if rlist[pos].value == rr:
947 found = True
948 else:
949 pos += 1
950 return pos
951
953 """Verifies the slot table is consistent. Used by unit tests.
954
955 @return: Returns a tuple, the first item being True if the slot table
956 is in a consistent state, and False otherwise. If the slot table is not
957 in a consistent state, the remaining values in the tuple are the
958 offending node, the offending changepoint, and the available resources
959 in the node at the changepoint.
960 @rtype: (C{bool},
961 """
962
963 changepoints = set()
964 for rr in [x.value for x in self.reservations_by_start]:
965 changepoints.add(rr.start)
966 changepoints.add(rr.end)
967 changepoints = list(changepoints)
968 changepoints.sort()
969
970 for cp in changepoints:
971 avail = self.get_availability(cp)
972 for node in avail:
973 for resource in avail[node].capacity._single_instance:
974 if resource < 0:
975 return False, node, cp, avail[node].capacity
976
977 return True, None, None, None
978
979
980
981 -class Node(object):
982 """A physical node in the slot table."""
983
985 """Constructor
986
987 @param capacity: Capacity of the node
988 @type capacity: L{ResourceTuple}
989 """
990 self.capacity = ResourceTuple.copy(capacity)
991
994 """A wrapper around L{ResourceReservations} so we can use the bisect module
995 to manage ordered lists of reservations."""
996
998 """Constructor
999
1000 @param key: Time under which the reservation should be indexed
1001 @type key: L{DateTime}
1002 @param value: Resource reservation
1003 @type value: L{ResourceReservation}
1004 """
1005 self.key = key
1006 self.value = value
1007
1009 return cmp(self.key, other.key)
1010
1013 """An availability window
1014
1015 A particularly important operation with the slot table is determining the
1016 "availability window" of resources starting at a given time. In a nutshell,
1017 an availability window provides a convenient abstraction over the slot table,
1018 with methods to answer questions like "If I want to start a least at time T,
1019 are there enough resources available to start the lease?" "Will those resources
1020 be available until time T+t?" "If not, what's the longest period of time those
1021 resources will be available?" etc.
1022
1023 AvailabilityWindow objects are not meant to be created directly, and should be
1024 created through the SlotTable's get_availability_window method.
1025
1026 """
1028 """Constructor
1029
1030 An availability window starts at a specific time, provided to the constructor.
1031
1032 @param slottable: Slot table the availability window is based upon.
1033 @type slottable: L{SlotTable}
1034 @param time: Starting time of the availability window.
1035 @type time: L{DateTime}
1036 """
1037 self.slottable = slottable
1038 self.logger = logging.getLogger("SLOTTABLE.WIN")
1039 self.time = time
1040 self.leases = set()
1041
1042 self.cp_list = [self.time] + self.slottable.get_changepoints_after(time)
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057 self.changepoints = dict([(cp,ChangepointAvail()) for cp in self.cp_list])
1058
1059
1060 for cp in self.changepoints.values():
1061 for node_id, node in self.slottable.nodes.items():
1062 cp.add_node(node_id, node.capacity)
1063
1064
1065 rrs = self.slottable.get_reservations_after(time)
1066 rrs.sort(key=attrgetter("start"))
1067
1068
1069 pos = 0
1070
1071
1072
1073
1074
1075 for rr in rrs:
1076
1077 if rr.start == rr.end:
1078 continue
1079
1080
1081 while rr.start >= self.time and self.cp_list[pos] != rr.start:
1082 pos += 1
1083
1084
1085 lease = rr.lease
1086 self.leases.add(lease)
1087
1088
1089
1090
1091 if rr.start >= self.time:
1092 start_cp = self.changepoints[rr.start]
1093 else:
1094 start_cp = self.changepoints[self.time]
1095
1096
1097 start_cp.leases.add(lease)
1098
1099
1100 for node in rr.resources_in_pnode:
1101 start_cp.nodes[node].decr(rr.resources_in_pnode[node])
1102 start_cp.nodes[node].add_lease(lease, rr.resources_in_pnode[node])
1103
1104
1105 pos2 = pos + 1
1106
1107 while self.cp_list[pos2] < rr.end:
1108 cp = self.changepoints[self.cp_list[pos2]]
1109 cp.leases.add(lease)
1110 for node in rr.resources_in_pnode:
1111 cp.nodes[node].decr(rr.resources_in_pnode[node])
1112 cp.nodes[node].add_lease(lease, rr.resources_in_pnode[node])
1113
1114 pos2 += 1
1115
1116
1117
1118
1119
1120
1121 prev_nodeavail = {}
1122 for node_id, node in self.changepoints[self.time].nodes.items():
1123 prev_nodeavail[node_id] = [node]
1124
1125 for cp in self.cp_list[1:]:
1126 for node_id, node in self.changepoints[cp].nodes.items():
1127 prev_nodes = prev_nodeavail[node_id]
1128 if prev_nodes[-1].available == node.available and prev_nodes[-1].leases == node.leases:
1129 prev_nodes.append(node)
1130 else:
1131 for prev_node in prev_nodes:
1132 prev_node.next_cp = cp
1133 prev_node.next_nodeavail = node
1134 prev_nodeavail[node_id] = [node]
1135
1136
1138 """Determines the available capacity at a given time and node
1139
1140 @param time: Time
1141 @type time: L{DateTime}
1142 @param node: Node id
1143 @type node: C{int}
1144 @return: Available capacity
1145 @rtype: L{ResourceTuple}
1146 """
1147 return self.changepoints[time].nodes[node].available
1148
1149
1151 """Determines the available capacity from a given time onwards.
1152
1153 This method returns an L{OngoingAvailability} object (see that class's
1154 documentation for more details)
1155
1156 @param time: Time
1157 @type time: L{DateTime}
1158 @param node: Node id
1159 @type node: C{int}
1160 @param preempted_leases: List of leases that can be preempted.
1161 @type preempted_leases: C{list} of L{Lease}s
1162 @return: Ongoing availability (see L{OngoingAvailability} documentation for more details)
1163 @rtype: L{OngoingAvailability}
1164 """
1165 return OngoingAvailability(self.changepoints[time].nodes[node], preempted_leases)
1166
1167
1169 """Get all the nodes at a given time.
1170
1171 @param time: Time
1172 @type time: L{DateTime}
1173 @return: Node ids
1174 @rtype: C{list} of C{int}
1175 """
1176 return self.changepoints[time].nodes.keys()
1177
1179 """Get leases scheduled on a node at a given time.
1180
1181 @param node: Node id
1182 @type node: C{int}
1183 @param time: Time
1184 @type time: L{DateTime}
1185 """
1186 return self.changepoints[time].nodes[node].leases
1187
1189 """Get all the leases scheduled in an interval.
1190
1191 This interval is semi-closed: It includes the start time but not the
1192 end time of the interval.
1193
1194 @param from_time: Start of interval
1195 @type from_time: L{DateTime}
1196 @param until_time: End of interval
1197 @type until_time: L{DateTime}
1198 @return: Leases
1199 @rtype: C{list} of L{Lease}s
1200 """
1201 leases = set()
1202 for cp in self.cp_list:
1203 if cp < from_time:
1204 continue
1205 if cp >= until_time:
1206 break
1207 leases.update(self.changepoints[cp].leases)
1208 return list(leases)
1209
1211 """Determine how much longer the capacity in a node will
1212 last, starting at a given time.
1213
1214 @param node: Node id
1215 @type node: C{int}
1216 @param time: Time
1217 @type time: L{DateTime}
1218 @return: Duration the capacity will last. If it will last indefinitely,
1219 None is returned.
1220 @rtype: L{DateTimeDelta}
1221 """
1222 next_cp = self.changepoints[time].nodes[node].next_cp
1223 if next_cp == None:
1224 return None
1225 else:
1226 return next_cp - time
1227
1230 """Information about ongoing availability in a node
1231
1232 An OngoingAvailability object contains information not just about
1233 the availability starting at a given time, but also how that availability
1234 diminishes over time. Thus, it the object to use when determining
1235 if, starting at a given time, it is possible to fit some capacity
1236 up to a certain time (with or without preempting other leases).
1237
1238 Typically, you will want to create an OngoingAvailability object using
1239 the get_ongoing_availability method in L{AvailabilityWindow}
1240 """
1241
1242 - def __init__(self, node, preempted_leases):
1243 """Constructor
1244
1245 @param node: Node and time from which to start determing availability, represented
1246 by a valid L{ChangepointNodeAvail} object from the L{AvailabilityWindow}.
1247 @type node: L{ChangepointNodeAvail}
1248 @param preempted_leases: List of leases that can be preempted.
1249 @type preempted_leases: C{list} of L{Lease}s
1250 """
1251 avails = []
1252 prev_avail = None
1253 prev_node = None
1254
1255
1256 while node != None:
1257 if len(preempted_leases) == 0:
1258 available = ResourceTuple.copy(node.available)
1259 else:
1260 available = node.get_avail_withpreemption(preempted_leases)
1261
1262 if prev_avail != None and available.any_less(prev_avail.available):
1263 available.min(prev_avail.available)
1264 availentry = AvailEntry(available, None)
1265 avails.append(availentry)
1266 prev_avail.until = prev_node.next_cp
1267 prev_avail = availentry
1268 elif prev_avail == None:
1269 availentry = AvailEntry(available, None)
1270 avails.append(availentry)
1271 prev_avail = availentry
1272
1273 prev_node = node
1274 node = node.next_nodeavail
1275
1276 self.avail_list = avails
1277
1278
1279 - def fits(self, capacity, until):
1280 """Determine if there is enough capacity until a given time.
1281
1282 @param capacity: Capacity
1283 @type capacity: L{ResourceTuple}
1284 @param until: Time
1285 @type until: L{DateTime}
1286 @return: True if the given capacity can fit until the given time. False otherwise.
1287 @rtype: C{bool}
1288 """
1289 for avail in self.avail_list:
1290 if avail.until == None or avail.until >= until:
1291 return capacity.fits_in(avail.available)
1292
1294 """Determine for how long we can fit a given capacity.
1295
1296 @param capacity: Capacity
1297 @type capacity: L{ResourceTuple}
1298 @return: The latest time at which the given capacity fits in the node.
1299 @rtype: L{DateTime}
1300 """
1301 prev = None
1302 for avail in self.avail_list:
1303 if not capacity.fits_in(avail.available):
1304 return prev
1305 else:
1306 prev = avail.until
1307
1308
1309
1310
1311
1312
1313 -class AvailEntry(object):
1314 - def __init__(self, available, until):
1315 self.available = available
1316 self.until = until
1317
1320 self.nodes = {}
1321 self.leases = set()
1322
1325
1328 self.capacity = capacity
1329 self.available = ResourceTuple.copy(capacity)
1330 self.leases = set()
1331 self.available_if_preempting = {}
1332 self.next_cp = None
1333 self.next_nodeavail = None
1334
1335 - def decr(self, capacity):
1336 self.available.decr(capacity)
1337
1339 if not lease in self.leases:
1340 self.leases.add(lease)
1341 self.available_if_preempting[lease] = ResourceTuple.copy(capacity)
1342 else:
1343 self.available_if_preempting[lease].incr(capacity)
1344
1346 avail = ResourceTuple.copy(self.capacity)
1347 for l in self.available_if_preempting:
1348 if not l in leases:
1349 avail.decr(self.available_if_preempting[l])
1350 return avail
1351