Package haizea :: Package core :: Module manager
[hide private]
[frames] | no frames]

Source Code for Module haizea.core.manager

   1  # -------------------------------------------------------------------------- # 
   2  # Copyright 2006-2009, University of Chicago                                 # 
   3  # Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   # 
   4  # Complutense de Madrid (dsa-research.org)                                   # 
   5  #                                                                            # 
   6  # Licensed under the Apache License, Version 2.0 (the "License"); you may    # 
   7  # not use this file except in compliance with the License. You may obtain    # 
   8  # a copy of the License at                                                   # 
   9  #                                                                            # 
  10  # http://www.apache.org/licenses/LICENSE-2.0                                 # 
  11  #                                                                            # 
  12  # Unless required by applicable law or agreed to in writing, software        # 
  13  # distributed under the License is distributed on an "AS IS" BASIS,          # 
  14  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   # 
  15  # See the License for the specific language governing permissions and        # 
  16  # limitations under the License.                                             # 
  17  # -------------------------------------------------------------------------- # 
  18   
  19  """The manager (resource manager) module is the root of Haizea. If you want to 
  20  see where the ball starts rolling, look at the following two functions: 
  21   
  22  * manager.Manager.__init__() 
  23  * manager.Manager.start() 
  24   
  25  This module provides the following classes: 
  26   
  27  * Manager: Haizea itself. Pretty much everything else 
  28    is contained in this class. 
  29  * Clock: A base class for Haizea's clock. 
  30  * SimulatedClock: A clock for simulations. 
  31  * RealClock: A clock that advances in realtime. 
  32  """ 
  33    
  34  import haizea.common.constants as constants 
  35  from haizea.core.scheduler.preparation_schedulers.unmanaged import UnmanagedPreparationScheduler 
  36  from haizea.core.scheduler.preparation_schedulers.imagetransfer import ImageTransferPreparationScheduler 
  37  from haizea.core.enact.opennebula import OpenNebulaResourcePoolInfo, OpenNebulaVMEnactment, OpenNebulaDummyDeploymentEnactment 
  38  from haizea.core.enact.simulated import SimulatedResourcePoolInfo, SimulatedVMEnactment, SimulatedDeploymentEnactment 
  39  from haizea.core.frontends.tracefile import TracefileFrontend 
  40  from haizea.core.frontends.opennebula import OpenNebulaFrontend 
  41  from haizea.core.frontends.rpc import RPCFrontend 
  42  from haizea.core.accounting import AccountingDataCollection 
  43  from haizea.core.scheduler import UnrecoverableError 
  44  from haizea.core.scheduler.lease_scheduler import LeaseScheduler 
  45  from haizea.core.scheduler.vm_scheduler import VMScheduler 
  46  from haizea.core.scheduler.mapper import class_mappings as mapper_mappings 
  47  from haizea.core.scheduler.slottable import SlotTable, ResourceReservation 
  48  from haizea.core.scheduler.policy import PolicyManager 
  49  from haizea.core.scheduler.resourcepool import ResourcePool, ResourcePoolWithReusableImages 
  50  from haizea.core.leases import Lease, Site 
  51  from haizea.core.log import HaizeaLogger 
  52  from haizea.core.rpcserver import RPCServer 
  53  from haizea.common.utils import abstract, round_datetime, Singleton, import_class, OpenNebulaXMLRPCClientSingleton 
  54  from haizea.common.opennebula_xmlrpc import OpenNebulaXMLRPCClient 
  55  from haizea.pluggable.policies import admission_class_mappings, preemption_class_mappings, host_class_mappings  
  56  from haizea.pluggable.accounting import probe_class_mappings 
  57   
  58  import operator 
  59  import logging 
  60  import signal 
  61  import sys, os 
  62  import traceback 
  63  import shelve 
  64  import socket 
  65  from time import sleep 
  66  from math import ceil 
  67  from mx.DateTime import now, TimeDelta 
  68   
  69  DAEMON_STDOUT = DAEMON_STDIN = "/dev/null"  # files Manager.daemonize() opens and dup2()s onto the daemon's stdout/stdin
  70  DAEMON_STDERR = "/var/tmp/haizea.err"  # opened in append mode, unbuffered, and dup2()ed onto the daemon's stderr
  71  DEFAULT_LOGFILE = "/var/tmp/haizea.log"  # not referenced in the visible part of this module; presumably the default for the "logfile" option -- TODO confirm
  72   
