1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 """This module provides the main classes for Haizea's lease scheduler, particularly
21 the LeaseScheduler class. This module does *not* contain VM scheduling code (i.e.,
22 the code that decides what physical hosts a VM should be mapped to), which is
23 located in the vm_scheduler module. Lease preparation code (e.g., image transfer
24 scheduling) is located in the preparation_schedulers package. In fact, the
25 main purpose of the lease scheduler is to orchestrate these preparation and VM
26 schedulers.
27
28 This module also includes a Queue class and a LeaseTable class, which are used
29 by the lease scheduler.
30 """
31
32 import haizea.common.constants as constants
33 from haizea.common.utils import round_datetime, get_config, get_clock, get_policy, get_persistence
34 from haizea.core.leases import Lease
35 from haizea.core.scheduler import RescheduleLeaseException, NormalEndLeaseException, InconsistentLeaseStateError, EnactmentError, UnrecoverableError, NotSchedulableException, EarliestStartingTime
36 from haizea.core.scheduler.slottable import ResourceReservation
37 from operator import attrgetter
38
39 import logging
40
42 """The Haizea Lease Scheduler
43
44 This is the main scheduling class in Haizea. It handles lease scheduling which,
45 in turn, involves VM scheduling, preparation scheduling (such as transferring
46 a VM image), and numerous bookkeeping operations. All these operations are
47 handled by other classes, so this class acts mostly as an orchestrator that
48 coordinates all the different operations involved in scheduling a lease.
49 """
50
def __init__(self, vm_scheduler, preparation_scheduler, slottable, accounting):
    """Create the lease scheduler's attributes.

    No scheduling happens here. The constructor stores the collaborators it
    is given (all of which must already be fully constructed), creates the
    empty lease containers, and merges the handler tables exposed by the
    VM scheduler and the preparation scheduler into a single table.

    Arguments:
    vm_scheduler -- VM scheduler
    preparation_scheduler -- Preparation scheduler
    slottable -- Slottable
    accounting -- AccountingDataCollection object
    """
    # Logger shared by every method of the lease scheduler.
    self.logger = logging.getLogger("LSCHED")

    # Collaborating objects, built by the caller.
    self.vm_scheduler = vm_scheduler  # VM scheduler (VMScheduler)
    self.preparation_scheduler = preparation_scheduler
    self.slottable = slottable
    self.accounting = accounting

    # Lease containers: the queue, the table of leases known to the
    # scheduler, and the table of leases that are done.
    self.queue = Queue()
    self.leases = LeaseTable()
    self.completed_leases = LeaseTable()

    # Merge both handler tables. The preparation scheduler is applied last,
    # so its entry wins if both schedulers register the same key.
    self.handlers = {}
    for scheduler in (self.vm_scheduler, self.preparation_scheduler):
        self.handlers.update(scheduler.handlers.items())
