Package haizea :: Package core :: Package scheduler :: Module resourcepool
[hide private]
[frames] | no frames]

Source Code for Module haizea.core.scheduler.resourcepool

  1  # -------------------------------------------------------------------------- # 
  2  # Copyright 2006-2009, University of Chicago                                 # 
  3  # Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   # 
  4  # Complutense de Madrid (dsa-research.org)                                   # 
  5  #                                                                            # 
  6  # Licensed under the Apache License, Version 2.0 (the "License"); you may    # 
  7  # not use this file except in compliance with the License. You may obtain    # 
  8  # a copy of the License at                                                   # 
  9  #                                                                            # 
 10  # http://www.apache.org/licenses/LICENSE-2.0                                 # 
 11  #                                                                            # 
 12  # Unless required by applicable law or agreed to in writing, software        # 
 13  # distributed under the License is distributed on an "AS IS" BASIS,          # 
 14  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   # 
 15  # See the License for the specific language governing permissions and        # 
 16  # limitations under the License.                                             # 
 17  # -------------------------------------------------------------------------- # 
 18   
 19  from haizea.common.utils import vnodemapstr 
 20  import haizea.common.constants as constants 
 21  import haizea.core.enact.actions as actions 
 22  from haizea.core.scheduler import EnactmentError 
 23  import logging  