73 -class Manager(object):
74 """The root of Haizea 75 76 This class is the root of Haizea. Pretty much everything else (scheduler, 77 enactment modules, etc.) is contained in this class. The Manager 78 class is meant to be a singleton. 79 80 """ 81 82 __metaclass__ = Singleton 83
84 - def __init__(self, config, daemon=False, pidfile=None):
85 """Initializes the manager. 86 87 Argument: 88 config -- a populated instance of haizea.common.config.RMConfig 89 daemon -- True if Haizea must run as a daemon, False if it must 90 run in the foreground 91 pidfile -- When running as a daemon, file to save pid to 92 """ 93 self.config = config 94 95 # Create the RM components 96 97 mode = config.get("mode") 98 99 self.daemon = daemon 100 self.pidfile = pidfile 101 102 if mode == "simulated": 103 # Simulated-time simulations always run in the foreground 104 clock = self.config.get("clock") 105 if clock == constants.CLOCK_SIMULATED: 106 self.daemon = False 107 elif mode == "opennebula": 108 clock = constants.CLOCK_REAL 109 110 self.init_logging() 111 112 if clock == constants.CLOCK_SIMULATED: 113 starttime = self.config.get("starttime") 114 self.clock = SimulatedClock(self, starttime) 115 self.rpc_server = None 116 elif clock == constants.CLOCK_REAL: 117 wakeup_interval = self.config.get("wakeup-interval") 118 non_sched = self.config.get("non-schedulable-interval") 119 if mode == "opennebula": 120 fastforward = self.config.get("dry-run") 121 else: 122 fastforward = False 123 self.clock = RealClock(self, wakeup_interval, non_sched, fastforward) 124 if fastforward: 125 # No need for an RPC server when doing a dry run 126 self.rpc_server = None 127 else: 128 self.rpc_server = RPCServer(self) 129 130 # Create the RPC singleton client for OpenNebula mode 131 if mode == "opennebula": 132 host = self.config.get("one.host") 133 port = self.config.get("one.port") 134 rv = OpenNebulaXMLRPCClient.get_userpass_from_env() 135 if rv == None: 136 print "ONE_AUTH environment variable is not set" 137 exit(1) 138 else: 139 user, passw = rv[0], rv[1] 140 try: 141 OpenNebulaXMLRPCClientSingleton(host, port, user, passw) 142 except socket.error, e: 143 print "Unable to connect to OpenNebula" 144 print "Reason: %s" % e 145 exit(1) 146 147 # Enactment modules 148 if mode == "simulated": 149 resources = self.config.get("simul.resources") 
150 if resources == "in-tracefile": 151 tracefile = self.config.get("tracefile") 152 site = Site.from_lwf_file(tracefile) 153 elif resources.startswith("file:"): 154 sitefile = resources.split(":") 155 site = Site.from_xml_file(sitefile) 156 else: 157 site = Site.from_resources_string(resources) 158 159 deploy_bandwidth = config.get("imagetransfer-bandwidth") 160 info_enact = SimulatedResourcePoolInfo(site) 161 vm_enact = SimulatedVMEnactment() 162 deploy_enact = SimulatedDeploymentEnactment(deploy_bandwidth) 163 elif mode == "opennebula": 164 # Enactment modules 165 info_enact = OpenNebulaResourcePoolInfo() 166 vm_enact = OpenNebulaVMEnactment() 167 # No deployment in OpenNebula. Using dummy one for now. 168 deploy_enact = OpenNebulaDummyDeploymentEnactment() 169 170 if mode == "simulated": 171 preparation_type = self.config.get("lease-preparation") 172 elif mode == "opennebula": 173 # No deployment in OpenNebula. 174 preparation_type = constants.PREPARATION_UNMANAGED 175 176 # Resource pool 177 if preparation_type == constants.PREPARATION_TRANSFER: 178 if self.config.get("diskimage-reuse") == constants.REUSE_IMAGECACHES: 179 resourcepool = ResourcePoolWithReusableImages(info_enact, vm_enact, deploy_enact) 180 else: 181 resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact) 182 else: 183 resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact) 184 185 # Slot table 186 slottable = SlotTable(info_enact.get_resource_types()) 187 for n in resourcepool.get_nodes() + resourcepool.get_aux_nodes(): 188 rt = slottable.create_resource_tuple_from_capacity(n.capacity) 189 slottable.add_node(n.id, rt) 190 191 # Policy manager 192 admission = self.config.get("policy.admission") 193 admission = admission_class_mappings.get(admission, admission) 194 admission = import_class(admission) 195 admission = admission(slottable) 196 197 preemption = self.config.get("policy.preemption") 198 preemption = preemption_class_mappings.get(preemption, preemption) 199 preemption = 
import_class(preemption) 200 preemption = preemption(slottable) 201 202 host_selection = self.config.get("policy.host-selection") 203 host_selection = host_class_mappings.get(host_selection, host_selection) 204 host_selection = import_class(host_selection) 205 host_selection = host_selection(slottable) 206 207 self.policy = PolicyManager(admission, preemption, host_selection) 208 209 # Preparation scheduler 210 if preparation_type == constants.PREPARATION_UNMANAGED: 211 preparation_scheduler = UnmanagedPreparationScheduler(slottable, resourcepool, deploy_enact) 212 elif preparation_type == constants.PREPARATION_TRANSFER: 213 preparation_scheduler = ImageTransferPreparationScheduler(slottable, resourcepool, deploy_enact) 214 215 # VM mapper and scheduler 216 mapper = self.config.get("mapper") 217 mapper = mapper_mappings.get(mapper, mapper) 218 mapper = import_class(mapper) 219 mapper = mapper(slottable, self.policy) 220 221 # When using backfilling, set the number of leases that can be 222 # scheduled in the future. 
223 backfilling = self.config.get("backfilling") 224 if backfilling == constants.BACKFILLING_OFF: 225 max_in_future = 0 226 elif backfilling == constants.BACKFILLING_AGGRESSIVE: 227 max_in_future = 1 228 elif backfilling == constants.BACKFILLING_CONSERVATIVE: 229 max_in_future = -1 # Unlimited 230 elif backfilling == constants.BACKFILLING_INTERMEDIATE: 231 max_in_future = self.config.get("backfilling-reservations") 232 233 vm_scheduler = VMScheduler(slottable, resourcepool, mapper, max_in_future) 234 235 # Statistics collection 236 attrs = dict([(attr, self.config.get_attr(attr)) for attr in self.config.get_attrs()]) 237 238 self.accounting = AccountingDataCollection(self.config.get("datafile"), attrs) 239 # Load probes 240 probes = self.config.get("accounting-probes") 241 probes = probes.split() 242 for probe in probes: 243 probe_class = probe_class_mappings.get(probe, probe) 244 probe_class = import_class(probe_class) 245 probe_obj = probe_class(self.accounting) 246 self.accounting.add_probe(probe_obj) 247 248 # Lease Scheduler 249 self.scheduler = LeaseScheduler(vm_scheduler, preparation_scheduler, slottable, self.accounting) 250 251 # Lease request frontends 252 if mode == "simulated": 253 if clock == constants.CLOCK_SIMULATED: 254 # In pure simulation, we can only use the tracefile frontend 255 self.frontends = [TracefileFrontend(self.clock.get_start_time())] 256 elif clock == constants.CLOCK_REAL: 257 # In simulation with a real clock, only the RPC frontend can be used 258 self.frontends = [RPCFrontend()] 259 elif mode == "opennebula": 260 self.frontends = [OpenNebulaFrontend()] 261 262 persistence_file = self.config.get("persistence-file") 263 if persistence_file == "none": 264 persistence_file = None 265 self.persistence = PersistenceManager(persistence_file) 266 267 self.logger = logging.getLogger("RM")
268 269
270 - def init_logging(self):
271 """Initializes logging 272 273 """ 274 275 logger = logging.getLogger("") 276 if self.daemon: 277 handler = logging.FileHandler(self.config.get("logfile")) 278 else: 279 handler = logging.StreamHandler() 280 if sys.version_info[1] <= 4: 281 formatter = logging.Formatter('%(name)-7s %(message)s') 282 else: 283 formatter = logging.Formatter('[%(haizeatime)s] %(name)-7s %(message)s') 284 handler.setFormatter(formatter) 285 logger.addHandler(handler) 286 level = logging.getLevelName(self.config.get("loglevel")) 287 logger.setLevel(level) 288 logging.setLoggerClass(HaizeaLogger)
289 290
291 - def daemonize(self):
292 """Daemonizes the Haizea process. 293 294 Based on code in: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012 295 296 """ 297 # First fork 298 try: 299 pid = os.fork() 300 if pid > 0: 301 # Exit first parent 302 sys.exit(0) 303 except OSError, e: 304 sys.stderr.write("Failed to daemonize Haizea: (%d) %s\n" % (e.errno, e.strerror)) 305 sys.exit(1) 306 307 # Decouple from parent environment. 308 os.chdir(".") 309 os.umask(0) 310 os.setsid() 311 312 # Second fork 313 try: 314 pid = os.fork() 315 if pid > 0: 316 # Exit second parent. 317 sys.exit(0) 318 except OSError, e: 319 sys.stderr.write("Failed to daemonize Haizea: (%d) %s\n" % (e.errno, e.strerror)) 320 sys.exit(2) 321 322 # Open file descriptors and print start message 323 si = file(DAEMON_STDIN, 'r') 324 so = file(DAEMON_STDOUT, 'a+') 325 se = file(DAEMON_STDERR, 'a+', 0) 326 pid = os.getpid() 327 sys.stderr.write("\nStarted Haizea daemon with pid %i\n\n" % pid) 328 sys.stderr.flush() 329 file(self.pidfile,'w+').write("%i\n" % pid) 330 331 # Redirect standard file descriptors. 332 os.close(sys.stdin.fileno()) 333 os.close(sys.stdout.fileno()) 334 os.close(sys.stderr.fileno()) 335 os.dup2(si.fileno(), sys.stdin.fileno()) 336 os.dup2(so.fileno(), sys.stdout.fileno()) 337 os.dup2(se.fileno(), sys.stderr.fileno())
338
339 - def start(self):
340 """Starts the resource manager""" 341 self.logger.info("Starting resource manager") 342 343 for frontend in self.frontends: 344 frontend.load(self) 345 346 if self.daemon: 347 self.daemonize() 348 if self.rpc_server: 349 self.rpc_server.start() 350 351 self.__recover() 352 353 # Start the clock 354 try: 355 self.clock.run() 356 except UnrecoverableError, exc: 357 self.__unrecoverable_error(exc) 358 except Exception, exc: 359 self.__unexpected_exception(exc)
360
361 - def stop(self):
362 """Stops the resource manager by stopping the clock""" 363 self.clock.stop()
364
365 - def graceful_stop(self):
366 """Stops the resource manager gracefully and exits""" 367 368 self.logger.status("Stopping resource manager gracefully...") 369 370 # Stop collecting data (this finalizes counters) 371 self.accounting.stop() 372 373 self.persistence.close() 374 375 # TODO: When gracefully stopping mid-scheduling, we need to figure out what to 376 # do with leases that are still running. 377 378 self.print_status() 379 380 # In debug mode, dump the lease descriptors. 381 for lease in self.scheduler.completed_leases.entries.values(): 382 lease.print_contents() 383 384 # Write all collected data to disk 385 leases = self.scheduler.completed_leases.entries 386 self.accounting.save_to_disk(leases) 387 388 # Stop RPC server 389 if self.rpc_server != None: 390 self.rpc_server.stop()
391
392 - def process_requests(self, nexttime):
393 """Process any new requests in the request frontend 394 395 Checks the request frontend to see if there are any new requests that 396 have to be processed. AR leases are sent directly to the schedule. 397 Best-effort leases are queued. 398 399 Arguments: 400 nexttime -- The next time at which the scheduler can allocate resources. 401 This is meant to be provided by the clock simply as a sanity 402 measure when running in real time (to avoid scheduling something 403 "now" to actually have "now" be in the past once the scheduling 404 function returns. i.e., nexttime has nothing to do with whether 405 there are resources available at that time or not. 406 407 """ 408 409 # Get requests from frontend 410 requests = [] 411 for frontend in self.frontends: 412 requests += frontend.get_accumulated_requests() 413 requests.sort(key=operator.attrgetter("submit_time")) 414 415 # Request leases and run the scheduling function. 416 try: 417 self.logger.vdebug("Requesting leases") 418 for req in requests: 419 self.scheduler.request_lease(req) 420 421 self.logger.vdebug("Running scheduling function") 422 self.scheduler.schedule(nexttime) 423 except UnrecoverableError, exc: 424 self.__unrecoverable_error(exc) 425 except Exception, exc: 426 self.__unexpected_exception(exc)
427
428 - def process_starting_reservations(self, time):
429 """Process reservations starting/stopping at specified time""" 430 431 # The lease scheduler takes care of this. 432 try: 433 self.scheduler.process_starting_reservations(time) 434 except UnrecoverableError, exc: 435 self.__unrecoverable_error(exc) 436 except Exception, exc: 437 self.__unexpected_exception(exc)
438
439 - def process_ending_reservations(self, time):
440 """Process reservations starting/stopping at specified time""" 441 442 # The lease scheduler takes care of this. 443 try: 444 self.scheduler.process_ending_reservations(time) 445 except UnrecoverableError, exc: 446 self.__unrecoverable_error(exc) 447 except Exception, exc: 448 self.__unexpected_exception(exc)
449
450 - def notify_event(self, lease_id, event):
451 """Notifies an asynchronous event to Haizea. 452 453 Arguments: 454 lease_id -- ID of lease that is affected by event 455 event -- Event (currently, only the constants.EVENT_END_VM event is supported) 456 """ 457 try: 458 lease = self.scheduler.get_lease_by_id(lease_id) 459 self.scheduler.notify_event(lease, event) 460 except UnrecoverableError, exc: 461 self.__unrecoverable_error(exc) 462 except Exception, exc: 463 self.__unexpected_exception(exc)
464
465 - def cancel_lease(self, lease_id):
466 """Cancels a lease. 467 468 Arguments: 469 lease_id -- ID of lease to cancel 470 """ 471 try: 472 lease = self.scheduler.get_lease_by_id(lease_id) 473 self.scheduler.cancel_lease(lease) 474 except UnrecoverableError, exc: 475 self.__unrecoverable_error(exc) 476 except Exception, exc: 477 self.__unexpected_exception(exc)
478
479 - def get_next_changepoint(self):
480 """Return next changepoint in the slot table""" 481 return self.scheduler.slottable.get_next_changepoint(self.clock.get_time())
482
483 - def exists_more_leases(self):
484 """Return True if there are any leases still "in the system" """ 485 return self.scheduler.exists_scheduled_leases() or not self.scheduler.is_queue_empty()
486
487 - def print_status(self):
488 """Prints status summary.""" 489 490 leases = self.scheduler.leases.get_leases() 491 completed_leases = self.scheduler.completed_leases.get_leases() 492 self.logger.status("--- Haizea status summary ---") 493 self.logger.status("Number of leases (not including completed): %i" % len(leases)) 494 self.logger.status("Completed leases: %i" % len(completed_leases)) 495 self.logger.status("---- End summary ----")
496
497 - def __recover(self):
498 """Loads persisted leases and scheduling information 499 500 This method does three things: 501 1. Recover persisted leases. Note that not all persisted leases 502 may be recoverable. For example, if a lease was scheduled 503 to start at a certain time, but that time passed while 504 Haizea was not running, the lease will simply be transitioned 505 to a failed state. 506 2. Recover the queue. 507 3. Recover the list of "future leases" as determined by 508 the backfilling algorithm. 509 """ 510 511 # Load leases 512 leases = self.persistence.get_leases() 513 for lease in leases: 514 # Create a list of RRs 515 rrs = lease.preparation_rrs + lease.vm_rrs 516 for vmrr in lease.vm_rrs: 517 rrs += vmrr.pre_rrs + vmrr.post_rrs 518 519 # Bind resource tuples in RRs to slot table 520 for rr in rrs: 521 for restuple in rr.resources_in_pnode.values(): 522 restuple.slottable = self.scheduler.slottable 523 524 self.logger.debug("Attempting to recover lease %i" % lease.id) 525 lease.print_contents() 526 527 # Check the lease's state and determine how to proceed. 528 load_rrs = False 529 lease_state = lease.get_state() 530 if lease_state in (Lease.STATE_DONE, Lease.STATE_CANCELLED, Lease.STATE_REJECTED, Lease.STATE_FAIL): 531 self.logger.info("Recovered lease %i (already done)" % lease.id) 532 self.scheduler.completed_leases.add(lease) 533 elif lease_state in (Lease.STATE_NEW, Lease.STATE_PENDING): 534 self.scheduler.leases.add(lease) 535 elif lease_state == Lease.STATE_QUEUED: 536 load_rrs = True 537 self.scheduler.leases.add(lease) 538 self.logger.info("Recovered lease %i (queued)" % lease.id) 539 elif lease_state in (Lease.STATE_SCHEDULED, Lease.STATE_READY): 540 # Check if schedule is still valid. 
541 vmrr = lease.get_last_vmrr() 542 if len(vmrr.pre_rrs) > 0: 543 start = vmrr.pre_rrs[0].start 544 else: 545 start = vmrr.start 546 if self.clock.get_time() < start: 547 load_rrs = True 548 self.scheduler.leases.add(lease) 549 self.logger.info("Recovered lease %i" % lease.id) 550 else: 551 lease.set_state(Lease.STATE_FAIL) 552 self.scheduler.completed_leases.add(lease) 553 self.logger.info("Could not recover lease %i (scheduled starting time has passed)" % lease.id) 554 elif lease_state == Lease.STATE_ACTIVE: 555 vmrr = lease.get_last_vmrr() 556 if self.clock.get_time() < self.clock.get_time(): 557 # TODO: Check if VMs are actually running 558 load_rrs = True 559 self.scheduler.leases.add(lease) 560 self.logger.info("Recovered lease %i" % lease.id) 561 else: 562 # TODO: May have to stop extant virtual machines 563 lease.set_state(Lease.STATE_FAIL) 564 self.scheduler.completed_leases.add(lease) 565 self.logger.info("Could not recover lease %i (scheduled ending time has passed)" % lease.id) 566 else: 567 # No support for recovering lease in the 568 # remaining states 569 lease.set_state(Lease.STATE_FAIL) 570 self.scheduler.completed_leases.add(lease) 571 self.logger.info("Could not recover lease %i (unsupported state %i for recovery)" % (lease.id, lease_state)) 572 573 # Load the lease's RRs into the slot table 574 if load_rrs: 575 for rr in rrs: 576 if rr.state in (ResourceReservation.STATE_ACTIVE, ResourceReservation.STATE_SCHEDULED): 577 self.scheduler.slottable.add_reservation(rr) 578 579 # Rebuild the queue 580 queue = self.persistence.get_queue() 581 for lease_id in queue: 582 if self.scheduler.leases.has_lease(lease_id): 583 lease = self.scheduler.leases.get_lease(lease_id) 584 self.scheduler.queue.enqueue(lease) 585 586 # Rebuild the "future leases" 587 future = self.persistence.get_future_leases() 588 for lease_id in future: 589 if self.scheduler.leases.has_lease(lease_id): 590 lease = self.scheduler.leases.get_lease(lease_id) 591 
self.scheduler.vm_scheduler.future_leases.add(lease)
592 593
594 - def __unrecoverable_error(self, exc):
595 """Handles an unrecoverable error. 596 597 This method prints information on the unrecoverable error and makes Haizea panic. 598 """ 599 self.logger.error("An unrecoverable error has happened.") 600 self.logger.error("Original exception:") 601 self.__print_exception(exc.exc, exc.get_traceback()) 602 self.logger.error("Unrecoverable error traceback:") 603 self.__print_exception(exc, sys.exc_info()[2]) 604 self.__panic()
605
606 - def __unexpected_exception(self, exc):
607 """Handles an unrecoverable error. 608 609 This method prints information on the unrecoverable error and makes Haizea panic. 610 """ 611 self.logger.error("An unexpected exception has happened.") 612 self.__print_exception(exc, sys.exc_info()[2]) 613 self.__panic()
614
615 - def __print_exception(self, exc, exc_traceback):
616 """Prints an exception's traceback to the log.""" 617 tb = traceback.format_tb(exc_traceback) 618 for line in tb: 619 self.logger.error(line) 620 self.logger.error("Message: %s" % exc)
621 622
623 - def __panic(self):
624 """Makes Haizea crash and burn in a panicked frenzy""" 625 626 self.logger.status("Panicking...") 627 628 # Stop RPC server 629 if self.rpc_server != None: 630 self.rpc_server.stop() 631 632 # Dump state 633 self.print_status() 634 self.logger.error("Next change point (in slot table): %s" % self.get_next_changepoint()) 635 636 # Print lease descriptors 637 leases = self.scheduler.leases.get_leases() 638 if len(leases)>0: 639 self.logger.vdebug("vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv") 640 for lease in leases: 641 lease.print_contents() 642 self.logger.vdebug("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^") 643 644 # Exit 645 treatment = self.config.get("lease-failure-handling") 646 if treatment == constants.ONFAILURE_EXIT_RAISE: 647 raise 648 else: 649 exit(1)
650 651
class Clock(object):
    """Base class for the resource manager's clock.

    A clock periodically wakes up the resource manager so that it can
    process new requests and handle existing reservations. This class
    only stores the state shared by all clocks; everything that depends
    on how time advances is left as an abstract method.

    """

    def __init__(self, manager):
        # 'done' is the flag concrete clocks poll in their main loop
        self.done = False
        self.manager = manager

    def get_time(self):
        """Return the current time"""
        return abstract()

    def get_start_time(self):
        """Return the time at which the clock started ticking"""
        return abstract()

    def get_next_schedulable_time(self):
        """Return the next time at which resources could be scheduled.

        The "next schedulable time" serves as a sanity measure when
        running in real time (to avoid scheduling something "now" only
        to have "now" be in the past once the scheduling function
        returns). i.e., the "next schedulable time" has nothing to do
        with whether there are resources available at that time or not.
        """
        return abstract()

    def run(self):
        """Start and run the clock. This function is, in effect,
        the main loop of the resource manager."""
        return abstract()

    def stop(self):
        """Stop the clock.

        Stopping the clock makes Haizea exit.
        """
        self.done = True
