1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 """The manager (resource manager) module is the root of Haizea. If you want to
20 see where the ball starts rolling, look at the following two functions:
21
22 * manager.Manager.__init__()
23 * manager.Manager.start()
24
25 This module provides the following classes:
26
27 * Manager: Haizea itself. Pretty much everything else
28 is contained in this class.
29 * Clock: A base class for Haizea's clock.
30 * SimulatedClock: A clock for simulations.
31 * RealClock: A clock that advances in realtime.
32 """
33
34 import haizea.common.constants as constants
35 from haizea.core.scheduler.preparation_schedulers.unmanaged import UnmanagedPreparationScheduler
36 from haizea.core.scheduler.preparation_schedulers.imagetransfer import ImageTransferPreparationScheduler
37 from haizea.core.enact.opennebula import OpenNebulaResourcePoolInfo, OpenNebulaVMEnactment, OpenNebulaDummyDeploymentEnactment
38 from haizea.core.enact.simulated import SimulatedResourcePoolInfo, SimulatedVMEnactment, SimulatedDeploymentEnactment
39 from haizea.core.frontends.tracefile import TracefileFrontend
40 from haizea.core.frontends.opennebula import OpenNebulaFrontend
41 from haizea.core.frontends.rpc import RPCFrontend
42 from haizea.core.accounting import AccountingDataCollection
43 from haizea.core.scheduler import UnrecoverableError
44 from haizea.core.scheduler.lease_scheduler import LeaseScheduler
45 from haizea.core.scheduler.vm_scheduler import VMScheduler
46 from haizea.core.scheduler.mapper import class_mappings as mapper_mappings
47 from haizea.core.scheduler.slottable import SlotTable, ResourceReservation
48 from haizea.core.scheduler.policy import PolicyManager
49 from haizea.core.scheduler.resourcepool import ResourcePool, ResourcePoolWithReusableImages
50 from haizea.core.leases import Lease, Site
51 from haizea.core.log import HaizeaLogger
52 from haizea.core.rpcserver import RPCServer
53 from haizea.common.utils import abstract, round_datetime, Singleton, import_class, OpenNebulaXMLRPCClientSingleton
54 from haizea.common.opennebula_xmlrpc import OpenNebulaXMLRPCClient
55 from haizea.pluggable.policies import admission_class_mappings, preemption_class_mappings, host_class_mappings
56 from haizea.pluggable.accounting import probe_class_mappings
57
58 import operator
59 import logging
60 import signal
61 import sys, os
62 import traceback
63 import shelve
64 import socket
65 from time import sleep
66 from math import ceil
67 from mx.DateTime import now, TimeDelta
68
# Files that replace the standard streams once the process has been daemonized
# (the daemonization code in this module opens them after forking).
DAEMON_STDOUT = DAEMON_STDIN = "/dev/null"
DAEMON_STDERR = "/var/tmp/haizea.err"
# Not referenced in the visible part of this module: the logging setup reads
# the log file location from the "logfile" configuration option.
DEFAULT_LOGFILE = "/var/tmp/haizea.log"
72
"""The root of Haizea

This class is the root of Haizea. Pretty much everything else (scheduler,
enactment modules, etc.) is contained in this class. The Manager
class is meant to be a singleton.

"""

