Package haizea :: Package core :: Package scheduler :: Module lease_scheduler
[hide private]
[frames] | no frames]

Source Code for Module haizea.core.scheduler.lease_scheduler

  1  # -------------------------------------------------------------------------- # 
  2  # Copyright 2006-2009, University of Chicago                                 # 
  3  # Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   # 
  4  # Complutense de Madrid (dsa-research.org)                                   # 
  5  #                                                                            # 
  6  # Licensed under the Apache License, Version 2.0 (the "License"); you may    # 
  7  # not use this file except in compliance with the License. You may obtain    # 
  8  # a copy of the License at                                                   # 
  9  #                                                                            # 
 10  # http://www.apache.org/licenses/LICENSE-2.0                                 # 
 11  #                                                                            # 
 12  # Unless required by applicable law or agreed to in writing, software        # 
 13  # distributed under the License is distributed on an "AS IS" BASIS,          # 
 14  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   # 
 15  # See the License for the specific language governing permissions and        # 
 16  # limitations under the License.                                             # 
 17  # -------------------------------------------------------------------------- # 
 18   
 19   
 20  """This module provides the main classes for Haizea's lease scheduler, particularly 
 21  the LeaseScheduler class. This module does *not* contain VM scheduling code (i.e., 
 22  the code that decides what physical hosts a VM should be mapped to), which is 
 23  located in the vm_scheduler module. Lease preparation code (e.g., image transfer  
 24  scheduling) is located in the preparation_schedulers package. In fact, the 
 25  main purpose of the lease schedule is to orchestrate these preparation and VM 
 26  schedulers. 
 27   
 28  This module also includes a Queue class and a LeaseTable class, which are used 
 29  by the lease scheduler. 
 30  """ 
 31   
 32  import haizea.common.constants as constants 
 33  from haizea.common.utils import round_datetime, get_config, get_clock, get_policy, get_persistence 
 34  from haizea.core.leases import Lease 
 35  from haizea.core.scheduler import RescheduleLeaseException, NormalEndLeaseException, InconsistentLeaseStateError, EnactmentError, UnrecoverableError, NotSchedulableException, EarliestStartingTime 
 36  from haizea.core.scheduler.slottable import ResourceReservation 
 37  from operator import attrgetter 
 38   
 39  import logging 
 40   
