details: http://www.bx.psu.edu/hg/galaxy/rev/be631ed97541
changeset: 3077:be631ed97541
user: Enis Afgan <afgane(a)gmail.com>
date: Thu Oct 29 11:46:43 2009 -0400
description:
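Cleaned up code: deleted temporary code and commented-out blocks (e.g., unused job-queue leftovers such as JobWrapper, JobStopQueue and the per-UCI CloudProvider methods) along with stale comments.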
diffstat:
lib/galaxy/cloud/__init__.py | 701 +------------------------------------
lib/galaxy/cloud/providers/ec2.py | 93 +----
lib/galaxy/cloud/providers/eucalyptus.py | 100 +----
lib/galaxy/web/controllers/cloud.py | 235 ------------
4 files changed, 23 insertions(+), 1106 deletions(-)
diffs (1457 lines):
diff -r 022ac0f64679 -r be631ed97541 lib/galaxy/cloud/__init__.py
--- a/lib/galaxy/cloud/__init__.py Wed Oct 28 16:35:36 2009 -0400
+++ b/lib/galaxy/cloud/__init__.py Thu Oct 29 11:46:43 2009 -0400
@@ -47,40 +47,11 @@
self.provider = CloudProvider( app )
# Monitor for updating status of cloud instances
self.cloud_monitor = CloudMonitor( self.app, self.provider )
-# self.job_stop_queue = JobStopQueue( app, self.dispatcher )
else:
self.job_queue = self.job_stop_queue = NoopCloudMonitor()
def shutdown( self ):
self.cloud_monitor.shutdown()
-# self.job_stop_queue.shutdown()
-
-# def createUCI( self, user, name, storage_size, zone=None):
-# """
-# Createse User Configured Instance (UCI). Essentially, creates storage volume.
-# """
-# self.provider.createUCI( user, name, storage_size, zone )
-#
-# def deleteUCI( self, name ):
-# """
-# Deletes UCI. NOTE that this implies deletion of any and all data associated
-# with this UCI from the cloud. All data will be deleted.
-# """
-#
-# def addStorageToUCI( self, name ):
-# """ Adds more storage to specified UCI """
-#
-# def startUCI( self, name, type ):
-# """
-# Starts an instance of named UCI on the cloud. This implies, mounting of
-# storage and starting Galaxy instance.
-# """
-#
-# def stopUCI( self, name ):
-# """
-# Stops cloud instance associated with named UCI. This also implies
-# stopping of Galaxy and unmounting of the file system.
-# """
class Sleeper( object ):
"""
@@ -110,9 +81,7 @@
# Keep track of the pid that started the cloud manager, only it
# has valid threads
self.parent_pid = os.getpid()
- # Contains new jobs. Note this is not used if track_jobs_in_database is True
-# self.queue = Queue()
-
+
# Contains requests that are waiting (only use from monitor thread)
self.waiting = []
@@ -123,33 +92,6 @@
self.monitor_thread = threading.Thread( target=self.__monitor )
self.monitor_thread.start()
log.info( "Cloud manager started" )
-# if app.config.get_bool( 'enable_job_recovery', True ):
-# self.__check_jobs_at_startup()
-
- def __check_jobs_at_startup( self ):
- """
- Checks all jobs that are in the 'new', 'queued' or 'running' state in
- the database and requeues or cleans up as necessary. Only run as the
- job manager starts.
- """
- model = self.app.model
- for job in model.Job.filter( model.Job.c.state==model.Job.states.NEW ).all():
- if job.tool_id not in self.app.toolbox.tools_by_id:
- log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) )
- JobWrapper( job, None, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator, or' )
- else:
- log.debug( "no runner: %s is still in new state, adding to the jobs queue" %job.id )
- self.queue.put( ( job.id, job.tool_id ) )
- for job in model.Job.filter( (model.Job.c.state == model.Job.states.RUNNING) | (model.Job.c.state == model.Job.states.QUEUED) ).all():
- if job.tool_id not in self.app.toolbox.tools_by_id:
- log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) )
- JobWrapper( job, None, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator, or' )
- elif job.job_runner_name is None:
- log.debug( "no runner: %s is still in queued state, adding to the jobs queue" %job.id )
- self.queue.put( ( job.id, job.tool_id ) )
- else:
- job_wrapper = JobWrapper( job, self.app.toolbox.tools_by_id[ job.tool_id ], self )
- self.dispatcher.recover( job, job_wrapper )
def __monitor( self ):
"""
@@ -164,7 +106,7 @@
try:
# log.debug( "Calling monitor_step" )
self.__monitor_step()
- if cnt%30 == 0: # Run global update every 30 seconds (1 minute)
+ if cnt%30 == 0: # Run global update every 30 iterations (1 minute)
self.provider.update()
cnt = 0
except:
@@ -186,166 +128,24 @@
"""
# Get an orm (object relational mapping) session
session = mapping.Session()
- # Pull all new jobs from the queue at once
new_requests = []
-# new_instances = []
-# new_UCIs = []
-# stop_UCIs = []
-# delete_UCIs = []
-
-# for r in session.query( model.cloud_instance ).filter( model.cloud_instance.s.state == model.cloud_instance.states.NEW ).all():
-# new_instances
-
+
for r in session.query( model.UCI ) \
- .filter( or_( model.UCI.c.state==uci_states.NEW_UCI, #"newUCI",
- model.UCI.c.state==uci_states.SUBMITTED_UCI, #"submittedUCI",
- model.UCI.c.state==uci_states.SHUTTING_DOWN_UCI, #"shutting-downUCI",
+ .filter( or_( model.UCI.c.state==uci_states.NEW_UCI,
+ model.UCI.c.state==uci_states.SUBMITTED_UCI,
+ model.UCI.c.state==uci_states.SHUTTING_DOWN_UCI,
model.UCI.c.state==uci_states.DELETING_UCI ) ) \
.all():
uci_wrapper = UCIwrapper( r )
new_requests.append( uci_wrapper )
-# log.debug( 'new_requests: %s' % new_requests )
+
for uci_wrapper in new_requests:
session.clear()
-# log.debug( 'r.name: %s, state: %s' % ( r.name, r.state ) )
-# session.save_or_update( r )
-# session.flush()
self.provider.put( uci_wrapper )
# Done with the session
mapping.Session.remove()
-
-# for r in session.query( model.UCI ).filter( model.UCI.c.state == "submitted" ).all():
-# new_instances.append( r )
-# for r in new_instances:
-# self.provider.startUCI( r )
-#
-# for r in session.query( model.UCI ).filter( model.UCI.c.state == "shutting-down" ).all():
-# stop_UCIs.append( r )
-# for r in stop_UCIs:
-# self.provider.stopUCI( r )
-#
-# for r in session.query( model.UCI ).filter( model.UCI.c.state == "deleting" ).all():
-# delete_UCIs.append( r )
-# for r in delete_UCIs:
-# self.provider.deleteUCI( r )
-
-
-
-# if self.track_jobs_in_database:
-# for j in session.query( model.Job ).options( lazyload( "external_output_metadata" ), lazyload( "parameters" ) ).filter( model.Job.c.state == model.Job.states.NEW ).all():
-# job = JobWrapper( j, self.app.toolbox.tools_by_id[ j.tool_id ], self )
-# new_jobs.append( job )
-# else:
-# try:
-# while 1:
-# message = self.queue.get_nowait()
-# if message is self.STOP_SIGNAL:
-# return
-# # Unpack the message
-# job_id, tool_id = message
-# # Create a job wrapper from it
-# job_entity = session.query( model.Job ).get( job_id )
-# job = JobWrapper( job_entity, self.app.toolbox.tools_by_id[ tool_id ], self )
-# # Append to watch queue
-# new_jobs.append( job )
-# except Empty:
-# pass
-# # Iterate over new and waiting jobs and look for any that are
-# # ready to run
-# new_waiting = []
-# for job in ( new_jobs + self.waiting ):
-# try:
-# # Clear the session for each job so we get fresh states for
-# # job and all datasets
-# session.clear()
-# # Get the real job entity corresponding to the wrapper (if we
-# # are tracking in the database this is probably cached in
-# # the session from the origianl query above)
-# job_entity = session.query( model.Job ).get( job.job_id )
-# # Check the job's dependencies, requeue if they're not done
-# job_state = self.__check_if_ready_to_run( job, job_entity )
-# if job_state == JOB_WAIT:
-# if not self.track_jobs_in_database:
-# new_waiting.append( job )
-# elif job_state == JOB_ERROR:
-# log.info( "job %d ended with an error" % job.job_id )
-# elif job_state == JOB_INPUT_ERROR:
-# log.info( "job %d unable to run: one or more inputs in error state" % job.job_id )
-# elif job_state == JOB_INPUT_DELETED:
-# log.info( "job %d unable to run: one or more inputs deleted" % job.job_id )
-# elif job_state == JOB_READY:
-# # If special queuing is enabled, put the ready jobs in the special queue
-# if self.use_policy :
-# self.squeue.put( job )
-# log.debug( "job %d put in policy queue" % job.job_id )
-# else: # or dispatch the job directly
-# self.dispatcher.put( job )
-# log.debug( "job %d dispatched" % job.job_id)
-# elif job_state == JOB_DELETED:
-# msg = "job %d deleted by user while still queued" % job.job_id
-# job.info = msg
-# log.debug( msg )
-# elif job_state == JOB_ADMIN_DELETED:
-# job.fail( job_entity.info )
-# log.info( "job %d deleted by admin while still queued" % job.job_id )
-# else:
-# msg = "unknown job state '%s' for job %d" % ( job_state, job.job_id )
-# job.info = msg
-# log.error( msg )
-# except Exception, e:
-# job.info = "failure running job %d: %s" % ( job.job_id, str( e ) )
-# log.exception( "failure running job %d" % job.job_id )
-# # Update the waiting list
-# self.waiting = new_waiting
-# # If special (e.g. fair) scheduling is enabled, dispatch all jobs
-# # currently in the special queue
-# if self.use_policy :
-# while 1:
-# try:
-# sjob = self.squeue.get()
-# self.dispatcher.put( sjob )
-# log.debug( "job %d dispatched" % sjob.job_id )
-# except Empty:
-# # squeue is empty, so stop dispatching
-# break
-# except Exception, e: # if something else breaks while dispatching
-# job.fail( "failure running job %d: %s" % ( sjob.job_id, str( e ) ) )
-# log.exception( "failure running job %d" % sjob.job_id )
-# # Done with the session
-# mapping.Session.remove()
-
- def __check_if_ready_to_run( self, job_wrapper, job ):
- """
- Check if a job is ready to run by verifying that each of its input
- datasets is ready (specifically in the OK state). If any input dataset
- has an error, fail the job and return JOB_INPUT_ERROR. If any input
- dataset is deleted, fail the job and return JOB_INPUT_DELETED. If all
- input datasets are in OK state, return JOB_READY indicating that the
- job can be dispatched. Otherwise, return JOB_WAIT indicating that input
- datasets are still being prepared.
- """
- if job.state == model.Job.states.DELETED:
- return JOB_DELETED
- elif job.state == model.Job.states.ERROR:
- return JOB_ADMIN_DELETED
- for dataset_assoc in job.input_datasets:
- idata = dataset_assoc.dataset
- if not idata:
- continue
- # don't run jobs for which the input dataset was deleted
- if idata.deleted:
- job_wrapper.fail( "input data %d (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
- return JOB_INPUT_DELETED
- # an error in the input data causes us to bail immediately
- elif idata.state == idata.states.ERROR:
- job_wrapper.fail( "input data %d is in error state" % ( idata.hid ) )
- return JOB_INPUT_ERROR
- elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
- # need to requeue
- return JOB_WAIT
- return JOB_READY
-
+
def put( self, job_id, tool ):
"""Add a job to the queue (by job identifier)"""
if not self.track_jobs_in_database:
@@ -355,13 +155,11 @@
def shutdown( self ):
"""Attempts to gracefully shut down the worker thread"""
if self.parent_pid != os.getpid():
- # We're not the real job queue, do nothing
+ # We're not the real queue, do nothing
return
else:
log.info( "sending stop signal to worker thread" )
self.running = False
-# if not self.track_jobs_in_database:
-# self.queue.put( self.STOP_SIGNAL )
self.sleeper.wake()
log.info( "cloud manager stopped" )
self.dispatcher.shutdown()
@@ -649,11 +447,6 @@
def get_all_stores( self ):
""" Returns all storage volumes' database objects associated with this UCI. """
return model.CloudStore.filter( model.CloudStore.c.uci_id == self.uci_id ).all()
-# svs = model.CloudStore.filter( model.CloudStore.c.uci_id == self.uci_id ).all()
-# svl = [] # storage volume list
-# for sv in svs:
-# svl.append( sv.volume_id )
-# return svl
def get_uci( self ):
""" Returns database object for given UCI. """
@@ -679,336 +472,6 @@
uci.state = 'deleted' # for bookkeeping reasons, mark as deleted but don't actually delete.
uci.flush()
-#class JobWrapper( object ):
-# """
-# Wraps a 'model.Job' with convience methods for running processes and
-# state management.
-# """
-# def __init__(self, job, tool, queue ):
-# self.job_id = job.id
-# # This is immutable, we cache it for the scheduling policy to use if needed
-# self.session_id = job.session_id
-# self.tool = tool
-# self.queue = queue
-# self.app = queue.app
-# self.extra_filenames = []
-# self.command_line = None
-# self.galaxy_lib_dir = None
-# # With job outputs in the working directory, we need the working
-# # directory to be set before prepare is run, or else premature deletion
-# # and job recovery fail.
-# self.working_directory = \
-# os.path.join( self.app.config.job_working_directory, str( self.job_id ) )
-# self.output_paths = None
-# self.external_output_metadata = metadata.JobExternalOutputMetadataWrapper( job ) #wrapper holding the info required to restore and clean up from files used for setting metadata externally
-#
-# def get_param_dict( self ):
-# """
-# Restore the dictionary of parameters from the database.
-# """
-# job = model.Job.get( self.job_id )
-# param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] )
-# param_dict = self.tool.params_from_strings( param_dict, self.app )
-# return param_dict
-#
-# def prepare( self ):
-# """
-# Prepare the job to run by creating the working directory and the
-# config files.
-# """
-# mapping.context.current.clear() #this prevents the metadata reverting that has been seen in conjunction with the PBS job runner
-# if not os.path.exists( self.working_directory ):
-# os.mkdir( self.working_directory )
-# # Restore parameters from the database
-# job = model.Job.get( self.job_id )
-# incoming = dict( [ ( p.name, p.value ) for p in job.parameters ] )
-# incoming = self.tool.params_from_strings( incoming, self.app )
-# # Do any validation that could not be done at job creation
-# self.tool.handle_unvalidated_param_values( incoming, self.app )
-# # Restore input / output data lists
-# inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] )
-# out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] )
-# # These can be passed on the command line if wanted as $userId $userEmail
-# if job.history.user: # check for anonymous user!
-# userId = '%d' % job.history.user.id
-# userEmail = str(job.history.user.email)
-# else:
-# userId = 'Anonymous'
-# userEmail = 'Anonymous'
-# incoming['userId'] = userId
-# incoming['userEmail'] = userEmail
-# # Build params, done before hook so hook can use
-# param_dict = self.tool.build_param_dict( incoming, inp_data, out_data, self.get_output_fnames(), self.working_directory )
-# # Certain tools require tasks to be completed prior to job execution
-# # ( this used to be performed in the "exec_before_job" hook, but hooks are deprecated ).
-# if self.tool.tool_type is not None:
-# out_data = self.tool.exec_before_job( self.queue.app, inp_data, out_data, param_dict )
-# # Run the before queue ("exec_before_job") hook
-# self.tool.call_hook( 'exec_before_job', self.queue.app, inp_data=inp_data,
-# out_data=out_data, tool=self.tool, param_dict=incoming)
-# mapping.context.current.flush()
-# # Build any required config files
-# config_filenames = self.tool.build_config_files( param_dict, self.working_directory )
-# # FIXME: Build the param file (might return None, DEPRECATED)
-# param_filename = self.tool.build_param_file( param_dict, self.working_directory )
-# # Build the job's command line
-# self.command_line = self.tool.build_command_line( param_dict )
-# # FIXME: for now, tools get Galaxy's lib dir in their path
-# if self.command_line and self.command_line.startswith( 'python' ):
-# self.galaxy_lib_dir = os.path.abspath( "lib" ) # cwd = galaxy root
-# # We need command_line persisted to the db in order for Galaxy to re-queue the job
-# # if the server was stopped and restarted before the job finished
-# job.command_line = self.command_line
-# job.flush()
-# # Return list of all extra files
-# extra_filenames = config_filenames
-# if param_filename is not None:
-# extra_filenames.append( param_filename )
-# self.param_dict = param_dict
-# self.extra_filenames = extra_filenames
-# return extra_filenames
-#
-# def fail( self, message, exception=False ):
-# """
-# Indicate job failure by setting state and message on all output
-# datasets.
-# """
-# job = model.Job.get( self.job_id )
-# job.refresh()
-# # if the job was deleted, don't fail it
-# if not job.state == model.Job.states.DELETED:
-# # Check if the failure is due to an exception
-# if exception:
-# # Save the traceback immediately in case we generate another
-# # below
-# job.traceback = traceback.format_exc()
-# # Get the exception and let the tool attempt to generate
-# # a better message
-# etype, evalue, tb = sys.exc_info()
-# m = self.tool.handle_job_failure_exception( evalue )
-# if m:
-# message = m
-# if self.app.config.outputs_to_working_directory:
-# for dataset_path in self.get_output_fnames():
-# try:
-# shutil.move( dataset_path.false_path, dataset_path.real_path )
-# log.debug( "fail(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) )
-# except ( IOError, OSError ), e:
-# log.error( "fail(): Missing output file in working directory: %s" % e )
-# for dataset_assoc in job.output_datasets:
-# dataset = dataset_assoc.dataset
-# dataset.refresh()
-# dataset.state = dataset.states.ERROR
-# dataset.blurb = 'tool error'
-# dataset.info = message
-# dataset.set_size()
-# dataset.flush()
-# job.state = model.Job.states.ERROR
-# job.command_line = self.command_line
-# job.info = message
-# job.flush()
-# # If the job was deleted, just clean up
-# self.cleanup()
-#
-# def change_state( self, state, info = False ):
-# job = model.Job.get( self.job_id )
-# job.refresh()
-# for dataset_assoc in job.output_datasets:
-# dataset = dataset_assoc.dataset
-# dataset.refresh()
-# dataset.state = state
-# if info:
-# dataset.info = info
-# dataset.flush()
-# if info:
-# job.info = info
-# job.state = state
-# job.flush()
-#
-# def get_state( self ):
-# job = model.Job.get( self.job_id )
-# job.refresh()
-# return job.state
-#
-# def set_runner( self, runner_url, external_id ):
-# job = model.Job.get( self.job_id )
-# job.refresh()
-# job.job_runner_name = runner_url
-# job.job_runner_external_id = external_id
-# job.flush()
-#
-# def finish( self, stdout, stderr ):
-# """
-# Called to indicate that the associated command has been run. Updates
-# the output datasets based on stderr and stdout from the command, and
-# the contents of the output files.
-# """
-# # default post job setup
-# mapping.context.current.clear()
-# job = model.Job.get( self.job_id )
-# # if the job was deleted, don't finish it
-# if job.state == job.states.DELETED:
-# self.cleanup()
-# return
-# elif job.state == job.states.ERROR:
-# # Job was deleted by an administrator
-# self.fail( job.info )
-# return
-# if stderr:
-# job.state = "error"
-# else:
-# job.state = 'ok'
-# if self.app.config.outputs_to_working_directory:
-# for dataset_path in self.get_output_fnames():
-# try:
-# shutil.move( dataset_path.false_path, dataset_path.real_path )
-# log.debug( "finish(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) )
-# except ( IOError, OSError ):
-# self.fail( "Job %s's output dataset(s) could not be read" % job.id )
-# return
-# for dataset_assoc in job.output_datasets:
-# #should this also be checking library associations? - can a library item be added from a history before the job has ended? - lets not allow this to occur
-# for dataset in dataset_assoc.dataset.dataset.history_associations: #need to update all associated output hdas, i.e. history was shared with job running
-# dataset.blurb = 'done'
-# dataset.peek = 'no peek'
-# dataset.info = stdout + stderr
-# dataset.set_size()
-# if stderr:
-# dataset.blurb = "error"
-# elif dataset.has_data():
-# #if a dataset was copied, it won't appear in our dictionary:
-# #either use the metadata from originating output dataset, or call set_meta on the copies
-# #it would be quicker to just copy the metadata from the originating output dataset,
-# #but somewhat trickier (need to recurse up the copied_from tree), for now we'll call set_meta()
-# if not self.external_output_metadata.external_metadata_set_successfully( dataset ):
-# # Only set metadata values if they are missing...
-# dataset.set_meta( overwrite = False )
-# else:
-# #load metadata from file
-# #we need to no longer allow metadata to be edited while the job is still running,
-# #since if it is edited, the metadata changed on the running output will no longer match
-# #the metadata that was stored to disk for use via the external process,
-# #and the changes made by the user will be lost, without warning or notice
-# dataset.metadata.from_JSON_dict( self.external_output_metadata.get_output_filenames_by_dataset( dataset ).filename_out )
-# if self.tool.is_multi_byte:
-# dataset.set_multi_byte_peek()
-# else:
-# dataset.set_peek()
-# else:
-# dataset.blurb = "empty"
-# dataset.flush()
-# if stderr:
-# dataset_assoc.dataset.dataset.state = model.Dataset.states.ERROR
-# else:
-# dataset_assoc.dataset.dataset.state = model.Dataset.states.OK
-# dataset_assoc.dataset.dataset.flush()
-#
-# # Save stdout and stderr
-# if len( stdout ) > 32768:
-# log.error( "stdout for job %d is greater than 32K, only first part will be logged to database" % job.id )
-# job.stdout = stdout[:32768]
-# if len( stderr ) > 32768:
-# log.error( "stderr for job %d is greater than 32K, only first part will be logged to database" % job.id )
-# job.stderr = stderr[:32768]
-# # custom post process setup
-# inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] )
-# out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] )
-# param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] ) # why not re-use self.param_dict here? ##dunno...probably should, this causes tools.parameters.basic.UnvalidatedValue to be used in following methods instead of validated and transformed values during i.e. running workflows
-# param_dict = self.tool.params_from_strings( param_dict, self.app )
-# # Check for and move associated_files
-# self.tool.collect_associated_files(out_data, self.working_directory)
-# # Create generated output children and primary datasets and add to param_dict
-# collected_datasets = {'children':self.tool.collect_child_datasets(out_data),'primary':self.tool.collect_primary_datasets(out_data)}
-# param_dict.update({'__collected_datasets__':collected_datasets})
-# # Certain tools require tasks to be completed after job execution
-# # ( this used to be performed in the "exec_after_process" hook, but hooks are deprecated ).
-# if self.tool.tool_type is not None:
-# self.tool.exec_after_process( self.queue.app, inp_data, out_data, param_dict, job = job )
-# # Call 'exec_after_process' hook
-# self.tool.call_hook( 'exec_after_process', self.queue.app, inp_data=inp_data,
-# out_data=out_data, param_dict=param_dict,
-# tool=self.tool, stdout=stdout, stderr=stderr )
-# # TODO
-# # validate output datasets
-# job.command_line = self.command_line
-# mapping.context.current.flush()
-# log.debug( 'job %d ended' % self.job_id )
-# self.cleanup()
-#
-# def cleanup( self ):
-# # remove temporary files
-# try:
-# for fname in self.extra_filenames:
-# os.remove( fname )
-# if self.working_directory is not None:
-# shutil.rmtree( self.working_directory )
-# if self.app.config.set_metadata_externally:
-# self.external_output_metadata.cleanup_external_metadata()
-# except:
-# log.exception( "Unable to cleanup job %d" % self.job_id )
-#
-# def get_command_line( self ):
-# return self.command_line
-#
-# def get_session_id( self ):
-# return self.session_id
-#
-# def get_input_fnames( self ):
-# job = model.Job.get( self.job_id )
-# filenames = []
-# for da in job.input_datasets: #da is JobToInputDatasetAssociation object
-# if da.dataset:
-# filenames.append( da.dataset.file_name )
-# #we will need to stage in metadata file names also
-# #TODO: would be better to only stage in metadata files that are actually needed (found in command line, referenced in config files, etc.)
-# for key, value in da.dataset.metadata.items():
-# if isinstance( value, model.MetadataFile ):
-# filenames.append( value.file_name )
-# return filenames
-#
-# def get_output_fnames( self ):
-# if self.output_paths is not None:
-# return self.output_paths
-#
-# class DatasetPath( object ):
-# def __init__( self, real_path, false_path = None ):
-# self.real_path = real_path
-# self.false_path = false_path
-# def __str__( self ):
-# if self.false_path is None:
-# return self.real_path
-# else:
-# return self.false_path
-#
-# job = model.Job.get( self.job_id )
-# if self.app.config.outputs_to_working_directory:
-# self.output_paths = []
-# for name, data in [ ( da.name, da.dataset.dataset ) for da in job.output_datasets ]:
-# false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % data.id ) )
-# self.output_paths.append( DatasetPath( data.file_name, false_path ) )
-# else:
-# self.output_paths = [ DatasetPath( da.dataset.file_name ) for da in job.output_datasets ]
-# return self.output_paths
-#
-# def check_output_sizes( self ):
-# sizes = []
-# output_paths = self.get_output_fnames()
-# for outfile in [ str( o ) for o in output_paths ]:
-# sizes.append( ( outfile, os.stat( outfile ).st_size ) )
-# return sizes
-# def setup_external_metadata( self, exec_dir = None, tmp_dir = None, dataset_files_path = None, config_root = None, datatypes_config = None, **kwds ):
-# if tmp_dir is None:
-# #this dir should should relative to the exec_dir
-# tmp_dir = self.app.config.new_file_path
-# if dataset_files_path is None:
-# dataset_files_path = self.app.model.Dataset.file_path
-# if config_root is None:
-# config_root = self.app.config.root
-# if datatypes_config is None:
-# datatypes_config = self.app.config.datatypes_config
-# job = model.Job.get( self.job_id )
-# return self.external_output_metadata.setup_external_metadata( [ output_dataset_assoc.dataset for output_dataset_assoc in job.output_datasets ], exec_dir = exec_dir, tmp_dir = tmp_dir, dataset_files_path = dataset_files_path, config_root = config_root, datatypes_config = datatypes_config, **kwds )
-
class CloudProvider( object ):
def __init__( self, app ):
import providers.eucalyptus
@@ -1018,63 +481,11 @@
self.cloud_provider = {}
self.cloud_provider["eucalyptus"] = providers.eucalyptus.EucalyptusCloudProvider( app )
self.cloud_provider["ec2"] = providers.ec2.EC2CloudProvider( app )
-
-# start_cloud_provider = None
-# if app.config.start_job_runners is not None:
-# start_cloud_provider.extend( app.config.start_job_runners.split(",") )
-# for provider_name in start_cloud_provider:
-# self.provider_name = app.config.cloud_provider
-# if self.provider_name == "eucalyptus":
-# import providers.eucalyptus
-# self.cloud_provider[self.provider_name] = providers.eucalyptus.EucalyptusCloudProvider( app )
-# elif self.provider_name == "ec2":
-# import providers.ec2
-# self.cloud_provider[self.provider_name] = providers.ec2.EC2CloudProvider( app )
-# else:
-# log.error( "Unable to start unknown cloud provider: %s" %self.provider_name )
-
+
def put( self, uci_wrapper ):
""" Put given request for UCI manipulation into provider's request queue."""
-# log.debug( "Adding UCI '%s' manipulation request into cloud manager's queue." % uci_wrapper.name )
self.cloud_provider[uci_wrapper.get_provider_type()].put( uci_wrapper )
-
-
-
- def createUCI( self, uci ):
- """
- Createse User Configured Instance (UCI). Essentially, creates storage volume.
- """
- log.debug( "Creating UCI '%s'" % uci.name )
- self.cloud_provider[self.provider_name].createUCI( uci )
-
- def deleteUCI( self, uci ):
- """
- Deletes UCI. NOTE that this implies deletion of any and all data associated
- with this UCI from the cloud. All data will be deleted.
- """
- log.debug( "Deleting UCI '%s'" % uci.name )
- self.cloud_provider[self.provider_name].deleteUCI( uci )
-
- def addStorageToUCI( self, uci ):
- """ Adds more storage to specified UCI """
-
- def startUCI( self, uci ):
- """
- Starts an instance of named UCI on the cloud. This implies, mounting of
- storage and starting Galaxy instance.
- """
- log.debug( "Starting UCI '%s'" % uci.name )
- self.cloud_provider[self.provider_name].startUCI( uci )
-
- def stopUCI( self, uci ):
- """
- Stops cloud instance associated with named UCI. This also implies
- stopping of Galaxy and unmounting of the file system.
- """
- log.debug( "Stopping UCI '%s'" % uci.name )
- self.cloud_provider[self.provider_name].stopUCI( uci )
-
def update( self ):
"""
Runs a global status update across all providers for all UCIs in state other than 'terminated' and 'available'.
@@ -1084,102 +495,10 @@
# log.debug( "Running global update for provider: '%s'" % provider )
self.cloud_provider[provider].update()
- def recover( self, job, job_wrapper ):
- runner_name = ( job.job_runner_name.split(":", 1) )[0]
- log.debug( "recovering job %d in %s runner" %( job.id, runner_name ) )
- self.cloud_provider[runner_name].recover( job, job_wrapper )
-
def shutdown( self ):
for runner in self.cloud_provider.itervalues():
runner.shutdown()
-class JobStopQueue( object ):
- """
- A queue for jobs which need to be terminated prematurely.
- """
- STOP_SIGNAL = object()
- def __init__( self, app, dispatcher ):
- self.app = app
- self.dispatcher = dispatcher
-
- # Keep track of the pid that started the job manager, only it
- # has valid threads
- self.parent_pid = os.getpid()
- # Contains new jobs. Note this is not used if track_jobs_in_database is True
- self.queue = Queue()
-
- # Contains jobs that are waiting (only use from monitor thread)
- self.waiting = []
-
- # Helper for interruptable sleep
- self.sleeper = Sleeper()
- self.running = True
- self.monitor_thread = threading.Thread( target=self.monitor )
- self.monitor_thread.start()
- log.info( "job stopper started" )
-
- def monitor( self ):
- """
- Continually iterate the waiting jobs, stop any that are found.
- """
- # HACK: Delay until after forking, we need a way to do post fork notification!!!
- time.sleep( 10 )
- while self.running:
- try:
- self.monitor_step()
- except:
- log.exception( "Exception in monitor_step" )
- # Sleep
- self.sleeper.sleep( 1 )
-
- def monitor_step( self ):
- """
- Called repeatedly by `monitor` to stop jobs.
- """
- # Pull all new jobs from the queue at once
- jobs = []
- try:
- while 1:
- ( job_id, error_msg ) = self.queue.get_nowait()
- if job_id is self.STOP_SIGNAL:
- return
- # Append to watch queue
- jobs.append( ( job_id, error_msg ) )
- except Empty:
- pass
-
- for job_id, error_msg in jobs:
- job = model.Job.get( job_id )
- job.refresh()
- # if desired, error the job so we can inform the user.
- if error_msg is not None:
- job.state = job.states.ERROR
- job.info = error_msg
- else:
- job.state = job.states.DELETED
- job.flush()
- # if job is in JobQueue or FooJobRunner's put method,
- # job_runner_name will be unset and the job will be dequeued due to
- # state change above
- if job.job_runner_name is not None:
- # tell the dispatcher to stop the job
- self.dispatcher.stop( job )
-
- def put( self, job_id, error_msg=None ):
- self.queue.put( ( job_id, error_msg ) )
-
- def shutdown( self ):
- """Attempts to gracefully shut down the worker thread"""
- if self.parent_pid != os.getpid():
- # We're not the real job queue, do nothing
- return
- else:
- log.info( "sending stop signal to worker thread" )
- self.running = False
- self.queue.put( ( self.STOP_SIGNAL, None ) )
- self.sleeper.wake()
- log.info( "job stopper stopped" )
-
class NoopCloudMonitor( object ):
"""
Implements the CloudMonitor interface but does nothing
diff -r 022ac0f64679 -r be631ed97541 lib/galaxy/cloud/providers/ec2.py
--- a/lib/galaxy/cloud/providers/ec2.py Wed Oct 28 16:35:36 2009 -0400
+++ b/lib/galaxy/cloud/providers/ec2.py Thu Oct 29 11:46:43 2009 -0400
@@ -56,7 +56,6 @@
self.key_pair = "galaxy-keypair"
self.queue = Queue()
- #TODO: Use multiple threads to process requests?
self.threads = []
nworkers = 5
log.info( "Starting EC2 cloud controller workers" )
@@ -72,19 +71,17 @@
while 1:
uci_wrapper = self.queue.get()
-# uci = uci_wrapper.get_uci()
- log.debug( '[%d] uci type: %s' % ( cnt, uci_wrapper.get_name() ) )
uci_state = uci_wrapper.get_state()
if uci_state is self.STOP_SIGNAL:
return
try:
- if uci_state==uci_states.NEW: # "new":
+ if uci_state==uci_states.NEW:
self.createUCI( uci_wrapper )
- elif uci_state==uci_states.DELETING: #"deleting":
+ elif uci_state==uci_states.DELETING:
self.deleteUCI( uci_wrapper )
- elif uci_state==uci_states.SUBMITTED: #"submitted":
+ elif uci_state==uci_states.SUBMITTED:
self.startUCI( uci_wrapper )
- elif uci_state==uci_states.SHUTTING_DOWN: #"shutting-down":
+ elif uci_state==uci_states.SHUTTING_DOWN:
self.stopUCI( uci_wrapper )
except:
log.exception( "Uncaught exception executing request." )
@@ -113,7 +110,6 @@
try:
kp = conn.get_key_pair( self.key_pair )
for inst in instances:
-# log.debug("inst: '%s'" % inst )
uci_wrapper.set_key_pair( inst, kp.name )
return kp.name
except boto.exception.EC2ResponseError, e: # No keypair under this name exists so create it
@@ -135,22 +131,7 @@
For valid sizes, see http://aws.amazon.com/ec2/instance-types/
"""
return model.CloudImage.filter( model.CloudImage.table.c.id==2 ).first().image_id
-
-# def get_instances( self, uci ):
-# """
-# Get objects of instances that are pending or running and are connected to uci object
-# """
-# instances = trans.sa_session.query( model.CloudInstance ) \
-# .filter_by( user=user, uci_id=uci.id ) \
-# .filter( or_(model.CloudInstance.table.c.state=="running", model.CloudInstance.table.c.state=="pending" ) ) \
-# .first()
-# #.all() #TODO: return all but need to edit calling method(s) to handle list
-#
-# instances = uci.instance
-#
-# return instances
-
-
+
def shutdown( self ):
"""Attempts to gracefully shut down the monitor thread"""
log.info( "sending stop signal to worker threads in EC2 cloud manager" )
@@ -170,7 +151,6 @@
and registers relevant information in Galaxy database.
"""
conn = self.get_connection( uci_wrapper )
- # Temporary code - need to ensure user selects zone at UCI creation time!
if uci_wrapper.get_uci_availability_zone()=='':
log.info( "Availability zone for UCI (i.e., storage volume) was not selected, using default zone: %s" % self.zone )
uci_wrapper.set_store_availability_zone( self.zone )
@@ -262,12 +242,11 @@
for i_index in i_indexes:
mi_id = self.get_mi_id( uci_wrapper.get_type( i_index ) )
-# log.debug( "mi_id: %s, uci_wrapper.get_key_pair_name( i_index ): %s" % ( mi_id, uci_wrapper.get_key_pair_name( i_index ) ) )
uci_wrapper.set_mi( i_index, mi_id )
# Check if galaxy security group exists (and create it if it does not)
-# log.debug( '***** Setting up security group' )
security_group = 'galaxyWeb'
+ log.debug( "Setting up '%s' security group." % security_group )
sgs = conn.get_all_security_groups() # security groups
gsgt = False # galaxy security group test
for sg in sgs:
@@ -303,39 +282,6 @@
uci_wrapper.change_state( s, i_id, s )
log.debug( "Instance of UCI '%s' started, current state: '%s'" % ( uci_wrapper.get_name(), uci_wrapper.get_state() ) )
-
-
-# # Wait until instance gets running and then update the DB
-# while s!="running":
-# log.debug( "Waiting on instance '%s' to start up (reservation ID: %s); current state: %s" % ( uci.instance[0].instance_id, uci.instance[0].reservation_id, s ) )
-# time.sleep( 15 )
-# s = reservation.i_indexes[0].update()
-#
-# # Update instance data in local DB
-# uci.instance[0].state = s
-# uci.instance[0].public_dns = reservation.i_indexes[0].dns_name
-# uci.instance[0].private_dns = reservation.i_indexes[0].private_dns_name
-# uci.instance[0].flush()
-# # Update storage data in local DB w/ volume state info. NOTE that this only captures current volume state
-# # and does not connect or wait on connection between instance and volume to be established
-# vl = model.CloudStore.filter( model.CloudStore.c.uci_id == uci.id ).all()
-# vols = []
-# for v in vl:
-# vols.append( v.volume_id )
-# try:
-# volumes = conn.get_all_volumes( vols )
-# for i, v in enumerate( volumes ):
-# uci.store[i].i_id = v.instance_id
-# uci.store[i].status = v.status
-# uci.store[i].device = v.device
-# uci.store[i].flush()
-# except BotoServerError:
-# log.debug( "Error getting volume(s) attached to instance. Volume status was not updated." )
-#
-# uci.state = s
-# uci.flush()
-
-
def stopUCI( self, uci_wrapper):
"""
Stops all of cloud instances associated with given UCI.
@@ -344,14 +290,8 @@
# Get all instances associated with given UCI
il = uci_wrapper.get_instances_ids() # instance list
-# log.debug( 'List of instances being terminated: %s' % il )
rl = conn.get_all_instances( il ) # Reservation list associated with given instances
-
-# tState = conn.terminate_instances( il )
-# # TODO: Need to update instance stop time (for all individual instances)
-# stop_time = datetime.utcnow()
-# uci_wrapper.set_stop_time( stop_time )
-
+
# Initiate shutdown of all instances under given UCI
cnt = 0
stopped = []
@@ -364,26 +304,7 @@
uci_wrapper.change_state( instance_id=inst.id, i_state=inst.update() )
stopped.append( inst )
-# uci_wrapper.change_state( uci_state='available' )
uci_wrapper.reset_uci_launch_time()
-
-# # Wait for all instances to actually terminate and update local DB
-# terminated=0
-# while terminated!=len( rl ):
-# for i, r in enumerate( rl ):
-# log.debug( "r state: %s" % r.instances[0].state )
-# state = r.instances[0].update()
-# if state=='terminated':
-# uci.instance[i].state = state
-# uci.instance[i].flush()
-# terminated += 1
-# time.sleep ( 5 )
-#
-# # Reset UCI state
-# uci.state = 'available'
-# uci.launch_time = None
-# uci.flush()
-#
log.debug( "Termination was initiated for all instances of UCI '%s'." % uci_wrapper.get_name() )
diff -r 022ac0f64679 -r be631ed97541 lib/galaxy/cloud/providers/eucalyptus.py
--- a/lib/galaxy/cloud/providers/eucalyptus.py Wed Oct 28 16:35:36 2009 -0400
+++ b/lib/galaxy/cloud/providers/eucalyptus.py Thu Oct 29 11:46:43 2009 -0400
@@ -56,7 +56,6 @@
self.key_pair = "galaxy-keypair"
self.queue = Queue()
- #TODO: Use multiple threads to process requests?
self.threads = []
nworkers = 5
log.info( "Starting eucalyptus cloud controller workers" )
@@ -72,19 +71,18 @@
while 1:
uci_wrapper = self.queue.get()
-# uci = uci_wrapper.get_uci()
log.debug( '[%d] uci type: %s' % ( cnt, uci_wrapper.get_name() ) )
uci_state = uci_wrapper.get_state()
if uci_state is self.STOP_SIGNAL:
return
try:
- if uci_state==uci_states.NEW: # "new":
+ if uci_state==uci_states.NEW:
self.createUCI( uci_wrapper )
- elif uci_state==uci_states.DELETING: #"deleting":
+ elif uci_state==uci_states.DELETING:
self.deleteUCI( uci_wrapper )
- elif uci_state==uci_states.SUBMITTED: #"submitted":
+ elif uci_state==uci_states.SUBMITTED:
self.startUCI( uci_wrapper )
- elif uci_state==uci_states.SHUTTING_DOWN: #"shutting-down":
+ elif uci_state==uci_states.SHUTTING_DOWN:
self.stopUCI( uci_wrapper )
except:
log.exception( "Uncaught exception executing request." )
@@ -134,22 +132,7 @@
"""
log.debug( "image id: '%s'" % model.CloudImage.get( 1 ).image_id )
return model.CloudImage.get( 1 ).image_id
-
-# def get_instances( self, uci ):
-# """
-# Get objects of instances that are pending or running and are connected to uci object
-# """
-# instances = trans.sa_session.query( model.CloudInstance ) \
-# .filter_by( user=user, uci_id=uci.id ) \
-# .filter( or_(model.CloudInstance.table.c.state=="running", model.CloudInstance.table.c.state=="pending" ) ) \
-# .first()
-# #.all() #TODO: return all but need to edit calling method(s) to handle list
-#
-# instances = uci.instance
-#
-# return instances
-
-
+
def shutdown( self ):
"""Attempts to gracefully shut down the monitor thread"""
log.info( "sending stop signal to worker threads in eucalyptus cloud manager" )
@@ -169,7 +152,6 @@
and registers relevant information in Galaxy database.
"""
conn = self.get_connection( uci_wrapper )
- # Temporary code - need to ensure user selects zone at UCI creation time!
if uci_wrapper.get_uci_availability_zone()=='':
log.info( "Availability zone for UCI (i.e., storage volume) was not selected, using default zone: %s" % self.zone )
uci_wrapper.set_store_availability_zone( self.zone )
@@ -245,21 +227,7 @@
mi_id = self.get_mi_id( uci_wrapper.get_type( i_index ) )
log.debug( "mi_id: %s, uci_wrapper.get_key_pair_name( i_index ): %s" % ( mi_id, uci_wrapper.get_key_pair_name( i_index ) ) )
uci_wrapper.set_mi( i_index, mi_id )
-
- # log.debug( '***** Setting up security group' )
- # If not existent, setup galaxy security group
- # try:
- # gSecurityGroup = conn.create_security_group('galaxy', 'Security group for Galaxy.')
- # gSecurityGroup.authorize( 'tcp', 80, 80, '0.0.0.0/0' ) # Open HTTP port
- # gSecurityGroup.authorize( 'tcp', 22, 22, '0.0.0.0/0' ) # Open SSH port
- # except:
- # pass
- # sgs = conn.get_all_security_groups()
- # for i in range( len( sgs ) ):
- # if sgs[i].name == "galaxy":
- # sg.append( sgs[i] )
- # break # only 1 security group w/ this type can exist, so continue
-
+
log.debug( "***** Starting UCI instance '%s'" % uci_wrapper.get_name() )
log.debug( 'Using following command: conn.run_instances( image_id=%s, key_name=%s )' % ( mi_id, uci_wrapper.get_key_pair_name( i_index ) ) )
reservation = conn.run_instances( image_id=mi_id, key_name=uci_wrapper.get_key_pair_name( i_index ) )
@@ -276,39 +244,6 @@
uci_wrapper.change_state( s, i_id, s )
log.debug( "Instance of UCI '%s' started, current state: '%s'" % ( uci_wrapper.get_name(), uci_wrapper.get_state() ) )
-
-
-# # Wait until instance gets running and then update the DB
-# while s!="running":
-# log.debug( "Waiting on instance '%s' to start up (reservation ID: %s); current state: %s" % ( uci.instance[0].instance_id, uci.instance[0].reservation_id, s ) )
-# time.sleep( 15 )
-# s = reservation.i_indexes[0].update()
-#
-# # Update instance data in local DB
-# uci.instance[0].state = s
-# uci.instance[0].public_dns = reservation.i_indexes[0].dns_name
-# uci.instance[0].private_dns = reservation.i_indexes[0].private_dns_name
-# uci.instance[0].flush()
-# # Update storage data in local DB w/ volume state info. NOTE that this only captures current volume state
-# # and does not connect or wait on connection between instance and volume to be established
-# vl = model.CloudStore.filter( model.CloudStore.c.uci_id == uci.id ).all()
-# vols = []
-# for v in vl:
-# vols.append( v.volume_id )
-# try:
-# volumes = conn.get_all_volumes( vols )
-# for i, v in enumerate( volumes ):
-# uci.store[i].i_id = v.instance_id
-# uci.store[i].status = v.status
-# uci.store[i].device = v.device
-# uci.store[i].flush()
-# except BotoServerError:
-# log.debug( "Error getting volume(s) attached to instance. Volume status was not updated." )
-#
-# uci.state = s
-# uci.flush()
-
-
def stopUCI( self, uci_wrapper):
"""
Stops all of cloud instances associated with given UCI.
@@ -332,30 +267,9 @@
uci_wrapper.change_state( instance_id=inst.id, i_state=inst.update() )
stopped.append( inst )
-# uci_wrapper.change_state( uci_state='available' )
uci_wrapper.reset_uci_launch_time()
-
-# # Wait for all instances to actually terminate and update local DB
-# terminated=0
-# while terminated!=len( rl ):
-# for i, r in enumerate( rl ):
-# log.debug( "r state: %s" % r.instances[0].state )
-# state = r.instances[0].update()
-# if state=='terminated':
-# uci.instance[i].state = state
-# uci.instance[i].flush()
-# terminated += 1
-# time.sleep ( 5 )
-#
-# # Reset UCI state
-# uci.state = 'available'
-# uci.launch_time = None
-# uci.flush()
-#
log.debug( "Termination was initiated for all instances of UCI '%s'." % uci_wrapper.get_name() )
-
-
# dbInstances = get_instances( trans, uci ) #TODO: handle list!
#
# # Get actual cloud instance object
@@ -377,7 +291,6 @@
# store.i_id = None
# store.status = volStat
# log.debug ( '***** volume status: %s' % volStat )
-#
#
# # Stop the instance and update status in local database
# cloudInstance.stop()
@@ -495,7 +408,6 @@
region=euca_region,
path=uci.credentials.provider.path )# Get reservations handle for given store
vl = conn.get_all_volumes( [store.volume_id] )
-# log.debug( "Store '%s' vl: '%s'" % ( store.volume_id, vl ) )
# Update store status in local DB with info from cloud provider
if store.status != vl[0].status:
# In case something failed during creation of UCI but actual storage volume was created and yet
diff -r 022ac0f64679 -r be631ed97541 lib/galaxy/web/controllers/cloud.py
--- a/lib/galaxy/web/controllers/cloud.py Wed Oct 28 16:35:36 2009 -0400
+++ b/lib/galaxy/web/controllers/cloud.py Thu Oct 29 11:46:43 2009 -0400
@@ -12,7 +12,6 @@
from galaxy.util.odict import odict
from galaxy.util.bunch import Bunch
from galaxy.util.topsort import topsort, topsort_levels, CycleError
-from galaxy.workflow.modules import *
from galaxy.model.mapping import desc
from galaxy.model.orm import *
from datetime import datetime, timedelta
@@ -69,12 +68,6 @@
Render cloud main page (management of cloud resources)
"""
user = trans.get_user()
-# pendingInstances = trans.sa_session.query( model.UCI ) \
-# .filter_by( user=user, state="pending" ) \
-# .all()
-#
-# for i inupdate_in range( len ( pendingInstances ) ):
-# stance_state( trans, pendingInstances[i].id )
cloudCredentials = trans.sa_session.query( model.CloudUserCredentials ) \
.filter_by( user=user ) \
@@ -104,7 +97,6 @@
.all()
# Check after update there are instances in pending state; if so, display message
- # TODO: Auto-refresh once instance is running
pendingInstances = trans.sa_session.query( model.UCI ) \
.filter_by( user=user ) \
.filter( or_( model.UCI.c.state==uci_states.PENDING, #"pending" , \
@@ -128,7 +120,6 @@
liveInstances = liveInstances,
prevInstances = prevInstances )
- @web.expose
@web.require_login( "use Galaxy cloud" )
def makeDefault( self, trans, id=None ):
"""
@@ -337,8 +328,6 @@
inst_error = "No registered cloud images. You must contact administrator to add some before proceeding."
log.debug("AttributeError: %s " % str( ae ) )
- #TODO: based on user credentials (i.e., provider) selected, zone options will be different (e.g., EC2: us-east-1a vs EPC: epc)
-
return trans.fill_template( "cloud/configure_uci.mako",
instanceName = instanceName,
credName = storedCreds,
@@ -427,8 +416,6 @@
session.flush()
# Log and display the management page
trans.set_message( "Credential '%s' edited." % credentials.name )
-# if defaultCred:
-# self.makeDefault( trans, credentials.id)
return self.list( trans )
@web.expose
@@ -507,8 +494,6 @@
# Log and display the management page
trans.log_event( "User added new credentials" )
trans.set_message( "Credential '%s' created" % credentials.name )
-# if defaultCred:
-# self.makeDefault( trans, credentials.id)
return self.list( trans )
providers = trans.sa_session.query( model.CloudProvider ).filter_by( user=user ).all()
@@ -521,13 +506,6 @@
providers = providers
)
-# return trans.show_form(
-# web.FormBuilder( web.url_for(), "Add credentials", submit_text="Add" )
-# .add_text( "credName", "Credentials name", value="Unnamed credentials", error=cred_error )
-# .add_text( "providerName", "Cloud provider name", value="ec2 or eucalyptus", error=provider_error )
-# .add_text( "accessKey", "Access key", value='', error=accessKey_error )
-# .add_password( "secretKey", "Secret key", value='', error=secretKey_error ) )
-
@web.expose
@web.require_login( "view credentials" )
def view( self, trans, id=None ):
@@ -770,18 +748,6 @@
# Looks good
return stored
-def get_default_credentials( trans, check_ownership=True ):
- """
- Get a StoredUserCredntials from the database by 'default' setting, verifying ownership.
- """
- user = trans.get_user()
- # Load credentials from database
- stored = trans.sa_session.query( model.CloudUserCredentials ) \
- .filter_by (user=user, defaultCred=True) \
- .first()
-
- return stored
-
def get_uci( trans, id, check_ownership=True ):
"""
Get a UCI object from the database by id, verifying ownership.
@@ -891,204 +857,3 @@
# Actually, probably return key_pair to calling method and store name & key from there...
return key_pair.name
-
-def update_instance_state( trans, id ):
- """
- Update state of instances associated with given UCI id and store state in local database. Also update
- state of the given UCI.
- """
- uci = get_uci( trans, id )
- # Get list of instances associated with given uci as they are stored in local database
- dbInstances = get_instances( trans, uci ) # TODO: handle list (currently only 1 instance can correspond to 1 UCI)
- oldState = dbInstances.state
- # Establish connection with cloud
- conn = get_connection( trans )
- # Get actual cloud instance object
- cloudInstance = get_cloud_instance( conn, dbInstances.instance_id )
- # Update instance status
- cloudInstance.update()
- dbInstances.state = cloudInstance.state
- log.debug( "Updating instance %s state; current state: %s" % ( str( cloudInstance ).split(":")[1], cloudInstance.state ) )
- # Update state of UCI (TODO: once more than 1 instance is assoc. w/ 1 UCI, this will be need to be updated differently)
- uci.state = dbInstances.state
- # Persist
- session = trans.sa_session
- session.save_or_update( dbInstances )
- session.save_or_update( uci )
- session.flush()
-
- # If instance is now running, update/process instance (i.e., mount file system, start Galaxy, update DB with DNS)
- if oldState==instance_states.PENDING and dbInstances.state==instance_states.RUNNING:
- update_instance( trans, dbInstances, cloudInstance, conn, uci )
-
-
-def update_instance( trans, dbInstance, cloudInstance, conn, uci ):
- """
- Update instance: connect EBS volume, mount file system, start Galaxy, and update local DB w/ DNS info
-
- Keyword arguments:
- trans -- current transaction
- dbInstance -- object of 'instance' as it is stored in local database
- cloudInstance -- object of 'instance' as it resides in the cloud. Functions supported by the cloud API can be
- instantiated directly on this object.
- conn -- cloud connection object
- uci -- UCI object
- """
- dbInstance.public_dns = cloudInstance.dns_name
- dbInstance.private_dns = cloudInstance.private_dns_name
-
- # Attach storage volume(s) to instance
- stores = get_stores( trans, uci )
- for i, store in enumerate( stores ):
- log.debug( "Attaching volume '%s' to instance '%s'." % ( store.volume_id, dbInstance.instance_id ) )
- mntDevice = '/dev/sdb'+str(i)
- volStat = conn.attach_volume( store.volume_id, dbInstance.instance_id, mntDevice )
- store.attach_time = datetime.utcnow()
- store.device = mntDevice
- store.i_id = dbInstance.instance_id
- store.status = volStat
- log.debug ( '***** volume status: %s' % volStat )
-
- # Wait until instances have attached and add file system
-
-
-
- # TODO: mount storage through ZFS
- # TODO: start Galaxy
-
- # Persist
- session = trans.sa_session
- session.save_or_update( dbInstance )
- session.flush()
-
-def attach_ordered_steps( workflow, steps ):
- ordered_steps = order_workflow_steps( steps )
- if ordered_steps:
- workflow.has_cycles = False
- for i, step in enumerate( ordered_steps ):
- step.order_index = i
- workflow.steps.append( step )
- else:
- workflow.has_cycles = True
- workflow.steps = steps
-
-def edgelist_for_workflow_steps( steps ):
- """
- Create a list of tuples representing edges between `WorkflowSteps` based
- on associated `WorkflowStepConnection`s
- """
- edges = []
- steps_to_index = dict( ( step, i ) for i, step in enumerate( steps ) )
- for step in steps:
- edges.append( ( steps_to_index[step], steps_to_index[step] ) )
- for conn in step.input_connections:
- edges.append( ( steps_to_index[conn.output_step], steps_to_index[conn.input_step] ) )
- return edges
-
-def order_workflow_steps( steps ):
- """
- Perform topological sort of the steps, return ordered or None
- """
- try:
- edges = edgelist_for_workflow_steps( steps )
- node_order = topsort( edges )
- return [ steps[i] for i in node_order ]
- except CycleError:
- return None
-
-def order_workflow_steps_with_levels( steps ):
- try:
- return topsort_levels( edgelist_for_workflow_steps( steps ) )
- except CycleError:
- return None
-
-class FakeJob( object ):
- """
- Fake job object for datasets that have no creating_job_associations,
- they will be treated as "input" datasets.
- """
- def __init__( self, dataset ):
- self.is_fake = True
- self.id = "fake_%s" % dataset.id
-
-def get_job_dict( trans ):
- """
- Return a dictionary of Job -> [ Dataset ] mappings, for all finished
- active Datasets in the current history and the jobs that created them.
- """
- history = trans.get_history()
- # Get the jobs that created the datasets
- warnings = set()
- jobs = odict()
- for dataset in history.active_datasets:
- # FIXME: Create "Dataset.is_finished"
- if dataset.state in ( 'new', 'running', 'queued' ):
- warnings.add( "Some datasets still queued or running were ignored" )
- continue
-
- #if this hda was copied from another, we need to find the job that created the origial hda
- job_hda = dataset
- while job_hda.copied_from_history_dataset_association:
- job_hda = job_hda.copied_from_history_dataset_association
-
- if not job_hda.creating_job_associations:
- jobs[ FakeJob( dataset ) ] = [ ( None, dataset ) ]
-
- for assoc in job_hda.creating_job_associations:
- job = assoc.job
- if job in jobs:
- jobs[ job ].append( ( assoc.name, dataset ) )
- else:
- jobs[ job ] = [ ( assoc.name, dataset ) ]
- return jobs, warnings
-
-def cleanup_param_values( inputs, values ):
- """
- Remove 'Data' values from `param_values`, along with metadata cruft,
- but track the associations.
- """
- associations = []
- names_to_clean = []
- # dbkey is pushed in by the framework
- if 'dbkey' in values:
- del values['dbkey']
- root_values = values
- # Recursively clean data inputs and dynamic selects
- def cleanup( prefix, inputs, values ):
- for key, input in inputs.items():
- if isinstance( input, ( SelectToolParameter, DrillDownSelectToolParameter ) ):
- if input.is_dynamic:
- values[key] = UnvalidatedValue( values[key] )
- if isinstance( input, DataToolParameter ):
- tmp = values[key]
- values[key] = None
- # HACK: Nested associations are not yet working, but we
- # still need to clean them up so we can serialize
- # if not( prefix ):
- if tmp: #this is false for a non-set optional dataset
- associations.append( ( tmp.hid, prefix + key ) )
- # Cleanup the other deprecated crap associated with datasets
- # as well. Worse, for nested datasets all the metadata is
- # being pushed into the root. FIXME: MUST REMOVE SOON
- key = prefix + key + "_"
- for k in root_values.keys():
- if k.startswith( key ):
- del root_values[k]
- elif isinstance( input, Repeat ):
- group_values = values[key]
- for i, rep_values in enumerate( group_values ):
- rep_index = rep_values['__index__']
- prefix = "%s_%d|" % ( key, rep_index )
- cleanup( prefix, input.inputs, group_values[i] )
- elif isinstance( input, Conditional ):
- group_values = values[input.name]
- current_case = group_values['__current_case__']
- prefix = "%s|" % ( key )
- cleanup( prefix, input.cases[current_case].inputs, group_values )
- cleanup( "", inputs, values )
- return associations
-
-
-
-
-
details: http://www.bx.psu.edu/hg/galaxy/rev/802b761c5032
changeset: 3074:802b761c5032
user: Enis Afgan <afgane(a)gmail.com>
date: Tue Oct 27 15:32:44 2009 -0400
description:
UI enhancements regarding provider registration and error reporting.
diffstat:
lib/galaxy/cloud/providers/ec2.py | 37 +-
lib/galaxy/cloud/providers/eucalyptus.py | 15 +-
lib/galaxy/web/controllers/cloud.py | 553 ++++++------------------------------
templates/cloud/add_credentials.mako | 4 +-
templates/cloud/add_provider.mako | 52 ++-
templates/cloud/configure_cloud.mako | 43 +-
6 files changed, 200 insertions(+), 504 deletions(-)
diffs (922 lines):
diff -r 6632a5d39f41 -r 802b761c5032 lib/galaxy/cloud/providers/ec2.py
--- a/lib/galaxy/cloud/providers/ec2.py Mon Oct 26 17:04:47 2009 -0400
+++ b/lib/galaxy/cloud/providers/ec2.py Tue Oct 27 15:32:44 2009 -0400
@@ -95,7 +95,13 @@
Establishes EC2 cloud connection using user's credentials associated with given UCI
"""
log.debug( '##### Establishing EC2 cloud connection' )
- conn = EC2Connection( uci_wrapper.get_access_key(), uci_wrapper.get_secret_key() )
+ provider = uci_wrapper.get_provider()
+ region = RegionInfo( None, provider.region_name, provider.region_endpoint )
+ conn = EC2Connection( aws_access_key_id=uci_wrapper.get_access_key(),
+ aws_secret_access_key=uci_wrapper.get_secret_key(),
+ is_secure=provider.is_secure,
+ region=region,
+ path=provider.path )
return conn
def set_keypair( self, uci_wrapper, conn ):
@@ -458,16 +464,24 @@
a_key = uci.credentials.access_key
s_key = uci.credentials.secret_key
# Get connection
- conn = EC2Connection( aws_access_key_id=a_key, aws_secret_access_key=s_key )
+ region = RegionInfo( None, uci.credentials.provider.region_name, uci.credentials.provider.region_endpoint )
+ conn = EC2Connection( aws_access_key_id=a_key,
+ aws_secret_access_key=s_key,
+ is_secure=uci.credentials.provider.is_secure,
+ region=region,
+ path=uci.credentials.provider.path )
# Get reservations handle for given instance
rl= conn.get_all_instances( [inst.instance_id] )
# Because EPC deletes references to reservations after a short while after instances have terminated, getting an empty list as a response to a query
- # typically means the instance has successfully shut down but the check was not performed in short enough amount of time. As a result, below code simply
- # marks given instance as having terminated. Note that an instance might have also crashed and this code will not catch the difference...
+ # typically means the instance has successfully shut down but the check was not performed in short enough amount of time. Until alternative solution
+ # is found, below code sets state of given UCI to 'error' to indicate to the user something out of ordinary happened.
if len( rl ) == 0:
log.info( "Instance ID '%s' was not found by the cloud provider. Instance might have crashed or otherwise been terminated." % inst.instance_id )
+ inst.error = "Instance ID was not found by the cloud provider. Instance might have crashed or otherwise been terminated. State set to 'terminated'."
+ uci.error = "Instance ID '"+inst.instance_id+"' was not found by the cloud provider. Instance might have crashed or otherwise been terminated."+ \
+ "Manual check is recommended."
inst.state = instance_states.TERMINATED
- uci.state = uci_states.AVAILABLE
+ uci.state = uci_states.ERROR
uci.launch_time = None
inst.flush()
uci.flush()
@@ -479,11 +493,13 @@
if s != inst.state:
inst.state = s
inst.flush()
- if s == instance_states.TERMINATED: # After instance has shut down, ensure UCI is marked as 'available'
+ # After instance has shut down, ensure UCI is marked as 'available'
+ if s == instance_states.TERMINATED and uci.state != uci_states.ERROR:
uci.state = uci_states.AVAILABLE
+ uci.launch_time = None
uci.flush()
+ # Making sure state of UCI is updated. Once multiple instances become associated with single UCI, this will need to be changed.
if s != uci.state and s != instance_states.TERMINATED:
- # Making sure state of UCI is updated. Once multiple instances become associated with single UCI, this will need to be changed.
uci.state = s
uci.flush()
if cInst.public_dns_name != inst.public_dns:
@@ -501,7 +517,12 @@
a_key = uci.credentials.access_key
s_key = uci.credentials.secret_key
# Get connection
- conn = EC2Connection( aws_access_key_id=a_key, aws_secret_access_key=s_key )
+ region = RegionInfo( None, uci.credentials.provider.region_name, uci.credentials.provider.region_endpoint )
+ conn = EC2Connection( aws_access_key_id=a_key,
+ aws_secret_access_key=s_key,
+ is_secure=uci.credentials.provider.is_secure,
+ region=region,
+ path=uci.credentials.provider.path )
# Get reservations handle for given store
vl = conn.get_all_volumes( [store.volume_id] )
# log.debug( "Store '%s' vl: '%s'" % ( store.volume_id, vl ) )
diff -r 6632a5d39f41 -r 802b761c5032 lib/galaxy/cloud/providers/eucalyptus.py
--- a/lib/galaxy/cloud/providers/eucalyptus.py Mon Oct 26 17:04:47 2009 -0400
+++ b/lib/galaxy/cloud/providers/eucalyptus.py Tue Oct 27 15:32:44 2009 -0400
@@ -443,12 +443,15 @@
# Get reservations handle for given instance
rl= conn.get_all_instances( [inst.instance_id] )
# Because EPC deletes references to reservations after a short while after instances have terminated, getting an empty list as a response to a query
- # typically means the instance has successfully shut down but the check was not performed in short enough amount of time. As a result, below code simply
- # marks given instance as having terminated. Note that an instance might have also crashed and this code will not catch the difference...
+ # typically means the instance has successfully shut down but the check was not performed in short enough amount of time. Until alternative solution
+ # is found, below code sets state of given UCI to 'error' to indicate to the user something out of ordinary happened.
if len( rl ) == 0:
log.info( "Instance ID '%s' was not found by the cloud provider. Instance might have crashed or otherwise been terminated." % inst.instance_id )
+ inst.error = "Instance ID was not found by the cloud provider. Instance might have crashed or otherwise been terminated. State set to 'terminated'."
+ uci.error = "Instance ID '"+inst.instance_id+"' was not found by the cloud provider. Instance might have crashed or otherwise been terminated."+ \
+ "Manual check is recommended."
inst.state = instance_states.TERMINATED
- uci.state = uci_states.AVAILABLE
+ uci.state = uci_states.ERROR
uci.launch_time = None
inst.flush()
uci.flush()
@@ -460,11 +463,13 @@
if s != inst.state:
inst.state = s
inst.flush()
- if s == instance_states.TERMINATED: # After instance has shut down, ensure UCI is marked as 'available'
+ # After instance has shut down, ensure UCI is marked as 'available'
+ if s == instance_states.TERMINATED and uci.state != uci_states.ERROR:
uci.state = uci_states.AVAILABLE
+ uci.launch_time = None
uci.flush()
+ # Making sure state of UCI is updated. Once multiple instances become associated with single UCI, this will need to be changed.
if s != uci.state and s != instance_states.TERMINATED:
- # Making sure state of UCI is updated. Once multiple instances become associated with single UCI, this will need to be changed.
uci.state = s
uci.flush()
if cInst.public_dns_name != inst.public_dns:
diff -r 6632a5d39f41 -r 802b761c5032 lib/galaxy/web/controllers/cloud.py
--- a/lib/galaxy/web/controllers/cloud.py Mon Oct 26 17:04:47 2009 -0400
+++ b/lib/galaxy/web/controllers/cloud.py Tue Oct 27 15:32:44 2009 -0400
@@ -273,11 +273,10 @@
inst_error = vol_error = cred_error = None
error = {}
user = trans.get_user()
- # TODO: Hack until present user w/ bullet list w/ registered credentials
storedCreds = trans.sa_session.query( model.CloudUserCredentials ).filter_by( user=user ).all()
if len( storedCreds ) == 0:
return trans.show_error_message( "You must register credentials before configuring a Galaxy instance." )
-
+ # Create dict mapping of cloud providers to zones available by those providers
providersToZones = {}
for storedCred in storedCreds:
if storedCred.provider.type == 'ec2':
@@ -413,6 +412,21 @@
return trans.show_form(
web.FormBuilder( url_for( id=trans.security.encode_id(instance.id) ), "Rename instance", submit_text="Rename" )
.add_text( "new_name", "Instance name", value=instance.name ) )
+
+ @web.expose
+ @web.require_login( "use Galaxy cloud" )
+ def set_uci_state( self, trans, id, state='available', clear_error=True ):
+ """
+ Sets state of UCI to given state, optionally resets error field, and resets UCI's launch time field to 'None'.
+ """
+ uci = get_uci( trans, id )
+ uci.state = state
+ if clear_error:
+ uci.error = None
+ uci.launch_time = None
+ trans.sa_session.flush()
+ trans.set_message( "Instance '%s' state reset." % uci.name )
+ return self.list( trans )
@web.expose
@web.require_login( "add credentials" )
@@ -527,42 +541,94 @@
proxy_user='', proxy_pass='', debug='', https_connection_factory='', path='' ):
user = trans.get_user()
error = {}
+ try:
+ is_secure = int(is_secure)
+ except ValueError:
+ pass
# Check if Amazon EC2 has already been registered by this user
ec2_registered = trans.sa_session.query( model.CloudProvider ).filter_by( user=user, type='ec2' ).first()
if region_name or region_endpoint or name or is_secure or port or proxy or debug or path:
+ log.debug (" in if ")
if trans.app.model.CloudProvider \
.filter_by (user=user, name=name) \
.first():
+ log.debug (" in if 2 ")
error['name_error'] = "A provider with that name already exist."
elif name=='' or len( name ) > 255:
+ log.debug (" in if 3")
error['name_error'] = "Provider name must be between 1 and 255 characters long."
elif type=='':
+ log.debug (" in if 4")
error['type_error'] = "Provider type must be selected."
elif ec2_registered:
+ log.debug (" in if 5")
error['type_error'] = "Amazon EC2 has already been registered as a provider."
- elif is_secure!=0 or is_secure!=1:
- error['is_secure_error'] = "Field 'is secure' can only take on a value '0' or '1'."
+ elif not (is_secure == 0 or is_secure == 1):
+ log.debug (" in if 6")
+ error['is_secure_error'] = "Field 'is secure' can only take on a value '0' or '1'"
else:
+ log.debug (" in else ")
provider = model.CloudProvider()
provider.user = user
provider.type = type
provider.name = name
- provider.region_name = region_name
- provider.region_endpoint = region_endpoint
+ if region_name:
+ provider.region_name = region_name
+ else:
+ provider.region_name = None
+
+ if region_endpoint:
+ provider.region_endpoint = region_endpoint
+ else:
+ provider.region_endpoint = None
+
if is_secure=='0':
provider.is_secure = False
else:
provider.is_secure = True
- provider.host = host
- provider.port = port
- provider.proxy = proxy
- provider.proxy_port = proxy_port
- provider.proxy_user = proxy_user
- provider.proxy_pass = proxy_pass
- provider.debug = debug
- provider.https_connection_factory = https_connection_factory
+
+ if host:
+ provider.host = host
+ else:
+ provider.host = None
+
+ if port:
+ provider.port = port
+ else:
+ provider.port = None
+
+ if proxy:
+ provider.proxy = proxy
+ else:
+ provider.proxy = None
+
+ if proxy_port:
+ provider.proxy_port = proxy_port
+ else:
+ provider.proxy_port = None
+
+ if proxy_user:
+ provider.proxy_user = proxy_user
+ else:
+ provider.proxy_user = None
+
+ if proxy_pass:
+ provider.proxy_pass = proxy_pass
+ else:
+ provider.proxy_pass = None
+
+ if debug:
+ provider.debug = debug
+ else:
+ provider.debug = None
+
+ if https_connection_factory:
+ provider.https_connection_factory = https_connection_factory
+ else:
+ provider.https_connection_factory = None
+
provider.path = path
# Persist
session = trans.sa_session
@@ -601,456 +667,21 @@
.filter_by( user=user, type='ec2' ).first()
if not exists:
- provider = model.CloudProvider()
- provider.user = user
- provider.type = 'ec2'
- provider.name = 'EC2'
- # Persist
- session = trans.sa_session
- session.save_or_update( provider )
- session.flush()
+ self.add_provider( trans, name='Amazon EC2', type='ec2', region_name='us-east-1', region_endpoint='us-east-1.ec2.amazonaws.com', is_secure=1, path='/' )
+ return self.add( trans )
+# providers = trans.sa_session.query( model.CloudProvider ).filter_by( user=user ).all()
+# return trans.fill_template( "cloud/add_credentials.mako",
+# credName = '',
+# providerName = '',
+# accessKey = '',
+# secretKey = '',
+# error = {},
+# providers = providers
+# )
+ trans.show_error_message( "EC2 is already registered as a cloud provider under name '%s'." % exists.name )
return self.list( trans )
- @web.expose
- @web.require_login( "edit workflows" )
- def editor( self, trans, id=None ):
- """
- Render the main workflow editor interface. The canvas is embedded as
- an iframe (neccesary for scrolling to work properly), which is
- rendered by `editor_canvas`.
- """
- if not id:
- error( "Invalid workflow id" )
- id = trans.security.decode_id( id )
- return trans.fill_template( "workflow/editor.mako", workflow_id=id )
-
- @web.json
- def editor_form_post( self, trans, type='tool', tool_id=None, **incoming ):
- """
- Accepts a tool state and incoming values, and generates a new tool
- form and some additional information, packed into a json dictionary.
- This is used for the form shown in the right pane when a node
- is selected.
- """
- trans.workflow_building_mode = True
- module = module_factory.from_dict( trans, {
- 'type': type,
- 'tool_id': tool_id,
- 'tool_state': incoming.pop("tool_state")
- } )
- module.update_state( incoming )
- return {
- 'tool_state': module.get_state(),
- 'data_inputs': module.get_data_inputs(),
- 'data_outputs': module.get_data_outputs(),
- 'tool_errors': module.get_errors(),
- 'form_html': module.get_config_form()
- }
-
- @web.json
- def get_new_module_info( self, trans, type, **kwargs ):
- """
- Get the info for a new instance of a module initialized with default
- paramters (any keyword arguments will be passed along to the module).
- Result includes data inputs and outputs, html representation
- of the initial form, and the initial tool state (with default values).
- This is called asynchronously whenever a new node is added.
- """
- trans.workflow_building_mode = True
- module = module_factory.new( trans, type, **kwargs )
- return {
- 'type': module.type,
- 'name': module.get_name(),
- 'tool_id': module.get_tool_id(),
- 'tool_state': module.get_state(),
- 'data_inputs': module.get_data_inputs(),
- 'data_outputs': module.get_data_outputs(),
- 'form_html': module.get_config_form()
- }
-
- @web.json
- def load_workflow( self, trans, id ):
- """
- Get the latest Workflow for the StoredWorkflow identified by `id` and
- encode it as a json string that can be read by the workflow editor
- web interface.
- """
- user = trans.get_user()
- id = trans.security.decode_id( id )
- trans.workflow_building_mode = True
- # Load encoded workflow from database
- stored = trans.sa_session.query( model.StoredWorkflow ).get( id )
- assert stored.user == user
- workflow = stored.latest_workflow
- # Pack workflow data into a dictionary and return
- data = {}
- data['name'] = workflow.name
- data['steps'] = {}
- data['upgrade_messages'] = {}
- # For each step, rebuild the form and encode the state
- for step in workflow.steps:
- # Load from database representation
- module = module_factory.from_workflow_step( trans, step )
- # Fix any missing parameters
- upgrade_message = module.check_and_update_state()
- if upgrade_message:
- data['upgrade_messages'][step.order_index] = upgrade_message
- # Pack atrributes into plain dictionary
- step_dict = {
- 'id': step.order_index,
- 'type': module.type,
- 'tool_id': module.get_tool_id(),
- 'name': module.get_name(),
- 'tool_state': module.get_state(),
- 'tool_errors': module.get_errors(),
- 'data_inputs': module.get_data_inputs(),
- 'data_outputs': module.get_data_outputs(),
- 'form_html': module.get_config_form(),
- }
- # Connections
- input_conn_dict = {}
- for conn in step.input_connections:
- input_conn_dict[ conn.input_name ] = \
- dict( id=conn.output_step.order_index, output_name=conn.output_name )
- step_dict['input_connections'] = input_conn_dict
- # Position
- step_dict['position'] = step.position
- # Add to return value
- data['steps'][step.order_index] = step_dict
- print data['upgrade_messages']
- return data
-
- @web.json
- def save_workflow( self, trans, id, workflow_data ):
- """
- Save the workflow described by `workflow_data` with id `id`.
- """
- # Get the stored workflow
- stored = get_stored_workflow( trans, id )
- # Put parameters in workflow mode
- trans.workflow_building_mode = True
- # Convert incoming workflow data from json
- data = simplejson.loads( workflow_data )
- # Create new workflow from incoming data
- workflow = model.Workflow()
- # Just keep the last name (user can rename later)
- workflow.name = stored.name
- # Assume no errors until we find a step that has some
- workflow.has_errors = False
- # Create each step
- steps = []
- # The editor will provide ids for each step that we don't need to save,
- # but do need to use to make connections
- steps_by_external_id = {}
- # First pass to build step objects and populate basic values
- for key, step_dict in data['steps'].iteritems():
- # Create the model class for the step
- step = model.WorkflowStep()
- steps.append( step )
- steps_by_external_id[ step_dict['id' ] ] = step
- # FIXME: Position should be handled inside module
- step.position = step_dict['position']
- module = module_factory.from_dict( trans, step_dict )
- module.save_to_step( step )
- if step.tool_errors:
- workflow.has_errors = True
- # Stick this in the step temporarily
- step.temp_input_connections = step_dict['input_connections']
- # Second pass to deal with connections between steps
- for step in steps:
- # Input connections
- for input_name, conn_dict in step.temp_input_connections.iteritems():
- if conn_dict:
- conn = model.WorkflowStepConnection()
- conn.input_step = step
- conn.input_name = input_name
- conn.output_name = conn_dict['output_name']
- conn.output_step = steps_by_external_id[ conn_dict['id'] ]
- del step.temp_input_connections
- # Order the steps if possible
- attach_ordered_steps( workflow, steps )
- # Connect up
- workflow.stored_workflow = stored
- stored.latest_workflow = workflow
- # Persist
- trans.sa_session.flush()
- # Return something informative
- errors = []
- if workflow.has_errors:
- errors.append( "Some steps in this workflow have validation errors" )
- if workflow.has_cycles:
- errors.append( "This workflow contains cycles" )
- if errors:
- rval = dict( message="Workflow saved, but will not be runnable due to the following errors",
- errors=errors )
- else:
- rval = dict( message="Workflow saved" )
- rval['name'] = workflow.name
- return rval
-
- @web.json
- def get_datatypes( self, trans ):
- ext_to_class_name = dict()
- classes = []
- for k, v in trans.app.datatypes_registry.datatypes_by_extension.iteritems():
- c = v.__class__
- ext_to_class_name[k] = c.__module__ + "." + c.__name__
- classes.append( c )
- class_to_classes = dict()
- def visit_bases( types, cls ):
- for base in cls.__bases__:
- if issubclass( base, Data ):
- types.add( base.__module__ + "." + base.__name__ )
- visit_bases( types, base )
- for c in classes:
- n = c.__module__ + "." + c.__name__
- types = set( [ n ] )
- visit_bases( types, c )
- class_to_classes[ n ] = dict( ( t, True ) for t in types )
- return dict( ext_to_class_name=ext_to_class_name, class_to_classes=class_to_classes )
-
- @web.expose
- def build_from_current_history( self, trans, job_ids=None, dataset_ids=None, workflow_name=None ):
- user = trans.get_user()
- history = trans.get_history()
- if not user:
- return trans.show_error_message( "Must be logged in to create workflows" )
- if ( job_ids is None and dataset_ids is None ) or workflow_name is None:
- jobs, warnings = get_job_dict( trans )
- # Render
- return trans.fill_template(
- "workflow/build_from_current_history.mako",
- jobs=jobs,
- warnings=warnings,
- history=history )
- else:
- # Ensure job_ids and dataset_ids are lists (possibly empty)
- if job_ids is None:
- job_ids = []
- elif type( job_ids ) is not list:
- job_ids = [ job_ids ]
- if dataset_ids is None:
- dataset_ids = []
- elif type( dataset_ids ) is not list:
- dataset_ids = [ dataset_ids ]
- # Convert both sets of ids to integers
- job_ids = [ int( id ) for id in job_ids ]
- dataset_ids = [ int( id ) for id in dataset_ids ]
- # Find each job, for security we (implicately) check that they are
- # associated witha job in the current history.
- jobs, warnings = get_job_dict( trans )
- jobs_by_id = dict( ( job.id, job ) for job in jobs.keys() )
- steps = []
- steps_by_job_id = {}
- hid_to_output_pair = {}
- # Input dataset steps
- for hid in dataset_ids:
- step = model.WorkflowStep()
- step.type = 'data_input'
- hid_to_output_pair[ hid ] = ( step, 'output' )
- steps.append( step )
- # Tool steps
- for job_id in job_ids:
- assert job_id in jobs_by_id, "Attempt to create workflow with job not connected to current history"
- job = jobs_by_id[ job_id ]
- tool = trans.app.toolbox.tools_by_id[ job.tool_id ]
- param_values = job.get_param_values( trans.app )
- associations = cleanup_param_values( tool.inputs, param_values )
- step = model.WorkflowStep()
- step.type = 'tool'
- step.tool_id = job.tool_id
- step.tool_inputs = tool.params_to_strings( param_values, trans.app )
- # NOTE: We shouldn't need to do two passes here since only
- # an earlier job can be used as an input to a later
- # job.
- for other_hid, input_name in associations:
- if other_hid in hid_to_output_pair:
- other_step, other_name = hid_to_output_pair[ other_hid ]
- conn = model.WorkflowStepConnection()
- conn.input_step = step
- conn.input_name = input_name
- # Should always be connected to an earlier step
- conn.output_step = other_step
- conn.output_name = other_name
- steps.append( step )
- steps_by_job_id[ job_id ] = step
- # Store created dataset hids
- for assoc in job.output_datasets:
- hid_to_output_pair[ assoc.dataset.hid ] = ( step, assoc.name )
- # Workflow to populate
- workflow = model.Workflow()
- workflow.name = workflow_name
- # Order the steps if possible
- attach_ordered_steps( workflow, steps )
- # And let's try to set up some reasonable locations on the canvas
- # (these are pretty arbitrary values)
- levorder = order_workflow_steps_with_levels( steps )
- base_pos = 10
- for i, steps_at_level in enumerate( levorder ):
- for j, index in enumerate( steps_at_level ):
- step = steps[ index ]
- step.position = dict( top = ( base_pos + 120 * j ),
- left = ( base_pos + 220 * i ) )
- # Store it
- stored = model.StoredWorkflow()
- stored.user = user
- stored.name = workflow_name
- workflow.stored_workflow = stored
- stored.latest_workflow = workflow
- trans.sa_session.save_or_update( stored )
- trans.sa_session.flush()
- # Index page with message
- return trans.show_message( "Workflow '%s' created from current history." % workflow_name )
- ## return trans.show_ok_message( "<p>Workflow '%s' created.</p><p><a target='_top' href='%s'>Click to load in workflow editor</a></p>"
- ## % ( workflow_name, web.url_for( action='editor', id=trans.security.encode_id(stored.id) ) ) )
-
- @web.expose
- def run( self, trans, id, check_user=True, **kwargs ):
- stored = get_stored_workflow( trans, id, check_ownership=False )
- if check_user:
- user = trans.get_user()
- if stored.user != user:
- if trans.sa_session.query( model.StoredWorkflowUserShareAssociation ) \
- .filter_by( user=user, stored_workflow=stored ).count() == 0:
- error( "Workflow is not owned by or shared with current user" )
- # Get the latest revision
- workflow = stored.latest_workflow
- # It is possible for a workflow to have 0 steps
- if len( workflow.steps ) == 0:
- error( "Workflow cannot be run because it does not have any steps" )
- #workflow = Workflow.from_simple( simplejson.loads( stored.encoded_value ), trans.app )
- if workflow.has_cycles:
- error( "Workflow cannot be run because it contains cycles" )
- if workflow.has_errors:
- error( "Workflow cannot be run because of validation errors in some steps" )
- # Build the state for each step
- errors = {}
- if kwargs:
- # If kwargs were provided, the states for each step should have
- # been POSTed
- for step in workflow.steps:
- # Connections by input name
- step.input_connections_by_name = \
- dict( ( conn.input_name, conn ) for conn in step.input_connections )
- # Extract just the arguments for this step by prefix
- p = "%s|" % step.id
- l = len(p)
- step_args = dict( ( k[l:], v ) for ( k, v ) in kwargs.iteritems() if k.startswith( p ) )
- step_errors = None
- if step.type == 'tool' or step.type is None:
- module = module_factory.from_workflow_step( trans, step )
- # Any connected input needs to have value DummyDataset (these
- # are not persisted so we need to do it every time)
- module.add_dummy_datasets( connections=step.input_connections )
- # Get the tool
- tool = module.tool
- # Get the state
- step.state = state = module.state
- # Get old errors
- old_errors = state.inputs.pop( "__errors__", {} )
- # Update the state
- step_errors = tool.update_state( trans, tool.inputs, step.state.inputs, step_args,
- update_only=True, old_errors=old_errors )
- else:
- module = step.module = module_factory.from_workflow_step( trans, step )
- state = step.state = module.decode_runtime_state( trans, step_args.pop( "tool_state" ) )
- step_errors = module.update_runtime_state( trans, state, step_args )
- if step_errors:
- errors[step.id] = state.inputs["__errors__"] = step_errors
- if 'run_workflow' in kwargs and not errors:
- # Run each step, connecting outputs to inputs
- outputs = odict()
- for i, step in enumerate( workflow.steps ):
- if step.type == 'tool' or step.type is None:
- tool = trans.app.toolbox.tools_by_id[ step.tool_id ]
- input_values = step.state.inputs
- # Connect up
- def callback( input, value, prefixed_name, prefixed_label ):
- if isinstance( input, DataToolParameter ):
- if prefixed_name in step.input_connections_by_name:
- conn = step.input_connections_by_name[ prefixed_name ]
- return outputs[ conn.output_step.id ][ conn.output_name ]
- visit_input_values( tool.inputs, step.state.inputs, callback )
- # Execute it
- outputs[ step.id ] = tool.execute( trans, step.state.inputs )
- else:
- outputs[ step.id ] = step.module.execute( trans, step.state )
-
- return trans.fill_template( "workflow/run_complete.mako",
- workflow=stored,
- outputs=outputs )
- else:
- for step in workflow.steps:
- if step.type == 'tool' or step.type is None:
- # Restore the tool state for the step
- module = module_factory.from_workflow_step( trans, step )
- # Any connected input needs to have value DummyDataset (these
- # are not persisted so we need to do it every time)
- module.add_dummy_datasets( connections=step.input_connections )
- # Store state with the step
- step.module = module
- step.state = module.state
- # Error dict
- if step.tool_errors:
- errors[step.id] = step.tool_errors
- else:
- ## Non-tool specific stuff?
- step.module = module_factory.from_workflow_step( trans, step )
- step.state = step.module.get_runtime_state()
- # Connections by input name
- step.input_connections_by_name = dict( ( conn.input_name, conn ) for conn in step.input_connections )
- # Render the form
- return trans.fill_template(
- "workflow/run.mako",
- steps=workflow.steps,
- workflow=stored,
- errors=errors,
- incoming=kwargs )
-
- @web.expose
- def configure_menu( self, trans, workflow_ids=None ):
- user = trans.get_user()
- if trans.request.method == "POST":
- if workflow_ids is None:
- workflow_ids = []
- elif type( workflow_ids ) != list:
- workflow_ids = [ workflow_ids ]
- sess = trans.sa_session
- # This explicit remove seems like a hack, need to figure out
- # how to make the association do it automatically.
- for m in user.stored_workflow_menu_entries:
- sess.delete( m )
- user.stored_workflow_menu_entries = []
- q = sess.query( model.StoredWorkflow )
- # To ensure id list is unique
- seen_workflow_ids = set()
- for id in workflow_ids:
- if id in seen_workflow_ids:
- continue
- else:
- seen_workflow_ids.add( id )
- m = model.StoredWorkflowMenuEntry()
- m.stored_workflow = q.get( id )
- user.stored_workflow_menu_entries.append( m )
- sess.flush()
- return trans.show_message( "Menu updated", refresh_frames=['tools'] )
- else:
- user = trans.get_user()
- ids_in_menu = set( [ x.stored_workflow_id for x in user.stored_workflow_menu_entries ] )
- workflows = trans.sa_session.query( model.StoredWorkflow ) \
- .filter_by( user=user, deleted=False ) \
- .order_by( desc( model.StoredWorkflow.c.update_time ) ) \
- .all()
- shared_by_others = trans.sa_session \
- .query( model.StoredWorkflowUserShareAssociation ) \
- .filter_by( user=user ) \
- .filter( model.StoredWorkflow.c.deleted == False ) \
- .all()
- return trans.fill_template( "workflow/configure_menu.mako",
- workflows=workflows,
- shared_by_others=shared_by_others,
- ids_in_menu=ids_in_menu )
@web.json
def json_update( self, trans ):
user = trans.get_user()
diff -r 6632a5d39f41 -r 802b761c5032 templates/cloud/add_credentials.mako
--- a/templates/cloud/add_credentials.mako Mon Oct 26 17:04:47 2009 -0400
+++ b/templates/cloud/add_credentials.mako Tue Oct 27 15:32:44 2009 -0400
@@ -103,9 +103,9 @@
</div>
%else:
- In order to add credentials, desired cloud provider needs to be registered.<p/>
+ In order to add credentials, desired cloud provider needs to be registered first.<p/>
Register <a href="${h.url_for( action='add_ec2' )}">
- <span>Amazon EC2 automatically</span></a>
+ <span>Amazon EC2 (us-east-1 region) automatically</span></a>
or add
<a href="${h.url_for( action='add_provider' )}">
<span>custom cloud provider</span></a>.
diff -r 6632a5d39f41 -r 802b761c5032 templates/cloud/add_provider.mako
--- a/templates/cloud/add_provider.mako Mon Oct 26 17:04:47 2009 -0400
+++ b/templates/cloud/add_provider.mako Tue Oct 27 15:32:44 2009 -0400
@@ -11,7 +11,19 @@
$("#type").change(function() {
if ($(this).val() == 'ec2') {
- $("#name").val("EC2");
+ clear();
+ $("#autofill").attr( 'disabled', true );
+ $("#autofill").attr( 'checked', false );
+ $("#name").val( "EC2" );
+ $("#region_name").val( "us-east-1" );
+ $("#region_endpoint").val( "us-east-1.ec2.amazonaws.com" );
+ $("#is_secure").val("1");
+ $("#debug").val("");
+ $("#path").val("/");
+ }
+ else if ($(this).val() == 'eucalyptus') {
+ clear();
+ $("#autofill").attr( 'disabled', false );
}
});
})
@@ -27,12 +39,23 @@
$("#path").val("/services/Eucalyptus");
}
else {
- $("#region_name").val("");
- $("#region_endpoint").val("");
- $("#is_secure").val("");
- $("#port").val("");
- $("#path").val("");
+ clear();
}
+}
+
+function clear() {
+ $("#name").val("");
+ $("#region_name").val("");
+ $("#region_endpoint").val("");
+ $("#is_secure").val("");
+ $("#port").val("");
+ $("#proxy").val("");
+ $("#proxy_port").val("");
+ $("#proxy_user").val("");
+ $("#proxy_pass").val("");
+ $("#debug").val("");
+ $("#https_connection_factory").val("");
+ $("#path").val("");
}
@@ -60,8 +83,9 @@
<option value="eucalyptus">Eucalyptus</option>
<option value="ec2">Amazon EC2</option>
</select>
- <br/><input type="checkbox" id="autofill" onclick="javascript:af()">
- auto fill using Eucalyptus Public Cloud values
+ <br/>
+ <input type="checkbox" id="autofill" onclick="javascript:af()" disabled="true">
+ auto fill using Eucalyptus Public Cloud values
</div>
%if error.has_key('type_error'):
<div class="form-row-error-message">${error['type_error']}</div>
@@ -90,9 +114,12 @@
%>
<div class="${cls}">
<label>Region name:</label>
- <div class="form-row-input">
+ <div id="region_selection" class="form-row-input">
<input type="text" name="region_name" id="region_name" value="${region_name}" size="40">
</div>
+ %if error.has_key('name_error'):
+ <div class="form-row-error-message">${error['name_error']}</div>
+ %endif
<div style="clear: both"></div>
</div>
@@ -107,14 +134,19 @@
<div style="clear: both"></div>
</div>
- <%
+ <%
cls = "form-row"
+ if error.has_key('is_secure_error'):
+ cls += " form-row-error"
%>
<div class="${cls}">
<label>Is secure ('O' for False or '1' for True):</label>
<div class="form-row-input">
<input type="text" name="is_secure" id="is_secure" value="${is_secure}" size="40">
</div>
+ %if error.has_key('is_secure_error'):
+ <div class="form-row-error-message">${error['is_secure_error']}; you entered: '${is_secure}'</div>
+ %endif
<div style="clear: both"></div>
</div>
diff -r 6632a5d39f41 -r 802b761c5032 templates/cloud/configure_cloud.mako
--- a/templates/cloud/configure_cloud.mako Mon Oct 26 17:04:47 2009 -0400
+++ b/templates/cloud/configure_cloud.mako Tue Oct 27 15:32:44 2009 -0400
@@ -35,24 +35,29 @@
}
else if ( old_state=='shutting-down' && new_state=='available' ) {
location.reload(true);
+ }
+ else if ( old_state=='running' && new_state=='available' ) {
+ location.reload(true);
+ }
+ else if ( old_state=='running' && new_state=='error' ) {
+ location.reload(true);
+ }
+ else if ( old_state=='pending' && new_state=='error' ) {
+ location.reload(true);
+ }
+ else if ( old_state=='pending' && new_state=='available' ) {
+ location.reload(true);
}
else if ( new_state=='shutting-down' || new_state=='shutting-downUCI' ) {
$(elem + "-link").text( "" );
}
- xmlhttp = new XMLHttpRequest();
- xmlhttp.open( "HEAD", "http://127.0.0.1:8080/admin", false );
- xmlhttp.send( null );
- //alert(xmlhttp.getAllResponseHeaders())
- console.log( "xmlhttp.readyState: %s", xmlhttp.readyState );
- console.log( "xmlhttp.status: %s", xmlhttp.status );
- if ( new_state=='running' && xmlhttp.readyState==1 ) {
- console.log ("in ready statsus = 1");
- //if (xmlhttp.status==200) {
- // console.log( "in status = 200" );
- // location.reload(true);
- //}
- }
-
+ // In order to show 'Access Galaxy' button, the whole page needs to be refreshed. So, while Galaxy is starting,
+ // keep refreshing the page. Should be handled better...
+ else if ( $(elem+"-link").text().search('starting') && old_state=='running' && new_state=='running' ) {
+ //location.reload(true);
+ $(elem + "-link").text("Still starting...");
+ }
+
// Update 'state' and 'time alive' fields
$(elem + "-state").text( data[i].state );
if (data[i].launch_time) {
@@ -184,7 +189,7 @@
context.write( '<span>Access Galaxy</span>' )
context.write( '<img src="'+h.url_for('/static/images/silk/resultset_next.png')+'" /></a></div>' )
except urllib2.URLError:
- context.write( '<span>Galaxy starting...</span>' )
+ context.write( 'Galaxy starting...' )
%>
%endif
@@ -247,8 +252,10 @@
<a onclick="document.getElementById('short').style.display = 'block';
document.getElementById('full').style.display = 'none'; return 0;"
href="javascript:void(0)">
- ${str(prevInstance.error)}
- </a>
+ error:</a><br />
+ ${str(prevInstance.error)}
+ <p />
+ <a href="${h.url_for( action='set_uci_state', id=trans.security.encode_id(prevInstance.id), state='available' )}">reset state</a>
</div>
%else:
${str(prevInstance.state)}
@@ -278,7 +285,7 @@
You have no AWS credentials associated with your Galaxy account:
<a class="action-button" href="${h.url_for( action='add' )}">
<img src="${h.url_for('/static/images/silk/add.png')}" />
- <span>Add AWS credentials</span>
+ <span>add credentials</span>
</a>
or
<a href="http://aws.amazon.com/" target="_blank">