commit/galaxy-central: natefoo: Filesystem free space checking for the object store.
1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/changeset/096f4040e90d/ changeset: 096f4040e90d user: natefoo date: 2012-01-16 18:04:45 summary: Filesystem free space checking for the object store. affected #: 5 files diff -r d3b6789cbc90c65ca78a5205547a1b6d4bc39064 -r 096f4040e90d3516c8ad6093f4e2fcd618c43f6b lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -9,6 +9,7 @@ from galaxy.util.json import from_json_string from galaxy.util.expressions import ExpressionContext from galaxy.jobs.actions.post import ActionBox +from galaxy.exceptions import ObjectInvalid from sqlalchemy.sql.expression import and_, or_ @@ -321,9 +322,12 @@ # directory to be set before prepare is run, or else premature deletion # and job recovery fail. # Create the working dir if necessary - self.app.object_store.create(job, base_dir='job_work', dir_only=True, extra_dir=str(self.job_id)) - self.working_directory = self.app.object_store.get_filename(job, base_dir='job_work', dir_only=True, extra_dir=str(self.job_id)) - log.debug('(%s) Working directory for job is: %s' % (self.job_id, self.working_directory)) + try: + self.app.object_store.create(job, base_dir='job_work', dir_only=True, extra_dir=str(self.job_id)) + self.working_directory = self.app.object_store.get_filename(job, base_dir='job_work', dir_only=True, extra_dir=str(self.job_id)) + log.debug('(%s) Working directory for job is: %s' % (self.job_id, self.working_directory)) + except ObjectInvalid: + raise Exception('Unable to create job working directory, job failure') self.output_paths = None self.output_dataset_paths = None self.tool_provided_job_metadata = None diff -r d3b6789cbc90c65ca78a5205547a1b6d4bc39064 -r 096f4040e90d3516c8ad6093f4e2fcd618c43f6b lib/galaxy/objectstore/__init__.py --- a/lib/galaxy/objectstore/__init__.py +++ b/lib/galaxy/objectstore/__init__.py @@ -8,6 +8,7 @@ import time import random import shutil +import statvfs import logging import 
threading import subprocess @@ -40,7 +41,6 @@ def shutdown(self): self.running = False - self.extra_dirs = {} def exists(self, obj, base_dir=None, dir_only=False, extra_dir=None, extra_dir_at_root=False, alt_name=None): """ @@ -170,6 +170,12 @@ See `exists` method for the description of the fields. """ raise NotImplementedError() + + def get_store_usage_percent(self): + """ + Return the percentage indicating how full the store is + """ + raise NotImplementedError() ## def get_staging_command( id ): ## """ @@ -342,6 +348,9 @@ def get_object_url(self, obj, **kwargs): return None + def get_store_usage_percent(self): + st = os.statvfs(self.file_path) + return (float(st.f_blocks - st.f_bavail)/st.f_blocks) * 100 class CachingObjectStore(ObjectStore): @@ -831,6 +840,9 @@ log.warning("Trouble generating URL for dataset '%s': %s" % (rel_path, ex)) return None + def get_store_usage_percent(self): + return 0.0 + class DistributedObjectStore(ObjectStore): """ @@ -847,18 +859,29 @@ "'distributed_object_store_config_file')" self.backends = {} self.weighted_backend_ids = [] + self.original_weighted_backend_ids = [] + self.max_percent_full = {} + self.global_max_percent_full = 0.0 random.seed() self.__parse_distributed_config(config) + if self.global_max_percent_full or filter(lambda x: x is not None, self.max_percent_full.values()): + self.sleeper = Sleeper() + self.filesystem_monitor_thread = threading.Thread(target=self.__filesystem_monitor) + self.filesystem_monitor_thread.start() + log.info("Filesystem space monitor started") + def __parse_distributed_config(self, config): tree = util.parse_xml(self.distributed_config) root = tree.getroot() log.debug('Loading backends for distributed object store from %s' % self.distributed_config) + self.global_max_percent_full = float(root.get('maxpctfull', 0)) for elem in [ e for e in root if e.tag == 'backend' ]: id = elem.get('id') weight = int(elem.get('weight', 1)) + maxpctfull = float(elem.get('maxpctfull', 0)) if elem.get('type', 
'disk'): path = None extra_dirs = {} @@ -869,6 +892,7 @@ type = sub.get('type') extra_dirs[type] = sub.get('path') self.backends[id] = DiskObjectStore(config, file_path=path, extra_dirs=extra_dirs) + self.max_percent_full[id] = maxpctfull log.debug("Loaded disk backend '%s' with weight %s and file_path: %s" % (id, weight, path)) if extra_dirs: log.debug(" Extra directories:") @@ -879,13 +903,26 @@ # sequence the number of times equalling weight, then randomly # choose a backend from that sequence at creation self.weighted_backend_ids.append(id) + self.original_weighted_backend_ids = self.weighted_backend_ids + + def __filesystem_monitor(self): + while self.running: + new_weighted_backend_ids = self.original_weighted_backend_ids + for id, backend in self.backends.items(): + maxpct = self.max_percent_full[id] or self.global_max_percent_full + pct = backend.get_store_usage_percent() + if pct > maxpct: + new_weighted_backend_ids = filter(lambda x: x != id, new_weighted_backend_ids) + self.weighted_backend_ids = new_weighted_backend_ids + self.sleeper.sleep(120) # Test free space every 2 minutes + + def shutdown(self): + super(DistributedObjectStore, self).shutdown() + self.sleeper.wake() def exists(self, obj, **kwargs): return self.__call_method('exists', obj, False, False, **kwargs) - #def store_id(self, obj, **kwargs): - # return self.__get_store_id_for(obj, **kwargs)[0] - def file_ready(self, obj, **kwargs): return self.__call_method('file_ready', obj, False, False, **kwargs) @@ -894,8 +931,11 @@ create() is the only method in which obj.object_store_id may be None """ if obj.object_store_id is None or not self.exists(obj, **kwargs): - if obj.object_store_id is None or obj.object_store_id not in self.backends: - obj.object_store_id = random.choice(self.weighted_backend_ids) + if obj.object_store_id is None or obj.object_store_id not in self.weighted_backend_ids: + try: + obj.object_store_id = random.choice(self.weighted_backend_ids) + except IndexError: + raise 
ObjectInvalid() object_session( obj ).add( obj ) object_session( obj ).flush() log.debug("Selected backend '%s' for creation of %s %s" % (obj.object_store_id, obj.__class__.__name__, obj.id)) diff -r d3b6789cbc90c65ca78a5205547a1b6d4bc39064 -r 096f4040e90d3516c8ad6093f4e2fcd618c43f6b lib/galaxy/tools/__init__.py --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -1142,7 +1142,11 @@ return "tool_form.mako", dict( errors=errors, tool_state=state, incoming=incoming, error_message=error_message ) # If we've completed the last page we can execute the tool elif state.page == self.last_page: - _, out_data = self.execute( trans, incoming=params, history=history ) + try: + _, out_data = self.execute( trans, incoming=params, history=history ) + except Exception, e: + log.exception('Exception caught while attempting tool execution:') + return 'message.mako', dict( status='error', message='Error executing tool: %s' % str(e), refresh_frames=[] ) try: assert isinstance( out_data, odict ) return 'tool_executed.mako', dict( out_data=out_data ) diff -r d3b6789cbc90c65ca78a5205547a1b6d4bc39064 -r 096f4040e90d3516c8ad6093f4e2fcd618c43f6b lib/galaxy/tools/actions/__init__.py --- a/lib/galaxy/tools/actions/__init__.py +++ b/lib/galaxy/tools/actions/__init__.py @@ -6,6 +6,7 @@ from galaxy.util.template import fill_template from galaxy.util.none_like import NoneDataset from galaxy.web import url_for +from galaxy.exceptions import ObjectInvalid import galaxy.tools from types import * @@ -295,7 +296,10 @@ # created in the "default" store, all others will be created in # the same store as the first. 
data.dataset.object_store_id = object_store_id - trans.app.object_store.create( data.dataset ) + try: + trans.app.object_store.create( data.dataset ) + except ObjectInvalid: + raise Exception('Unable to create output dataset: object store is full') object_store_id = data.dataset.object_store_id # these will be the same thing after the first output # This may not be neccesary with the new parent/child associations data.designation = name diff -r d3b6789cbc90c65ca78a5205547a1b6d4bc39064 -r 096f4040e90d3516c8ad6093f4e2fcd618c43f6b lib/galaxy/tools/actions/upload_common.py --- a/lib/galaxy/tools/actions/upload_common.py +++ b/lib/galaxy/tools/actions/upload_common.py @@ -5,6 +5,7 @@ from galaxy.datatypes import sniff from galaxy.util.json import to_json_string from galaxy.model.orm import eagerload_all +from galaxy.exceptions import ObjectInvalid import logging log = logging.getLogger( __name__ ) @@ -328,7 +329,10 @@ # Create an empty file immediately if not dataset.dataset.external_filename: dataset.dataset.object_store_id = object_store_id - trans.app.object_store.create( dataset.dataset ) + try: + trans.app.object_store.create( dataset.dataset ) + except ObjectInvalid: + raise Exception('Unable to create output dataset: object store is full') object_store_id = dataset.dataset.object_store_id trans.sa_session.add( dataset ) # open( dataset.file_name, "w" ).close() Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
Participants (1):
- Bitbucket