# Python 2 style metaclass declaration; Singleton comes from haizea.common.utils.
__metaclass__ = Singleton
83
84 - def __init__(self, config, daemon=False, pidfile=None):
85 """Initializes the manager.
86
87 Argument:
88 config -- a populated instance of haizea.common.config.RMConfig
89 daemon -- True if Haizea must run as a daemon, False if it must
90 run in the foreground
91 pidfile -- When running as a daemon, file to save pid to
92 """
93 self.config = config
94
95
96
97 mode = config.get("mode")
98
99 self.daemon = daemon
100 self.pidfile = pidfile
101
102 if mode == "simulated":
103
104 clock = self.config.get("clock")
105 if clock == constants.CLOCK_SIMULATED:
106 self.daemon = False
107 elif mode == "opennebula":
108 clock = constants.CLOCK_REAL
109
110 self.init_logging()
111
112 if clock == constants.CLOCK_SIMULATED:
113 starttime = self.config.get("starttime")
114 self.clock = SimulatedClock(self, starttime)
115 self.rpc_server = None
116 elif clock == constants.CLOCK_REAL:
117 wakeup_interval = self.config.get("wakeup-interval")
118 non_sched = self.config.get("non-schedulable-interval")
119 if mode == "opennebula":
120 fastforward = self.config.get("dry-run")
121 else:
122 fastforward = False
123 self.clock = RealClock(self, wakeup_interval, non_sched, fastforward)
124 if fastforward:
125
126 self.rpc_server = None
127 else:
128 self.rpc_server = RPCServer(self)
129
130
131 if mode == "opennebula":
132 host = self.config.get("one.host")
133 port = self.config.get("one.port")
134 rv = OpenNebulaXMLRPCClient.get_userpass_from_env()
135 if rv == None:
136 print "ONE_AUTH environment variable is not set"
137 exit(1)
138 else:
139 user, passw = rv[0], rv[1]
140 try:
141 OpenNebulaXMLRPCClientSingleton(host, port, user, passw)
142 except socket.error, e:
143 print "Unable to connect to OpenNebula"
144 print "Reason: %s" % e
145 exit(1)
146
147
148 if mode == "simulated":
149 resources = self.config.get("simul.resources")
150 if resources == "in-tracefile":
151 tracefile = self.config.get("tracefile")
152 site = Site.from_lwf_file(tracefile)
153 elif resources.startswith("file:"):
154 sitefile = resources.split(":")
155 site = Site.from_xml_file(sitefile)
156 else:
157 site = Site.from_resources_string(resources)
158
159 deploy_bandwidth = config.get("imagetransfer-bandwidth")
160 info_enact = SimulatedResourcePoolInfo(site)
161 vm_enact = SimulatedVMEnactment()
162 deploy_enact = SimulatedDeploymentEnactment(deploy_bandwidth)
163 elif mode == "opennebula":
164
165 info_enact = OpenNebulaResourcePoolInfo()
166 vm_enact = OpenNebulaVMEnactment()
167
168 deploy_enact = OpenNebulaDummyDeploymentEnactment()
169
170 if mode == "simulated":
171 preparation_type = self.config.get("lease-preparation")
172 elif mode == "opennebula":
173
174 preparation_type = constants.PREPARATION_UNMANAGED
175
176
177 if preparation_type == constants.PREPARATION_TRANSFER:
178 if self.config.get("diskimage-reuse") == constants.REUSE_IMAGECACHES:
179 resourcepool = ResourcePoolWithReusableImages(info_enact, vm_enact, deploy_enact)
180 else:
181 resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact)
182 else:
183 resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact)
184
185
186 slottable = SlotTable(info_enact.get_resource_types())
187 for n in resourcepool.get_nodes() + resourcepool.get_aux_nodes():
188 rt = slottable.create_resource_tuple_from_capacity(n.capacity)
189 slottable.add_node(n.id, rt)
190
191
192 admission = self.config.get("policy.admission")
193 admission = admission_class_mappings.get(admission, admission)
194 admission = import_class(admission)
195 admission = admission(slottable)
196
197 preemption = self.config.get("policy.preemption")
198 preemption = preemption_class_mappings.get(preemption, preemption)
199 preemption = import_class(preemption)
200 preemption = preemption(slottable)
201
202 host_selection = self.config.get("policy.host-selection")
203 host_selection = host_class_mappings.get(host_selection, host_selection)
204 host_selection = import_class(host_selection)
205 host_selection = host_selection(slottable)
206
207 self.policy = PolicyManager(admission, preemption, host_selection)
208
209
210 if preparation_type == constants.PREPARATION_UNMANAGED:
211 preparation_scheduler = UnmanagedPreparationScheduler(slottable, resourcepool, deploy_enact)
212 elif preparation_type == constants.PREPARATION_TRANSFER:
213 preparation_scheduler = ImageTransferPreparationScheduler(slottable, resourcepool, deploy_enact)
214
215
216 mapper = self.config.get("mapper")
217 mapper = mapper_mappings.get(mapper, mapper)
218 mapper = import_class(mapper)
219 mapper = mapper(slottable, self.policy)
220
221
222
223 backfilling = self.config.get("backfilling")
224 if backfilling == constants.BACKFILLING_OFF:
225 max_in_future = 0
226 elif backfilling == constants.BACKFILLING_AGGRESSIVE:
227 max_in_future = 1
228 elif backfilling == constants.BACKFILLING_CONSERVATIVE:
229 max_in_future = -1
230 elif backfilling == constants.BACKFILLING_INTERMEDIATE:
231 max_in_future = self.config.get("backfilling-reservations")
232
233 vm_scheduler = VMScheduler(slottable, resourcepool, mapper, max_in_future)
234
235
236 attrs = dict([(attr, self.config.get_attr(attr)) for attr in self.config.get_attrs()])
237
238 self.accounting = AccountingDataCollection(self.config.get("datafile"), attrs)
239
240 probes = self.config.get("accounting-probes")
241 probes = probes.split()
242 for probe in probes:
243 probe_class = probe_class_mappings.get(probe, probe)
244 probe_class = import_class(probe_class)
245 probe_obj = probe_class(self.accounting)
246 self.accounting.add_probe(probe_obj)
247
248
249 self.scheduler = LeaseScheduler(vm_scheduler, preparation_scheduler, slottable, self.accounting)
250
251
252 if mode == "simulated":
253 if clock == constants.CLOCK_SIMULATED:
254
255 self.frontends = [TracefileFrontend(self.clock.get_start_time())]
256 elif clock == constants.CLOCK_REAL:
257
258 self.frontends = [RPCFrontend()]
259 elif mode == "opennebula":
260 self.frontends = [OpenNebulaFrontend()]
261
262 persistence_file = self.config.get("persistence-file")
263 if persistence_file == "none":
264 persistence_file = None
265 self.persistence = PersistenceManager(persistence_file)
266
267 self.logger = logging.getLogger("RM")
268
269
"""Initializes logging

Attaches a single handler to the root logger, applies the configured
log level and installs HaizeaLogger as the logger class.
"""
root_logger = logging.getLogger("")

