1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 import haizea.common.constants as constants
20 from haizea.core.scheduler.preparation_schedulers import PreparationScheduler
21 from haizea.core.scheduler.slottable import ResourceReservation
22 from haizea.core.scheduler import MigrationResourceReservation, InconsistentLeaseStateError
23 from haizea.core.leases import Lease, Capacity, UnmanagedSoftwareEnvironment
24 from haizea.core.scheduler import ReservationEventHandler, NotSchedulableException, EarliestStartingTime
25 from haizea.common.utils import estimate_transfer_time, get_config
26 from mx.DateTime import TimeDelta
27
28 import bisect
29 import logging
32 - def __init__(self, slottable, resourcepool, deployment_enact):
52
53 - def schedule(self, lease, vmrr, earliest):
60
# Body of a routine that builds disk-image migration reservations; its `def`
# line is not visible in this extract.  `lease`, `vmrr` and `nexttime` are read
# without being assigned here, so they are presumably parameters -- TODO confirm.
# Every visible exit returns a list of reservations (empty when there is
# nothing to do).
62 if type(lease.software) == UnmanagedSoftwareEnvironment:
# Leases whose software is exactly UnmanagedSoftwareEnvironment get no reservations.
63 return []
64
65
66
# Map every vnode of the new VM reservation to an (origin pnode, destination
# pnode) pair, taking the origin from the lease's last VM reservation.
67 last_vmrr = lease.get_last_vmrr()
68 vnode_migrations = dict([(vnode, (last_vmrr.nodes[vnode], vmrr.nodes[vnode])) for vnode in vmrr.nodes])
69
# A migration is needed as soon as one vnode changes physical node.
70 mustmigrate = False
71 for vnode in vnode_migrations:
72 if vnode_migrations[vnode][0] != vnode_migrations[vnode][1]:
73 mustmigrate = True
74 break
75
76 if not mustmigrate:
77 return []
78
# "Migrate without transfer" mode: a single zero-length reservation at
# `nexttime` with no resources, carrying the complete vnode mapping.
79 if get_config().get("migration") == constants.MIGRATE_YES_NOTRANSFER:
80 start = nexttime
81 end = nexttime
82 res = {}
83 migr_rr = DiskImageMigrationResourceReservation(lease, start, end, res, vmrr, vnode_migrations)
84 migr_rr.state = ResourceReservation.STATE_SCHEDULED
85 return [migr_rr]
86
87
# Split the vnode moves into batches in which each physical node appears at
# most once, whether as origin or as destination.  Each pass of the while loop
# picks one batch greedily and removes the chosen vnodes from the pending dict.
# NOTE(review): vnodes whose origin equals their destination are not filtered
# out, so they are batched (and later charged transfer time) too -- confirm
# this is intended.
88 migrations = []
89 while len(vnode_migrations) > 0:
90 pnodes = set()
91 migration = {}
92 for vnode in vnode_migrations:
93 origin = vnode_migrations[vnode][0]
94 dest = vnode_migrations[vnode][1]
95 if not origin in pnodes and not dest in pnodes:
96 migration[vnode] = vnode_migrations[vnode]
97 pnodes.add(origin)
98 pnodes.add(dest)
# Deletion happens in a second loop so the dict is not modified while iterated.
99 for vnode in migration:
100 del vnode_migrations[vnode]
101 migrations.append(migration)
102
103
# The first batch starts at the later of `nexttime` and the end of the last
# post-reservation of the previous VM reservation.
# NOTE(review): `post_rrs[-1]` raises IndexError if `post_rrs` is empty -- confirm
# that callers guarantee at least one entry.
104 start = max(last_vmrr.post_rrs[-1].end, nexttime)
105 bandwidth = self.resourcepool.info.get_migration_bandwidth()
106 migr_rrs = []
# Batches are laid out back to back: each one starts where the previous ended.
107 for m in migrations:
# Transfer time is estimated for (image size x number of vnodes in the batch).
108 mb_to_migrate = lease.software.image_size * len(m.keys())
109 migr_time = estimate_transfer_time(mb_to_migrate, bandwidth)
110 end = start + migr_time
111 res = {}
# Reserve the full migration bandwidth: outbound on each origin node,
# inbound on each destination node.
112 for (origin,dest) in m.values():
113 resorigin = Capacity([constants.RES_NETOUT])
114 resorigin.set_quantity(constants.RES_NETOUT, bandwidth)
115 resdest = Capacity([constants.RES_NETIN])
116 resdest.set_quantity(constants.RES_NETIN, bandwidth)
117 res[origin] = self.slottable.create_resource_tuple_from_capacity(resorigin)
118 res[dest] = self.slottable.create_resource_tuple_from_capacity(resdest)
119 migr_rr = DiskImageMigrationResourceReservation(lease, start, start + migr_time, res, vmrr, m)
120 migr_rr.state = ResourceReservation.STATE_SCHEDULED
121 migr_rrs.append(migr_rr)
122 start = end
123
124 return migr_rrs
125
138
190
197
199 self.__remove_files(lease)
200
201
# Body of a routine that decides, per vnode, whether a disk image must be
# transferred or can be reused from a node's image pool, and schedules the
# needed transfers through __schedule_imagetransfer_edf.  Its `def` line is not
# visible in this extract; `lease`, `vmrr` and `earliest` are presumably
# parameters -- TODO confirm.  Returns the pair (transfer_rrs, is_ready).
203 config = get_config()
204 reusealg = config.get("diskimage-reuse")
# NOTE(review): `avoidredundant`, `end` (below) and `lease_id` (below) are
# assigned but never read in the visible lines.
205 avoidredundant = config.get("avoid-redundant-transfers")
206 is_ready = False
207
# vnode -> pnode maps: images that need a transfer vs. images found in a pool.
208 musttransfer = {}
209 mustpool = {}
210 nodeassignment = vmrr.nodes
211 start = lease.start.requested
212 end = lease.start.requested + lease.duration.requested
213 for (vnode, pnode) in nodeassignment.items():
214 lease_id = lease.id
215 self.logger.debug("Scheduling image transfer of '%s' for vnode %i to physnode %i" % (lease.software.image_id, vnode, pnode))
216
# With image caches enabled, a reusable copy already on the node at the
# requested start time avoids the transfer; in every other case transfer.
217 if reusealg == constants.REUSE_IMAGECACHES:
218 if self.resourcepool.exists_reusable_image(pnode, lease.software.image_id, start):
219 self.logger.debug("No need to schedule an image transfer (reusing an image in pool)")
220 mustpool[vnode] = pnode
221 else:
222 self.logger.debug("Need to schedule a transfer.")
223 musttransfer[vnode] = pnode
224 else:
225 self.logger.debug("Need to schedule a transfer.")
226 musttransfer[vnode] = pnode
227
# NOTE(review): `transfer_rrs` is only assigned in the else branch below.  When
# nothing has to be transferred, the final `return transfer_rrs, is_ready`
# reads an unassigned local and raises UnboundLocalError -- it probably needs
# an initial `transfer_rrs = []`.
228 if len(musttransfer) == 0:
229 is_ready = True
230 else:
231 try:
232 transfer_rrs = self.__schedule_imagetransfer_edf(lease, musttransfer, earliest)
# The handler only re-raises, so the try/except has no effect of its own.
233 except NotSchedulableException, exc:
234 raise
235
236
237
# Register the pooled images for the vnodes that did not need a transfer.
# NOTE(review): this uses lease.diskimage_id / lease.diskimage_size, whereas the
# lookup above uses lease.software.image_id -- verify these attributes exist
# on the lease.
238 if reusealg == constants.REUSE_IMAGECACHES:
239 for (vnode, pnode) in mustpool.items():
240 self.resourcepool.add_mapping_to_existing_reusable_image(pnode, lease.diskimage_id, lease.id, vnode, start)
241 self.resourcepool.add_diskimage(pnode, lease.diskimage_id, lease.diskimage_size, lease.id, vnode)
242
243 return transfer_rrs, is_ready
244
245
# Body of a routine that prepares disk images for each vnode of `vmrr`
# according to the per-node entry in `earliest`: reuse a pooled image,
# piggyback on a transfer that is already scheduled, or schedule a new transfer
# through __schedule_imagetransfer_fifo.  Its `def` line is not visible in this
# extract; `lease`, `vmrr` and `earliest` are presumably parameters -- TODO
# confirm.  Returns the pair (transfer_rrs, is_ready).
# NOTE(review): `reusealg` and `avoidredundant` are assigned but never read in
# the visible lines.
247 config = get_config()
248 reusealg = config.get("diskimage-reuse")
249 avoidredundant = config.get("avoid-redundant-transfers")
250
251 is_ready = False
252
253 transfer_rrs = []
254 musttransfer = {}
255 piggybacking = []
256 for (vnode, pnode) in vmrr.nodes.items():
# `earliest` is indexed by physical node; its `type` selects the strategy.
257 earliest_type = earliest[pnode].type
258 if earliest_type == ImageTransferEarliestStartingTime.EARLIEST_REUSE:
259
# Reuse: map the lease/vnode onto the existing pooled image (passing
# vmrr.end as the last argument) and add the disk image on the node.
260 self.logger.debug("Reusing image for V%i->P%i." % (vnode, pnode))
261 self.resourcepool.add_mapping_to_existing_reusable_image(pnode, lease.software.image_id, lease.id, vnode, vmrr.end)
262 self.resourcepool.add_diskimage(pnode, lease.software.image_id, lease.software.image_size, lease.id, vnode)
263 elif earliest_type == ImageTransferEarliestStartingTime.EARLIEST_PIGGYBACK:
264
# Piggyback: attach this (lease, vnode) to the existing transfer reservation.
265 transfer_rr = earliest[pnode].piggybacking_on
266 transfer_rr.piggyback(lease.id, vnode, pnode)
267 self.logger.debug("Piggybacking transfer for V%i->P%i on existing transfer in lease %i." % (vnode, pnode, transfer_rr.lease.id))
268 piggybacking.append(transfer_rr)
269 else:
270
# Any other type: a dedicated transfer is required.
271 musttransfer[vnode] = pnode
272 self.logger.debug("Must transfer V%i->P%i." % (vnode, pnode))
273
274 if len(musttransfer)>0:
275 transfer_rrs = self.__schedule_imagetransfer_fifo(lease, musttransfer, earliest)
276
# The lease is ready only when it waits neither on a new transfer nor on a
# piggybacked one.
277 if len(musttransfer)==0 and len(piggybacking)==0:
278 is_ready = True
279
280 return transfer_rrs, is_ready
281
282
316
354
355
# Body of a transfer-time estimate; its `def` line is not visible in this
# extract (`lease` and `bandwidth` are presumably parameters -- TODO confirm).
# If the "force-imagetransfer-time" option is set, that value is returned
# as is; otherwise the time is estimated from the lease's image size and the
# given bandwidth.
357 config = get_config()
358 force_transfer_time = config.get("force-imagetransfer-time")
359 if force_transfer_time != None:
360 return force_transfer_time
361 else:
362 return estimate_transfer_time(lease.software.image_size, bandwidth)
363
364
366
367
# Body of a search for the earliest start time of a transfer lasting
# `required_duration`; its `def` line is not visible in this extract
# (`nexttime` and `required_duration` are presumably parameters -- TODO confirm).
# The scan compares each transfer only with its successor, so it assumes
# self.transfers is ordered by time -- TODO confirm.
368 if len(self.transfers) == 0:
369 return nexttime
# The new transfer fits entirely before the first scheduled one.
370 elif nexttime + required_duration <= self.transfers[0].start:
371 return nexttime
372 else:
# Look for the first gap between consecutive transfers that is long enough.
# NOTE(review): the gap's start is not compared against `nexttime` here.
373 for i in xrange(len(self.transfers) - 1):
374 if self.transfers[i].end != self.transfers[i+1].start:
375 hole_duration = self.transfers[i+1].start - self.transfers[i].end
376 if hole_duration >= required_duration:
377 return self.transfers[i].end
# No suitable gap: queue after the last scheduled transfer.
378 return self.transfers[-1].end
379
380
382
383
# Body of a search for the latest start time that lets a transfer lasting
# `required_duration` finish by `deadline`; its `def` line is not visible in
# this extract (`deadline` and `required_duration` are presumably parameters
# -- TODO confirm).  The scan compares each transfer only with its
# predecessor, so it assumes self.transfers is ordered by time -- TODO confirm.
384 if len(self.transfers) == 0:
385 return deadline - required_duration
# The new transfer fits between the end of the last scheduled one and the deadline.
386 elif self.transfers[-1].end + required_duration <= deadline:
387 return deadline - required_duration
388 else:
# Walk backwards looking for a gap between consecutive transfers that is long
# enough, and place the new transfer at the end of that gap.
# NOTE(review): the gap's end is not compared against `deadline` here.
389 for i in xrange(len(self.transfers) - 1, 0, -1):
390 if self.transfers[i].start != self.transfers[i-1].end:
391 hole_duration = self.transfers[i].start - self.transfers[i-1].end
392 if hole_duration >= required_duration:
393 return self.transfers[i].start - required_duration
# No suitable gap: place the transfer immediately before the first scheduled one.
394 return self.transfers[0].start - required_duration
395
# Body of a routine that strips one lease's entries from every scheduled
# transfer and drops transfers left with no entries; its `def` line is not
# visible in this extract (`lease` is presumably a parameter -- TODO confirm).
# `lease` is compared with the first element of each (l, v) pair stored in
# t.transfers[pnode]; presumably that element is a lease id -- verify against
# the caller.  Returns the list of transfers removed from self.transfers.
397 toremove = []
398 for t in self.transfers:
399 for pnode in t.transfers:
400 leases = [l for l, v in t.transfers[pnode]]
401 if lease in leases:
# Rebuild this node's entry list without the pairs belonging to `lease`.
402 newtransfers = [(l, v) for l, v in t.transfers[pnode] if l!=lease]
403 t.transfers[pnode] = newtransfers
404
# Total number of (l, v) pairs left on this transfer across all its nodes;
# a transfer with none left is queued for removal.
405 a = sum([len(l) for l in t.transfers.values()])
406 if a == 0:
407 toremove.append(t)
# Removal is deferred to a second loop so self.transfers is not modified while
# it is being iterated.
408 for t in toremove:
409 self.transfers.remove(t)
410
411 return toremove
412
416
417 @staticmethod
434
435 @staticmethod
# Body of the static handler run when a file transfer reservation ends (the
# log messages name it handleEndFileTransfer); its `def` line is not visible in
# this extract.  `sched`, `lease` and `rr` are presumably parameters -- TODO
# confirm.
437 sched.logger.debug("LEASE-%i Start of handleEndFileTransfer" % lease.id)
438 lease.print_contents()
439 lease_state = lease.get_state()
# Only a lease in the PREPARING state may finish a transfer: it becomes READY
# and the reservation is marked DONE.
440 if lease_state == Lease.STATE_PREPARING:
441 lease.set_state(Lease.STATE_READY)
442 rr.state = ResourceReservation.STATE_DONE
# rr.transfers maps each physical node to the entries that received the image.
443 for physnode in rr.transfers:
444 vnodes = rr.transfers[physnode]
445
446
447
448
449
450
451
452
453
454
# No timeout is computed in the visible lines: the images are added with
# timeout=None.
455 maxend = None
456
457 sched._add_diskimages(physnode, rr.file, lease.software.image_size, vnodes, timeout=maxend)
458 else:
# Any other lease state is treated as an inconsistency.
459 raise InconsistentLeaseStateError(lease, doing = "ending a file transfer")
460
# The finished reservation is removed from the scheduler's list of transfers.
461 sched.transfers.remove(rr)
462 lease.print_contents()
463 sched.logger.debug("LEASE-%i End of handleEndFileTransfer" % lease.id)
464 sched.logger.info("Completed image transfer for lease %i" % (lease.id))
465
473
489
def _add_diskimages(self, pnode_id, diskimage_id, diskimage_size, vnodes, timeout):
    """Make a transferred disk image available on a physical node.

    What happens depends on the "diskimage-reuse" configuration option:

    * REUSE_NONE: one individual disk image is added per (lease, vnode).
    * REUSE_IMAGECACHES: if the node already holds a reusable copy of the
      image, the (lease, vnode) pairs are mapped onto it.  Otherwise the
      image is added to the node's reusable-image cache when it fits
      (purging the cache if necessary); when room cannot be made,
      individual disk images are created instead.

    Arguments:
    pnode_id -- identifier of the physical node receiving the image
    diskimage_id -- identifier of the disk image
    diskimage_size -- size of the disk image (logged as MB)
    vnodes -- iterable of (lease_id, vnode) pairs that use the image
    timeout -- passed through to the reusable-image calls
    """
    self.logger.debug("Adding image for leases=%s in nod_id=%i" % (vnodes, pnode_id))

    config = get_config()
    reusealg = config.get("diskimage-reuse")
    pnode = self.resourcepool.get_node(pnode_id)

    if reusealg == constants.REUSE_NONE:
        for (lease_id, vnode) in vnodes:
            self.resourcepool.add_diskimage(pnode_id, diskimage_id, diskimage_size, lease_id, vnode)
    elif reusealg == constants.REUSE_IMAGECACHES:
        if pnode.exists_reusable_image(diskimage_id):
            # A reusable copy is already on the node: just map onto it.
            for (lease_id, vnode) in vnodes:
                pnode.add_mapping_to_existing_reusable_image(diskimage_id, lease_id, vnode, timeout)
        else:
            maxcachesize = config.get("diskimage-cache-size")
            if maxcachesize == constants.CACHESIZE_UNLIMITED:
                can_add_to_cache = True
            else:
                cachesize = pnode.get_reusable_images_size()
                reqsize = cachesize + diskimage_size
                if reqsize > maxcachesize:
                    # The cache has to shrink far enough to leave room for
                    # the new image, i.e. down to (limit - image size).
                    desiredsize = maxcachesize - diskimage_size
                    self.logger.debug("Adding the image would make the size of pool in node %i = %iMB. Will try to bring it down to %i" % (pnode_id, reqsize, desiredsize))
                    pnode.print_files()
                    if desiredsize < 0:
                        # The image alone is larger than the cache limit;
                        # purging could never make it fit, so leave the
                        # cache contents untouched.
                        can_add_to_cache = False
                    else:
                        # Purge to the size that actually leaves room.
                        # Purging only to maxcachesize (as before) let the
                        # cache exceed its limit once the image was added.
                        can_add_to_cache = bool(pnode.purge_downto(desiredsize))
                else:
                    can_add_to_cache = True

            if can_add_to_cache:
                self.resourcepool.add_reusable_image(pnode_id, diskimage_id, diskimage_size, vnodes, timeout)
            else:
                self.logger.debug("Unable to add to pool. Must create individual disk images directly instead.")
                for (lease_id, vnode) in vnodes:
                    self.resourcepool.add_diskimage(pnode_id, diskimage_id, diskimage_size, lease_id, vnode)

    pnode.print_files()
