[hg] galaxy 3077: Cleaned up code a bit (i.e., deleted temporary...
details: http://www.bx.psu.edu/hg/galaxy/rev/be631ed97541 changeset: 3077:be631ed97541 user: Enis Afgan <afgane@gmail.com> date: Thu Oct 29 11:46:43 2009 -0400 description: Cleaned up code a bit (i.e., deleted temporary code/comments). diffstat: lib/galaxy/cloud/__init__.py | 701 +------------------------------------ lib/galaxy/cloud/providers/ec2.py | 93 +---- lib/galaxy/cloud/providers/eucalyptus.py | 100 +---- lib/galaxy/web/controllers/cloud.py | 235 ------------ 4 files changed, 23 insertions(+), 1106 deletions(-) diffs (1457 lines): diff -r 022ac0f64679 -r be631ed97541 lib/galaxy/cloud/__init__.py --- a/lib/galaxy/cloud/__init__.py Wed Oct 28 16:35:36 2009 -0400 +++ b/lib/galaxy/cloud/__init__.py Thu Oct 29 11:46:43 2009 -0400 @@ -47,40 +47,11 @@ self.provider = CloudProvider( app ) # Monitor for updating status of cloud instances self.cloud_monitor = CloudMonitor( self.app, self.provider ) -# self.job_stop_queue = JobStopQueue( app, self.dispatcher ) else: self.job_queue = self.job_stop_queue = NoopCloudMonitor() def shutdown( self ): self.cloud_monitor.shutdown() -# self.job_stop_queue.shutdown() - -# def createUCI( self, user, name, storage_size, zone=None): -# """ -# Createse User Configured Instance (UCI). Essentially, creates storage volume. -# """ -# self.provider.createUCI( user, name, storage_size, zone ) -# -# def deleteUCI( self, name ): -# """ -# Deletes UCI. NOTE that this implies deletion of any and all data associated -# with this UCI from the cloud. All data will be deleted. -# """ -# -# def addStorageToUCI( self, name ): -# """ Adds more storage to specified UCI """ -# -# def startUCI( self, name, type ): -# """ -# Starts an instance of named UCI on the cloud. This implies, mounting of -# storage and starting Galaxy instance. -# """ -# -# def stopUCI( self, name ): -# """ -# Stops cloud instance associated with named UCI. This also implies -# stopping of Galaxy and unmounting of the file system. -# """ class Sleeper( object ): """ @@ -110,9 +81,7 @@ # Keep track of the pid that started the cloud manager, only it # has valid threads self.parent_pid = os.getpid() - # Contains new jobs. Note this is not used if track_jobs_in_database is True -# self.queue = Queue() - + # Contains requests that are waiting (only use from monitor thread) self.waiting = [] @@ -123,33 +92,6 @@ self.monitor_thread = threading.Thread( target=self.__monitor ) self.monitor_thread.start() log.info( "Cloud manager started" ) -# if app.config.get_bool( 'enable_job_recovery', True ): -# self.__check_jobs_at_startup() - - def __check_jobs_at_startup( self ): - """ - Checks all jobs that are in the 'new', 'queued' or 'running' state in - the database and requeues or cleans up as necessary. Only run as the - job manager starts. - """ - model = self.app.model - for job in model.Job.filter( model.Job.c.state==model.Job.states.NEW ).all(): - if job.tool_id not in self.app.toolbox.tools_by_id: - log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) ) - JobWrapper( job, None, self ).fail( 'This tool was disabled before the job completed. 
Please contact your Galaxy administrator, or' ) - else: - log.debug( "no runner: %s is still in new state, adding to the jobs queue" %job.id ) - self.queue.put( ( job.id, job.tool_id ) ) - for job in model.Job.filter( (model.Job.c.state == model.Job.states.RUNNING) | (model.Job.c.state == model.Job.states.QUEUED) ).all(): - if job.tool_id not in self.app.toolbox.tools_by_id: - log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) ) - JobWrapper( job, None, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator, or' ) - elif job.job_runner_name is None: - log.debug( "no runner: %s is still in queued state, adding to the jobs queue" %job.id ) - self.queue.put( ( job.id, job.tool_id ) ) - else: - job_wrapper = JobWrapper( job, self.app.toolbox.tools_by_id[ job.tool_id ], self ) - self.dispatcher.recover( job, job_wrapper ) def __monitor( self ): """ @@ -164,7 +106,7 @@ try: # log.debug( "Calling monitor_step" ) self.__monitor_step() - if cnt%30 == 0: # Run global update every 30 seconds (1 minute) + if cnt%30 == 0: # Run global update every 30 iterations (1 minute) self.provider.update() cnt = 0 except: @@ -186,166 +128,24 @@ """ # Get an orm (object relational mapping) session session = mapping.Session() - # Pull all new jobs from the queue at once new_requests = [] -# new_instances = [] -# new_UCIs = [] -# stop_UCIs = [] -# delete_UCIs = [] - -# for r in session.query( model.cloud_instance ).filter( model.cloud_instance.s.state == model.cloud_instance.states.NEW ).all(): -# new_instances - + for r in session.query( model.UCI ) \ - .filter( or_( model.UCI.c.state==uci_states.NEW_UCI, #"newUCI", - model.UCI.c.state==uci_states.SUBMITTED_UCI, #"submittedUCI", - model.UCI.c.state==uci_states.SHUTTING_DOWN_UCI, #"shutting-downUCI", + .filter( or_( model.UCI.c.state==uci_states.NEW_UCI, + model.UCI.c.state==uci_states.SUBMITTED_UCI, + model.UCI.c.state==uci_states.SHUTTING_DOWN_UCI, model.UCI.c.state==uci_states.DELETING_UCI ) ) \ .all(): uci_wrapper = UCIwrapper( r ) new_requests.append( uci_wrapper ) -# log.debug( 'new_requests: %s' % new_requests ) + for uci_wrapper in new_requests: session.clear() -# log.debug( 'r.name: %s, state: %s' % ( r.name, r.state ) ) -# session.save_or_update( r ) -# session.flush() self.provider.put( uci_wrapper ) # Done with the session mapping.Session.remove() - -# for r in session.query( model.UCI ).filter( model.UCI.c.state == "submitted" ).all(): -# new_instances.append( r ) -# for r in new_instances: -# self.provider.startUCI( r ) -# -# for r in session.query( model.UCI ).filter( model.UCI.c.state == "shutting-down" ).all(): -# stop_UCIs.append( r ) -# for r in stop_UCIs: -# self.provider.stopUCI( r ) -# -# for r in session.query( model.UCI ).filter( model.UCI.c.state == "deleting" ).all(): -# delete_UCIs.append( r ) -# for r in delete_UCIs: -# self.provider.deleteUCI( r ) - - - -# if self.track_jobs_in_database: -# for j in session.query( model.Job ).options( lazyload( "external_output_metadata" ), lazyload( "parameters" ) ).filter( model.Job.c.state == model.Job.states.NEW ).all(): -# job = JobWrapper( j, self.app.toolbox.tools_by_id[ j.tool_id ], self ) -# new_jobs.append( job ) -# else: -# try: -# while 1: -# message = self.queue.get_nowait() -# if message is self.STOP_SIGNAL: -# return -# # Unpack the message -# job_id, tool_id = message -# # Create a job wrapper from it -# job_entity = session.query( model.Job ).get( job_id ) -# job = JobWrapper( job_entity, 
self.app.toolbox.tools_by_id[ tool_id ], self ) -# # Append to watch queue -# new_jobs.append( job ) -# except Empty: -# pass -# # Iterate over new and waiting jobs and look for any that are -# # ready to run -# new_waiting = [] -# for job in ( new_jobs + self.waiting ): -# try: -# # Clear the session for each job so we get fresh states for -# # job and all datasets -# session.clear() -# # Get the real job entity corresponding to the wrapper (if we -# # are tracking in the database this is probably cached in -# # the session from the origianl query above) -# job_entity = session.query( model.Job ).get( job.job_id ) -# # Check the job's dependencies, requeue if they're not done -# job_state = self.__check_if_ready_to_run( job, job_entity ) -# if job_state == JOB_WAIT: -# if not self.track_jobs_in_database: -# new_waiting.append( job ) -# elif job_state == JOB_ERROR: -# log.info( "job %d ended with an error" % job.job_id ) -# elif job_state == JOB_INPUT_ERROR: -# log.info( "job %d unable to run: one or more inputs in error state" % job.job_id ) -# elif job_state == JOB_INPUT_DELETED: -# log.info( "job %d unable to run: one or more inputs deleted" % job.job_id ) -# elif job_state == JOB_READY: -# # If special queuing is enabled, put the ready jobs in the special queue -# if self.use_policy : -# self.squeue.put( job ) -# log.debug( "job %d put in policy queue" % job.job_id ) -# else: # or dispatch the job directly -# self.dispatcher.put( job ) -# log.debug( "job %d dispatched" % job.job_id) -# elif job_state == JOB_DELETED: -# msg = "job %d deleted by user while still queued" % job.job_id -# job.info = msg -# log.debug( msg ) -# elif job_state == JOB_ADMIN_DELETED: -# job.fail( job_entity.info ) -# log.info( "job %d deleted by admin while still queued" % job.job_id ) -# else: -# msg = "unknown job state '%s' for job %d" % ( job_state, job.job_id ) -# job.info = msg -# log.error( msg ) -# except Exception, e: -# job.info = "failure running job %d: %s" % ( job.job_id, str( e ) ) -# log.exception( "failure running job %d" % job.job_id ) -# # Update the waiting list -# self.waiting = new_waiting -# # If special (e.g. fair) scheduling is enabled, dispatch all jobs -# # currently in the special queue -# if self.use_policy : -# while 1: -# try: -# sjob = self.squeue.get() -# self.dispatcher.put( sjob ) -# log.debug( "job %d dispatched" % sjob.job_id ) -# except Empty: -# # squeue is empty, so stop dispatching -# break -# except Exception, e: # if something else breaks while dispatching -# job.fail( "failure running job %d: %s" % ( sjob.job_id, str( e ) ) ) -# log.exception( "failure running job %d" % sjob.job_id ) -# # Done with the session -# mapping.Session.remove() - - def __check_if_ready_to_run( self, job_wrapper, job ): - """ - Check if a job is ready to run by verifying that each of its input - datasets is ready (specifically in the OK state). If any input dataset - has an error, fail the job and return JOB_INPUT_ERROR. If any input - dataset is deleted, fail the job and return JOB_INPUT_DELETED. If all - input datasets are in OK state, return JOB_READY indicating that the - job can be dispatched. Otherwise, return JOB_WAIT indicating that input - datasets are still being prepared. 
- """ - if job.state == model.Job.states.DELETED: - return JOB_DELETED - elif job.state == model.Job.states.ERROR: - return JOB_ADMIN_DELETED - for dataset_assoc in job.input_datasets: - idata = dataset_assoc.dataset - if not idata: - continue - # don't run jobs for which the input dataset was deleted - if idata.deleted: - job_wrapper.fail( "input data %d (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) ) - return JOB_INPUT_DELETED - # an error in the input data causes us to bail immediately - elif idata.state == idata.states.ERROR: - job_wrapper.fail( "input data %d is in error state" % ( idata.hid ) ) - return JOB_INPUT_ERROR - elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ): - # need to requeue - return JOB_WAIT - return JOB_READY - + def put( self, job_id, tool ): """Add a job to the queue (by job identifier)""" if not self.track_jobs_in_database: @@ -355,13 +155,11 @@ def shutdown( self ): """Attempts to gracefully shut down the worker thread""" if self.parent_pid != os.getpid(): - # We're not the real job queue, do nothing + # We're not the real queue, do nothing return else: log.info( "sending stop signal to worker thread" ) self.running = False -# if not self.track_jobs_in_database: -# self.queue.put( self.STOP_SIGNAL ) self.sleeper.wake() log.info( "cloud manager stopped" ) self.dispatcher.shutdown() @@ -649,11 +447,6 @@ def get_all_stores( self ): """ Returns all storage volumes' database objects associated with this UCI. """ return model.CloudStore.filter( model.CloudStore.c.uci_id == self.uci_id ).all() -# svs = model.CloudStore.filter( model.CloudStore.c.uci_id == self.uci_id ).all() -# svl = [] # storage volume list -# for sv in svs: -# svl.append( sv.volume_id ) -# return svl def get_uci( self ): """ Returns database object for given UCI. """ @@ -679,336 +472,6 @@ uci.state = 'deleted' # for bookkeeping reasons, mark as deleted but don't actually delete. uci.flush() -#class JobWrapper( object ): -# """ -# Wraps a 'model.Job' with convience methods for running processes and -# state management. -# """ -# def __init__(self, job, tool, queue ): -# self.job_id = job.id -# # This is immutable, we cache it for the scheduling policy to use if needed -# self.session_id = job.session_id -# self.tool = tool -# self.queue = queue -# self.app = queue.app -# self.extra_filenames = [] -# self.command_line = None -# self.galaxy_lib_dir = None -# # With job outputs in the working directory, we need the working -# # directory to be set before prepare is run, or else premature deletion -# # and job recovery fail. -# self.working_directory = \ -# os.path.join( self.app.config.job_working_directory, str( self.job_id ) ) -# self.output_paths = None -# self.external_output_metadata = metadata.JobExternalOutputMetadataWrapper( job ) #wrapper holding the info required to restore and clean up from files used for setting metadata externally -# -# def get_param_dict( self ): -# """ -# Restore the dictionary of parameters from the database. -# """ -# job = model.Job.get( self.job_id ) -# param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] ) -# param_dict = self.tool.params_from_strings( param_dict, self.app ) -# return param_dict -# -# def prepare( self ): -# """ -# Prepare the job to run by creating the working directory and the -# config files. 
-# """ -# mapping.context.current.clear() #this prevents the metadata reverting that has been seen in conjunction with the PBS job runner -# if not os.path.exists( self.working_directory ): -# os.mkdir( self.working_directory ) -# # Restore parameters from the database -# job = model.Job.get( self.job_id ) -# incoming = dict( [ ( p.name, p.value ) for p in job.parameters ] ) -# incoming = self.tool.params_from_strings( incoming, self.app ) -# # Do any validation that could not be done at job creation -# self.tool.handle_unvalidated_param_values( incoming, self.app ) -# # Restore input / output data lists -# inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] ) -# out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] ) -# # These can be passed on the command line if wanted as $userId $userEmail -# if job.history.user: # check for anonymous user! -# userId = '%d' % job.history.user.id -# userEmail = str(job.history.user.email) -# else: -# userId = 'Anonymous' -# userEmail = 'Anonymous' -# incoming['userId'] = userId -# incoming['userEmail'] = userEmail -# # Build params, done before hook so hook can use -# param_dict = self.tool.build_param_dict( incoming, inp_data, out_data, self.get_output_fnames(), self.working_directory ) -# # Certain tools require tasks to be completed prior to job execution -# # ( this used to be performed in the "exec_before_job" hook, but hooks are deprecated ). -# if self.tool.tool_type is not None: -# out_data = self.tool.exec_before_job( self.queue.app, inp_data, out_data, param_dict ) -# # Run the before queue ("exec_before_job") hook -# self.tool.call_hook( 'exec_before_job', self.queue.app, inp_data=inp_data, -# out_data=out_data, tool=self.tool, param_dict=incoming) -# mapping.context.current.flush() -# # Build any required config files -# config_filenames = self.tool.build_config_files( param_dict, self.working_directory ) -# # FIXME: Build the param file (might return None, DEPRECATED) -# param_filename = self.tool.build_param_file( param_dict, self.working_directory ) -# # Build the job's command line -# self.command_line = self.tool.build_command_line( param_dict ) -# # FIXME: for now, tools get Galaxy's lib dir in their path -# if self.command_line and self.command_line.startswith( 'python' ): -# self.galaxy_lib_dir = os.path.abspath( "lib" ) # cwd = galaxy root -# # We need command_line persisted to the db in order for Galaxy to re-queue the job -# # if the server was stopped and restarted before the job finished -# job.command_line = self.command_line -# job.flush() -# # Return list of all extra files -# extra_filenames = config_filenames -# if param_filename is not None: -# extra_filenames.append( param_filename ) -# self.param_dict = param_dict -# self.extra_filenames = extra_filenames -# return extra_filenames -# -# def fail( self, message, exception=False ): -# """ -# Indicate job failure by setting state and message on all output -# datasets. 
-# """ -# job = model.Job.get( self.job_id ) -# job.refresh() -# # if the job was deleted, don't fail it -# if not job.state == model.Job.states.DELETED: -# # Check if the failure is due to an exception -# if exception: -# # Save the traceback immediately in case we generate another -# # below -# job.traceback = traceback.format_exc() -# # Get the exception and let the tool attempt to generate -# # a better message -# etype, evalue, tb = sys.exc_info() -# m = self.tool.handle_job_failure_exception( evalue ) -# if m: -# message = m -# if self.app.config.outputs_to_working_directory: -# for dataset_path in self.get_output_fnames(): -# try: -# shutil.move( dataset_path.false_path, dataset_path.real_path ) -# log.debug( "fail(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) ) -# except ( IOError, OSError ), e: -# log.error( "fail(): Missing output file in working directory: %s" % e ) -# for dataset_assoc in job.output_datasets: -# dataset = dataset_assoc.dataset -# dataset.refresh() -# dataset.state = dataset.states.ERROR -# dataset.blurb = 'tool error' -# dataset.info = message -# dataset.set_size() -# dataset.flush() -# job.state = model.Job.states.ERROR -# job.command_line = self.command_line -# job.info = message -# job.flush() -# # If the job was deleted, just clean up -# self.cleanup() -# -# def change_state( self, state, info = False ): -# job = model.Job.get( self.job_id ) -# job.refresh() -# for dataset_assoc in job.output_datasets: -# dataset = dataset_assoc.dataset -# dataset.refresh() -# dataset.state = state -# if info: -# dataset.info = info -# dataset.flush() -# if info: -# job.info = info -# job.state = state -# job.flush() -# -# def get_state( self ): -# job = model.Job.get( self.job_id ) -# job.refresh() -# return job.state -# -# def set_runner( self, runner_url, external_id ): -# job = model.Job.get( self.job_id ) -# job.refresh() -# job.job_runner_name = runner_url -# job.job_runner_external_id = external_id -# job.flush() -# -# def finish( self, stdout, stderr ): -# """ -# Called to indicate that the associated command has been run. Updates -# the output datasets based on stderr and stdout from the command, and -# the contents of the output files. -# """ -# # default post job setup -# mapping.context.current.clear() -# job = model.Job.get( self.job_id ) -# # if the job was deleted, don't finish it -# if job.state == job.states.DELETED: -# self.cleanup() -# return -# elif job.state == job.states.ERROR: -# # Job was deleted by an administrator -# self.fail( job.info ) -# return -# if stderr: -# job.state = "error" -# else: -# job.state = 'ok' -# if self.app.config.outputs_to_working_directory: -# for dataset_path in self.get_output_fnames(): -# try: -# shutil.move( dataset_path.false_path, dataset_path.real_path ) -# log.debug( "finish(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) ) -# except ( IOError, OSError ): -# self.fail( "Job %s's output dataset(s) could not be read" % job.id ) -# return -# for dataset_assoc in job.output_datasets: -# #should this also be checking library associations? - can a library item be added from a history before the job has ended? - lets not allow this to occur -# for dataset in dataset_assoc.dataset.dataset.history_associations: #need to update all associated output hdas, i.e. 
history was shared with job running -# dataset.blurb = 'done' -# dataset.peek = 'no peek' -# dataset.info = stdout + stderr -# dataset.set_size() -# if stderr: -# dataset.blurb = "error" -# elif dataset.has_data(): -# #if a dataset was copied, it won't appear in our dictionary: -# #either use the metadata from originating output dataset, or call set_meta on the copies -# #it would be quicker to just copy the metadata from the originating output dataset, -# #but somewhat trickier (need to recurse up the copied_from tree), for now we'll call set_meta() -# if not self.external_output_metadata.external_metadata_set_successfully( dataset ): -# # Only set metadata values if they are missing... -# dataset.set_meta( overwrite = False ) -# else: -# #load metadata from file -# #we need to no longer allow metadata to be edited while the job is still running, -# #since if it is edited, the metadata changed on the running output will no longer match -# #the metadata that was stored to disk for use via the external process, -# #and the changes made by the user will be lost, without warning or notice -# dataset.metadata.from_JSON_dict( self.external_output_metadata.get_output_filenames_by_dataset( dataset ).filename_out ) -# if self.tool.is_multi_byte: -# dataset.set_multi_byte_peek() -# else: -# dataset.set_peek() -# else: -# dataset.blurb = "empty" -# dataset.flush() -# if stderr: -# dataset_assoc.dataset.dataset.state = model.Dataset.states.ERROR -# else: -# dataset_assoc.dataset.dataset.state = model.Dataset.states.OK -# dataset_assoc.dataset.dataset.flush() -# -# # Save stdout and stderr -# if len( stdout ) > 32768: -# log.error( "stdout for job %d is greater than 32K, only first part will be logged to database" % job.id ) -# job.stdout = stdout[:32768] -# if len( stderr ) > 32768: -# log.error( "stderr for job %d is greater than 32K, only first part will be logged to database" % job.id ) -# job.stderr = stderr[:32768] -# # custom post process setup -# inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] ) -# out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] ) -# param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] ) # why not re-use self.param_dict here? ##dunno...probably should, this causes tools.parameters.basic.UnvalidatedValue to be used in following methods instead of validated and transformed values during i.e. running workflows -# param_dict = self.tool.params_from_strings( param_dict, self.app ) -# # Check for and move associated_files -# self.tool.collect_associated_files(out_data, self.working_directory) -# # Create generated output children and primary datasets and add to param_dict -# collected_datasets = {'children':self.tool.collect_child_datasets(out_data),'primary':self.tool.collect_primary_datasets(out_data)} -# param_dict.update({'__collected_datasets__':collected_datasets}) -# # Certain tools require tasks to be completed after job execution -# # ( this used to be performed in the "exec_after_process" hook, but hooks are deprecated ). 
-# if self.tool.tool_type is not None: -# self.tool.exec_after_process( self.queue.app, inp_data, out_data, param_dict, job = job ) -# # Call 'exec_after_process' hook -# self.tool.call_hook( 'exec_after_process', self.queue.app, inp_data=inp_data, -# out_data=out_data, param_dict=param_dict, -# tool=self.tool, stdout=stdout, stderr=stderr ) -# # TODO -# # validate output datasets -# job.command_line = self.command_line -# mapping.context.current.flush() -# log.debug( 'job %d ended' % self.job_id ) -# self.cleanup() -# -# def cleanup( self ): -# # remove temporary files -# try: -# for fname in self.extra_filenames: -# os.remove( fname ) -# if self.working_directory is not None: -# shutil.rmtree( self.working_directory ) -# if self.app.config.set_metadata_externally: -# self.external_output_metadata.cleanup_external_metadata() -# except: -# log.exception( "Unable to cleanup job %d" % self.job_id ) -# -# def get_command_line( self ): -# return self.command_line -# -# def get_session_id( self ): -# return self.session_id -# -# def get_input_fnames( self ): -# job = model.Job.get( self.job_id ) -# filenames = [] -# for da in job.input_datasets: #da is JobToInputDatasetAssociation object -# if da.dataset: -# filenames.append( da.dataset.file_name ) -# #we will need to stage in metadata file names also -# #TODO: would be better to only stage in metadata files that are actually needed (found in command line, referenced in config files, etc.) -# for key, value in da.dataset.metadata.items(): -# if isinstance( value, model.MetadataFile ): -# filenames.append( value.file_name ) -# return filenames -# -# def get_output_fnames( self ): -# if self.output_paths is not None: -# return self.output_paths -# -# class DatasetPath( object ): -# def __init__( self, real_path, false_path = None ): -# self.real_path = real_path -# self.false_path = false_path -# def __str__( self ): -# if self.false_path is None: -# return self.real_path -# else: -# return self.false_path -# -# job = model.Job.get( self.job_id ) -# if self.app.config.outputs_to_working_directory: -# self.output_paths = [] -# for name, data in [ ( da.name, da.dataset.dataset ) for da in job.output_datasets ]: -# false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % data.id ) ) -# self.output_paths.append( DatasetPath( data.file_name, false_path ) ) -# else: -# self.output_paths = [ DatasetPath( da.dataset.file_name ) for da in job.output_datasets ] -# return self.output_paths -# -# def check_output_sizes( self ): -# sizes = [] -# output_paths = self.get_output_fnames() -# for outfile in [ str( o ) for o in output_paths ]: -# sizes.append( ( outfile, os.stat( outfile ).st_size ) ) -# return sizes -# def setup_external_metadata( self, exec_dir = None, tmp_dir = None, dataset_files_path = None, config_root = None, datatypes_config = None, **kwds ): -# if tmp_dir is None: -# #this dir should should relative to the exec_dir -# tmp_dir = self.app.config.new_file_path -# if dataset_files_path is None: -# dataset_files_path = self.app.model.Dataset.file_path -# if config_root is None: -# config_root = self.app.config.root -# if datatypes_config is None: -# datatypes_config = self.app.config.datatypes_config -# job = model.Job.get( self.job_id ) -# return self.external_output_metadata.setup_external_metadata( [ output_dataset_assoc.dataset for output_dataset_assoc in job.output_datasets ], exec_dir = exec_dir, tmp_dir = tmp_dir, dataset_files_path = dataset_files_path, config_root = config_root, datatypes_config = 
datatypes_config, **kwds ) - class CloudProvider( object ): def __init__( self, app ): import providers.eucalyptus @@ -1018,63 +481,11 @@ self.cloud_provider = {} self.cloud_provider["eucalyptus"] = providers.eucalyptus.EucalyptusCloudProvider( app ) self.cloud_provider["ec2"] = providers.ec2.EC2CloudProvider( app ) - -# start_cloud_provider = None -# if app.config.start_job_runners is not None: -# start_cloud_provider.extend( app.config.start_job_runners.split(",") ) -# for provider_name in start_cloud_provider: -# self.provider_name = app.config.cloud_provider -# if self.provider_name == "eucalyptus": -# import providers.eucalyptus -# self.cloud_provider[self.provider_name] = providers.eucalyptus.EucalyptusCloudProvider( app ) -# elif self.provider_name == "ec2": -# import providers.ec2 -# self.cloud_provider[self.provider_name] = providers.ec2.EC2CloudProvider( app ) -# else: -# log.error( "Unable to start unknown cloud provider: %s" %self.provider_name ) - + def put( self, uci_wrapper ): """ Put given request for UCI manipulation into provider's request queue.""" -# log.debug( "Adding UCI '%s' manipulation request into cloud manager's queue." % uci_wrapper.name ) self.cloud_provider[uci_wrapper.get_provider_type()].put( uci_wrapper ) - - - - def createUCI( self, uci ): - """ - Createse User Configured Instance (UCI). Essentially, creates storage volume. - """ - log.debug( "Creating UCI '%s'" % uci.name ) - self.cloud_provider[self.provider_name].createUCI( uci ) - - def deleteUCI( self, uci ): - """ - Deletes UCI. NOTE that this implies deletion of any and all data associated - with this UCI from the cloud. All data will be deleted. - """ - log.debug( "Deleting UCI '%s'" % uci.name ) - self.cloud_provider[self.provider_name].deleteUCI( uci ) - - def addStorageToUCI( self, uci ): - """ Adds more storage to specified UCI """ - - def startUCI( self, uci ): - """ - Starts an instance of named UCI on the cloud. This implies, mounting of - storage and starting Galaxy instance. - """ - log.debug( "Starting UCI '%s'" % uci.name ) - self.cloud_provider[self.provider_name].startUCI( uci ) - - def stopUCI( self, uci ): - """ - Stops cloud instance associated with named UCI. This also implies - stopping of Galaxy and unmounting of the file system. - """ - log.debug( "Stopping UCI '%s'" % uci.name ) - self.cloud_provider[self.provider_name].stopUCI( uci ) - def update( self ): """ Runs a global status update across all providers for all UCIs in state other than 'terminated' and 'available'. @@ -1084,102 +495,10 @@ # log.debug( "Running global update for provider: '%s'" % provider ) self.cloud_provider[provider].update() - def recover( self, job, job_wrapper ): - runner_name = ( job.job_runner_name.split(":", 1) )[0] - log.debug( "recovering job %d in %s runner" %( job.id, runner_name ) ) - self.cloud_provider[runner_name].recover( job, job_wrapper ) - def shutdown( self ): for runner in self.cloud_provider.itervalues(): runner.shutdown() -class JobStopQueue( object ): - """ - A queue for jobs which need to be terminated prematurely. - """ - STOP_SIGNAL = object() - def __init__( self, app, dispatcher ): - self.app = app - self.dispatcher = dispatcher - - # Keep track of the pid that started the job manager, only it - # has valid threads - self.parent_pid = os.getpid() - # Contains new jobs. 
Note this is not used if track_jobs_in_database is True - self.queue = Queue() - - # Contains jobs that are waiting (only use from monitor thread) - self.waiting = [] - - # Helper for interruptable sleep - self.sleeper = Sleeper() - self.running = True - self.monitor_thread = threading.Thread( target=self.monitor ) - self.monitor_thread.start() - log.info( "job stopper started" ) - - def monitor( self ): - """ - Continually iterate the waiting jobs, stop any that are found. - """ - # HACK: Delay until after forking, we need a way to do post fork notification!!! - time.sleep( 10 ) - while self.running: - try: - self.monitor_step() - except: - log.exception( "Exception in monitor_step" ) - # Sleep - self.sleeper.sleep( 1 ) - - def monitor_step( self ): - """ - Called repeatedly by `monitor` to stop jobs. - """ - # Pull all new jobs from the queue at once - jobs = [] - try: - while 1: - ( job_id, error_msg ) = self.queue.get_nowait() - if job_id is self.STOP_SIGNAL: - return - # Append to watch queue - jobs.append( ( job_id, error_msg ) ) - except Empty: - pass - - for job_id, error_msg in jobs: - job = model.Job.get( job_id ) - job.refresh() - # if desired, error the job so we can inform the user. - if error_msg is not None: - job.state = job.states.ERROR - job.info = error_msg - else: - job.state = job.states.DELETED - job.flush() - # if job is in JobQueue or FooJobRunner's put method, - # job_runner_name will be unset and the job will be dequeued due to - # state change above - if job.job_runner_name is not None: - # tell the dispatcher to stop the job - self.dispatcher.stop( job ) - - def put( self, job_id, error_msg=None ): - self.queue.put( ( job_id, error_msg ) ) - - def shutdown( self ): - """Attempts to gracefully shut down the worker thread""" - if self.parent_pid != os.getpid(): - # We're not the real job queue, do nothing - return - else: - log.info( "sending stop signal to worker thread" ) - self.running = False - self.queue.put( ( self.STOP_SIGNAL, None ) ) - self.sleeper.wake() - log.info( "job stopper stopped" ) - class NoopCloudMonitor( object ): """ Implements the CloudMonitor interface but does nothing diff -r 022ac0f64679 -r be631ed97541 lib/galaxy/cloud/providers/ec2.py --- a/lib/galaxy/cloud/providers/ec2.py Wed Oct 28 16:35:36 2009 -0400 +++ b/lib/galaxy/cloud/providers/ec2.py Thu Oct 29 11:46:43 2009 -0400 @@ -56,7 +56,6 @@ self.key_pair = "galaxy-keypair" self.queue = Queue() - #TODO: Use multiple threads to process requests? self.threads = [] nworkers = 5 log.info( "Starting EC2 cloud controller workers" ) @@ -72,19 +71,17 @@ while 1: uci_wrapper = self.queue.get() -# uci = uci_wrapper.get_uci() - log.debug( '[%d] uci type: %s' % ( cnt, uci_wrapper.get_name() ) ) uci_state = uci_wrapper.get_state() if uci_state is self.STOP_SIGNAL: return try: - if uci_state==uci_states.NEW: # "new": + if uci_state==uci_states.NEW: self.createUCI( uci_wrapper ) - elif uci_state==uci_states.DELETING: #"deleting": + elif uci_state==uci_states.DELETING: self.deleteUCI( uci_wrapper ) - elif uci_state==uci_states.SUBMITTED: #"submitted": + elif uci_state==uci_states.SUBMITTED: self.startUCI( uci_wrapper ) - elif uci_state==uci_states.SHUTTING_DOWN: #"shutting-down": + elif uci_state==uci_states.SHUTTING_DOWN: self.stopUCI( uci_wrapper ) except: log.exception( "Uncaught exception executing request." 
) @@ -113,7 +110,6 @@ try: kp = conn.get_key_pair( self.key_pair ) for inst in instances: -# log.debug("inst: '%s'" % inst ) uci_wrapper.set_key_pair( inst, kp.name ) return kp.name except boto.exception.EC2ResponseError, e: # No keypair under this name exists so create it @@ -135,22 +131,7 @@ For valid sizes, see http://aws.amazon.com/ec2/instance-types/ """ return model.CloudImage.filter( model.CloudImage.table.c.id==2 ).first().image_id - -# def get_instances( self, uci ): -# """ -# Get objects of instances that are pending or running and are connected to uci object -# """ -# instances = trans.sa_session.query( model.CloudInstance ) \ -# .filter_by( user=user, uci_id=uci.id ) \ -# .filter( or_(model.CloudInstance.table.c.state=="running", model.CloudInstance.table.c.state=="pending" ) ) \ -# .first() -# #.all() #TODO: return all but need to edit calling method(s) to handle list -# -# instances = uci.instance -# -# return instances - - + def shutdown( self ): """Attempts to gracefully shut down the monitor thread""" log.info( "sending stop signal to worker threads in EC2 cloud manager" ) @@ -170,7 +151,6 @@ and registers relevant information in Galaxy database. """ conn = self.get_connection( uci_wrapper ) - # Temporary code - need to ensure user selects zone at UCI creation time! if uci_wrapper.get_uci_availability_zone()=='': log.info( "Availability zone for UCI (i.e., storage volume) was not selected, using default zone: %s" % self.zone ) uci_wrapper.set_store_availability_zone( self.zone ) @@ -262,12 +242,11 @@ for i_index in i_indexes: mi_id = self.get_mi_id( uci_wrapper.get_type( i_index ) ) -# log.debug( "mi_id: %s, uci_wrapper.get_key_pair_name( i_index ): %s" % ( mi_id, uci_wrapper.get_key_pair_name( i_index ) ) ) uci_wrapper.set_mi( i_index, mi_id ) # Check if galaxy security group exists (and create it if it does not) -# log.debug( '***** Setting up security group' ) security_group = 'galaxyWeb' + log.debug( "Setting up '%s' security group." % security_group ) sgs = conn.get_all_security_groups() # security groups gsgt = False # galaxy security group test for sg in sgs: @@ -303,39 +282,6 @@ uci_wrapper.change_state( s, i_id, s ) log.debug( "Instance of UCI '%s' started, current state: '%s'" % ( uci_wrapper.get_name(), uci_wrapper.get_state() ) ) - - -# # Wait until instance gets running and then update the DB -# while s!="running": -# log.debug( "Waiting on instance '%s' to start up (reservation ID: %s); current state: %s" % ( uci.instance[0].instance_id, uci.instance[0].reservation_id, s ) ) -# time.sleep( 15 ) -# s = reservation.i_indexes[0].update() -# -# # Update instance data in local DB -# uci.instance[0].state = s -# uci.instance[0].public_dns = reservation.i_indexes[0].dns_name -# uci.instance[0].private_dns = reservation.i_indexes[0].private_dns_name -# uci.instance[0].flush() -# # Update storage data in local DB w/ volume state info. NOTE that this only captures current volume state -# # and does not connect or wait on connection between instance and volume to be established -# vl = model.CloudStore.filter( model.CloudStore.c.uci_id == uci.id ).all() -# vols = [] -# for v in vl: -# vols.append( v.volume_id ) -# try: -# volumes = conn.get_all_volumes( vols ) -# for i, v in enumerate( volumes ): -# uci.store[i].i_id = v.instance_id -# uci.store[i].status = v.status -# uci.store[i].device = v.device -# uci.store[i].flush() -# except BotoServerError: -# log.debug( "Error getting volume(s) attached to instance. Volume status was not updated." 
) -# -# uci.state = s -# uci.flush() - - def stopUCI( self, uci_wrapper): """ Stops all of cloud instances associated with given UCI. @@ -344,14 +290,8 @@ # Get all instances associated with given UCI il = uci_wrapper.get_instances_ids() # instance list -# log.debug( 'List of instances being terminated: %s' % il ) rl = conn.get_all_instances( il ) # Reservation list associated with given instances - -# tState = conn.terminate_instances( il ) -# # TODO: Need to update instance stop time (for all individual instances) -# stop_time = datetime.utcnow() -# uci_wrapper.set_stop_time( stop_time ) - + # Initiate shutdown of all instances under given UCI cnt = 0 stopped = [] @@ -364,26 +304,7 @@ uci_wrapper.change_state( instance_id=inst.id, i_state=inst.update() ) stopped.append( inst ) -# uci_wrapper.change_state( uci_state='available' ) uci_wrapper.reset_uci_launch_time() - -# # Wait for all instances to actually terminate and update local DB -# terminated=0 -# while terminated!=len( rl ): -# for i, r in enumerate( rl ): -# log.debug( "r state: %s" % r.instances[0].state ) -# state = r.instances[0].update() -# if state=='terminated': -# uci.instance[i].state = state -# uci.instance[i].flush() -# terminated += 1 -# time.sleep ( 5 ) -# -# # Reset UCI state -# uci.state = 'available' -# uci.launch_time = None -# uci.flush() -# log.debug( "Termination was initiated for all instances of UCI '%s'." % uci_wrapper.get_name() ) diff -r 022ac0f64679 -r be631ed97541 lib/galaxy/cloud/providers/eucalyptus.py --- a/lib/galaxy/cloud/providers/eucalyptus.py Wed Oct 28 16:35:36 2009 -0400 +++ b/lib/galaxy/cloud/providers/eucalyptus.py Thu Oct 29 11:46:43 2009 -0400 @@ -56,7 +56,6 @@ self.key_pair = "galaxy-keypair" self.queue = Queue() - #TODO: Use multiple threads to process requests? self.threads = [] nworkers = 5 log.info( "Starting eucalyptus cloud controller workers" ) @@ -72,19 +71,18 @@ while 1: uci_wrapper = self.queue.get() -# uci = uci_wrapper.get_uci() log.debug( '[%d] uci type: %s' % ( cnt, uci_wrapper.get_name() ) ) uci_state = uci_wrapper.get_state() if uci_state is self.STOP_SIGNAL: return try: - if uci_state==uci_states.NEW: # "new": + if uci_state==uci_states.NEW: self.createUCI( uci_wrapper ) - elif uci_state==uci_states.DELETING: #"deleting": + elif uci_state==uci_states.DELETING: self.deleteUCI( uci_wrapper ) - elif uci_state==uci_states.SUBMITTED: #"submitted": + elif uci_state==uci_states.SUBMITTED: self.startUCI( uci_wrapper ) - elif uci_state==uci_states.SHUTTING_DOWN: #"shutting-down": + elif uci_state==uci_states.SHUTTING_DOWN: self.stopUCI( uci_wrapper ) except: log.exception( "Uncaught exception executing request." ) @@ -134,22 +132,7 @@ """ log.debug( "image id: '%s'" % model.CloudImage.get( 1 ).image_id ) return model.CloudImage.get( 1 ).image_id - -# def get_instances( self, uci ): -# """ -# Get objects of instances that are pending or running and are connected to uci object -# """ -# instances = trans.sa_session.query( model.CloudInstance ) \ -# .filter_by( user=user, uci_id=uci.id ) \ -# .filter( or_(model.CloudInstance.table.c.state=="running", model.CloudInstance.table.c.state=="pending" ) ) \ -# .first() -# #.all() #TODO: return all but need to edit calling method(s) to handle list -# -# instances = uci.instance -# -# return instances - - + def shutdown( self ): """Attempts to gracefully shut down the monitor thread""" log.info( "sending stop signal to worker threads in eucalyptus cloud manager" ) @@ -169,7 +152,6 @@ and registers relevant information in Galaxy database. 
""" conn = self.get_connection( uci_wrapper ) - # Temporary code - need to ensure user selects zone at UCI creation time! if uci_wrapper.get_uci_availability_zone()=='': log.info( "Availability zone for UCI (i.e., storage volume) was not selected, using default zone: %s" % self.zone ) uci_wrapper.set_store_availability_zone( self.zone ) @@ -245,21 +227,7 @@ mi_id = self.get_mi_id( uci_wrapper.get_type( i_index ) ) log.debug( "mi_id: %s, uci_wrapper.get_key_pair_name( i_index ): %s" % ( mi_id, uci_wrapper.get_key_pair_name( i_index ) ) ) uci_wrapper.set_mi( i_index, mi_id ) - - # log.debug( '***** Setting up security group' ) - # If not existent, setup galaxy security group - # try: - # gSecurityGroup = conn.create_security_group('galaxy', 'Security group for Galaxy.') - # gSecurityGroup.authorize( 'tcp', 80, 80, '0.0.0.0/0' ) # Open HTTP port - # gSecurityGroup.authorize( 'tcp', 22, 22, '0.0.0.0/0' ) # Open SSH port - # except: - # pass - # sgs = conn.get_all_security_groups() - # for i in range( len( sgs ) ): - # if sgs[i].name == "galaxy": - # sg.append( sgs[i] ) - # break # only 1 security group w/ this type can exist, so continue - + log.debug( "***** Starting UCI instance '%s'" % uci_wrapper.get_name() ) log.debug( 'Using following command: conn.run_instances( image_id=%s, key_name=%s )' % ( mi_id, uci_wrapper.get_key_pair_name( i_index ) ) ) reservation = conn.run_instances( image_id=mi_id, key_name=uci_wrapper.get_key_pair_name( i_index ) ) @@ -276,39 +244,6 @@ uci_wrapper.change_state( s, i_id, s ) log.debug( "Instance of UCI '%s' started, current state: '%s'" % ( uci_wrapper.get_name(), uci_wrapper.get_state() ) ) - - -# # Wait until instance gets running and then update the DB -# while s!="running": -# log.debug( "Waiting on instance '%s' to start up (reservation ID: %s); current state: %s" % ( uci.instance[0].instance_id, uci.instance[0].reservation_id, s ) ) -# time.sleep( 15 ) -# s = reservation.i_indexes[0].update() -# -# # Update instance data in local DB -# uci.instance[0].state = s -# uci.instance[0].public_dns = reservation.i_indexes[0].dns_name -# uci.instance[0].private_dns = reservation.i_indexes[0].private_dns_name -# uci.instance[0].flush() -# # Update storage data in local DB w/ volume state info. NOTE that this only captures current volume state -# # and does not connect or wait on connection between instance and volume to be established -# vl = model.CloudStore.filter( model.CloudStore.c.uci_id == uci.id ).all() -# vols = [] -# for v in vl: -# vols.append( v.volume_id ) -# try: -# volumes = conn.get_all_volumes( vols ) -# for i, v in enumerate( volumes ): -# uci.store[i].i_id = v.instance_id -# uci.store[i].status = v.status -# uci.store[i].device = v.device -# uci.store[i].flush() -# except BotoServerError: -# log.debug( "Error getting volume(s) attached to instance. Volume status was not updated." ) -# -# uci.state = s -# uci.flush() - - def stopUCI( self, uci_wrapper): """ Stops all of cloud instances associated with given UCI. 
@@ -332,30 +267,9 @@ uci_wrapper.change_state( instance_id=inst.id, i_state=inst.update() ) stopped.append( inst ) -# uci_wrapper.change_state( uci_state='available' ) uci_wrapper.reset_uci_launch_time() - -# # Wait for all instances to actually terminate and update local DB -# terminated=0 -# while terminated!=len( rl ): -# for i, r in enumerate( rl ): -# log.debug( "r state: %s" % r.instances[0].state ) -# state = r.instances[0].update() -# if state=='terminated': -# uci.instance[i].state = state -# uci.instance[i].flush() -# terminated += 1 -# time.sleep ( 5 ) -# -# # Reset UCI state -# uci.state = 'available' -# uci.launch_time = None -# uci.flush() -# log.debug( "Termination was initiated for all instances of UCI '%s'." % uci_wrapper.get_name() ) - - # dbInstances = get_instances( trans, uci ) #TODO: handle list! # # # Get actual cloud instance object @@ -377,7 +291,6 @@ # store.i_id = None # store.status = volStat # log.debug ( '***** volume status: %s' % volStat ) -# # # # Stop the instance and update status in local database # cloudInstance.stop() @@ -495,7 +408,6 @@ region=euca_region, path=uci.credentials.provider.path )# Get reservations handle for given store vl = conn.get_all_volumes( [store.volume_id] ) -# log.debug( "Store '%s' vl: '%s'" % ( store.volume_id, vl ) ) # Update store status in local DB with info from cloud provider if store.status != vl[0].status: # In case something failed during creation of UCI but actual storage volume was created and yet diff -r 022ac0f64679 -r be631ed97541 lib/galaxy/web/controllers/cloud.py --- a/lib/galaxy/web/controllers/cloud.py Wed Oct 28 16:35:36 2009 -0400 +++ b/lib/galaxy/web/controllers/cloud.py Thu Oct 29 11:46:43 2009 -0400 @@ -12,7 +12,6 @@ from galaxy.util.odict import odict from galaxy.util.bunch import Bunch from galaxy.util.topsort import topsort, topsort_levels, CycleError -from galaxy.workflow.modules import * from galaxy.model.mapping import desc from galaxy.model.orm import * from datetime import datetime, timedelta @@ -69,12 +68,6 @@ Render cloud main page (management of cloud resources) """ user = trans.get_user() -# pendingInstances = trans.sa_session.query( model.UCI ) \ -# .filter_by( user=user, state="pending" ) \ -# .all() -# -# for i inupdate_in range( len ( pendingInstances ) ): -# stance_state( trans, pendingInstances[i].id ) cloudCredentials = trans.sa_session.query( model.CloudUserCredentials ) \ .filter_by( user=user ) \ @@ -104,7 +97,6 @@ .all() # Check after update there are instances in pending state; if so, display message - # TODO: Auto-refresh once instance is running pendingInstances = trans.sa_session.query( model.UCI ) \ .filter_by( user=user ) \ .filter( or_( model.UCI.c.state==uci_states.PENDING, #"pending" , \ @@ -128,7 +120,6 @@ liveInstances = liveInstances, prevInstances = prevInstances ) - @web.expose @web.require_login( "use Galaxy cloud" ) def makeDefault( self, trans, id=None ): """ @@ -337,8 +328,6 @@ inst_error = "No registered cloud images. You must contact administrator to add some before proceeding." log.debug("AttributeError: %s " % str( ae ) ) - #TODO: based on user credentials (i.e., provider) selected, zone options will be different (e.g., EC2: us-east-1a vs EPC: epc) - return trans.fill_template( "cloud/configure_uci.mako", instanceName = instanceName, credName = storedCreds, @@ -427,8 +416,6 @@ session.flush() # Log and display the management page trans.set_message( "Credential '%s' edited." 
% credentials.name ) -# if defaultCred: -# self.makeDefault( trans, credentials.id) return self.list( trans ) @web.expose @@ -507,8 +494,6 @@ # Log and display the management page trans.log_event( "User added new credentials" ) trans.set_message( "Credential '%s' created" % credentials.name ) -# if defaultCred: -# self.makeDefault( trans, credentials.id) return self.list( trans ) providers = trans.sa_session.query( model.CloudProvider ).filter_by( user=user ).all() @@ -521,13 +506,6 @@ providers = providers ) -# return trans.show_form( -# web.FormBuilder( web.url_for(), "Add credentials", submit_text="Add" ) -# .add_text( "credName", "Credentials name", value="Unnamed credentials", error=cred_error ) -# .add_text( "providerName", "Cloud provider name", value="ec2 or eucalyptus", error=provider_error ) -# .add_text( "accessKey", "Access key", value='', error=accessKey_error ) -# .add_password( "secretKey", "Secret key", value='', error=secretKey_error ) ) - @web.expose @web.require_login( "view credentials" ) def view( self, trans, id=None ): @@ -770,18 +748,6 @@ # Looks good return stored -def get_default_credentials( trans, check_ownership=True ): - """ - Get a StoredUserCredntials from the database by 'default' setting, verifying ownership. - """ - user = trans.get_user() - # Load credentials from database - stored = trans.sa_session.query( model.CloudUserCredentials ) \ - .filter_by (user=user, defaultCred=True) \ - .first() - - return stored - def get_uci( trans, id, check_ownership=True ): """ Get a UCI object from the database by id, verifying ownership. @@ -891,204 +857,3 @@ # Actually, probably return key_pair to calling method and store name & key from there... return key_pair.name - -def update_instance_state( trans, id ): - """ - Update state of instances associated with given UCI id and store state in local database. Also update - state of the given UCI. - """ - uci = get_uci( trans, id ) - # Get list of instances associated with given uci as they are stored in local database - dbInstances = get_instances( trans, uci ) # TODO: handle list (currently only 1 instance can correspond to 1 UCI) - oldState = dbInstances.state - # Establish connection with cloud - conn = get_connection( trans ) - # Get actual cloud instance object - cloudInstance = get_cloud_instance( conn, dbInstances.instance_id ) - # Update instance status - cloudInstance.update() - dbInstances.state = cloudInstance.state - log.debug( "Updating instance %s state; current state: %s" % ( str( cloudInstance ).split(":")[1], cloudInstance.state ) ) - # Update state of UCI (TODO: once more than 1 instance is assoc. w/ 1 UCI, this will be need to be updated differently) - uci.state = dbInstances.state - # Persist - session = trans.sa_session - session.save_or_update( dbInstances ) - session.save_or_update( uci ) - session.flush() - - # If instance is now running, update/process instance (i.e., mount file system, start Galaxy, update DB with DNS) - if oldState==instance_states.PENDING and dbInstances.state==instance_states.RUNNING: - update_instance( trans, dbInstances, cloudInstance, conn, uci ) - - -def update_instance( trans, dbInstance, cloudInstance, conn, uci ): - """ - Update instance: connect EBS volume, mount file system, start Galaxy, and update local DB w/ DNS info - - Keyword arguments: - trans -- current transaction - dbInstance -- object of 'instance' as it is stored in local database - cloudInstance -- object of 'instance' as it resides in the cloud. 
Functions supported by the cloud API can be - instantiated directly on this object. - conn -- cloud connection object - uci -- UCI object - """ - dbInstance.public_dns = cloudInstance.dns_name - dbInstance.private_dns = cloudInstance.private_dns_name - - # Attach storage volume(s) to instance - stores = get_stores( trans, uci ) - for i, store in enumerate( stores ): - log.debug( "Attaching volume '%s' to instance '%s'." % ( store.volume_id, dbInstance.instance_id ) ) - mntDevice = '/dev/sdb'+str(i) - volStat = conn.attach_volume( store.volume_id, dbInstance.instance_id, mntDevice ) - store.attach_time = datetime.utcnow() - store.device = mntDevice - store.i_id = dbInstance.instance_id - store.status = volStat - log.debug ( '***** volume status: %s' % volStat ) - - # Wait until instances have attached and add file system - - - - # TODO: mount storage through ZFS - # TODO: start Galaxy - - # Persist - session = trans.sa_session - session.save_or_update( dbInstance ) - session.flush() - -def attach_ordered_steps( workflow, steps ): - ordered_steps = order_workflow_steps( steps ) - if ordered_steps: - workflow.has_cycles = False - for i, step in enumerate( ordered_steps ): - step.order_index = i - workflow.steps.append( step ) - else: - workflow.has_cycles = True - workflow.steps = steps - -def edgelist_for_workflow_steps( steps ): - """ - Create a list of tuples representing edges between `WorkflowSteps` based - on associated `WorkflowStepConnection`s - """ - edges = [] - steps_to_index = dict( ( step, i ) for i, step in enumerate( steps ) ) - for step in steps: - edges.append( ( steps_to_index[step], steps_to_index[step] ) ) - for conn in step.input_connections: - edges.append( ( steps_to_index[conn.output_step], steps_to_index[conn.input_step] ) ) - return edges - -def order_workflow_steps( steps ): - """ - Perform topological sort of the steps, return ordered or None - """ - try: - edges = edgelist_for_workflow_steps( steps ) - node_order = topsort( edges ) - return [ steps[i] for i in node_order ] - except CycleError: - return None - -def order_workflow_steps_with_levels( steps ): - try: - return topsort_levels( edgelist_for_workflow_steps( steps ) ) - except CycleError: - return None - -class FakeJob( object ): - """ - Fake job object for datasets that have no creating_job_associations, - they will be treated as "input" datasets. - """ - def __init__( self, dataset ): - self.is_fake = True - self.id = "fake_%s" % dataset.id - -def get_job_dict( trans ): - """ - Return a dictionary of Job -> [ Dataset ] mappings, for all finished - active Datasets in the current history and the jobs that created them. 
- """ - history = trans.get_history() - # Get the jobs that created the datasets - warnings = set() - jobs = odict() - for dataset in history.active_datasets: - # FIXME: Create "Dataset.is_finished" - if dataset.state in ( 'new', 'running', 'queued' ): - warnings.add( "Some datasets still queued or running were ignored" ) - continue - - #if this hda was copied from another, we need to find the job that created the origial hda - job_hda = dataset - while job_hda.copied_from_history_dataset_association: - job_hda = job_hda.copied_from_history_dataset_association - - if not job_hda.creating_job_associations: - jobs[ FakeJob( dataset ) ] = [ ( None, dataset ) ] - - for assoc in job_hda.creating_job_associations: - job = assoc.job - if job in jobs: - jobs[ job ].append( ( assoc.name, dataset ) ) - else: - jobs[ job ] = [ ( assoc.name, dataset ) ] - return jobs, warnings - -def cleanup_param_values( inputs, values ): - """ - Remove 'Data' values from `param_values`, along with metadata cruft, - but track the associations. - """ - associations = [] - names_to_clean = [] - # dbkey is pushed in by the framework - if 'dbkey' in values: - del values['dbkey'] - root_values = values - # Recursively clean data inputs and dynamic selects - def cleanup( prefix, inputs, values ): - for key, input in inputs.items(): - if isinstance( input, ( SelectToolParameter, DrillDownSelectToolParameter ) ): - if input.is_dynamic: - values[key] = UnvalidatedValue( values[key] ) - if isinstance( input, DataToolParameter ): - tmp = values[key] - values[key] = None - # HACK: Nested associations are not yet working, but we - # still need to clean them up so we can serialize - # if not( prefix ): - if tmp: #this is false for a non-set optional dataset - associations.append( ( tmp.hid, prefix + key ) ) - # Cleanup the other deprecated crap associated with datasets - # as well. Worse, for nested datasets all the metadata is - # being pushed into the root. FIXME: MUST REMOVE SOON - key = prefix + key + "_" - for k in root_values.keys(): - if k.startswith( key ): - del root_values[k] - elif isinstance( input, Repeat ): - group_values = values[key] - for i, rep_values in enumerate( group_values ): - rep_index = rep_values['__index__'] - prefix = "%s_%d|" % ( key, rep_index ) - cleanup( prefix, input.inputs, group_values[i] ) - elif isinstance( input, Conditional ): - group_values = values[input.name] - current_case = group_values['__current_case__'] - prefix = "%s|" % ( key ) - cleanup( prefix, input.cases[current_case].inputs, group_values ) - cleanup( "", inputs, values ) - return associations - - - - -