98
99
101 """Requests a lease. This is the entry point of leases into the scheduler.
102
103 Request a lease. The decision on whether to accept or reject a
104 lease is deferred to the policy manager (through its admission
105 control policy).
106
107 If the policy determines the lease can be
108 accepted, it is marked as "Pending". This still doesn't
109 guarantee that the lease will be scheduled (e.g., an AR lease
110 could still be rejected if the scheduler determines there are no
111 resources for it; but that is a *scheduling* decision, not an admission
112 control policy decision). The ultimate fate of the lease is determined
113 the next time the scheduling function is called.
114
115 If the policy determines the lease cannot be accepted, it is marked
116 as rejected.
117
118 Arguments:
119 lease -- Lease object. Its state must be STATE_NEW.
120 """
121 self.logger.info("Lease #%i has been requested." % lease.id)
122 if lease.submit_time == None:
123 lease.submit_time = round_datetime(get_clock().get_time())
124 lease.print_contents()
125 lease.set_state(Lease.STATE_PENDING)
126 if get_policy().accept_lease(lease):
127 self.logger.info("Lease #%i has been marked as pending." % lease.id)
128 self.leases.add(lease)
129 else:
130 self.logger.info("Lease #%i has not been accepted" % lease.id)
131 lease.set_state(Lease.STATE_REJECTED)
132 self.completed_leases.add(lease)
133
134 self.accounting.at_lease_request(lease)
135 get_persistence().persist_lease(lease)
136
138 """ The main scheduling function
139
140 The scheduling function looks at all pending requests and schedules them.
141 Note that most of the actual scheduling code is contained in the
142 __schedule_lease method and in the VMScheduler and PreparationScheduler classes.
143
144 Arguments:
145 nexttime -- The next time at which the scheduler can allocate resources.
146 """
147
148
149 pending_leases = self.leases.get_leases_by_state(Lease.STATE_PENDING)
150 ar_leases = [req for req in pending_leases if req.get_type() == Lease.ADVANCE_RESERVATION]
151 im_leases = [req for req in pending_leases if req.get_type() == Lease.IMMEDIATE]
152 be_leases = [req for req in pending_leases if req.get_type() == Lease.BEST_EFFORT]
153
154
155 for lease in be_leases:
156 self.__enqueue(lease)
157 lease.set_state(Lease.STATE_QUEUED)
158 self.logger.info("Queued best-effort lease request #%i, %i nodes for %s." % (lease.id, lease.numnodes, lease.duration.requested))
159 get_persistence().persist_lease(lease)
160
161
162 for lease in im_leases:
163 self.logger.info("Scheduling immediate lease #%i (%i nodes)" % (lease.id, lease.numnodes))
164 lease.print_contents()
165
166 try:
167 self.__schedule_lease(lease, nexttime=nexttime)
168 self.logger.info("Immediate lease #%i has been scheduled." % lease.id)
169 lease.print_contents()
170 except NotSchedulableException, exc:
171 self.logger.info("Immediate lease request #%i cannot be scheduled: %s" % (lease.id, exc.reason))
172 lease.set_state(Lease.STATE_REJECTED)
173 self.completed_leases.add(lease)
174 self.accounting.at_lease_done(lease)
175 self.leases.remove(lease)
176 get_persistence().persist_lease(lease)
177
178
179 for lease in ar_leases:
180 self.logger.info("Scheduling AR lease #%i, %i nodes from %s to %s." % (lease.id, lease.numnodes, lease.start.requested, lease.start.requested + lease.duration.requested))
181 lease.print_contents()
182
183 try:
184 self.__schedule_lease(lease, nexttime)
185 self.logger.info("AR lease #%i has been scheduled." % lease.id)
186 lease.print_contents()
187 except NotSchedulableException, exc:
188 self.logger.info("AR lease request #%i cannot be scheduled: %s" % (lease.id, exc.reason))
189 lease.set_state(Lease.STATE_REJECTED)
190 self.completed_leases.add(lease)
191 self.accounting.at_lease_done(lease)
192 self.leases.remove(lease)
193 get_persistence().persist_lease(lease)
194
195
196 self.__process_queue(nexttime)
197 get_persistence().persist_queue(self.queue)
198
237
297
299 """Gets a lease with the given ID
300
301 This method is useful for UIs (like the CLI) that operate on the lease ID.
302 If no lease with a given ID is found, None is returned.
303
304 Arguments:
305 lease_id -- The ID of the lease
306 """
307 if not self.leases.has_lease(lease_id):
308 return None
309 else:
310 return self.leases.get_lease(lease_id)
311
372
373
397
398
425
426
467
468
def is_queue_empty(self):
    """Return True if the queue is empty, False otherwise"""
    queue_is_empty = self.queue.is_empty()
    return queue_is_empty