# A daemon logs to the configured log file; otherwise logging.StreamHandler
# is used with its default stream.
if not self.daemon:
    log_handler = logging.StreamHandler()
else:
    log_handler = logging.FileHandler(self.config.get("logfile"))

# The %(haizeatime)s field is only used when the minor Python version is
# greater than 4.
if sys.version_info[1] > 4:
    log_format = '[%(haizeatime)s] %(name)-7s %(message)s'
else:
    log_format = '%(name)-7s %(message)s'
log_handler.setFormatter(logging.Formatter(log_format))
root_logger.addHandler(log_handler)

root_logger.setLevel(logging.getLevelName(self.config.get("loglevel")))
logging.setLoggerClass(HaizeaLogger)
289
290
"""Daemonizes the Haizea process.

Forks twice (starting a new session in between), writes the pid of the
surviving process to self.pidfile, and redirects the standard streams to
DAEMON_STDIN, DAEMON_STDOUT and DAEMON_STDERR.

Based on code in: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012

"""
# First fork: the parent exits, the child carries on
try:
    pid = os.fork()
    if pid > 0:
        # This is the parent process
        sys.exit(0)
except OSError, e:
    sys.stderr.write("Failed to daemonize Haizea: (%d) %s\n" % (e.errno, e.strerror))
    sys.exit(1)

# Detach from the parent environment: clear the umask and start a new
# session (chdir(".") leaves the working directory where it is)
os.chdir(".")
os.umask(0)
os.setsid()

# Second fork: once more only the child survives
try:
    pid = os.fork()
    if pid > 0:
        # This is the parent of the second fork
        sys.exit(0)
except OSError, e:
    sys.stderr.write("Failed to daemonize Haizea: (%d) %s\n" % (e.errno, e.strerror))
    sys.exit(2)