694 695
class SimulatedClock(Clock):
    """Simulates the passage of time... really fast.

    The simulated clock steps through time to produce an ideal schedule.
    See the run() function for a description of how time is incremented
    exactly in the simulated clock.

    """

    def __init__(self, manager, starttime):
        """Initialize the simulated clock, starting at the provided starttime

        Arguments:
        manager -- the resource manager
        starttime -- time at which the simulation starts
        """
        Clock.__init__(self, manager)
        self.starttime = starttime
        self.time = starttime
        self.logger = logging.getLogger("CLOCK")
        # Compared against elapsed minutes in run(); None disables the
        # periodic status messages.
        self.statusinterval = self.manager.config.get("status-message-interval")

    def get_time(self):
        """See docstring in base Clock class."""
        return self.time

    def get_start_time(self):
        """See docstring in base Clock class."""
        return self.starttime

    def get_next_schedulable_time(self):
        """See docstring in base Clock class."""
        return self.time

    def run(self):
        """Runs the simulated clock through time.

        The clock starts at the provided start time. At each point in time,
        it wakes up the resource manager and then skips to the next time
        where "something" is happening (see __get_next_time for a more
        rigorous description of this).

        The clock stops when there is nothing left to do (no pending or
        queue requests, and no future reservations)

        The simulated clock can only work in conjunction with the
        tracefile request frontend.
        """
        self.logger.status("Starting simulated clock")
        self.manager.accounting.start(self.get_start_time())
        prevstatustime = self.time

        # Main loop
        while not self.done:
            # Check if there are any changes in the resource pool. New
            # nodes have to be added to the scheduler's slot table.
            new_nodes = self.manager.scheduler.vm_scheduler.resourcepool.refresh_nodes()
            if new_nodes:
                slottable = self.manager.scheduler.slottable
                for n in new_nodes:
                    rt = slottable.create_resource_tuple_from_capacity(n.capacity)
                    slottable.add_node(n.id, rt)

            # Check to see if there are any leases which are ending prematurely.
            # Note that this is unique to simulation.
            prematureends = self.manager.scheduler.slottable.get_prematurely_ending_res(self.time)

            # Notify the resource manager about the premature ends
            for rr in prematureends:
                self.manager.notify_event(rr.lease.id, constants.EVENT_END_VM)

            # Process reservations starting/stopping at the current time and
            # check if there are any new requests.
            self.manager.process_ending_reservations(self.time)
            self.manager.process_starting_reservations(self.time)
            self.manager.process_requests(self.time)

            # Since processing requests may have resulted in new reservations
            # starting now, we process reservations again.
            self.manager.process_starting_reservations(self.time)
            # And one final call to deal with nil-duration reservations
            self.manager.process_ending_reservations(self.time)

            self.manager.accounting.at_timestep(self.manager.scheduler)

            # Print a status message
            if self.statusinterval is not None and (self.time - prevstatustime).minutes >= self.statusinterval:
                self.manager.print_status()
                prevstatustime = self.time

            # Skip to next point in time.
            self.time, self.done = self.__get_next_time()

        self.logger.status("Simulated clock has stopped")

        # Stop the resource manager
        self.manager.graceful_stop()

    def __get_next_time(self):
        """Determines what is the next point in time to skip to.

        At a given point in time, the next time is the earliest of the following:
        * The arrival of the next lease request
        * The start or end of a reservation (a "changepoint" in the slot table)
        * A premature end of a lease

        Returns a (newtime, done) tuple, where 'done' is True if the clock
        must stop. Raises an Exception if time cannot advance even though
        the clock is not done.
        """

        # Determine candidate next times
        tracefrontend = self.__get_trace_frontend()
        nextchangepoint = self.manager.get_next_changepoint()
        nextprematureend = self.manager.scheduler.slottable.get_next_premature_end(self.time)
        nextreqtime = tracefrontend.get_next_request_time()
        self.logger.debug("Next change point (in slot table): %s" % nextchangepoint)
        self.logger.debug("Next request time: %s" % nextreqtime)
        self.logger.debug("Next premature end: %s" % nextprematureend)

        # The previous time is now
        prevtime = self.time

        # We initialize the next time to now too, to detect if
        # we've been unable to determine what the next time is.
        newtime = self.time

        # Find the earliest of the three, accounting for None values
        candidates = [t for t in (nextchangepoint, nextreqtime) if t is not None]
        if candidates:
            newtime = min(candidates)

        if nextprematureend is not None:
            newtime = min(nextprematureend, newtime)

        # If there's no more leases in the system, and no more pending requests,
        # then we're done.
        if not self.manager.exists_more_leases() and not tracefrontend.exists_more_requests():
            self.done = True

        # We can also be done if we've specified that we want to stop when
        # the best-effort requests are all done or when they've all been submitted.
        stopwhen = self.manager.config.get("stop-when")
        besteffort = self.manager.scheduler.leases.get_leases(type = Lease.BEST_EFFORT)
        pendingbesteffort = [r for r in tracefrontend.requests if r.get_type() == Lease.BEST_EFFORT]
        if stopwhen == constants.STOPWHEN_BEDONE:
            if self.manager.scheduler.is_queue_empty() and len(besteffort) + len(pendingbesteffort) == 0:
                self.done = True
        elif stopwhen == constants.STOPWHEN_BESUBMITTED:
            if len(pendingbesteffort) == 0:
                self.done = True

        # If we didn't arrive at a new time, and we're not done, we've fallen into
        # an infinite loop. This is A Bad Thing(tm).
        if newtime == prevtime and not self.done:
            raise Exception("Simulated clock has fallen into an infinite loop.")

        return newtime, self.done

    def __get_trace_frontend(self):
        """Gets the tracefile frontend from the resource manager

        Raises an Exception unless the manager has exactly one
        TracefileFrontend among its frontends.
        """
        frontends = self.manager.frontends
        tracef = [f for f in frontends if isinstance(f, TracefileFrontend)]
        if len(tracef) != 1:
            raise Exception("The simulated clock can only work with a tracefile request frontend.")
        return tracef[0]