472
473
475 """Return True if there are any leases scheduled in the future"""
476 return not self.slottable.is_empty()
477
478
def __process_queue(self, nexttime):
    """Traverse the queue in search of leases that can be scheduled.

    Leases are taken from the head of the queue one at a time and handed to
    __schedule_lease. A lease that cannot be scheduled is set aside for a
    later attempt; unless backfilling is turned off, the leases behind it
    are still tried. When the traversal ends, the queue is rebuilt with the
    leases that could not be scheduled first, followed by the leases that
    were never visited, in their existing order.

    Arguments:
    nexttime -- The next time at which the scheduler can allocate resources.
    """
    unscheduled = Queue()

    while not self.is_queue_empty():
        # Nothing left in the queue can be placed if no more future
        # reservations are allowed and the CPUs are fully used at nexttime.
        if not self.vm_scheduler.can_schedule_in_future() and self.slottable.is_full(nexttime, restype = constants.RES_CPU):
            self.logger.debug("Used up all future reservations and slot table is full. Skipping rest of queue.")
            break

        lease = self.queue.dequeue()
        try:
            self.logger.info("Next request in the queue is lease %i. Attempting to schedule..." % lease.id)
            lease.print_contents()
            self.__schedule_lease(lease, nexttime)
        except NotSchedulableException:
            # Keep the lease so it is tried again on a later pass.
            unscheduled.enqueue(lease)
            self.logger.info("Lease %i could not be scheduled at this time." % lease.id)
            # Without backfilling, no lease may be scheduled ahead of one
            # that is still waiting, so stop at the first failure.
            if get_config().get("backfilling") == constants.BACKFILLING_OFF:
                break

    # Leases that were not visited go behind the ones that failed.
    for lease in self.queue:
        unscheduled.enqueue(lease)

    self.queue = unscheduled
513
514
def __schedule_lease(self, lease, nexttime):
    """Schedule a lease by orchestrating the preparation and VM schedulers.

    The lease must be pending, queued, suspended-pending or
    suspended-queued. The method works out the earliest starting time on
    each node, asks the VM scheduler for a VM reservation (preempting the
    leases it reports), schedules preparation or migration reservations,
    records every reservation in the lease and the slot table, and finally
    updates and persists the lease state.

    Arguments:
    lease -- Lease to schedule.
    nexttime -- The next time at which the scheduler can allocate resources.

    Raises:
    InconsistentLeaseStateError -- if the lease is in any other state.
    Exceptions raised by the collaborating schedulers are not caught here.
    """
    state = lease.get_state()
    migration_policy = get_config().get("migration")

    fresh = state in (Lease.STATE_PENDING, Lease.STATE_QUEUED)
    resuming = state in (Lease.STATE_SUSPENDED_PENDING, Lease.STATE_SUSPENDED_QUEUED)

    # Step 1: earliest starting time on each node.
    if fresh:
        # A lease that has never run: the preparation scheduler decides
        # how early it could start.
        earliest = self.preparation_scheduler.find_earliest_starting_times(lease, nexttime)
    elif resuming:
        earliest = {}
        nodes = self.slottable.nodes.keys()
        if migration_policy == constants.MIGRATE_NO:
            # No migration: every node is usable from nexttime.
            for node_id in nodes:
                earliest[node_id] = EarliestStartingTime(nexttime, EarliestStartingTime.EARLIEST_NOPREPARATION)
        else:
            # Migration allowed: push the start back by the estimated
            # preparation and VM migration times.
            prep_delay = self.preparation_scheduler.estimate_migration_time(lease)
            vm_delay = self.vm_scheduler.estimate_migration_time(lease)
            for node_id in nodes:
                earliest[node_id] = EarliestStartingTime(nexttime + prep_delay + vm_delay, EarliestStartingTime.EARLIEST_MIGRATION)
    else:
        raise InconsistentLeaseStateError(lease, doing = "scheduling a best-effort lease")

    # Step 2: VM reservation, plus the leases that must make room for it.
    (vmrr, preemptions) = self.vm_scheduler.schedule(lease, nexttime, earliest)

    # Step 3: preempt those leases at the start of the new reservation.
    num_preempted = len(preemptions)
    if num_preempted > 0:
        self.logger.info("Must preempt leases %s to make room for lease #%i" % ([l.id for l in preemptions], lease.id))
        for victim in preemptions:
            self.__preempt_lease(victim, preemption_time=vmrr.start)

    # Step 4: preparation (fresh lease) or migration (resuming lease).
    is_ready = False
    preparation_rrs = []
    if resuming:
        if migration_policy != constants.MIGRATE_NO:
            migr_rrs = self.preparation_scheduler.schedule_migration(lease, vmrr, nexttime)
            # The VM migration starts once the preparation migration ends.
            if len(migr_rrs) > 0:
                end_migr = migr_rrs[-1].end
            else:
                end_migr = nexttime
            migr_rrs += self.vm_scheduler.schedule_migration(lease, vmrr, end_migr)
            # Reversing and then inserting each one at the front leaves the
            # migrations at the head of pre_rrs in their scheduled order.
            migr_rrs.reverse()
            for migr_rr in migr_rrs:
                vmrr.pre_rrs.insert(0, migr_rr)
            if len(migr_rrs) == 0:
                is_ready = True
        else:
            # Nothing to migrate.
            is_ready = True
    elif fresh:
        preparation_rrs, is_ready = self.preparation_scheduler.schedule(lease, vmrr, earliest)

    # Step 5: record the reservations in the lease ...
    for prep_rr in preparation_rrs:
        lease.append_preparationrr(prep_rr)
    lease.append_vmrr(vmrr)

    # ... and in the slot table: preparation, pre-VM, VM, post-VM.
    reserve = self.slottable.add_reservation
    for prep_rr in preparation_rrs:
        reserve(prep_rr)
    for pre_rr in vmrr.pre_rrs:
        reserve(pre_rr)
    reserve(vmrr)
    for post_rr in vmrr.post_rrs:
        reserve(post_rr)

    # Step 6: new lease state.
    if fresh:
        lease.set_state(Lease.STATE_SCHEDULED)
        if is_ready:
            lease.set_state(Lease.STATE_READY)
    elif resuming:
        lease.set_state(Lease.STATE_SUSPENDED_SCHEDULED)

    get_persistence().persist_lease(lease)

    lease.print_contents()
