
Source Code for Module haizea.core.scheduler.vm_scheduler

   1  # -------------------------------------------------------------------------- # 
   2  # Copyright 2006-2009, University of Chicago                                 # 
   3  # Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   # 
   4  # Complutense de Madrid (dsa-research.org)                                   # 
   5  #                                                                            # 
   6  # Licensed under the Apache License, Version 2.0 (the "License"); you may    # 
   7  # not use this file except in compliance with the License. You may obtain    # 
   8  # a copy of the License at                                                   # 
   9  #                                                                            # 
  10  # http://www.apache.org/licenses/LICENSE-2.0                                 # 
  11  #                                                                            # 
  12  # Unless required by applicable law or agreed to in writing, software        # 
  13  # distributed under the License is distributed on an "AS IS" BASIS,          # 
  14  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   # 
  15  # See the License for the specific language governing permissions and        # 
  16  # limitations under the License.                                             # 
  17  # -------------------------------------------------------------------------- # 
  18   
  19  """This module provides the main classes for Haizea's VM Scheduler. All the 
  20  scheduling code that decides when and where a lease is scheduled is contained 
  21  in the VMScheduler class (except for the code that specifically decides 
  22  what physical machines each virtual machine is mapped to, which is factored out 
  23  into the "mapper" module). This module also provides the classes for the 
  24  reservations that will be placed in the slot table and correspond to VMs.  
  25  """ 
  26   
  27  import haizea.common.constants as constants 
  28  from haizea.common.utils import round_datetime_delta, round_datetime, estimate_transfer_time, pretty_nodemap, get_config, get_clock, get_persistence 
  29  from haizea.core.leases import Lease, Capacity 
  30  from haizea.core.scheduler.slottable import ResourceReservation, ResourceTuple 
  31  from haizea.core.scheduler import ReservationEventHandler, RescheduleLeaseException, NormalEndLeaseException, EnactmentError, NotSchedulableException, InconsistentScheduleError, InconsistentLeaseStateError, MigrationResourceReservation 
  32  from operator import attrgetter, itemgetter 
  33  from mx.DateTime import TimeDelta 
  34   
  35  import logging 
  36   
  37   
38 -class VMScheduler(object):
39 """The Haizea VM Scheduler 40 41 This class is responsible for taking a lease and scheduling VMs to satisfy 42 the requirements of that lease. 43 """ 44
45 - def __init__(self, slottable, resourcepool, mapper, max_in_future):
46 """Constructor 47 48 The constructor does little more than create the VM scheduler's 49 attributes. However, it does expect (in the arguments) a fully-constructed 50 SlotTable, ResourcePool, and Mapper (these are constructed in the 51 Manager's constructor). 52 53 Arguments: 54 slottable -- Slot table 55 resourcepool -- Resource pool where enactment commands will be sent to 56 mapper -- Mapper 57 """ 58 self.slottable = slottable 59 self.resourcepool = resourcepool 60 self.mapper = mapper 61 self.logger = logging.getLogger("VMSCHED") 62 63 # Register the handlers for the types of reservations used by 64 # the VM scheduler 65 self.handlers = {} 66 self.handlers[VMResourceReservation] = ReservationEventHandler( 67 sched = self, 68 on_start = VMScheduler._handle_start_vm, 69 on_end = VMScheduler._handle_end_vm) 70 71 self.handlers[ShutdownResourceReservation] = ReservationEventHandler( 72 sched = self, 73 on_start = VMScheduler._handle_start_shutdown, 74 on_end = VMScheduler._handle_end_shutdown) 75 76 self.handlers[SuspensionResourceReservation] = ReservationEventHandler( 77 sched = self, 78 on_start = VMScheduler._handle_start_suspend, 79 on_end = VMScheduler._handle_end_suspend) 80 81 self.handlers[ResumptionResourceReservation] = ReservationEventHandler( 82 sched = self, 83 on_start = VMScheduler._handle_start_resume, 84 on_end = VMScheduler._handle_end_resume) 85 86 self.handlers[MemImageMigrationResourceReservation] = ReservationEventHandler( 87 sched = self, 88 on_start = VMScheduler._handle_start_migrate, 89 on_end = VMScheduler._handle_end_migrate) 90 91 self.max_in_future = max_in_future 92 93 self.future_leases = set()
94 95
96 - def schedule(self, lease, nexttime, earliest):
97 """ The scheduling function 98 99 This particular function doesn't do much except call __schedule_asap 100 and __schedule_exact (which do all the work). 101 102 Arguments: 103 lease -- Lease to schedule 104 nexttime -- The next time at which the scheduler can allocate resources. 105 earliest -- The earliest possible starting times on each physical node 106 """ 107 if lease.get_type() == Lease.BEST_EFFORT: 108 return self.__schedule_asap(lease, nexttime, earliest, allow_in_future = self.can_schedule_in_future()) 109 elif lease.get_type() == Lease.ADVANCE_RESERVATION: 110 return self.__schedule_exact(lease, nexttime, earliest) 111 elif lease.get_type() == Lease.IMMEDIATE: 112 return self.__schedule_asap(lease, nexttime, earliest, allow_in_future = False)
113 114
115 - def estimate_migration_time(self, lease):
116 """ Estimates the time required to migrate a lease's VMs 117 118 This function conservatively estimates that all the VMs are going to 119 be migrated to other nodes. Since all the transfers are intra-node, 120 the bottleneck is the transfer from whatever node has the most 121 memory to transfer. 122 123 Note that this method only estimates the time to migrate the memory 124 state files for the VMs. Migrating the software environment (which may 125 or may not be a disk image) is the responsibility of the preparation 126 scheduler, which has it's own set of migration scheduling methods. 127 128 Arguments: 129 lease -- Lease that might be migrated 130 """ 131 migration = get_config().get("migration") 132 if migration == constants.MIGRATE_YES: 133 vmrr = lease.get_last_vmrr() 134 mem_in_pnode = dict([(pnode,0) for pnode in set(vmrr.nodes.values())]) 135 for pnode in vmrr.nodes.values(): 136 mem = vmrr.resources_in_pnode[pnode].get_by_type(constants.RES_MEM) 137 mem_in_pnode[pnode] += mem 138 max_mem_to_transfer = max(mem_in_pnode.values()) 139 bandwidth = self.resourcepool.info.get_migration_bandwidth() 140 return estimate_transfer_time(max_mem_to_transfer, bandwidth) 141 elif migration == constants.MIGRATE_YES_NOTRANSFER: 142 return TimeDelta(seconds=0)
143
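The estimate above reduces to "largest per-node memory footprint divided by the migration bandwidth". A minimal standalone sketch of that arithmetic, assuming memory in MB and bandwidth in MB/s (hypothetical figures, with plain floats in place of Haizea's ResourceTuple and TimeDelta objects):

    def estimate_migration_time_sketch(mem_per_pnode, bandwidth):
        """Sketch: the bottleneck is the node with the most memory to move."""
        max_mem_to_transfer = max(mem_per_pnode.values())
        return float(max_mem_to_transfer) / bandwidth

    # Node 1 hosts 2048 MB worth of VM memory and node 2 hosts 512 MB; at
    # 100 MB/s the estimate is driven by node 1: 2048 / 100 = 20.48 seconds.
    print estimate_migration_time_sketch({1: 2048, 2: 512}, 100)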
144 - def schedule_migration(self, lease, vmrr, nexttime):
145 """ Schedules migrations for a lease 146 147 Arguments: 148 lease -- Lease being migrated 149 vmrr -- The VM reservation before which the migration will take place 150 nexttime -- The next time at which the scheduler can allocate resources. 151 """ 152 153 # Determine what migrations have to be done. We do this by looking at 154 # the mapping in the previous VM RR and in the new VM RR 155 last_vmrr = lease.get_last_vmrr() 156 vnode_migrations = dict([(vnode, (last_vmrr.nodes[vnode], vmrr.nodes[vnode])) for vnode in vmrr.nodes]) 157 158 # Determine if we actually have to migrate 159 mustmigrate = False 160 for vnode in vnode_migrations: 161 if vnode_migrations[vnode][0] != vnode_migrations[vnode][1]: 162 mustmigrate = True 163 break 164 165 if not mustmigrate: 166 return [] 167 168 # If Haizea is configured to migrate without doing any transfers, 169 # then we just return a nil-duration migration RR 170 if get_config().get("migration") == constants.MIGRATE_YES_NOTRANSFER: 171 start = nexttime 172 end = nexttime 173 res = {} 174 migr_rr = MemImageMigrationResourceReservation(lease, start, end, res, vmrr, vnode_migrations) 175 migr_rr.state = ResourceReservation.STATE_SCHEDULED 176 return [migr_rr] 177 178 # Figure out what migrations can be done simultaneously 179 migrations = [] 180 while len(vnode_migrations) > 0: 181 pnodes = set() 182 migration = {} 183 for vnode in vnode_migrations: 184 origin = vnode_migrations[vnode][0] 185 dest = vnode_migrations[vnode][1] 186 if not origin in pnodes and not dest in pnodes: 187 migration[vnode] = vnode_migrations[vnode] 188 pnodes.add(origin) 189 pnodes.add(dest) 190 for vnode in migration: 191 del vnode_migrations[vnode] 192 migrations.append(migration) 193 194 # Create migration RRs 195 start = max(last_vmrr.post_rrs[-1].end, nexttime) 196 bandwidth = self.resourcepool.info.get_migration_bandwidth() 197 migr_rrs = [] 198 for m in migrations: 199 vnodes_to_migrate = m.keys() 200 max_mem_to_migrate = max([lease.requested_resources[vnode].get_quantity(constants.RES_MEM) for vnode in vnodes_to_migrate]) 201 migr_time = estimate_transfer_time(max_mem_to_migrate, bandwidth) 202 end = start + migr_time 203 res = {} 204 for (origin,dest) in m.values(): 205 resorigin = Capacity([constants.RES_NETOUT]) 206 resorigin.set_quantity(constants.RES_NETOUT, bandwidth) 207 resdest = Capacity([constants.RES_NETIN]) 208 resdest.set_quantity(constants.RES_NETIN, bandwidth) 209 res[origin] = self.slottable.create_resource_tuple_from_capacity(resorigin) 210 res[dest] = self.slottable.create_resource_tuple_from_capacity(resdest) 211 migr_rr = MemImageMigrationResourceReservation(lease, start, start + migr_time, res, vmrr, m) 212 migr_rr.state = ResourceReservation.STATE_SCHEDULED 213 migr_rrs.append(migr_rr) 214 start = end 215 216 return migr_rrs
217
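The grouping loop above ("Figure out what migrations can be done simultaneously") packs as many transfers as possible into each round, under the constraint that no physical node takes part in more than one transfer per round. A standalone sketch of just that greedy grouping, using hypothetical node IDs:

    def group_migrations(vnode_migrations):
        """Split {vnode: (origin_pnode, dest_pnode)} into rounds in which
        every physical node appears at most once."""
        pending = dict(vnode_migrations)
        rounds = []
        while len(pending) > 0:
            busy_pnodes = set()
            this_round = {}
            for vnode, (origin, dest) in pending.items():
                if origin not in busy_pnodes and dest not in busy_pnodes:
                    this_round[vnode] = (origin, dest)
                    busy_pnodes.add(origin)
                    busy_pnodes.add(dest)
            for vnode in this_round:
                del pending[vnode]
            rounds.append(this_round)
        return rounds

    # group_migrations({1: (10, 20), 2: (20, 30), 3: (40, 50)}) yields two
    # rounds, because vnodes 1 and 2 both touch node 20 and so cannot run
    # in the same round; vnode 3 can share a round with either of them.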
218 - def cancel_vm(self, vmrr):
219 """ Cancels a VM resource reservation 220 221 Arguments: 222 vmrr -- VM RR to be cancelled 223 """ 224 225 # If this VM RR is part of a lease that was scheduled in the future, 226 # remove that lease from the set of future leases. 227 if vmrr.lease in self.future_leases: 228 self.future_leases.remove(vmrr.lease) 229 get_persistence().persist_future_leases(self.future_leases) 230 231 # If there are any pre-RRs that are scheduled, remove them 232 for rr in vmrr.pre_rrs: 233 if rr.state == ResourceReservation.STATE_SCHEDULED: 234 self.slottable.remove_reservation(rr) 235 236 # If there are any post RRs, remove them 237 for rr in vmrr.post_rrs: 238 self.slottable.remove_reservation(rr) 239 240 # Remove the reservation itself 241 self.slottable.remove_reservation(vmrr)
242 243
244 - def can_suspend_at(self, lease, t):
245 """ Determines if it is possible to suspend a lease before a given time 246 247 Arguments: 248 vmrr -- VM RR to be preempted 249 t -- Time by which the VM must be preempted 250 """ 251 # TODO: Make more general, should determine vmrr based on current time 252 # This won't currently break, though, since the calling function 253 # operates on the last VM RR. 254 vmrr = lease.get_last_vmrr() 255 time_until_suspend = t - vmrr.start 256 min_duration = self.__compute_scheduling_threshold(lease) 257 can_suspend = time_until_suspend >= min_duration 258 return can_suspend
259 260
261 - def preempt_vm(self, vmrr, t):
262 """ Preempts a VM reservation at a given time 263 264 This method assumes that the lease is, in fact, preemptable, 265 that the VMs are running at the given time, and that there is 266 enough time to suspend the VMs before the given time (all these 267 checks are done in the lease scheduler). 268 269 Arguments: 270 vmrr -- VM RR to be preempted 271 t -- Time by which the VM must be preempted 272 """ 273 274 # Save original start and end time of the vmrr 275 old_start = vmrr.start 276 old_end = vmrr.end 277 278 # Schedule the VM suspension 279 self.__schedule_suspension(vmrr, t) 280 281 # Update the VMRR in the slot table 282 self.slottable.update_reservation(vmrr, old_start, old_end) 283 284 # Add the suspension RRs to the VM's post-RRs 285 for susprr in vmrr.post_rrs: 286 self.slottable.add_reservation(susprr)
287 288
290 """ Returns a list of future leases that are reschedulable. 291 292 Currently, this list is just the best-effort leases scheduled 293 in the future as determined by the backfilling algorithm. 294 Advance reservation leases, by their nature, cannot be 295 rescheduled to find a "better" starting time. 296 """ 297 return list(self.future_leases)
298 299
300 - def can_schedule_in_future(self):
301 """ Returns True if the backfilling algorithm would allow a lease 302 to be scheduled in the future. 303 304 """ 305 if self.max_in_future == -1: # Unlimited 306 return True 307 else: 308 return len(self.future_leases) < self.max_in_future
309 310
311 - def get_utilization(self, time):
312 """ Computes resource utilization (currently just CPU-based) 313 314 This utilization information shows what 315 portion of the physical resources is used by each type of reservation 316 (e.g., 70% are running a VM, 5% are doing suspensions, etc.) 317 318 Arguments: 319 time -- Time at which to determine utilization 320 """ 321 total = self.slottable.get_total_capacity(restype = constants.RES_CPU) 322 util = {} 323 reservations = self.slottable.get_reservations_at(time) 324 for r in reservations: 325 for node in r.resources_in_pnode: 326 if isinstance(r, VMResourceReservation): 327 use = r.resources_in_pnode[node].get_by_type(constants.RES_CPU) 328 util[type(r)] = use + util.setdefault(type(r),0.0) 329 elif isinstance(r, SuspensionResourceReservation) or isinstance(r, ResumptionResourceReservation) or isinstance(r, ShutdownResourceReservation): 330 use = r.vmrr.resources_in_pnode[node].get_by_type(constants.RES_CPU) 331 util[type(r)] = use + util.setdefault(type(r),0.0) 332 util[None] = total - sum(util.values()) 333 334 if total != 0: 335 for k in util: 336 util[k] /= total 337 338 return util
339 340
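The dictionary returned by get_utilization() maps reservation classes to fractions of the total CPU capacity, with the key None standing for idle capacity. A hypothetical helper (not part of Haizea) that turns such a dictionary into a readable report:

    def utilization_report(util):
        """Pretty-print a {reservation class or None: fraction} dictionary."""
        lines = []
        for key, fraction in sorted(util.items(), key=lambda kv: -kv[1]):
            name = "idle" if key is None else key.__name__
            lines.append("%-40s %5.1f%%" % (name, fraction * 100))
        return "\n".join(lines)

    # e.g. utilization_report({VMResourceReservation: 0.70,
    #                          SuspensionResourceReservation: 0.05,
    #                          None: 0.25})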
341 - def __schedule_exact(self, lease, nexttime, earliest):
342 """ Schedules VMs that must start at an exact time 343 344 This type of lease is "easy" to schedule because we know the exact 345 start time, which means that's the only starting time we have to 346 check. So, this method does little more than call the mapper. 347 348 Arguments: 349 lease -- Lease to schedule 350 nexttime -- The next time at which the scheduler can allocate resources. 351 earliest -- The earliest possible starting times on each physical node 352 """ 353 354 # Determine the start and end time 355 start = lease.start.requested 356 end = start + lease.duration.requested 357 358 # Convert Capacity objects in lease object into ResourceTuples that 359 # we can hand over to the mapper. 360 requested_resources = dict([(k,self.slottable.create_resource_tuple_from_capacity(v)) for k,v in lease.requested_resources.items()]) 361 362 # Let the mapper do its magiv 363 mapping, actualend, preemptions = self.mapper.map(lease, 364 requested_resources, 365 start, 366 end, 367 strictend = True) 368 369 # If no mapping was found, tell the lease scheduler about it 370 if mapping == None: 371 raise NotSchedulableException, "Not enough resources in specified interval" 372 373 # Create VM resource reservations 374 res = {} 375 376 for (vnode,pnode) in mapping.items(): 377 vnode_res = requested_resources[vnode] 378 if res.has_key(pnode): 379 res[pnode].incr(vnode_res) 380 else: 381 res[pnode] = ResourceTuple.copy(vnode_res) 382 383 vmrr = VMResourceReservation(lease, start, end, mapping, res) 384 vmrr.state = ResourceReservation.STATE_SCHEDULED 385 386 # Schedule shutdown for the VM 387 self.__schedule_shutdown(vmrr) 388 389 return vmrr, preemptions
390 391
392 - def __schedule_asap(self, lease, nexttime, earliest, allow_in_future = None):
393 """ Schedules VMs as soon as possible 394 395 This method is a bit more complex that __schedule_exact because 396 we need to figure out what "as soon as possible" actually is. 397 This involves attempting several mappings, at different points 398 in time, before we can schedule the lease. 399 400 This method will always check, at least, if the lease can be scheduled 401 at the earliest possible moment at which the lease could be prepared 402 (e.g., if the lease can't start until 1 hour in the future because that's 403 the earliest possible time at which the disk images it requires can 404 be transferred, then that's when the scheduler will check). Note, however, 405 that this "earliest possible moment" is determined by the preparation 406 scheduler. 407 408 Additionally, if the lease can't be scheduled at the earliest 409 possible moment, it can also check if the lease can be scheduled 410 in the future. This partially implements a backfilling algorithm 411 (the maximum number of future leases is stored in the max_in_future 412 attribute of VMScheduler), the other part being implemented in the 413 __process_queue method of LeaseScheduler. 414 415 Note that, if the method is allowed to scheduled in the future, 416 and assuming that the lease doesn't request more resources than 417 the site itself, this method will always schedule the VMs succesfully 418 (since there's always an empty spot somewhere in the future). 419 420 421 Arguments: 422 lease -- Lease to schedule 423 nexttime -- The next time at which the scheduler can allocate resources. 424 earliest -- The earliest possible starting times on each physical node 425 allow_in_future -- Boolean indicating whether the scheduler is 426 allowed to schedule the VMs in the future. 427 """ 428 429 430 431 # 432 # STEP 1: PROLEGOMENA 433 # 434 435 lease_id = lease.id 436 remaining_duration = lease.duration.get_remaining_duration() 437 shutdown_time = self.__estimate_shutdown_time(lease) 438 439 # We might be scheduling a suspended lease. If so, we will 440 # also have to schedule its resumption. Right now, just 441 # figure out if this is such a lease. 442 mustresume = (lease.get_state() in (Lease.STATE_SUSPENDED_PENDING, Lease.STATE_SUSPENDED_QUEUED, Lease.STATE_SUSPENDED_SCHEDULED)) 443 444 # This is the minimum duration that we must be able to schedule. 445 # See __compute_scheduling_threshold for more details. 446 min_duration = self.__compute_scheduling_threshold(lease) 447 448 449 # 450 # STEP 2: FIND THE CHANGEPOINTS 451 # 452 453 # Find the changepoints, and the available nodes at each changepoint 454 # We need to do this because the preparation scheduler may have 455 # determined that some nodes might require more time to prepare 456 # than others (e.g., if using disk image caching, some nodes 457 # might have the required disk image predeployed, while others 458 # may require transferring the image to that node). 459 # 460 # The end result of this step is a list (cps) where each entry 461 # is a (t,nodes) pair, where "t" is the time of the changepoint 462 # and "nodes" is the set of nodes that are available at that time. 463 464 if not mustresume: 465 # If this is not a suspended lease, then the changepoints 466 # are determined based on the "earliest" parameter. 
467 cps = [(node, e.time) for node, e in earliest.items()] 468 cps.sort(key=itemgetter(1)) 469 curcp = None 470 changepoints = [] 471 nodes = [] 472 for node, time in cps: 473 nodes.append(node) 474 if time != curcp: 475 changepoints.append([time, set(nodes)]) 476 curcp = time 477 else: 478 changepoints[-1][1] = set(nodes) 479 else: 480 # If the lease is suspended, we take into account that, if 481 # migration is disabled, we can only schedule the lease 482 # on the nodes it is currently scheduled on. 483 if get_config().get("migration") == constants.MIGRATE_NO: 484 vmrr = lease.get_last_vmrr() 485 onlynodes = set(vmrr.nodes.values()) 486 else: 487 onlynodes = None 488 changepoints = list(set([x.time for x in earliest.values()])) 489 changepoints.sort() 490 changepoints = [(x, onlynodes) for x in changepoints] 491 492 493 # If we can schedule VMs in the future, 494 # we also consider future changepoints 495 if allow_in_future: 496 res = self.slottable.get_reservations_ending_after(changepoints[-1][0]) 497 # We really only care about changepoints where VMs end (which is 498 # when resources become available) 499 futurecp = [r.get_final_end() for r in res if isinstance(r, VMResourceReservation)] 500 # Corner case: Sometimes we're right in the middle of a ShutdownReservation, so it won't be 501 # included in futurecp. 502 futurecp += [r.end for r in res if isinstance(r, ShutdownResourceReservation) and not r.vmrr in res] 503 if not mustresume: 504 futurecp = [(p,None) for p in futurecp] 505 else: 506 futurecp = [(p,onlynodes) for p in futurecp] 507 else: 508 futurecp = [] 509 510 511 # 512 # STEP 3: FIND A MAPPING 513 # 514 515 # In this step we find a starting time and a mapping for the VMs, 516 # which involves going through the changepoints in order and seeing 517 # if we can find a mapping. 518 # Most of the work is done in the __find_fit_at_points 519 520 # If resuming, we also have to allocate enough time for the resumption 521 if mustresume: 522 duration = remaining_duration + self.__estimate_resume_time(lease) 523 else: 524 duration = remaining_duration 525 526 duration += shutdown_time 527 528 in_future = False 529 530 # Convert Capacity objects in lease object into ResourceTuples that 531 # we can hand over to the mapper. 532 requested_resources = dict([(k,self.slottable.create_resource_tuple_from_capacity(v)) for k,v in lease.requested_resources.items()]) 533 534 # First, try to find a mapping assuming we can't schedule in the future 535 start, end, mapping, preemptions = self.__find_fit_at_points(lease, 536 requested_resources, 537 changepoints, 538 duration, 539 min_duration) 540 541 if start == None and not allow_in_future: 542 # We did not find a suitable starting time. This can happen 543 # if we're unable to schedule in the future 544 raise NotSchedulableException, "Could not find enough resources for this request" 545 546 # If we haven't been able to fit the lease, check if we can 547 # reserve it in the future 548 if start == None and allow_in_future: 549 start, end, mapping, preemptions = self.__find_fit_at_points(lease, 550 requested_resources, 551 futurecp, 552 duration, 553 min_duration 554 ) 555 # TODO: The following will also raise an exception if a lease 556 # makes a request that could *never* be satisfied with the 557 # current resources. 
558 if start == None: 559 raise InconsistentScheduleError, "Could not find a mapping in the future (this should not happen)" 560 561 in_future = True 562 563 # 564 # STEP 4: CREATE RESERVATIONS 565 # 566 567 # At this point, the lease is feasible. We just need to create 568 # the reservations for the VMs and, possibly, for the VM resumption, 569 # suspension, and shutdown. 570 571 # VM resource reservation 572 res = {} 573 574 for (vnode,pnode) in mapping.items(): 575 vnode_res = requested_resources[vnode] 576 if res.has_key(pnode): 577 res[pnode].incr(vnode_res) 578 else: 579 res[pnode] = ResourceTuple.copy(vnode_res) 580 581 vmrr = VMResourceReservation(lease, start, end, mapping, res) 582 vmrr.state = ResourceReservation.STATE_SCHEDULED 583 584 # VM resumption resource reservation 585 if mustresume: 586 self.__schedule_resumption(vmrr, start) 587 588 # If the mapper couldn't find a mapping for the full duration 589 # of the lease, then we need to schedule a suspension. 590 mustsuspend = (vmrr.end - vmrr.start) < remaining_duration 591 if mustsuspend: 592 self.__schedule_suspension(vmrr, end) 593 else: 594 # Compensate for any overestimation 595 if (vmrr.end - vmrr.start) > remaining_duration + shutdown_time: 596 vmrr.end = vmrr.start + remaining_duration + shutdown_time 597 self.__schedule_shutdown(vmrr) 598 599 if in_future: 600 self.future_leases.add(lease) 601 get_persistence().persist_future_leases(self.future_leases) 602 603 susp_str = res_str = "" 604 if mustresume: 605 res_str = " (resuming)" 606 if mustsuspend: 607 susp_str = " (suspending)" 608 self.logger.info("Lease #%i has been scheduled on nodes %s from %s%s to %s%s" % (lease.id, mapping.values(), start, res_str, end, susp_str)) 609 610 return vmrr, preemptions
611 612
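The changepoint construction at the top of STEP 2 (the non-resuming branch) is easier to follow in isolation: physical nodes are sorted by their earliest possible start time, and each changepoint carries the cumulative set of nodes available by that time. A minimal sketch using plain numbers in place of timestamps:

    from operator import itemgetter

    def build_changepoints(earliest_times):
        """{pnode: earliest_start} -> [[time, set_of_available_pnodes], ...]"""
        pairs = sorted(earliest_times.items(), key=itemgetter(1))
        changepoints = []
        nodes = []
        curcp = None
        for node, time in pairs:
            nodes.append(node)
            if time != curcp:
                changepoints.append([time, set(nodes)])
                curcp = time
            else:
                changepoints[-1][1] = set(nodes)
        return changepoints

    # Nodes 1 and 2 are available immediately, node 3 needs 60s of preparation:
    # build_changepoints({1: 0, 2: 0, 3: 60}) -> [[0, set([1, 2])], [60, set([1, 2, 3])]]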
613 - def __find_fit_at_points(self, lease, requested_resources, changepoints, duration, min_duration):
614 """ Tries to map a lease in a given list of points in time 615 616 This method goes through a given list of points in time and tries 617 to find the earliest time at which that lease can be allocated 618 resources. 619 620 Arguments: 621 lease -- Lease to schedule 622 requested_resources -- A dictionary of lease node -> ResourceTuple. 623 changepoints -- The list of changepoints 624 duration -- The amount of time requested 625 min_duration -- The minimum amount of time that should be allocated 626 627 Returns: 628 start -- The time at which resources have been found for the lease 629 actualend -- The time at which the resources won't be available. Note 630 that this is not necessarily (start + duration) since the mapper 631 might be unable to find enough resources for the full requested duration. 632 mapping -- A mapping of lease nodes to physical nodes 633 preemptions -- A list of 634 (if no mapping is found, all these values are set to None) 635 """ 636 found = False 637 638 for time, onlynodes in changepoints: 639 start = time 640 end = start + duration 641 self.logger.debug("Attempting to map from %s to %s" % (start, end)) 642 643 # If suspension is disabled, we will only accept mappings that go 644 # from "start" strictly until "end". 645 susptype = get_config().get("suspension") 646 if susptype == constants.SUSPENSION_NONE or (lease.numnodes > 1 and susptype == constants.SUSPENSION_SERIAL): 647 strictend = True 648 else: 649 strictend = False 650 651 # Let the mapper work its magic 652 mapping, actualend, preemptions = self.mapper.map(lease, 653 requested_resources, 654 start, 655 end, 656 strictend = strictend, 657 onlynodes = onlynodes) 658 659 # We have a mapping; we still have to check if it satisfies 660 # the minimum duration. 661 if mapping != None: 662 if actualend < end: 663 actualduration = actualend - start 664 if actualduration >= min_duration: 665 self.logger.debug("This lease can be scheduled from %s to %s (will require suspension)" % (start, actualend)) 666 found = True 667 break 668 else: 669 self.logger.debug("This starting time does not allow for the requested minimum duration (%s < %s)" % (actualduration, min_duration)) 670 else: 671 self.logger.debug("This lease can be scheduled from %s to %s (full duration)" % (start, end)) 672 found = True 673 break 674 675 if found: 676 return start, actualend, mapping, preemptions 677 else: 678 return None, None, None, None
679 680
681 - def __compute_susprem_times(self, vmrr, time, direction, exclusion, rate, override = None):
682 """ Computes the times at which suspend/resume operations would have to start 683 684 When suspending or resuming a VM, the VM's memory is dumped to a 685 file on disk. To correctly estimate the time required to suspend 686 a lease with multiple VMs, Haizea makes sure that no two 687 suspensions/resumptions happen at the same time (e.g., if eight 688 memory files were being saved at the same time to disk, the disk's 689 performance would be reduced in a way that is not as easy to estimate 690 as if only one file were being saved at a time). Based on a number 691 of parameters, this method estimates the times at which the 692 suspend/resume commands would have to be sent to guarantee this 693 exclusion. 694 695 Arguments: 696 vmrr -- The VM reservation that will be suspended/resumed 697 time -- The time at which the suspend should end or the resume should start. 698 direction -- DIRECTION_BACKWARD: start at "time" and compute the times going 699 backward (for suspensions) DIRECTION_FORWARD: start at time "time" and compute 700 the times going forward. 701 exclusion -- SUSPRES_EXCLUSION_GLOBAL (memory is saved to global filesystem) 702 or SUSPRES_EXCLUSION_LOCAL (saved to local filesystem) 703 rate -- The rate at which an individual VM is suspended/resumed 704 override -- If specified, then instead of computing the time to 705 suspend/resume VM based on its memory and the "rate" parameter, 706 use this override value. 707 708 """ 709 times = [] # (start, end, {pnode -> vnodes}) 710 enactment_overhead = get_config().get("enactment-overhead") 711 712 if exclusion == constants.SUSPRES_EXCLUSION_GLOBAL: 713 # Global exclusion (which represents, e.g., reading/writing the memory image files 714 # from a global file system) meaning no two suspensions/resumptions can happen at 715 # the same time in the entire resource pool. 716 717 t = time 718 t_prev = None 719 720 for (vnode,pnode) in vmrr.nodes.items(): 721 if override == None: 722 mem = vmrr.lease.requested_resources[vnode].get_quantity(constants.RES_MEM) 723 op_time = self.__compute_suspend_resume_time(mem, rate) 724 else: 725 op_time = override 726 727 op_time += enactment_overhead 728 729 t_prev = t 730 731 if direction == constants.DIRECTION_FORWARD: 732 t += op_time 733 times.append((t_prev, t, {pnode:[vnode]})) 734 elif direction == constants.DIRECTION_BACKWARD: 735 t -= op_time 736 times.append((t, t_prev, {pnode:[vnode]})) 737 738 elif exclusion == constants.SUSPRES_EXCLUSION_LOCAL: 739 # Local exclusion (which represents, e.g., reading the memory image files 740 # from a local file system) means no two resumptions can happen at the same 741 # time in the same physical node. 
742 pervnode_times = [] # (start, end, vnode) 743 vnodes_in_pnode = {} 744 for (vnode,pnode) in vmrr.nodes.items(): 745 vnodes_in_pnode.setdefault(pnode, []).append(vnode) 746 for pnode in vnodes_in_pnode: 747 t = time 748 t_prev = None 749 for vnode in vnodes_in_pnode[pnode]: 750 if override == None: 751 mem = vmrr.lease.requested_resources[vnode].get_quantity(constants.RES_MEM) 752 op_time = self.__compute_suspend_resume_time(mem, rate) 753 else: 754 op_time = override 755 756 t_prev = t 757 758 if direction == constants.DIRECTION_FORWARD: 759 t += op_time 760 pervnode_times.append((t_prev, t, vnode)) 761 elif direction == constants.DIRECTION_BACKWARD: 762 t -= op_time 763 pervnode_times.append((t, t_prev, vnode)) 764 765 # Consolidate suspend/resume operations happening at the same time 766 uniq_times = set([(start, end) for (start, end, vnode) in pervnode_times]) 767 for (start, end) in uniq_times: 768 vnodes = [x[2] for x in pervnode_times if x[0] == start and x[1] == end] 769 node_mappings = {} 770 for vnode in vnodes: 771 pnode = vmrr.nodes[vnode] 772 node_mappings.setdefault(pnode, []).append(vnode) 773 times.append([start,end,node_mappings]) 774 775 # Add the enactment overhead 776 for t in times: 777 num_vnodes = sum([len(vnodes) for vnodes in t[2].values()]) 778 overhead = TimeDelta(seconds = num_vnodes * enactment_overhead) 779 if direction == constants.DIRECTION_FORWARD: 780 t[1] += overhead 781 elif direction == constants.DIRECTION_BACKWARD: 782 t[0] -= overhead 783 784 # Fix overlaps 785 if direction == constants.DIRECTION_FORWARD: 786 times.sort(key=itemgetter(0)) 787 elif direction == constants.DIRECTION_BACKWARD: 788 times.sort(key=itemgetter(1)) 789 times.reverse() 790 791 prev_start = None 792 prev_end = None 793 for t in times: 794 if prev_start != None: 795 start = t[0] 796 end = t[1] 797 if direction == constants.DIRECTION_FORWARD: 798 if start < prev_end: 799 diff = prev_end - start 800 t[0] += diff 801 t[1] += diff 802 elif direction == constants.DIRECTION_BACKWARD: 803 if end > prev_start: 804 diff = end - prev_start 805 t[0] -= diff 806 t[1] -= diff 807 prev_start = t[0] 808 prev_end = t[1] 809 810 return times
811 812
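For the global-exclusion, DIRECTION_BACKWARD case used when suspending, the computation above amounts to chaining the per-VM suspend operations back-to-back so that no two overlap and the last one finishes exactly at the deadline. A simplified standalone sketch with plain numbers for times and durations (the real method also folds in the enactment overhead and handles the local-exclusion case separately):

    def layout_suspends_backward(deadline, per_vm_op_times):
        """Chain suspend operations backward from the deadline so they never
        overlap; per_vm_op_times is a list of (vnode, seconds) pairs."""
        intervals = []
        t = deadline
        for vnode, op_time in per_vm_op_times:
            start = t - op_time
            intervals.append((start, t, vnode))
            t = start
        return intervals

    # Three VMs taking 30s each must all be suspended by t=600:
    # [(570, 600, 'v1'), (540, 570, 'v2'), (510, 540, 'v3')]
    print layout_suspends_backward(600, [('v1', 30), ('v2', 30), ('v3', 30)])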
813 - def __schedule_shutdown(self, vmrr):
814 """ Schedules the shutdown of a VM reservation 815 816 Arguments: 817 vmrr -- The VM reservation that will be shutdown 818 819 """ 820 shutdown_time = self.__estimate_shutdown_time(vmrr.lease) 821 822 start = vmrr.end - shutdown_time 823 end = vmrr.end 824 825 shutdown_rr = ShutdownResourceReservation(vmrr.lease, start, end, vmrr.resources_in_pnode, vmrr.nodes, vmrr) 826 shutdown_rr.state = ResourceReservation.STATE_SCHEDULED 827 828 vmrr.update_end(start) 829 830 # If there are any post RRs, remove them 831 for rr in vmrr.post_rrs: 832 self.slottable.remove_reservation(rr) 833 vmrr.post_rrs = [] 834 835 vmrr.post_rrs.append(shutdown_rr)
836 837
838 - def __schedule_suspension(self, vmrr, suspend_by):
839 """ Schedules the suspension of a VM reservation 840 841 Most of the work is done in __compute_susprem_times. See that 842 method's documentation for more details. 843 844 Arguments: 845 vmrr -- The VM reservation that will be suspended 846 suspend_by -- The time by which the VMs should be suspended. 847 848 """ 849 config = get_config() 850 susp_exclusion = config.get("suspendresume-exclusion") 851 override = get_config().get("override-suspend-time") 852 rate = config.get("suspend-rate") 853 854 if suspend_by < vmrr.start or suspend_by > vmrr.end: 855 raise InconsistentScheduleError, "Tried to schedule a suspension by %s, which is outside the VMRR's duration (%s-%s)" % (suspend_by, vmrr.start, vmrr.end) 856 857 # Find the suspension times 858 times = self.__compute_susprem_times(vmrr, suspend_by, constants.DIRECTION_BACKWARD, susp_exclusion, rate, override) 859 860 # Create the suspension resource reservations 861 suspend_rrs = [] 862 for (start, end, node_mappings) in times: 863 suspres = {} 864 all_vnodes = [] 865 for (pnode,vnodes) in node_mappings.items(): 866 num_vnodes = len(vnodes) 867 r = Capacity([constants.RES_MEM,constants.RES_DISK]) 868 mem = 0 869 for vnode in vnodes: 870 mem += vmrr.lease.requested_resources[vnode].get_quantity(constants.RES_MEM) 871 r.set_quantity(constants.RES_MEM, mem * num_vnodes) 872 r.set_quantity(constants.RES_DISK, mem * num_vnodes) 873 suspres[pnode] = self.slottable.create_resource_tuple_from_capacity(r) 874 all_vnodes += vnodes 875 876 susprr = SuspensionResourceReservation(vmrr.lease, start, end, suspres, all_vnodes, vmrr) 877 susprr.state = ResourceReservation.STATE_SCHEDULED 878 suspend_rrs.append(susprr) 879 880 suspend_rrs.sort(key=attrgetter("start")) 881 882 susp_start = suspend_rrs[0].start 883 if susp_start < vmrr.start: 884 raise InconsistentScheduleError, "Determined suspension should start at %s, before the VMRR's start (%s) -- Suspend time not being properly estimated?" % (susp_start, vmrr.start) 885 886 vmrr.update_end(susp_start) 887 888 # If there are any post RRs, remove them 889 for rr in vmrr.post_rrs: 890 self.slottable.remove_reservation(rr) 891 vmrr.post_rrs = [] 892 893 # Add the suspension RRs to the VM RR 894 for susprr in suspend_rrs: 895 vmrr.post_rrs.append(susprr)
896 897
898 - def __schedule_resumption(self, vmrr, resume_at):
899 """ Schedules the resumption of a VM reservation 900 901 Most of the work is done in __compute_susprem_times. See that 902 method's documentation for more details. 903 904 Arguments: 905 vmrr -- The VM reservation that will be resumed 906 resume_at -- The time at which the resumption should start 907 908 """ 909 config = get_config() 910 resm_exclusion = config.get("suspendresume-exclusion") 911 override = get_config().get("override-resume-time") 912 rate = config.get("resume-rate") 913 914 if resume_at < vmrr.start or resume_at > vmrr.end: 915 raise InconsistentScheduleError, "Tried to schedule a resumption at %s, which is outside the VMRR's duration (%s-%s)" % (resume_at, vmrr.start, vmrr.end) 916 917 # Find the resumption times 918 times = self.__compute_susprem_times(vmrr, resume_at, constants.DIRECTION_FORWARD, resm_exclusion, rate, override) 919 920 # Create the resumption resource reservations 921 resume_rrs = [] 922 for (start, end, node_mappings) in times: 923 resmres = {} 924 all_vnodes = [] 925 for (pnode,vnodes) in node_mappings.items(): 926 num_vnodes = len(vnodes) 927 r = Capacity([constants.RES_MEM,constants.RES_DISK]) 928 mem = 0 929 for vnode in vnodes: 930 mem += vmrr.lease.requested_resources[vnode].get_quantity(constants.RES_MEM) 931 r.set_quantity(constants.RES_MEM, mem * num_vnodes) 932 r.set_quantity(constants.RES_DISK, mem * num_vnodes) 933 resmres[pnode] = self.slottable.create_resource_tuple_from_capacity(r) 934 all_vnodes += vnodes 935 resmrr = ResumptionResourceReservation(vmrr.lease, start, end, resmres, all_vnodes, vmrr) 936 resmrr.state = ResourceReservation.STATE_SCHEDULED 937 resume_rrs.append(resmrr) 938 939 resume_rrs.sort(key=attrgetter("start")) 940 941 resm_end = resume_rrs[-1].end 942 if resm_end > vmrr.end: 943 raise InconsistentScheduleError, "Determined resumption would end at %s, after the VMRR's end (%s) -- Resume time not being properly estimated?" % (resm_end, vmrr.end) 944 945 vmrr.update_start(resm_end) 946 947 # Add the resumption RRs to the VM RR 948 for resmrr in resume_rrs: 949 vmrr.pre_rrs.append(resmrr)
950 951
952 - def __compute_suspend_resume_time(self, mem, rate):
953 """ Compute the time to suspend/resume a single VM 954 955 Arguments: 956 mem -- Amount of memory used by the VM 957 rate -- The rate at which an individual VM is suspended/resumed 958 959 """ 960 time = float(mem) / rate 961 time = round_datetime_delta(TimeDelta(seconds = time)) 962 return time
963 964
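For concreteness, the per-VM figure is simply the VM's memory divided by the configured rate; with hypothetical numbers (a 1024 MB VM and a 64 MB/s suspend rate):

    mem, rate = 1024, 64.0
    print float(mem) / rate   # 16.0 seconds, before round_datetime_delta rounding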
965 - def __estimate_suspend_time(self, lease):
966 """ Estimate the time to suspend an entire lease 967 968 Most of the work is done in __estimate_suspend_resume_time. See 969 that method's documentation for more details. 970 971 Arguments: 972 lease -- Lease that is going to be suspended 973 974 """ 975 rate = get_config().get("suspend-rate") 976 override = get_config().get("override-suspend-time") 977 if override != None: 978 return override 979 else: 980 return self.__estimate_suspend_resume_time(lease, rate)
981 982
983 - def __estimate_resume_time(self, lease):
984 """ Estimate the time to resume an entire lease 985 986 Most of the work is done in __estimate_suspend_resume_time. See 987 that method's documentation for more details. 988 989 Arguments: 990 lease -- Lease that is going to be resumed 991 992 """ 993 rate = get_config().get("resume-rate") 994 override = get_config().get("override-resume-time") 995 if override != None: 996 return override 997 else: 998 return self.__estimate_suspend_resume_time(lease, rate)
999 1000
1001 - def __estimate_suspend_resume_time(self, lease, rate):
1002 """ Estimate the time to suspend/resume an entire lease 1003 1004 Note that, unlike __compute_suspend_resume_time, this estimates 1005 the time to suspend/resume an entire lease (which may involve 1006 suspending several VMs) 1007 1008 Arguments: 1009 lease -- Lease that is going to be suspended/resumed 1010 rate -- The rate at which an individual VM is suspended/resumed 1011 1012 """ 1013 susp_exclusion = get_config().get("suspendresume-exclusion") 1014 enactment_overhead = get_config().get("enactment-overhead") 1015 mem = 0 1016 for vnode in lease.requested_resources: 1017 mem += lease.requested_resources[vnode].get_quantity(constants.RES_MEM) 1018 if susp_exclusion == constants.SUSPRES_EXCLUSION_GLOBAL: 1019 return lease.numnodes * (self.__compute_suspend_resume_time(mem, rate) + enactment_overhead) 1020 elif susp_exclusion == constants.SUSPRES_EXCLUSION_LOCAL: 1021 # Overestimating 1022 return lease.numnodes * (self.__compute_suspend_resume_time(mem, rate) + enactment_overhead)
1023 1024
1025 - def __estimate_shutdown_time(self, lease):
1026 """ Estimate the time to shutdown an entire lease 1027 1028 Arguments: 1029 lease -- Lease that is going to be shutdown 1030 1031 """ 1032 enactment_overhead = get_config().get("enactment-overhead").seconds 1033 return get_config().get("shutdown-time") + (enactment_overhead * lease.numnodes)
1034 1035
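With hypothetical configuration values, the estimate is a flat shutdown time plus a per-node enactment overhead; for example, a 30-second shutdown-time, a 2-second enactment-overhead, and a 4-node lease:

    shutdown_time, enactment_overhead, numnodes = 30, 2, 4
    print shutdown_time + enactment_overhead * numnodes   # 38 seconds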
1036 - def __compute_scheduling_threshold(self, lease):
1037 """ Compute the scheduling threshold (the 'minimum duration') of a lease 1038 1039 To avoid thrashing, Haizea will not schedule a lease unless all overheads 1040 can be correctly scheduled (which includes image transfers, suspensions, etc.). 1041 However, this can still result in situations where a lease is prepared, 1042 and then immediately suspended because of a blocking lease in the future. 1043 The scheduling threshold is used to specify that a lease must 1044 not be scheduled unless it is guaranteed to run for a minimum amount of 1045 time (the rationale behind this is that you ideally don't want leases 1046 to be scheduled if they're not going to be active for at least as much time 1047 as was spent in overheads). 1048 1049 An important part of computing this value is the "scheduling threshold factor". 1050 The default value is 1, meaning that the lease will be active for at least 1051 as much time T as was spent on overheads (e.g., if preparing the lease requires 1052 60 seconds, and we know that it will have to be suspended, requiring 30 seconds, 1053 Haizea won't schedule the lease unless it can run for at least 90 minutes). 1054 In other words, a scheduling factor of F required a minimum duration of 1055 F*T. A value of 0 could lead to thrashing, since Haizea could end up with 1056 situations where a lease starts and immediately gets suspended. 1057 1058 Arguments: 1059 lease -- Lease for which we want to find the scheduling threshold 1060 """ 1061 # TODO: Take into account other things like boot overhead, migration overhead, etc. 1062 config = get_config() 1063 threshold = config.get("force-scheduling-threshold") 1064 if threshold != None: 1065 # If there is a hard-coded threshold, use that 1066 return threshold 1067 else: 1068 factor = config.get("scheduling-threshold-factor") 1069 1070 # First, figure out the "safe duration" (the minimum duration 1071 # so that we at least allocate enough time for all the 1072 # overheads). 1073 susp_overhead = self.__estimate_suspend_time(lease) 1074 safe_duration = susp_overhead 1075 1076 if lease.get_state() == Lease.STATE_SUSPENDED_QUEUED: 1077 resm_overhead = self.__estimate_resume_time(lease) 1078 safe_duration += resm_overhead 1079 1080 # TODO: Incorporate other overheads into the minimum duration 1081 min_duration = safe_duration 1082 1083 # At the very least, we want to allocate enough time for the 1084 # safe duration (otherwise, we'll end up with incorrect schedules, 1085 # where a lease is scheduled to suspend, but isn't even allocated 1086 # enough time to suspend). 1087 # The factor is assumed to be non-negative. i.e., a factor of 0 1088 # means we only allocate enough time for potential suspend/resume 1089 # operations, while a factor of 1 means the lease will get as much 1090 # running time as spend on the runtime overheads involved in setting 1091 # it up 1092 threshold = safe_duration + (min_duration * factor) 1093 return threshold
1094 1095 1096 #-------------------------------------------------------------------# 1097 # # 1098 # SLOT TABLE EVENT HANDLERS # 1099 # # 1100 #-------------------------------------------------------------------# 1101
1102 - def _handle_start_vm(self, l, rr):
1103 """ Handles the start of a VMResourceReservation 1104 1105 Arguments: 1106 l -- Lease the VMResourceReservation belongs to 1107 rr -- THe VMResourceReservation 1108 """ 1109 self.logger.debug("LEASE-%i Start of handleStartVM" % l.id) 1110 l.print_contents() 1111 lease_state = l.get_state() 1112 if lease_state == Lease.STATE_READY: 1113 l.set_state(Lease.STATE_ACTIVE) 1114 rr.state = ResourceReservation.STATE_ACTIVE 1115 now_time = get_clock().get_time() 1116 l.start.actual = now_time 1117 1118 try: 1119 self.resourcepool.start_vms(l, rr) 1120 except EnactmentError, exc: 1121 self.logger.error("Enactment error when starting VMs.") 1122 # Right now, this is a non-recoverable error, so we just 1123 # propagate it upwards to the lease scheduler 1124 # In the future, it may be possible to react to these 1125 # kind of errors. 1126 raise 1127 1128 elif lease_state == Lease.STATE_RESUMED_READY: 1129 l.set_state(Lease.STATE_ACTIVE) 1130 rr.state = ResourceReservation.STATE_ACTIVE 1131 # No enactment to do here, since all the suspend/resume actions are 1132 # handled during the suspend/resume RRs 1133 else: 1134 raise InconsistentLeaseStateError(l, doing = "starting a VM") 1135 1136 # If this was a future reservation (as determined by backfilling), 1137 # remove that status, since the future is now. 1138 if rr.lease in self.future_leases: 1139 self.future_leases.remove(l) 1140 get_persistence().persist_future_leases(self.future_leases) 1141 1142 l.print_contents() 1143 self.logger.debug("LEASE-%i End of handleStartVM" % l.id) 1144 self.logger.info("Started VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()))
1145 1146
1147 - def _handle_end_vm(self, l, rr):
1148 """ Handles the end of a VMResourceReservation 1149 1150 Arguments: 1151 l -- Lease the VMResourceReservation belongs to 1152 rr -- THe VMResourceReservation 1153 """ 1154 self.logger.debug("LEASE-%i Start of handleEndVM" % l.id) 1155 self.logger.vdebug("LEASE-%i Before:" % l.id) 1156 l.print_contents() 1157 now_time = round_datetime(get_clock().get_time()) 1158 diff = now_time - rr.start 1159 l.duration.accumulate_duration(diff) 1160 rr.state = ResourceReservation.STATE_DONE 1161 1162 self.logger.vdebug("LEASE-%i After:" % l.id) 1163 l.print_contents() 1164 self.logger.debug("LEASE-%i End of handleEndVM" % l.id) 1165 self.logger.info("Stopped VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()))
1166 1167
1168 - def _handle_unscheduled_end_vm(self, l, vmrr):
1169 """ Handles the unexpected end of a VMResourceReservation 1170 1171 Arguments: 1172 l -- Lease the VMResourceReservation belongs to 1173 rr -- THe VMResourceReservation 1174 """ 1175 1176 self.logger.info("LEASE-%i The VM has ended prematurely." % l.id) 1177 for rr in vmrr.post_rrs: 1178 self.slottable.remove_reservation(rr) 1179 vmrr.post_rrs = [] 1180 vmrr.end = get_clock().get_time() 1181 self._handle_end_vm(l, vmrr)
1182 1183
1184 - def _handle_start_suspend(self, l, rr):
1185 """ Handles the start of a SuspensionResourceReservation 1186 1187 Arguments: 1188 l -- Lease the SuspensionResourceReservation belongs to 1189 rr -- The SuspensionResourceReservation 1190 1191 """ 1192 self.logger.debug("LEASE-%i Start of handleStartSuspend" % l.id) 1193 l.print_contents() 1194 rr.state = ResourceReservation.STATE_ACTIVE 1195 1196 try: 1197 self.resourcepool.suspend_vms(l, rr) 1198 except EnactmentError, exc: 1199 self.logger.error("Enactment error when suspending VMs.") 1200 # Right now, this is a non-recoverable error, so we just 1201 # propagate it upwards to the lease scheduler 1202 # In the future, it may be possible to react to these 1203 # kind of errors. 1204 raise 1205 1206 if rr.is_first(): 1207 l.set_state(Lease.STATE_SUSPENDING) 1208 l.print_contents() 1209 self.logger.info("Suspending lease %i..." % (l.id)) 1210 self.logger.debug("LEASE-%i End of handleStartSuspend" % l.id)
1211 1212
1213 - def _handle_end_suspend(self, l, rr):
1214 """ Handles the end of a SuspensionResourceReservation 1215 1216 Arguments: 1217 l -- Lease the SuspensionResourceReservation belongs to 1218 rr -- The SuspensionResourceReservation 1219 """ 1220 self.logger.debug("LEASE-%i Start of handleEndSuspend" % l.id) 1221 l.print_contents() 1222 # TODO: React to incomplete suspend 1223 self.resourcepool.verify_suspend(l, rr) 1224 rr.state = ResourceReservation.STATE_DONE 1225 if rr.is_last(): 1226 l.set_state(Lease.STATE_SUSPENDED_PENDING) 1227 l.print_contents() 1228 self.logger.debug("LEASE-%i End of handleEndSuspend" % l.id) 1229 self.logger.info("Lease %i suspended." % (l.id)) 1230 1231 if l.get_state() == Lease.STATE_SUSPENDED_PENDING: 1232 raise RescheduleLeaseException
1233 1234
1235 - def _handle_start_resume(self, l, rr):
1236 """ Handles the start of a ResumptionResourceReservation 1237 1238 Arguments: 1239 l -- Lease the ResumptionResourceReservation belongs to 1240 rr -- The ResumptionResourceReservation 1241 1242 """ 1243 self.logger.debug("LEASE-%i Start of handleStartResume" % l.id) 1244 l.print_contents() 1245 1246 try: 1247 self.resourcepool.resume_vms(l, rr) 1248 except EnactmentError, exc: 1249 self.logger.error("Enactment error when resuming VMs.") 1250 # Right now, this is a non-recoverable error, so we just 1251 # propagate it upwards to the lease scheduler 1252 # In the future, it may be possible to react to these 1253 # kind of errors. 1254 raise 1255 1256 rr.state = ResourceReservation.STATE_ACTIVE 1257 if rr.is_first(): 1258 l.set_state(Lease.STATE_RESUMING) 1259 l.print_contents() 1260 self.logger.info("Resuming lease %i..." % (l.id)) 1261 self.logger.debug("LEASE-%i End of handleStartResume" % l.id)
1262 1263
1264 - def _handle_end_resume(self, l, rr):
1265 """ Handles the end of a ResumptionResourceReservation 1266 1267 Arguments: 1268 l -- Lease the ResumptionResourceReservation belongs to 1269 rr -- The ResumptionResourceReservation 1270 1271 """ 1272 self.logger.debug("LEASE-%i Start of handleEndResume" % l.id) 1273 l.print_contents() 1274 # TODO: React to incomplete resume 1275 self.resourcepool.verify_resume(l, rr) 1276 rr.state = ResourceReservation.STATE_DONE 1277 if rr.is_last(): 1278 l.set_state(Lease.STATE_RESUMED_READY) 1279 self.logger.info("Resumed lease %i" % (l.id)) 1280 for vnode, pnode in rr.vmrr.nodes.items(): 1281 self.resourcepool.remove_ramfile(pnode, l.id, vnode) 1282 l.print_contents() 1283 self.logger.debug("LEASE-%i End of handleEndResume" % l.id)
1284 1285
1286 - def _handle_start_shutdown(self, l, rr):
1287 """ Handles the start of a ShutdownResourceReservation 1288 1289 Arguments: 1290 l -- Lease the SuspensionResourceReservation belongs to 1291 rr -- The SuspensionResourceReservation 1292 """ 1293 1294 self.logger.debug("LEASE-%i Start of handleStartShutdown" % l.id) 1295 l.print_contents() 1296 rr.state = ResourceReservation.STATE_ACTIVE 1297 try: 1298 self.resourcepool.stop_vms(l, rr) 1299 except EnactmentError, exc: 1300 self.logger.error("Enactment error when shutting down VMs.") 1301 # Right now, this is a non-recoverable error, so we just 1302 # propagate it upwards to the lease scheduler 1303 # In the future, it may be possible to react to these 1304 # kind of errors. 1305 raise 1306 1307 l.print_contents() 1308 self.logger.debug("LEASE-%i End of handleStartShutdown" % l.id)
1309 1310
1311 - def _handle_end_shutdown(self, l, rr):
1312 """ Handles the end of a SuspensionResourceReservation 1313 1314 Arguments: 1315 l -- Lease the SuspensionResourceReservation belongs to 1316 rr -- The SuspensionResourceReservation 1317 1318 """ 1319 self.logger.debug("LEASE-%i Start of handleEndShutdown" % l.id) 1320 l.print_contents() 1321 rr.state = ResourceReservation.STATE_DONE 1322 l.print_contents() 1323 self.logger.debug("LEASE-%i End of handleEndShutdown" % l.id) 1324 self.logger.info("Lease %i's VMs have shutdown." % (l.id)) 1325 raise NormalEndLeaseException
1326 1327
1328 - def _handle_start_migrate(self, l, rr):
1329 """ Handles the start of a MemImageMigrationResourceReservation 1330 1331 Arguments: 1332 l -- Lease the MemImageMigrationResourceReservation belongs to 1333 rr -- The MemImageMigrationResourceReservation 1334 1335 """ 1336 self.logger.debug("LEASE-%i Start of handleStartMigrate" % l.id) 1337 l.print_contents() 1338 rr.state = ResourceReservation.STATE_ACTIVE 1339 l.print_contents() 1340 self.logger.debug("LEASE-%i End of handleStartMigrate" % l.id) 1341 self.logger.info("Migrating lease %i..." % (l.id))
1342 1343
1344 - def _handle_end_migrate(self, l, rr):
1345 """ Handles the end of a MemImageMigrationResourceReservation 1346 1347 Arguments: 1348 l -- Lease the MemImageMigrationResourceReservation belongs to 1349 rr -- The MemImageMigrationResourceReservation 1350 1351 """ 1352 self.logger.debug("LEASE-%i Start of handleEndMigrate" % l.id) 1353 l.print_contents() 1354 1355 for vnode in rr.transfers: 1356 origin = rr.transfers[vnode][0] 1357 dest = rr.transfers[vnode][1] 1358 1359 # Update RAM files 1360 self.resourcepool.remove_ramfile(origin, l.id, vnode) 1361 self.resourcepool.add_ramfile(dest, l.id, vnode, l.requested_resources[vnode].get_quantity(constants.RES_MEM)) 1362 1363 rr.state = ResourceReservation.STATE_DONE 1364 l.print_contents() 1365 self.logger.debug("LEASE-%i End of handleEndMigrate" % l.id) 1366 self.logger.info("Migrated lease %i..." % (l.id))
1367 1368 1369
1370 -class VMResourceReservation(ResourceReservation):
1371 - def __init__(self, lease, start, end, nodes, res):
1372 ResourceReservation.__init__(self, lease, start, end, res) 1373 self.nodes = nodes # { vnode -> pnode } 1374 self.pre_rrs = [] 1375 self.post_rrs = [] 1376 1377 # ONLY for simulation 1378 self.__update_prematureend()
1379
1380 - def update_start(self, time):
1381 self.start = time 1382 # ONLY for simulation 1383 self.__update_prematureend()
1384
1385 - def update_end(self, time):
1386 self.end = time 1387 # ONLY for simulation 1388 self.__update_prematureend()
1389 1390 # ONLY for simulation
1391 - def __update_prematureend(self):
1392 if self.lease.duration.known != None: 1393 remdur = self.lease.duration.get_remaining_known_duration() 1394 rrdur = self.end - self.start 1395 if remdur < rrdur: 1396 self.prematureend = self.start + remdur 1397 # Kludgy, but this corner case actually does happen 1398 # (because of preemptions, it may turn out that 1399 # the premature end time coincides with the 1400 # starting time of the VMRR) 1401 if self.prematureend == self.start: 1402 self.prematureend += 1 1403 else: 1404 self.prematureend = None 1405 else: 1406 self.prematureend = None
1407
1408 - def get_final_end(self):
1409 if len(self.post_rrs) == 0: 1410 return self.end 1411 else: 1412 return self.post_rrs[-1].end
1413
1414 - def is_suspending(self):
1415 return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], SuspensionResourceReservation)
1416
1417 - def is_shutting_down(self):
1418 return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], ShutdownResourceReservation)
1419
1420 - def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
1421 logger = logging.getLogger("LEASES") 1422 for resmrr in self.pre_rrs: 1423 resmrr.print_contents(loglevel) 1424 logger.log(loglevel, "--") 1425 logger.log(loglevel, "Type : VM") 1426 logger.log(loglevel, "Nodes : %s" % pretty_nodemap(self.nodes)) 1427 if self.prematureend != None: 1428 logger.log(loglevel, "Premature end : %s" % self.prematureend) 1429 ResourceReservation.print_contents(self, loglevel) 1430 for susprr in self.post_rrs: 1431 logger.log(loglevel, "--") 1432 susprr.print_contents(loglevel)
1433 1434
1435 -class SuspensionResourceReservation(ResourceReservation):
1436 - def __init__(self, lease, start, end, res, vnodes, vmrr):
1437 ResourceReservation.__init__(self, lease, start, end, res) 1438 self.vmrr = vmrr 1439 self.vnodes = vnodes
1440
1441 - def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
1442 logger = logging.getLogger("LEASES") 1443 logger.log(loglevel, "Type : SUSPEND") 1444 logger.log(loglevel, "Vnodes : %s" % self.vnodes) 1445 ResourceReservation.print_contents(self, loglevel)
1446
1447 - def is_first(self):
1448 return (self == self.vmrr.post_rrs[0])
1449
1450 - def is_last(self):
1451 return (self == self.vmrr.post_rrs[-1])
1452 1453
1454 -class ResumptionResourceReservation(ResourceReservation):
1455 - def __init__(self, lease, start, end, res, vnodes, vmrr):
1456 ResourceReservation.__init__(self, lease, start, end, res) 1457 self.vmrr = vmrr 1458 self.vnodes = vnodes
1459
1460 - def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
1461 logger = logging.getLogger("LEASES") 1462 logger.log(loglevel, "Type : RESUME") 1463 logger.log(loglevel, "Vnodes : %s" % self.vnodes) 1464 ResourceReservation.print_contents(self, loglevel)
1465
1466 - def is_first(self):
1467 resm_rrs = [r for r in self.vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)] 1468 return (self == resm_rrs[0])
1469
1470 - def is_last(self):
1471 resm_rrs = [r for r in self.vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)] 1472 return (self == resm_rrs[-1])
1473 1474
1475 -class ShutdownResourceReservation(ResourceReservation):
1476 - def __init__(self, lease, start, end, res, vnodes, vmrr):
1477 ResourceReservation.__init__(self, lease, start, end, res) 1478 self.vmrr = vmrr 1479 self.vnodes = vnodes
1480
1481 - def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
1482 logger = logging.getLogger("LEASES") 1483 logger.log(loglevel, "Type : SHUTDOWN") 1484 ResourceReservation.print_contents(self, loglevel)
1485 1486
1487 -class MemImageMigrationResourceReservation(MigrationResourceReservation):
1488 - def __init__(self, lease, start, end, res, vmrr, transfers):
1489 MigrationResourceReservation.__init__(self, lease, start, end, res, vmrr, transfers)
1490
1491 - def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
1492 logger = logging.getLogger("LEASES") 1493 logger.log(loglevel, "Type : MEM IMAGE MIGRATION") 1494 logger.log(loglevel, "Transfers : %s" % self.transfers) 1495 ResourceReservation.print_contents(self, loglevel)
1496