details: http://www.bx.psu.edu/hg/galaxy/rev/049083fee997 changeset: 3070:049083fee997 user: Enis Afgan <afgane@gmail.com> date: Mon Oct 19 17:45:49 2009 -0400 description: Verified functionality of EC2 provider code and it WORKS! Note that there is one hard dependency on user data file w/ EC2. diffstat: lib/galaxy/cloud/__init__.py | 92 ++++++++++++------ lib/galaxy/cloud/providers/ec2.py | 152 ++++++++++++++++++++---------- lib/galaxy/cloud/providers/eucalyptus.py | 172 +++++++++++++++++++++------------- lib/galaxy/web/controllers/cloud.py | 86 +++++++++++------ templates/cloud/configure_cloud.mako | 2 +- 5 files changed, 322 insertions(+), 182 deletions(-) diffs (939 lines): diff -r 9881b0df3252 -r 049083fee997 lib/galaxy/cloud/__init__.py --- a/lib/galaxy/cloud/__init__.py Fri Oct 16 13:06:44 2009 -0400 +++ b/lib/galaxy/cloud/__init__.py Mon Oct 19 17:45:49 2009 -0400 @@ -18,11 +18,22 @@ log = logging.getLogger( __name__ ) -# States for running a job. These are NOT the same as data states -#messages = { -# JOB_WAIT -# -# } +uci_states = Bunch( + NEW_UCI = "newUCI", + NEW = "new", + DELETING_UCI = "deletingUCI", + DELETING = "deleting", + SUBMITTED_UCI = "submittedUCI", + SUBMITTED = "submitted", + SHUTTING_DOWN_UCI = "shutting-downUCI", + SHUTTING_DOWN = "shutting-down", + AVAILABLE = "available", + RUNNING = "running", + PENDING = "pending", + ERROR = "error", + CREATING = "creating" +) + JOB_WAIT, JOB_ERROR, JOB_INPUT_ERROR, JOB_INPUT_DELETED, JOB_OK, JOB_READY, JOB_DELETED, JOB_ADMIN_DELETED = 'wait', 'error', 'input_error', 'input_deleted', 'ok', 'ready', 'deleted', 'admin_deleted' class CloudManager( object ): @@ -32,10 +43,10 @@ def __init__( self, app ): self.app = app if self.app.config.get_bool( "enable_cloud_execution", True ): - # The dispatcher manager underlying cloud instances -# self.provider = CloudProvider( app ) + # The dispatcher manager for underlying cloud instances - implements and contacts individual cloud providers + self.provider = CloudProvider( app ) # Monitor for updating status of cloud instances - self.cloud_monitor = CloudMonitor( self.app ) + self.cloud_monitor = CloudMonitor( self.app, self.provider ) # self.job_stop_queue = JobStopQueue( app, self.dispatcher ) else: self.job_queue = self.job_stop_queue = NoopCloudMonitor() @@ -93,7 +104,7 @@ CloudProvider. """ STOP_SIGNAL = object() - def __init__( self, app ): + def __init__( self, app, provider ): """Start the cloud manager""" self.app = app # Keep track of the pid that started the cloud manager, only it @@ -186,20 +197,20 @@ # new_instances for r in session.query( model.UCI ) \ - .filter( or_( model.UCI.c.state=="newUCI", - model.UCI.c.state=="submittedUCI", - model.UCI.c.state=="shutting-downUCI", - model.UCI.c.state=="deletingUCI" ) ) \ + .filter( or_( model.UCI.c.state==uci_states.NEW_UCI, #"newUCI", + model.UCI.c.state==uci_states.SUBMITTED_UCI, #"submittedUCI", + model.UCI.c.state==uci_states.SHUTTING_DOWN_UCI, #"shutting-downUCI", + model.UCI.c.state==uci_states.DELETING_UCI ) ) \ .all(): - uci = UCIwrapper( r ) - new_requests.append( uci ) + uci_wrapper = UCIwrapper( r ) + new_requests.append( uci_wrapper ) # log.debug( 'new_requests: %s' % new_requests ) - for uci in new_requests: + for uci_wrapper in new_requests: session.clear() # log.debug( 'r.name: %s, state: %s' % ( r.name, r.state ) ) # session.save_or_update( r ) # session.flush() - self.provider.put( uci ) + self.provider.put( uci_wrapper ) # Done with the session mapping.Session.remove() @@ -511,6 +522,14 @@ # --------- Getter methods ----------------- + def get_provider_name( self ): + """ Returns name of cloud provider associated with given UCI. """ + uci = model.UCI.get( self.uci_id ) + uci.refresh() + cred_id = uci.credentials_id + cred = model.CloudUserCredentials.get( cred_id ) + return cred.provider_name + def get_instances_indexes( self, state=None ): """ Returns indexes of instances associated with given UCI as they are stored in local Galaxy database and @@ -977,26 +996,35 @@ class CloudProvider( object ): def __init__( self, app ): + import providers.eucalyptus + import providers.ec2 + self.app = app self.cloud_provider = {} + self.cloud_provider["eucalyptus"] = providers.eucalyptus.EucalyptusCloudProvider( app ) + self.cloud_provider["ec2"] = providers.ec2.EC2CloudProvider( app ) + # start_cloud_provider = None # if app.config.start_job_runners is not None: # start_cloud_provider.extend( app.config.start_job_runners.split(",") ) # for provider_name in start_cloud_provider: - self.provider_name = app.config.cloud_provider - if self.provider_name == "eucalyptus": - import providers.eucalyptus - self.cloud_provider[self.provider_name] = providers.eucalyptus.EucalyptusCloudProvider( app ) - elif self.provider_name == "ec2": - import providers.ec2 - self.cloud_provider[self.provider_name] = providers.ec2.EC2CloudProvider( app ) - else: - log.error( "Unable to start unknown cloud provider: %s" %self.provider_name ) +# self.provider_name = app.config.cloud_provider +# if self.provider_name == "eucalyptus": +# import providers.eucalyptus +# self.cloud_provider[self.provider_name] = providers.eucalyptus.EucalyptusCloudProvider( app ) +# elif self.provider_name == "ec2": +# import providers.ec2 +# self.cloud_provider[self.provider_name] = providers.ec2.EC2CloudProvider( app ) +# else: +# log.error( "Unable to start unknown cloud provider: %s" %self.provider_name ) def put( self, uci_wrapper ): """ Put given request for UCI manipulation into provider's request queue.""" # log.debug( "Adding UCI '%s' manipulation request into cloud manager's queue." % uci_wrapper.name ) - self.cloud_provider[self.provider_name].put( uci_wrapper ) + self.cloud_provider[uci_wrapper.get_provider_name()].put( uci_wrapper ) + + + def createUCI( self, uci ): """ @@ -1034,12 +1062,12 @@ def update( self ): """ - Runs a global status update on all storage volumes and all instances whose UCI is - 'running' state. - Reason behind this method is to sync state of local DB and real world resources + Runs a global status update across all providers for all UCIs in state other than 'terminated' and 'available'. + Reason behind this method is to sync state of local DB and real world resources. """ -# log.debug( "Running global update" ) - self.cloud_provider[self.provider_name].update() + for provider in self.cloud_provider.keys(): +# log.debug( "Running global update for provider: '%s'" % provider ) + self.cloud_provider[provider].update() def recover( self, job, job_wrapper ): runner_name = ( job.job_runner_name.split(":", 1) )[0] diff -r 9881b0df3252 -r 049083fee997 lib/galaxy/cloud/providers/ec2.py --- a/lib/galaxy/cloud/providers/ec2.py Fri Oct 16 13:06:44 2009 -0400 +++ b/lib/galaxy/cloud/providers/ec2.py Mon Oct 19 17:45:49 2009 -0400 @@ -5,23 +5,53 @@ from galaxy import model # Database interaction class from galaxy.model import mapping from galaxy.datatypes.data import nice_size +from galaxy.util.bunch import Bunch from Queue import Queue from sqlalchemy import or_ import galaxy.eggs galaxy.eggs.require("boto") from boto.ec2.connection import EC2Connection -from boto.ec2.regioninfo import RegionInfo +import boto.exception import logging log = logging.getLogger( __name__ ) +uci_states = Bunch( + NEW_UCI = "newUCI", + NEW = "new", + DELETING_UCI = "deletingUCI", + DELETING = "deleting", + SUBMITTED_UCI = "submittedUCI", + SUBMITTED = "submitted", + SHUTTING_DOWN_UCI = "shutting-downUCI", + SHUTTING_DOWN = "shutting-down", + AVAILABLE = "available", + RUNNING = "running", + PENDING = "pending", + ERROR = "error", + DELETED = "deleted" +) + +instance_states = Bunch( + TERMINATED = "terminated", + RUNNING = "running", + PENDING = "pending", + SHUTTING_DOWN = "shutting-down" +) + +store_states = Bunch( + IN_USE = "in-use", + CREATING = "creating" +) + class EC2CloudProvider( object ): """ Amazon EC2-based cloud provider implementation for managing instances. """ STOP_SIGNAL = object() def __init__( self, app ): + self.name = "ec2" self.zone = "us-east-1a" self.key_pair = "galaxy-keypair" self.queue = Queue() @@ -48,15 +78,13 @@ if uci_state is self.STOP_SIGNAL: return try: - if uci_state=="new": - log.debug( "Calling create UCI" ) + if uci_state==uci_states.NEW: # "new": self.createUCI( uci_wrapper ) - elif uci_state=="deleting": + elif uci_state==uci_states.DELETING: #"deleting": self.deleteUCI( uci_wrapper ) - elif uci_state=="submitted": - log.debug( "Calling start UCI" ) + elif uci_state==uci_states.SUBMITTED: #"submitted": self.startUCI( uci_wrapper ) - elif uci_state=="shutting-down": + elif uci_state==uci_states.SHUTTING_DOWN: #"shutting-down": self.stopUCI( uci_wrapper ) except: log.exception( "Uncaught exception executing request." ) @@ -75,19 +103,21 @@ Generate keypair using user's default credentials """ log.debug( "Getting user's keypair" ) - kp = conn.get_key_pair( self.key_pair ) instances = uci_wrapper.get_instances_indexes() - try: + kp = conn.get_key_pair( self.key_pair ) for inst in instances: - log.debug("inst: '%s'" % inst ) +# log.debug("inst: '%s'" % inst ) uci_wrapper.set_key_pair( inst, kp.name ) return kp.name - except AttributeError: # No keypair under this name exists so create it - log.info( "No keypair found, creating keypair '%s'" % self.key_pair ) - kp = conn.create_key_pair( self.key_pair ) - for inst in instances: - uci_wrapper.set_key_pair( inst, kp.name, kp.material ) + except boto.exception.EC2ResponseError, e: # No keypair under this name exists so create it + if e.code == 'InvalidKeyPair.NotFound': + log.info( "No keypair found, creating keypair '%s'" % self.key_pair ) + kp = conn.create_key_pair( self.key_pair ) + for inst in instances: + uci_wrapper.set_key_pair( inst, kp.name, kp.material ) + else: + log.error( "EC2 response error: '%s'" % e ) return kp.name @@ -146,20 +176,24 @@ uci_wrapper.set_store_volume_id( 0, vol.id ) # Wait for a while to ensure volume was created - vol_status = vol.status - for i in range( 30 ): - if vol_status is not "available": - log.debug( 'Updating volume status; current status: %s' % vol_status ) - vol_status = vol.status - time.sleep(3) - if i is 29: - log.debug( "Error while creating volume '%s'; stuck in state '%s'; deleting volume." % ( vol.id, vol_status ) ) - conn.delete_volume( vol.id ) - uci_wrapper.change_state( uci_state='error' ) - return - - uci_wrapper.change_state( uci_state='available' ) - uci_wrapper.set_store_status( vol.id, vol_status ) +# vol_status = vol.status +# for i in range( 30 ): +# if vol_status is not "available": +# log.debug( 'Updating volume status; current status: %s' % vol_status ) +# vol_status = vol.status +# time.sleep(3) +# if i is 29: +# log.debug( "Error while creating volume '%s'; stuck in state '%s'; deleting volume." % ( vol.id, vol_status ) ) +# conn.delete_volume( vol.id ) +# uci_wrapper.change_state( uci_state='error' ) +# return + vl = conn.get_all_volumes( [vol.id] ) + if len( vl ) > 0: + uci_wrapper.change_state( uci_state=vl[0].status ) + uci_wrapper.set_store_status( vol.id, vl[0].status ) + else: + uci_wrapper.change_state( uci_state=uci_states.ERROR ) + uci_wrapper.set_store_status( vol.id, uci_states.ERROR ) def deleteUCI( self, uci_wrapper ): """ @@ -191,7 +225,7 @@ else: log.error( "Deleting following volume(s) failed: %s. However, these volumes were successfully deleted: %s. \ MANUAL intervention and processing needed." % ( failedList, deletedList ) ) - uci_wrapper.change_state( uci_state="error" ) + uci_wrapper.change_state( uci_state=uci_state.ERROR ) def addStorageToUCI( self, name ): """ Adds more storage to specified UCI @@ -201,7 +235,7 @@ uci = uci_wrapper.get_uci() log.debug( "Would be starting instance '%s'" % uci.name ) - uci_wrapper.change_state( 'pending' ) + uci_wrapper.change_state( uci_state.PENDING ) # log.debug( "Sleeping a bit... (%s)" % uci.name ) # time.sleep(20) # log.debug( "Woke up! (%s)" % uci.name ) @@ -213,16 +247,16 @@ conn = self.get_connection( uci_wrapper ) # self.set_keypair( uci_wrapper, conn ) - i_indexes = uci_wrapper.get_instances_indexes() # Get indexes of *new* i_indexes associated with this UCI + i_indexes = uci_wrapper.get_instances_indexes() # Get indexes of i_indexes associated with this UCI whose state is 'None' log.debug( "Starting instances with IDs: '%s' associated with UCI '%s' " % ( uci_wrapper.get_name(), i_indexes ) ) for i_index in i_indexes: mi_id = self.get_mi_id( uci_wrapper.get_type( i_index ) ) - log.debug( "mi_id: %s, uci_wrapper.get_key_pair_name( i_index ): %s" % ( mi_id, uci_wrapper.get_key_pair_name( i_index ) ) ) +# log.debug( "mi_id: %s, uci_wrapper.get_key_pair_name( i_index ): %s" % ( mi_id, uci_wrapper.get_key_pair_name( i_index ) ) ) uci_wrapper.set_mi( i_index, mi_id ) # Check if galaxy security group exists (and create it if it does not) - log.debug( '***** Setting up security group' ) +# log.debug( '***** Setting up security group' ) security_group = 'galaxyWeb' sgs = conn.get_all_security_groups() # security groups gsgt = False # galaxy security group test @@ -235,12 +269,15 @@ gSecurityGroup.authorize( 'tcp', 80, 80, '0.0.0.0/0' ) # Open HTTP port gSecurityGroup.authorize( 'tcp', 22, 22, '0.0.0.0/0' ) # Open SSH port # Start an instance - log.debug( "***** Starting UCI instance '%s'" % uci_wrapper.get_name() ) + log.debug( "***** Starting instance for UCI '%s'" % uci_wrapper.get_name() ) + #TODO: Get customization scripts remotley and pass volID and user credential data only as user data from here. + userdata = open('/Users/afgane/Dropbox/Galaxy/EC2startupScripts/web/ec2autorun.zip', 'rb').read() log.debug( 'Using following command: conn.run_instances( image_id=%s, key_name=%s, security_groups=[%s], instance_type=%s, placement=%s )' % ( mi_id, uci_wrapper.get_key_pair_name( i_index ), [security_group], uci_wrapper.get_type( i_index ), uci_wrapper.get_uci_availability_zone() ) ) reservation = conn.run_instances( image_id=mi_id, key_name=uci_wrapper.get_key_pair_name( i_index ), security_groups=[security_group], + user_data=userdata, instance_type=uci_wrapper.get_type( i_index ), placement=uci_wrapper.get_uci_availability_zone() ) # Record newly available instance data into local Galaxy database @@ -337,8 +374,7 @@ # uci.launch_time = None # uci.flush() # - log.debug( "All instances for UCI '%s' were terminated." % uci_wrapper.get_name() ) - + log.debug( "Termination was initiated for all instances of UCI '%s'." % uci_wrapper.get_name() ) # dbInstances = get_instances( trans, uci ) #TODO: handle list! @@ -388,20 +424,26 @@ def update( self ): """ - Runs a global status update on all storage volumes and all instances whose UCI is in - 'running', 'pending', or 'shutting-down' state. + Runs a global status update on all instances that are in 'running', 'pending', "creating", or 'shutting-down' state. + Also, runs update on all storage volumes that are in "in-use", "creating", or 'None' state. Reason behind this method is to sync state of local DB and real-world resources """ - log.debug( "Running general status update for EPC UCIs." ) - instances = model.CloudInstance.filter( or_( model.CloudInstance.c.state=="running", model.CloudInstance.c.state=="pending", model.CloudInstance.c.state=="shutting-down" ) ).all() + log.debug( "Running general status update for EC2 UCIs..." ) + instances = model.CloudInstance.filter( or_( model.CloudInstance.c.state==instance_states.RUNNING, #"running", + model.CloudInstance.c.state==instance_states.PENDING, #"pending", + model.CloudInstance.c.state==instance_states.SHUTTING_DOWN ) ).all() for inst in instances: - log.debug( "Running general status update on instance '%s'" % inst.instance_id ) - self.updateInstance( inst ) + if self.name == inst.uci.credentials.provider_name: + log.debug( "[%s] Running general status update on instance '%s'" % ( inst.uci.credentials.provider_name, inst.instance_id ) ) + self.updateInstance( inst ) - stores = model.CloudStore.filter( or_( model.CloudStore.c.status=="in-use", model.CloudStore.c.status=="creating" ) ).all() + stores = model.CloudStore.filter( or_( model.CloudStore.c.status==store_states.IN_USE, + model.CloudStore.c.status==store_states.CREATING, + model.CloudStore.c.status==None ) ).all() for store in stores: - log.debug( "Running general status update on store '%s'" % store.volume_id ) - self.updateStore( store ) + if self.name == store.uci.credentials.provider_name: + log.debug( "[%s] Running general status update on store '%s'" % ( store.uci.credentials.provider_name, store.volume_id ) ) + self.updateStore( store ) def updateInstance( self, inst ): @@ -420,8 +462,8 @@ # marks given instance as having terminated. Note that an instance might have also crashed and this code will not catch the difference... if len( rl ) == 0: log.info( "Instance ID '%s' was not found by the cloud provider. Instance might have crashed or otherwise been terminated." % inst.instance_id ) - inst.state = 'terminated' - uci.state = 'available' + inst.state = instance_states.TERMINATED + uci.state = uci_states.AVAILABLE uci.launch_time = None inst.flush() uci.flush() @@ -429,14 +471,14 @@ for r in rl: for i, cInst in enumerate( r.instances ): s = cInst.update() - log.debug( "Checking state of cloud instance '%s' associated with reservation '%s'. State='%s'" % ( cInst, r, s ) ) + log.debug( "Checking state of cloud instance '%s' associated with UCI '%s' and reservation '%s'. State='%s'" % ( cInst, uci.name, r, s ) ) if s != inst.state: inst.state = s inst.flush() - if s == 'terminated': # After instance has shut down, ensure UCI is marked as 'available' - uci.state = 'available' + if s == instance_states.TERMINATED: # After instance has shut down, ensure UCI is marked as 'available' + uci.state = uci_states.AVAILABLE uci.flush() - if s != uci.state and s != 'terminated': + if s != uci.state and s != instance_states.TERMINATED: # Making sure state of UCI is updated. Once multiple instances become associated with single UCI, this will need to be changed. uci.state = s uci.flush() @@ -461,6 +503,12 @@ # log.debug( "Store '%s' vl: '%s'" % ( store.volume_id, vl ) ) # Update store status in local DB with info from cloud provider if store.status != vl[0].status: + # In case something failed during creation of UCI but actual storage volume was created and yet + # UCI state remained as 'new', try to remedy this by updating UCI state here + if ( store.status == None ) and ( store.volume_id != None ): + uci.state = vl[0].status + uci.flush() + store.status = vl[0].status store.flush() if store.i_id != vl[0].instance_id: diff -r 9881b0df3252 -r 049083fee997 lib/galaxy/cloud/providers/eucalyptus.py --- a/lib/galaxy/cloud/providers/eucalyptus.py Fri Oct 16 13:06:44 2009 -0400 +++ b/lib/galaxy/cloud/providers/eucalyptus.py Mon Oct 19 17:45:49 2009 -0400 @@ -5,6 +5,7 @@ from galaxy import model # Database interaction class from galaxy.model import mapping from galaxy.datatypes.data import nice_size +from galaxy.util.bunch import Bunch from Queue import Queue from sqlalchemy import or_ @@ -16,12 +17,41 @@ import logging log = logging.getLogger( __name__ ) +uci_states = Bunch( + NEW_UCI = "newUCI", + NEW = "new", + DELETING_UCI = "deletingUCI", + DELETING = "deleting", + SUBMITTED_UCI = "submittedUCI", + SUBMITTED = "submitted", + SHUTTING_DOWN_UCI = "shutting-downUCI", + SHUTTING_DOWN = "shutting-down", + AVAILABLE = "available", + RUNNING = "running", + PENDING = "pending", + ERROR = "error", + DELETED = "deleted" +) + +instance_states = Bunch( + TERMINATED = "terminated", + RUNNING = "running", + PENDING = "pending", + SHUTTING_DOWN = "shutting-down" +) + +store_states = Bunch( + IN_USE = "in-use", + CREATING = "creating" +) + class EucalyptusCloudProvider( object ): """ Eucalyptus-based cloud provider implementation for managing instances. """ STOP_SIGNAL = object() def __init__( self, app ): + self.name = "eucalyptus" self.zone = "epc" self.key_pair = "galaxy-keypair" self.queue = Queue() @@ -48,15 +78,13 @@ if uci_state is self.STOP_SIGNAL: return try: - if uci_state=="new": - log.debug( "Calling create UCI" ) + if uci_state==uci_states.NEW: # "new": self.createUCI( uci_wrapper ) - elif uci_state=="deleting": + elif uci_state==uci_states.DELETING: #"deleting": self.deleteUCI( uci_wrapper ) - elif uci_state=="submitted": - log.debug( "Calling start UCI" ) + elif uci_state==uci_states.SUBMITTED: #"submitted": self.startUCI( uci_wrapper ) - elif uci_state=="shutting-down": + elif uci_state==uci_states.SHUTTING_DOWN: #"shutting-down": self.stopUCI( uci_wrapper ) except: log.exception( "Uncaught exception executing request." ) @@ -149,7 +177,7 @@ uci_wrapper.set_store_volume_id( 0, vol.id ) # EPC does not allow creation of storage volumes (it deletes one as soon as it is created, so manually set uci_state here) - uci_wrapper.change_state( uci_state='available' ) + uci_wrapper.change_state( uci_state=uci_states.AVAILABLE ) uci_wrapper.set_store_status( vol.id, vol.status ) def deleteUCI( self, uci_wrapper ): @@ -182,7 +210,7 @@ else: log.error( "Deleting following volume(s) failed: %s. However, these volumes were successfully deleted: %s. \ MANUAL intervention and processing needed." % ( failedList, deletedList ) ) - uci_wrapper.change_state( uci_state="error" ) + uci_wrapper.change_state( uci_state=uci_states.ERROR ) def addStorageToUCI( self, name ): """ Adds more storage to specified UCI """ @@ -191,7 +219,7 @@ uci = uci_wrapper.get_uci() log.debug( "Would be starting instance '%s'" % uci.name ) - uci_wrapper.change_state( 'pending' ) + uci_wrapper.change_state( uci_states.PENDING ) # log.debug( "Sleeping a bit... (%s)" % uci.name ) # time.sleep(20) # log.debug( "Woke up! (%s)" % uci.name ) @@ -322,7 +350,7 @@ # uci.launch_time = None # uci.flush() # - log.debug( "All instances for UCI '%s' were terminated." % uci_wrapper.get_name() ) + log.debug( "Termination was initiated for all instances of UCI '%s'." % uci_wrapper.get_name() ) @@ -373,20 +401,26 @@ def update( self ): """ - Runs a global status update on all storage volumes and all instances whose UCI is in - 'running', 'pending', or 'shutting-down' state. + Runs a global status update on all instances that are in 'running', 'pending', "creating", or 'shutting-down' state. + Also, runs update on all storage volumes that are in "in-use", "creating", or 'None' state. Reason behind this method is to sync state of local DB and real-world resources """ - log.debug( "Running general status update for EPC UCIs." ) - instances = model.CloudInstance.filter( or_( model.CloudInstance.c.state=="running", model.CloudInstance.c.state=="pending", model.CloudInstance.c.state=="shutting-down" ) ).all() + log.debug( "Running general status update for EPC UCIs..." ) + instances = model.CloudInstance.filter( or_( model.CloudInstance.c.state==instance_states.RUNNING, #"running", + model.CloudInstance.c.state==instance_states.PENDING, #"pending", + model.CloudInstance.c.state==instance_states.SHUTTING_DOWN ) ).all() for inst in instances: - log.debug( "Running general status update on instance '%s'" % inst.instance_id ) - self.updateInstance( inst ) + if self.name == inst.uci.credentials.provider_name: + log.debug( "[%s] Running general status update on instance '%s'" % ( inst.uci.credentials.provider_name, inst.instance_id ) ) + self.updateInstance( inst ) - stores = model.CloudStore.filter( or_( model.CloudStore.c.status=="in-use", model.CloudStore.c.status=="creating" ) ).all() + stores = model.CloudStore.filter( or_( model.CloudStore.c.status==store_states.IN_USE, + model.CloudStore.c.status==store_states.CREATING, + model.CloudStore.c.status==None ) ).all() for store in stores: - log.debug( "Running general status update on store '%s'" % store.volume_id ) - self.updateStore( store ) + if self.name == store.uci.credentials.provider_name: + log.debug( "[%s] Running general status update on store '%s'" % ( store.uci.credentials.provider_name, store.volume_id ) ) + self.updateStore( store ) def updateInstance( self, inst ): @@ -406,8 +440,8 @@ # marks given instance as having terminated. Note that an instance might have also crashed and this code will not catch the difference... if len( rl ) == 0: log.info( "Instance ID '%s' was not found by the cloud provider. Instance might have crashed or otherwise been terminated." % inst.instance_id ) - inst.state = 'terminated' - uci.state = 'available' + inst.state = instance_states.TERMINATED + uci.state = uci_states.AVAILABLE uci.launch_time = None inst.flush() uci.flush() @@ -419,10 +453,10 @@ if s != inst.state: inst.state = s inst.flush() - if s == 'terminated': # After instance has shut down, ensure UCI is marked as 'available' - uci.state = 'available' + if s == instance_states.TERMINATED: # After instance has shut down, ensure UCI is marked as 'available' + uci.state = uci_states.AVAILABLE uci.flush() - if s != uci.state and s != 'terminated': + if s != uci.state and s != instance_states.TERMINATED: # Making sure state of UCI is updated. Once multiple instances become associated with single UCI, this will need to be changed. uci.state = s uci.flush() @@ -448,6 +482,12 @@ # log.debug( "Store '%s' vl: '%s'" % ( store.volume_id, vl ) ) # Update store status in local DB with info from cloud provider if store.status != vl[0].status: + # In case something failed during creation of UCI but actual storage volume was created and yet + # UCI state remained as 'new', try to remedy this by updating UCI state here + if ( store.status == None ) and ( store.volume_id != None ): + uci.state = vl[0].status + uci.flush() + store.status = vl[0].status store.flush() if store.i_id != vl[0].instance_id: @@ -460,48 +500,48 @@ store.device = vl[0].device store.flush() - def updateUCI( self, uci ): - """ - Runs a global status update on all storage volumes and all instances that are - associated with specified UCI - """ - conn = self.get_connection( uci ) - - # Update status of storage volumes - vl = model.CloudStore.filter( model.CloudInstance.c.uci_id == uci.id ).all() - vols = [] - for v in vl: - vols.append( v.volume_id ) - try: - volumes = conn.get_all_volumes( vols ) - for i, v in enumerate( volumes ): - uci.store[i].i_id = v.instance_id - uci.store[i].status = v.status - uci.store[i].device = v.device - uci.store[i].flush() - except: - log.debug( "Error updating status of volume(s) associated with UCI '%s'. Status was not updated." % uci.name ) - pass - - # Update status of instances - il = model.CloudInstance.filter_by( uci_id=uci.id ).filter( model.CloudInstance.c.state != 'terminated' ).all() - instanceList = [] - for i in il: - instanceList.append( i.instance_id ) - log.debug( 'instanceList: %s' % instanceList ) - try: - reservations = conn.get_all_instances( instanceList ) - for i, r in enumerate( reservations ): - uci.instance[i].state = r.instances[0].update() - log.debug('updating instance %s; status: %s' % ( uci.instance[i].instance_id, uci.instance[i].state ) ) - uci.state = uci.instance[i].state - uci.instance[i].public_dns = r.instances[0].dns_name - uci.instance[i].private_dns = r.instances[0].private_dns_name - uci.instance[i].flush() - uci.flush() - except: - log.debug( "Error updating status of instances associated with UCI '%s'. Instance status was not updated." % uci.name ) - pass +# def updateUCI( self, uci ): +# """ +# Runs a global status update on all storage volumes and all instances that are +# associated with specified UCI +# """ +# conn = self.get_connection( uci ) +# +# # Update status of storage volumes +# vl = model.CloudStore.filter( model.CloudInstance.c.uci_id == uci.id ).all() +# vols = [] +# for v in vl: +# vols.append( v.volume_id ) +# try: +# volumes = conn.get_all_volumes( vols ) +# for i, v in enumerate( volumes ): +# uci.store[i].i_id = v.instance_id +# uci.store[i].status = v.status +# uci.store[i].device = v.device +# uci.store[i].flush() +# except: +# log.debug( "Error updating status of volume(s) associated with UCI '%s'. Status was not updated." % uci.name ) +# pass +# +# # Update status of instances +# il = model.CloudInstance.filter_by( uci_id=uci.id ).filter( model.CloudInstance.c.state != 'terminated' ).all() +# instanceList = [] +# for i in il: +# instanceList.append( i.instance_id ) +# log.debug( 'instanceList: %s' % instanceList ) +# try: +# reservations = conn.get_all_instances( instanceList ) +# for i, r in enumerate( reservations ): +# uci.instance[i].state = r.instances[0].update() +# log.debug('updating instance %s; status: %s' % ( uci.instance[i].instance_id, uci.instance[i].state ) ) +# uci.state = uci.instance[i].state +# uci.instance[i].public_dns = r.instances[0].dns_name +# uci.instance[i].private_dns = r.instances[0].private_dns_name +# uci.instance[i].flush() +# uci.flush() +# except: +# log.debug( "Error updating status of instances associated with UCI '%s'. Instance status was not updated." % uci.name ) +# pass # --------- Helper methods ------------ diff -r 9881b0df3252 -r 049083fee997 lib/galaxy/web/controllers/cloud.py --- a/lib/galaxy/web/controllers/cloud.py Fri Oct 16 13:06:44 2009 -0400 +++ b/lib/galaxy/web/controllers/cloud.py Mon Oct 19 17:45:49 2009 -0400 @@ -29,6 +29,29 @@ import logging log = logging.getLogger( __name__ ) +uci_states = Bunch( + NEW_UCI = "newUCI", + NEW = "new", + DELETING_UCI = "deletingUCI", + DELETING = "deleting", + SUBMITTED_UCI = "submittedUCI", + SUBMITTED = "submitted", + SHUTTING_DOWN_UCI = "shutting-downUCI", + SHUTTING_DOWN = "shutting-down", + AVAILABLE = "available", + RUNNING = "running", + PENDING = "pending", + ERROR = "error", + DELETED = "deleted" +) + +instance_states = Bunch( + TERMINATED = "terminated", + RUNNING = "running", + PENDING = "pending", + SHUTTING_DOWN = "shutting-down" +) + class CloudController( BaseController ): # def __init__( self ): @@ -59,23 +82,23 @@ liveInstances = trans.sa_session.query( model.UCI ) \ .filter_by( user=user ) \ - .filter( or_( model.UCI.c.state=="running", - model.UCI.c.state=="pending", - model.UCI.c.state=="submitted", - model.UCI.c.state=="submittedUCI", - model.UCI.c.state=="shutting-down", - model.UCI.c.state=="shutting-downUCI" ) ) \ + .filter( or_( model.UCI.c.state==uci_states.RUNNING, #"running", + model.UCI.c.state==uci_states.PENDING, #"pending", + model.UCI.c.state==uci_states.SUBMITTED, #"submitted", + model.UCI.c.state==uci_states.SUBMITTED_UCI, #"submittedUCI", + model.UCI.c.state==uci_states.SHUTTING_DOWN, #"shutting-down", + model.UCI.c.state==uci_states.SHUTTING_DOWN_UCI ) ) \ .order_by( desc( model.UCI.c.update_time ) ) \ .all() prevInstances = trans.sa_session.query( model.UCI ) \ .filter_by( user=user ) \ - .filter( or_( model.UCI.c.state=="available", - model.UCI.c.state=="new", - model.UCI.c.state=="newUCI", - model.UCI.c.state=="error", - model.UCI.c.state=="deleting", - model.UCI.c.state=="deletingUCI" ) ) \ + .filter( or_( model.UCI.c.state==uci_states.AVAILABLE, #"available", + model.UCI.c.state==uci_states.NEW, #"new", + model.UCI.c.state==uci_states.NEW_UCI, #"newUCI", + model.UCI.c.state==uci_states.ERROR, #"error", + model.UCI.c.state==uci_states.DELETING, #"deleting", + model.UCI.c.state==uci_states.DELETING_UCI ) ) \ .order_by( desc( model.UCI.c.update_time ) ) \ .all() @@ -83,9 +106,9 @@ # TODO: Auto-refresh once instance is running pendingInstances = trans.sa_session.query( model.UCI ) \ .filter_by( user=user ) \ - .filter( or_( model.UCI.c.state=="pending" , \ - model.UCI.c.state=="submitted" , \ - model.UCI.c.state=="submittedUCI" ) ) \ + .filter( or_( model.UCI.c.state==uci_states.PENDING, #"pending" , \ + model.UCI.c.state==uci_states.SUBMITTED, #"submitted" , \ + model.UCI.c.state==uci_states.SUBMITTED_UCI ) ) \ .all() if pendingInstances: trans.set_message( "Galaxy instance started. NOTE: Please wait about 3-5 minutes for the instance to " @@ -131,12 +154,12 @@ stores = get_stores( trans, uci ) # log.debug(self.app.config.job_working_directory) if ( len(stores) is not 0 ) and \ - ( uci.state != 'submitted' ) and \ - ( uci.state != 'submittedUCI' ) and \ - ( uci.state != 'pending' ) and \ - ( uci.state != 'deleting' ) and \ - ( uci.state != 'deletingUCI' ) and \ - ( uci.state != 'error' ): + ( uci.state != uci_states.SUBMITTED ) and \ + ( uci.state != uci_states.SUBMITTED_UCI ) and \ + ( uci.state != uci_states.PENDING ) and \ + ( uci.state != uci_states.DELETING ) and \ + ( uci.state != uci_states.DELETING_UCI ) and \ + ( uci.state != uci_states.ERROR ): instance = model.CloudInstance() instance.user = user instance.image = mi @@ -169,7 +192,7 @@ # instance.instance_id = str( reservation.instances[0]).split(":")[1] # instance.state = "pending" # instance.state = reservation.instances[0].state - uci.state = 'submittedUCI' + uci.state = uci_states.SUBMITTED_UCI # Persist session = trans.sa_session @@ -181,6 +204,7 @@ trans.set_message( "Galaxy instance started. NOTE: Please wait about 3-5 minutes for the instance to " "start up and then refresh this page. A button to connect to the instance will then appear alongside " "instance description." ) + time.sleep(1) # Wait for initial update to occur to avoid immediate page reload return self.list( trans ) trans.show_error_message( "Cannot start instance that is in state '%s'." % uci.state ) @@ -199,7 +223,7 @@ Stop a cloud UCI instance. This implies stopping Galaxy server and disconnecting/unmounting relevant file system(s). """ uci = get_uci( trans, id ) - uci.state = 'shutting-downUCI' + uci.state = uci_states.SHUTTING_DOWN_UCI session = trans.sa_session # session.save_or_update( stores ) session.save_or_update( uci ) @@ -252,7 +276,6 @@ # session.flush() # trans.log_event( "User stopped cloud instance '%s'" % uci.name ) # trans.set_message( "Galaxy instance '%s' stopped." % uci.name ) -# return self.list( trans ) @web.expose @@ -264,9 +287,9 @@ """ uci = get_uci( trans, id ) - if ( uci.state != 'deletingUCI' ) and ( uci.state != 'deleting' ) and ( uci.state != 'error' ): + if ( uci.state != uci_states.DELETING_UCI ) and ( uci.state != uci_states.DELETING ) and ( uci.state != uci_states.ERROR ): name = uci.name - uci.state = "deletingUCI" + uci.state = uci_states.DELETING_UCI # dbInstances = get_instances( trans, uci ) #TODO: handle list! # # conn = get_connection( trans ) @@ -332,7 +355,7 @@ if instanceName: # Create new user configured instance try: - if trans.app.model.UCI.filter( and_( trans.app.model.UCI.table.c.name==instanceName, trans.app.model.UCI.table.c.state!='deleted' ) ).first(): + if trans.app.model.UCI.filter( and_( trans.app.model.UCI.table.c.name==instanceName, trans.app.model.UCI.table.c.state!=uci_states.DELETED ) ).first(): error['inst_error'] = "An instance with that name already exist." elif instanceName=='' or len( instanceName ) > 255: error['inst_error'] = "Instance name must be between 1 and 255 characters long." @@ -355,7 +378,7 @@ trans.app.model.CloudUserCredentials.table.c.name==credName ).first() uci.user= user uci.total_size = volSize # This is OK now because new instance is being created. - uci.state = "newUCI" + uci.state = uci_states.NEW_UCI storage = model.CloudStore() storage.user = user @@ -370,6 +393,7 @@ # Log and display the management page trans.log_event( "User configured new cloud instance" ) trans.set_message( "New Galaxy instance '%s' configured. Once instance status shows 'available' you will be able to start the instance." % instanceName ) + time.sleep(1) # Wait for initial update to occur to avoid immediate page reload return self.list( trans ) except ValueError: vol_error = "Volume size must be specified as an integer value only, between 1 and 1000." @@ -993,7 +1017,7 @@ def get_UCIs_state( trans ): user = trans.get_user() - instances = trans.sa_session.query( model.UCI ).filter_by( user=user ).filter( model.UCI.c.state != "deleted" ).all() + instances = trans.sa_session.query( model.UCI ).filter_by( user=user ).filter( model.UCI.c.state != uci_states.DELETED ).all() insd = {} # instance name-state dict for inst in instances: insd[inst.name] = inst.state @@ -1088,7 +1112,7 @@ user = trans.get_user() instances = trans.sa_session.query( model.CloudInstance ) \ .filter_by( user=user, uci_id=uci.id ) \ - .filter( or_(model.CloudInstance.table.c.state=="running", model.CloudInstance.table.c.state=="pending" ) ) \ + .filter( or_(model.CloudInstance.table.c.state==instance_states.RUNNING, model.CloudInstance.table.c.state==instance_states.PENDING ) ) \ .first() #.all() #TODO: return all but need to edit calling method(s) to handle list @@ -1172,7 +1196,7 @@ session.flush() # If instance is now running, update/process instance (i.e., mount file system, start Galaxy, update DB with DNS) - if oldState=="pending" and dbInstances.state=="running": + if oldState==instance_states.PENDING and dbInstances.state==instance_states.RUNNING: update_instance( trans, dbInstances, cloudInstance, conn, uci ) diff -r 9881b0df3252 -r 049083fee997 templates/cloud/configure_cloud.mako --- a/templates/cloud/configure_cloud.mako Fri Oct 16 13:06:44 2009 -0400 +++ b/templates/cloud/configure_cloud.mako Mon Oct 19 17:45:49 2009 -0400 @@ -26,7 +26,7 @@ <li> <a class="action-button" href="${h.url_for( action='add' )}"> <img src="${h.url_for('/static/images/silk/add.png')}" /> - <span>Add AWS credentials</span> + <span>Add credentials</span> </a> </li> </ul>