# Open the replacement streams; the stderr file is opened with buffer size 0
si = file(DAEMON_STDIN, 'r')
so = file(DAEMON_STDOUT, 'a+')
se = file(DAEMON_STDERR, 'a+', 0)
pid = os.getpid()
# Written before the redirection below, so it goes to the original stderr
sys.stderr.write("\nStarted Haizea daemon with pid %i\n\n" % pid)
sys.stderr.flush()
# Record the daemon's pid
file(self.pidfile,'w+').write("%i\n" % pid)

# Close the inherited descriptors, then duplicate the new files onto them
os.close(sys.stdin.fileno())
os.close(sys.stdout.fileno())
os.close(sys.stderr.fileno())
os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno())
338
360
"""Stops the resource manager by stopping the clock.

Only delegates to self.clock.stop().
"""
self.clock.stop()
364
391
393 """Process any new requests in the request frontend
394
395 Checks the request frontend to see if there are any new requests that
396 have to be processed. AR leases are sent directly to the schedule.
397 Best-effort leases are queued.
398
399 Arguments:
400 nexttime -- The next time at which the scheduler can allocate resources.
401 This is meant to be provided by the clock simply as a sanity
402 measure when running in real time (to avoid scheduling something
403 "now" to actually have "now" be in the past once the scheduling
404 function returns. i.e., nexttime has nothing to do with whether
405 there are resources available at that time or not.
406
407 """
408
409
410 requests = []
411 for frontend in self.frontends:
412 requests += frontend.get_accumulated_requests()
413 requests.sort(key=operator.attrgetter("submit_time"))
414
415
416 try:
417 self.logger.vdebug("Requesting leases")
418 for req in requests:
419 self.scheduler.request_lease(req)
420
421 self.logger.vdebug("Running scheduling function")
422 self.scheduler.schedule(nexttime)
423 except UnrecoverableError, exc:
424 self.__unrecoverable_error(exc)
425 except Exception, exc:
426 self.__unexpected_exception(exc)
427
438
449
464
478
482
486
"""Prints status summary.

Logs, at "status" level, how many leases the scheduler currently tracks
and how many it has completed.
"""
current_count = len(self.scheduler.leases.get_leases())
completed_count = len(self.scheduler.completed_leases.get_leases())

log_status = self.logger.status
log_status("--- Haizea status summary ---")
log_status("Number of leases (not including completed): %i" % current_count)
log_status("Completed leases: %i" % completed_count)
log_status("---- End summary ----")
496
"""Loads persisted leases and scheduling information

This method does three things:
1. Recover persisted leases. Note that not all persisted leases
   may be recoverable. For example, if a lease was scheduled
   to start at a certain time, but that time passed while
   Haizea was not running, the lease will simply be transitioned
   to a failed state.
2. Recover the queue.
3. Recover the list of "future leases" as determined by
   the backfilling algorithm.
"""

