[hg] galaxy 3068: Decoupled cloud controller from cloud provider...
details: http://www.bx.psu.edu/hg/galaxy/rev/6aab50510e43 changeset: 3068:6aab50510e43 user: Enis Afgan <afgane@gmail.com> date: Wed Oct 14 19:20:11 2009 -0400 description: Decoupled cloud controller from cloud providers. Added cloud manager that runs as a daemon and polls database for change of status (i.e., user action). Implemented code to support 'eucalyptus' provider (tested with Eucalyptus Public Cloud). diffstat: lib/galaxy/cloud/__init__.py | 367 ++++++++++++++++- lib/galaxy/cloud/providers/eucalyptus.py | 476 +++++++++++++++++----- lib/galaxy/model/mapping.py | 2 +- lib/galaxy/model/migrate/versions/0014_cloud_tables.py | 8 +- lib/galaxy/web/controllers/cloud.py | 137 ++++-- templates/cloud/add_credentials.mako | 97 ++++ 6 files changed, 895 insertions(+), 192 deletions(-) diffs (1401 lines): diff -r 7c438fd3cf4a -r 6aab50510e43 lib/galaxy/cloud/__init__.py --- a/lib/galaxy/cloud/__init__.py Fri Oct 02 18:31:32 2009 -0400 +++ b/lib/galaxy/cloud/__init__.py Wed Oct 14 19:20:11 2009 -0400 @@ -7,6 +7,7 @@ from galaxy.datatypes.interval import * from galaxy.datatypes import metadata from galaxy.util.bunch import Bunch +from sqlalchemy import or_ import pkg_resources pkg_resources.require( "PasteDeploy" ) @@ -147,13 +148,18 @@ # HACK: Delay until after forking, we need a way to do post fork notification!!! time.sleep( 10 ) + cnt = 0 # Run global update only periodically so keep counter variable while self.running: try: # log.debug( "Calling monitor_step" ) self.__monitor_step() + if cnt%30 == 0: # Run global update every 30 seconds + self.provider.update() + cnt = 0 except: log.exception( "Exception in cloud manager monitor_step" ) # Sleep + cnt += 1 self.sleeper.sleep( 2 ) def __monitor_step( self ): @@ -167,31 +173,53 @@ it is marked as having errors and removed from the queue. Otherwise, the job is dispatched. 
""" - # Get an orm session + # Get an orm (object relational mapping) session session = mapping.Session() # Pull all new jobs from the queue at once - new_jobs = [] - new_instances = [] - new_UCIs = [] - stop_UCIs = [] + new_requests = [] +# new_instances = [] +# new_UCIs = [] +# stop_UCIs = [] +# delete_UCIs = [] # for r in session.query( model.cloud_instance ).filter( model.cloud_instance.s.state == model.cloud_instance.states.NEW ).all(): # new_instances - for r in session.query( model.UCI ).filter( model.UCI.c.state == "new" ).all(): - new_UCIs.append( r ) - for r in new_UCIs: - self.provider.createUCI( r ) + for r in session.query( model.UCI ) \ + .filter( or_( model.UCI.c.state=="newUCI", + model.UCI.c.state=="submittedUCI", + model.UCI.c.state=="shutting-downUCI", + model.UCI.c.state=="deletingUCI" ) ) \ + .all(): + uci = UCIwrapper( r ) + new_requests.append( uci ) +# log.debug( 'new_requests: %s' % new_requests ) + for uci in new_requests: + session.clear() +# log.debug( 'r.name: %s, state: %s' % ( r.name, r.state ) ) +# session.save_or_update( r ) +# session.flush() + self.provider.put( uci ) - for r in session.query( model.UCI ).filter( model.UCI.c.state == "submitted" ).all(): - new_instances.append( r ) - for r in new_instances: - self.provider.startUCI( r ) + # Done with the session + mapping.Session.remove() - for r in session.query( model.UCI ).filter( model.UCI.c.state == "terminating" ).all(): - stop_UCIs.append( r ) - for r in stop_UCIs: - self.provider.stopUCI( r ) +# for r in session.query( model.UCI ).filter( model.UCI.c.state == "submitted" ).all(): +# new_instances.append( r ) +# for r in new_instances: +# self.provider.startUCI( r ) +# +# for r in session.query( model.UCI ).filter( model.UCI.c.state == "shutting-down" ).all(): +# stop_UCIs.append( r ) +# for r in stop_UCIs: +# self.provider.stopUCI( r ) +# +# for r in session.query( model.UCI ).filter( model.UCI.c.state == "deleting" ).all(): +# delete_UCIs.append( r ) +# for r in 
delete_UCIs: +# self.provider.deleteUCI( r ) + + # if self.track_jobs_in_database: # for j in session.query( model.Job ).options( lazyload( "external_output_metadata" ), lazyload( "parameters" ) ).filter( model.Job.c.state == model.Job.states.NEW ).all(): @@ -327,6 +355,281 @@ log.info( "cloud manager stopped" ) self.dispatcher.shutdown() +class UCIwrapper( object ): + """ + Wraps 'model.UCI' with convenience methods for state management + """ + def __init__( self, uci ): + self.uci_id = uci.id + + # --------- Setter methods ----------------- + + def change_state( self, uci_state=None, instance_id=None, i_state=None ): + """ + Sets state for UCI and/or UCI's instance with instance_id as provided by cloud provider and stored in local + Galaxy database. + Need to provide either state for the UCI or instance_id and it's state or all arguments. + """ +# log.debug( "Changing state - new uci_state: %s, instance_id: %s, i_state: %s" % ( uci_state, instance_id, i_state ) ) + if uci_state is not None: + uci = model.UCI.get( self.uci_id ) + uci.refresh() + uci.state = uci_state + uci.flush() + if ( instance_id is not None ) and ( i_state is not None ): + instance = model.CloudInstance.filter_by( uci_id=self.uci_id, instance_id=instance_id).first() + instance.state = i_state + instance.flush() + + def set_mi( self, i_index, mi_id ): + """ + Sets Machine Image (MI), e.g., 'ami-66fa190f', for UCI's instance with given index as it + is stored in local Galaxy database. + """ + mi = model.CloudImage.filter( model.CloudImage.c.image_id==mi_id ).first() + instance = model.CloudInstance.get( i_index ) + instance.image = mi + instance.flush() + + def set_key_pair( self, i_index, key_name, key_material=None ): + """ + Single UCI may instantiate many instances, i_index refers to the numeric index + of instance controlled by this UCI as it is stored in local DB (see get_instances_ids()). 
+ """ + instance = model.CloudInstance.get( i_index ) + instance.keypair_name = key_name + if key_material is not None: + instance.keypair_material = key_material + instance.flush() + + def set_launch_time( self, launch_time, i_index=None, i_id=None ): + """ + Stores launch time in local database for instance with specified index (as it is stored in local + Galaxy database) or with specified instance ID (as obtained from the cloud provider AND stored + in local Galaxy Database). Only one of i_index or i_id needs to be provided. + """ + if i_index != None: + instance = model.CloudInstance.get( i_index ) + instance.launch_time = launch_time + instance.flush() + elif i_id != None: + instance = model.CloudInstance.filter_by( uci_id=self.uci_id, instance_id=i_id).first() + instance.launch_time = launch_time + instance.flush() + + def set_uci_launch_time( self, launch_time ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + uci.launch_time = launch_time + uci.flush() + + def set_stop_time( self, stop_time, i_index=None, i_id=None ): + if i_index != None: + instance = model.CloudInstance.get( i_index ) + instance.stop_time = stop_time + instance.flush() + elif i_id != None: + instance = model.CloudInstance.filter_by( uci_id=self.uci_id, instance_id=i_id).first() + instance.stop_time = stop_time + instance.flush() + + def reset_uci_launch_time( self ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + uci.launch_time = None + uci.flush() + + def set_reservation_id( self, i_index, reservation_id ): + instance = model.CloudInstance.get( i_index ) + instance.reservation_id = reservation_id + instance.flush() + + def set_instance_id( self, i_index, instance_id ): + """ + i_index refers to UCI's instance ID as stored in local database + instance_id refers to real-world, cloud resource ID (e.g., 'i-78hd823a') + """ + instance = model.CloudInstance.get( i_index ) + instance.instance_id = instance_id + instance.flush() + + def set_public_dns( self, instance_id, 
public_dns ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + uci.instance[instance_id].public_dns = public_dns + uci.instance[instance_id].flush() + + def set_private_dns( self, instance_id, private_dns ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + uci.instance[instance_id].private_dns = private_dns + uci.instance[instance_id].flush() + + def set_store_device( self, store_id, device ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + uci.store[store_id].device = device + uci.store[store_id].flush() + + def set_store_status( self, store_id, status ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + uci.store[store_id].status = status + uci.store[store_id].flush() + + def set_store_availability_zone( self, store_id, availability_zone ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + uci.store[store_id].availability_zone = availability_zone + uci.store[store_id].flush() + + def set_store_volume_id( self, store_id, volume_id ): + """ + Given store ID associated with this UCI, set volume ID as it is registered + on the cloud provider (e.g., vol-39890501) + """ + uci = model.UCI.get( self.uci_id ) + uci.refresh() + uci.store[store_id].volume_id = volume_id + uci.store[store_id].flush() + + def set_store_instance( self, store_id, instance_id ): + """ Stores instance ID that given store volume is attached to. """ + uci = model.UCI.get( self.uci_id ) + uci.refresh() + uci.store[store_id].i_id = instance_id + uci.store[store_id].flush() + + # --------- Getter methods ----------------- + + def get_instances_indexes( self, state=None ): + """ + Returns indexes of instances associated with given UCI as they are stored in local Galaxy database and + whose state corresponds to passed argument. Returned values enable indexing instances from local Galaxy database. 
+ """ + uci = model.UCI.get( self.uci_id ) + uci.refresh() + instances = model.CloudInstance.filter_by( uci=uci ).filter( model.CloudInstance.c.state==state ).all() + il = [] + for i in instances: + il.append( i.id ) + + return il + + def get_type( self, i_index ): + instance = model.CloudInstance.get( i_index ) + return instance.type + + def get_state( self ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.state + + def get_instance_state( self, instance_id ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.instance[instance_id].state + + def get_instances_ids( self ): + """ + Returns list IDs of all instances' associated with this UCI that are not in 'terminated' state + (e.g., ['i-402906D2', 'i-q0290dsD2'] ). + """ + il = model.CloudInstance.filter_by( uci_id=self.uci_id ).filter( model.CloudInstance.c.state != 'terminated' ).all() + instanceList = [] + for i in il: + instanceList.append( i.instance_id ) + return instanceList + + def get_name( self ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.name + + def get_key_pair_name( self, i_index=None, i_id=None ): + """ + Given EITHER instance index as it is stored in local Galaxy database OR instance ID as it is + obtained from cloud provider and stored in local Galaxy database, return keypair name assocaited + with given instance. 
+ """ + if i_index != None: + instance = model.CloudInstance.get( i_index ) + return instance.keypair_name + elif i_id != None: + instance = model.CloudInstance.filter_by( uci_id=self.uci_id, instance_id=i_id).first() + return instance.keypair_name + + def get_access_key( self ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.credentials.access_key + + def get_secret_key( self ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.credentials.secret_key + + def get_mi_id( self, instance_id=0 ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.instance[instance_id].mi_id + + def get_public_dns( self, instance_id=0 ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.instance[instance_id].public_dns + + def get_private_dns( self, instance_id=0 ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.instance[instance_id].private_dns + + def get_store_availability_zone( self, store_id ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.store[store_id].availability_zone + + def get_store_size( self, store_id ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.store[store_id].size + + def get_store_volume_id( self, store_id ): + """ + Given store ID associated with this UCI, get volume ID as it is registered + on the cloud provider (e.g., 'vol-39890501') + """ + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.store[store_id].volume_id + + def get_all_stores( self ): + """ Returns all storage volumes' database objects associated with this UCI. """ + return model.CloudStore.filter( model.CloudStore.c.uci_id == self.uci_id ).all() +# svs = model.CloudStore.filter( model.CloudStore.c.uci_id == self.uci_id ).all() +# svl = [] # storage volume list +# for sv in svs: +# svl.append( sv.volume_id ) +# return svl + + def get_uci( self ): + """ Returns database object for given UCI. 
""" + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci + + def uci_launch_time_set( self ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.launch_time + + def delete( self ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() +# uci.delete() + uci.state = 'deleted' # for bookkeeping reasons, mark as deleted but don't actually delete. + uci.flush() + class JobWrapper( object ): """ Wraps a 'model.Job' with convience methods for running processes and @@ -675,6 +978,11 @@ else: log.error( "Unable to start unknown cloud provider: %s" %self.provider_name ) + def put( self, uci_wrapper ): + """ Put given request for UCI manipulation into provider's request queue.""" +# log.debug( "Adding UCI '%s' manipulation request into cloud manager's queue." % uci_wrapper.name ) + self.cloud_provider[self.provider_name].put( uci_wrapper ) + def createUCI( self, uci ): """ Createse User Configured Instance (UCI). Essentially, creates storage volume. @@ -682,13 +990,15 @@ log.debug( "Creating UCI '%s'" % uci.name ) self.cloud_provider[self.provider_name].createUCI( uci ) - def deleteUCI( self, uciName ): + def deleteUCI( self, uci ): """ Deletes UCI. NOTE that this implies deletion of any and all data associated with this UCI from the cloud. All data will be deleted. """ + log.debug( "Deleting UCI '%s'" % uci.name ) + self.cloud_provider[self.provider_name].deleteUCI( uci ) - def addStorageToUCI( self, uciName ): + def addStorageToUCI( self, uci ): """ Adds more storage to specified UCI """ def startUCI( self, uci ): @@ -706,17 +1016,16 @@ """ log.debug( "Stopping UCI '%s'" % uci.name ) self.cloud_provider[self.provider_name].stopUCI( uci ) + + def update( self ): + """ + Runs a global status update on all storage volumes and all instances whose UCI is + 'running' state. 
+ Reason behind this method is to sync state of local DB and real world resources + """ +# log.debug( "Running global update" ) + self.cloud_provider[self.provider_name].update() - def put( self, job_wrapper ): - runner_name = ( job_wrapper.tool.job_runner.split(":", 1) )[0] - log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) ) - self.cloud_provider[runner_name].put( job_wrapper ) - - def stop( self, job ): - runner_name = ( job.job_runner_name.split(":", 1) )[0] - log.debug( "stopping job %d in %s runner" %( job.id, runner_name ) ) - self.cloud_provider[runner_name].stop_job( job ) - def recover( self, job, job_wrapper ): runner_name = ( job.job_runner_name.split(":", 1) )[0] log.debug( "recovering job %d in %s runner" %( job.id, runner_name ) ) diff -r 7c438fd3cf4a -r 6aab50510e43 lib/galaxy/cloud/providers/eucalyptus.py --- a/lib/galaxy/cloud/providers/eucalyptus.py Fri Oct 02 18:31:32 2009 -0400 +++ b/lib/galaxy/cloud/providers/eucalyptus.py Wed Oct 14 19:20:11 2009 -0400 @@ -3,7 +3,10 @@ from datetime import datetime from galaxy import model # Database interaction class +from galaxy.model import mapping from galaxy.datatypes.data import nice_size +from Queue import Queue +from sqlalchemy import or_ import galaxy.eggs galaxy.eggs.require("boto") @@ -22,56 +25,86 @@ log.debug( "Using eucalyptus as default cloud provider." ) self.zone = "epc" self.key_pair = "galaxy-keypair" + self.queue = Queue() #TODO: Use multiple threads to process requests? 
+ self.threads = [] + nworkers = 5 + log.info( "Starting eucalyptus cloud controller workers" ) + for i in range( nworkers ): + worker = threading.Thread( target=self.run_next ) + worker.start() + self.threads.append( worker ) + log.debug( "%d cloud workers ready", nworkers ) - - def get_connection( self, uci ): + def run_next( self ): + """Run the next job, waiting until one is available if necessary""" + cnt = 0 + while 1: + + uci_wrapper = self.queue.get() +# uci = uci_wrapper.get_uci() + log.debug( '[%d] uci name: %s' % ( cnt, uci_wrapper.get_name() ) ) + uci_state = uci_wrapper.get_state() + if uci_state is self.STOP_SIGNAL: + return + try: + if uci_state=="new": + log.debug( "Calling create UCI" ) + self.createUCI( uci_wrapper ) + elif uci_state=="deleting": + self.deleteUCI( uci_wrapper ) + elif uci_state=="submitted": + log.debug( "Calling start UCI" ) + self.startUCI( uci_wrapper ) + elif uci_state=="shutting-down": + self.stopUCI( uci_wrapper ) + except: + log.exception( "Uncaught exception executing request." 
) + cnt += 1 + + def get_connection( self, uci_wrapper ): """ Establishes EC2 connection using user's default credentials """ log.debug( '##### Establishing cloud connection' ) -# creds = model.CloudUserCredentials.filter_by( user=user, defaultCred=True ).first() - a_key = uci.credentials.access_key - s_key = uci.credentials.secret_key # Amazon EC2 - #conn = EC2Connection( a_key, s_key ) + #conn = EC2Connection( uci_wrapper.get_access_key(), uci_wrapper.get_secret_key() ) + # Eucalyptus Public Cloud - # TODO: Add option in Galaxy config file to specify these values (i.e., for locally manages Eucalyptus deployments) + # TODO: Add option in Galaxy config file to specify these values (i.e., for locally managed Eucalyptus deployments) euca_region = RegionInfo( None, "eucalyptus", "mayhem9.cs.ucsb.edu" ) - conn = EC2Connection( aws_access_key_id=a_key, aws_secret_access_key=s_key, is_secure=False, port=8773, region=euca_region, path="/services/Eucalyptus" ) + conn = EC2Connection( aws_access_key_id=uci_wrapper.get_access_key(), aws_secret_access_key=uci_wrapper.get_secret_key(), is_secure=False, port=8773, region=euca_region, path="/services/Eucalyptus" ) return conn - def get_keypair_name( self, uci, conn ): + def set_keypair( self, uci_wrapper, conn ): """ Generate keypair using user's default credentials """ log.debug( "Getting user's keypair" ) kp = conn.get_key_pair( self.key_pair ) + instances = uci_wrapper.get_instances_indexes() try: - for i, inst in enumerate( uci.instance ): - uci.instance[i].keypair_name = kp.name + for inst in instances: + log.debug("inst: '%s'" % inst ) + uci_wrapper.set_key_pair( inst, kp.name ) return kp.name except AttributeError: # No keypair under this name exists so create it - log.debug( "No keypair found, creating keypair '%s'" % self.key_pair ) + log.info( "No keypair found, creating keypair '%s'" % self.key_pair ) kp = conn.create_key_pair( self.key_pair ) - for i, inst in enumerate( uci.instance ): - uci.instance[i].keypair_name 
= kp.name - uci.instance[i].keypair_material = kp.material - uci.flush() - # TODO: Store key_pair.material into instance table - this is the only time private key can be retrieved - # Actually, probably return key_pair to calling method and store name & key from there... - + for inst in instances: + uci_wrapper.set_key_pair( inst, kp.name, kp.material ) + return kp.name - def get_mi( self, type='small' ): + def get_mi_id( self, type ): """ Get appropriate machine image (mi) based on instance size. TODO: Dummy method - need to implement logic For valid sizes, see http://aws.amazon.com/ec2/instance-types/ """ - return model.CloudImage.filter( model.CloudImage.table.c.id==1 ).first() + return model.CloudImage.filter( model.CloudImage.table.c.id==1 ).first().image_id # def get_instances( self, uci ): # """ @@ -91,25 +124,31 @@ def shutdown( self ): """Attempts to gracefully shut down the monitor thread""" log.info( "sending stop signal to worker threads in eucalyptus cloud manager" ) - self.queue.put( self.STOP_SIGNAL ) + for i in range( len( self.threads ) ): + self.queue.put( self.STOP_SIGNAL ) log.info( "eucalyptus cloud manager stopped" ) + + def put( self, uci_wrapper ): + # Get rid of UCI from state description + state = uci_wrapper.get_state() + uci_wrapper.change_state( state.split('U')[0] ) # remove 'UCI' from end of state description (i.e., mark as accepted and ready for processing) + self.queue.put( uci_wrapper ) - def createUCI( self, uci ): + def createUCI( self, uci_wrapper ): """ Creates User Configured Instance (UCI). Essentially, creates storage volume on cloud provider and registers relevant information in Galaxy database. """ - conn = self.get_connection( uci ) + conn = self.get_connection( uci_wrapper ) # Temporary code - need to ensure user selects zone at UCI creation time! 
- if uci.store[0].availability_zone=='': + if uci_wrapper.get_store_availability_zone( 0 )=='': log.info( "Availability zone for storage volume was not selected, using default zone: %s" % self.zone ) - uci.store[0].availability_zone = self.zone - uci.store[0].flush() + uci_wrapper.set_store_availability_zone( 0, self.zone ) #TODO: check if volume associated with UCI already exists (if server crashed for example) and don't recreate it - log.debug( "Creating volume in zone '%s'..." % uci.store[0].availability_zone ) - vol = conn.create_volume( uci.store[0].size, uci.store[0].availability_zone, snapshot=None ) - uci.store[0].volume_id = vol.id + log.info( "Creating volume in zone '%s'..." % uci_wrapper.get_store_availability_zone( 0 ) ) + vol = conn.create_volume( uci_wrapper.get_store_size( 0 ), uci_wrapper.get_store_availability_zone( 0 ), snapshot=None ) + uci_wrapper.set_store_volume_id( 0, vol.id ) # Wait for a while to ensure volume was created # vol_status = vol.status @@ -125,111 +164,180 @@ # uci.flush() # return - uci.state = 'available' - uci.store[0].status = vol.status - uci.store[0].flush() - uci.flush() + # EPC does not allow creation of storage volumes (it deletes one as soon as it is created, so manually set uci_state here) + uci_wrapper.change_state( uci_state='available' ) + uci_wrapper.set_store_status( 0, vol.status ) - def deleteUCI( self, name ): + def deleteUCI( self, uci_wrapper ): """ Deletes UCI. NOTE that this implies deletion of any and all data associated with this UCI from the cloud. All data will be deleted. """ - + conn = self.get_connection( uci_wrapper ) + vl = [] # volume list + count = 0 # counter for checking if all volumes assoc. w/ UCI were deleted + + # Get all volumes assoc. 
w/ UCI, delete them from cloud as well as in local DB + vl = uci_wrapper.get_all_stores() + deletedList = [] + failedList = [] + for v in vl: + log.debug( "Deleting volume with id='%s'" % v.volume_id ) + if conn.delete_volume( v.volume_id ): + deletedList.append( v.volume_id ) + v.delete() + v.flush() + count += 1 + else: + failedList.append( v.volume_id ) + + # Delete UCI if all of associated + log.debug( "count=%s, len(vl)=%s" % (count, len( vl ) ) ) + if count == len( vl ): + uci_wrapper.delete() + else: + log.error( "Deleting following volume(s) failed: %s. However, these volumes were successfully deleted: %s. \ + MANUAL intervention and processing needed." % ( failedList, deletedList ) ) + uci_wrapper.change_state( uci_state="error" ) + def addStorageToUCI( self, name ): """ Adds more storage to specified UCI """ + + def dummyStartUCI( self, uci_wrapper ): - def startUCI( self, uci ): + uci = uci_wrapper.get_uci() + log.debug( "Would be starting instance '%s'" % uci.name ) + uci_wrapper.change_state( 'pending' ) +# log.debug( "Sleeping a bit... (%s)" % uci.name ) +# time.sleep(20) +# log.debug( "Woke up! (%s)" % uci.name ) + + def startUCI( self, uci_wrapper ): """ - Starts an instance of named UCI on the cloud. This implies, mounting of - storage and starting Galaxy instance. + Starts an instance of named UCI on the cloud. 
""" - conn = self.get_connection( uci ) + conn = self.get_connection( uci_wrapper ) +# + self.set_keypair( uci_wrapper, conn ) - uci.instance[0].keypair_name = self.get_keypair_name( uci, conn ) - mi = self.get_mi( uci.instance[0].type ) + i_indexes = uci_wrapper.get_instances_indexes() # Get indexes of i_indexes associated with this UCI -# log.debug( "mi: %s, mi.image_id: %s, uci.instance[0].keypair_name: %s" % ( mi, mi.image_id, uci.instance[0].keypair_name ) ) - uci.instance[0].image = mi + for i_index in i_indexes: + mi_id = self.get_mi_id( uci_wrapper.get_type( i_index ) ) + log.debug( "mi_id: %s, uci_wrapper.get_key_pair_name( i_index ): %s" % ( mi_id, uci_wrapper.get_key_pair_name( i_index ) ) ) + uci_wrapper.set_mi( i_index, mi_id ) + + # log.debug( '***** Setting up security group' ) + # If not existent, setup galaxy security group + # try: + # gSecurityGroup = conn.create_security_group('galaxy', 'Security group for Galaxy.') + # gSecurityGroup.authorize( 'tcp', 80, 80, '0.0.0.0/0' ) # Open HTTP port + # gSecurityGroup.authorize( 'tcp', 22, 22, '0.0.0.0/0' ) # Open SSH port + # except: + # pass + # sgs = conn.get_all_security_groups() + # for i in range( len( sgs ) ): + # if sgs[i].name == "galaxy": + # sg.append( sgs[i] ) + # break # only 1 security group w/ this name can exist, so continue + + log.debug( "***** Starting UCI instance '%s'" % uci_wrapper.get_name() ) + log.debug( 'Using following command: conn.run_instances( image_id=%s, key_name=%s )' % ( mi_id, uci_wrapper.get_key_pair_name( i_index ) ) ) + reservation = conn.run_instances( image_id=mi_id, key_name=uci_wrapper.get_key_pair_name( i_index ) ) + #reservation = conn.run_instances( image_id=instance.image, key_name=instance.keypair_name, security_groups=['galaxy'], instance_type=instance.type, placement=instance.availability_zone ) + l_time = datetime.utcnow() + uci_wrapper.set_launch_time( l_time, i_index=i_index ) # format_time( reservation.i_indexes[0].launch_time ) ) + if not 
uci_wrapper.uci_launch_time_set(): + uci_wrapper.set_uci_launch_time( l_time ) + uci_wrapper.set_reservation_id( i_index, str( reservation ).split(":")[1] ) + i_id = str( reservation.instances[0]).split(":")[1] + uci_wrapper.set_instance_id( i_index, i_id ) + s = reservation.instances[0].state # TODO: once more than a single instance will be started through single reservation, change this + uci_wrapper.change_state( s, i_id, s ) + log.debug( "UCI '%s' started, current state: %s" % ( uci_wrapper.get_name(), uci_wrapper.get_state() ) ) -# log.debug( '***** Setting up security group' ) - # If not existent, setup galaxy security group -# try: -# gSecurityGroup = conn.create_security_group('galaxy', 'Security group for Galaxy.') -# gSecurityGroup.authorize( 'tcp', 80, 80, '0.0.0.0/0' ) # Open HTTP port -# gSecurityGroup.authorize( 'tcp', 22, 22, '0.0.0.0/0' ) # Open SSH port -# except: -# pass -# sgs = conn.get_all_security_groups() -# for i in range( len( sgs ) ): -# if sgs[i].name == "galaxy": -# sg.append( sgs[i] ) -# break # only 1 security group w/ this name can exist, so continue - - log.debug( "***** Starting UCI instance '%s'" % uci.name ) -# log.debug( 'Using following command: conn.run_instances( image_id=%s, key_name=%s )' % ( uci.instance[0].image.image_id, uci.instance[0].keypair_name ) ) - reservation = conn.run_instances( image_id=uci.instance[0].image.image_id, key_name=uci.instance[0].keypair_name ) - #reservation = conn.run_instances( image_id=instance.image, key_name=instance.keypair_name, security_groups=['galaxy'], instance_type=instance.type, placement=instance.availability_zone ) - uci.instance[0].launch_time = datetime.utcnow() - uci.launch_time = uci.instance[0].launch_time - uci.instance[0].reservation_id = str( reservation ).split(":")[1] - uci.instance[0].instance_id = str( reservation.instances[0]).split(":")[1] - s = reservation.instances[0].state - uci.instance[0].state = s - uci.state = s - uci.instance[0].flush() - uci.flush() - # Wait 
until instance gets running and then update the DB - while s!="running": - log.debug( "Waiting on instance '%s' to start up (reservation ID: %s); current state: %s" % ( uci.instance[0].instance_id, uci.instance[0].reservation_id, s ) ) - time.sleep( 15 ) - s = reservation.instances[0].update() - - uci.instance[0].state = s - uci.state = s - uci.instance[0].public_dns = reservation.instances[0].dns_name - uci.instance[0].private_dns = reservation.instances[0].private_dns_name - uci.instance[0].flush() - uci.flush() +# # Wait until instance gets running and then update the DB +# while s!="running": +# log.debug( "Waiting on instance '%s' to start up (reservation ID: %s); current state: %s" % ( uci.instance[0].instance_id, uci.instance[0].reservation_id, s ) ) +# time.sleep( 15 ) +# s = reservation.i_indexes[0].update() +# +# # Update instance data in local DB +# uci.instance[0].state = s +# uci.instance[0].public_dns = reservation.i_indexes[0].dns_name +# uci.instance[0].private_dns = reservation.i_indexes[0].private_dns_name +# uci.instance[0].flush() +# # Update storage data in local DB w/ volume state info. NOTE that this only captures current volume state +# # and does not connect or wait on connection between instance and volume to be established +# vl = model.CloudStore.filter( model.CloudStore.c.uci_id == uci.id ).all() +# vols = [] +# for v in vl: +# vols.append( v.volume_id ) +# try: +# volumes = conn.get_all_volumes( vols ) +# for i, v in enumerate( volumes ): +# uci.store[i].i_id = v.instance_id +# uci.store[i].status = v.status +# uci.store[i].device = v.device +# uci.store[i].flush() +# except BotoServerError: +# log.debug( "Error getting volume(s) attached to instance. Volume status was not updated." ) +# +# uci.state = s +# uci.flush() - def stopUCI( self, uci ): + + def stopUCI( self, uci_wrapper): """ Stops all of cloud instances associated with named UCI. 
""" - conn = self.get_connection( uci ) - tl = [] # temination list + conn = self.get_connection( uci_wrapper ) - for i, inst in enumerate( uci.instance ): - tl.append( uci.instance[i].instance_id ) + # Get all instances associated with given UCI + il = uci_wrapper.get_instances_ids() # instance list +# log.debug( 'List of instances being terminated: %s' % il ) + rl = conn.get_all_instances( il ) # Reservation list associated with given instances - instList = conn.get_all_instances( tl ) -# log.debug( 'instList: %s' % instList ) - - for i, inst in enumerate( instList ): -# log.debug( 'inst: %s' % inst ) - log.debug( 'Before stop - inst.instances[0].update(): %s' % inst.instances[0].update() ) - inst.instances[0].stop() - log.debug( 'After stop - inst.instances[0].update(): %s' % inst.instances[0].update() ) - uci.instance[i].stop_time = datetime.utcnow() +# tState = conn.terminate_instances( il ) +# # TODO: Need to update instance stop time (for all individual instances) +# stop_time = datetime.utcnow() +# uci_wrapper.set_stop_time( stop_time ) - terminated=0 - while terminated!=len( instList ): - for i, inst in enumerate( instList ): - log.debug( "inst state: %s" % inst.instances[0].state ) - state = inst.instances[0].update() - if state=='terminated': - uci.instance[i].state = state - uci.instance[i].flush() - terminated += 1 - time.sleep ( 5 ) - - uci.state = 'available' - uci.launch_time = None - uci.flush() - - log.debug( "All instances for UCI '%s' were terminated." % uci.name ) + # Initiate shutdown of all instances under given UCI + cnt = 0 + stopped = [] + notStopped = [] + for r in rl: + for inst in r.instances: + log.debug( "Sending stop signal to instance '%s' associated with reservation '%s'." 
% ( inst, r ) ) + inst.stop() + uci_wrapper.set_stop_time( datetime.utcnow(), i_id=inst.id ) + uci_wrapper.change_state( instance_id=inst.id, i_state=inst.update() ) + stopped.append( inst ) + +# uci_wrapper.change_state( uci_state='available' ) + uci_wrapper.reset_uci_launch_time() + +# # Wait for all instances to actually terminate and update local DB +# terminated=0 +# while terminated!=len( rl ): +# for i, r in enumerate( rl ): +# log.debug( "r state: %s" % r.instances[0].state ) +# state = r.instances[0].update() +# if state=='terminated': +# uci.instance[i].state = state +# uci.instance[i].flush() +# terminated += 1 +# time.sleep ( 5 ) +# +# # Reset UCI state +# uci.state = 'available' +# uci.launch_time = None +# uci.flush() +# + log.debug( "All instances for UCI '%s' were terminated." % uci_wrapper.get_name() ) @@ -277,4 +385,144 @@ # session.flush() # trans.log_event( "User stopped cloud instance '%s'" % uci.name ) # trans.set_message( "Galaxy instance '%s' stopped." % uci.name ) - \ No newline at end of file + + def update( self ): + """ + Runs a global status update on all storage volumes and all instances whose UCI is in + 'running', 'pending', or 'shutting-down' state. + Reason behind this method is to sync state of local DB and real-world resources + """ + log.debug( "Running general status update for EPC UCIs." 
) + instances = model.CloudInstance.filter( or_( model.CloudInstance.c.state=="running", model.CloudInstance.c.state=="pending", model.CloudInstance.c.state=="shutting-down" ) ).all() + for inst in instances: + log.debug( "Running general status update on instance '%s'" % inst.instance_id ) + self.updateInstance( inst ) + + stores = model.CloudStore.filter( or_( model.CloudStore.c.status=="in-use", model.CloudStore.c.status=="creating" ) ).all() + for store in stores: + log.debug( "Running general status update on store '%s'" % store.volume_id ) + self.updateStore( store ) + + def updateInstance( self, inst ): + + # Get credentials associated wit this instance + uci_id = inst.uci_id + uci = model.UCI.get( uci_id ) + uci.refresh() + a_key = uci.credentials.access_key + s_key = uci.credentials.secret_key + # Get connection + euca_region = RegionInfo( None, "eucalyptus", "mayhem9.cs.ucsb.edu" ) + conn = EC2Connection( aws_access_key_id=a_key, aws_secret_access_key=s_key, is_secure=False, port=8773, region=euca_region, path="/services/Eucalyptus" ) + # Get reservations handle for given instance + rl= conn.get_all_instances( [inst.instance_id] ) + # Because EPC deletes references to reservations after a short while after instances have terminated, getting an empty list as a response to a query + # typically means the instance has successfully shut down but the check was not performed in short enough amount of time. As a result, below code simply + # marks given instance as having terminated. Note that an instance might have also crashed and this code will not catch the difference... + if len( rl ) == 0: + log.info( "Instance ID '%s' was not found by the cloud provider. Instance might have crashed or otherwise been terminated." 
% inst.instance_id ) + inst.state = 'terminated' + uci.state = 'available' + uci.launch_time = None + inst.flush() + uci.flush() + # Update instance status in local DB with info from cloud provider + for r in rl: + for i, cInst in enumerate( r.instances ): + s = cInst.update() + log.debug( "Checking state of cloud instance '%s' associated with reservation '%s'. State='%s'" % ( cInst, r, s ) ) + if s != inst.state: + inst.state = s + inst.flush() + if s == 'terminated': # After instance has shut down, ensure UCI is marked as 'available' + uci.state = 'available' + uci.flush() + if s != uci.state and s != 'terminated': + # Making sure state of UCI is updated. Once multiple instances become associated with single UCI, this will need to be changed. + uci.state = s + uci.flush() + if cInst.public_dns_name != inst.public_dns: + inst.public_dns = cInst.public_dns_name + inst.flush() + if cInst.private_dns_name != inst.private_dns: + inst.private_dns = cInst.private_dns_name + inst.flush() + + def updateStore( self, store ): + # Get credentials associated wit this store + uci_id = store.uci_id + uci = model.UCI.get( uci_id ) + uci.refresh() + a_key = uci.credentials.access_key + s_key = uci.credentials.secret_key + # Get connection + euca_region = RegionInfo( None, "eucalyptus", "mayhem9.cs.ucsb.edu" ) + conn = EC2Connection( aws_access_key_id=a_key, aws_secret_access_key=s_key, is_secure=False, port=8773, region=euca_region, path="/services/Eucalyptus" ) + # Get reservations handle for given store + vl = conn.get_all_volumes( [store.volume_id] ) +# log.debug( "Store '%s' vl: '%s'" % ( store.volume_id, vl ) ) + # Update store status in local DB with info from cloud provider + if store.status != vl[0].status: + store.status = vl[0].status + store.flush() + if store.i_id != vl[0].instance_id: + store.i_id = vl[0].instance_id + store.flush() + if store.attach_time != vl[0].attach_time: + store.attach_time = vl[0].attach_time + store.flush() + if store.device != vl[0].device: 
+ store.device = vl[0].device + store.flush() + + def updateUCI( self, uci ): + """ + Runs a global status update on all storage volumes and all instances that are + associated with specified UCI + """ + conn = self.get_connection( uci ) + + # Update status of storage volumes + vl = model.CloudStore.filter( model.CloudInstance.c.uci_id == uci.id ).all() + vols = [] + for v in vl: + vols.append( v.volume_id ) + try: + volumes = conn.get_all_volumes( vols ) + for i, v in enumerate( volumes ): + uci.store[i].i_id = v.instance_id + uci.store[i].status = v.status + uci.store[i].device = v.device + uci.store[i].flush() + except: + log.debug( "Error updating status of volume(s) associated with UCI '%s'. Status was not updated." % uci.name ) + pass + + # Update status of instances + il = model.CloudInstance.filter_by( uci_id=uci.id ).filter( model.CloudInstance.c.state != 'terminated' ).all() + instanceList = [] + for i in il: + instanceList.append( i.instance_id ) + log.debug( 'instanceList: %s' % instanceList ) + try: + reservations = conn.get_all_instances( instanceList ) + for i, r in enumerate( reservations ): + uci.instance[i].state = r.instances[0].update() + log.debug('updating instance %s; status: %s' % ( uci.instance[i].instance_id, uci.instance[i].state ) ) + uci.state = uci.instance[i].state + uci.instance[i].public_dns = r.instances[0].dns_name + uci.instance[i].private_dns = r.instances[0].private_dns_name + uci.instance[i].flush() + uci.flush() + except: + log.debug( "Error updating status of instances associated with UCI '%s'. Instance status was not updated." 
% uci.name ) + pass + + # --------- Helper methods ------------ + + def format_time( time ): + dict = {'T':' ', 'Z':''} + for i, j in dict.iteritems(): + time = time.replace(i, j) + return time + \ No newline at end of file diff -r 7c438fd3cf4a -r 6aab50510e43 lib/galaxy/model/mapping.py --- a/lib/galaxy/model/mapping.py Fri Oct 02 18:31:32 2009 -0400 +++ b/lib/galaxy/model/mapping.py Wed Oct 14 19:20:11 2009 -0400 @@ -965,7 +965,7 @@ properties=dict( user=relation( User ), credentials=relation( CloudUserCredentials ), instance=relation( CloudInstance, backref='uci' ), - store=relation( CloudStore, backref='uci' ) + store=relation( CloudStore, backref='uci', cascade='all, delete-orphan' ) ) ) assign_mapper( context, CloudInstance, CloudInstance.table, diff -r 7c438fd3cf4a -r 6aab50510e43 lib/galaxy/model/migrate/versions/0014_cloud_tables.py --- a/lib/galaxy/model/migrate/versions/0014_cloud_tables.py Fri Oct 02 18:31:32 2009 -0400 +++ b/lib/galaxy/model/migrate/versions/0014_cloud_tables.py Wed Oct 14 19:20:11 2009 -0400 @@ -82,7 +82,10 @@ CloudImage_table.create() except Exception, e: log.debug( "Creating cloud_image table failed. Table probably exists already." ) - UCI_table.create() + try: + UCI_table.create() + except Exception, e: + log.debug( "Creating UCI table failed. Table probably exists already." ) CloudInstance_table.create() CloudStore_table.create() try: @@ -115,7 +118,8 @@ log.debug( "Dropping cloud_user_credentials table failed: %s" % str( e ) ) try: - UCI_table.drop() + log.debug( "Would drop UCI table." 
) +# UCI_table.drop() except Exception, e: log.debug( "Dropping UCI table failed: %s" % str( e ) ) diff -r 7c438fd3cf4a -r 6aab50510e43 lib/galaxy/web/controllers/cloud.py --- a/lib/galaxy/web/controllers/cloud.py Fri Oct 02 18:31:32 2009 -0400 +++ b/lib/galaxy/web/controllers/cloud.py Wed Oct 14 19:20:11 2009 -0400 @@ -56,25 +56,39 @@ liveInstances = trans.sa_session.query( model.UCI ) \ .filter_by( user=user ) \ - .filter( or_( model.UCI.c.state=="running", model.UCI.c.state=="pending", model.UCI.c.state=="terminating" ) ) \ - .order_by( desc( model.UCI.c.launch_time ) ) \ + .filter( or_( model.UCI.c.state=="running", + model.UCI.c.state=="pending", + model.UCI.c.state=="submitted", + model.UCI.c.state=="submittedUCI", + model.UCI.c.state=="shutting-down", + model.UCI.c.state=="shutting-downUCI" ) ) \ + .order_by( desc( model.UCI.c.update_time ) ) \ .all() prevInstances = trans.sa_session.query( model.UCI ) \ .filter_by( user=user ) \ - .filter( or_( model.UCI.c.state=="available", model.UCI.c.state=="new", model.UCI.c.state=="error", model.UCI.c.state=="submitted" ) ) \ + .filter( or_( model.UCI.c.state=="available", + model.UCI.c.state=="new", + model.UCI.c.state=="newUCI", + model.UCI.c.state=="error", + model.UCI.c.state=="deleting", + model.UCI.c.state=="deletingUCI" ) ) \ .order_by( desc( model.UCI.c.update_time ) ) \ .all() # Check after update there are instances in pending state; if so, display message # TODO: Auto-refresh once instance is running pendingInstances = trans.sa_session.query( model.UCI ) \ - .filter_by( user=user, state="pending" ) \ + .filter_by( user=user ) \ + .filter( or_( model.UCI.c.state=="pending" , \ + model.UCI.c.state=="submitted" , \ + model.UCI.c.state=="submittedUCI" ) ) \ .all() if pendingInstances: trans.set_message( "Galaxy instance started. NOTE: Please wait about 3-5 minutes for the instance to " "start up and then refresh this page. A button to connect to the instance will then appear alongside " "instance description." 
) + return trans.fill_template( "cloud/configure_cloud.mako", cloudCredentials = cloudCredentials, liveInstances = liveInstances, @@ -113,7 +127,13 @@ uci = get_uci( trans, id ) stores = get_stores( trans, uci ) # log.debug(self.app.config.job_working_directory) - if len(stores) is not 0: + if ( len(stores) is not 0 ) and \ + ( uci.state != 'submitted' ) and \ + ( uci.state != 'submittedUCI' ) and \ + ( uci.state != 'pending' ) and \ + ( uci.state != 'deleting' ) and \ + ( uci.state != 'deletingUCI' ) and \ + ( uci.state != 'error' ): instance = model.CloudInstance() instance.user = user instance.image = mi @@ -146,7 +166,7 @@ # instance.instance_id = str( reservation.instances[0]).split(":")[1] # instance.state = "pending" # instance.state = reservation.instances[0].state - uci.state = 'submitted' + uci.state = 'submittedUCI' # Persist session = trans.sa_session @@ -155,8 +175,14 @@ session.flush() trans.log_event ("User initiated starting of cloud instance '%s'." % uci.name ) + trans.set_message( "Galaxy instance started. NOTE: Please wait about 3-5 minutes for the instance to " + "start up and then refresh this page. A button to connect to the instance will then appear alongside " + "instance description." ) return self.list( trans ) + trans.show_error_message( "Cannot start instance that is in state '%s'." % uci.state ) + return self.list( trans ) + # return trans.show_form( # web.FormBuilder( web.url_for(), "Start instance size", submit_text="Start" ) # .add_input( "radio","Small","size", value='small' ) @@ -170,7 +196,7 @@ Stop a cloud UCI instance. This implies stopping Galaxy server and disconnecting/unmounting relevant file system(s). """ uci = get_uci( trans, id ) - uci.state = 'terminating' + uci.state = 'shutting-downUCI' session = trans.sa_session # session.save_or_update( stores ) session.save_or_update( uci ) @@ -234,32 +260,38 @@ any and all storage associated with this UCI! 
""" uci = get_uci( trans, id ) - dbInstances = get_instances( trans, uci ) #TODO: handle list! - conn = get_connection( trans ) - session = trans.sa_session + if ( uci.state != 'deletingUCI' ) and ( uci.state != 'deleting' ) and ( uci.state != 'error' ): + name = uci.name + uci.state = "deletingUCI" + # dbInstances = get_instances( trans, uci ) #TODO: handle list! + # + # conn = get_connection( trans ) + session = trans.sa_session + # + # # Delete volume(s) associated with given uci + # stores = get_stores( trans, uci ) + # for i, store in enumerate( stores ): + # log.debug( "Deleting volume '%s' that is associated with UCI '%s'." % ( store.volume_id, uci.name ) ) + # volStat = None + # try: + # volStat = conn.delete_volume( store.volume_id ) + # except: + # log.debug ( 'Error deleting volume %s' % store.volume_id ) + # + # if volStat: + # session.delete( store ) + # + # # Delete UCI from table + # uciName = uci.name # Store name for logging + # session.delete( uci ) + + session.flush() + trans.log_event( "User deleted cloud instance '%s'" % name ) + trans.set_message( "Galaxy instance '%s' marked for deletion." % name ) + return self.list( trans ) - # Delete volume(s) associated with given uci - stores = get_stores( trans, uci ) - for i, store in enumerate( stores ): - log.debug( "Deleting volume '%s' that is associated with UCI '%s'." % ( store.volume_id, uci.name ) ) - volStat = None - try: - volStat = conn.delete_volume( store.volume_id ) - except: - log.debug ( 'Error deleting volume %s' % store.volume_id ) - - if volStat: - session.delete( store ) - - # Delete UCI from table - uciName = uci.name # Store name for logging - session.delete( uci ) - - session.flush() - trans.log_event( "User deleted cloud instance '%s'" % uciName ) - trans.set_message( "Galaxy instance '%s' deleted." % uciName ) - + trans.set_message( "Instance '%s' is already marked for deletion." 
% uci.name ) return self.list( trans ) @web.expose @@ -280,7 +312,7 @@ """ inst_error = vol_error = cred_error = None user = trans.get_user() - # TODO: Hack until present user w/ bullet list w/ registered credentials + # TODO: Hack until present user w/ bullet list w/ registered credentials storedCreds = trans.sa_session.query( model.CloudUserCredentials ) \ .filter_by( user=user ).all() credsMatch = False @@ -293,8 +325,7 @@ try: if len( instanceName ) > 255: inst_error = "Instance name exceeds maximum allowable length." - elif trans.app.model.UCI.filter( - trans.app.model.UCI.table.c.name==instanceName ).first(): + elif trans.app.model.UCI.filter( and_( trans.app.model.UCI.table.c.name==instanceName, trans.app.model.UCI.table.c.state!='deleted' ) ).first(): inst_error = "An instance with that name already exist." elif int( volSize ) > 1000: vol_error = "Volume size cannot exceed 1000GB. You must specify an integer between 1 and 1000." @@ -310,7 +341,7 @@ trans.app.model.CloudUserCredentials.table.c.name==credName ).first() uci.user= user uci.total_size = volSize # This is OK now because new instance is being created. - uci.state = "new" # Valid states include: "new, "available", "running", "pending", "submitted", "terminating", or "error" + uci.state = "newUCI" storage = model.CloudStore() storage.user = user storage.uci = uci @@ -403,15 +434,20 @@ Add user's cloud credentials stored under name `credName`. """ user = trans.get_user() - cred_error = accessKey_error = secretKey_error = provider_error = None + error = {} + if credName: if len( credName ) > 255: - cred_error = "Credentials name exceeds maximum allowable length." + error['cred_error'] = "Credentials name exceeds maximum allowable length." elif trans.app.model.CloudUserCredentials.filter( trans.app.model.CloudUserCredentials.table.c.name==credName ).first(): - cred_error = "Credentials with that name already exist." + error['cred_error'] = "Credentials with that name already exist." 
elif ( ( providerName.lower()!='ec2' ) and ( providerName.lower()!='eucalyptus' ) ): - provider_error = "You specified an unsupported cloud provider." + error['provider_error'] = "You specified an unsupported cloud provider." + elif accessKey=='' or len( accessKey ) > 255: + error['access_key_error'] = "Access key much be between 1 and 255 characters long." + elif secretKey=='' or len( secretKey ) > 255: + error['secret_key_error'] = "Secret key much be between 1 and 255 characters long." else: # Create new user stored credentials credentials = model.CloudUserCredentials() @@ -430,12 +466,21 @@ # if defaultCred: # self.makeDefault( trans, credentials.id) return self.list( trans ) - return trans.show_form( - web.FormBuilder( web.url_for(), "Add credentials", submit_text="Add" ) - .add_text( "credName", "Credentials name", value="Unnamed credentials", error=cred_error ) - .add_text( "providerName", "Cloud provider name", value="ec2 or eucalyptus", error=provider_error ) - .add_text( "accessKey", "Access key", value='', error=accessKey_error ) - .add_password( "secretKey", "Secret key", value='', error=secretKey_error ) ) + + return trans.fill_template( "cloud/add_credentials.mako", \ + credName = credName, \ + providerName = providerName, \ + accessKey = accessKey, \ + secretKey = secretKey, \ + error = error + ) + +# return trans.show_form( +# web.FormBuilder( web.url_for(), "Add credentials", submit_text="Add" ) +# .add_text( "credName", "Credentials name", value="Unnamed credentials", error=cred_error ) +# .add_text( "providerName", "Cloud provider name", value="ec2 or eucalyptus", error=provider_error ) +# .add_text( "accessKey", "Access key", value='', error=accessKey_error ) +# .add_password( "secretKey", "Secret key", value='', error=secretKey_error ) ) @web.expose @web.require_login( "view credentials" ) @@ -1018,7 +1063,7 @@ # Retrieve cloud instance based on passed instance id. get_all_instances( idLst ) method returns reservation ID. 
Because # we are passing only 1 ID, we can retrieve only the first element of the returning list. Furthermore, because (for now!) # only 1 instance corresponds each individual reservation, grab only the first element of the returned list of instances. - cloudInstance = conn.get_all_instances( idLst )[0].instances[0] + cloudInstance = conn.get_all_instances( [instance_id] )[0].instances[0] return cloudInstance def get_connection( trans, credName ): diff -r 7c438fd3cf4a -r 6aab50510e43 templates/cloud/add_credentials.mako --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/templates/cloud/add_credentials.mako Wed Oct 14 19:20:11 2009 -0400 @@ -0,0 +1,97 @@ +<% _=n_ %> +<%inherit file="/base.mako"/> +<%def name="title()">Add credentials</%def> + +<%def name="javascripts()"> +${parent.javascripts()} +<script type="text/javascript"> +$(function(){ + $("input:text:first").focus(); +}) +</script> +</%def> + +%if header: + ${header} +%endif + +<div class="form"> + <div class="form-title">Add credentials</div> + <div class="form-body"> + <form name="Add credentials" action="${h.url_for( action='add' )}" method="post" > + + <% + cls = "form-row" + if error.has_key('cred_error'): + cls += " form-row-error" + %> + <div class="${cls}"> + <label>Credentials name:</label> + <div class="form-row-input"> + <input type="text" name="credName" value="Unnamed credentials" size="40"> + </div> + %if error.has_key('cred_error'): + <div class="form-row-error-message">${error['cred_error']}</div> + %endif + <div style="clear: both"></div> + </div> + + + <% + cls = "form-row" + if error.has_key('provider_error'): + cls += " form-row-error" + %> + <div class="${cls}"> + <label>Cloud provider name:</label> + <div class="form-row-input"> + <select name="providerName"> + <option value="ec2">Amazon EC2</option> + <option value="eucalyptus">Eucalpytus Public Cloud (EPC)</option> + </select> + </div> + %if error.has_key('provider_error'): + <div 
class="form-row-error-message">${error['provider_error']}</div> + %endif + <div style="clear: both"></div> + </div> + + <% + cls = "form-row" + if error.has_key('access_key_error'): + cls += " form-row-error" + %> + <div class="${cls}"> + <label>Access key:</label> + <div class="form-row-input"> + <input type="text" name="accessKey" value="" size="40"> + </div> + %if error.has_key('access_key_error'): + <div class="form-row-error-message">${error['access_key_error']}</div> + %endif + <div style="clear: both"></div> + </div> + + + <% + cls = "form-row" + if error.has_key('secret_key_error'): + cls += " form-row-error" + %> + <div class="${cls}"> + <label>Secret key:</label> + <div class="form-row-input"> + <input type="password" name="secretKey" value="" size="40"> + </div> + %if error.has_key('secret_key_error'): + <div class="form-row-error-message">${error['secret_key_error']}</div> + %endif + <div style="clear: both"></div> + </div> + + + <div class="form-row"><input type="submit" value="Add"></div> + + </form> + </div> +</div>
participants (1)
-
Greg Von Kuster