545
547 - def __init__(self, lease, res, start=None, end=None):
553
def print_contents(self, loglevel="VDEBUG"):
    """Log this file transfer reservation on the "LEASES" logger.

    The generic reservation fields are written first by
    ResourceReservation.print_contents, followed by the reservation
    type, deadline, file and per-node transfers, all at `loglevel`.
    """
    ResourceReservation.print_contents(self, loglevel)
    emit = logging.getLogger("LEASES").log
    emit(loglevel, "Type : FILE TRANSFER")
    # Each attribute is looked up and formatted just before its line is
    # logged, in the same order as the fields are listed here.
    fields = (
        ("Deadline : %s", "deadline"),
        ("File : %s", "file"),
        ("Transfers : %s", "transfers"),
    )
    for template, attribute in fields:
        emit(loglevel, template % getattr(self, attribute))
561
def piggyback(self, lease_id, vnode, physnode):
    """Attach another (lease, vnode) to this transfer for a physical node.

    self.transfers maps a physical node to the list of (lease_id, vnode)
    pairs served by this transfer; the pair is appended to the node's
    list, which is created on first use.

    Arguments:
    lease_id -- identifier of the lease piggybacking on this transfer
    vnode -- virtual node of that lease
    physnode -- physical node the image is transferred to
    """
    # setdefault replaces the deprecated dict.has_key() test (removed in
    # Python 3) and does the lookup-or-create in a single step.
    self.transfers.setdefault(physnode, []).append((lease_id, vnode))
567
570
573
583
585 - def __init__(self, lease, start, end, res, vmrr, transfers):
587
# Body of a print routine for a disk image migration reservation; its `def`
# line is not visible in this extract (`loglevel` is presumably a parameter
# -- TODO confirm).  The type and transfers are logged on the "LEASES" logger
# first, then the generic ResourceReservation fields.
589 logger = logging.getLogger("LEASES")
590 logger.log(loglevel, "Type : DISK IMAGE MIGRATION")
591 logger.log(loglevel, "Transfers : %s" % self.transfers)
592 ResourceReservation.print_contents(self, loglevel)
593