638
639
def __preempt_lease(self, lease, preemption_time):
    """Preempt a lease at the given time.

    The lease is either suspended at preemption_time or cancelled and put
    back in the queue. It is cancelled and requeued when its last VM
    reservation is still only scheduled and starts at or after
    preemption_time, when suspension is disabled in the configuration, when
    the VM scheduler reports that it cannot be suspended at that time, or
    when it has more than one node and only serial suspension is configured.

    Arguments:
    lease -- Lease to preempt.
    preemption_time -- Time at which lease must be preempted
    """
    self.logger.info("Preempting lease #%i..." % (lease.id))
    self.logger.vdebug("Lease before preemption:")
    lease.print_contents()
    vmrr = lease.get_last_vmrr()

    # Decide between suspending and cancelling/requeueing.
    if vmrr.state == ResourceReservation.STATE_SCHEDULED and vmrr.start >= preemption_time:
        self.logger.debug("Lease was set to start in the middle of the preempting lease.")
        suspend = False
    else:
        susptype = get_config().get("suspension")
        if susptype == constants.SUSPENSION_NONE:
            suspend = False
        elif not self.vm_scheduler.can_suspend_at(lease, preemption_time):
            self.logger.debug("Suspending the lease does not meet scheduling threshold.")
            suspend = False
        elif lease.numnodes > 1 and susptype == constants.SUSPENSION_SERIAL:
            self.logger.debug("Can't suspend lease because only suspension of single-node leases is allowed.")
            suspend = False
        else:
            self.logger.debug("Lease can be suspended")
            suspend = True

    if suspend:
        self.logger.info("... lease #%i will be suspended at %s." % (lease.id, preemption_time))
        self.vm_scheduler.preempt_vm(vmrr, preemption_time)
    else:
        self.logger.info("... lease #%i has been cancelled and requeued." % lease.id)
        self.preparation_scheduler.cancel_preparation(lease)
        self.vm_scheduler.cancel_vm(vmrr)
        lease.remove_vmrr(vmrr)

        # A lease that was waiting to resume goes back to the suspended
        # flavour of the queued state.
        if lease.get_state() == Lease.STATE_SUSPENDED_SCHEDULED:
            requeued_state = Lease.STATE_SUSPENDED_QUEUED
        else:
            requeued_state = Lease.STATE_QUEUED
        lease.set_state(requeued_state)
        self.__enqueue_in_order(lease)

    get_persistence().persist_lease(lease)

    self.logger.vdebug("Lease after preemption:")
    lease.print_contents()