# Step 1: the leases themselves
leases = self.persistence.get_leases()
for lease in leases:
    # Gather every resource reservation attached to the lease
    rrs = lease.preparation_rrs + lease.vm_rrs
    for vmrr in lease.vm_rrs:
        rrs += vmrr.pre_rrs + vmrr.post_rrs

    # Point the persisted resource tuples at the scheduler's slot table
    for rr in rrs:
        for restuple in rr.resources_in_pnode.values():
            restuple.slottable = self.scheduler.slottable

    self.logger.debug("Attempting to recover lease %i" % lease.id)
    lease.print_contents()

    # load_rrs is set only when the lease's reservations have to be put
    # back into the slot table
    load_rrs = False
    lease_state = lease.get_state()
    if lease_state in (Lease.STATE_DONE, Lease.STATE_CANCELLED, Lease.STATE_REJECTED, Lease.STATE_FAIL):
        self.logger.info("Recovered lease %i (already done)" % lease.id)
        self.scheduler.completed_leases.add(lease)
    elif lease_state in (Lease.STATE_NEW, Lease.STATE_PENDING):
        self.scheduler.leases.add(lease)
    elif lease_state == Lease.STATE_QUEUED:
        load_rrs = True
        self.scheduler.leases.add(lease)
        self.logger.info("Recovered lease %i (queued)" % lease.id)
    elif lease_state in (Lease.STATE_SCHEDULED, Lease.STATE_READY):
        # Recoverable only if the lease has not started yet; the start is
        # that of the first pre-reservation when there is one
        vmrr = lease.get_last_vmrr()
        if len(vmrr.pre_rrs) > 0:
            start = vmrr.pre_rrs[0].start
        else:
            start = vmrr.start
        if self.clock.get_time() < start:
            load_rrs = True
            self.scheduler.leases.add(lease)
            self.logger.info("Recovered lease %i" % lease.id)
        else:
            lease.set_state(Lease.STATE_FAIL)
            self.scheduler.completed_leases.add(lease)
            self.logger.info("Could not recover lease %i (scheduled starting time has passed)" % lease.id)
    elif lease_state == Lease.STATE_ACTIVE:
        vmrr = lease.get_last_vmrr()
        # Recoverable only while the scheduled end is still in the future.
        # The previous code compared the current time with itself, so this
        # branch could not be taken reliably.
        # NOTE(review): assumes the VM reservation exposes its end time as
        # .end, mirroring the .start used above -- confirm.
        if self.clock.get_time() < vmrr.end:
            load_rrs = True
            self.scheduler.leases.add(lease)
            self.logger.info("Recovered lease %i" % lease.id)
        else:
            lease.set_state(Lease.STATE_FAIL)
            self.scheduler.completed_leases.add(lease)
            self.logger.info("Could not recover lease %i (scheduled ending time has passed)" % lease.id)
    else:
        # Any other state cannot be recovered: fail the lease
        lease.set_state(Lease.STATE_FAIL)
        self.scheduler.completed_leases.add(lease)
        self.logger.info("Could not recover lease %i (unsupported state %i for recovery)" % (lease.id, lease_state))

    # Put the still-relevant reservations back into the slot table
    if load_rrs:
        for rr in rrs:
            if rr.state in (ResourceReservation.STATE_ACTIVE, ResourceReservation.STATE_SCHEDULED):
                self.scheduler.slottable.add_reservation(rr)

# Step 2: the queue (only leases that were recovered above)
queue = self.persistence.get_queue()
for lease_id in queue:
    if self.scheduler.leases.has_lease(lease_id):
        lease = self.scheduler.leases.get_lease(lease_id)
        self.scheduler.queue.enqueue(lease)

# Step 3: the "future leases" (again only recovered leases)
future = self.persistence.get_future_leases()
for lease_id in future:
    if self.scheduler.leases.has_lease(lease_id):
        lease = self.scheduler.leases.get_lease(lease_id)
        self.scheduler.vm_scheduler.future_leases.add(lease)
592
593
"""Handles an unrecoverable error.

Logs the exception wrapped by exc and the traceback of exc itself,
then makes Haizea panic.
"""
log_error = self.logger.error

log_error("An unrecoverable error has happened.")
log_error("Original exception:")
original_exc = exc.exc
original_tb = exc.get_traceback()
self.__print_exception(original_exc, original_tb)

log_error("Unrecoverable error traceback:")
current_tb = sys.exc_info()[2]
self.__print_exception(exc, current_tb)

self.__panic()
605
"""Handles an unexpected exception.

This method logs the exception, together with the traceback of the
exception currently being handled, and makes Haizea panic.
"""
self.logger.error("An unexpected exception has happened.")
self.__print_exception(exc, sys.exc_info()[2])
self.__panic()
614
"""Prints an exception's traceback to the log.

Each formatted traceback entry is logged as an error, followed by the
exception's message.
"""
for tb_entry in traceback.format_tb(exc_traceback):
    self.logger.error(tb_entry)
