[hg] galaxy 3069: Web UI functionality improved when adding credentials and configuring new UCIs
details: http://www.bx.psu.edu/hg/galaxy/rev/9881b0df3252 changeset: 3069:9881b0df3252 user: Enis Afgan <afgane@gmail.com> date: Fri Oct 16 13:06:44 2009 -0400 description: Web UI functionality improved when adding credentials and configuring new UCIs. Added support for EC2 as a cloud provider; however, provider selection based on user credentials alone does not work and thus cannot verify EC2 functionality yet. diffstat: lib/galaxy/cloud/__init__.py | 729 ++++++++++++++++--------------- lib/galaxy/cloud/providers/ec2.py | 526 ++++++++++++++++++++++ lib/galaxy/cloud/providers/eucalyptus.py | 49 +- lib/galaxy/web/controllers/cloud.py | 87 ++- lib/galaxy/web/framework/helpers/__init__.py | 1 + templates/cloud/add_credentials.mako | 8 +- templates/cloud/configure_uci.mako | 116 +++++ 7 files changed, 1100 insertions(+), 416 deletions(-) diffs (1762 lines): diff -r 6aab50510e43 -r 9881b0df3252 lib/galaxy/cloud/__init__.py --- a/lib/galaxy/cloud/__init__.py Wed Oct 14 19:20:11 2009 -0400 +++ b/lib/galaxy/cloud/__init__.py Fri Oct 16 13:06:44 2009 -0400 @@ -33,9 +33,9 @@ self.app = app if self.app.config.get_bool( "enable_cloud_execution", True ): # The dispatcher manager underlying cloud instances - self.provider = DefaultCloudProvider( app ) +# self.provider = CloudProvider( app ) # Monitor for updating status of cloud instances - self.cloud_monitor = CloudMonitor( self.app, self.provider ) + self.cloud_monitor = CloudMonitor( self.app ) # self.job_stop_queue = JobStopQueue( app, self.dispatcher ) else: self.job_queue = self.job_stop_queue = NoopCloudMonitor() @@ -93,7 +93,7 @@ CloudProvider. 
""" STOP_SIGNAL = object() - def __init__( self, app, provider ): + def __init__( self, app ): """Start the cloud manager""" self.app = app # Keep track of the pid that started the cloud manager, only it @@ -153,7 +153,7 @@ try: # log.debug( "Calling monitor_step" ) self.__monitor_step() - if cnt%30 == 0: # Run global update every 30 seconds + if cnt%30 == 0: # Run global update every 30 seconds (1 minute) self.provider.update() cnt = 0 except: @@ -471,18 +471,25 @@ uci.store[store_id].device = device uci.store[store_id].flush() - def set_store_status( self, store_id, status ): - uci = model.UCI.get( self.uci_id ) - uci.refresh() - uci.store[store_id].status = status - uci.store[store_id].flush() - - def set_store_availability_zone( self, store_id, availability_zone ): - uci = model.UCI.get( self.uci_id ) - uci.refresh() - uci.store[store_id].availability_zone = availability_zone - uci.store[store_id].flush() - + def set_store_status( self, vol_id, status ): + vol = model.CloudStore.filter( model.CloudStore.c.volume_id == vol_id ).first() + vol.status = status + vol.flush() + + def set_store_availability_zone( self, availability_zone, vol_id=None ): + """ + Sets availability zone of storage volumes for either ALL volumes associated with current + UCI or for the volume whose volume ID (e.g., 'vol-39F80512') is provided as argument. + """ + if vol_id is not None: + vol = model.CloudStore.filter( model.CloudStore.c.volume_id == vol_id ).all() + else: + vol = model.CloudStore.filter( model.CloudStore.c.uci_id == self.uci_id ).all() + + for v in vol: + v.availability_zone = availability_zone + v.flush() + def set_store_volume_id( self, store_id, volume_id ): """ Given store ID associated with this UCI, set volume ID as it is registered @@ -493,13 +500,15 @@ uci.store[store_id].volume_id = volume_id uci.store[store_id].flush() - def set_store_instance( self, store_id, instance_id ): - """ Stores instance ID that given store volume is attached to. 
""" - uci = model.UCI.get( self.uci_id ) - uci.refresh() - uci.store[store_id].i_id = instance_id - uci.store[store_id].flush() - + def set_store_instance( self, vol_id, instance_id ): + """ + Stores instance ID that given store volume is attached to. Store volume ID should + be given in following format: 'vol-78943248' + """ + vol = model.CloudStore.filter( model.CloudStore.c.volume_id == vol_id ).first() + vol.i_id = instance_id + vol.flush() + # --------- Getter methods ----------------- def get_instances_indexes( self, state=None ): @@ -584,17 +593,23 @@ uci.refresh() return uci.instance[instance_id].private_dns - def get_store_availability_zone( self, store_id ): + def get_uci_availability_zone( self ): + """ + Returns UCI's availability zone. + Because all of storage volumes associated with a given UCI must be in the same + availability zone, availability of a UCI is determined by availability zone of + any one storage volume. + """ uci = model.UCI.get( self.uci_id ) uci.refresh() - return uci.store[store_id].availability_zone + return uci.store[0].availability_zone - def get_store_size( self, store_id ): + def get_store_size( self, store_id=0 ): uci = model.UCI.get( self.uci_id ) uci.refresh() return uci.store[store_id].size - def get_store_volume_id( self, store_id ): + def get_store_volume_id( self, store_id=0 ): """ Given store ID associated with this UCI, get volume ID as it is registered on the cloud provider (e.g., 'vol-39890501') @@ -630,337 +645,337 @@ uci.state = 'deleted' # for bookkeeping reasons, mark as deleted but don't actually delete. uci.flush() -class JobWrapper( object ): - """ - Wraps a 'model.Job' with convience methods for running processes and - state management. 
- """ - def __init__(self, job, tool, queue ): - self.job_id = job.id - # This is immutable, we cache it for the scheduling policy to use if needed - self.session_id = job.session_id - self.tool = tool - self.queue = queue - self.app = queue.app - self.extra_filenames = [] - self.command_line = None - self.galaxy_lib_dir = None - # With job outputs in the working directory, we need the working - # directory to be set before prepare is run, or else premature deletion - # and job recovery fail. - self.working_directory = \ - os.path.join( self.app.config.job_working_directory, str( self.job_id ) ) - self.output_paths = None - self.external_output_metadata = metadata.JobExternalOutputMetadataWrapper( job ) #wrapper holding the info required to restore and clean up from files used for setting metadata externally - - def get_param_dict( self ): - """ - Restore the dictionary of parameters from the database. - """ - job = model.Job.get( self.job_id ) - param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] ) - param_dict = self.tool.params_from_strings( param_dict, self.app ) - return param_dict - - def prepare( self ): - """ - Prepare the job to run by creating the working directory and the - config files. 
- """ - mapping.context.current.clear() #this prevents the metadata reverting that has been seen in conjunction with the PBS job runner - if not os.path.exists( self.working_directory ): - os.mkdir( self.working_directory ) - # Restore parameters from the database - job = model.Job.get( self.job_id ) - incoming = dict( [ ( p.name, p.value ) for p in job.parameters ] ) - incoming = self.tool.params_from_strings( incoming, self.app ) - # Do any validation that could not be done at job creation - self.tool.handle_unvalidated_param_values( incoming, self.app ) - # Restore input / output data lists - inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] ) - out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] ) - # These can be passed on the command line if wanted as $userId $userEmail - if job.history.user: # check for anonymous user! - userId = '%d' % job.history.user.id - userEmail = str(job.history.user.email) - else: - userId = 'Anonymous' - userEmail = 'Anonymous' - incoming['userId'] = userId - incoming['userEmail'] = userEmail - # Build params, done before hook so hook can use - param_dict = self.tool.build_param_dict( incoming, inp_data, out_data, self.get_output_fnames(), self.working_directory ) - # Certain tools require tasks to be completed prior to job execution - # ( this used to be performed in the "exec_before_job" hook, but hooks are deprecated ). 
- if self.tool.tool_type is not None: - out_data = self.tool.exec_before_job( self.queue.app, inp_data, out_data, param_dict ) - # Run the before queue ("exec_before_job") hook - self.tool.call_hook( 'exec_before_job', self.queue.app, inp_data=inp_data, - out_data=out_data, tool=self.tool, param_dict=incoming) - mapping.context.current.flush() - # Build any required config files - config_filenames = self.tool.build_config_files( param_dict, self.working_directory ) - # FIXME: Build the param file (might return None, DEPRECATED) - param_filename = self.tool.build_param_file( param_dict, self.working_directory ) - # Build the job's command line - self.command_line = self.tool.build_command_line( param_dict ) - # FIXME: for now, tools get Galaxy's lib dir in their path - if self.command_line and self.command_line.startswith( 'python' ): - self.galaxy_lib_dir = os.path.abspath( "lib" ) # cwd = galaxy root - # We need command_line persisted to the db in order for Galaxy to re-queue the job - # if the server was stopped and restarted before the job finished - job.command_line = self.command_line - job.flush() - # Return list of all extra files - extra_filenames = config_filenames - if param_filename is not None: - extra_filenames.append( param_filename ) - self.param_dict = param_dict - self.extra_filenames = extra_filenames - return extra_filenames +#class JobWrapper( object ): +# """ +# Wraps a 'model.Job' with convience methods for running processes and +# state management. 
+# """ +# def __init__(self, job, tool, queue ): +# self.job_id = job.id +# # This is immutable, we cache it for the scheduling policy to use if needed +# self.session_id = job.session_id +# self.tool = tool +# self.queue = queue +# self.app = queue.app +# self.extra_filenames = [] +# self.command_line = None +# self.galaxy_lib_dir = None +# # With job outputs in the working directory, we need the working +# # directory to be set before prepare is run, or else premature deletion +# # and job recovery fail. +# self.working_directory = \ +# os.path.join( self.app.config.job_working_directory, str( self.job_id ) ) +# self.output_paths = None +# self.external_output_metadata = metadata.JobExternalOutputMetadataWrapper( job ) #wrapper holding the info required to restore and clean up from files used for setting metadata externally +# +# def get_param_dict( self ): +# """ +# Restore the dictionary of parameters from the database. +# """ +# job = model.Job.get( self.job_id ) +# param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] ) +# param_dict = self.tool.params_from_strings( param_dict, self.app ) +# return param_dict +# +# def prepare( self ): +# """ +# Prepare the job to run by creating the working directory and the +# config files. 
+# """ +# mapping.context.current.clear() #this prevents the metadata reverting that has been seen in conjunction with the PBS job runner +# if not os.path.exists( self.working_directory ): +# os.mkdir( self.working_directory ) +# # Restore parameters from the database +# job = model.Job.get( self.job_id ) +# incoming = dict( [ ( p.name, p.value ) for p in job.parameters ] ) +# incoming = self.tool.params_from_strings( incoming, self.app ) +# # Do any validation that could not be done at job creation +# self.tool.handle_unvalidated_param_values( incoming, self.app ) +# # Restore input / output data lists +# inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] ) +# out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] ) +# # These can be passed on the command line if wanted as $userId $userEmail +# if job.history.user: # check for anonymous user! +# userId = '%d' % job.history.user.id +# userEmail = str(job.history.user.email) +# else: +# userId = 'Anonymous' +# userEmail = 'Anonymous' +# incoming['userId'] = userId +# incoming['userEmail'] = userEmail +# # Build params, done before hook so hook can use +# param_dict = self.tool.build_param_dict( incoming, inp_data, out_data, self.get_output_fnames(), self.working_directory ) +# # Certain tools require tasks to be completed prior to job execution +# # ( this used to be performed in the "exec_before_job" hook, but hooks are deprecated ). 
+# if self.tool.tool_type is not None: +# out_data = self.tool.exec_before_job( self.queue.app, inp_data, out_data, param_dict ) +# # Run the before queue ("exec_before_job") hook +# self.tool.call_hook( 'exec_before_job', self.queue.app, inp_data=inp_data, +# out_data=out_data, tool=self.tool, param_dict=incoming) +# mapping.context.current.flush() +# # Build any required config files +# config_filenames = self.tool.build_config_files( param_dict, self.working_directory ) +# # FIXME: Build the param file (might return None, DEPRECATED) +# param_filename = self.tool.build_param_file( param_dict, self.working_directory ) +# # Build the job's command line +# self.command_line = self.tool.build_command_line( param_dict ) +# # FIXME: for now, tools get Galaxy's lib dir in their path +# if self.command_line and self.command_line.startswith( 'python' ): +# self.galaxy_lib_dir = os.path.abspath( "lib" ) # cwd = galaxy root +# # We need command_line persisted to the db in order for Galaxy to re-queue the job +# # if the server was stopped and restarted before the job finished +# job.command_line = self.command_line +# job.flush() +# # Return list of all extra files +# extra_filenames = config_filenames +# if param_filename is not None: +# extra_filenames.append( param_filename ) +# self.param_dict = param_dict +# self.extra_filenames = extra_filenames +# return extra_filenames +# +# def fail( self, message, exception=False ): +# """ +# Indicate job failure by setting state and message on all output +# datasets. 
+# """ +# job = model.Job.get( self.job_id ) +# job.refresh() +# # if the job was deleted, don't fail it +# if not job.state == model.Job.states.DELETED: +# # Check if the failure is due to an exception +# if exception: +# # Save the traceback immediately in case we generate another +# # below +# job.traceback = traceback.format_exc() +# # Get the exception and let the tool attempt to generate +# # a better message +# etype, evalue, tb = sys.exc_info() +# m = self.tool.handle_job_failure_exception( evalue ) +# if m: +# message = m +# if self.app.config.outputs_to_working_directory: +# for dataset_path in self.get_output_fnames(): +# try: +# shutil.move( dataset_path.false_path, dataset_path.real_path ) +# log.debug( "fail(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) ) +# except ( IOError, OSError ), e: +# log.error( "fail(): Missing output file in working directory: %s" % e ) +# for dataset_assoc in job.output_datasets: +# dataset = dataset_assoc.dataset +# dataset.refresh() +# dataset.state = dataset.states.ERROR +# dataset.blurb = 'tool error' +# dataset.info = message +# dataset.set_size() +# dataset.flush() +# job.state = model.Job.states.ERROR +# job.command_line = self.command_line +# job.info = message +# job.flush() +# # If the job was deleted, just clean up +# self.cleanup() +# +# def change_state( self, state, info = False ): +# job = model.Job.get( self.job_id ) +# job.refresh() +# for dataset_assoc in job.output_datasets: +# dataset = dataset_assoc.dataset +# dataset.refresh() +# dataset.state = state +# if info: +# dataset.info = info +# dataset.flush() +# if info: +# job.info = info +# job.state = state +# job.flush() +# +# def get_state( self ): +# job = model.Job.get( self.job_id ) +# job.refresh() +# return job.state +# +# def set_runner( self, runner_url, external_id ): +# job = model.Job.get( self.job_id ) +# job.refresh() +# job.job_runner_name = runner_url +# job.job_runner_external_id = external_id +# job.flush() +# 
+# def finish( self, stdout, stderr ): +# """ +# Called to indicate that the associated command has been run. Updates +# the output datasets based on stderr and stdout from the command, and +# the contents of the output files. +# """ +# # default post job setup +# mapping.context.current.clear() +# job = model.Job.get( self.job_id ) +# # if the job was deleted, don't finish it +# if job.state == job.states.DELETED: +# self.cleanup() +# return +# elif job.state == job.states.ERROR: +# # Job was deleted by an administrator +# self.fail( job.info ) +# return +# if stderr: +# job.state = "error" +# else: +# job.state = 'ok' +# if self.app.config.outputs_to_working_directory: +# for dataset_path in self.get_output_fnames(): +# try: +# shutil.move( dataset_path.false_path, dataset_path.real_path ) +# log.debug( "finish(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) ) +# except ( IOError, OSError ): +# self.fail( "Job %s's output dataset(s) could not be read" % job.id ) +# return +# for dataset_assoc in job.output_datasets: +# #should this also be checking library associations? - can a library item be added from a history before the job has ended? - lets not allow this to occur +# for dataset in dataset_assoc.dataset.dataset.history_associations: #need to update all associated output hdas, i.e. 
history was shared with job running +# dataset.blurb = 'done' +# dataset.peek = 'no peek' +# dataset.info = stdout + stderr +# dataset.set_size() +# if stderr: +# dataset.blurb = "error" +# elif dataset.has_data(): +# #if a dataset was copied, it won't appear in our dictionary: +# #either use the metadata from originating output dataset, or call set_meta on the copies +# #it would be quicker to just copy the metadata from the originating output dataset, +# #but somewhat trickier (need to recurse up the copied_from tree), for now we'll call set_meta() +# if not self.external_output_metadata.external_metadata_set_successfully( dataset ): +# # Only set metadata values if they are missing... +# dataset.set_meta( overwrite = False ) +# else: +# #load metadata from file +# #we need to no longer allow metadata to be edited while the job is still running, +# #since if it is edited, the metadata changed on the running output will no longer match +# #the metadata that was stored to disk for use via the external process, +# #and the changes made by the user will be lost, without warning or notice +# dataset.metadata.from_JSON_dict( self.external_output_metadata.get_output_filenames_by_dataset( dataset ).filename_out ) +# if self.tool.is_multi_byte: +# dataset.set_multi_byte_peek() +# else: +# dataset.set_peek() +# else: +# dataset.blurb = "empty" +# dataset.flush() +# if stderr: +# dataset_assoc.dataset.dataset.state = model.Dataset.states.ERROR +# else: +# dataset_assoc.dataset.dataset.state = model.Dataset.states.OK +# dataset_assoc.dataset.dataset.flush() +# +# # Save stdout and stderr +# if len( stdout ) > 32768: +# log.error( "stdout for job %d is greater than 32K, only first part will be logged to database" % job.id ) +# job.stdout = stdout[:32768] +# if len( stderr ) > 32768: +# log.error( "stderr for job %d is greater than 32K, only first part will be logged to database" % job.id ) +# job.stderr = stderr[:32768] +# # custom post process setup +# inp_data = dict( [ ( 
da.name, da.dataset ) for da in job.input_datasets ] ) +# out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] ) +# param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] ) # why not re-use self.param_dict here? ##dunno...probably should, this causes tools.parameters.basic.UnvalidatedValue to be used in following methods instead of validated and transformed values during i.e. running workflows +# param_dict = self.tool.params_from_strings( param_dict, self.app ) +# # Check for and move associated_files +# self.tool.collect_associated_files(out_data, self.working_directory) +# # Create generated output children and primary datasets and add to param_dict +# collected_datasets = {'children':self.tool.collect_child_datasets(out_data),'primary':self.tool.collect_primary_datasets(out_data)} +# param_dict.update({'__collected_datasets__':collected_datasets}) +# # Certain tools require tasks to be completed after job execution +# # ( this used to be performed in the "exec_after_process" hook, but hooks are deprecated ). 
+# if self.tool.tool_type is not None: +# self.tool.exec_after_process( self.queue.app, inp_data, out_data, param_dict, job = job ) +# # Call 'exec_after_process' hook +# self.tool.call_hook( 'exec_after_process', self.queue.app, inp_data=inp_data, +# out_data=out_data, param_dict=param_dict, +# tool=self.tool, stdout=stdout, stderr=stderr ) +# # TODO +# # validate output datasets +# job.command_line = self.command_line +# mapping.context.current.flush() +# log.debug( 'job %d ended' % self.job_id ) +# self.cleanup() +# +# def cleanup( self ): +# # remove temporary files +# try: +# for fname in self.extra_filenames: +# os.remove( fname ) +# if self.working_directory is not None: +# shutil.rmtree( self.working_directory ) +# if self.app.config.set_metadata_externally: +# self.external_output_metadata.cleanup_external_metadata() +# except: +# log.exception( "Unable to cleanup job %d" % self.job_id ) +# +# def get_command_line( self ): +# return self.command_line +# +# def get_session_id( self ): +# return self.session_id +# +# def get_input_fnames( self ): +# job = model.Job.get( self.job_id ) +# filenames = [] +# for da in job.input_datasets: #da is JobToInputDatasetAssociation object +# if da.dataset: +# filenames.append( da.dataset.file_name ) +# #we will need to stage in metadata file names also +# #TODO: would be better to only stage in metadata files that are actually needed (found in command line, referenced in config files, etc.) 
+# for key, value in da.dataset.metadata.items(): +# if isinstance( value, model.MetadataFile ): +# filenames.append( value.file_name ) +# return filenames +# +# def get_output_fnames( self ): +# if self.output_paths is not None: +# return self.output_paths +# +# class DatasetPath( object ): +# def __init__( self, real_path, false_path = None ): +# self.real_path = real_path +# self.false_path = false_path +# def __str__( self ): +# if self.false_path is None: +# return self.real_path +# else: +# return self.false_path +# +# job = model.Job.get( self.job_id ) +# if self.app.config.outputs_to_working_directory: +# self.output_paths = [] +# for name, data in [ ( da.name, da.dataset.dataset ) for da in job.output_datasets ]: +# false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % data.id ) ) +# self.output_paths.append( DatasetPath( data.file_name, false_path ) ) +# else: +# self.output_paths = [ DatasetPath( da.dataset.file_name ) for da in job.output_datasets ] +# return self.output_paths +# +# def check_output_sizes( self ): +# sizes = [] +# output_paths = self.get_output_fnames() +# for outfile in [ str( o ) for o in output_paths ]: +# sizes.append( ( outfile, os.stat( outfile ).st_size ) ) +# return sizes +# def setup_external_metadata( self, exec_dir = None, tmp_dir = None, dataset_files_path = None, config_root = None, datatypes_config = None, **kwds ): +# if tmp_dir is None: +# #this dir should should relative to the exec_dir +# tmp_dir = self.app.config.new_file_path +# if dataset_files_path is None: +# dataset_files_path = self.app.model.Dataset.file_path +# if config_root is None: +# config_root = self.app.config.root +# if datatypes_config is None: +# datatypes_config = self.app.config.datatypes_config +# job = model.Job.get( self.job_id ) +# return self.external_output_metadata.setup_external_metadata( [ output_dataset_assoc.dataset for output_dataset_assoc in job.output_datasets ], exec_dir = exec_dir, tmp_dir = 
tmp_dir, dataset_files_path = dataset_files_path, config_root = config_root, datatypes_config = datatypes_config, **kwds ) - def fail( self, message, exception=False ): - """ - Indicate job failure by setting state and message on all output - datasets. - """ - job = model.Job.get( self.job_id ) - job.refresh() - # if the job was deleted, don't fail it - if not job.state == model.Job.states.DELETED: - # Check if the failure is due to an exception - if exception: - # Save the traceback immediately in case we generate another - # below - job.traceback = traceback.format_exc() - # Get the exception and let the tool attempt to generate - # a better message - etype, evalue, tb = sys.exc_info() - m = self.tool.handle_job_failure_exception( evalue ) - if m: - message = m - if self.app.config.outputs_to_working_directory: - for dataset_path in self.get_output_fnames(): - try: - shutil.move( dataset_path.false_path, dataset_path.real_path ) - log.debug( "fail(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) ) - except ( IOError, OSError ), e: - log.error( "fail(): Missing output file in working directory: %s" % e ) - for dataset_assoc in job.output_datasets: - dataset = dataset_assoc.dataset - dataset.refresh() - dataset.state = dataset.states.ERROR - dataset.blurb = 'tool error' - dataset.info = message - dataset.set_size() - dataset.flush() - job.state = model.Job.states.ERROR - job.command_line = self.command_line - job.info = message - job.flush() - # If the job was deleted, just clean up - self.cleanup() - - def change_state( self, state, info = False ): - job = model.Job.get( self.job_id ) - job.refresh() - for dataset_assoc in job.output_datasets: - dataset = dataset_assoc.dataset - dataset.refresh() - dataset.state = state - if info: - dataset.info = info - dataset.flush() - if info: - job.info = info - job.state = state - job.flush() - - def get_state( self ): - job = model.Job.get( self.job_id ) - job.refresh() - return job.state - - def 
set_runner( self, runner_url, external_id ): - job = model.Job.get( self.job_id ) - job.refresh() - job.job_runner_name = runner_url - job.job_runner_external_id = external_id - job.flush() - - def finish( self, stdout, stderr ): - """ - Called to indicate that the associated command has been run. Updates - the output datasets based on stderr and stdout from the command, and - the contents of the output files. - """ - # default post job setup - mapping.context.current.clear() - job = model.Job.get( self.job_id ) - # if the job was deleted, don't finish it - if job.state == job.states.DELETED: - self.cleanup() - return - elif job.state == job.states.ERROR: - # Job was deleted by an administrator - self.fail( job.info ) - return - if stderr: - job.state = "error" - else: - job.state = 'ok' - if self.app.config.outputs_to_working_directory: - for dataset_path in self.get_output_fnames(): - try: - shutil.move( dataset_path.false_path, dataset_path.real_path ) - log.debug( "finish(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) ) - except ( IOError, OSError ): - self.fail( "Job %s's output dataset(s) could not be read" % job.id ) - return - for dataset_assoc in job.output_datasets: - #should this also be checking library associations? - can a library item be added from a history before the job has ended? - lets not allow this to occur - for dataset in dataset_assoc.dataset.dataset.history_associations: #need to update all associated output hdas, i.e. 
history was shared with job running - dataset.blurb = 'done' - dataset.peek = 'no peek' - dataset.info = stdout + stderr - dataset.set_size() - if stderr: - dataset.blurb = "error" - elif dataset.has_data(): - #if a dataset was copied, it won't appear in our dictionary: - #either use the metadata from originating output dataset, or call set_meta on the copies - #it would be quicker to just copy the metadata from the originating output dataset, - #but somewhat trickier (need to recurse up the copied_from tree), for now we'll call set_meta() - if not self.external_output_metadata.external_metadata_set_successfully( dataset ): - # Only set metadata values if they are missing... - dataset.set_meta( overwrite = False ) - else: - #load metadata from file - #we need to no longer allow metadata to be edited while the job is still running, - #since if it is edited, the metadata changed on the running output will no longer match - #the metadata that was stored to disk for use via the external process, - #and the changes made by the user will be lost, without warning or notice - dataset.metadata.from_JSON_dict( self.external_output_metadata.get_output_filenames_by_dataset( dataset ).filename_out ) - if self.tool.is_multi_byte: - dataset.set_multi_byte_peek() - else: - dataset.set_peek() - else: - dataset.blurb = "empty" - dataset.flush() - if stderr: - dataset_assoc.dataset.dataset.state = model.Dataset.states.ERROR - else: - dataset_assoc.dataset.dataset.state = model.Dataset.states.OK - dataset_assoc.dataset.dataset.flush() - - # Save stdout and stderr - if len( stdout ) > 32768: - log.error( "stdout for job %d is greater than 32K, only first part will be logged to database" % job.id ) - job.stdout = stdout[:32768] - if len( stderr ) > 32768: - log.error( "stderr for job %d is greater than 32K, only first part will be logged to database" % job.id ) - job.stderr = stderr[:32768] - # custom post process setup - inp_data = dict( [ ( da.name, da.dataset ) for da in 
job.input_datasets ] ) - out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] ) - param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] ) # why not re-use self.param_dict here? ##dunno...probably should, this causes tools.parameters.basic.UnvalidatedValue to be used in following methods instead of validated and transformed values during i.e. running workflows - param_dict = self.tool.params_from_strings( param_dict, self.app ) - # Check for and move associated_files - self.tool.collect_associated_files(out_data, self.working_directory) - # Create generated output children and primary datasets and add to param_dict - collected_datasets = {'children':self.tool.collect_child_datasets(out_data),'primary':self.tool.collect_primary_datasets(out_data)} - param_dict.update({'__collected_datasets__':collected_datasets}) - # Certain tools require tasks to be completed after job execution - # ( this used to be performed in the "exec_after_process" hook, but hooks are deprecated ). 
- if self.tool.tool_type is not None: - self.tool.exec_after_process( self.queue.app, inp_data, out_data, param_dict, job = job ) - # Call 'exec_after_process' hook - self.tool.call_hook( 'exec_after_process', self.queue.app, inp_data=inp_data, - out_data=out_data, param_dict=param_dict, - tool=self.tool, stdout=stdout, stderr=stderr ) - # TODO - # validate output datasets - job.command_line = self.command_line - mapping.context.current.flush() - log.debug( 'job %d ended' % self.job_id ) - self.cleanup() - - def cleanup( self ): - # remove temporary files - try: - for fname in self.extra_filenames: - os.remove( fname ) - if self.working_directory is not None: - shutil.rmtree( self.working_directory ) - if self.app.config.set_metadata_externally: - self.external_output_metadata.cleanup_external_metadata() - except: - log.exception( "Unable to cleanup job %d" % self.job_id ) - - def get_command_line( self ): - return self.command_line - - def get_session_id( self ): - return self.session_id - - def get_input_fnames( self ): - job = model.Job.get( self.job_id ) - filenames = [] - for da in job.input_datasets: #da is JobToInputDatasetAssociation object - if da.dataset: - filenames.append( da.dataset.file_name ) - #we will need to stage in metadata file names also - #TODO: would be better to only stage in metadata files that are actually needed (found in command line, referenced in config files, etc.) 
- for key, value in da.dataset.metadata.items(): - if isinstance( value, model.MetadataFile ): - filenames.append( value.file_name ) - return filenames - - def get_output_fnames( self ): - if self.output_paths is not None: - return self.output_paths - - class DatasetPath( object ): - def __init__( self, real_path, false_path = None ): - self.real_path = real_path - self.false_path = false_path - def __str__( self ): - if self.false_path is None: - return self.real_path - else: - return self.false_path - - job = model.Job.get( self.job_id ) - if self.app.config.outputs_to_working_directory: - self.output_paths = [] - for name, data in [ ( da.name, da.dataset.dataset ) for da in job.output_datasets ]: - false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % data.id ) ) - self.output_paths.append( DatasetPath( data.file_name, false_path ) ) - else: - self.output_paths = [ DatasetPath( da.dataset.file_name ) for da in job.output_datasets ] - return self.output_paths - - def check_output_sizes( self ): - sizes = [] - output_paths = self.get_output_fnames() - for outfile in [ str( o ) for o in output_paths ]: - sizes.append( ( outfile, os.stat( outfile ).st_size ) ) - return sizes - def setup_external_metadata( self, exec_dir = None, tmp_dir = None, dataset_files_path = None, config_root = None, datatypes_config = None, **kwds ): - if tmp_dir is None: - #this dir should should relative to the exec_dir - tmp_dir = self.app.config.new_file_path - if dataset_files_path is None: - dataset_files_path = self.app.model.Dataset.file_path - if config_root is None: - config_root = self.app.config.root - if datatypes_config is None: - datatypes_config = self.app.config.datatypes_config - job = model.Job.get( self.job_id ) - return self.external_output_metadata.setup_external_metadata( [ output_dataset_assoc.dataset for output_dataset_assoc in job.output_datasets ], exec_dir = exec_dir, tmp_dir = tmp_dir, dataset_files_path = 
dataset_files_path, config_root = config_root, datatypes_config = datatypes_config, **kwds ) - -class DefaultCloudProvider( object ): +class CloudProvider( object ): def __init__( self, app ): self.app = app self.cloud_provider = {} diff -r 6aab50510e43 -r 9881b0df3252 lib/galaxy/cloud/providers/ec2.py --- a/lib/galaxy/cloud/providers/ec2.py Wed Oct 14 19:20:11 2009 -0400 +++ b/lib/galaxy/cloud/providers/ec2.py Fri Oct 16 13:06:44 2009 -0400 @@ -0,0 +1,526 @@ +import subprocess, threading, os, errno, time, datetime +from Queue import Queue, Empty +from datetime import datetime + +from galaxy import model # Database interaction class +from galaxy.model import mapping +from galaxy.datatypes.data import nice_size +from Queue import Queue +from sqlalchemy import or_ + +import galaxy.eggs +galaxy.eggs.require("boto") +from boto.ec2.connection import EC2Connection +from boto.ec2.regioninfo import RegionInfo + +import logging +log = logging.getLogger( __name__ ) + +class EC2CloudProvider( object ): + """ + Amazon EC2-based cloud provider implementation for managing instances. + """ + STOP_SIGNAL = object() + def __init__( self, app ): + self.zone = "us-east-1a" + self.key_pair = "galaxy-keypair" + self.queue = Queue() + + #TODO: Use multiple threads to process requests? 
+ self.threads = [] + nworkers = 5 + log.info( "Starting EC2 cloud controller workers" ) + for i in range( nworkers ): + worker = threading.Thread( target=self.run_next ) + worker.start() + self.threads.append( worker ) + log.debug( "%d EC2 cloud workers ready", nworkers ) + + def run_next( self ): + """Run the next job, waiting until one is available if necessary""" + cnt = 0 + while 1: + + uci_wrapper = self.queue.get() +# uci = uci_wrapper.get_uci() + log.debug( '[%d] uci name: %s' % ( cnt, uci_wrapper.get_name() ) ) + uci_state = uci_wrapper.get_state() + if uci_state is self.STOP_SIGNAL: + return + try: + if uci_state=="new": + log.debug( "Calling create UCI" ) + self.createUCI( uci_wrapper ) + elif uci_state=="deleting": + self.deleteUCI( uci_wrapper ) + elif uci_state=="submitted": + log.debug( "Calling start UCI" ) + self.startUCI( uci_wrapper ) + elif uci_state=="shutting-down": + self.stopUCI( uci_wrapper ) + except: + log.exception( "Uncaught exception executing request." ) + cnt += 1 + + def get_connection( self, uci_wrapper ): + """ + Establishes EC2 cloud connection using user's credentials associated with given UCI + """ + log.debug( '##### Establishing EC2 cloud connection' ) + conn = EC2Connection( uci_wrapper.get_access_key(), uci_wrapper.get_secret_key() ) + return conn + + def set_keypair( self, uci_wrapper, conn ): + """ + Generate keypair using user's default credentials + """ + log.debug( "Getting user's keypair" ) + kp = conn.get_key_pair( self.key_pair ) + instances = uci_wrapper.get_instances_indexes() + + try: + for inst in instances: + log.debug("inst: '%s'" % inst ) + uci_wrapper.set_key_pair( inst, kp.name ) + return kp.name + except AttributeError: # No keypair under this name exists so create it + log.info( "No keypair found, creating keypair '%s'" % self.key_pair ) + kp = conn.create_key_pair( self.key_pair ) + for inst in instances: + uci_wrapper.set_key_pair( inst, kp.name, kp.material ) + + return kp.name + + def get_mi_id( self, 
type ): + """ + Get appropriate machine image (mi) based on instance size. + TODO: Dummy method - need to implement logic + For valid sizes, see http://aws.amazon.com/ec2/instance-types/ + """ + return model.CloudImage.filter( model.CloudImage.table.c.id==2 ).first().image_id + +# def get_instances( self, uci ): +# """ +# Get objects of instances that are pending or running and are connected to uci object +# """ +# instances = trans.sa_session.query( model.CloudInstance ) \ +# .filter_by( user=user, uci_id=uci.id ) \ +# .filter( or_(model.CloudInstance.table.c.state=="running", model.CloudInstance.table.c.state=="pending" ) ) \ +# .first() +# #.all() #TODO: return all but need to edit calling method(s) to handle list +# +# instances = uci.instance +# +# return instances + + + def shutdown( self ): + """Attempts to gracefully shut down the monitor thread""" + log.info( "sending stop signal to worker threads in EC2 cloud manager" ) + for i in range( len( self.threads ) ): + self.queue.put( self.STOP_SIGNAL ) + log.info( "EC2 cloud manager stopped" ) + + def put( self, uci_wrapper ): + # Get rid of UCI from state description + state = uci_wrapper.get_state() + uci_wrapper.change_state( state.split('U')[0] ) # remove 'UCI' from end of state description (i.e., mark as accepted and ready for processing) + self.queue.put( uci_wrapper ) + + def createUCI( self, uci_wrapper ): + """ + Creates User Configured Instance (UCI). Essentially, creates storage volume on cloud provider + and registers relevant information in Galaxy database. + """ + conn = self.get_connection( uci_wrapper ) + # Temporary code - need to ensure user selects zone at UCI creation time! 
+ if uci_wrapper.get_uci_availability_zone()=='': + log.info( "Availability zone for UCI (i.e., storage volume) was not selected, using default zone: %s" % self.zone ) + uci_wrapper.set_store_availability_zone( self.zone ) + + #TODO: check if volume associated with UCI already exists (if server crashed for example) and don't recreate it + log.info( "Creating volume in zone '%s'..." % uci_wrapper.get_uci_availability_zone() ) + # Because only 1 storage volume may be created at UCI config time, index of this storage volume in local Galaxy DB w.r.t + # current UCI is 0, so reference it in following methods + vol = conn.create_volume( uci_wrapper.get_store_size( 0 ), uci_wrapper.get_uci_availability_zone(), snapshot=None ) + uci_wrapper.set_store_volume_id( 0, vol.id ) + + # Wait for a while to ensure volume was created + vol_status = vol.status + for i in range( 30 ): + if vol_status is not "available": + log.debug( 'Updating volume status; current status: %s' % vol_status ) + vol_status = vol.status + time.sleep(3) + if i is 29: + log.debug( "Error while creating volume '%s'; stuck in state '%s'; deleting volume." % ( vol.id, vol_status ) ) + conn.delete_volume( vol.id ) + uci_wrapper.change_state( uci_state='error' ) + return + + uci_wrapper.change_state( uci_state='available' ) + uci_wrapper.set_store_status( vol.id, vol_status ) + + def deleteUCI( self, uci_wrapper ): + """ + Deletes UCI. NOTE that this implies deletion of any and all data associated + with this UCI from the cloud. All data will be deleted. + """ + conn = self.get_connection( uci_wrapper ) + vl = [] # volume list + count = 0 # counter for checking if all volumes assoc. w/ UCI were deleted + + # Get all volumes assoc. 
w/ UCI, delete them from cloud as well as in local DB + vl = uci_wrapper.get_all_stores() + deletedList = [] + failedList = [] + for v in vl: + log.debug( "Deleting volume with id='%s'" % v.volume_id ) + if conn.delete_volume( v.volume_id ): + deletedList.append( v.volume_id ) + v.delete() + v.flush() + count += 1 + else: + failedList.append( v.volume_id ) + + # Delete UCI if all of associated + log.debug( "count=%s, len(vl)=%s" % (count, len( vl ) ) ) + if count == len( vl ): + uci_wrapper.delete() + else: + log.error( "Deleting following volume(s) failed: %s. However, these volumes were successfully deleted: %s. \ + MANUAL intervention and processing needed." % ( failedList, deletedList ) ) + uci_wrapper.change_state( uci_state="error" ) + + def addStorageToUCI( self, name ): + """ Adds more storage to specified UCI + TODO""" + + def dummyStartUCI( self, uci_wrapper ): + + uci = uci_wrapper.get_uci() + log.debug( "Would be starting instance '%s'" % uci.name ) + uci_wrapper.change_state( 'pending' ) +# log.debug( "Sleeping a bit... (%s)" % uci.name ) +# time.sleep(20) +# log.debug( "Woke up! (%s)" % uci.name ) + + def startUCI( self, uci_wrapper ): + """ + Starts instance(s) of given UCI on the cloud. 
+ """ + conn = self.get_connection( uci_wrapper ) +# + self.set_keypair( uci_wrapper, conn ) + i_indexes = uci_wrapper.get_instances_indexes() # Get indexes of *new* i_indexes associated with this UCI + log.debug( "Starting instances with IDs: '%s' associated with UCI '%s' " % ( uci_wrapper.get_name(), i_indexes ) ) + + for i_index in i_indexes: + mi_id = self.get_mi_id( uci_wrapper.get_type( i_index ) ) + log.debug( "mi_id: %s, uci_wrapper.get_key_pair_name( i_index ): %s" % ( mi_id, uci_wrapper.get_key_pair_name( i_index ) ) ) + uci_wrapper.set_mi( i_index, mi_id ) + + # Check if galaxy security group exists (and create it if it does not) + log.debug( '***** Setting up security group' ) + security_group = 'galaxyWeb' + sgs = conn.get_all_security_groups() # security groups + gsgt = False # galaxy security group test + for sg in sgs: + if sg.name == security_group: + gsgt = True + # If security group does not exist, create it + if not gsgt: + gSecurityGroup = conn.create_security_group(security_group, 'Security group for Galaxy.') + gSecurityGroup.authorize( 'tcp', 80, 80, '0.0.0.0/0' ) # Open HTTP port + gSecurityGroup.authorize( 'tcp', 22, 22, '0.0.0.0/0' ) # Open SSH port + # Start an instance + log.debug( "***** Starting UCI instance '%s'" % uci_wrapper.get_name() ) + log.debug( 'Using following command: conn.run_instances( image_id=%s, key_name=%s, security_groups=[%s], instance_type=%s, placement=%s )' + % ( mi_id, uci_wrapper.get_key_pair_name( i_index ), [security_group], uci_wrapper.get_type( i_index ), uci_wrapper.get_uci_availability_zone() ) ) + reservation = conn.run_instances( image_id=mi_id, + key_name=uci_wrapper.get_key_pair_name( i_index ), + security_groups=[security_group], + instance_type=uci_wrapper.get_type( i_index ), + placement=uci_wrapper.get_uci_availability_zone() ) + # Record newly available instance data into local Galaxy database + l_time = datetime.utcnow() + uci_wrapper.set_launch_time( l_time, i_index=i_index ) # format_time( 
reservation.i_indexes[0].launch_time ) ) + if not uci_wrapper.uci_launch_time_set(): + uci_wrapper.set_uci_launch_time( l_time ) + uci_wrapper.set_reservation_id( i_index, str( reservation ).split(":")[1] ) + # TODO: if more than a single instance will be started through single reservation, change this reference to element [0] + i_id = str( reservation.instances[0]).split(":")[1] + uci_wrapper.set_instance_id( i_index, i_id ) + s = reservation.instances[0].state + uci_wrapper.change_state( s, i_id, s ) + log.debug( "Instance of UCI '%s' started, current state: %s" % ( uci_wrapper.get_name(), uci_wrapper.get_state() ) ) + + + +# # Wait until instance gets running and then update the DB +# while s!="running": +# log.debug( "Waiting on instance '%s' to start up (reservation ID: %s); current state: %s" % ( uci.instance[0].instance_id, uci.instance[0].reservation_id, s ) ) +# time.sleep( 15 ) +# s = reservation.i_indexes[0].update() +# +# # Update instance data in local DB +# uci.instance[0].state = s +# uci.instance[0].public_dns = reservation.i_indexes[0].dns_name +# uci.instance[0].private_dns = reservation.i_indexes[0].private_dns_name +# uci.instance[0].flush() +# # Update storage data in local DB w/ volume state info. NOTE that this only captures current volume state +# # and does not connect or wait on connection between instance and volume to be established +# vl = model.CloudStore.filter( model.CloudStore.c.uci_id == uci.id ).all() +# vols = [] +# for v in vl: +# vols.append( v.volume_id ) +# try: +# volumes = conn.get_all_volumes( vols ) +# for i, v in enumerate( volumes ): +# uci.store[i].i_id = v.instance_id +# uci.store[i].status = v.status +# uci.store[i].device = v.device +# uci.store[i].flush() +# except BotoServerError: +# log.debug( "Error getting volume(s) attached to instance. Volume status was not updated." ) +# +# uci.state = s +# uci.flush() + + + def stopUCI( self, uci_wrapper): + """ + Stops all of cloud instances associated with given UCI. 
+ """ + conn = self.get_connection( uci_wrapper ) + + # Get all instances associated with given UCI + il = uci_wrapper.get_instances_ids() # instance list +# log.debug( 'List of instances being terminated: %s' % il ) + rl = conn.get_all_instances( il ) # Reservation list associated with given instances + +# tState = conn.terminate_instances( il ) +# # TODO: Need to update instance stop time (for all individual instances) +# stop_time = datetime.utcnow() +# uci_wrapper.set_stop_time( stop_time ) + + # Initiate shutdown of all instances under given UCI + cnt = 0 + stopped = [] + notStopped = [] + for r in rl: + for inst in r.instances: + log.debug( "Sending stop signal to instance '%s' associated with reservation '%s'." % ( inst, r ) ) + inst.stop() + uci_wrapper.set_stop_time( datetime.utcnow(), i_id=inst.id ) + uci_wrapper.change_state( instance_id=inst.id, i_state=inst.update() ) + stopped.append( inst ) + +# uci_wrapper.change_state( uci_state='available' ) + uci_wrapper.reset_uci_launch_time() + +# # Wait for all instances to actually terminate and update local DB +# terminated=0 +# while terminated!=len( rl ): +# for i, r in enumerate( rl ): +# log.debug( "r state: %s" % r.instances[0].state ) +# state = r.instances[0].update() +# if state=='terminated': +# uci.instance[i].state = state +# uci.instance[i].flush() +# terminated += 1 +# time.sleep ( 5 ) +# +# # Reset UCI state +# uci.state = 'available' +# uci.launch_time = None +# uci.flush() +# + log.debug( "All instances for UCI '%s' were terminated." % uci_wrapper.get_name() ) + + + +# dbInstances = get_instances( trans, uci ) #TODO: handle list! +# +# # Get actual cloud instance object +# cloudInstance = get_cloud_instance( conn, dbInstances.instance_id ) +# +# # TODO: Detach persistent storage volume(s) from instance and update volume data in local database +# stores = get_stores( trans, uci ) +# for i, store in enumerate( stores ): +# log.debug( "Detaching volume '%s' to instance '%s'." 
% ( store.volume_id, dbInstances.instance_id ) ) +# mntDevice = store.device +# volStat = None +## Detaching volume does not work with Eucalyptus Public Cloud, so comment it out +## try: +## volStat = conn.detach_volume( store.volume_id, dbInstances.instance_id, mntDevice ) +## except: +## log.debug ( 'Error detaching volume; still going to try and stop instance %s.' % dbInstances.instance_id ) +# store.attach_time = None +# store.device = None +# store.i_id = None +# store.status = volStat +# log.debug ( '***** volume status: %s' % volStat ) +# +# +# # Stop the instance and update status in local database +# cloudInstance.stop() +# dbInstances.stop_time = datetime.utcnow() +# while cloudInstance.state != 'terminated': +# log.debug( "Stopping instance %s state; current state: %s" % ( str( cloudInstance ).split(":")[1], cloudInstance.state ) ) +# time.sleep(3) +# cloudInstance.update() +# dbInstances.state = cloudInstance.state +# +# # Reset relevant UCI fields +# uci.state = 'available' +# uci.launch_time = None +# +# # Persist +# session = trans.sa_session +## session.save_or_update( stores ) +# session.save_or_update( dbInstances ) # TODO: Is this going to work w/ multiple instances stored in dbInstances variable? +# session.save_or_update( uci ) +# session.flush() +# trans.log_event( "User stopped cloud instance '%s'" % uci.name ) +# trans.set_message( "Galaxy instance '%s' stopped." % uci.name ) + + def update( self ): + """ + Runs a global status update on all storage volumes and all instances whose UCI is in + 'running', 'pending', or 'shutting-down' state. + Reason behind this method is to sync state of local DB and real-world resources + """ + log.debug( "Running general status update for EPC UCIs." 
) + instances = model.CloudInstance.filter( or_( model.CloudInstance.c.state=="running", model.CloudInstance.c.state=="pending", model.CloudInstance.c.state=="shutting-down" ) ).all() + for inst in instances: + log.debug( "Running general status update on instance '%s'" % inst.instance_id ) + self.updateInstance( inst ) + + stores = model.CloudStore.filter( or_( model.CloudStore.c.status=="in-use", model.CloudStore.c.status=="creating" ) ).all() + for store in stores: + log.debug( "Running general status update on store '%s'" % store.volume_id ) + self.updateStore( store ) + + def updateInstance( self, inst ): + + # Get credentials associated wit this instance + uci_id = inst.uci_id + uci = model.UCI.get( uci_id ) + uci.refresh() + a_key = uci.credentials.access_key + s_key = uci.credentials.secret_key + # Get connection + conn = EC2Connection( aws_access_key_id=a_key, aws_secret_access_key=s_key ) + # Get reservations handle for given instance + rl= conn.get_all_instances( [inst.instance_id] ) + # Because EPC deletes references to reservations after a short while after instances have terminated, getting an empty list as a response to a query + # typically means the instance has successfully shut down but the check was not performed in short enough amount of time. As a result, below code simply + # marks given instance as having terminated. Note that an instance might have also crashed and this code will not catch the difference... + if len( rl ) == 0: + log.info( "Instance ID '%s' was not found by the cloud provider. Instance might have crashed or otherwise been terminated." % inst.instance_id ) + inst.state = 'terminated' + uci.state = 'available' + uci.launch_time = None + inst.flush() + uci.flush() + # Update instance status in local DB with info from cloud provider + for r in rl: + for i, cInst in enumerate( r.instances ): + s = cInst.update() + log.debug( "Checking state of cloud instance '%s' associated with reservation '%s'. 
State='%s'" % ( cInst, r, s ) ) + if s != inst.state: + inst.state = s + inst.flush() + if s == 'terminated': # After instance has shut down, ensure UCI is marked as 'available' + uci.state = 'available' + uci.flush() + if s != uci.state and s != 'terminated': + # Making sure state of UCI is updated. Once multiple instances become associated with single UCI, this will need to be changed. + uci.state = s + uci.flush() + if cInst.public_dns_name != inst.public_dns: + inst.public_dns = cInst.public_dns_name + inst.flush() + if cInst.private_dns_name != inst.private_dns: + inst.private_dns = cInst.private_dns_name + inst.flush() + + def updateStore( self, store ): + # Get credentials associated wit this store + uci_id = store.uci_id + uci = model.UCI.get( uci_id ) + uci.refresh() + a_key = uci.credentials.access_key + s_key = uci.credentials.secret_key + # Get connection + conn = EC2Connection( aws_access_key_id=a_key, aws_secret_access_key=s_key ) + # Get reservations handle for given store + vl = conn.get_all_volumes( [store.volume_id] ) +# log.debug( "Store '%s' vl: '%s'" % ( store.volume_id, vl ) ) + # Update store status in local DB with info from cloud provider + if store.status != vl[0].status: + store.status = vl[0].status + store.flush() + if store.i_id != vl[0].instance_id: + store.i_id = vl[0].instance_id + store.flush() + if store.attach_time != vl[0].attach_time: + store.attach_time = vl[0].attach_time + store.flush() + if store.device != vl[0].device: + store.device = vl[0].device + store.flush() + +# def updateUCI( self, uci ): +# """ +# Runs a global status update on all storage volumes and all instances that are +# associated with specified UCI +# """ +# conn = self.get_connection( uci ) +# +# # Update status of storage volumes +# vl = model.CloudStore.filter( model.CloudInstance.c.uci_id == uci.id ).all() +# vols = [] +# for v in vl: +# vols.append( v.volume_id ) +# try: +# volumes = conn.get_all_volumes( vols ) +# for i, v in enumerate( volumes ): +# 
uci.store[i].i_id = v.instance_id +# uci.store[i].status = v.status +# uci.store[i].device = v.device +# uci.store[i].flush() +# except: +# log.debug( "Error updating status of volume(s) associated with UCI '%s'. Status was not updated." % uci.name ) +# pass +# +# # Update status of instances +# il = model.CloudInstance.filter_by( uci_id=uci.id ).filter( model.CloudInstance.c.state != 'terminated' ).all() +# instanceList = [] +# for i in il: +# instanceList.append( i.instance_id ) +# log.debug( 'instanceList: %s' % instanceList ) +# try: +# reservations = conn.get_all_instances( instanceList ) +# for i, r in enumerate( reservations ): +# uci.instance[i].state = r.instances[0].update() +# log.debug('updating instance %s; status: %s' % ( uci.instance[i].instance_id, uci.instance[i].state ) ) +# uci.state = uci.instance[i].state +# uci.instance[i].public_dns = r.instances[0].dns_name +# uci.instance[i].private_dns = r.instances[0].private_dns_name +# uci.instance[i].flush() +# uci.flush() +# except: +# log.debug( "Error updating status of instances associated with UCI '%s'. Instance status was not updated." % uci.name ) +# pass + + # --------- Helper methods ------------ + + def format_time( time ): + dict = {'T':' ', 'Z':''} + for i, j in dict.iteritems(): + time = time.replace(i, j) + return time + \ No newline at end of file diff -r 6aab50510e43 -r 9881b0df3252 lib/galaxy/cloud/providers/eucalyptus.py --- a/lib/galaxy/cloud/providers/eucalyptus.py Wed Oct 14 19:20:11 2009 -0400 +++ b/lib/galaxy/cloud/providers/eucalyptus.py Fri Oct 16 13:06:44 2009 -0400 @@ -22,7 +22,6 @@ """ STOP_SIGNAL = object() def __init__( self, app ): - log.debug( "Using eucalyptus as default cloud provider." 
) self.zone = "epc" self.key_pair = "galaxy-keypair" self.queue = Queue() @@ -35,7 +34,7 @@ worker = threading.Thread( target=self.run_next ) worker.start() self.threads.append( worker ) - log.debug( "%d cloud workers ready", nworkers ) + log.debug( "%d eucalyptus cloud workers ready", nworkers ) def run_next( self ): """Run the next job, waiting until one is available if necessary""" @@ -65,12 +64,9 @@ def get_connection( self, uci_wrapper ): """ - Establishes EC2 connection using user's default credentials + Establishes eucalyptus cloud connection using user's credentials associated with given UCI """ - log.debug( '##### Establishing cloud connection' ) - # Amazon EC2 - #conn = EC2Connection( uci_wrapper.get_access_key(), uci_wrapper.get_secret_key() ) - + log.debug( '##### Establishing eucalyptus cloud connection' ) # Eucalyptus Public Cloud # TODO: Add option in Galaxy config file to specify these values (i.e., for locally managed Eucalyptus deployments) euca_region = RegionInfo( None, "eucalyptus", "mayhem9.cs.ucsb.edu" ) @@ -141,32 +137,20 @@ """ conn = self.get_connection( uci_wrapper ) # Temporary code - need to ensure user selects zone at UCI creation time! - if uci_wrapper.get_store_availability_zone( 0 )=='': - log.info( "Availability zone for storage volume was not selected, using default zone: %s" % self.zone ) - uci_wrapper.set_store_availability_zone( 0, self.zone ) + if uci_wrapper.get_uci_availability_zone()=='': + log.info( "Availability zone for UCI (i.e., storage volume) was not selected, using default zone: %s" % self.zone ) + uci_wrapper.set_store_availability_zone( self.zone ) #TODO: check if volume associated with UCI already exists (if server crashed for example) and don't recreate it - log.info( "Creating volume in zone '%s'..." 
% uci_wrapper.get_store_availability_zone( 0 ) ) - vol = conn.create_volume( uci_wrapper.get_store_size( 0 ), uci_wrapper.get_store_availability_zone( 0 ), snapshot=None ) - uci_wrapper.set_store_volume_id( 0, vol.id ) - - # Wait for a while to ensure volume was created -# vol_status = vol.status -# for i in range( 30 ): -# if vol_status is not "u'available'": -# log.debug( 'Updating volume status; current status: %s' % vol_status ) -# vol_status = vol.status -# time.sleep(3) -# if i is 29: -# log.debug( "Error while creating volume '%s'; stuck in state '%s'; deleting volume." % ( vol.id, vol_status ) ) -# conn.delete_volume( vol.id ) -# uci.state = 'error' -# uci.flush() -# return + log.info( "Creating volume in zone '%s'..." % uci_wrapper.get_uci_availability_zone() ) + # Because only 1 storage volume may be created at UCI config time, index of this storage volume in local Galaxy DB w.r.t + # current UCI is 0, so reference it in following methods + vol = conn.create_volume( uci_wrapper.get_store_size( 0 ), uci_wrapper.get_uci_availability_zone(), snapshot=None ) + uci_wrapper.set_store_volume_id( 0, vol.id ) # EPC does not allow creation of storage volumes (it deletes one as soon as it is created, so manually set uci_state here) uci_wrapper.change_state( uci_state='available' ) - uci_wrapper.set_store_status( 0, vol.status ) + uci_wrapper.set_store_status( vol.id, vol.status ) def deleteUCI( self, uci_wrapper ): """ @@ -214,7 +198,7 @@ def startUCI( self, uci_wrapper ): """ - Starts an instance of named UCI on the cloud. + Starts instance(s) of given UCI on the cloud. 
""" conn = self.get_connection( uci_wrapper ) # @@ -250,11 +234,12 @@ if not uci_wrapper.uci_launch_time_set(): uci_wrapper.set_uci_launch_time( l_time ) uci_wrapper.set_reservation_id( i_index, str( reservation ).split(":")[1] ) + # TODO: if more than a single instance will be started through single reservation, change this reference to element [0] i_id = str( reservation.instances[0]).split(":")[1] uci_wrapper.set_instance_id( i_index, i_id ) - s = reservation.instances[0].state # TODO: once more than a single instance will be started through single reservation, change this + s = reservation.instances[0].state uci_wrapper.change_state( s, i_id, s ) - log.debug( "UCI '%s' started, current state: %s" % ( uci_wrapper.get_name(), uci_wrapper.get_state() ) ) + log.debug( "Instance of UCI '%s' started, current state: %s" % ( uci_wrapper.get_name(), uci_wrapper.get_state() ) ) @@ -291,7 +276,7 @@ def stopUCI( self, uci_wrapper): """ - Stops all of cloud instances associated with named UCI. + Stops all of cloud instances associated with given UCI. 
""" conn = self.get_connection( uci_wrapper ) diff -r 6aab50510e43 -r 9881b0df3252 lib/galaxy/web/controllers/cloud.py --- a/lib/galaxy/web/controllers/cloud.py Wed Oct 14 19:20:11 2009 -0400 +++ b/lib/galaxy/web/controllers/cloud.py Fri Oct 16 13:06:44 2009 -0400 @@ -14,7 +14,10 @@ from galaxy.workflow.modules import * from galaxy.model.mapping import desc from galaxy.model.orm import * -from datetime import datetime +from datetime import datetime, timedelta + +pkg_resources.require( "WebHelpers" ) +from webhelpers import * # Required for Cloud tab import galaxy.eggs @@ -116,7 +119,7 @@ @web.expose @web.require_login( "start Galaxy cloud instance" ) - def start( self, trans, id, type='small' ): + def start( self, trans, id, type='m1.small' ): """ Start a new cloud resource instance """ @@ -311,28 +314,39 @@ Configure and add new cloud instance to user's instance pool """ inst_error = vol_error = cred_error = None + error = {} user = trans.get_user() # TODO: Hack until present user w/ bullet list w/ registered credentials - storedCreds = trans.sa_session.query( model.CloudUserCredentials ) \ - .filter_by( user=user ).all() - credsMatch = False - for cred in storedCreds: - if cred.name == credName: - credsMatch = True + storedCreds = trans.sa_session.query( model.CloudUserCredentials ).filter_by( user=user ).all() + if len( storedCreds ) == 0: + return trans.show_error_message( "You must register credentials before configuring a Galaxy instance." ) + + providersToZones = {} + for storedCred in storedCreds: + if storedCred.provider_name == 'ec2': + ec2_zones = ['us-east-1a', 'us-east-1b', 'us-east-1c', 'us-east-1d'] + providersToZones[storedCred.name] = ec2_zones + elif storedCred.provider_name == 'eucalyptus': + providersToZones[storedCred.name] = ['epc'] if instanceName: # Create new user configured instance try: - if len( instanceName ) > 255: - inst_error = "Instance name exceeds maximum allowable length." 
- elif trans.app.model.UCI.filter( and_( trans.app.model.UCI.table.c.name==instanceName, trans.app.model.UCI.table.c.state!='deleted' ) ).first(): - inst_error = "An instance with that name already exist." - elif int( volSize ) > 1000: - vol_error = "Volume size cannot exceed 1000GB. You must specify an integer between 1 and 1000." - elif int( volSize ) < 1: - vol_error = "Volume size cannot be less than 1GB. You must specify an integer between 1 and 1000." - elif not credsMatch: - cred_error = "You specified unknown credentials." + if trans.app.model.UCI.filter( and_( trans.app.model.UCI.table.c.name==instanceName, trans.app.model.UCI.table.c.state!='deleted' ) ).first(): + error['inst_error'] = "An instance with that name already exist." + elif instanceName=='' or len( instanceName ) > 255: + error['inst_error'] = "Instance name must be between 1 and 255 characters long." + elif credName=='': + error['cred_error'] = "You must select credentials." + elif volSize == '': + error['vol_error'] = "You must specify volume size as an integer value between 1 and 1000." + elif ( int( volSize ) < 1 ) or ( int( volSize ) > 1000 ): + error['vol_error'] = "Volume size must be integer value between 1 and 1000." +# elif type( volSize ) != type( 1 ): # Check if volSize is int +# log.debug( "volSize='%s'" % volSize ) +# error['vol_error'] = "Volume size must be integer value between 1 and 1000." + elif zone=='': + error['zone_error'] = "You must select zone where this UCI will be registered." else: # Capture user configured instance information uci = model.UCI() @@ -341,7 +355,8 @@ trans.app.model.CloudUserCredentials.table.c.name==credName ).first() uci.user= user uci.total_size = volSize # This is OK now because new instance is being created. - uci.state = "newUCI" + uci.state = "newUCI" + storage = model.CloudStore() storage.user = user storage.uci = uci @@ -361,6 +376,16 @@ except AttributeError, ae: inst_error = "No registered cloud images. 
You must contact administrator to add some before proceeding." log.debug("AttributeError: %s " % str( ae ) ) + + #TODO: based on user credentials (i.e., provider) selected, zone options will be different (e.g., EC2: us-east-1a vs EPC: epc) + + return trans.fill_template( "cloud/configure_uci.mako", + instanceName = instanceName, + credName = storedCreds, + volSize = volSize, + zone = zone, + error = error, + providersToZones = providersToZones ) return trans.show_form( web.FormBuilder( web.url_for(), "Configure new instance", submit_text="Add" ) @@ -445,9 +470,9 @@ elif ( ( providerName.lower()!='ec2' ) and ( providerName.lower()!='eucalyptus' ) ): error['provider_error'] = "You specified an unsupported cloud provider." elif accessKey=='' or len( accessKey ) > 255: - error['access_key_error'] = "Access key much be between 1 and 255 characters long." + error['access_key_error'] = "Access key must be between 1 and 255 characters long." elif secretKey=='' or len( secretKey ) > 255: - error['secret_key_error'] = "Secret key much be between 1 and 255 characters long." + error['secret_key_error'] = "Secret key must be between 1 and 255 characters long." else: # Create new user stored credentials credentials = model.CloudUserCredentials() @@ -523,7 +548,7 @@ # Display the management page trans.set_message( "Credentials '%s' deleted." 
% stored.name ) return self.list( trans ) - + @web.expose @web.require_login( "edit workflows" ) def editor( self, trans, id=None ): @@ -965,7 +990,23 @@ ids_in_menu=ids_in_menu ) ## ---- Utility methods ------------------------------------------------------- + +def get_UCIs_state( trans ): + user = trans.get_user() + instances = trans.sa_session.query( model.UCI ).filter_by( user=user ).filter( model.UCI.c.state != "deleted" ).all() + insd = {} # instance name-state dict + for inst in instances: + insd[inst.name] = inst.state + +def get_UCIs_time_ago( trans ): + user = trans.get_user() + instances = trans.sa_session.query( model.UCI ).filter_by( user=user ).all() + intad = {} # instance name-time-ago dict + for inst in instances: + if inst.launch_time != None: + intad[inst.name] = str(date.distance_of_time_in_words (inst.launch_time, date.datetime.utcnow() ) ) + def get_stored_credentials( trans, id, check_ownership=True ): """ Get StoredUserCredentials from the database by id, verifying ownership. @@ -1020,7 +1061,7 @@ # Looks good return live -def get_mi( trans, size='small' ): +def get_mi( trans, size='m1.small' ): """ Get appropriate machine image (mi) based on instance size. 
TODO: Dummy method - need to implement logic diff -r 6aab50510e43 -r 9881b0df3252 lib/galaxy/web/framework/helpers/__init__.py --- a/lib/galaxy/web/framework/helpers/__init__.py Wed Oct 14 19:20:11 2009 -0400 +++ b/lib/galaxy/web/framework/helpers/__init__.py Fri Oct 16 13:06:44 2009 -0400 @@ -4,6 +4,7 @@ from webhelpers import * from datetime import datetime, timedelta +from galaxy.util.json import to_json_string # If the date is more than one week ago, then display the actual date instead of in words def time_ago( x ): diff -r 6aab50510e43 -r 9881b0df3252 templates/cloud/add_credentials.mako --- a/templates/cloud/add_credentials.mako Wed Oct 14 19:20:11 2009 -0400 +++ b/templates/cloud/add_credentials.mako Fri Oct 16 13:06:44 2009 -0400 @@ -28,7 +28,7 @@ <div class="${cls}"> <label>Credentials name:</label> <div class="form-row-input"> - <input type="text" name="credName" value="Unnamed credentials" size="40"> + <input type="text" name="credName" value="${credName}" size="40"> </div> %if error.has_key('cred_error'): <div class="form-row-error-message">${error['cred_error']}</div> @@ -45,7 +45,7 @@ <div class="${cls}"> <label>Cloud provider name:</label> <div class="form-row-input"> - <select name="providerName"> + <select name="providerName" style="width:40em"> <option value="ec2">Amazon EC2</option> <option value="eucalyptus">Eucalpytus Public Cloud (EPC)</option> </select> @@ -64,7 +64,7 @@ <div class="${cls}"> <label>Access key:</label> <div class="form-row-input"> - <input type="text" name="accessKey" value="" size="40"> + <input type="text" name="accessKey" value="${accessKey}" size="40"> </div> %if error.has_key('access_key_error'): <div class="form-row-error-message">${error['access_key_error']}</div> @@ -81,7 +81,7 @@ <div class="${cls}"> <label>Secret key:</label> <div class="form-row-input"> - <input type="password" name="secretKey" value="" size="40"> + <input type="password" name="secretKey" value="${secretKey}" size="40"> </div> %if 
error.has_key('secret_key_error'): <div class="form-row-error-message">${error['secret_key_error']}</div> diff -r 6aab50510e43 -r 9881b0df3252 templates/cloud/configure_uci.mako --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/templates/cloud/configure_uci.mako Fri Oct 16 13:06:44 2009 -0400 @@ -0,0 +1,116 @@ +<% _=n_ %> +<%inherit file="/base.mako"/> +<%def name="title()">Configure new UCI</%def> + +<%def name="javascripts()"> +${parent.javascripts()} +<script type="text/javascript"> + +var providers_zones = ${h.to_json_string(providersToZones)}; + +$(function(){ + $("input:text:first").focus(); + + $("#credName").change(function() { + var zones = providers_zones[ $(this).val() ]; + var zones_select = $("#zones"); + + zones_select.children().remove(); + + for (var i in zones) { + var zone = zones[i]; + var new_option = $('<option value="' + zone + '">' + zone + '</option>'); + new_option.appendTo(zones_select); + } + + }); +}) +</script> +</%def> + +%if header: + ${header} +%endif + +<div class="form"> + <div class="form-title">Configure new Galaxy instance</div> + <div class="form-body"> + <form name="Configure new UCI" action="${h.url_for( action='configureNew' )}" method="post" > + + <% + cls = "form-row" + if error.has_key('inst_error'): + cls += " form-row-error" + %> + <div class="${cls}"> + <label>Instance name:</label> + <div class="form-row-input"> + <input type="text" name="instanceName" value="${instanceName}" size="40"> + </div> + %if error.has_key('inst_error'): + <div class="form-row-error-message">${error['inst_error']}</div> + %endif + <div style="clear: both"></div> + </div> + + <% + cls = "form-row" + if error.has_key('cred_error'): + cls += " form-row-error" + %> + <div class="${cls}"> + <label>Name of registered credentials to use:</label> + <div class="form-row-input"> + <select id="credName" name="credName" style="width:40em"> + <option value="">Select Credential...</option> + % for cred in credName: + <option 
value="${cred.name}">${cred.name}</option> + %endfor + </select> + </div> + %if error.has_key('cred_error'): + <div class="form-row-error-message">${error['cred_error']}</div> + %endif + <div style="clear: both"></div> + </div> + + + <% + cls = "form-row" + if error.has_key('vol_error'): + cls += " form-row-error" + %> + <div class="${cls}"> + <label>Permanent storage size (1-1000GB)<br/>NOTE: you will be able to add more storage later:</label> + <div class="form-row-input"> + <input type="text" name="volSize" value="${volSize}" size="40"> + </div> + %if error.has_key('vol_error'): + <div class="form-row-error-message">${error['vol_error']}</div> + %endif + <div style="clear: both"></div> + </div> + + <% + cls = "form-row" + if error.has_key('zone_error'): + cls += " form-row-error" + %> + <div class="${cls}"> + <label>Zone to create storage in</label> + <div class="form-row-input"> + <select id="zones" name="zone" style="width:40em"> + </select> + </div> + %if error.has_key('zone_error'): + <div class="form-row-error-message">${error['zone_error']}</div> + %endif + <div style="clear: both"></div> + </div> + + + <div class="form-row"><input type="submit" value="Add"></div> + + </form> + </div> +</div>
participants (1)
-
Greg Von Kuster