855 856
class RealClock(Clock):
    """A realtime clock.

    The real clock wakes up periodically to, in turn, tell the resource manager
    to wake up. The real clock can also be run in a "fastforward" mode for
    debugging purposes (however, unlike the simulated clock, the clock will
    always skip a fixed amount of time into the future).
    """

    def __init__(self, manager, quantum, non_sched, fastforward = False):
        """Initializes the real clock.

        Arguments:
        manager -- the resource manager
        quantum -- interval between clock wakeups, in seconds
        non_sched -- number of seconds added to each wake-up time to
                     obtain the next schedulable time
        fastforward -- if True, the clock won't actually sleep
                       for the duration of the quantum."""
        Clock.__init__(self, manager)
        self.fastforward = fastforward
        if not self.fastforward:
            self.lastwakeup = None
        else:
            # In fastforward mode get_time() returns lastwakeup, so it
            # must hold a real value before starttime is computed below.
            self.lastwakeup = round_datetime(now())
        self.logger = logging.getLogger("CLOCK")
        self.starttime = self.get_time()
        self.nextschedulable = None
        self.nextperiodicwakeup = None
        self.quantum = TimeDelta(seconds=quantum)
        self.non_sched = TimeDelta(seconds=non_sched)

    def get_time(self):
        """See docstring in base Clock class."""
        if not self.fastforward:
            return now()
        else:
            return self.lastwakeup

    def get_start_time(self):
        """See docstring in base Clock class."""
        return self.starttime

    def get_next_schedulable_time(self):
        """See docstring in base Clock class."""
        return self.nextschedulable

    def run(self):
        """Runs the real clock through time.

        The clock starts when run() is called. In each iteration of the main loop
        it will do the following:
        - Wake up the resource manager
        - Determine if there will be anything to do before the next
          time the clock will wake up (after the quantum has passed). Note
          that this information is readily available on the slot table.
          If so, set next-wakeup-time to (now + time until slot table
          event). Otherwise, set it to (now + quantum)
        - Sleep until next-wake-up-time

        The clock keeps on tickin' until a SIGINT signal (Ctrl-C if running in the
        foreground) or a SIGTERM signal is received.
        """
        self.logger.status("Starting clock")
        self.manager.accounting.start(self.get_start_time())

        try:
            signal.signal(signal.SIGINT, self.signalhandler_gracefulstop)
            signal.signal(signal.SIGTERM, self.signalhandler_gracefulstop)
        except ValueError:
            # This means Haizea is not the main thread, which will happen
            # when running it as part of a py.test. We simply ignore this
            # to allow the test to continue.
            pass

        # Main loop
        while not self.done:
            self.logger.status("Waking up to manage resources")

            # Save the waking time. We want to use a consistent time in the
            # resource manager operations (if we use now(), we'll get a different
            # time every time)
            if not self.fastforward:
                self.lastwakeup = round_datetime(self.get_time())
            self.logger.status("Wake-up time recorded as %s" % self.lastwakeup)

            # Next schedulable time
            self.nextschedulable = round_datetime(self.lastwakeup + self.non_sched)

            # Check if there are any changes in the resource pool
            new_nodes = self.manager.scheduler.vm_scheduler.resourcepool.refresh_nodes()
            for n in new_nodes:
                rt = self.manager.scheduler.slottable.create_resource_tuple_from_capacity(n.capacity)
                self.manager.scheduler.slottable.add_node(n.id, rt)

            # Wake up the resource manager
            self.manager.process_ending_reservations(self.lastwakeup)
            self.manager.process_starting_reservations(self.lastwakeup)
            # TODO: Compute nextschedulable here, before processing requests
            self.manager.process_requests(self.nextschedulable)

            self.manager.accounting.at_timestep(self.manager.scheduler)

            # Next wakeup time
            time_now = now()
            if self.lastwakeup + self.quantum <= time_now:
                # Processing took longer than one quantum: skip ahead to the
                # first quantum boundary that is not in the past.
                quantums = (time_now - self.lastwakeup) / self.quantum
                quantums = int(ceil(quantums)) * self.quantum
                self.nextperiodicwakeup = round_datetime(self.lastwakeup + quantums)
            else:
                self.nextperiodicwakeup = round_datetime(self.lastwakeup + self.quantum)

            # Determine if there's anything to do before the next wakeup time
            nextchangepoint = self.manager.get_next_changepoint()
            if nextchangepoint is not None and nextchangepoint <= self.nextperiodicwakeup:
                # We need to wake up earlier to handle a slot table event
                nextwakeup = nextchangepoint
                self.logger.status("Going back to sleep. Waking up at %s to handle slot table event." % nextwakeup)
            else:
                # Nothing to do before waking up
                nextwakeup = self.nextperiodicwakeup
                self.logger.status("Going back to sleep. Waking up at %s to see if something interesting has happened by then." % nextwakeup)

            # The only exit condition from the real clock is if the stop_when_no_more_leases
            # is set to True, and there's no more work left to do.
            # TODO: This first if is a kludge. Other options should only interact with
            # options through the configfile's get method. The "stop-when-no-more-leases"
            # option is currently OpenNebula-specific (while the real clock isn't; it can
            # be used by both the simulator and the OpenNebula mode). This has to be
            # fixed.
            if self.manager.config._options.has_key("stop-when-no-more-leases"):
                stop_when_no_more_leases = self.manager.config.get("stop-when-no-more-leases")
                if stop_when_no_more_leases and not self.manager.exists_more_leases():
                    self.done = True

            # Sleep
            if not self.done:
                if not self.fastforward:
                    # The wake-up time may already have passed by the time
                    # we get here (slow processing, or rounding of the
                    # wake-up time). sleep() rejects negative durations, so
                    # clamp to zero and wake up again immediately instead
                    # of crashing the clock loop.
                    sleep(max(0, (nextwakeup - now()).seconds))
                else:
                    # Fastforward: jump straight to the next wake-up time.
                    self.lastwakeup = nextwakeup

        self.logger.status("Real clock has stopped")

        # Stop the resource manager
        self.manager.graceful_stop()

    def signalhandler_gracefulstop(self, signum, frame):
        """Handler for SIGTERM and SIGINT. Allows Haizea to stop gracefully.

        Arguments:
        signum -- number of the received signal
        frame -- current stack frame (unused)
        """
        sigstr = ""
        if signum == signal.SIGTERM:
            sigstr = " (SIGTERM)"
        elif signum == signal.SIGINT:
            sigstr = " (SIGINT)"
        self.logger.status("Received signal %i%s" %(signum, sigstr))
        # The main loop in run() checks this flag on its next iteration.
        self.done = True