697
698
def __enqueue(self, lease):
    """Add a best-effort lease request to the queue.

    Arguments:
    lease -- Lease to be queued
    """
    lease_queue = self.queue
    lease_queue.enqueue(lease)
706
707
def __enqueue_in_order(self, lease):
    """Add a lease to the queue in order (currently, time of submission).

    Arguments:
    lease -- Lease to be queued
    """
    lease_queue = self.queue
    lease_queue.enqueue_in_order(lease)
715
716
718 """Performs actions that have to be done each time a reservation ends.
719
720 Arguments:
721 rr -- Reservation that ended
722 """
723 self.slottable.remove_reservation(rr)
724
725
739
740
741
743 """A simple queue for leases
744
745 This class is a simple queue container for leases, with some
746 extra syntactic sugar added for convenience.
747 """
748
751
def is_empty(self):
    """Return True if the queue holds no leases, False otherwise."""
    return not len(self.__q)
754
757
def dequeue(self):
    """Remove the lease at the head of the queue and return it."""
    head = self.__q.pop(0)
    return head
760
def enqueue_in_order(self, r):
    """Add r to the queue, keeping the queue sorted by submit_time."""
    entries = self.__q
    entries.append(r)
    # Re-sort the whole queue on the submit_time attribute.
    by_submit_time = attrgetter("submit_time")
    entries.sort(key=by_submit_time)
764
767
769 return (1 == len([l for l in self.__q if l.id == lease_id]))
770
772 return [l for l in self.__q if l.id == lease_id][0]
773
776
def __iter__(self):
    """Return an iterator over the queued leases, head of the queue first."""
    queued = self.__q
    return iter(queued)
779
781 """A simple container for leases
782
783 This class is a simple dictionary-like container for leases, with some
784 extra syntactic sugar added for convenience.
785 """
786
789
def has_lease(self, lease_id):
    """Return True if a lease with the given ID is in the table.

    Arguments:
    lease_id -- The ID of the lease
    """
    # The "in" operator replaces dict.has_key(), which is deprecated and
    # no longer exists in Python 3.
    return lease_id in self.entries
792
def get_lease(self, lease_id):
    """Return the lease stored under the given ID.

    Arguments:
    lease_id -- The ID of the lease
    """
    lease = self.entries[lease_id]
    return lease
795
797 return len(self.entries)==0
798
def remove(self, lease):
    """Remove the entry stored under the lease's ID.

    Arguments:
    lease -- Lease to remove (looked up by its id attribute)
    """
    lease_id = lease.id
    del self.entries[lease_id]
801
def add(self, lease):
    """Store the lease under its ID, replacing any entry with the same ID.

    Arguments:
    lease -- Lease to add (stored under its id attribute)
    """
    lease_id = lease.id
    self.entries[lease_id] = lease
804
806 if type==None:
807 return self.entries.values()
808 else:
809 return [e for e in self.entries.values() if e.get_type() == type]
810
def get_leases_by_state(self, state):
    """Return a list of the leases in the table whose state equals state.

    Arguments:
    state -- Lease state to match against each lease's get_state()
    """
    matching = []
    for entry in self.entries.values():
        if entry.get_state() == state:
            matching.append(entry)
    return matching
813