[hg] galaxy 1591: James' changes allowing the server to run w/o ...
details:   http://www.bx.psu.edu/hg/galaxy/rev/c4644668afff
changeset: 1591:c4644668afff
user:      Nate Coraor <nate@bx.psu.edu>
date:      Fri Oct 31 10:22:46 2008 -0400
description:
James' changes allowing the server to run w/o a job runner, and nginx-specific performance improvements.

11 file(s) affected in this change:

lib/galaxy/app.py
lib/galaxy/config.py
lib/galaxy/jobs/__init__.py
lib/galaxy/model/__init__.py
lib/galaxy/tools/__init__.py
lib/galaxy/tools/actions/upload.py
lib/galaxy/tools/parameters/__init__.py
lib/galaxy/tools/parameters/basic.py
lib/galaxy/web/controllers/root.py
lib/galaxy/web/framework/base.py
universe_wsgi.ini.sample

diffs (576 lines):

diff -r 7e94f40ee9e8 -r c4644668afff lib/galaxy/app.py
--- a/lib/galaxy/app.py  Thu Oct 30 17:34:46 2008 -0400
+++ b/lib/galaxy/app.py  Fri Oct 31 10:22:46 2008 -0400
@@ -31,9 +31,11 @@
         #Load datatype converters
         self.datatypes_registry.load_datatype_converters( self.toolbox )
         # Start the job queue
-        job_dispatcher = jobs.DefaultJobDispatcher( self )
-        self.job_queue = jobs.JobQueue( self, job_dispatcher )
-        self.job_stop_queue = jobs.JobStopQueue( self, job_dispatcher )
+        self.job_manager = jobs.JobManager( self )
+        # FIXME: These are exposed directly for backward compatibility
+        self.job_queue = self.job_manager.job_queue
+        self.job_stop_queue = self.job_manager.job_stop_queue
+        # Heartbeat and memdump for thread / heap profiling
         self.heartbeat = None
         self.memdump = None
         # Start the heartbeat process if configured and available
@@ -48,7 +50,6 @@
             if memdump.Memdump:
                 self.memdump = memdump.Memdump()
     def shutdown( self ):
-        self.job_stop_queue.shutdown()
-        self.job_queue.shutdown()
+        self.job_manager.shutdown()
         if self.heartbeat:
             self.heartbeat.shutdown()

diff -r 7e94f40ee9e8 -r c4644668afff lib/galaxy/config.py
--- a/lib/galaxy/config.py  Thu Oct 30 17:34:46 2008 -0400
+++ b/lib/galaxy/config.py  Fri Oct 31 10:22:46 2008 -0400
@@ -64,6 +64,11 @@
         self.bugs_email = kwargs.get( 'bugs_email', None )
         self.blog_url = kwargs.get( 'blog_url', None )
         self.screencasts_url = kwargs.get( 'screencasts_url', None )
+        # Configuration options for taking advantage of nginx features
+        self.nginx_x_accel_redirect_base = kwargs.get( 'nginx_x_accel_redirect_base', False )
+        self.nginx_upload_location = kwargs.get( 'nginx_upload_store', False )
+        if self.nginx_upload_location:
+            self.nginx_upload_location = os.path.abspath( self.nginx_upload_location )
         # Parse global_conf and save the parser
         global_conf = kwargs.get( 'global_conf', None )
         global_conf_parser = ConfigParser.ConfigParser()
@@ -78,6 +83,11 @@
         self.datatypes_config = kwargs.get( 'datatypes_config_file', 'datatypes_conf.xml' )
     def get( self, key, default ):
         return self.config_dict.get( key, default )
+    def get_bool( self, key, default ):
+        if key in self.config_dict:
+            return string_as_bool( key )
+        else:
+            return default
     def check( self ):
         # Check that required directories exist
         for path in self.root, self.file_path, self.tool_path, self.tool_data_path, self.template_path, self.job_working_directory:

diff -r 7e94f40ee9e8 -r c4644668afff lib/galaxy/jobs/__init__.py
--- a/lib/galaxy/jobs/__init__.py  Thu Oct 30 17:34:46 2008 -0400
+++ b/lib/galaxy/jobs/__init__.py  Fri Oct 31 10:22:46 2008 -0400
@@ -16,6 +16,27 @@
 # States for running a job. These are NOT the same as data states
 JOB_WAIT, JOB_ERROR, JOB_INPUT_ERROR, JOB_INPUT_DELETED, JOB_OK, JOB_READY, JOB_DELETED = 'wait', 'error', 'input_error', 'input_deleted', 'ok', 'ready', 'deleted'
+
+class JobManager( object ):
+    """
+    Highest level interface to job management.
+
+    TODO: Currently the app accesses "job_queue" and "job_stop_queue" directly.
+          This should be decoupled.
+    """
+    def __init__( self, app ):
+        self.app = app
+        if self.app.config.get_bool( "enable_job_running", True ):
+            # The dispatcher launches the underlying job runners
+            self.dispatcher = DefaultJobDispatcher( app )
+            # Queues for starting and stopping jobs
+            self.job_queue = JobQueue( app, self.dispatcher )
+            self.job_stop_queue = JobStopQueue( app, self.dispatcher )
+        else:
+            self.job_queue = self.job_stop_queue = NoopQueue()
+    def shutdown( self ):
+        self.job_queue.shutdown()
+        self.job_stop_queue.shutdown()

 class Sleeper( object ):
     """
@@ -594,49 +615,11 @@
             pass
         for job in jobs:
-            # jobs in a non queued/running/new state do not need to be stopped
-            if job.state not in [ model.Job.states.QUEUED, model.Job.states.RUNNING, model.Job.states.NEW ]:
-                return
-            # job has multiple datasets that aren't parent/child and not all of them are deleted.
-            if not self.check_if_output_datasets_deleted( job.id ):
-                return
-            self.mark_deleted( job.id )
             # job is in JobQueue or FooJobRunner, will be dequeued due to state change above
             if job.job_runner_name is None:
                 return
             # tell the dispatcher to stop the job
             self.dispatcher.stop( job )
-
-    def check_if_output_datasets_deleted( self, job_id ):
-        job = model.Job.get( job_id )
-        for dataset_assoc in job.output_datasets:
-            dataset = dataset_assoc.dataset
-            dataset.refresh()
-            #only the originator of the job can delete a dataset to cause
-            #cancellation of the job, no need to loop through history_associations
-            if not dataset.deleted:
-                return False
-        return True
-
-    def mark_deleted( self, job_id ):
-        job = model.Job.get( job_id )
-        job.refresh()
-        job.state = job.states.DELETED
-        job.info = "Job output deleted by user before job completed."
-        job.flush()
-        for dataset_assoc in job.output_datasets:
-            dataset = dataset_assoc.dataset
-            dataset.refresh()
-            dataset.deleted = True
-            dataset.state = dataset.states.DISCARDED
-            dataset.dataset.flush()
-            for dataset in dataset.dataset.history_associations:
-                #propagate info across shared datasets
-                dataset.deleted = True
-                dataset.blurb = 'deleted'
-                dataset.peek = 'Job deleted'
-                dataset.info = 'Job output deleted by user before job completed'
-                dataset.flush()
     def put( self, job ):
         self.queue.put( job )
@@ -652,3 +635,12 @@
         self.queue.put( self.STOP_SIGNAL )
         self.sleeper.wake()
         log.info( "job stopper stopped" )
+
+class NoopQueue( object ):
+    """
+    Implements the JobQueue / JobStopQueue interface but does nothing
+    """
+    def put( self, *args ):
+        return
+    def shutdown( self ):
+        return
\ No newline at end of file

diff -r 7e94f40ee9e8 -r c4644668afff lib/galaxy/model/__init__.py
--- a/lib/galaxy/model/__init__.py  Thu Oct 30 17:34:46 2008 -0400
+++ b/lib/galaxy/model/__init__.py  Fri Oct 31 10:22:46 2008 -0400
@@ -85,7 +85,35 @@
         tool = app.toolbox.tools_by_id[self.tool_id]
         param_dict = tool.params_from_strings( param_dict, app )
         return param_dict
-
+    def check_if_output_datasets_deleted( self ):
+        """
+        Return true if all of the output datasets associated with this job are
+        in the deleted state
+        """
+        for dataset_assoc in self.output_datasets:
+            dataset = dataset_assoc.dataset
+            # only the originator of the job can delete a dataset to cause
+            # cancellation of the job, no need to loop through history_associations
+            if not dataset.deleted:
+                return False
+        return True
+    def mark_deleted( self ):
+        """
+        Mark this job as deleted, and mark any output datasets as discarded.
+        """
+        self.state = Job.states.DELETED
+        self.info = "Job output deleted by user before job completed."
+        for dataset_assoc in self.output_datasets:
+            dataset = dataset_assoc.dataset
+            dataset.deleted = True
+            dataset.state = dataset.states.DISCARDED
+            for dataset in dataset.dataset.history_associations:
+                # propagate info across shared datasets
+                dataset.deleted = True
+                dataset.blurb = 'deleted'
+                dataset.peek = 'Job deleted'
+                dataset.info = 'Job output deleted by user before job completed'
+
 class JobParameter( object ):
     def __init__( self, name, value ):
         self.name = name

diff -r 7e94f40ee9e8 -r c4644668afff lib/galaxy/tools/__init__.py
--- a/lib/galaxy/tools/__init__.py  Thu Oct 30 17:34:46 2008 -0400
+++ b/lib/galaxy/tools/__init__.py  Fri Oct 31 10:22:46 2008 -0400
@@ -797,7 +797,7 @@
         # Deal with the 'test' element and see if it's value changed
         test_param_key = group_prefix + input.test_param.name
         test_param_error = None
-        test_incoming = incoming.get( test_param_key, None )
+        test_incoming = get_incoming_value( incoming, test_param_key, None )
         if test_param_key not in incoming \
            and "__force_update__" + test_param_key not in incoming \
            and update_only:
@@ -878,7 +878,7 @@
                 except:
                     pass
             if not incoming_value_generated:
-                incoming_value = incoming.get( key, None )
+                incoming_value = get_incoming_value( incoming, key, None )
                 value, error = check_param( trans, input, incoming_value, context )
                 if input.dependent_params and state[ input.name ] != value:
                     # We need to keep track of changed dependency parametrs ( parameters
@@ -1362,3 +1362,12 @@
     else:
         return val

+def get_incoming_value( incoming, key, default ):
+    if "__" + key + "__is_composite" in incoming:
+        composite_keys = incoming["__" + key + "__keys"].split()
+        value = dict()
+        for composite_key in composite_keys:
+            value[composite_key] = incoming[key + "_" + composite_key]
+        return value
+    else:
+        return incoming.get( key, default )
\ No newline at end of file

diff -r 7e94f40ee9e8 -r c4644668afff lib/galaxy/tools/actions/upload.py
--- a/lib/galaxy/tools/actions/upload.py  Thu Oct 30 17:34:46 2008 -0400
+++ b/lib/galaxy/tools/actions/upload.py  Fri Oct 31 10:22:46 2008 -0400
@@ -26,12 +26,22 @@
         temp_name = ""
         data_list = []

-        if 'filename' in dir( data_file ):
+        if 'local_filename' in dir( data_file ):
+            # Use the existing file
             try:
                 file_name = data_file.filename
                 file_name = file_name.split( '\\' )[-1]
                 file_name = file_name.split( '/' )[-1]
-                data_list.append( self.add_file( trans, data_file.file, file_name, file_type, dbkey, space_to_tab=space_to_tab ) )
+                data_list.append( self.add_file( trans, data_file.local_filename, file_name, file_type, dbkey, space_to_tab=space_to_tab ) )
+            except Exception, e:
+                return self.upload_empty( trans, "Error:", str( e ) )
+        elif 'filename' in dir( data_file ):
+            try:
+                file_name = data_file.filename
+                file_name = file_name.split( '\\' )[-1]
+                file_name = file_name.split( '/' )[-1]
+                temp_name = sniff.stream_to_file( data_file.file )
+                data_list.append( self.add_file( trans, temp_name, file_name, file_type, dbkey, space_to_tab=space_to_tab ) )
             except Exception, e:
                 return self.upload_empty( trans, "Error:", str( e ) )
         if url_paste not in [ None, "" ]:
@@ -41,7 +51,8 @@
                     line = line.rstrip( '\r\n' )
                     if line:
                         try:
-                            data_list.append( self.add_file( trans, urllib.urlopen( line ), line, file_type, dbkey, info="uploaded url", space_to_tab=space_to_tab ) )
+                            temp_name = sniff.stream_to_file( urllib.urlopen( line ) )
+                            data_list.append( self.add_file( trans, temp_name, line, file_type, dbkey, info="uploaded url", space_to_tab=space_to_tab ) )
                         except Exception, e:
                             return self.upload_empty( trans, "Error:", str( e ) )
            else:
@@ -53,7 +64,8 @@
                         break
             if is_valid:
                 try:
-                    data_list.append( self.add_file( trans, StringIO.StringIO( url_paste ), 'Pasted Entry', file_type, dbkey, info="pasted entry", space_to_tab=space_to_tab ) )
+                    temp_name = sniff.stream_to_file( StringIO.StringIO( url_paste ) )
+                    data_list.append( self.add_file( trans, temp_name, 'Pasted Entry', file_type, dbkey, info="pasted entry", space_to_tab=space_to_tab ) )
                 except Exception, e:
                     return self.upload_empty( trans, "Error:", str( e ) )
             else:
@@ -77,9 +89,8 @@
             trans.app.model.flush()
             return dict( output=data )

-    def add_file( self, trans, file_obj, file_name, file_type, dbkey, info=None, space_to_tab=False ):
+    def add_file( self, trans, temp_name, file_name, file_type, dbkey, info=None, space_to_tab=False ):
         data_type = None
-        temp_name = sniff.stream_to_file( file_obj )

         # See if we have an empty file
         if not os.path.getsize( temp_name ) > 0:

diff -r 7e94f40ee9e8 -r c4644668afff lib/galaxy/tools/parameters/__init__.py
--- a/lib/galaxy/tools/parameters/__init__.py  Thu Oct 30 17:34:46 2008 -0400
+++ b/lib/galaxy/tools/parameters/__init__.py  Fri Oct 31 10:22:46 2008 -0400
@@ -16,9 +16,7 @@
     value = incoming_value
     error = None
     try:
-        if param.name == 'file_data':
-            pass
-        elif value is not None or isinstance(param, DataToolParameter):
+        if value is not None or isinstance(param, DataToolParameter):
             # Convert value from HTML representation
             value = param.from_html( value, trans, param_values )
             # Allow the value to be converted if neccesary

diff -r 7e94f40ee9e8 -r c4644668afff lib/galaxy/tools/parameters/basic.py
--- a/lib/galaxy/tools/parameters/basic.py  Thu Oct 30 17:34:46 2008 -0400
+++ b/lib/galaxy/tools/parameters/basic.py  Fri Oct 31 10:22:46 2008 -0400
@@ -2,12 +2,13 @@
 Basic tool parameters.
 """

-import logging, string, sys, os
+import logging, string, sys, os, os.path

 from elementtree.ElementTree import XML, Element

 from galaxy import config, datatypes, util
 from galaxy.web import form_builder
+from galaxy.util.bunch import Bunch

 import validation, dynamic_options

@@ -294,6 +295,23 @@
         self.name = elem.get( 'name' )
     def get_html_field( self, trans=None, value=None, other_values={} ):
         return form_builder.FileField( self.name )
+    def from_html( self, value, trans=None, other_values={} ):
+        # Middleware or proxies may encode files in special ways (TODO: this
+        # should be pluggable)
+        if type( value ) == dict:
+            upload_location = self.tool.app.config.nginx_upload_location
+            assert upload_location, \
+                "Request appears to have been processed by nginx_upload_module \
+                but Galaxy is not configured to recgonize it"
+            # Check that the file is in the right location
+            local_filename = os.path.abspath( value['path'] )
+            assert local_filename.startswith( upload_location ), \
+                "Filename provided by nginx is not in correct directory"
+            value = Bunch(
+                filename = value["name"],
+                local_filename = local_filename
+            )
+        return value
     def get_required_enctype( self ):
         """
         File upload elements require the multipart/form-data encoding

diff -r 7e94f40ee9e8 -r c4644668afff lib/galaxy/web/controllers/root.py
--- a/lib/galaxy/web/controllers/root.py  Thu Oct 30 17:34:46 2008 -0400
+++ b/lib/galaxy/web/controllers/root.py  Fri Oct 31 10:22:46 2008 -0400
@@ -266,6 +266,28 @@
             return trans.fill_template( "/dataset/edit_attributes.mako", data=data, datatypes=ldatatypes, err=None )

+    def __delete_dataset( self, trans, id ):
+        data = self.app.model.HistoryDatasetAssociation.get( id )
+        if data:
+            # Walk up parent datasets to find the containing history
+            topmost_parent = data
+            while topmost_parent.parent:
+                topmost_parent = topmost_parent.parent
+            assert topmost_parent in trans.history.datasets, "Data does not belong to current history"
+            # Mark deleted and cleanup
+            data.mark_deleted()
+            data.clear_associated_files()
+            trans.log_event( "Dataset id %s marked as deleted" % str(id) )
+            if data.parent_id is None and len( data.creating_job_associations ) > 0:
+                # Mark associated job for deletion
+                job = data.creating_job_associations[0].job
+                if job.state not in [ model.Job.states.QUEUED, model.Job.states.RUNNING, model.Job.states.NEW ]:
+                    return
+                # Are *all* of the job's other output datasets deleted?
+                if job.check_if_output_datasets_deleted():
+                    job.mark_deleted()
+        self.app.model.flush()
+
     @web.expose
     def delete( self, trans, id = None, show_deleted_on_refresh = False, **kwd):
         if id:
@@ -276,26 +298,10 @@
             history = trans.get_history()
             for id in dataset_ids:
                 try:
-                    int( id )
+                    id = int( id )
                 except:
                     continue
-                data = self.app.model.HistoryDatasetAssociation.get( id )
-                if data:
-                    # Walk up parent datasets to find the containing history
-                    topmost_parent = data
-                    while topmost_parent.parent:
-                        topmost_parent = topmost_parent.parent
-                    assert topmost_parent in history.datasets, "Data does not belong to current history"
-                    # Mark deleted and cleanup
-                    data.mark_deleted()
-                    data.clear_associated_files()
-                    self.app.model.flush()
-                    trans.log_event( "Dataset id %s marked as deleted" % str(id) )
-                    if data.parent_id is None:
-                        try:
-                            self.app.job_stop_queue.put( data.creating_job_associations[0].job )
-                        except IndexError:
-                            pass    # upload tool will cause this since it doesn't have a job
+                self.__delete_dataset( trans, id )
         return self.history( trans, show_deleted = show_deleted_on_refresh )

     @web.expose
@@ -305,24 +311,7 @@
             int( id )
         except:
             return "Dataset id '%s' is invalid" %str( id )
-        history = trans.get_history()
-        data = self.app.model.HistoryDatasetAssociation.get( id )
-        if data:
-            # Walk up parent datasets to find the containing history
-            topmost_parent = data
-            while topmost_parent.parent:
-                topmost_parent = topmost_parent.parent
-            assert topmost_parent in history.datasets, "Data does not belong to current history"
-            # Mark deleted and cleanup
-            data.mark_deleted()
-            data.clear_associated_files()
-            self.app.model.flush()
-            trans.log_event( "Dataset id %s marked as deleted async" % str(id) )
-            if data.parent_id is None:
-                try:
-                    self.app.job_stop_queue.put( data.creating_job_associations[0].job )
-                except IndexError:
-                    pass    # upload tool will cause this since it doesn't have a job
+        self.__delete_dataset( trans, id )
         return "OK"

     ## ---- History management -----------------------------------------------

diff -r 7e94f40ee9e8 -r c4644668afff lib/galaxy/web/framework/base.py
--- a/lib/galaxy/web/framework/base.py  Thu Oct 30 17:34:46 2008 -0400
+++ b/lib/galaxy/web/framework/base.py  Fri Oct 31 10:22:46 2008 -0400
@@ -5,6 +5,7 @@
 import socket
 import types
 import logging
+import os.path
 import sys

 from Cookie import SimpleCookie
@@ -132,16 +133,16 @@
         if callable( body ):
             # Assume the callable is another WSGI application to run
             return body( environ, start_response )
+        elif isinstance( body, types.FileType ):
+            # Stream the file back to the browser
+            return send_file( start_response, trans, body )
         else:
             start_response( trans.response.wsgi_status(),
                             trans.response.wsgi_headeritems() )
             return self.make_body_iterable( trans, body )

     def make_body_iterable( self, trans, body ):
-        if isinstance( body, types.FileType ):
-            # Stream the file back to the browser
-            return iterate_file( body )
-        elif isinstance( body, ( types.GeneratorType, list, tuple ) ):
+        if isinstance( body, ( types.GeneratorType, list, tuple ) ):
             # Recursively stream the iterable
             return flatten( body )
         elif isinstance( body, basestring ):
@@ -302,6 +303,20 @@

 CHUNK_SIZE = 2**16

+def send_file( start_response, trans, body ):
+    # If configured use X-Accel-Redirect header for nginx
+    base = trans.app.config.nginx_x_accel_redirect_base
+    if base:
+        trans.response.headers['X-Accel-Redirect'] = \
+            base + os.path.abspath( body.name )
+        body = [ "" ]
+    # Fall back on sending the file in chunks
+    else:
+        body = iterate_file( body )
+    start_response( trans.response.wsgi_status(),
+                    trans.response.wsgi_headeritems() )
+    return body
+
 def iterate_file( file ):
     """
     Progressively return chunks from `file`.

diff -r 7e94f40ee9e8 -r c4644668afff universe_wsgi.ini.sample
--- a/universe_wsgi.ini.sample  Thu Oct 30 17:34:46 2008 -0400
+++ b/universe_wsgi.ini.sample  Fri Oct 31 10:22:46 2008 -0400
@@ -34,7 +34,7 @@
 # Database connection
 database_file = database/universe.sqlite
 # You may use a SQLAlchemy connection string to specify an external database instead
-## database_connection = postgres:///galaxy_test
+## database_connection = postgres:///galaxy
 ## database_engine_option_echo = true
 ## database_engine_option_echo_pool = true
 ## database_engine_option_pool_size = 10
@@ -89,12 +89,15 @@
 # Write thread status periodically to 'heartbeat.log' (careful, uses disk space rapidly!)
 ## use_heartbeat = True
+
+# Enable the memory debugging interface (careful, negatively impacts server performance)
+## use_memdump = True
+
 # Profiling middleware (cProfile based)
 ## use_profile = True

-# Mail
-smtp_server = coltrane.bx.psu.edu
-error_email_to = galaxy-bugs@bx.psu.edu
+# For use by 'report this error' link on error-state datasets
+#smtp_server = smtp.example.org
+#error_email_to = galaxy-bugs@example.org

 # Use the new iframe / javascript based layout
 use_new_layout = true
@@ -120,29 +123,34 @@
 ## wiki_url: replaces the default galaxy main wiki
 ## bugs_email: replaces the default galaxy bugs email list
 #brand = Private local mirror
-#wiki_url=/path/to/my/local/wiki
-#bugs_email=mailto:bugmaster@this.site.com
+#wiki_url = /path/to/my/local/wiki
+#bugs_email = mailto:galaxy-bugs@example.org

 # ---- Job Runners ----------------------------------------------------------

 # Clustering Galaxy is not a straightforward process and requires a lot of
-# pre-configuration. See the ClusteringGalaxy Wiki before attempting to set any
-# of these options. If running normally (without a cluster), do not change
-# anything in this section.
+# pre-configuration. See the ClusteringGalaxy Wiki before attempting to set
+# any of these options:
+#
+# http://g2.trac.bx.psu.edu/wiki/ClusteringGalaxy
+#
+# If running normally (without a cluster), do not change anything in this
+# section.

 # start_job_runners: Comma-separated list of job runners to start. local is
 # always started. If left commented, no jobs will be run on the cluster, even
 # if a cluster URL is explicitly defined in the [galaxy:tool_runners] section
-# below. The only runner currently available is 'pbs'.
+# below. The runners currently available are 'pbs' and 'sge'.
 #start_job_runners = pbs

 # default_cluster_job_runner: The URL for the default runner to use when a tool
 # doesn't explicity define a runner below. For help on the cluster URL format,
-# see the ClusteringGalaxy Wiki. Leave commented if not using a cluster job runner.
+# see the ClusteringGalaxy Wiki. Leave commented if not using a cluster job
+# runner.
 #default_cluster_job_runner = pbs:///

 # The PBS options are described in detail in the Galaxy Configuration section of
-# the ClusteringGalaxy Wiki
+# the ClusteringGalaxy Wiki, and are only necessary when using file staging.
 #pbs_application_server =
 #pbs_stage_path =
 #pbs_dataset_server =
@@ -152,8 +160,6 @@
 [galaxy:tool_runners]

 biomart = local:///
-blat2wig = pbs:///blast
-blat_wrapper = pbs:///blast
 encode_db1 = local:///
 encode_import_all_latest_datasets1 = local:///
 encode_import_chromatin_and_chromosomes1 = local:///
@@ -161,13 +167,8 @@
 encode_import_genes_and_transcripts1 = local:///
 encode_import_multi-species_sequence_analysis1 = local:///
 encode_import_transcription_regulation1 = local:///
-generate_coverage_report = pbs:///blast
 hbvar = local:///
-hist_high_quality_score = pbs:///blast
-megablast_wrapper = pbs:///blast
-megablast_xml_parser = pbs:///blast
 microbial_import1 = local:///
-quality_score_distribution = pbs:///blast
 ucsc_table_direct1 = local:///
 ucsc_table_direct_archaea1 = local:///
 ucsc_table_direct_test1 = local:///
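
The new JobManager reads an "enable_job_running" option (through the new Config.get_bool helper) so that a web-only Galaxy process can skip starting the dispatcher and both queues while the rest of the application keeps calling put() and shutdown() as before. Below is a minimal, self-contained sketch of that null-object arrangement; FakeApp and its plain dict config are illustrative stand-ins for the real application and Configuration objects, and only the option name enable_job_running comes from the diff above.

    # Sketch only: mirrors the JobManager / NoopQueue split above with
    # stand-in classes so it can run outside of Galaxy.

    class NoopQueue( object ):
        """Satisfies the queue interface ( put / shutdown ) but does nothing."""
        def put( self, *args ):
            return
        def shutdown( self ):
            return

    class JobQueue( object ):
        """Stand-in for the real queue that hands jobs to a dispatcher."""
        def __init__( self ):
            self.jobs = []
        def put( self, job ):
            self.jobs.append( job )
        def shutdown( self ):
            print( "job queue stopped with %d job(s) queued" % len( self.jobs ) )

    class JobManager( object ):
        def __init__( self, app ):
            if app.config.get( "enable_job_running", True ):
                self.job_queue = JobQueue()
                self.job_stop_queue = JobQueue()
            else:
                # Web-only process: accept puts silently, never run anything
                self.job_queue = self.job_stop_queue = NoopQueue()
        def shutdown( self ):
            self.job_queue.shutdown()
            self.job_stop_queue.shutdown()

    class FakeApp( object ):
        def __init__( self, **config ):
            self.config = config

    manager = JobManager( FakeApp( enable_job_running=False ) )
    manager.job_queue.put( "a job" )    # silently discarded
    manager.shutdown()                  # both shutdowns are no-ops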
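The nginx_x_accel_redirect_base change moves large downloads out of the Python process: when the option is set, send_file answers with an empty body plus an X-Accel-Redirect header, and nginx, which must map that base path onto the dataset directory (typically with an internal location), streams the file itself; otherwise Galaxy falls back to chunked iteration. A standalone sketch of that decision follows, using a stub Response object in place of trans.response and a made-up base path and file name.

    import os.path

    class Response( object ):
        """Stub standing in for trans.response from the framework code above."""
        def __init__( self ):
            self.headers = {}

    def iterate_file( fh, chunk_size=2**16 ):
        # Chunked fallback, equivalent in spirit to iterate_file in base.py
        while True:
            chunk = fh.read( chunk_size )
            if not chunk:
                break
            yield chunk

    def send_file( response, path, nginx_x_accel_redirect_base=None ):
        if nginx_x_accel_redirect_base:
            # Hand the transfer off to nginx; the body stays empty
            response.headers['X-Accel-Redirect'] = \
                nginx_x_accel_redirect_base + os.path.abspath( path )
            return [ "" ]
        # No proxy support configured: stream the file from Python
        return iterate_file( open( path, 'rb' ) )

    response = Response()
    body = send_file( response, "dataset_001.dat", "/_x_accel_redirect" )
    print( response.headers['X-Accel-Redirect'] )   # /_x_accel_redirect/.../dataset_001.dat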
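The upload changes assume the nginx upload module has already written the file to disk before Galaxy sees the request: the proxy submits the parameter as several related form fields, the new get_incoming_value helper reassembles them into a dict, and FileToolParameter.from_html refuses the resulting path unless it lives under the configured nginx_upload_store. The helper below is copied from the tools/__init__.py hunk above; the field names and values fed to it are hypothetical, chosen only to match the shape the code expects.

    import os.path

    def get_incoming_value( incoming, key, default ):
        # Copied from the lib/galaxy/tools/__init__.py hunk above
        if "__" + key + "__is_composite" in incoming:
            composite_keys = incoming["__" + key + "__keys"].split()
            value = dict()
            for composite_key in composite_keys:
                value[composite_key] = incoming[key + "_" + composite_key]
            return value
        else:
            return incoming.get( key, default )

    # Hypothetical form fields of the shape the code expects; real names and
    # values depend on how the nginx upload module is configured.
    incoming = {
        "__file_data__is_composite": "true",
        "__file_data__keys": "name path",
        "file_data_name": "reads.fastq",
        "file_data_path": "/tmp/nginx_upload/0000000001",
    }
    value = get_incoming_value( incoming, "file_data", None )
    print( value )   # {'name': 'reads.fastq', 'path': '/tmp/nginx_upload/0000000001'}

    # The same containment check from_html performs before trusting the path
    upload_location = os.path.abspath( "/tmp/nginx_upload" )
    local_filename = os.path.abspath( value["path"] )
    assert local_filename.startswith( upload_location ), \
        "Filename provided by nginx is not in correct directory"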
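Dataset deletion also changed shape: the bookkeeping that used to live in JobStopQueue now sits on the Job model (check_if_output_datasets_deleted, mark_deleted), and the controller's new __delete_dataset only cancels the creating job while it is still new/queued/running and every one of its output datasets has been deleted. A toy illustration of that rule, with simplified classes that are not the real model classes:

    # Toy classes illustrating the job-cancellation rule from the diff; the
    # real Job/Dataset classes live in lib/galaxy/model/__init__.py.
    class Dataset( object ):
        def __init__( self ):
            self.deleted = False
            self.state = 'ok'

    class Job( object ):
        CANCELLABLE_STATES = ( 'new', 'queued', 'running' )
        def __init__( self, output_datasets ):
            self.state = 'running'
            self.output_datasets = output_datasets
        def check_if_output_datasets_deleted( self ):
            # True only if every output dataset has been deleted by the user
            return all( dataset.deleted for dataset in self.output_datasets )
        def mark_deleted( self ):
            self.state = 'deleted'
            for dataset in self.output_datasets:
                dataset.deleted = True
                dataset.state = 'discarded'

    def delete_dataset( dataset, job ):
        dataset.deleted = True
        if job.state in Job.CANCELLABLE_STATES and job.check_if_output_datasets_deleted():
            job.mark_deleted()

    outputs = [ Dataset(), Dataset() ]
    job = Job( outputs )
    delete_dataset( outputs[0], job )
    print( job.state )   # still 'running': one output remains
    delete_dataset( outputs[1], job )
    print( job.state )   # 'deleted': all outputs gone, so the job is cancelled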