1011 1012
class PersistenceManager(object):
    """Persistence manager.

    The persistence manager is in charge of persisting leases, and some
    scheduling data, to disk. This allows Haizea to recover from crashes.
    """

    def __init__(self, file):
        """Constructor

        Initializes the persistence manager. If the specified file
        does not exist, it is created (along with any missing parent
        directories). An existing file is opened, but the information
        in it is not recovered here (this is the responsibility of
        the Manager class).

        Arguments:
        file -- Persistence file. If None is specified, then
                persistence is disabled and Haizea will run entirely
                in-memory.
        """
        if file is None:
            self.disabled = True
            # A plain dict stands in for the shelf so the getters
            # below work unchanged when persistence is disabled.
            self.shelf = {}
        else:
            self.disabled = False
            file = os.path.expanduser(file)
            d = os.path.dirname(file)
            # A bare filename has an empty dirname; os.makedirs("")
            # would raise, so only create a directory when one is named.
            if d and not os.path.exists(d):
                os.makedirs(d)
            self.shelf = shelve.open(file, flag='c', protocol = -1)

    def persist_lease(self, lease):
        """Persists a single lease to disk

        Arguments:
        lease -- Lease to persist
        """
        if not self.disabled:
            self.shelf["lease-%i" % lease.id] = lease
            self.shelf.sync()

    def persist_queue(self, queue):
        """Persists the queue to disk

        Only the IDs of the leases in the queue are stored.

        Arguments:
        queue -- The queue
        """
        if not self.disabled:
            self.shelf["queue"] = [l.id for l in queue]
            self.shelf.sync()

    def persist_future_leases(self, leases):
        """Persists the set of future leases

        Only the IDs of the leases are stored.

        Arguments:
        leases -- "Future leases" (as determined by backfilling algorithm)
        """
        if not self.disabled:
            self.shelf["future"] = [l.id for l in leases]
            self.shelf.sync()

    def get_leases(self):
        """Returns the leases persisted to disk.

        These are the values stored under keys starting with "lease-".
        """
        return [v for k, v in self.shelf.items() if k.startswith("lease-")]

    def get_queue(self):
        """Returns the queue persisted to disk.

        The result is the list of lease IDs stored by persist_queue,
        or an empty list if no queue has been persisted.
        """
        return self.shelf.get("queue", [])

    def get_future_leases(self):
        """Returns the future leases persisted to disk.

        The result is the list of lease IDs stored by
        persist_future_leases, or an empty list if none were persisted.
        """
        return self.shelf.get("future", [])

    def close(self):
        """Closes the persistence manager.

        Closing the persistence manager saves any remaining
        data to disk.
        """
        if not self.disabled:
            self.shelf.close()
1106