24 25 26 -class ResourcePool(object):
27 - def __init__(self, info_enact, vm_enact, deploy_enact):
28 self.logger = logging.getLogger("RPOOL") 29 30 self.info = info_enact 31 self.vm = vm_enact 32 # TODO: Ideally, deployment enactment shouldn't be here, specially since 33 # it already "hangs" below the deployment modules. For now, 34 # it does no harm, though. 35 self.deployment = deploy_enact 36 37 self.nodes = self.info.get_nodes()
38
39 - def start_vms(self, lease, rr):
40 start_action = actions.VMEnactmentStartAction() 41 start_action.from_rr(rr) 42 43 for (vnode, pnode) in rr.nodes.items(): 44 node = self.get_node(pnode) 45 #diskimage = node.get_diskimage(lease.id, vnode, lease.diskimage_id) 46 start_action.vnodes[vnode].pnode = node.enactment_info 47 #start_action.vnodes[vnode].diskimage = diskimage.filename 48 start_action.vnodes[vnode].resources = rr.resources_in_pnode[pnode] 49 50 try: 51 self.vm.start(start_action) 52 except EnactmentError, exc: 53 self.logger.error("Enactment of start VM failed: %s" % exc.message) 54 raise
55
56 - def stop_vms(self, lease, rr):
57 stop_action = actions.VMEnactmentStopAction() 58 stop_action.from_rr(rr) 59 try: 60 self.vm.stop(stop_action) 61 except EnactmentError, exc: 62 self.logger.error("Enactment of end VM failed: %s" % exc.message) 63 raise
64
65 - def suspend_vms(self, lease, rr):
66 # Add memory image files 67 for vnode in rr.vnodes: 68 pnode = rr.vmrr.nodes[vnode] 69 self.add_ramfile(pnode, lease.id, vnode, lease.requested_resources[vnode].get_quantity(constants.RES_MEM)) 70 71 # Enact suspend 72 suspend_action = actions.VMEnactmentSuspendAction() 73 suspend_action.from_rr(rr) 74 try: 75 self.vm.suspend(suspend_action) 76 except EnactmentError, exc: 77 self.logger.error("Enactment of suspend VM failed: %s" % exc.message) 78 raise
79
80 - def verify_suspend(self, lease, rr):
81 verify_suspend_action = actions.VMEnactmentConfirmSuspendAction() 82 verify_suspend_action.from_rr(rr) 83 self.vm.verify_suspend(verify_suspend_action)
84
85 - def resume_vms(self, lease, rr):
86 # Remove memory image files 87 for vnode in rr.vnodes: 88 pnode = rr.vmrr.nodes[vnode] 89 self.remove_ramfile(pnode, lease.id, vnode) 90 91 # Enact resume 92 resume_action = actions.VMEnactmentResumeAction() 93 resume_action.from_rr(rr) 94 try: 95 self.vm.resume(resume_action) 96 except EnactmentError, exc: 97 self.logger.error("Enactment of resume VM failed: %s" % exc.message) 98 raise
99
100 - def verify_resume(self, lease, rr):
101 verify_resume_action = actions.VMEnactmentConfirmResumeAction() 102 verify_resume_action.from_rr(rr) 103 self.vm.verify_resume(verify_resume_action)
104
105 - def refresh_nodes(self):
106 new_nodes = self.info.refresh() 107 for node in new_nodes: 108 self.nodes[node.id] = node 109 return new_nodes
110
111 - def get_nodes(self):
112 return self.nodes.values()
113 114 # An auxiliary node is a host whose resources are going to be scheduled, but 115 # where no VMs are actually going to run. For example, a disk image repository node.
116 - def get_aux_nodes(self):
117 # TODO: We're only asking the deployment enactment module for auxiliary nodes. 118 # There might be a scenario where the info enactment module also reports 119 # auxiliary nodes. 120 return self.deployment.get_aux_nodes()
121
122 - def get_num_nodes(self):
123 return len(self.nodes)
124
125 - def get_node(self, node_id):
126 return self.nodes[node_id]
127
128 - def add_diskimage(self, pnode, diskimage_id, imagesize, lease_id, vnode):
129 self.logger.debug("Adding disk image for L%iV%i in pnode=%i" % (lease_id, vnode, pnode)) 130 131 self.logger.vdebug("Files BEFORE:") 132 self.get_node(pnode).print_files() 133 134 imagefile = self.deployment.resolve_to_file(lease_id, vnode, diskimage_id) 135 img = DiskImageFile(imagefile, imagesize, lease_id, vnode, diskimage_id) 136 self.get_node(pnode).add_file(img) 137 138 self.logger.vdebug("Files AFTER:") 139 self.get_node(pnode).print_files() 140 141 return img
142
143 - def remove_diskimage(self, pnode, lease, vnode):
144 node = self.get_node(pnode) 145 node.print_files() 146 147 self.logger.debug("Removing disk image for L%iV%i in node %i" % (lease, vnode, pnode)) 148 node.remove_diskimage(lease, vnode) 149 150 node.print_files()
151
152 - def add_ramfile(self, pnode, lease_id, vnode, size):
153 node = self.get_node(pnode) 154 self.logger.debug("Adding RAM file for L%iV%i in node %i" % (lease_id, vnode, pnode)) 155 node.print_files() 156 f = RAMImageFile("RAM_L%iV%i" % (lease_id, vnode), size, lease_id, vnode) 157 node.add_file(f) 158 node.print_files()
159
160 - def remove_ramfile(self, pnode, lease_id, vnode):
161 node = self.get_node(pnode) 162 self.logger.debug("Removing RAM file for L%iV%i in node %i" % (lease_id, vnode, pnode)) 163 node.print_files() 164 node.remove_ramfile(lease_id, vnode) 165 node.print_files()
166
167 - def get_max_disk_usage(self):
168 return max([n.get_disk_usage() for n in self.nodes.values()])
169
170 -class ResourcePoolNode(object):
171 - def __init__(self, node_id, hostname, capacity):
172 self.logger = logging.getLogger("RESOURCEPOOL") 173 self.id = node_id 174 self.hostname = hostname 175 self.capacity = capacity 176 self.files = [] 177 178 # enactment-specific information 179 self.enactment_info = None
180
181 - def get_capacity(self):
182 return self.capacity
183
184 - def add_file(self, f):
185 self.files.append(f)
186
187 - def get_diskimage(self, lease_id, vnode, diskimage_id):
188 image = [f for f in self.files if isinstance(f, DiskImageFile) and 189 f.diskimage_id == diskimage_id and 190 f.lease_id == lease_id and 191 f.vnode == vnode] 192 if len(image) == 0: 193 return None 194 elif len(image) == 1: 195 return image[0] 196 elif len(image) > 1: 197 self.logger.warning("More than one tainted image for L%iV%i on node %i" % (lease_id, vnode, self.id)) 198 return image[0]
199
200 - def remove_diskimage(self, lease_id, vnode):
201 image = [f for f in self.files if isinstance(f, DiskImageFile) and 202 f.lease_id == lease_id and 203 f.vnode == vnode] 204 if len(image) > 0: 205 image = image[0] 206 self.files.remove(image)
207
208 - def remove_ramfile(self, lease_id, vnode):
209 ramfile = [f for f in self.files if isinstance(f, RAMImageFile) and f.lease_id==lease_id and f.vnode==vnode] 210 if len(ramfile) > 0: 211 ramfile = ramfile[0] 212 self.files.remove(ramfile)
213 214
215 - def get_disk_usage(self):
216 return sum([f.filesize for f in self.files])
217 218
219 - def get_diskimages(self):
220 return [f for f in self.files if isinstance(f, DiskImageFile)]
221
222 - def print_files(self):
223 images = "" 224 if len(self.files) > 0: 225 images = ", ".join([str(img) for img in self.files]) 226 self.logger.vdebug("Node %i files: %iMB %s" % (self.id, self.get_disk_usage(), images))
227
228 - def xmlrpc_marshall(self):
229 # Convert to something we can send through XMLRPC 230 h = {} 231 h["id"] = self.id 232 h["hostname"] = self.hostname 233 h["cpu"] = self.capacity.get_quantity(constants.RES_CPU) 234 h["mem"] = self.capacity.get_quantity(constants.RES_MEM) 235 236 return h
237
238 239 240 -class File(object):
241 - def __init__(self, filename, filesize):
242 self.filename = filename 243 self.filesize = filesize
244
245 -class DiskImageFile(File):
246 - def __init__(self, filename, filesize, lease_id, vnode, diskimage_id):
247 File.__init__(self, filename, filesize) 248 self.lease_id = lease_id 249 self.vnode = vnode 250 self.diskimage_id = diskimage_id
251
252 - def __str__(self):
253 return "(DISK L%iv%i %s %s)" % (self.lease_id, self.vnode, self.diskimage_id, self.filename)
254
255 256 -class RAMImageFile(File):
257 - def __init__(self, filename, filesize, lease_id, vnode):
258 File.__init__(self, filename, filesize) 259 self.lease_id = lease_id 260 self.vnode = vnode
261
262 - def __str__(self):
263 return "(RAM L%iv%i %s)" % (self.lease_id, self.vnode, self.filename)
264
265 -class ResourcePoolWithReusableImages(ResourcePool):
266 - def __init__(self, info_enact, vm_enact, deploy_enact):
267 ResourcePool.__init__(self, info_enact, vm_enact, deploy_enact) 268 269 self.nodes = dict([(id,ResourcePoolNodeWithReusableImages.from_node(node)) for id, node in self.nodes.items()])
270
271 - def add_reusable_image(self, pnode, diskimage_id, imagesize, mappings, timeout):
272 self.logger.debug("Adding reusable image for %s in pnode=%i" % (mappings, pnode)) 273 274 self.logger.vdebug("Files BEFORE:") 275 self.get_node(pnode).print_files() 276 277 imagefile = "reusable-%s" % diskimage_id 278 img = ReusableDiskImageFile(imagefile, imagesize, diskimage_id, timeout) 279 for (lease_id, vnode) in mappings: 280 img.add_mapping(lease_id, vnode) 281 282 self.get_node(pnode).add_reusable_image(img) 283 284 self.logger.vdebug("Files AFTER:") 285 self.get_node(pnode).print_files() 286 287 return img
288
289 - def add_mapping_to_existing_reusable_image(self, pnode_id, diskimage_id, lease_id, vnode, timeout):
290 self.get_node(pnode_id).add_mapping_to_existing_reusable_image(diskimage_id, lease_id, vnode, timeout)
291
292 - def remove_diskimage(self, pnode_id, lease, vnode):
293 ResourcePool.remove_diskimage(self, pnode_id, lease, vnode) 294 self.logger.debug("Removing cached images for L%iV%i in node %i" % (lease, vnode, pnode_id)) 295 for img in self.get_node(pnode_id).get_reusable_images(): 296 if (lease, vnode) in img.mappings: 297 img.mappings.remove((lease, vnode)) 298 self.get_node(pnode_id).print_files()
299 # Keep image around, even if it isn't going to be used 300 # by any VMs. It might be reused later on. 301 # It will be purged if space has to be made available 302 # for other images 303
304 - def get_nodes_with_reusable_image(self, diskimage_id, after = None):
305 return [n.id for n in self.get_nodes() if n.exists_reusable_image(diskimage_id, after=after)]
306
307 - def exists_reusable_image(self, pnode_id, diskimage_id, after):
308 return self.get_node(pnode_id).exists_reusable_image(diskimage_id, after = after)
309
310 311 -class ResourcePoolNodeWithReusableImages(ResourcePoolNode):
312 - def __init__(self, node_id, hostname, capacity):
313 ResourcePoolNode.__init__(self, node_id, hostname, capacity) 314 self.reusable_images = []
315 316 @classmethod
317 - def from_node(cls, n):
318 node = cls(n.id, n.hostname, n.capacity) 319 node.enactment_info = n.enactment_info 320 return node
321
322 - def add_reusable_image(self, f):
323 self.reusable_images.append(f)
324
325 - def add_mapping_to_existing_reusable_image(self, diskimage_id, lease_id, vnode, timeout):
326 for f in self.reusable_images: 327 if f.diskimage_id == diskimage_id: 328 f.add_mapping(lease_id, vnode) 329 f.update_timeout(timeout) 330 break # Ugh 331 self.print_files()
332
333 - def get_reusable_image(self, diskimage_id, after = None, lease_id=None, vnode=None):
334 images = [i for i in self.reusable_images if i.diskimage_id == diskimage_id] 335 if after != None: 336 images = [i for i in images if i.timeout >= after] 337 if lease_id != None and vnode != None: 338 images = [i for i in images if i.has_mapping(lease_id, vnode)] 339 if len(images)>0: 340 return images[0] 341 else: 342 return None
343
344 - def exists_reusable_image(self, imagefile, after = None, lease_id=None, vnode=None):
345 entry = self.get_reusable_image(imagefile, after = after, lease_id=lease_id, vnode=vnode) 346 if entry == None: 347 return False 348 else: 349 return True
350
351 - def get_reusable_images(self):
352 return self.reusable_images
353
354 - def get_reusable_images_size(self):
355 return sum([f.filesize for f in self.reusable_images])
356
358 unused = [img for img in self.reusable_images if not img.has_mappings()] 359 if len(unused) == 0: 360 return 0 361 else: 362 i = iter(unused) 363 oldest = i.next() 364 for img in i: 365 if img.timeout < oldest.timeout: 366 oldest = img 367 self.reusable_images.remove(oldest) 368 return 1
369
370 - def purge_downto(self, target):
371 done = False 372 while not done: 373 removed = self.purge_oldest_unused_image() 374 if removed==0: 375 done = True 376 success = False 377 elif removed == 1: 378 if self.get_reusable_images_size() <= target: 379 done = True 380 success = True 381 return success
382
383 - def print_files(self):
384 ResourcePoolNode.print_files(self) 385 images = "" 386 if len(self.reusable_images) > 0: 387 images = ", ".join([str(img) for img in self.reusable_images]) 388 self.logger.vdebug("Node %i reusable images: %iMB %s" % (self.id, self.get_reusable_images_size(), images))
389
390 -class ReusableDiskImageFile(File):
391 - def __init__(self, filename, filesize, diskimage_id, timeout):
392 File.__init__(self, filename, filesize) 393 self.diskimage_id = diskimage_id 394 self.mappings = set([]) 395 self.timeout = timeout
396
397 - def add_mapping(self, lease_id, vnode):
398 self.mappings.add((lease_id, vnode))
399
400 - def has_mapping(self, lease_id, vnode):
401 return (lease_id, vnode) in self.mappings
402
403 - def has_mappings(self):
404 return len(self.mappings) > 0
405
406 - def update_timeout(self, timeout):
407 if timeout > self.timeout: 408 self.timeout = timeout
409
410 - def is_expired(self, curTime):
411 if self.timeout == None: 412 return False 413 elif self.timeout > curTime: 414 return True 415 else: 416 return False
417
418 - def __str__(self):
419 if self.timeout == None: 420 timeout = "NOTIMEOUT" 421 else: 422 timeout = self.timeout 423 return "(REUSABLE %s %s %s %s)" % (vnodemapstr(self.mappings), self.diskimage_id, str(timeout), self.filename)
424