41 -class LeaseScheduler(object):
42 """The Haizea Lease Scheduler 43 44 This is the main scheduling class in Haizea. It handles lease scheduling which, 45 in turn, involves VM scheduling, preparation scheduling (such as transferring 46 a VM image), and numerous bookkeeping operations. All these operations are 47 handled by other classes, so this class acts mostly as an orchestrator that 48 coordinates all the different operations involved in scheduling a lease. 49 """ 50
51 - def __init__(self, vm_scheduler, preparation_scheduler, slottable, accounting):
52 """Constructor 53 54 The constructor does little more than create the lease scheduler's 55 attributes. However, it does expect (in the arguments) a fully-constructed 56 VMScheduler, PreparationScheduler, SlotTable, and PolicyManager (these are 57 constructed in the Manager's constructor). 58 59 Arguments: 60 vm_scheduler -- VM scheduler 61 preparation_scheduler -- Preparation scheduler 62 slottable -- Slottable 63 accounting -- AccountingDataCollection object 64 """ 65 66 # Logger 67 self.logger = logging.getLogger("LSCHED") 68 69 # Assign schedulers and slottable 70 self.vm_scheduler = vm_scheduler 71 """ 72 VM Scheduler 73 @type: VMScheduler 74 """ 75 self.preparation_scheduler = preparation_scheduler 76 self.slottable = slottable 77 self.accounting = accounting 78 79 # Create other data structures 80 self.queue = Queue() 81 self.leases = LeaseTable() 82 self.completed_leases = LeaseTable() 83 84 # Handlers are callback functions that get called whenever a type of 85 # resource reservation starts or ends. Each scheduler publishes the 86 # handlers it supports through its "handlers" attributes. For example, 87 # the VMScheduler provides _handle_start_vm and _handle_end_vm that 88 # must be called when a VMResourceReservation start or end is encountered 89 # in the slot table. 90 # 91 # Handlers are called from the process_reservations method of this class 92 self.handlers = {} 93 for (type, handler) in self.vm_scheduler.handlers.items(): 94 self.handlers[type] = handler 95 96 for (type, handler) in self.preparation_scheduler.handlers.items(): 97 self.handlers[type] = handler
98 99
100 - def request_lease(self, lease):
101 """Requests a leases. This is the entry point of leases into the scheduler. 102 103 Request a lease. The decision on whether to accept or reject a 104 lease is deferred to the policy manager (through its admission 105 control policy). 106 107 If the policy determines the lease can be 108 accepted, it is marked as "Pending". This still doesn't 109 guarantee that the lease will be scheduled (e.g., an AR lease 110 could still be rejected if the scheduler determines there are no 111 resources for it; but that is a *scheduling* decision, not a admission 112 control policy decision). The ultimate fate of the lease is determined 113 the next time the scheduling function is called. 114 115 If the policy determines the lease cannot be accepted, it is marked 116 as rejected. 117 118 Arguments: 119 lease -- Lease object. Its state must be STATE_NEW. 120 """ 121 self.logger.info("Lease #%i has been requested." % lease.id) 122 if lease.submit_time == None: 123 lease.submit_time = round_datetime(get_clock().get_time()) 124 lease.print_contents() 125 lease.set_state(Lease.STATE_PENDING) 126 if get_policy().accept_lease(lease): 127 self.logger.info("Lease #%i has been marked as pending." % lease.id) 128 self.leases.add(lease) 129 else: 130 self.logger.info("Lease #%i has not been accepted" % lease.id) 131 lease.set_state(Lease.STATE_REJECTED) 132 self.completed_leases.add(lease) 133 134 self.accounting.at_lease_request(lease) 135 get_persistence().persist_lease(lease)
136
137 - def schedule(self, nexttime):
138 """ The main scheduling function 139 140 The scheduling function looks at all pending requests and schedules them. 141 Note that most of the actual scheduling code is contained in the 142 __schedule_lease method and in the VMScheduler and PreparationScheduler classes. 143 144 Arguments: 145 nexttime -- The next time at which the scheduler can allocate resources. 146 """ 147 148 # Get pending leases 149 pending_leases = self.leases.get_leases_by_state(Lease.STATE_PENDING) 150 ar_leases = [req for req in pending_leases if req.get_type() == Lease.ADVANCE_RESERVATION] 151 im_leases = [req for req in pending_leases if req.get_type() == Lease.IMMEDIATE] 152 be_leases = [req for req in pending_leases if req.get_type() == Lease.BEST_EFFORT] 153 154 # Queue best-effort leases 155 for lease in be_leases: 156 self.__enqueue(lease) 157 lease.set_state(Lease.STATE_QUEUED) 158 self.logger.info("Queued best-effort lease request #%i, %i nodes for %s." % (lease.id, lease.numnodes, lease.duration.requested)) 159 get_persistence().persist_lease(lease) 160 161 # Schedule immediate leases 162 for lease in im_leases: 163 self.logger.info("Scheduling immediate lease #%i (%i nodes)" % (lease.id, lease.numnodes)) 164 lease.print_contents() 165 166 try: 167 self.__schedule_lease(lease, nexttime=nexttime) 168 self.logger.info("Immediate lease #%i has been scheduled." % lease.id) 169 lease.print_contents() 170 except NotSchedulableException, exc: 171 self.logger.info("Immediate lease request #%i cannot be scheduled: %s" % (lease.id, exc.reason)) 172 lease.set_state(Lease.STATE_REJECTED) 173 self.completed_leases.add(lease) 174 self.accounting.at_lease_done(lease) 175 self.leases.remove(lease) 176 get_persistence().persist_lease(lease) 177 178 # Schedule AR requests 179 for lease in ar_leases: 180 self.logger.info("Scheduling AR lease #%i, %i nodes from %s to %s." % (lease.id, lease.numnodes, lease.start.requested, lease.start.requested + lease.duration.requested)) 181 lease.print_contents() 182 183 try: 184 self.__schedule_lease(lease, nexttime) 185 self.logger.info("AR lease #%i has been scheduled." % lease.id) 186 lease.print_contents() 187 except NotSchedulableException, exc: 188 self.logger.info("AR lease request #%i cannot be scheduled: %s" % (lease.id, exc.reason)) 189 lease.set_state(Lease.STATE_REJECTED) 190 self.completed_leases.add(lease) 191 self.accounting.at_lease_done(lease) 192 self.leases.remove(lease) 193 get_persistence().persist_lease(lease) 194 195 # Process queue (i.e., traverse queue in search of leases that can be scheduled) 196 self.__process_queue(nexttime) 197 get_persistence().persist_queue(self.queue)
198
199 - def process_starting_reservations(self, nowtime):
200 """Processes starting reservations 201 202 This method checks the slottable to see if there are any reservations that are 203 starting at "nowtime". If so, the appropriate handler is called. 204 205 Arguments: 206 nowtime -- Time at which to check for starting reservations. 207 """ 208 209 # Find starting/ending reservations 210 starting = self.slottable.get_reservations_starting_at(nowtime) 211 starting = [res for res in starting if res.state == ResourceReservation.STATE_SCHEDULED] 212 213 # Process starting reservations 214 for rr in starting: 215 lease = rr.lease 216 # Call the appropriate handler, and catch exceptions and errors. 217 try: 218 self.handlers[type(rr)].on_start(lease, rr) 219 220 # An InconsistentLeaseStateError is raised when the lease is in an inconsistent 221 # state. This is usually indicative of a programming error, but not necessarily 222 # one that affects all leases, so we just fail this lease. Note that Haizea can also 223 # be configured to stop immediately when a lease fails. 224 except InconsistentLeaseStateError, exc: 225 self.fail_lease(lease, exc) 226 # An EnactmentError is raised when the handler had to perform an enactment action 227 # (e.g., stopping a VM), and that enactment action failed. This is currently treated 228 # as a non-recoverable error for the lease, and the lease is failed. 229 except EnactmentError, exc: 230 self.fail_lease(lease, exc) 231 232 # Other exceptions are not expected, and generally indicate a programming error. 233 # Thus, they are propagated upwards to the Manager where they will make 234 # Haizea crash and burn. 235 236 get_persistence().persist_lease(lease)
237
238 - def process_ending_reservations(self, nowtime):
239 """Processes ending reservations 240 241 This method checks the slottable to see if there are any reservations that are 242 ending at "nowtime". If so, the appropriate handler is called. 243 244 Arguments: 245 nowtime -- Time at which to check for starting/ending reservations. 246 """ 247 248 # Find starting/ending reservations 249 ending = self.slottable.get_reservations_ending_at(nowtime) 250 ending = [res for res in ending if res.state == ResourceReservation.STATE_ACTIVE] 251 252 # Process ending reservations 253 for rr in ending: 254 lease = rr.lease 255 self._handle_end_rr(rr) 256 257 # Call the appropriate handler, and catch exceptions and errors. 258 try: 259 self.handlers[type(rr)].on_end(lease, rr) 260 261 # A RescheduleLeaseException indicates that the lease has to be rescheduled 262 except RescheduleLeaseException, exc: 263 # Currently, the only leases that get rescheduled are best-effort leases, 264 # once they've been suspended. 265 if rr.lease.get_type() == Lease.BEST_EFFORT: 266 if lease.get_state() == Lease.STATE_SUSPENDED_PENDING: 267 # Put back in the queue, in the same order it arrived 268 self.__enqueue_in_order(lease) 269 lease.set_state(Lease.STATE_SUSPENDED_QUEUED) 270 get_persistence().persist_queue(self.queue) 271 else: 272 raise InconsistentLeaseStateError(lease, doing = "rescheduling best-effort lease") 273 274 # A NormalEndLeaseException indicates that the end of this reservations marks 275 # the normal end of the lease. 276 except NormalEndLeaseException, msg: 277 self._handle_end_lease(lease) 278 279 # An InconsistentLeaseStateError is raised when the lease is in an inconsistent 280 # state. This is usually indicative of a programming error, but not necessarily 281 # one that affects all leases, so we just fail this lease. Note that Haizea can also 282 # be configured to stop immediately when a lease fails. 283 except InconsistentLeaseStateError, exc: 284 self.fail_lease(lease, exc) 285 286 # An EnactmentError is raised when the handler had to perform an enactment action 287 # (e.g., stopping a VM), and that enactment action failed. This is currently treated 288 # as a non-recoverable error for the lease, and the lease is failed. 289 except EnactmentError, exc: 290 self.fail_lease(lease, exc) 291 292 # Other exceptions are not expected, and generally indicate a programming error. 293 # Thus, they are propagated upwards to the Manager where they will make 294 # Haizea crash and burn. 295 296 get_persistence().persist_lease(lease)
297
298 - def get_lease_by_id(self, lease_id):
299 """Gets a lease with the given ID 300 301 This method is useful for UIs (like the CLI) that operate on the lease ID. 302 If no lease with a given ID is found, None is returned. 303 304 Arguments: 305 lease_id -- The ID of the lease 306 """ 307 if not self.leases.has_lease(lease_id): 308 return None 309 else: 310 return self.leases.get_lease(lease_id)
311
312 - def cancel_lease(self, lease):
313 """Cancels a lease. 314 315 Arguments: 316 lease -- Lease to cancel 317 """ 318 time = get_clock().get_time() 319 320 self.logger.info("Cancelling lease %i..." % lease.id) 321 322 lease_state = lease.get_state() 323 324 if lease_state == Lease.STATE_PENDING: 325 # If a lease is pending, we just need to change its state and 326 # remove it from the lease table. Since this is done at the 327 # end of this method, we do nothing here. 328 pass 329 330 elif lease_state == Lease.STATE_ACTIVE: 331 # If a lease is active, that means we have to shut down its VMs to cancel it. 332 self.logger.info("Lease %i is active. Stopping active reservation..." % lease.id) 333 vmrr = lease.get_active_vmrrs(time)[0] 334 self._handle_end_rr(vmrr) 335 self.vm_scheduler._handle_unscheduled_end_vm(lease, vmrr) 336 337 # Force machines to shut down 338 try: 339 self.vm_scheduler.resourcepool.stop_vms(lease, vmrr) 340 except EnactmentError, exc: 341 self.logger.error("Enactment error when shutting down VMs.") 342 # Right now, this is a non-recoverable error, so we just 343 # propagate it upwards. 344 # In the future, it may be possible to react to these 345 # kind of errors. 346 raise 347 348 elif lease_state in [Lease.STATE_SCHEDULED, Lease.STATE_SUSPENDED_SCHEDULED, Lease.STATE_READY, Lease.STATE_RESUMED_READY]: 349 # If a lease is scheduled or ready, we just need to cancel all future reservations 350 # for that lease 351 self.logger.info("Lease %i is scheduled. Cancelling reservations." % lease.id) 352 rrs = lease.get_scheduled_reservations() 353 for r in rrs: 354 self.slottable.remove_reservation(r) 355 356 elif lease_state in [Lease.STATE_QUEUED, Lease.STATE_SUSPENDED_QUEUED]: 357 # If a lease is in the queue, waiting to be scheduled, cancelling 358 # just requires removing it from the queue 359 360 self.logger.info("Lease %i is in the queue. Removing..." % lease.id) 361 self.queue.remove_lease(lease) 362 get_persistence().persist_queue(self.queue) 363 else: 364 # Cancelling in any of the other states is currently unsupported 365 raise InconsistentLeaseStateError(lease, doing = "cancelling the VM") 366 367 # Change state, and remove from lease table 368 lease.set_state(Lease.STATE_CANCELLED) 369 self.completed_leases.add(lease) 370 self.leases.remove(lease) 371 get_persistence().persist_lease(lease)
372 373
374 - def fail_lease(self, lease, exc=None):
375 """Transitions a lease to a failed state, and does any necessary cleaning up 376 377 Arguments: 378 lease -- Lease to fail 379 exc -- The exception that made the lease fail 380 """ 381 treatment = get_config().get("lease-failure-handling") 382 383 if treatment == constants.ONFAILURE_CANCEL: 384 # In this case, a lease failure is handled by cancelling the lease, 385 # but allowing Haizea to continue to run normally. 386 rrs = lease.get_scheduled_reservations() 387 for r in rrs: 388 self.slottable.remove_reservation(r) 389 lease.set_state(Lease.STATE_FAIL) 390 self.completed_leases.add(lease) 391 self.leases.remove(lease) 392 get_persistence().persist_lease(lease) 393 elif treatment == constants.ONFAILURE_EXIT or treatment == constants.ONFAILURE_EXIT_RAISE: 394 # In this case, a lease failure makes Haizea exit. This is useful when debugging, 395 # so we can immediately know about any errors. 396 raise UnrecoverableError(exc)
397 398
399 - def notify_event(self, lease, event):
400 """Notifies an event that affects a lease. 401 402 This is the entry point of asynchronous events into the scheduler. Currently, 403 the only supported event is the premature end of a VM (i.e., before its 404 scheduled end). Other events will emerge when we integrate Haizea with OpenNebula 1.4, 405 since that version will support sending asynchronous events to Haizea. 406 407 Arguments: 408 lease -- Lease the event refers to 409 event -- Event type 410 """ 411 time = get_clock().get_time() 412 if event == constants.EVENT_END_VM: 413 vmrr = lease.get_last_vmrr() 414 self._handle_end_rr(vmrr) 415 # TODO: Exception handling 416 self.vm_scheduler._handle_unscheduled_end_vm(lease, vmrr) 417 self._handle_end_lease(lease) 418 get_persistence().persist_lease(lease) 419 420 # We need to reevaluate the schedule to see if there are any 421 # leases scheduled in the future that could be rescheduled 422 # to start earlier 423 nexttime = get_clock().get_next_schedulable_time() 424 self.reevaluate_schedule(nexttime)
425 426
427 - def reevaluate_schedule(self, nexttime):
428 """Reevaluates the schedule. 429 430 This method can be called whenever resources are freed up 431 unexpectedly (e.g., a lease than ends earlier than expected)) 432 to check if any leases scheduled in the future could be 433 rescheduled to start earlier on the freed up resources. 434 435 Currently, this method only checks if best-effort leases 436 scheduled in the future (using a backfilling algorithm) 437 can be rescheduled 438 439 Arguments: 440 nexttime -- The next time at which the scheduler can allocate resources. 441 """ 442 future = self.vm_scheduler.get_future_reschedulable_leases() 443 for l in future: 444 # We can only reschedule leases in the following four states 445 if l.get_state() in (Lease.STATE_PREPARING, Lease.STATE_READY, Lease.STATE_SCHEDULED, Lease.STATE_SUSPENDED_SCHEDULED): 446 # For each reschedulable lease already scheduled in the 447 # future, we cancel the lease's preparantion and 448 # the last scheduled VM. 449 vmrr = l.get_last_vmrr() 450 self.preparation_scheduler.cancel_preparation(l) 451 self.vm_scheduler.cancel_vm(vmrr) 452 l.remove_vmrr(vmrr) 453 if l.get_state() in (Lease.STATE_READY, Lease.STATE_SCHEDULED, Lease.STATE_PREPARING): 454 l.set_state(Lease.STATE_PENDING) 455 elif l.get_state() == Lease.STATE_SUSPENDED_SCHEDULED: 456 l.set_state(Lease.STATE_SUSPENDED_PENDING) 457 458 # At this point, the lease just looks like a regular 459 # pending lease that can be handed off directly to the 460 # __schedule_lease method. 461 # TODO: We should do exception handling here. However, 462 # since we can only reschedule best-effort leases that were 463 # originally schedule in the future, the scheduling function 464 # should always be able to schedule the lease (worst-case 465 # scenario is that it simply replicates the previous schedule) 466 self.__schedule_lease(l, nexttime)
467 468
469 - def is_queue_empty(self):
470 """Return True is the queue is empty, False otherwise""" 471 return self.queue.is_empty()
472 473
474 - def exists_scheduled_leases(self):
475 """Return True if there are any leases scheduled in the future""" 476 return not self.slottable.is_empty()
477 478
479 - def __process_queue(self, nexttime):
480 """ Traverses the queue in search of leases that can be scheduled. 481 482 This method processes the queue in order, but takes into account that 483 it may be possible to schedule leases in the future (using a 484 backfilling algorithm) 485 486 Arguments: 487 nexttime -- The next time at which the scheduler can allocate resources. 488 """ 489 490 done = False 491 newqueue = Queue() 492 while not done and not self.is_queue_empty(): 493 if not self.vm_scheduler.can_schedule_in_future() and self.slottable.is_full(nexttime, restype = constants.RES_CPU): 494 self.logger.debug("Used up all future reservations and slot table is full. Skipping rest of queue.") 495 done = True 496 else: 497 lease = self.queue.dequeue() 498 try: 499 self.logger.info("Next request in the queue is lease %i. Attempting to schedule..." % lease.id) 500 lease.print_contents() 501 self.__schedule_lease(lease, nexttime) 502 except NotSchedulableException, msg: 503 # Put back on queue 504 newqueue.enqueue(lease) 505 self.logger.info("Lease %i could not be scheduled at this time." % lease.id) 506 if get_config().get("backfilling") == constants.BACKFILLING_OFF: 507 done = True 508 509 for lease in self.queue: 510 newqueue.enqueue(lease) 511 512 self.queue = newqueue
513 514
515 - def __schedule_lease(self, lease, nexttime):
516 """ Schedules a lease. 517 518 This method orchestrates the preparation and VM scheduler to 519 schedule a lease. 520 521 Arguments: 522 lease -- Lease to schedule. 523 nexttime -- The next time at which the scheduler can allocate resources. 524 """ 525 526 lease_state = lease.get_state() 527 migration = get_config().get("migration") 528 529 # Determine earliest start time in each node 530 if lease_state == Lease.STATE_PENDING or lease_state == Lease.STATE_QUEUED: 531 # This lease might require preparation. Ask the preparation 532 # scheduler for the earliest starting time. 533 earliest = self.preparation_scheduler.find_earliest_starting_times(lease, nexttime) 534 elif lease_state == Lease.STATE_SUSPENDED_PENDING or lease_state == Lease.STATE_SUSPENDED_QUEUED: 535 # This lease may have to be migrated. 536 # We have to ask both the preparation scheduler and the VM 537 # scheduler what would be the earliest possible starting time 538 # on each node, assuming we have to transfer files between 539 # nodes. 540 541 node_ids = self.slottable.nodes.keys() 542 earliest = {} 543 if migration == constants.MIGRATE_NO: 544 # If migration is disabled, the earliest starting time 545 # is simply nexttime. 546 for node in node_ids: 547 earliest[node] = EarliestStartingTime(nexttime, EarliestStartingTime.EARLIEST_NOPREPARATION) 548 else: 549 # Otherwise, we ask the preparation scheduler and the VM 550 # scheduler how long it would take them to migrate the 551 # lease state. 552 prep_migr_time = self.preparation_scheduler.estimate_migration_time(lease) 553 vm_migr_time = self.vm_scheduler.estimate_migration_time(lease) 554 for node in node_ids: 555 earliest[node] = EarliestStartingTime(nexttime + prep_migr_time + vm_migr_time, EarliestStartingTime.EARLIEST_MIGRATION) 556 else: 557 raise InconsistentLeaseStateError(lease, doing = "scheduling a best-effort lease") 558 559 # Now, we give the lease to the VM scheduler, along with the 560 # earliest possible starting times. If the VM scheduler can 561 # schedule VMs for this lease, it will return a resource reservation 562 # that we can add to the slot table, along with a list of 563 # leases that have to be preempted. 564 # If the VM scheduler can't schedule the VMs, it will throw an 565 # exception (we don't catch it here, and it is just thrown up 566 # to the calling method. 567 (vmrr, preemptions) = self.vm_scheduler.schedule(lease, nexttime, earliest) 568 569 # If scheduling the lease involves preempting other leases, 570 # go ahead and preempt them. 571 if len(preemptions) > 0: 572 self.logger.info("Must preempt leases %s to make room for lease #%i" % ([l.id for l in preemptions], lease.id)) 573 for l in preemptions: 574 self.__preempt_lease(l, preemption_time=vmrr.start) 575 576 # Schedule lease preparation 577 is_ready = False 578 preparation_rrs = [] 579 if lease_state in (Lease.STATE_SUSPENDED_PENDING, Lease.STATE_SUSPENDED_QUEUED) and migration != constants.MIGRATE_NO: 580 # The lease might require migration 581 migr_rrs = self.preparation_scheduler.schedule_migration(lease, vmrr, nexttime) 582 if len(migr_rrs) > 0: 583 end_migr = migr_rrs[-1].end 584 else: 585 end_migr = nexttime 586 migr_rrs += self.vm_scheduler.schedule_migration(lease, vmrr, end_migr) 587 migr_rrs.reverse() 588 for migr_rr in migr_rrs: 589 vmrr.pre_rrs.insert(0, migr_rr) 590 if len(migr_rrs) == 0: 591 is_ready = True 592 elif lease_state in (Lease.STATE_SUSPENDED_PENDING, Lease.STATE_SUSPENDED_QUEUED) and migration == constants.MIGRATE_NO: 593 # No migration means the lease is ready 594 is_ready = True 595 elif lease_state in (Lease.STATE_PENDING, Lease.STATE_QUEUED): 596 # The lease might require initial preparation 597 preparation_rrs, is_ready = self.preparation_scheduler.schedule(lease, vmrr, earliest) 598 599 # At this point, the lease is feasible. 600 # Commit changes by adding RRs to lease and to slot table 601 602 # Add preparation RRs (if any) to lease 603 for rr in preparation_rrs: 604 lease.append_preparationrr(rr) 605 606 # Add VMRR to lease 607 lease.append_vmrr(vmrr) 608 609 610 # Add resource reservations to slottable 611 612 # Preparation RRs (if any) 613 for rr in preparation_rrs: 614 self.slottable.add_reservation(rr) 615 616 # Pre-VM RRs (if any) 617 for rr in vmrr.pre_rrs: 618 self.slottable.add_reservation(rr) 619 620 # VM 621 self.slottable.add_reservation(vmrr) 622 623 # Post-VM RRs (if any) 624 for rr in vmrr.post_rrs: 625 self.slottable.add_reservation(rr) 626 627 # Change lease state 628 if lease_state == Lease.STATE_PENDING or lease_state == Lease.STATE_QUEUED: 629 lease.set_state(Lease.STATE_SCHEDULED) 630 if is_ready: 631 lease.set_state(Lease.STATE_READY) 632 elif lease_state == Lease.STATE_SUSPENDED_PENDING or lease_state == Lease.STATE_SUSPENDED_QUEUED: 633 lease.set_state(Lease.STATE_SUSPENDED_SCHEDULED) 634 635 get_persistence().persist_lease(lease) 636 637 lease.print_contents()
638 639
640 - def __preempt_lease(self, lease, preemption_time):
641 """ Preempts a lease. 642 643 This method preempts a lease such that any resources allocated 644 to that lease after a given time are freed up. This may require 645 scheduling the lease to suspend before that time, or cancelling 646 the lease altogether. 647 648 Arguments: 649 lease -- Lease to schedule. 650 preemption_time -- Time at which lease must be preempted 651 """ 652 653 self.logger.info("Preempting lease #%i..." % (lease.id)) 654 self.logger.vdebug("Lease before preemption:") 655 lease.print_contents() 656 vmrr = lease.get_last_vmrr() 657 658 if vmrr.state == ResourceReservation.STATE_SCHEDULED and vmrr.start >= preemption_time: 659 self.logger.debug("Lease was set to start in the middle of the preempting lease.") 660 must_cancel_and_requeue = True 661 else: 662 susptype = get_config().get("suspension") 663 if susptype == constants.SUSPENSION_NONE: 664 must_cancel_and_requeue = True 665 else: 666 can_suspend = self.vm_scheduler.can_suspend_at(lease, preemption_time) 667 if not can_suspend: 668 self.logger.debug("Suspending the lease does not meet scheduling threshold.") 669 must_cancel_and_requeue = True 670 else: 671 if lease.numnodes > 1 and susptype == constants.SUSPENSION_SERIAL: 672 self.logger.debug("Can't suspend lease because only suspension of single-node leases is allowed.") 673 must_cancel_and_requeue = True 674 else: 675 self.logger.debug("Lease can be suspended") 676 must_cancel_and_requeue = False 677 678 if must_cancel_and_requeue: 679 self.logger.info("... lease #%i has been cancelled and requeued." % lease.id) 680 self.preparation_scheduler.cancel_preparation(lease) 681 self.vm_scheduler.cancel_vm(vmrr) 682 lease.remove_vmrr(vmrr) 683 # TODO: Take into account other states 684 if lease.get_state() == Lease.STATE_SUSPENDED_SCHEDULED: 685 lease.set_state(Lease.STATE_SUSPENDED_QUEUED) 686 else: 687 lease.set_state(Lease.STATE_QUEUED) 688 self.__enqueue_in_order(lease) 689 else: 690 self.logger.info("... lease #%i will be suspended at %s." % (lease.id, preemption_time)) 691 self.vm_scheduler.preempt_vm(vmrr, preemption_time) 692 693 get_persistence().persist_lease(lease) 694 695 self.logger.vdebug("Lease after preemption:") 696 lease.print_contents()
697 698
699 - def __enqueue(self, lease):
700 """Queues a best-effort lease request 701 702 Arguments: 703 lease -- Lease to be queued 704 """ 705 self.queue.enqueue(lease)
706 707
708 - def __enqueue_in_order(self, lease):
709 """Queues a lease in order (currently, time of submission) 710 711 Arguments: 712 lease -- Lease to be queued 713 """ 714 self.queue.enqueue_in_order(lease)
715 716
717 - def _handle_end_rr(self, rr):
718 """Performs actions that have to be done each time a reservation ends. 719 720 Arguments: 721 rr -- Reservation that ended 722 """ 723 self.slottable.remove_reservation(rr)
724 725
726 - def _handle_end_lease(self, l):
727 """Performs actions that have to be done each time a lease ends. 728 729 Arguments: 730 lease -- Lease that has ended 731 """ 732 l.set_state(Lease.STATE_DONE) 733 l.duration.actual = l.duration.accumulated 734 l.end = round_datetime(get_clock().get_time()) 735 self.preparation_scheduler.cleanup(l) 736 self.completed_leases.add(l) 737 self.leases.remove(l) 738 self.accounting.at_lease_done(l)
739 740 741
742 -class Queue(object):
743 """A simple queue for leases 744 745 This class is a simple queue container for leases, with some 746 extra syntactic sugar added for convenience. 747 """ 748
749 - def __init__(self):
750 self.__q = []
751
752 - def is_empty(self):
753 return len(self.__q)==0
754
755 - def enqueue(self, r):
756 self.__q.append(r)
757
758 - def dequeue(self):
759 return self.__q.pop(0)
760
761 - def enqueue_in_order(self, r):
762 self.__q.append(r) 763 self.__q.sort(key=attrgetter("submit_time"))
764
765 - def length(self):
766 return len(self.__q)
767
768 - def has_lease(self, lease_id):
769 return (1 == len([l for l in self.__q if l.id == lease_id]))
770
771 - def get_lease(self, lease_id):
772 return [l for l in self.__q if l.id == lease_id][0]
773
774 - def remove_lease(self, lease):
775 self.__q.remove(lease)
776
777 - def __iter__(self):
778 return iter(self.__q)
779
780 -class LeaseTable(object):
781 """A simple container for leases 782 783 This class is a simple dictionary-like container for leases, with some 784 extra syntactic sugar added for convenience. 785 """ 786
787 - def __init__(self):
788 self.entries = {}
789
790 - def has_lease(self, lease_id):
791 return self.entries.has_key(lease_id)
792
793 - def get_lease(self, lease_id):
794 return self.entries[lease_id]
795
796 - def is_empty(self):
797 return len(self.entries)==0
798
799 - def remove(self, lease):
800 del self.entries[lease.id]
801
802 - def add(self, lease):
803 self.entries[lease.id] = lease
804
805 - def get_leases(self, type=None):
806 if type==None: 807 return self.entries.values() 808 else: 809 return [e for e in self.entries.values() if e.get_type() == type]
810
811 - def get_leases_by_state(self, state):
812 return [e for e in self.entries.values() if e.get_state() == state]
813