self.logger.error("Message: %s" % exc)
621
622
650
651
653 """Base class for the resource manager's clock.
654
655 The clock is in charge of periodically waking the resource manager so it
656 will process new requests and handle existing reservations. This is a
657 base class defining abstract methods.
658
659 """
663
"""Return the current time.

Abstract in this base class: the body only delegates to abstract().
"""
return abstract()
667
"""Return the time at which the clock started ticking.

Abstract in this base class: the body only delegates to abstract().
"""
return abstract()
671
"""Return the next time at which resources could be scheduled.

The "next schedulable time" serves as a sanity measure when running
in real time (to avoid scheduling something "now" to actually
have "now" be in the past once the scheduling function returns),
i.e., the "next schedulable time" has nothing to do with whether
there are resources available at that time or not.

Abstract in this base class: the body only delegates to abstract().
"""
return abstract()
682
"""Start and run the clock. This function is, in effect,
the main loop of the resource manager.

Abstract in this base class: the body only delegates to abstract().
"""
return abstract()
687
"""Stop the clock.

Stopping the clock makes Haizea exit. This only sets the done flag.
"""
self.done = True
694
695
697 """Simulates the passage of time... really fast.
698
699 The simulated clock steps through time to produce an ideal schedule.
700 See the run() function for a description of how time is incremented
701 exactly in the simulated clock.
702
703 """
704
def __init__(self, manager, starttime):
    """Initialize the simulated clock.

    Arguments:
    manager -- the resource manager, passed on to the base Clock class
    starttime -- time at which the simulation starts; it is also the
                 initial value of the clock's current time
    """
    Clock.__init__(self, manager)

    # The clock starts out exactly at the requested start time
    self.starttime = starttime
    self.time = starttime

    self.logger = logging.getLogger("CLOCK")

    manager_config = self.manager.config
    self.statusinterval = manager_config.get("status-message-interval")
712
"""Return the current simulated time (see docstring in base Clock class)."""
return self.time
716
"""Return the time the simulation started at (see docstring in base Clock class)."""
return self.starttime
720
"""Return the current simulated time, which is also the next schedulable
time for this clock (see docstring in base Clock class)."""
return self.time
724
785
786
788 """Determines what is the next point in time to skip to.
789
790 At a given point in time, the next time is the earliest of the following:
791 * The arrival of the next lease request
792 * The start or end of a reservation (a "changepoint" in the slot table)
793 * A premature end of a lease
794 """
795
796
797 tracefrontend = self.__get_trace_frontend()
798 nextchangepoint = self.manager.get_next_changepoint()
799 nextprematureend = self.manager.scheduler.slottable.get_next_premature_end(self.time)
800 nextreqtime = tracefrontend.get_next_request_time()
801 self.logger.debug("Next change point (in slot table): %s" % nextchangepoint)
802 self.logger.debug("Next request time: %s" % nextreqtime)
803 self.logger.debug("Next premature end: %s" % nextprematureend)
804
805
806 prevtime = self.time
807
808
809
810 newtime = self.time
811
812
813 if nextchangepoint != None and nextreqtime == None:
814 newtime = nextchangepoint
815 elif nextchangepoint == None and nextreqtime != None:
816 newtime = nextreqtime
817 elif nextchangepoint != None and nextreqtime != None:
818 newtime = min(nextchangepoint, nextreqtime)
819
820 if nextprematureend != None:
821 newtime = min(nextprematureend, newtime)
822
823
824
825 if not self.manager.exists_more_leases() and not tracefrontend.exists_more_requests():
826 self.done = True
827
828
829
830 stopwhen = self.manager.config.get("stop-when")
831 besteffort = self.manager.scheduler.leases.get_leases(type = Lease.BEST_EFFORT)
832 pendingbesteffort = [r for r in tracefrontend.requests if r.get_type() == Lease.BEST_EFFORT]
833 if stopwhen == constants.STOPWHEN_BEDONE:
834 if self.manager.scheduler.is_queue_empty() and len(besteffort) + len(pendingbesteffort) == 0:
835 self.done = True
836 elif stopwhen == constants.STOPWHEN_BESUBMITTED:
837 if len(pendingbesteffort) == 0:
838 self.done = True
839
840
841
842 if newtime == prevtime and self.done != True:
843 raise Exception, "Simulated clock has fallen into an infinite loop."
844
845 return newtime, self.done
846
848 """Gets the tracefile frontend from the resource manager"""
849 frontends = self.manager.frontends
850 tracef = [f for f in frontends if isinstance(f, TracefileFrontend)]
851 if len(tracef) != 1:
852 raise Exception, "The simulated clock can only work with a tracefile request frontend."
853 else:
854 return tracef[0]
855
856
858 """A realtime clock.
859
860 The real clock wakes up periodically to, in turn, tell the resource manager
861 to wake up. The real clock can also be run in a "fastforward" mode for
862 debugging purposes (however, unlike the simulated clock, the clock will
863 always skip a fixed amount of time into the future).
864 """
def __init__(self, manager, quantum, non_sched, fastforward = False):
    """Initializes the real clock.

    Arguments:
    manager -- the resource manager
    quantum -- interval between clock wakeups, in seconds
    non_sched -- length, in seconds, of the interval added to the wake-up
                 time to obtain the next schedulable time
    fastforward -- if True, the clock won't actually sleep
                   for the duration of the quantum.
    """
    Clock.__init__(self, manager)
    self.fastforward = fastforward

    # A fast-forwarded clock keeps its own notion of "now" in lastwakeup,
    # so it needs an initial value; otherwise it is set on the first wake-up
    if self.fastforward:
        self.lastwakeup = round_datetime(now())
    else:
        self.lastwakeup = None

    self.logger = logging.getLogger("CLOCK")

    # get_time() depends on the two attributes set above
    self.starttime = self.get_time()

    self.nextschedulable = None
    self.nextperiodicwakeup = None
    self.quantum = TimeDelta(seconds=quantum)
    self.non_sched = TimeDelta(seconds=non_sched)
