19 """This module provides the main classes for Haizea's VM Scheduler. All the
20 scheduling code that decides when and where a lease is scheduled is contained
21 in the VMScheduler class (except for the code that specifically decides
22 what physical machines each virtual machine is mapped to, which is factored out
23 into the "mapper" module). This module also provides the classes for the
24 reservations that will be placed in the slot table and correspond to VMs.
25 """
26
27 import haizea.common.constants as constants
28 from haizea.common.utils import round_datetime_delta, round_datetime, estimate_transfer_time, pretty_nodemap, get_config, get_clock, get_persistence
29 from haizea.core.leases import Lease, Capacity
30 from haizea.core.scheduler.slottable import ResourceReservation, ResourceTuple
31 from haizea.core.scheduler import ReservationEventHandler, RescheduleLeaseException, NormalEndLeaseException, EnactmentError, NotSchedulableException, InconsistentScheduleError, InconsistentLeaseStateError, MigrationResourceReservation
32 from operator import attrgetter, itemgetter
33 from mx.DateTime import TimeDelta
34
35 import logging
36
37
39 """The Haizea VM Scheduler
40
41 This class is responsible for taking a lease and scheduling VMs to satisfy
42 the requirements of that lease.
43 """
44
    def __init__(self, slottable, resourcepool, mapper, max_in_future):
        """Constructor
        
        The constructor does little more than create the VM scheduler's
        attributes. However, it does expect (in the arguments) a fully-constructed
        SlotTable, ResourcePool, and Mapper (these are constructed in the
        Manager's constructor).
        
        Arguments:
        slottable -- Slot table
        resourcepool -- Resource pool where enactment commands will be sent to
        mapper -- Mapper
        max_in_future -- Maximum number of leases that can be scheduled in the
        future (-1 means there is no limit)
        """
        self.slottable = slottable
        self.resourcepool = resourcepool
        self.mapper = mapper
        self.logger = logging.getLogger("VMSCHED")
        
        # Register the handlers that are called when each type of
        # reservation managed by this scheduler starts and ends
        self.handlers = {}
        self.handlers[VMResourceReservation] = ReservationEventHandler(
                                sched    = self,
                                on_start = VMScheduler._handle_start_vm,
                                on_end   = VMScheduler._handle_end_vm)

        self.handlers[ShutdownResourceReservation] = ReservationEventHandler(
                                sched    = self,
                                on_start = VMScheduler._handle_start_shutdown,
                                on_end   = VMScheduler._handle_end_shutdown)

        self.handlers[SuspensionResourceReservation] = ReservationEventHandler(
                                sched    = self,
                                on_start = VMScheduler._handle_start_suspend,
                                on_end   = VMScheduler._handle_end_suspend)

        self.handlers[ResumptionResourceReservation] = ReservationEventHandler(
                                sched    = self,
                                on_start = VMScheduler._handle_start_resume,
                                on_end   = VMScheduler._handle_end_resume)

        self.handlers[MemImageMigrationResourceReservation] = ReservationEventHandler(
                                sched    = self,
                                on_start = VMScheduler._handle_start_migrate,
                                on_end   = VMScheduler._handle_end_migrate)
        
        # Maximum number of leases that can be scheduled in the future
        self.max_in_future = max_in_future
        
        # Leases that have been scheduled in the future by the
        # backfilling algorithm
        self.future_leases = set()

    def schedule(self, lease, nexttime, earliest):
        """ The scheduling function
        
        This particular function doesn't do much except call __schedule_asap
        and __schedule_exact (which do all the work).
        
        Arguments:
        lease -- Lease to schedule
        nexttime -- The next time at which the scheduler can allocate resources.
        earliest -- The earliest possible starting times on each physical node
        """
        if lease.get_type() == Lease.BEST_EFFORT:
            return self.__schedule_asap(lease, nexttime, earliest, allow_in_future = self.can_schedule_in_future())
        elif lease.get_type() == Lease.ADVANCE_RESERVATION:
            return self.__schedule_exact(lease, nexttime, earliest)
        elif lease.get_type() == Lease.IMMEDIATE:
            return self.__schedule_asap(lease, nexttime, earliest, allow_in_future = False)
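
    # Illustrative usage (assumed caller, not from this module): schedule()
    # is the single entry point used by the lease scheduler, e.g.:
    #
    #     vmrr, preemptions = vm_scheduler.schedule(lease, nexttime, earliest)
    #
    # A BEST_EFFORT lease may be placed in the future (backfilling permitting),
    # an ADVANCE_RESERVATION lease is tried only at its requested start time,
    # and an IMMEDIATE lease must fit right away or is rejected.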

    def estimate_migration_time(self, lease):
        """ Estimates the time required to migrate a lease's VMs
        
        This function conservatively estimates that all the VMs are going to
        be migrated to other nodes. Since transfers from different nodes can
        take place in parallel, the bottleneck is the transfer from whatever
        node has the most memory to transfer.
        
        Note that this method only estimates the time to migrate the memory
        state files for the VMs. Migrating the software environment (which may
        or may not be a disk image) is the responsibility of the preparation
        scheduler, which has its own set of migration scheduling methods.
        
        Arguments:
        lease -- Lease that might be migrated
        """
        migration = get_config().get("migration")
        if migration == constants.MIGRATE_YES:
            vmrr = lease.get_last_vmrr()
            mem_in_pnode = dict([(pnode,0) for pnode in set(vmrr.nodes.values())])
            # resources_in_pnode already aggregates all the VMs mapped to a
            # physical node, so each pnode must be visited only once
            for pnode in set(vmrr.nodes.values()):
                mem = vmrr.resources_in_pnode[pnode].get_by_type(constants.RES_MEM)
                mem_in_pnode[pnode] += mem
            max_mem_to_transfer = max(mem_in_pnode.values())
            bandwidth = self.resourcepool.info.get_migration_bandwidth()
            return estimate_transfer_time(max_mem_to_transfer, bandwidth)
        elif migration == constants.MIGRATE_YES_NOTRANSFER:
            return TimeDelta(seconds=0)
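
    # Worked example (illustrative, assumed values): suppose vnodes 1 and 2
    # (1024 MB each) are mapped to pnode P1 and vnode 3 (1024 MB) to P2. Then
    # mem_in_pnode = {P1: 2048, P2: 1024} and max_mem_to_transfer = 2048 MB.
    # With a migration bandwidth of 100 MB/s the estimate is roughly
    # 2048/100 = ~20.5 seconds: transfers from different nodes can overlap,
    # so only the largest one matters.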
143
145 """ Schedules migrations for a lease
146
147 Arguments:
148 lease -- Lease being migrated
149 vmrr -- The VM reservation before which the migration will take place
150 nexttime -- The next time at which the scheduler can allocate resources.
151 """
152
153
154
155 last_vmrr = lease.get_last_vmrr()
156 vnode_migrations = dict([(vnode, (last_vmrr.nodes[vnode], vmrr.nodes[vnode])) for vnode in vmrr.nodes])
157
158
159 mustmigrate = False
160 for vnode in vnode_migrations:
161 if vnode_migrations[vnode][0] != vnode_migrations[vnode][1]:
162 mustmigrate = True
163 break
164
165 if not mustmigrate:
166 return []
167
168
169
170 if get_config().get("migration") == constants.MIGRATE_YES_NOTRANSFER:
171 start = nexttime
172 end = nexttime
173 res = {}
174 migr_rr = MemImageMigrationResourceReservation(lease, start, end, res, vmrr, vnode_migrations)
175 migr_rr.state = ResourceReservation.STATE_SCHEDULED
176 return [migr_rr]
177
178
179 migrations = []
180 while len(vnode_migrations) > 0:
181 pnodes = set()
182 migration = {}
183 for vnode in vnode_migrations:
184 origin = vnode_migrations[vnode][0]
185 dest = vnode_migrations[vnode][1]
186 if not origin in pnodes and not dest in pnodes:
187 migration[vnode] = vnode_migrations[vnode]
188 pnodes.add(origin)
189 pnodes.add(dest)
190 for vnode in migration:
191 del vnode_migrations[vnode]
192 migrations.append(migration)
193
194
195 start = max(last_vmrr.post_rrs[-1].end, nexttime)
196 bandwidth = self.resourcepool.info.get_migration_bandwidth()
197 migr_rrs = []
198 for m in migrations:
199 vnodes_to_migrate = m.keys()
200 max_mem_to_migrate = max([lease.requested_resources[vnode].get_quantity(constants.RES_MEM) for vnode in vnodes_to_migrate])
201 migr_time = estimate_transfer_time(max_mem_to_migrate, bandwidth)
202 end = start + migr_time
203 res = {}
204 for (origin,dest) in m.values():
205 resorigin = Capacity([constants.RES_NETOUT])
206 resorigin.set_quantity(constants.RES_NETOUT, bandwidth)
207 resdest = Capacity([constants.RES_NETIN])
208 resdest.set_quantity(constants.RES_NETIN, bandwidth)
209 res[origin] = self.slottable.create_resource_tuple_from_capacity(resorigin)
210 res[dest] = self.slottable.create_resource_tuple_from_capacity(resdest)
211 migr_rr = MemImageMigrationResourceReservation(lease, start, start + migr_time, res, vmrr, m)
212 migr_rr.state = ResourceReservation.STATE_SCHEDULED
213 migr_rrs.append(migr_rr)
214 start = end
215
216 return migr_rrs
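
    # Illustrative example of the migration "rounds" (assumed mappings and
    # iteration order, since dict ordering decides which candidate is tried
    # first): with vnode_migrations = {1: (P1,P2), 2: (P2,P3), 3: (P4,P5)},
    # the first round picks {1: (P1,P2), 3: (P4,P5)} (migrating vnode 2 in
    # the same round would reuse P2), and the second round picks
    # {2: (P2,P3)}. Each round gets its own
    # MemImageMigrationResourceReservation, scheduled back-to-back.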

    def can_suspend_at(self, lease, t):
        """ Determines if it is possible to suspend a lease before a given time
        
        Arguments:
        lease -- Lease to be suspended
        t -- Time by which the lease must be suspended
        """
        # We can suspend only if the lease can run for at least the
        # scheduling threshold before it has to start suspending.
        vmrr = lease.get_last_vmrr()
        time_until_suspend = t - vmrr.start
        min_duration = self.__compute_scheduling_threshold(lease)
        can_suspend = time_until_suspend >= min_duration
        return can_suspend
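
    # Illustrative check (assumed values): if the VMs started at 10:00, the
    # scheduling threshold is 5 minutes, and t = 10:03, then
    # time_until_suspend = 3 min < 5 min, so the lease cannot be suspended
    # by 10:03; with t = 10:06 it could be.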

    def preempt_vm(self, vmrr, t):
        """ Preempts a VM reservation at a given time
        
        This method assumes that the lease is, in fact, preemptable,
        that the VMs are running at the given time, and that there is
        enough time to suspend the VMs before the given time (all these
        checks are done in the lease scheduler).
        
        Arguments:
        vmrr -- VM RR to be preempted
        t -- Time by which the VM must be preempted
        """
        
        # Save the original start and end time of the vmrr
        old_start = vmrr.start
        old_end = vmrr.end
        
        # Schedule the VM reservation's suspension
        self.__schedule_suspension(vmrr, t)
        
        # Update the VMRR in the slot table
        self.slottable.update_reservation(vmrr, old_start, old_end)
        
        # Add the suspension RRs to the slot table
        for susprr in vmrr.post_rrs:
            self.slottable.add_reservation(susprr)

    def get_future_reschedulable_leases(self):
        """ Returns a list of future leases that are reschedulable.
        
        Currently, this list is just the best-effort leases scheduled
        in the future as determined by the backfilling algorithm.
        Advance reservation leases, by their nature, cannot be
        rescheduled to find a "better" starting time.
        """
        return list(self.future_leases)

    def can_schedule_in_future(self):
        """ Returns True if the backfilling algorithm would allow a lease
        to be scheduled in the future.
        """
        if self.max_in_future == -1: # Unlimited
            return True
        else:
            return len(self.future_leases) < self.max_in_future
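
    # Illustrative (assumed configuration): with max_in_future = 2, a third
    # lease cannot be backfilled while two future leases are outstanding:
    #
    #     self.max_in_future = 2
    #     self.future_leases = set([lease1, lease2])
    #     self.can_schedule_in_future()   # -> False
    #
    # A value of -1 disables the cap entirely.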
342 """ Schedules VMs that must start at an exact time
343
344 This type of lease is "easy" to schedule because we know the exact
345 start time, which means that's the only starting time we have to
346 check. So, this method does little more than call the mapper.
347
348 Arguments:
349 lease -- Lease to schedule
350 nexttime -- The next time at which the scheduler can allocate resources.
351 earliest -- The earliest possible starting times on each physical node
352 """
353
354
355 start = lease.start.requested
356 end = start + lease.duration.requested
357
358
359
360 requested_resources = dict([(k,self.slottable.create_resource_tuple_from_capacity(v)) for k,v in lease.requested_resources.items()])
361
362
363 mapping, actualend, preemptions = self.mapper.map(lease,
364 requested_resources,
365 start,
366 end,
367 strictend = True)
368
369
370 if mapping == None:
371 raise NotSchedulableException, "Not enough resources in specified interval"
372
373
374 res = {}
375
376 for (vnode,pnode) in mapping.items():
377 vnode_res = requested_resources[vnode]
378 if res.has_key(pnode):
379 res[pnode].incr(vnode_res)
380 else:
381 res[pnode] = ResourceTuple.copy(vnode_res)
382
383 vmrr = VMResourceReservation(lease, start, end, mapping, res)
384 vmrr.state = ResourceReservation.STATE_SCHEDULED
385
386
387 self.__schedule_shutdown(vmrr)
388
389 return vmrr, preemptions
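
    # Illustrative aggregation (assumed values): if the mapper returns
    # mapping = {1: P1, 2: P1, 3: P2} and every vnode requests
    # (CPU=100, MEM=1024), the aggregated reservation is
    # res = {P1: (CPU=200, MEM=2048), P2: (CPU=100, MEM=1024)}, i.e., one
    # ResourceTuple per physical node, which is what the slot table stores.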

    def __schedule_asap(self, lease, nexttime, earliest, allow_in_future = None):
        """ Schedules VMs as soon as possible
        
        This method is a bit more complex than __schedule_exact because
        we need to figure out what "as soon as possible" actually is.
        This involves attempting several mappings, at different points
        in time, before we can schedule the lease.
        
        This method will always check, at least, if the lease can be scheduled
        at the earliest possible moment at which the lease could be prepared
        (e.g., if the lease can't start until 1 hour in the future because that's
        the earliest possible time at which the disk images it requires can
        be transferred, then that's when the scheduler will check). Note, however,
        that this "earliest possible moment" is determined by the preparation
        scheduler.
        
        Additionally, if the lease can't be scheduled at the earliest
        possible moment, it can also check if the lease can be scheduled
        in the future. This partially implements a backfilling algorithm
        (the maximum number of future leases is stored in the max_in_future
        attribute of VMScheduler), the other part being implemented in the
        __process_queue method of LeaseScheduler.
        
        Note that, if the method is allowed to schedule in the future,
        and assuming that the lease doesn't request more resources than
        the site itself contains, this method will always schedule the VMs
        successfully (since there's always an empty spot somewhere in the
        future).
        
        Arguments:
        lease -- Lease to schedule
        nexttime -- The next time at which the scheduler can allocate resources.
        earliest -- The earliest possible starting times on each physical node
        allow_in_future -- Boolean indicating whether the scheduler is
        allowed to schedule the VMs in the future.
        """
        
        lease_id = lease.id
        remaining_duration = lease.duration.get_remaining_duration()
        shutdown_time = self.__estimate_shutdown_time(lease)
        
        # Determine whether the lease has to be resumed from a suspended state
        mustresume = (lease.get_state() in (Lease.STATE_SUSPENDED_PENDING, Lease.STATE_SUSPENDED_QUEUED, Lease.STATE_SUSPENDED_SCHEDULED))
        
        # The minimum duration we must be able to allocate to even consider
        # a starting time (see __compute_scheduling_threshold)
        min_duration = self.__compute_scheduling_threshold(lease)
        
        # Determine the changepoints: the points in time at which the
        # scheduler will try to map the lease
        if not mustresume:
            # If the lease is not suspended, build the changepoints from the
            # earliest possible starting time on each physical node. Each
            # changepoint is paired with the set of nodes available by
            # that time.
            cps = [(node, e.time) for node, e in earliest.items()]
            cps.sort(key=itemgetter(1))
            curcp = None
            changepoints = []
            nodes = []
            for node, time in cps:
                nodes.append(node)
                if time != curcp:
                    changepoints.append([time, set(nodes)])
                    curcp = time
                else:
                    changepoints[-1][1] = set(nodes)
        else:
            # If the lease is suspended and migration is disabled, it can
            # only be resumed on the nodes it is currently deployed on.
            if get_config().get("migration") == constants.MIGRATE_NO:
                vmrr = lease.get_last_vmrr()
                onlynodes = set(vmrr.nodes.values())
            else:
                onlynodes = None
            changepoints = list(set([x.time for x in earliest.values()]))
            changepoints.sort()
            changepoints = [(x, onlynodes) for x in changepoints]
        
        # If we are allowed to schedule in the future, we also consider
        # "future changepoints": the times at which currently scheduled VM
        # reservations release their resources.
        if allow_in_future:
            res = self.slottable.get_reservations_ending_after(changepoints[-1][0])
            # We only care about the end of VM reservations...
            futurecp = [r.get_final_end() for r in res if isinstance(r, VMResourceReservation)]
            # ... and of shutdown reservations whose VM reservation is not
            # itself in the result set
            futurecp += [r.end for r in res if isinstance(r, ShutdownResourceReservation) and not r.vmrr in res]
            if not mustresume:
                futurecp = [(p,None) for p in futurecp]
            else:
                futurecp = [(p,onlynodes) for p in futurecp]
        else:
            futurecp = []
        
        # The time we need to allocate must cover the remaining duration,
        # plus the time needed to resume (if resuming) and to shut down.
        if mustresume:
            duration = remaining_duration + self.__estimate_resume_time(lease)
        else:
            duration = remaining_duration
        
        duration += shutdown_time
        
        in_future = False
        
        # Convert the lease's Capacity objects into ResourceTuples that
        # the slot table understands
        requested_resources = dict([(k,self.slottable.create_resource_tuple_from_capacity(v)) for k,v in lease.requested_resources.items()])
        
        # First, try to find a mapping at the regular changepoints
        start, end, mapping, preemptions = self.__find_fit_at_points(lease,
                                                                     requested_resources,
                                                                     changepoints, 
                                                                     duration, 
                                                                     min_duration)
        
        if start == None and not allow_in_future:
            # We did not find a suitable starting time, and we are not
            # allowed to schedule in the future
            raise NotSchedulableException, "Could not find enough resources for this request"
        
        # If we haven't been able to fit the lease, check if we can
        # reserve it in the future
        if start == None and allow_in_future:
            start, end, mapping, preemptions = self.__find_fit_at_points(lease,
                                                                         requested_resources,
                                                                         futurecp, 
                                                                         duration, 
                                                                         min_duration
                                                                         )
            # If we haven't been able to schedule the lease in the future
            # either, something has gone wrong in the scheduling algorithm
            if start == None:
                raise InconsistentScheduleError, "Could not find a mapping in the future (this should not happen)"
            
            in_future = True
        
        # Aggregate the resources requested on each physical node
        res = {}
        
        for (vnode,pnode) in mapping.items():
            vnode_res = requested_resources[vnode]
            if res.has_key(pnode):
                res[pnode].incr(vnode_res)
            else:
                res[pnode] = ResourceTuple.copy(vnode_res)
        
        vmrr = VMResourceReservation(lease, start, end, mapping, res)
        vmrr.state = ResourceReservation.STATE_SCHEDULED
        
        # Schedule the resumption, if needed
        if mustresume:
            self.__schedule_resumption(vmrr, start)
        
        # If the mapper couldn't find enough resources for the full requested
        # duration, the lease will have to be suspended at the end of the VMRR
        mustsuspend = (vmrr.end - vmrr.start) < remaining_duration
        if mustsuspend:
            self.__schedule_suspension(vmrr, end)
        else:
            # Compensate for any overestimation
            if (vmrr.end - vmrr.start) > remaining_duration + shutdown_time:
                vmrr.end = vmrr.start + remaining_duration + shutdown_time
            self.__schedule_shutdown(vmrr)
        
        if in_future:
            self.future_leases.add(lease)
            get_persistence().persist_future_leases(self.future_leases)
        
        susp_str = res_str = ""
        if mustresume:
            res_str = " (resuming)"
        if mustsuspend:
            susp_str = " (suspending)"
        self.logger.info("Lease #%i has been scheduled on nodes %s from %s%s to %s%s" % (lease.id, mapping.values(), start, res_str, end, susp_str))
        
        return vmrr, preemptions
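
    # Illustrative changepoint construction (assumed values): if the earliest
    # starting times are {P1: 10:00, P2: 10:00, P3: 10:30}, the changepoints
    # list becomes [(10:00, {P1,P2}), (10:30, {P1,P2,P3})]: the lease is
    # first tried at 10:00 restricted to P1/P2, then at 10:30 on all three
    # nodes. If neither fits and backfilling is allowed, the times at which
    # existing VM reservations release their resources (futurecp) are
    # tried next.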

    def __find_fit_at_points(self, lease, requested_resources, changepoints, duration, min_duration):
        """ Tries to map a lease in a given list of points in time
        
        This method goes through a given list of points in time and tries
        to find the earliest time at which that lease can be allocated
        resources.
        
        Arguments:
        lease -- Lease to schedule
        requested_resources -- A dictionary of lease node -> ResourceTuple.
        changepoints -- The list of changepoints
        duration -- The amount of time requested
        min_duration -- The minimum amount of time that should be allocated
        
        Returns:
        start -- The time at which resources have been found for the lease
        actualend -- The time at which the resources won't be available. Note
        that this is not necessarily (start + duration) since the mapper
        might be unable to find enough resources for the full requested duration.
        mapping -- A mapping of lease nodes to physical nodes
        preemptions -- A list of the leases that would have to be preempted
        to obtain this mapping.
        (if no mapping is found, all these values are set to None)
        """
        found = False
        
        for time, onlynodes in changepoints:
            start = time
            end = start + duration
            self.logger.debug("Attempting to map from %s to %s" % (start, end))
            
            # If suspension is disabled, or only serial suspension is allowed
            # and the lease has multiple nodes, the mapper must find a
            # mapping for the full requested duration
            susptype = get_config().get("suspension")
            if susptype == constants.SUSPENSION_NONE or (lease.numnodes > 1 and susptype == constants.SUSPENSION_SERIAL):
                strictend = True
            else:
                strictend = False
            
            # Let the mapper try to find a mapping
            mapping, actualend, preemptions = self.mapper.map(lease, 
                                                              requested_resources,
                                                              start, 
                                                              end, 
                                                              strictend = strictend,
                                                              onlynodes = onlynodes)
            
            # If we have a mapping, we still have to check that it satisfies
            # the minimum duration
            if mapping != None:
                if actualend < end:
                    actualduration = actualend - start
                    if actualduration >= min_duration:
                        self.logger.debug("This lease can be scheduled from %s to %s (will require suspension)" % (start, actualend))
                        found = True
                        break
                    else:
                        self.logger.debug("This starting time does not allow for the requested minimum duration (%s < %s)" % (actualduration, min_duration))
                else:
                    self.logger.debug("This lease can be scheduled from %s to %s (full duration)" % (start, end))
                    found = True
                    break
        
        if found:
            return start, actualend, mapping, preemptions
        else:
            return None, None, None, None

    def __compute_susprem_times(self, vmrr, time, direction, exclusion, rate, override = None):
        """ Computes the times at which suspend/resume operations would have to start
        
        When suspending or resuming a VM, the VM's memory is dumped to a
        file on disk. To correctly estimate the time required to suspend
        a lease with multiple VMs, Haizea makes sure that no two
        suspensions/resumptions happen at the same time (e.g., if eight
        memory files were being saved at the same time to disk, the disk's
        performance would be reduced in a way that is not as easy to estimate
        as if only one file were being saved at a time). Based on a number
        of parameters, this method estimates the times at which the
        suspend/resume commands would have to be sent to guarantee this
        exclusion.
        
        Arguments:
        vmrr -- The VM reservation that will be suspended/resumed
        time -- The time at which the suspend should end or the resume should start.
        direction -- DIRECTION_BACKWARD: start at "time" and compute the times going
        backward (for suspensions). DIRECTION_FORWARD: start at "time" and compute
        the times going forward (for resumptions).
        exclusion -- SUSPRES_EXCLUSION_GLOBAL (memory is saved to a global filesystem)
        or SUSPRES_EXCLUSION_LOCAL (saved to a local filesystem)
        rate -- The rate at which an individual VM is suspended/resumed
        override -- If specified, then instead of computing the time to
        suspend/resume each VM based on its memory and the "rate" parameter,
        use this override value.
        """
        times = [] # (start, end, {pnode -> [vnodes]})
        enactment_overhead = get_config().get("enactment-overhead")
        
        if exclusion == constants.SUSPRES_EXCLUSION_GLOBAL:
            # Global exclusion (e.g., the memory image files are saved to a
            # global filesystem): no two suspensions/resumptions can happen
            # at the same time in the entire site, so the operations are
            # scheduled back-to-back.
            t = time
            t_prev = None
            
            for (vnode,pnode) in vmrr.nodes.items():
                if override == None:
                    mem = vmrr.lease.requested_resources[vnode].get_quantity(constants.RES_MEM)
                    op_time = self.__compute_suspend_resume_time(mem, rate)
                else:
                    op_time = override
                
                op_time += enactment_overhead
                
                t_prev = t
                
                if direction == constants.DIRECTION_FORWARD:
                    t += op_time
                    times.append((t_prev, t, {pnode:[vnode]}))
                elif direction == constants.DIRECTION_BACKWARD:
                    t -= op_time
                    times.append((t, t_prev, {pnode:[vnode]}))
        
        elif exclusion == constants.SUSPRES_EXCLUSION_LOCAL:
            # Local exclusion (e.g., the memory image files are saved to a
            # local filesystem): no two suspensions/resumptions can happen at
            # the same time on the same physical node, but operations on
            # different nodes can overlap.
            pervnode_times = [] # (start, end, vnode)
            vnodes_in_pnode = {}
            for (vnode,pnode) in vmrr.nodes.items():
                vnodes_in_pnode.setdefault(pnode, []).append(vnode)
            for pnode in vnodes_in_pnode:
                t = time
                t_prev = None
                for vnode in vnodes_in_pnode[pnode]:
                    if override == None:
                        mem = vmrr.lease.requested_resources[vnode].get_quantity(constants.RES_MEM)
                        op_time = self.__compute_suspend_resume_time(mem, rate)
                    else:
                        op_time = override
                    
                    t_prev = t
                    
                    if direction == constants.DIRECTION_FORWARD:
                        t += op_time
                        pervnode_times.append((t_prev, t, vnode))
                    elif direction == constants.DIRECTION_BACKWARD:
                        t -= op_time
                        pervnode_times.append((t, t_prev, vnode))
            
            # Consolidate operations happening during the same time interval
            uniq_times = set([(start, end) for (start, end, vnode) in pervnode_times])
            for (start, end) in uniq_times:
                vnodes = [x[2] for x in pervnode_times if x[0] == start and x[1] == end]
                node_mappings = {}
                for vnode in vnodes:
                    pnode = vmrr.nodes[vnode]
                    node_mappings.setdefault(pnode, []).append(vnode)
                times.append([start,end,node_mappings])
            
            # Add the enactment overhead
            for t in times:
                num_vnodes = sum([len(vnodes) for vnodes in t[2].values()])
                overhead = TimeDelta(seconds = num_vnodes * enactment_overhead)
                if direction == constants.DIRECTION_FORWARD:
                    t[1] += overhead
                elif direction == constants.DIRECTION_BACKWARD:
                    t[0] -= overhead
            
            # Fix overlaps between consecutive operations
            if direction == constants.DIRECTION_FORWARD:
                times.sort(key=itemgetter(0))
            elif direction == constants.DIRECTION_BACKWARD:
                times.sort(key=itemgetter(1))
                times.reverse()
            
            prev_start = None
            prev_end = None
            for t in times:
                if prev_start != None:
                    start = t[0]
                    end = t[1]
                    if direction == constants.DIRECTION_FORWARD:
                        if start < prev_end:
                            diff = prev_end - start
                            t[0] += diff
                            t[1] += diff
                    elif direction == constants.DIRECTION_BACKWARD:
                        if end > prev_start:
                            diff = end - prev_start
                            t[0] -= diff
                            t[1] -= diff
                prev_start = t[0]
                prev_end = t[1]
        
        return times
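
    # Worked example (illustrative, assumed values): suspending two 1024 MB
    # VMs mapped to different pnodes, with rate = 32 MB/s (32 s per VM),
    # suspend_by = 10:00:00 and no enactment overhead:
    #  - GLOBAL exclusion (backward): [(09:59:28, 10:00:00, {P2: [2]}),
    #    (09:58:56, 09:59:28, {P1: [1]})] -- the suspends are serialized
    #    (which VM goes last depends on iteration order).
    #  - LOCAL exclusion: both suspends run in parallel and are consolidated
    #    into a single entry (09:59:28, 10:00:00, {P1: [1], P2: [2]}).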
839 """ Schedules the suspension of a VM reservation
840
841 Most of the work is done in __compute_susprem_times. See that
842 method's documentation for more details.
843
844 Arguments:
845 vmrr -- The VM reservation that will be suspended
846 suspend_by -- The time by which the VMs should be suspended.
847
848 """
849 config = get_config()
850 susp_exclusion = config.get("suspendresume-exclusion")
851 override = get_config().get("override-suspend-time")
852 rate = config.get("suspend-rate")
853
854 if suspend_by < vmrr.start or suspend_by > vmrr.end:
855 raise InconsistentScheduleError, "Tried to schedule a suspension by %s, which is outside the VMRR's duration (%s-%s)" % (suspend_by, vmrr.start, vmrr.end)
856
857
858 times = self.__compute_susprem_times(vmrr, suspend_by, constants.DIRECTION_BACKWARD, susp_exclusion, rate, override)
859
860
861 suspend_rrs = []
862 for (start, end, node_mappings) in times:
863 suspres = {}
864 all_vnodes = []
865 for (pnode,vnodes) in node_mappings.items():
866 num_vnodes = len(vnodes)
867 r = Capacity([constants.RES_MEM,constants.RES_DISK])
868 mem = 0
869 for vnode in vnodes:
870 mem += vmrr.lease.requested_resources[vnode].get_quantity(constants.RES_MEM)
871 r.set_quantity(constants.RES_MEM, mem * num_vnodes)
872 r.set_quantity(constants.RES_DISK, mem * num_vnodes)
873 suspres[pnode] = self.slottable.create_resource_tuple_from_capacity(r)
874 all_vnodes += vnodes
875
876 susprr = SuspensionResourceReservation(vmrr.lease, start, end, suspres, all_vnodes, vmrr)
877 susprr.state = ResourceReservation.STATE_SCHEDULED
878 suspend_rrs.append(susprr)
879
880 suspend_rrs.sort(key=attrgetter("start"))
881
882 susp_start = suspend_rrs[0].start
883 if susp_start < vmrr.start:
884 raise InconsistentScheduleError, "Determined suspension should start at %s, before the VMRR's start (%s) -- Suspend time not being properly estimated?" % (susp_start, vmrr.start)
885
886 vmrr.update_end(susp_start)
887
888
889 for rr in vmrr.post_rrs:
890 self.slottable.remove_reservation(rr)
891 vmrr.post_rrs = []
892
893
894 for susprr in suspend_rrs:
895 vmrr.post_rrs.append(susprr)
896

    def __schedule_resumption(self, vmrr, resume_at):
        """ Schedules the resumption of a VM reservation
        
        Most of the work is done in __compute_susprem_times. See that
        method's documentation for more details.
        
        Arguments:
        vmrr -- The VM reservation that will be resumed
        resume_at -- The time at which the resumption should start
        """
        config = get_config()
        resm_exclusion = config.get("suspendresume-exclusion")
        override = get_config().get("override-resume-time")
        rate = config.get("resume-rate")
        
        if resume_at < vmrr.start or resume_at > vmrr.end:
            raise InconsistentScheduleError, "Tried to schedule a resumption at %s, which is outside the VMRR's duration (%s-%s)" % (resume_at, vmrr.start, vmrr.end)
        
        # Compute the times at which each resume operation would have to start
        times = self.__compute_susprem_times(vmrr, resume_at, constants.DIRECTION_FORWARD, resm_exclusion, rate, override)
        
        # Create the resumption RRs
        resume_rrs = []
        for (start, end, node_mappings) in times:
            resmres = {}
            all_vnodes = []
            for (pnode,vnodes) in node_mappings.items():
                r = Capacity([constants.RES_MEM,constants.RES_DISK])
                mem = 0
                for vnode in vnodes:
                    mem += vmrr.lease.requested_resources[vnode].get_quantity(constants.RES_MEM)
                # The node must hold the memory of the resuming VMs and
                # enough disk for their memory image files ("mem" is already
                # the total across all the vnodes on this pnode)
                r.set_quantity(constants.RES_MEM, mem)
                r.set_quantity(constants.RES_DISK, mem)
                resmres[pnode] = self.slottable.create_resource_tuple_from_capacity(r)
                all_vnodes += vnodes
            resmrr = ResumptionResourceReservation(vmrr.lease, start, end, resmres, all_vnodes, vmrr)
            resmrr.state = ResourceReservation.STATE_SCHEDULED
            resume_rrs.append(resmrr)
        
        resume_rrs.sort(key=attrgetter("start"))
        
        resm_end = resume_rrs[-1].end
        if resm_end > vmrr.end:
            raise InconsistentScheduleError, "Determined resumption would end at %s, after the VMRR's end (%s) -- Resume time not being properly estimated?" % (resm_end, vmrr.end)
        
        vmrr.update_start(resm_end)
        
        # Add the resumption RRs to the VM RR
        for resmrr in resume_rrs:
            vmrr.pre_rrs.append(resmrr)

    def __compute_suspend_resume_time(self, mem, rate):
        """ Compute the time to suspend/resume a single VM
        
        Arguments:
        mem -- Amount of memory used by the VM
        rate -- The rate at which an individual VM is suspended/resumed
        """
        time = float(mem) / rate
        time = round_datetime_delta(TimeDelta(seconds = time))
        return time
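
    # Illustrative arithmetic (assumed values): a VM with mem = 1024 MB and
    # rate = 32 MB/s yields 1024/32 = 32 seconds, rounded by
    # round_datetime_delta.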

    def __estimate_suspend_time(self, lease):
        """ Estimate the time to suspend an entire lease
        
        Most of the work is done in __estimate_suspend_resume_time. See
        that method's documentation for more details.
        
        Arguments:
        lease -- Lease that is going to be suspended
        """
        rate = get_config().get("suspend-rate")
        override = get_config().get("override-suspend-time")
        if override != None:
            return override
        else:
            return self.__estimate_suspend_resume_time(lease, rate)
981
982
984 """ Estimate the time to resume an entire lease
985
986 Most of the work is done in __estimate_suspend_resume_time. See
987 that method's documentation for more details.
988
989 Arguments:
990 lease -- Lease that is going to be resumed
991
992 """
993 rate = get_config().get("resume-rate")
994 override = get_config().get("override-resume-time")
995 if override != None:
996 return override
997 else:
998 return self.__estimate_suspend_resume_time(lease, rate)

    def __estimate_suspend_resume_time(self, lease, rate):
        """ Estimate the time to suspend/resume an entire lease
        
        Note that, unlike __compute_suspend_resume_time, this estimates
        the time to suspend/resume an entire lease (which may involve
        suspending several VMs)
        
        Arguments:
        lease -- Lease that is going to be suspended/resumed
        rate -- The rate at which an individual VM is suspended/resumed
        """
        susp_exclusion = get_config().get("suspendresume-exclusion")
        enactment_overhead = get_config().get("enactment-overhead")
        mem = 0
        for vnode in lease.requested_resources:
            mem += lease.requested_resources[vnode].get_quantity(constants.RES_MEM)
        if susp_exclusion == constants.SUSPRES_EXCLUSION_GLOBAL:
            return lease.numnodes * (self.__compute_suspend_resume_time(mem, rate) + enactment_overhead)
        elif susp_exclusion == constants.SUSPRES_EXCLUSION_LOCAL:
            # An overestimation: local suspends/resumes can overlap across
            # nodes, but we conservatively use the same estimate as in the
            # global case
            return lease.numnodes * (self.__compute_suspend_resume_time(mem, rate) + enactment_overhead)
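
    # Illustrative arithmetic (assumed values): a 2-node lease with 1024 MB
    # per vnode gives mem = 2048 MB; with rate = 32 MB/s and no enactment
    # overhead the estimate is 2 * (2048/32) = 128 seconds. Note this treats
    # every node as if it held the lease's total memory, so it deliberately
    # overestimates.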

    def __estimate_shutdown_time(self, lease):
        """ Estimate the time to shutdown an entire lease
        
        Arguments:
        lease -- Lease that is going to be shut down
        """
        enactment_overhead = get_config().get("enactment-overhead").seconds
        return get_config().get("shutdown-time") + (enactment_overhead * lease.numnodes)

    def __compute_scheduling_threshold(self, lease):
        """ Compute the scheduling threshold (the 'minimum duration') of a lease
        
        To avoid thrashing, Haizea will not schedule a lease unless all overheads
        can be correctly scheduled (which includes image transfers, suspensions, etc.).
        However, this can still result in situations where a lease is prepared,
        and then immediately suspended because of a blocking lease in the future.
        The scheduling threshold is used to specify that a lease must
        not be scheduled unless it is guaranteed to run for a minimum amount of
        time (the rationale behind this is that you ideally don't want leases
        to be scheduled if they're not going to be active for at least as much time
        as was spent in overheads).
        
        An important part of computing this value is the "scheduling threshold factor".
        The default value is 1, meaning that the lease will be active for at least
        as much time T as was spent on overheads (e.g., if preparing the lease requires
        60 seconds, and we know that it will have to be suspended, requiring 30 seconds,
        Haizea won't schedule the lease unless it can run for at least 90 seconds).
        In other words, a scheduling threshold factor of F requires a minimum
        duration of F*T. A value of 0 could lead to thrashing, since Haizea could
        end up with situations where a lease starts and immediately gets suspended.
        
        Arguments:
        lease -- Lease for which we want to find the scheduling threshold
        """
        config = get_config()
        threshold = config.get("force-scheduling-threshold")
        if threshold != None:
            # If there is a hard-coded threshold, use that
            return threshold
        else:
            factor = config.get("scheduling-threshold-factor")
            
            # First, compute the "safe duration": the minimum duration that
            # at least allocates enough time for the suspend/resume overheads
            susp_overhead = self.__estimate_suspend_time(lease)
            safe_duration = susp_overhead
            
            if lease.get_state() == Lease.STATE_SUSPENDED_QUEUED:
                resm_overhead = self.__estimate_resume_time(lease)
                safe_duration += resm_overhead
            
            # TODO: Incorporate other overheads into the minimum duration
            min_duration = safe_duration
            
            # At the very least, we want to allocate enough time for the
            # safe duration (otherwise we would end up with inconsistent
            # schedules, where a lease is scheduled to suspend but is not
            # even allocated enough time for the suspension itself)
            threshold = safe_duration + (min_duration * factor)
            return threshold
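
    # Illustrative arithmetic (assumed values): for a lease with an estimated
    # suspend overhead of 30 s that is not resuming, and a scheduling
    # threshold factor of 1:
    #     safe_duration = 30 s, min_duration = 30 s
    #     threshold = 30 + (30 * 1) = 60 s
    # so the lease is only scheduled if at least 60 seconds can be allocated.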
1148 """ Handles the end of a VMResourceReservation
1149
1150 Arguments:
1151 l -- Lease the VMResourceReservation belongs to
1152 rr -- THe VMResourceReservation
1153 """
1154 self.logger.debug("LEASE-%i Start of handleEndVM" % l.id)
1155 self.logger.vdebug("LEASE-%i Before:" % l.id)
1156 l.print_contents()
1157 now_time = round_datetime(get_clock().get_time())
1158 diff = now_time - rr.start
1159 l.duration.accumulate_duration(diff)
1160 rr.state = ResourceReservation.STATE_DONE
1161
1162 self.logger.vdebug("LEASE-%i After:" % l.id)
1163 l.print_contents()
1164 self.logger.debug("LEASE-%i End of handleEndVM" % l.id)
1165 self.logger.info("Stopped VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()))

    def _handle_unscheduled_end_vm(self, l, vmrr):
        """ Handles the unexpected end of a VMResourceReservation
        
        Arguments:
        l -- Lease the VMResourceReservation belongs to
        vmrr -- The VMResourceReservation
        """
        self.logger.info("LEASE-%i The VM has ended prematurely." % l.id)
        for rr in vmrr.post_rrs:
            self.slottable.remove_reservation(rr)
        vmrr.post_rrs = []
        vmrr.end = get_clock().get_time()
        self._handle_end_vm(l, vmrr)

    def _handle_start_suspend(self, l, rr):
        """ Handles the start of a SuspensionResourceReservation
        
        Arguments:
        l -- Lease the SuspensionResourceReservation belongs to
        rr -- The SuspensionResourceReservation
        """
        self.logger.debug("LEASE-%i Start of handleStartSuspend" % l.id)
        l.print_contents()
        rr.state = ResourceReservation.STATE_ACTIVE
        
        try:
            self.resourcepool.suspend_vms(l, rr)
        except EnactmentError, exc:
            self.logger.error("Enactment error when suspending VMs.")
            # Right now, this is a non-recoverable error, so we just
            # propagate it upwards. In the future, it may be possible
            # to react to this kind of error.
            raise
        
        if rr.is_first():
            l.set_state(Lease.STATE_SUSPENDING)
            l.print_contents()
            self.logger.info("Suspending lease %i..." % (l.id))
        self.logger.debug("LEASE-%i End of handleStartSuspend" % l.id)
1236 """ Handles the start of a ResumptionResourceReservation
1237
1238 Arguments:
1239 l -- Lease the ResumptionResourceReservation belongs to
1240 rr -- The ResumptionResourceReservation
1241
1242 """
1243 self.logger.debug("LEASE-%i Start of handleStartResume" % l.id)
1244 l.print_contents()
1245
1246 try:
1247 self.resourcepool.resume_vms(l, rr)
1248 except EnactmentError, exc:
1249 self.logger.error("Enactment error when resuming VMs.")
1250
1251
1252
1253
1254 raise
1255
1256 rr.state = ResourceReservation.STATE_ACTIVE
1257 if rr.is_first():
1258 l.set_state(Lease.STATE_RESUMING)
1259 l.print_contents()
1260 self.logger.info("Resuming lease %i..." % (l.id))
1261 self.logger.debug("LEASE-%i End of handleStartResume" % l.id)
1262
1287 """ Handles the start of a ShutdownResourceReservation
1288
1289 Arguments:
1290 l -- Lease the SuspensionResourceReservation belongs to
1291 rr -- The SuspensionResourceReservation
1292 """
1293
1294 self.logger.debug("LEASE-%i Start of handleStartShutdown" % l.id)
1295 l.print_contents()
1296 rr.state = ResourceReservation.STATE_ACTIVE
1297 try:
1298 self.resourcepool.stop_vms(l, rr)
1299 except EnactmentError, exc:
1300 self.logger.error("Enactment error when shutting down VMs.")
1301
1302
1303
1304
1305 raise
1306
1307 l.print_contents()
1308 self.logger.debug("LEASE-%i End of handleStartShutdown" % l.id)

    def _handle_end_shutdown(self, l, rr):
        """ Handles the end of a ShutdownResourceReservation
        
        Arguments:
        l -- Lease the ShutdownResourceReservation belongs to
        rr -- The ShutdownResourceReservation
        """
        self.logger.debug("LEASE-%i Start of handleEndShutdown" % l.id)
        l.print_contents()
        rr.state = ResourceReservation.STATE_DONE
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleEndShutdown" % l.id)
        self.logger.info("Lease %i's VMs have shutdown." % (l.id))
        raise NormalEndLeaseException

    def _handle_start_migrate(self, l, rr):
        """ Handles the start of a MemImageMigrationResourceReservation
        
        Arguments:
        l -- Lease the MemImageMigrationResourceReservation belongs to
        rr -- The MemImageMigrationResourceReservation
        """
        self.logger.debug("LEASE-%i Start of handleStartMigrate" % l.id)
        l.print_contents()
        rr.state = ResourceReservation.STATE_ACTIVE
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleStartMigrate" % l.id)
        self.logger.info("Migrating lease %i..." % (l.id))

    def _handle_end_migrate(self, l, rr):
        """ Handles the end of a MemImageMigrationResourceReservation
        
        Arguments:
        l -- Lease the MemImageMigrationResourceReservation belongs to
        rr -- The MemImageMigrationResourceReservation
        """
        self.logger.debug("LEASE-%i Start of handleEndMigrate" % l.id)
        l.print_contents()
        
        for vnode in rr.transfers:
            origin = rr.transfers[vnode][0]
            dest = rr.transfers[vnode][1]
            
            # Update the location of the VM's RAM file in the resource pool
            self.resourcepool.remove_ramfile(origin, l.id, vnode)
            self.resourcepool.add_ramfile(dest, l.id, vnode, l.requested_resources[vnode].get_quantity(constants.RES_MEM))
        
        rr.state = ResourceReservation.STATE_DONE
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleEndMigrate" % l.id)
        self.logger.info("Migrated lease %i..." % (l.id))


class VMResourceReservation(ResourceReservation):
    def __init__(self, lease, start, end, nodes, res):
        ResourceReservation.__init__(self, lease, start, end, res)
        self.nodes = nodes # { vnode -> pnode }
        self.pre_rrs = [] # The resumption RRs (if any)
        self.post_rrs = [] # The suspension/shutdown RRs
        self.__update_prematureend()
        
    def update_start(self, time):
        self.start = time
        self.__update_prematureend()
        
    def update_end(self, time):
        self.end = time
        self.__update_prematureend()
        
    def __update_prematureend(self):
        # ONLY for simulation: if the lease's actual duration is known in
        # advance, compute the time at which the VMs would end prematurely
        if self.lease.duration.known != None:
            remdur = self.lease.duration.get_remaining_known_duration()
            rrdur = self.end - self.start
            if remdur < rrdur:
                self.prematureend = self.start + remdur
                # Kludgy, but this corner case actually does happen: the
                # premature end can coincide with the VMRR's start, so we
                # push it forward a tiny bit to avoid creating a
                # zero-duration reservation
                if self.prematureend == self.start:
                    self.prematureend += 1
            else:
                self.prematureend = None
        else:
            self.prematureend = None
        
    def get_final_end(self):
        if len(self.post_rrs) == 0:
            return self.end
        else:
            return self.post_rrs[-1].end

    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
        logger = logging.getLogger("LEASES")
        for resmrr in self.pre_rrs:
            resmrr.print_contents(loglevel)
            logger.log(loglevel, "--")
        logger.log(loglevel, "Type          : VM")
        logger.log(loglevel, "Nodes         : %s" % pretty_nodemap(self.nodes))
        if self.prematureend != None:
            logger.log(loglevel, "Premature end : %s" % self.prematureend)
        ResourceReservation.print_contents(self, loglevel)
        for susprr in self.post_rrs:
            logger.log(loglevel, "--")
            susprr.print_contents(loglevel)


class SuspensionResourceReservation(ResourceReservation):
    def __init__(self, lease, start, end, res, vnodes, vmrr):
        ResourceReservation.__init__(self, lease, start, end, res)
        self.vnodes = vnodes
        self.vmrr = vmrr
        
    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
        logger = logging.getLogger("LEASES")
        logger.log(loglevel, "Type          : SUSPEND")
        logger.log(loglevel, "Vnodes        : %s" % self.vnodes)
        ResourceReservation.print_contents(self, loglevel)
        
    def is_first(self):
        return (self == self.vmrr.post_rrs[0])
        
    def is_last(self):
        return (self == self.vmrr.post_rrs[-1])

class ResumptionResourceReservation(ResourceReservation):
    def __init__(self, lease, start, end, res, vnodes, vmrr):
        ResourceReservation.__init__(self, lease, start, end, res)
        self.vnodes = vnodes
        self.vmrr = vmrr
        
    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
        logger = logging.getLogger("LEASES")
        logger.log(loglevel, "Type          : RESUME")
        logger.log(loglevel, "Vnodes        : %s" % self.vnodes)
        ResourceReservation.print_contents(self, loglevel)
        
    def is_first(self):
        return (self == self.vmrr.pre_rrs[0])
        
    def is_last(self):
        return (self == self.vmrr.pre_rrs[-1])

class MemImageMigrationResourceReservation(MigrationResourceReservation):
    def __init__(self, lease, start, end, res, vmrr, transfers):
        MigrationResourceReservation.__init__(self, lease, start, end, res, vmrr, transfers)
        
    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
        logger = logging.getLogger("LEASES")
        logger.log(loglevel, "Type          : MEM IMAGE MIGRATION")
        logger.log(loglevel, "Transfers     : %s" % self.transfers)
        ResourceReservation.print_contents(self, loglevel)