885
887 """See docstring in base Clock class."""
888 if not self.fastforward:
889 return now()
890 else:
891 return self.lastwakeup
892
"""Return the time recorded when the clock was created (see docstring in base Clock class)."""
return self.starttime
896
"""Return the next schedulable time computed at the last wake-up; None
until the clock has woken up once (see docstring in base Clock class)."""
return self.nextschedulable
900
902 """Runs the real clock through time.
903
904 The clock starts when run() is called. In each iteration of the main loop
905 it will do the following:
906 - Wake up the resource manager
907 - Determine if there will be anything to do before the next
908 time the clock will wake up (after the quantum has passed). Note
909 that this information is readily available on the slot table.
910 If so, set next-wakeup-time to (now + time until slot table
911 event). Otherwise, set it to (now + quantum)
912 - Sleep until next-wake-up-time
913
914 The clock keeps on tickin' until a SIGINT signal (Ctrl-C if running in the
915 foreground) or a SIGTERM signal is received.
916 """
917 self.logger.status("Starting clock")
918 self.manager.accounting.start(self.get_start_time())
919
920 try:
921 signal.signal(signal.SIGINT, self.signalhandler_gracefulstop)
922 signal.signal(signal.SIGTERM, self.signalhandler_gracefulstop)
923 except ValueError, exc:
924
925
926
927 pass
928
929
930 while not self.done:
931 self.logger.status("Waking up to manage resources")
932
933
934
935
936 if not self.fastforward:
937 self.lastwakeup = round_datetime(self.get_time())
938 self.logger.status("Wake-up time recorded as %s" % self.lastwakeup)
939
940
941 self.nextschedulable = round_datetime(self.lastwakeup + self.non_sched)
942
943
944 new_nodes = self.manager.scheduler.vm_scheduler.resourcepool.refresh_nodes()
945 for n in new_nodes:
946 rt = self.manager.scheduler.slottable.create_resource_tuple_from_capacity(n.capacity)
947 self.manager.scheduler.slottable.add_node(n.id, rt)
948
949
950 self.manager.process_ending_reservations(self.lastwakeup)
951 self.manager.process_starting_reservations(self.lastwakeup)
952
953 self.manager.process_requests(self.nextschedulable)
954
955 self.manager.accounting.at_timestep(self.manager.scheduler)
956
957
958 time_now = now()
959 if self.lastwakeup + self.quantum <= time_now:
960 quantums = (time_now - self.lastwakeup) / self.quantum
961 quantums = int(ceil(quantums)) * self.quantum
962 self.nextperiodicwakeup = round_datetime(self.lastwakeup + quantums)
963 else:
964 self.nextperiodicwakeup = round_datetime(self.lastwakeup + self.quantum)
965
966
967 nextchangepoint = self.manager.get_next_changepoint()
968 if nextchangepoint != None and nextchangepoint <= self.nextperiodicwakeup:
969
970 nextwakeup = nextchangepoint
971 self.logger.status("Going back to sleep. Waking up at %s to handle slot table event." % nextwakeup)
972 else:
973
974 nextwakeup = self.nextperiodicwakeup
975 self.logger.status("Going back to sleep. Waking up at %s to see if something interesting has happened by then." % nextwakeup)
976
977
978
979
980
981
982
983
984 if self.manager.config._options.has_key("stop-when-no-more-leases"):
985 stop_when_no_more_leases = self.manager.config.get("stop-when-no-more-leases")
986 if stop_when_no_more_leases and not self.manager.exists_more_leases():
987 self.done = True
988
989
990 if not self.done:
991 if not self.fastforward:
992 sleep((nextwakeup - now()).seconds)
993 else:
994 self.lastwakeup = nextwakeup
995
996 self.logger.status("Real clock has stopped")
997
998
999 self.manager.graceful_stop()
1000
"""Handler for SIGTERM and SIGINT. Allows Haizea to stop gracefully.

Logs the received signal and sets the done flag.
"""
# Suffix naming the signal in the log message; empty for any other signal
signal_labels = {signal.SIGTERM: " (SIGTERM)", signal.SIGINT: " (SIGINT)"}
sigstr = signal_labels.get(signum, "")

self.logger.status("Received signal %i%s" %(signum, sigstr))
self.done = True
1011
1012
1014 """Persistence manager.
1015
1016 The persistence manager is in charge of persisting leases, and some
1017 scheduling data, to disk. This allows Haizea to recover from crashes.
1018 """
1019
"""Constructor

Initializes the persistence manager. If the specified file
does not exist, it is created (together with its parent directory).
The file is only opened here; recovering the persisted information is
the responsibility of the Manager class.

Arguments:
file -- Persistence file. If None is specified, then
        persistence is disabled and Haizea will run entirely
        in-memory.
"""
if file is None:
    # Persistence disabled: a plain dictionary stands in for the shelf
    self.disabled = True
    self.shelf = {}
else:
    self.disabled = False
    file = os.path.expanduser(file)
    d = os.path.dirname(file)
    # dirname() is "" for a bare file name; there is no directory to
    # create then, and os.makedirs("") would raise
    if d and not os.path.exists(d):
        os.makedirs(d)
    # flag 'c' creates the file if needed; protocol -1 selects the highest
    # pickle protocol available
    self.shelf = shelve.open(file, flag='c', protocol = -1)
1043
1045 """Persists a single lease to disk
1046
1047 Arguments:
1048 lease -- Lease to persist
1049 """
1050 if not self.disabled:
1051 self.shelf["lease-%i" % lease.id] = lease
1052 self.shelf.sync()
1053
1055 """Persists the queue to disk
1056
1057 Arguments:
1058 queue -- The queue
1059 """
1060 if not self.disabled:
1061 self.shelf["queue"] = [l.id for l in queue]
1062 self.shelf.sync()
1063
1065 """Persists the set of future leases
1066
1067 Arguments:
1068 leases -- "Future leases" (as determined by backfilling algorithm)
1069 """
1070 if not self.disabled:
1071 self.shelf["future"] = [l.id for l in leases]
1072 self.shelf.sync()
1073
1075 """Returns the leases persisted to disk.
1076
1077 """
1078 return [v for k,v in self.shelf.items() if k.startswith("lease-")]
1079
1081 """Returns the queue persisted to disk.
1082
1083 """
1084 if self.shelf.has_key("queue"):
1085 return self.shelf["queue"]
1086 else:
1087 return []
1088
1090 """Returns the future leases persisted to disk.
1091
1092 """
1093 if self.shelf.has_key("future"):
1094 return self.shelf["future"]
1095 else:
1096 return []
1097
1099 """Closes the persistence manager.
1100
1101 Closing the persistence manager saves any remaining
1102 data to disk.
1103 """
1104 if not self.disabled:
1105 self.shelf.close()
1106