[hg] galaxy 3095: Ported DB interaction to be compatible with Sq...
details: http://www.bx.psu.edu/hg/galaxy/rev/39502dd3fd23 changeset: 3095:39502dd3fd23 user: Enis Afgan <afgane@gmail.com> date: Mon Nov 16 20:37:47 2009 -0500 description: Ported DB interaction to be compatible with Sqlalchemy 0.5.6 diffstat: lib/galaxy/cloud/__init__.py | 425 +++++++++++++++++++---------------- lib/galaxy/cloud/providers/ec2.py | 341 ++++++++++++++++----------- lib/galaxy/cloud/providers/eucalyptus.py | 432 ++++++++++++++++++++++-------------- lib/galaxy/config.py | 9 +- lib/galaxy/web/controllers/cloud.py | 151 +++++------ templates/cloud/configure_cloud.mako | 3 +- 6 files changed, 768 insertions(+), 593 deletions(-) diffs (2608 lines): diff -r 7d013eb98022 -r 39502dd3fd23 lib/galaxy/cloud/__init__.py --- a/lib/galaxy/cloud/__init__.py Thu Nov 12 16:36:07 2009 -0500 +++ b/lib/galaxy/cloud/__init__.py Mon Nov 16 20:37:47 2009 -0500 @@ -60,6 +60,7 @@ """ def __init__( self, app ): self.app = app + self.sa_session = app.model.context if self.app.config.get_bool( "enable_cloud_execution", True ): # The dispatcher manager for underlying cloud instances - implements and contacts individual cloud providers self.provider = CloudProvider( app ) @@ -99,6 +100,7 @@ # Keep track of the pid that started the cloud manager, only it # has valid threads self.parent_pid = os.getpid() + self.sa_session = app.model.context # Contains requests that are waiting (only use from monitor thread) self.waiting = [] @@ -122,7 +124,6 @@ cnt = 0 # Run global update only periodically so keep counter variable while self.running: try: -# log.debug( "Calling monitor_step" ) self.__monitor_step() if cnt%30 == 0: # Run global update every 30 iterations (1 minute) self.provider.update() @@ -144,27 +145,23 @@ it is marked as having errors and removed from the queue. Otherwise, the job is dispatched. """ - # Get an orm (object relational mapping) session - session = mapping.Session() + model = self.app.model new_requests = [] - for r in session.query( model.UCI ) \ - .filter( or_( model.UCI.c.state==uci_states.NEW_UCI, - model.UCI.c.state==uci_states.SUBMITTED_UCI, - model.UCI.c.state==uci_states.SHUTTING_DOWN_UCI, - model.UCI.c.state==uci_states.DELETING_UCI, - model.UCI.c.state==uci_states.SNAPSHOT_UCI ) ) \ + for r in self.sa_session.query( model.UCI ) \ + .filter( or_( model.UCI.table.c.state==uci_states.NEW_UCI, + model.UCI.table.c.state==uci_states.SUBMITTED_UCI, + model.UCI.table.c.state==uci_states.SHUTTING_DOWN_UCI, + model.UCI.table.c.state==uci_states.DELETING_UCI, + model.UCI.table.c.state==uci_states.SNAPSHOT_UCI ) ) \ .all(): - uci_wrapper = UCIwrapper( r ) + uci_wrapper = UCIwrapper( r, self.app ) new_requests.append( uci_wrapper ) for uci_wrapper in new_requests: - session.clear() + self.sa_session.expunge_all() self.put( uci_wrapper ) - - # Done with the session - mapping.Session.remove() - + def put( self, uci_wrapper ): """Add a request to the queue.""" self.provider.put( uci_wrapper ) @@ -186,8 +183,10 @@ """ Wraps 'model.UCI' with convenience methods for state management """ - def __init__( self, uci ): + def __init__( self, uci, app ): self.uci_id = uci.id + self.app = app + self.sa_session = self.app.model.context # --------- Setter methods ----------------- @@ -199,56 +198,61 @@ """ # log.debug( "Changing state - new uci_state: %s, instance_id: %s, i_state: %s" % ( uci_state, instance_id, i_state ) ) if uci_state is not None: - uci = model.UCI.get( self.uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) uci.state = uci_state - uci.flush() + self.sa_session.flush() if ( instance_id is not None ) and ( i_state is not None ): - instance = model.CloudInstance.filter_by( uci_id=self.uci_id, instance_id=instance_id).first() + instance = self.sa_session.query( model.CloudInstance ).filter_by( uci_id=self.uci_id, instance_id=instance_id).first() instance.state = i_state - instance.flush() + self.sa_session.add( instance ) + self.sa_session.flush() def set_mi( self, i_index, mi_id ): """ Sets Machine Image (MI), e.g., 'ami-66fa190f', for UCI's instance with given index as it is stored in local Galaxy database. """ - mi = model.CloudImage.filter( model.CloudImage.c.image_id==mi_id ).first() - instance = model.CloudInstance.get( i_index ) + mi = self.sa_session.query( model.CloudImage ).filter( model.CloudImage.table.c.image_id==mi_id ).first() + instance = self.sa_session.query( model.CloudInstance ).get( i_index ) instance.image = mi - instance.flush() + self.sa_session.add( instance ) + self.sa_session.flush() def set_key_pair( self, key_name, key_material=None ): """ Sets key pair value for current UCI. """ - uci = model.UCI.get( self.uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) uci.key_pair_name = key_name if key_material is not None: uci.key_pair_material = key_material - uci.flush() + self.sa_session.flush() - def set_launch_time( self, launch_time, i_index=None, i_id=None ): + def set_instance_launch_time( self, launch_time, i_index=None, i_id=None ): """ Stores launch time in local database for instance with specified index - i_index (as it is stored in local Galaxy database) or with specified instance ID - i_id (as obtained from the cloud provider AND stored in local Galaxy Database). Either 'i_index' or 'i_id' needs to be provided. """ if i_index != None: - instance = model.CloudInstance.get( i_index ) - instance.launch_time = launch_time - instance.flush() + instance = self.sa_session.query( model.CloudInstance ).get( i_index ) elif i_id != None: - instance = model.CloudInstance.filter_by( uci_id=self.uci_id, instance_id=i_id).first() - instance.launch_time = launch_time - instance.flush() + instance = self.sa_session.query( model.CloudInstance ).filter_by( uci_id=self.uci_id, instance_id=i_id).first() + else: + return None + + instance.launch_time = launch_time + self.sa_session.add( instance ) + self.sa_session.flush() def set_uci_launch_time( self, launch_time ): - uci = model.UCI.get( self.uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) uci.launch_time = launch_time - uci.flush() + self.sa_session.add( uci ) + self.sa_session.flush() def set_stop_time( self, stop_time, i_index=None, i_id=None ): """ @@ -257,20 +261,16 @@ in local Galaxy Database). Either 'i_index' or 'i_id' needs to be provided. """ if i_index != None: - instance = model.CloudInstance.get( i_index ) - instance.stop_time = stop_time - instance.flush() + instance = self.sa_session.query( model.CloudInstance ).get( i_index ) elif i_id != None: - instance = model.CloudInstance.filter_by( uci_id=self.uci_id, instance_id=i_id).first() - instance.stop_time = stop_time - instance.flush() + instance = self.sa_session.query( model.CloudInstance ).filter_by( uci_id=self.uci_id, instance_id=i_id).first() + else: + return None + + instance.stop_time = stop_time + self.sa_session.add( instance ) + self.sa_session.flush() - def reset_uci_launch_time( self ): - uci = model.UCI.get( self.uci_id ) - uci.refresh() - uci.launch_time = None - uci.flush() - def set_security_group_name( self, security_group_name, i_index=None, i_id=None ): """ Stores security group name in local database for instance with specified index - i_index (as it is stored in local @@ -278,90 +278,107 @@ in local Galaxy Database). Either 'i_index' or 'i_id' needs to be provided. """ if i_index != None: - instance = model.CloudInstance.get( i_index ) - instance.security_group = security_group_name - instance.flush() + instance = self.sa_session.query( model.CloudInstance ).get( i_index ) elif i_id != None: - instance = model.CloudInstance.filter_by( uci_id=self.uci_id, instance_id=i_id).first() - instance.security_group = security_group_name - instance.flush() + instance = self.sa_session.query( model.CloudInstance ).filter_by( uci_id=self.uci_id, instance_id=i_id).first() + else: + return None + + instance.security_group = security_group_name + self.sa_session.add( instance ) + self.sa_session.flush() def set_reservation_id( self, i_index, reservation_id ): - instance = model.CloudInstance.get( i_index ) + instance = self.sa_session.query( model.CloudInstance ).get( i_index ) instance.reservation_id = reservation_id - instance.flush() + self.sa_session.add( instance ) + self.sa_session.flush() def set_instance_id( self, i_index, instance_id ): """ i_index refers to UCI's instance ID as stored in local database instance_id refers to real-world, cloud resource ID (e.g., 'i-78hd823a') """ - instance = model.CloudInstance.get( i_index ) + instance = self.sa_session.query( model.CloudInstance ).get( i_index ) instance.instance_id = instance_id - instance.flush() + self.sa_session.add( instance ) + self.sa_session.flush() - def set_public_dns( self, instance_id, public_dns ): - uci = model.UCI.get( self.uci_id ) - uci.refresh() - uci.instance[instance_id].public_dns = public_dns - uci.instance[instance_id].flush() +# def set_public_dns( self, instance_id, public_dns ): +# uci = self.sa_session.query( model.UCI ).get( self.uci_id ) +# self.sa_session.refresh( uci ) +# uci.instance[instance_id].public_dns = public_dns +# uci.instance[instance_id].flush() +# +# def set_private_dns( self, instance_id, private_dns ): +# uci = self.sa_session.query( model.UCI ).get( self.uci_id ) +# self.sa_session.refresh( uci ) +# uci.instance[instance_id].private_dns = private_dns +# uci.instance[instance_id].flush() - def set_private_dns( self, instance_id, private_dns ): - uci = model.UCI.get( self.uci_id ) - uci.refresh() - uci.instance[instance_id].private_dns = private_dns - uci.instance[instance_id].flush() - + def reset_uci_launch_time( self ): + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) + uci.launch_time = None + self.sa_session.add( uci ) + self.sa_session.flush() + + def set_error( self, error, set_state=False ): + """ + Sets error field of given UCI in local Galaxy database as well as any instances associated with + this UCI whose state is 'None' or 'SUBMITTED'. If set_state is set to 'true', + method also sets state of give UCI and corresponding instances to 'error' + """ + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) + uci.error = error + if set_state: + uci.state = uci_states.ERROR + # Process all instances associated with this UCI + instances = self.sa_session.query( model.CloudInstance ) \ + .filter_by( uci=uci ) \ + .filter( or_( model.CloudInstance.table.c.state==None, model.CloudInstance.table.c.state==instance_states.SUBMITTED ) ) \ + .all() + for i in instances: + i.error = error + i.state = instance_states.ERROR + self.sa_session.add( i ) + self.sa_session.flush() + + self.sa_session.add( uci ) + self.sa_session.flush() + + def set_deleted( self ): + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) + uci.state = uci_states.DELETED # for bookkeeping reasons, mark as deleted but don't actually delete. + uci.deleted = True + self.sa_session.add( uci ) + self.sa_session.flush() + def set_store_device( self, store_id, device ): - uci = model.UCI.get( self.uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) uci.store[store_id].device = device uci.store[store_id].flush() def set_store_error( self, error, store_index=None, store_id=None ): if store_index != None: - store = model.CloudStore.get( store_index ) + store = self.sa_session.query( model.CloudStore ).get( store_index ) elif store_id != None: - store = model.CloudStore.filter_by( volume_id = store_id ).first() + store = self.sa_session.query( model.CloudStore ).filter_by( volume_id = store_id ).first() else: return None store.error = error - store.flush() + self.sa_session.add( store ) + self.sa_session.flush() def set_store_status( self, vol_id, status ): - vol = model.CloudStore.filter( model.CloudStore.c.volume_id == vol_id ).first() + vol = self.sa_session.query( model.CloudStore ).filter( model.CloudStore.table.c.volume_id == vol_id ).first() vol.status = status - vol.flush() - - def set_snapshot_id( self, snap_index, id ): - snap = model.CloudSnapshot.get( snap_index ) - snap.snapshot_id = id - snap.flush() - - def set_snapshot_status( self, status, snap_index=None, snap_id=None ): - if snap_index != None: - snap = model.CloudSnapshot.get( snap_index ) - elif snap_id != None: - snap = model.CloudSnapshot.filter_by( snapshot_id = snap_id).first() - else: - return - snap.status = status - snap.flush() - - def set_snapshot_error( self, error, snap_index=None, snap_id=None, set_status=False ): - if snap_index != None: - snap = model.CloudSnapshot.get( snap_index ) - elif snap_id != None: - snap = model.CloudSnapshot.filter_by( snapshot_id = snap_id).first() - else: - return - snap.error = error - - if set_status: - snap.status = snapshot_status.ERROR - - snap.flush() + self.sa_session.add( vol ) + self.sa_session.flush() def set_store_availability_zone( self, availability_zone, vol_id=None ): """ @@ -369,78 +386,91 @@ UCI or for the volume whose volume ID (e.g., 'vol-39F80512') is provided as argument. """ if vol_id is not None: - vol = model.CloudStore.filter( model.CloudStore.c.volume_id == vol_id ).all() + vol = self.sa_session.query( model.CloudStore ).filter( model.CloudStore.table.c.volume_id == vol_id ).all() else: - vol = model.CloudStore.filter( model.CloudStore.c.uci_id == self.uci_id ).all() + vol = self.sa_session.query( model.CloudStore ).filter( model.CloudStore.table.c.uci_id == self.uci_id ).all() for v in vol: v.availability_zone = availability_zone - v.flush() + self.sa_session.add( v ) + self.sa_session.flush() - def set_store_volume_id( self, store_id, volume_id ): + def set_store_volume_id( self, store_index, volume_id ): """ - Given store ID associated with this UCI, set volume ID as it is registered + Given store index associated with this UCI in local database, set volume ID as it is registered on the cloud provider (e.g., vol-39890501) """ - uci = model.UCI.get( self.uci_id ) - uci.refresh() - uci.store[store_id].volume_id = volume_id - uci.store[store_id].flush() + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) + uci.store[store_index].volume_id = volume_id + #uci.store[store_index].flush() + self.sa_session.add( uci ) + self.sa_session.flush() def set_store_instance( self, vol_id, instance_id ): """ Stores instance ID that given store volume is attached to. Store volume ID should be given in following format: 'vol-78943248' """ - vol = model.CloudStore.filter( model.CloudStore.c.volume_id == vol_id ).first() + vol = self.sa_session.query( model.CloudStore ).filter( model.CloudStore.table.c.volume_id == vol_id ).first() vol.i_id = instance_id - vol.flush() + self.sa_session.add( vol ) + self.sa_session.flush() + + def set_snapshot_id( self, snap_index, id ): + snap = model.CloudSnapshot.get( snap_index ) - def set_error( self, error, set_state=False ): - """ - Sets error field of given UCI in local Galaxy database as well as any instances associated with - this UCI whose state is 'None' or 'SUBMITTED'. If set_state is set to 'true', - method also sets state of give UCI and corresponding instances to 'error' - """ - uci = model.UCI.get( self.uci_id ) - uci.refresh() - uci.error = error - if set_state: - uci.state = uci_states.ERROR - instances = model.CloudInstance \ - .filter_by( uci=uci ) \ - .filter( or_( model.CloudInstance.c.state==None, model.CloudInstance.c.state==instance_states.SUBMITTED ) ) \ - .all() - for i in instances: - i.error = error - i.state = instance_states.ERROR - i.flush() - uci.flush() + snap.snapshot_id = id + self.sa_session.add( snap ) + self.sa_session.flush() + + def set_snapshot_status( self, status, snap_index=None, snap_id=None ): + if snap_index != None: + snap = self.sa_session.query( model.CloudSnapshot ).get( snap_index ) + elif snap_id != None: + snap = self.sa_session.query( model.CloudSnapshot ).filter_by( snapshot_id = snap_id).first() + else: + return + snap.status = status + self.sa_session.add( snap ) + self.sa_session.flush() - def set_deleted( self ): - uci = model.UCI.get( self.uci_id ) - uci.refresh() - uci.state = uci_states.DELETED # for bookkeeping reasons, mark as deleted but don't actually delete. - uci.deleted = True - uci.flush() + def set_snapshot_error( self, error, snap_index=None, snap_id=None, set_status=False ): + if snap_index != None: + snap = self.sa_session.query( model.CloudSnapshot ).get( snap_index ) + elif snap_id != None: + snap = self.sa_session.query( model.CloudSnapshot ).filter_by( snapshot_id = snap_id).first() + else: + return + snap.error = error + if set_status: + snap.status = snapshot_status.ERROR + + self.sa_session.add( snap ) + self.sa_session.flush() # --------- Getter methods ----------------- def get_provider_type( self ): """ Returns type of cloud provider associated with given UCI. """ - uci = model.UCI.get( self.uci_id ) - uci.refresh() -# cred_id = uci.credentials_id -# cred = model.CloudUserCredentials.get( cred_id ) + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) return uci.credentials.provider.type - def get_type( self, i_index ): - instance = model.CloudInstance.get( i_index ) + def get_provider( self ): + """ Returns database object of cloud provider associated with credentials of given UCI. """ + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) + return uci.credentials.provider + + def get_instance_type( self, i_index ): + instance = self.sa_session.query( model.CloudInstance ).get( i_index ) + self.sa_session.refresh( instance ) return instance.type - def get_state( self ): - uci = model.UCI.get( self.uci_id ) - uci.refresh() + def get_uci_state( self ): + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) return uci.state def get_instances_indexes( self, state=None ): @@ -448,9 +478,12 @@ Returns indexes of instances associated with given UCI as they are stored in local Galaxy database and whose state corresponds to passed argument. Returned values enable indexing instances from local Galaxy database. """ - uci = model.UCI.get( self.uci_id ) - uci.refresh() - instances = model.CloudInstance.filter_by( uci=uci ).filter( model.CloudInstance.c.state==state ).all() + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) + instances = self.sa_session.query( model.CloudInstance ) \ + .filter_by( uci=uci ) \ + .filter( model.CloudInstance.table.c.state==state ) \ + .all() il = [] for i in instances: il.append( i.id ) @@ -458,40 +491,46 @@ return il def get_instance_state( self, instance_id ): - uci = model.UCI.get( self.uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) return uci.instance[instance_id].state def get_instances_ids( self ): """ - Returns list IDs of all instances' associated with this UCI that are not in 'terminated' state - (e.g., ['i-402906D2', 'i-q0290dsD2'] ). + Returns list IDs of all instances' associated with this UCI that are not in 'terminated' or + 'error' but the state is defined (i.e., state is not None) + (e.g., return value: ['i-402906D2', 'i-q0290dsD2'] ). """ - il = model.CloudInstance.filter_by( uci_id=self.uci_id ).filter( model.CloudInstance.c.state != 'terminated' ).all() + il = self.sa_session.query( model.CloudInstance ) \ + .filter_by( uci_id=self.uci_id ) \ + .filter( or_( model.CloudInstance.table.c.state != 'terminated', + model.CloudInstance.table.c.state != 'error', + model.CloudInstance.table.c.state != None ) ) \ + .all() instanceList = [] for i in il: instanceList.append( i.instance_id ) return instanceList def get_name( self ): - uci = model.UCI.get( self.uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) return uci.name def get_key_pair_name( self ): """ Returns keypair name associated with given UCI. """ - uci = model.UCI.get( self.uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) return uci.key_pair_name def get_key_pair_material( self ): """ Returns keypair material (i.e., private key) associated with given UCI. """ - uci = model.UCI.get( self.uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) return uci.key_pair_material def get_security_group_name( self, i_index=None, i_id=None ): @@ -501,35 +540,35 @@ with given instance. """ if i_index != None: - instance = model.CloudInstance.get( i_index ) + instance = self.sa_session.query( model.CloudInstance ).get( i_index ) return instance.security_group elif i_id != None: - instance = model.CloudInstance.filter_by( uci_id=self.uci_id, instance_id=i_id).first() + instance = self.sa_session.query( model.CloudInstance ).filter_by( uci_id=self.uci_id, instance_id=i_id).first() return instance.security_group def get_access_key( self ): - uci = model.UCI.get( self.uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) return uci.credentials.access_key def get_secret_key( self ): - uci = model.UCI.get( self.uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) return uci.credentials.secret_key def get_mi_id( self, instance_id=0 ): - uci = model.UCI.get( self.uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) return uci.instance[instance_id].mi_id def get_public_dns( self, instance_id=0 ): - uci = model.UCI.get( self.uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) return uci.instance[instance_id].public_dns def get_private_dns( self, instance_id=0 ): - uci = model.UCI.get( self.uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) return uci.instance[instance_id].private_dns def get_uci_availability_zone( self ): @@ -539,13 +578,13 @@ availability zone, availability of a UCI is determined by availability zone of any one storage volume. """ - uci = model.UCI.get( self.uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) return uci.store[0].availability_zone def get_store_size( self, store_id=0 ): - uci = model.UCI.get( self.uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) return uci.store[store_id].size def get_store_volume_id( self, store_id=0 ): @@ -553,33 +592,27 @@ Given store ID associated with this UCI, get volume ID as it is registered on the cloud provider (e.g., 'vol-39890501') """ - uci = model.UCI.get( self.uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) return uci.store[store_id].volume_id def get_all_stores( self ): """ Returns all storage volumes' database objects associated with this UCI. """ - return model.CloudStore.filter( model.CloudStore.c.uci_id == self.uci_id ).all() + return self.sa_session.query( model.CloudStore ).filter( model.CloudStore.table.c.uci_id == self.uci_id ).all() def get_snapshots( self, status=None ): """ Returns database objects for all snapshots associated with this UCI and in given status.""" - return model.CloudSnapshot.filter_by( uci_id=self.uci_id, status=status ).all() + return self.sa_session.query( model.CloudSnapshot ).filter_by( uci_id=self.uci_id, status=status ).all() def get_uci( self ): """ Returns database object for given UCI. """ - uci = model.UCI.get( self.uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) return uci - def get_provider( self ): - """ Returns database object of cloud provider associated with credentials of given UCI. """ - uci = model.UCI.get( self.uci_id ) - uci.refresh() - return uci.credentials.provider - def uci_launch_time_set( self ): - uci = model.UCI.get( self.uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( self.uci_id ) + self.sa_session.refresh( uci ) return uci.launch_time class CloudProvider( object ): diff -r 7d013eb98022 -r 39502dd3fd23 lib/galaxy/cloud/providers/ec2.py --- a/lib/galaxy/cloud/providers/ec2.py Thu Nov 12 16:36:07 2009 -0500 +++ b/lib/galaxy/cloud/providers/ec2.py Mon Nov 16 20:37:47 2009 -0500 @@ -74,6 +74,7 @@ self.zone = "us-east-1a" self.security_group = "galaxyWeb" self.queue = Queue() + self.sa_session = app.model.context self.threads = [] nworkers = 5 @@ -84,13 +85,26 @@ self.threads.append( worker ) log.debug( "%d EC2 cloud workers ready", nworkers ) + def shutdown( self ): + """Attempts to gracefully shut down the monitor thread""" + log.info( "sending stop signal to worker threads in EC2 cloud manager" ) + for i in range( len( self.threads ) ): + self.queue.put( self.STOP_SIGNAL ) + log.info( "EC2 cloud manager stopped" ) + + def put( self, uci_wrapper ): + # Get rid of UCI from state description + state = uci_wrapper.get_uci_state() + uci_wrapper.change_state( state.split('U')[0] ) # remove 'UCI' from end of state description (i.e., mark as accepted and ready for processing) + self.queue.put( uci_wrapper ) + def run_next( self ): """Run the next job, waiting until one is available if necessary""" cnt = 0 while 1: uci_wrapper = self.queue.get() - uci_state = uci_wrapper.get_state() + uci_state = uci_wrapper.get_uci_state() if uci_state is self.STOP_SIGNAL: return try: @@ -194,13 +208,13 @@ """ Get appropriate machine image (mi) based on instance size. """ - i_type = uci_wrapper.get_type( i_index ) + i_type = uci_wrapper.get_instance_type( i_index ) if i_type=='m1.small' or i_type=='c1.medium': arch = 'i386' else: arch = 'x86_64' - mi = model.CloudImage.filter_by( deleted=False, provider_type=self.type, architecture=arch ).first() + mi = self.sa_session.query( model.CloudImage ).filter_by( deleted=False, provider_type=self.type, architecture=arch ).first() if mi: return mi.image_id else: @@ -209,19 +223,6 @@ uci_wrapper.set_error( err+". Contact site administrator to ensure needed machine image is registered.", True ) return None - def shutdown( self ): - """Attempts to gracefully shut down the monitor thread""" - log.info( "sending stop signal to worker threads in EC2 cloud manager" ) - for i in range( len( self.threads ) ): - self.queue.put( self.STOP_SIGNAL ) - log.info( "EC2 cloud manager stopped" ) - - def put( self, uci_wrapper ): - # Get rid of UCI from state description - state = uci_wrapper.get_state() - uci_wrapper.change_state( state.split('U')[0] ) # remove 'UCI' from end of state description (i.e., mark as accepted and ready for processing) - self.queue.put( uci_wrapper ) - def createUCI( self, uci_wrapper ): """ Creates User Configured Instance (UCI). Essentially, creates storage volume on cloud provider @@ -294,7 +295,8 @@ if conn.delete_volume( v.volume_id ): deletedList.append( v.volume_id ) v.deleted = True - v.flush() + self.sa_session.add( v ) + self.sa_session.flush() count += 1 else: failedList.append( v.volume_id ) @@ -308,8 +310,8 @@ if count == len( vl ): uci_wrapper.set_deleted() else: - err = "Deleting following volume(s) failed: "+failedList+". However, these volumes were successfully deleted: "+deletedList+". \ - MANUAL intervention and processing needed." + err = "Deleting following volume(s) failed: " + str( failedList ) + ". However, these volumes were successfully deleted: " \ + + str( deletedList ) + ". MANUAL intervention and processing needed." log.error( err ) uci_wrapper.set_error( err, True ) @@ -317,7 +319,7 @@ """ Creates snapshot of all storage volumes associated with this UCI. """ - if uci_wrapper.get_state() != uci_states.ERROR: + if uci_wrapper.get_uci_state() != uci_states.ERROR: conn = self.get_connection( uci_wrapper ) snapshots = uci_wrapper.get_snapshots( status = snapshot_status.SUBMITTED ) @@ -361,7 +363,7 @@ """ Starts instance(s) of given UCI on the cloud. """ - if uci_wrapper.get_state() != uci_states.ERROR: + if uci_wrapper.get_uci_state() != uci_states.ERROR: conn = self.get_connection( uci_wrapper ) self.check_key_pair( uci_wrapper, conn ) if uci_wrapper.get_key_pair_name() == None: @@ -379,70 +381,73 @@ log.debug( "mi_id: %s, uci_wrapper.get_key_pair_name(): %s" % ( mi_id, uci_wrapper.get_key_pair_name() ) ) uci_wrapper.set_mi( i_index, mi_id ) - # Check if galaxy security group exists (and create it if it does not) - log.debug( "Setting up '%s' security group." % self.security_group ) - try: - conn.get_all_security_groups( [self.security_group] ) # security groups - except boto.exception.EC2ResponseError, e: - if e.code == 'InvalidGroup.NotFound': - log.info( "No security group found, creating security group '%s'" % self.security_group ) - try: - gSecurityGroup = conn.create_security_group(self.security_group, 'Security group for Galaxy.') - gSecurityGroup.authorize( 'tcp', 80, 80, '0.0.0.0/0' ) # Open HTTP port - gSecurityGroup.authorize( 'tcp', 22, 22, '0.0.0.0/0' ) # Open SSH port - except boto.exception.EC2ResponseError, ee: - err = "EC2 response error while creating security group: " + str( ee ) + if mi_id != None: + # Check if galaxy security group exists (and create it if it does not) + log.debug( "Setting up '%s' security group." % self.security_group ) + try: + conn.get_all_security_groups( [self.security_group] ) # security groups + except boto.exception.EC2ResponseError, e: + if e.code == 'InvalidGroup.NotFound': + log.info( "No security group found, creating security group '%s'" % self.security_group ) + try: + gSecurityGroup = conn.create_security_group(self.security_group, 'Security group for Galaxy.') + gSecurityGroup.authorize( 'tcp', 80, 80, '0.0.0.0/0' ) # Open HTTP port + gSecurityGroup.authorize( 'tcp', 22, 22, '0.0.0.0/0' ) # Open SSH port + except boto.exception.EC2ResponseError, ee: + err = "EC2 response error while creating security group: " + str( ee ) + log.error( err ) + uci_wrapper.set_error( err, True ) + else: + err = "EC2 response error while retrieving security group: " + str( e ) log.error( err ) uci_wrapper.set_error( err, True ) - else: - err = "EC2 response error while retrieving security group: " + str( e ) - log.error( err ) - uci_wrapper.set_error( err, True ) - - if uci_wrapper.get_state() != uci_states.ERROR: - # Start an instance - log.debug( "Starting instance for UCI '%s'" % uci_wrapper.get_name() ) - #TODO: Once multiple volumes can be attached to a single instance, update 'userdata' composition - userdata = uci_wrapper.get_store_volume_id()+"|"+uci_wrapper.get_access_key()+"|"+uci_wrapper.get_secret_key() - log.debug( "Using following command: conn.run_instances( image_id='%s', key_name='%s', security_groups=['%s'], user_data=[OMITTED], instance_type='%s', placement='%s' )" - % ( mi_id, uci_wrapper.get_key_pair_name(), self.security_group, uci_wrapper.get_type( i_index ), uci_wrapper.get_uci_availability_zone() ) ) - reservation = None - try: - reservation = conn.run_instances( image_id=mi_id, - key_name=uci_wrapper.get_key_pair_name(), - security_groups=[self.security_group], - user_data=userdata, - instance_type=uci_wrapper.get_type( i_index ), - placement=uci_wrapper.get_uci_availability_zone() ) - except boto.exception.EC2ResponseError, e: - err = "EC2 response error when starting UCI '"+ uci_wrapper.get_name() +"': " + str( e ) - log.error( err ) - uci_wrapper.set_error( err, True ) - except Exception, ex: - err = "Error when starting UCI '" + uci_wrapper.get_name() + "': " + str( ex ) - log.error( err ) - uci_wrapper.set_error( err, True ) - # Record newly available instance data into local Galaxy database - if reservation: - uci_wrapper.set_launch_time( self.format_time( reservation.instances[0].launch_time ), i_index=i_index ) - if not uci_wrapper.uci_launch_time_set(): - uci_wrapper.set_uci_launch_time( self.format_time( reservation.instances[0].launch_time ) ) + + if uci_wrapper.get_uci_state() != uci_states.ERROR: + # Start an instance + log.debug( "Starting instance for UCI '%s'" % uci_wrapper.get_name() ) + #TODO: Once multiple volumes can be attached to a single instance, update 'userdata' composition + userdata = uci_wrapper.get_store_volume_id()+"|"+uci_wrapper.get_access_key()+"|"+uci_wrapper.get_secret_key() + log.debug( "Using following command: conn.run_instances( image_id='%s', key_name='%s', security_groups=['%s'], user_data=[OMITTED], instance_type='%s', placement='%s' )" + % ( mi_id, uci_wrapper.get_key_pair_name(), self.security_group, uci_wrapper.get_instance_type( i_index ), uci_wrapper.get_uci_availability_zone() ) ) + reservation = None try: - uci_wrapper.set_reservation_id( i_index, str( reservation ).split(":")[1] ) - # TODO: if more than a single instance will be started through single reservation, change this reference to element [0] - i_id = str( reservation.instances[0]).split(":")[1] - uci_wrapper.set_instance_id( i_index, i_id ) - s = reservation.instances[0].state - uci_wrapper.change_state( s, i_id, s ) - uci_wrapper.set_security_group_name( self.security_group, i_id=i_id ) - log.debug( "Instance of UCI '%s' started, current state: '%s'" % ( uci_wrapper.get_name(), uci_wrapper.get_state() ) ) + reservation = conn.run_instances( image_id=mi_id, + key_name=uci_wrapper.get_key_pair_name(), + security_groups=[self.security_group], + user_data=userdata, + instance_type=uci_wrapper.get_instance_type( i_index ), + placement=uci_wrapper.get_uci_availability_zone() ) except boto.exception.EC2ResponseError, e: - err = "EC2 response error when retrieving instance information for UCI '" + uci_wrapper.get_name() + "': " + str( e ) + err = "EC2 response error when starting UCI '"+ uci_wrapper.get_name() +"': " + str( e ) log.error( err ) uci_wrapper.set_error( err, True ) - else: - log.error( "UCI '%s' is in 'error' state, starting instance was aborted." % uci_wrapper.get_name() ) + except Exception, ex: + err = "Error when starting UCI '" + uci_wrapper.get_name() + "': " + str( ex ) + log.error( err ) + uci_wrapper.set_error( err, True ) + # Record newly available instance data into local Galaxy database + if reservation: + l_time = datetime.utcnow() + # uci_wrapper.set_instance_launch_time( self.format_time( reservation.instances[0].launch_time ), i_index=i_index ) + uci_wrapper.set_instance_launch_time( l_time, i_index=i_index ) + if not uci_wrapper.uci_launch_time_set(): + uci_wrapper.set_uci_launch_time( l_time ) + try: + uci_wrapper.set_reservation_id( i_index, str( reservation ).split(":")[1] ) + # TODO: if more than a single instance will be started through single reservation, change this reference to element [0] + i_id = str( reservation.instances[0]).split(":")[1] + uci_wrapper.set_instance_id( i_index, i_id ) + s = reservation.instances[0].state + uci_wrapper.change_state( s, i_id, s ) + uci_wrapper.set_security_group_name( self.security_group, i_id=i_id ) + log.debug( "Instance of UCI '%s' started, current state: '%s'" % ( uci_wrapper.get_name(), uci_wrapper.get_uci_state() ) ) + except boto.exception.EC2ResponseError, e: + err = "EC2 response error when retrieving instance information for UCI '" + uci_wrapper.get_name() + "': " + str( e ) + log.error( err ) + uci_wrapper.set_error( err, True ) + else: + log.error( "UCI '%s' is in 'error' state, starting instance was aborted." % uci_wrapper.get_name() ) else: err = "No instances in state '"+ instance_states.SUBMITTED +"' found for UCI '" + uci_wrapper.get_name() + \ "'. Nothing to start." @@ -459,8 +464,13 @@ # Get all instances associated with given UCI il = uci_wrapper.get_instances_ids() # instance list + # Process list of instances and remove any references to empty instance id's + for i in il: + if i is None: + l.remove( i ) + log.debug( 'List of instances being terminated: %s' % il ) rl = conn.get_all_instances( il ) # Reservation list associated with given instances - + # Initiate shutdown of all instances under given UCI cnt = 0 stopped = [] @@ -536,18 +546,22 @@ """ log.debug( "Running general status update for EC2 UCIs..." ) # Update instances - instances = model.CloudInstance.filter( or_( model.CloudInstance.c.state==instance_states.RUNNING, - model.CloudInstance.c.state==instance_states.PENDING, - model.CloudInstance.c.state==instance_states.SHUTTING_DOWN ) ).all() + instances = self.sa_session.query( model.CloudInstance ) \ + .filter( or_( model.CloudInstance.table.c.state==instance_states.RUNNING, + model.CloudInstance.table.c.state==instance_states.PENDING, + model.CloudInstance.table.c.state==instance_states.SHUTTING_DOWN ) ) \ + .all() for inst in instances: if self.type == inst.uci.credentials.provider.type: log.debug( "[%s] Running general status update on instance '%s'" % ( inst.uci.credentials.provider.type, inst.instance_id ) ) self.updateInstance( inst ) # Update storage volume(s) - stores = model.CloudStore.filter( or_( model.CloudStore.c.status==store_status.IN_USE, - model.CloudStore.c.status==store_status.CREATING, - model.CloudStore.c.status==None ) ).all() + stores = self.sa_session.query( model.CloudStore ) \ + .filter( or_( model.CloudStore.table.c.status==store_status.IN_USE, + model.CloudStore.table.c.status==store_status.CREATING, + model.CloudStore.table.c.status==None ) ) \ + .all() for store in stores: if self.type == store.uci.credentials.provider.type: # and store.volume_id != None: log.debug( "[%s] Running general status update on store with local database ID: '%s'" % ( store.uci.credentials.provider.type, store.id ) ) @@ -564,7 +578,9 @@ # store.flush() # Update pending snapshots or delete ones marked for deletion - snapshots = model.CloudSnapshot.filter_by( status=snapshot_status.PENDING, status=snapshot_status.DELETE ).all() + snapshots = self.sa_session.query( model.CloudSnapshot ) \ + .filter_by( status=snapshot_status.PENDING, status=snapshot_status.DELETE ) \ + .all() for snapshot in snapshots: if self.type == snapshot.uci.credentials.provider.type and snapshot.status == snapshot_status.PENDING: log.debug( "[%s] Running general status update on snapshot '%s'" % ( snapshot.uci.credentials.provider.type, snapshot.snapshot_id ) ) @@ -574,11 +590,12 @@ self.delete_snapshot( snapshot ) # Attempt at updating any zombie UCIs (i.e., instances that have been in SUBMITTED state for longer than expected - see below for exact time) - zombies = model.UCI.filter_by( state=uci_states.SUBMITTED ).all() + zombies = self.sa_session.query( model.UCI ).filter_by( state=uci_states.SUBMITTED ).all() for zombie in zombies: - z_instances = model.CloudInstance.filter_by( uci_id=zombie.id) \ - .filter( or_( model.CloudInstance.c.state != instance_states.TERMINATED, - model.CloudInstance.c.state == None ) ) \ + z_instances = self.sa_session.query( model.CloudInstance ) \ + .filter_by( uci_id=zombie.id ) \ + .filter( or_( model.CloudInstance.table.c.state != instance_states.TERMINATED, + model.CloudInstance.table.c.state == None ) ) \ .all() for z_inst in z_instances: if self.type == z_inst.uci.credentials.provider.type: @@ -592,8 +609,8 @@ # Get credentials associated wit this instance uci_id = inst.uci_id - uci = model.UCI.get( uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( uci_id ) + self.sa_session.refresh( uci ) conn = self.get_connection_from_uci( uci ) # Get reservations handle for given instance @@ -604,6 +621,8 @@ log.error( err ) uci.error = err uci.state = uci_states.ERROR + self.sa_session.add( uci ) + self.sa_session.flush() return None # Because references to reservations are deleted shortly after instances have been terminated, getting an empty list as a response to a query @@ -618,8 +637,9 @@ inst.state = instance_states.TERMINATED uci.state = uci_states.ERROR uci.launch_time = None - inst.flush() - uci.flush() + self.sa_session.add( inst ) + self.sa_session.add( uci ) + self.sa_session.flush() # Update instance status in local DB with info from cloud provider for r in rl: for i, cInst in enumerate( r.instances ): @@ -628,45 +648,54 @@ log.debug( "Checking state of cloud instance '%s' associated with UCI '%s' and reservation '%s'. State='%s'" % ( cInst, uci.name, r, s ) ) if s != inst.state: inst.state = s - inst.flush() + self.sa_session.add( inst ) + self.sa_session.flush() # After instance has shut down, ensure UCI is marked as 'available' if s == instance_states.TERMINATED and uci.state != uci_states.ERROR: uci.state = uci_states.AVAILABLE uci.launch_time = None - uci.flush() + self.sa_session.add( uci ) + self.sa_session.flush() # Making sure state of UCI is updated. Once multiple instances become associated with single UCI, this will need to be changed. if s != uci.state and s != instance_states.TERMINATED: uci.state = s - uci.flush() + self.sa_session.add( uci ) + self.sa_session.flush() if cInst.public_dns_name != inst.public_dns: inst.public_dns = cInst.public_dns_name - inst.flush() + self.sa_session.add( inst ) + self.sa_session.flush() if cInst.private_dns_name != inst.private_dns: inst.private_dns = cInst.private_dns_name - inst.flush() + self.sa_session.add( inst ) + self.sa_session.flush() except boto.exception.EC2ResponseError, e: err = "Updating instance status from cloud failed for UCI '"+ uci.name + "' during general status update: " + str( e ) log.error( err ) uci.error = err uci.state = uci_states.ERROR + self.sa_session.add( uci ) + self.sa_session.flush() return None def updateStore( self, store ): # Get credentials associated wit this store uci_id = store.uci_id - uci = model.UCI.get( uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( uci_id ) + self.sa_session.refresh( uci ) conn = self.get_connection_from_uci( uci ) # Get reservations handle for given store try: + log.debug( "Updating storage volume command: vl = conn.get_all_volumes( [%s] )" % store.volume_id ) vl = conn.get_all_volumes( [store.volume_id] ) except boto.exception.EC2ResponseError, e: err = "Retrieving volume(s) from cloud failed for UCI '"+ uci.name + "' during general status update: " + str( e ) log.error( err ) uci.error = err uci.state = uci_states.ERROR - uci.flush() + self.sa_session.add( uci ) + self.sa_session.flush() return None # Update store status in local DB with info from cloud provider @@ -677,29 +706,36 @@ # UCI state remained as 'new', try to remedy this by updating UCI state here if ( store.status == None ) and ( store.volume_id != None ): uci.state = vl[0].status - uci.flush() + self.sa_session.add( uci ) + self.sa_session.flush() # If UCI was marked in state 'CREATING', update its status to reflect new status elif ( uci.state == uci_states.CREATING ): uci.state = vl[0].status - uci.flush() + self.sa_session.add( uci ) + self.sa_session.flush() store.status = vl[0].status - store.flush() + self.sa_session.add( store ) + self.sa_session.flush() if store.i_id != vl[0].instance_id: store.i_id = vl[0].instance_id - store.flush() + self.sa_session.add( store ) + self.sa_session.flush() if store.attach_time != vl[0].attach_time: store.attach_time = vl[0].attach_time - store.flush() + self.sa_session.add( store ) + self.sa_session.flush() if store.device != vl[0].device: store.device = vl[0].device - store.flush() + self.sa_session.add( store ) + self.sa_session.flush() except boto.exception.EC2ResponseError, e: err = "Updating status of volume(s) from cloud failed for UCI '"+ uci.name + "' during general status update: " + str( e ) log.error( err ) uci.error = err uci.state = uci_states.ERROR - uci.flush() + self.sa_session.add( uci ) + self.sa_session.flush() return None else: err = "No storage volumes returned by cloud provider on general update" @@ -708,14 +744,15 @@ store.error = err uci.error = err uci.state = uci_states.ERROR - uci.flush() - store.flush() + self.sa_session.add( uci ) + self.sa_session.add( store ) + self.sa_session.flush() def updateSnapshot( self, snapshot ): # Get credentials associated wit this store uci_id = snapshot.uci_id - uci = model.UCI.get( uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( uci_id ) + self.sa_session.refresh( uci ) conn = self.get_connection_from_uci( uci ) try: @@ -724,7 +761,8 @@ if len( snap ) > 0: log.debug( "Snapshot '%s' status: %s" % ( snapshot.snapshot_id, snap[0].status ) ) snapshot.status = snap[0].status - snapshot.flush() + self.sa_session.add( snapshot ) + self.sa_session.flush() else: err = "No snapshots returned by EC2 on general update" log.error( "%s for UCI '%s'" % ( err, uci.name ) ) @@ -732,8 +770,9 @@ snapshot.error = err uci.error = err uci.state = uci_states.ERROR - uci.flush() - snapshot.flush() + self.sa_session.add( uci ) + self.sa_session.add( snapshot ) + self.sa_session.flush() except boto.exception.EC2ResponseError, e: err = "EC2 response error while updating snapshot status: " + str( e ) log.error( err ) @@ -741,8 +780,9 @@ snapshot.error = err uci.error = err uci.state = uci_states.ERROR - uci.flush() - snapshot.flush() + self.sa_session.add( uci ) + self.sa_session.add( snapshot ) + self.sa_session.flush() except Exception, ex: err = "Error while updating snapshot status: " + str( ex ) log.error( err ) @@ -750,15 +790,16 @@ snapshot.error = err uci.error = err uci.state = uci_states.ERROR - uci.flush() - snapshot.flush() + self.sa_session.add( uci ) + self.sa_session.add( snapshot ) + self.sa_session.flush() def delete_snapshot( self, snapshot ): if snapshot.status == snapshot_status.DELETE: # Get credentials associated wit this store uci_id = snapshot.uci_id - uci = model.UCI.get( uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( uci_id ) + self.sa_session.refresh( uci ) conn = self.get_connection_from_uci( uci ) try: @@ -767,7 +808,8 @@ if snap == True: snapshot.deleted = True snapshot.status = snapshot_status.DELETED - snapshot.flush() + self.sa_session.add( snapshot ) + self.sa_session.flush() return snap except boto.exception.EC2ResponseError, e: err = "EC2 response error while deleting snapshot: " + str( e ) @@ -776,8 +818,9 @@ snapshot.error = err uci.error = err uci.state = uci_states.ERROR - uci.flush() - snapshot.flush() + self.sa_session.add( uci ) + self.sa_session.add( snapshot ) + self.sa_session.flush() except Exception, ex: err = "Error while deleting snapshot: " + str( ex ) log.error( err ) @@ -785,14 +828,16 @@ snapshot.error = err uci.error = err uci.state = uci_states.ERROR - uci.flush() - snapshot.flush() + self.sa_session.add( uci ) + self.sa_session.add( snapshot ) + self.sa_session.flush() else: err = "Cannot delete snapshot '"+snapshot.snapshot_id+"' because its status is '"+snapshot.status+"'. Only snapshots with '" + \ snapshot_status.COMPLETED+"' status can be deleted." log.error( err ) snapshot.error = err - snapshot.flush() + self.sa_session.add( snapshot ) + self.sa_session.flush() def processZombie( self, inst ): """ @@ -800,6 +845,10 @@ accordingly or if something else failed and instance was never started. Currently, no automatic repairs are being attempted; instead, appropriate error messages are set. """ + uci_id = inst.uci_id + uci = self.sa_session.query( model.UCI ).get( uci_id ) + self.sa_session.refresh( uci ) + # Check if any instance-specific information was written to local DB; if 'yes', set instance and UCI's error message # suggesting manual check. if inst.launch_time != None or inst.reservation_id != None or inst.instance_id != None: @@ -820,9 +869,10 @@ try: state = rl[0].instances[0].update() inst.state = state - inst.uci.state = state - inst.flush() - inst.uci.flush() + uci.state = state + self.sa_session.add( inst ) + self.sa_session.add( uci ) + self.sa_session.flush() except: # something failed, so skip pass @@ -830,10 +880,12 @@ try: launch_time = self.format_time( rl[0].instances[0].launch_time ) inst.launch_time = launch_time - inst.flush() + self.sa_session.add( inst ) + self.sa_session.flush() if inst.uci.launch_time == None: - inst.uci.launch_time = launch_time - inst.uci.flush() + uci.launch_time = launch_time + self.sa_session.add( uci ) + self.sa_session.flush() except: # something failed, so skip pass else: @@ -844,8 +896,9 @@ inst.uci.error = err inst.uci.state = uci_states.ERROR log.error( err ) - inst.flush() - inst.uci.flush() + self.sa_session.add( inst ) + self.sa_session.add( uci ) + self.sa_session.flush() else: #Instance most likely never got processed, so set error message suggesting user to try starting instance again. err = "Starting a machine instance (DB id: '"+str(inst.id)+"') associated with this UCI '" + str(inst.uci.name) + \ @@ -853,11 +906,12 @@ "starting the instance again." inst.error = err inst.state = instance_states.ERROR - inst.uci.error = err - inst.uci.state = uci_states.ERROR + uci.error = err + uci.state = uci_states.ERROR log.error( err ) - inst.flush() - inst.uci.flush() + self.sa_session.add( inst ) + self.sa_session.add( uci ) + self.sa_session.flush() # uw = UCIwrapper( inst.uci ) # log.debug( "Try automatically re-submitting UCI '%s'." % uw.get_name() ) @@ -882,7 +936,8 @@ log.error( err ) uci.error = err uci.state = uci_states.ERROR - uci.flush() + self.sa_session.add( uci ) + self.sa_session.flush() return None return conn @@ -895,7 +950,7 @@ # conn = self.get_connection( uci ) # # # Update status of storage volumes -# vl = model.CloudStore.filter( model.CloudInstance.c.uci_id == uci.id ).all() +# vl = model.CloudStore.filter( model.CloudInstance.table.c.uci_id == uci.id ).all() # vols = [] # for v in vl: # vols.append( v.volume_id ) @@ -911,7 +966,7 @@ # pass # # # Update status of instances -# il = model.CloudInstance.filter_by( uci_id=uci.id ).filter( model.CloudInstance.c.state != 'terminated' ).all() +# il = model.CloudInstance.filter_by( uci_id=uci.id ).filter( model.CloudInstance.table.c.state != 'terminated' ).all() # instanceList = [] # for i in il: # instanceList.append( i.instance_id ) diff -r 7d013eb98022 -r 39502dd3fd23 lib/galaxy/cloud/providers/eucalyptus.py --- a/lib/galaxy/cloud/providers/eucalyptus.py Thu Nov 12 16:36:07 2009 -0500 +++ b/lib/galaxy/cloud/providers/eucalyptus.py Mon Nov 16 20:37:47 2009 -0500 @@ -73,6 +73,7 @@ self.type = "eucalyptus" # cloud provider type (e.g., ec2, eucalyptus, opennebula) self.zone = "epc" self.queue = Queue() + self.sa_session = app.model.context self.threads = [] nworkers = 5 @@ -83,12 +84,28 @@ self.threads.append( worker ) log.debug( "%d eucalyptus cloud workers ready", nworkers ) + def shutdown( self ): + """Attempts to gracefully shut down the monitor thread""" + log.info( "sending stop signal to worker threads in eucalyptus cloud manager" ) + for i in range( len( self.threads ) ): + self.queue.put( self.STOP_SIGNAL ) + log.info( "eucalyptus cloud manager stopped" ) + + def put( self, uci_wrapper ): + """ + Adds uci_wrapper object to the end of the request queue to be handled by + this cloud provider. + """ + state = uci_wrapper.get_uci_state() + uci_wrapper.change_state( state.split('U')[0] ) # remove 'UCI' from end of state description (i.e., mark as accepted and ready for processing) + self.queue.put( uci_wrapper ) + def run_next( self ): - """Run the next job, waiting until one is available if necessary""" + """Process next request, waiting until one is available if necessary.""" cnt = 0 while 1: uci_wrapper = self.queue.get() - uci_state = uci_wrapper.get_state() + uci_state = uci_wrapper.get_uci_state() if uci_state is self.STOP_SIGNAL: return try: @@ -109,7 +126,7 @@ def get_connection( self, uci_wrapper ): """ - Establishes eucalyptus cloud connection using user's credentials associated with given UCI + Establishes cloud connection using user's credentials associated with given UCI """ log.debug( 'Establishing %s cloud connection.' % self.type ) provider = uci_wrapper.get_provider() @@ -137,7 +154,10 @@ def check_key_pair( self, uci_wrapper, conn ): """ - Generate key pair using user's credentials + Check if a key pair associated with this UCI exists on cloud provider. + If yes, return key pair name; otherwise, generate a key pair with the cloud + provider and, again, return key pair name. + Key pair name for given UCI is generated from UCI's name and suffix '_kp' """ kp = None kp_name = uci_wrapper.get_name().replace(' ','_') + "_kp" @@ -185,6 +205,7 @@ return None def create_key_pair( self, conn, kp_name ): + """ Initiate creation of key pair under kp_name by current cloud provider. """ try: return conn.create_key_pair( kp_name ) except boto.exception.EC2ResponseError, e: @@ -192,15 +213,15 @@ def get_mi_id( self, uci_wrapper, i_index ): """ - Get appropriate machine image (mi) based on instance size. + Get appropriate machine image (mi) ID based on instance type. """ - i_type = uci_wrapper.get_type( i_index ) + i_type = uci_wrapper.get_instance_type( i_index ) if i_type=='m1.small' or i_type=='c1.medium': arch = 'i386' else: arch = 'x86_64' - mi = model.CloudImage.filter_by( deleted=False, provider_type=self.type, architecture=arch ).first() + mi = self.sa_session.query( model.CloudImage ).filter_by( deleted=False, provider_type=self.type, architecture=arch ).first() if mi: return mi.image_id else: @@ -209,23 +230,10 @@ uci_wrapper.set_error( err+". Contact site administrator to ensure needed machine image is registered.", True ) return None - def shutdown( self ): - """Attempts to gracefully shut down the monitor thread""" - log.info( "sending stop signal to worker threads in eucalyptus cloud manager" ) - for i in range( len( self.threads ) ): - self.queue.put( self.STOP_SIGNAL ) - log.info( "eucalyptus cloud manager stopped" ) - - def put( self, uci_wrapper ): - # Get rid of UCI from state description - state = uci_wrapper.get_state() - uci_wrapper.change_state( state.split('U')[0] ) # remove 'UCI' from end of state description (i.e., mark as accepted and ready for processing) - self.queue.put( uci_wrapper ) - def createUCI( self, uci_wrapper ): """ - Creates User Configured Instance (UCI). Essentially, creates storage volume on cloud provider - and registers relevant information in Galaxy database. + Create User Configured Instance (UCI) - i.e., create storage volume on cloud provider + and register relevant information in local Galaxy database. """ conn = self.get_connection( uci_wrapper ) @@ -270,8 +278,11 @@ def deleteUCI( self, uci_wrapper ): """ - Deletes UCI. NOTE that this implies deletion of any and all data associated + Delete UCI - i.e., delete all storage volumes associated with this UCI. + NOTE that this implies deletion of any and all data associated with this UCI from the cloud. All data will be deleted. + Information in local Galaxy database is marked as deleted but not actually removed + from the database. """ conn = self.get_connection( uci_wrapper ) vl = [] # volume list @@ -287,7 +298,8 @@ if conn.delete_volume( v.volume_id ): deletedList.append( v.volume_id ) v.deleted = True - v.flush() + self.sa_session.add( v ) + self.sa_session.flush() count += 1 else: failedList.append( v.volume_id ) @@ -301,16 +313,17 @@ if count == len( vl ): uci_wrapper.set_deleted() else: - err = "Deleting following volume(s) failed: "+failedList+". However, these volumes were successfully deleted: "+deletedList+". \ - MANUAL intervention and processing needed." + err = "Deleting following volume(s) failed: "+ str( failedList )+". However, these volumes were successfully deleted: " \ + + str( deletedList ) +". MANUAL intervention and processing needed." log.error( err ) uci_wrapper.set_error( err, True ) def snapshotUCI( self, uci_wrapper ): """ - Creates snapshot of all storage volumes associated with this UCI. + Initiate creation of a snapshot by cloud provider for all storage volumes + associated with this UCI. """ - if uci_wrapper.get_state() != uci_states.ERROR: + if uci_wrapper.get_uci_state() != uci_states.ERROR: conn = self.get_connection( uci_wrapper ) snapshots = uci_wrapper.get_snapshots( status = snapshot_status.SUBMITTED ) @@ -337,7 +350,7 @@ uci_wrapper.change_state( uci_state=uci_states.AVAILABLE ) -# if uci_wrapper.get_state() != uci_states.ERROR: +# if uci_wrapper.get_uci_state() != uci_states.ERROR: # # snapshots = uci_wrapper.get_snapshots( status = 'submitted' ) # for snapshot in snapshots: @@ -363,9 +376,9 @@ def startUCI( self, uci_wrapper ): """ - Starts instance(s) of given UCI on the cloud. + Start instance(s) of given UCI on the cloud. """ - if uci_wrapper.get_state() != uci_states.ERROR: + if uci_wrapper.get_uci_state() != uci_states.ERROR: conn = self.get_connection( uci_wrapper ) self.check_key_pair( uci_wrapper, conn ) if uci_wrapper.get_key_pair_name() == None: @@ -383,16 +396,16 @@ log.debug( "mi_id: %s, uci_wrapper.get_key_pair_name(): %s" % ( mi_id, uci_wrapper.get_key_pair_name() ) ) uci_wrapper.set_mi( i_index, mi_id ) - if uci_wrapper.get_state() != uci_states.ERROR: + if uci_wrapper.get_uci_state() != uci_states.ERROR: # Start an instance log.debug( "Starting UCI instance '%s'" % uci_wrapper.get_name() ) log.debug( "Using following command: conn.run_instances( image_id='%s', key_name='%s', instance_type='%s' )" - % ( mi_id, uci_wrapper.get_key_pair_name(), uci_wrapper.get_type( i_index ) ) ) + % ( mi_id, uci_wrapper.get_key_pair_name(), uci_wrapper.get_instance_type( i_index ) ) ) reservation = None try: reservation = conn.run_instances( image_id=mi_id, key_name=uci_wrapper.get_key_pair_name(), - instance_type=uci_wrapper.get_type( i_index ) ) + instance_type=uci_wrapper.get_instance_type( i_index ) ) except boto.exception.EC2ResponseError, e: err = "EC2 response error when starting UCI '"+ uci_wrapper.get_name() +"': " + str( e ) log.error( err ) @@ -403,9 +416,11 @@ uci_wrapper.set_error( err, True ) # Record newly available instance data into local Galaxy database if reservation: - uci_wrapper.set_launch_time( self.format_time( reservation.instances[0].launch_time ), i_index=i_index ) + l_time = datetime.utcnow() +# uci_wrapper.set_instance_launch_time( self.format_time( reservation.instances[0].launch_time ), i_index=i_index ) + uci_wrapper.set_instance_launch_time( l_time, i_index=i_index ) if not uci_wrapper.uci_launch_time_set(): - uci_wrapper.set_uci_launch_time( self.format_time( reservation.instances[0].launch_time ) ) + uci_wrapper.set_uci_launch_time( l_time ) try: uci_wrapper.set_reservation_id( i_index, str( reservation ).split(":")[1] ) # TODO: if more than a single instance will be started through single reservation, change this reference from element [0] @@ -413,7 +428,7 @@ uci_wrapper.set_instance_id( i_index, i_id ) s = reservation.instances[0].state uci_wrapper.change_state( s, i_id, s ) - log.debug( "Instance of UCI '%s' started, current state: '%s'" % ( uci_wrapper.get_name(), uci_wrapper.get_state() ) ) + log.debug( "Instance of UCI '%s' started, current state: '%s'" % ( uci_wrapper.get_name(), uci_wrapper.get_uci_state() ) ) except boto.exception.EC2ResponseError, e: err = "EC2 response error when retrieving instance information for UCI '" + uci_wrapper.get_name() + "': " + str( e ) log.error( err ) @@ -430,12 +445,16 @@ def stopUCI( self, uci_wrapper): """ - Stops all of cloud instances associated with given UCI. + Stop all cloud instances associated with given UCI. """ conn = self.get_connection( uci_wrapper ) # Get all instances associated with given UCI il = uci_wrapper.get_instances_ids() # instance list + # Process list of instances and remove any references to empty instance id's + for i in il: + if i is None: + l.remove( i ) log.debug( 'List of instances being terminated: %s' % il ) rl = conn.get_all_instances( il ) # Reservation list associated with given instances @@ -506,41 +525,41 @@ def update( self ): """ - Runs a global status update on all instances that are in 'running', 'pending', or 'shutting-down' state. - Also, runs update on all storage volumes that are in 'in-use', 'creating', or 'None' state. + Run status update on all instances that are in 'running', 'pending', or 'shutting-down' state. + Run status update on all storage volumes whose status is 'in-use', 'creating', or 'None'. + Run status update on all snapshots whose status is 'pending' or 'delete' + Run status update on any zombie UCIs, i.e., UCI's that is in 'submitted' state for an + extended period of time. + Reason behind this method is to sync state of local DB and real-world resources """ log.debug( "Running general status update for EPC UCIs..." ) # Update instances - instances = model.CloudInstance.filter( or_( model.CloudInstance.c.state==instance_states.RUNNING, - model.CloudInstance.c.state==instance_states.PENDING, - model.CloudInstance.c.state==instance_states.SHUTTING_DOWN ) ).all() + instances = self.sa_session.query( model.CloudInstance ) \ + .filter( or_( model.CloudInstance.table.c.state==instance_states.RUNNING, + model.CloudInstance.table.c.state==instance_states.PENDING, + model.CloudInstance.table.c.state==instance_states.SHUTTING_DOWN ) ) \ + .all() for inst in instances: if self.type == inst.uci.credentials.provider.type: log.debug( "[%s] Running general status update on instance '%s'" % ( inst.uci.credentials.provider.type, inst.instance_id ) ) self.updateInstance( inst ) # Update storage volume(s) - stores = model.CloudStore.filter( or_( model.CloudStore.c.status==store_status.IN_USE, - model.CloudStore.c.status==store_status.CREATING, - model.CloudStore.c.status==None ) ).all() + stores = self.sa_session.query( model.CloudStore ) \ + .filter( or_( model.CloudStore.table.c.status==store_status.IN_USE, + model.CloudStore.table.c.status==store_status.CREATING, + model.CloudStore.table.c.status==None ) ) \ + .all() for store in stores: if self.type == store.uci.credentials.provider.type: # and store.volume_id != None: log.debug( "[%s] Running general status update on store with local database ID: '%s'" % ( store.uci.credentials.provider.type, store.id ) ) self.updateStore( store ) -# else: -# log.error( "[%s] There exists an entry for UCI (%s) storage volume without an ID. Storage volume might have been created with " -# "cloud provider though. Manual check is recommended." % ( store.uci.credentials.provider.type, store.uci.name ) ) -# store.uci.error = "There exists an entry in local database for a storage volume without an ID. Storage volume might have been created " \ -# "with cloud provider though. Manual check is recommended. After understanding what happened, local database entry for given " \ -# "storage volume should be updated." -# store.status = store_status.ERROR -# store.uci.state = uci_states.ERROR -# store.uci.flush() -# store.flush() # Update pending snapshots or delete ones marked for deletion - snapshots = model.CloudSnapshot.filter_by( status=snapshot_status.PENDING, status=snapshot_status.DELETE ).all() + snapshots = self.sa_session.query( model.CloudSnapshot ) \ + .filter_by( status=snapshot_status.PENDING, status=snapshot_status.DELETE ) \ + .all() for snapshot in snapshots: if self.type == snapshot.uci.credentials.provider.type and snapshot.status == snapshot_status.PENDING: log.debug( "[%s] Running general status update on snapshot '%s'" % ( snapshot.uci.credentials.provider.type, snapshot.snapshot_id ) ) @@ -550,29 +569,34 @@ self.delete_snapshot( snapshot ) # Attempt at updating any zombie UCIs (i.e., instances that have been in SUBMITTED state for longer than expected - see below for exact time) - zombies = model.UCI.filter_by( state=uci_states.SUBMITTED ).all() + zombies = self.sa_session.query( model.UCI ).filter_by( state=uci_states.SUBMITTED ).all() for zombie in zombies: log.debug( "zombie UCI: %s" % zombie.name ) - z_instances = model.CloudInstance \ - .filter_by( uci_id=zombie.id, state=None ) \ + z_instances = self.sa_session.query( model.CloudInstance ) \ + .filter( or_( model.CloudInstance.table.c.state != instance_states.TERMINATED, + model.CloudInstance.table.c.state == None ) ) \ .all() for z_inst in z_instances: if self.type == z_inst.uci.credentials.provider.type: # log.debug( "z_inst.id: '%s', state: '%s'" % ( z_inst.id, z_inst.state ) ) td = datetime.utcnow() - z_inst.update_time - log.debug( "z_inst.id: %s, time delta is %s sec" % ( z_inst.id, td.seconds ) ) +# log.debug( "z_inst.id: %s, time delta is %s sec" % ( z_inst.id, td.seconds ) ) if td.seconds > 180: # if instance has been in SUBMITTED state for more than 3 minutes log.debug( "[%s](td=%s) Running zombie repair update on instance with DB id '%s'" % ( z_inst.uci.credentials.provider.type, td.seconds, z_inst.id ) ) self.processZombie( z_inst ) def updateInstance( self, inst ): - + """ + Update information in local database for given instance as it is obtained from cloud provider. + Along with updating information about given instance, information about the UCI controlling + this instance is also updated. + """ # Get credentials associated wit this instance uci_id = inst.uci_id - uci = model.UCI.get( uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( uci_id ) + self.sa_session.refresh( uci ) conn = self.get_connection_from_uci( uci ) - + # Get reservations handle for given instance try: rl= conn.get_all_instances( [inst.instance_id] ) @@ -581,10 +605,12 @@ log.error( err ) uci.error = err uci.state = uci_states.ERROR + self.sa_session.add( uci ) + self.sa_session.flush() return None - # Because EPC deletes references to reservations after a short while after instances have terminated, getting an empty list as a response to a query - # typically means the instance has successfully shut down but the check was not performed in short enough amount of time. Until alternative solution + # Because references to reservations are deleted shortly after instances have been terminated, getting an empty list as a response to a query + # typically means the instance has successfully shut down but the check was not performed in short enough amount of time. Until an alternative solution # is found, below code sets state of given UCI to 'error' to indicate to the user something out of ordinary happened. if len( rl ) == 0: err = "Instance ID '"+inst.instance_id+"' was not found by the cloud provider. Instance might have crashed or otherwise been terminated."+ \ @@ -595,109 +621,151 @@ inst.state = instance_states.TERMINATED uci.state = uci_states.ERROR uci.launch_time = None - inst.flush() - uci.flush() + self.sa_session.add( inst ) + self.sa_session.add( uci ) + self.sa_session.flush() # Update instance status in local DB with info from cloud provider for r in rl: for i, cInst in enumerate( r.instances ): try: s = cInst.update() - log.debug( "Checking state of cloud instance '%s' associated with reservation '%s'. State='%s'" % ( cInst, r, s ) ) + log.debug( "Checking state of cloud instance '%s' associated with UCI '%s' and reservation '%s'. State='%s'" % ( cInst, uci.name, r, s ) ) if s != inst.state: inst.state = s - inst.flush() - # After instance has shut down, ensure UCI is marked as 'available' - if s == instance_states.TERMINATED and uci.state != uci_states.ERROR: + self.sa_session.add( inst ) + self.sa_session.flush() + # After instance has shut down, ensure UCI is marked as 'available' + if s == instance_states.TERMINATED and uci.state != uci_states.ERROR: uci.state = uci_states.AVAILABLE uci.launch_time = None - uci.flush() + self.sa_session.add( uci ) + self.sa_session.flush() # Making sure state of UCI is updated. Once multiple instances become associated with single UCI, this will need to be changed. if s != uci.state and s != instance_states.TERMINATED: uci.state = s - uci.flush() + self.sa_session.add( uci ) + self.sa_session.flush() if cInst.public_dns_name != inst.public_dns: inst.public_dns = cInst.public_dns_name - inst.flush() + self.sa_session.add( inst ) + self.sa_session.flush() if cInst.private_dns_name != inst.private_dns: inst.private_dns = cInst.private_dns_name - inst.flush() + self.sa_session.add( inst ) + self.sa_session.flush() except boto.exception.EC2ResponseError, e: err = "Updating instance status from cloud failed for UCI '"+ uci.name + "' during general status update: " + str( e ) log.error( err ) uci.error = err uci.state = uci_states.ERROR + self.sa_session.add( uci ) + self.sa_session.flush() return None - + def updateStore( self, store ): + """ + Update information in local database for given storage volume as it is obtained from cloud provider. + Along with updating information about given storage volume, information about the UCI controlling + this storage volume is also updated. + """ # Get credentials associated wit this store uci_id = store.uci_id - uci = model.UCI.get( uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( uci_id ) + self.sa_session.refresh( uci ) conn = self.get_connection_from_uci( uci ) - - try: - vl = conn.get_all_volumes( [store.volume_id] ) - except boto.exception.EC2ResponseError, e: - err = "Retrieving volume(s) from cloud failed for UCI '"+ uci.name + "' during general status update: " + str( e ) - log.error( err ) - uci.error = err - uci.state = uci_states.ERROR - uci.flush() - return None - # Update store status in local DB with info from cloud provider - if len(vl) > 0: + if store.volume_id != None: + # Get reservations handle for given store try: - if store.status != vl[0].status: - # In case something failed during creation of UCI but actual storage volume was created and yet - # UCI state remained as 'new', try to remedy this by updating UCI state here - if ( store.status == None ) and ( store.volume_id != None ): - uci.state = vl[0].status - uci.flush() - # If UCI was marked in state 'CREATING', update its status to reflect new status - elif ( uci.state == uci_states.CREATING ): - # Because Eucalyptus Public Cloud (EPC) deletes volumes immediately after they are created, artificially - # set status of given UCI to 'available' based on storage volume's availability zone (i.e., it's residing - # in EPC as opposed to some other Eucalyptus based cloud that allows creation of storage volumes. - if store.availability_zone == 'epc': - uci.state = uci_states.AVAILABLE - else: - uci.state = vl[0].status - uci.flush() - - store.status = vl[0].status - store.flush() - if store.i_id != vl[0].instance_id: - store.i_id = vl[0].instance_id - store.flush() - if store.attach_time != vl[0].attach_time: - store.attach_time = vl[0].attach_time - store.flush() - if store.device != vl[0].device: - store.device = vl[0].device - store.flush() + log.debug( "Updating storage volume command: vl = conn.get_all_volumes( [%s] )" % store.volume_id ) + vl = conn.get_all_volumes( [store.volume_id] ) except boto.exception.EC2ResponseError, e: - err = "Updating status of volume(s) from cloud failed for UCI '"+ uci.name + "' during general status update: " + str( e ) + err = "Retrieving volume(s) from cloud failed for UCI '"+ uci.name + "' during general status update: " + str( e ) log.error( err ) uci.error = err uci.state = uci_states.ERROR - uci.flush() + self.sa_session.add( uci ) + self.sa_session.flush() return None + + # Update store status in local DB with info from cloud provider + if len(vl) > 0: + try: + if store.status != vl[0].status: + # In case something failed during creation of UCI but actual storage volume was created and yet + # UCI state remained as 'new', try to remedy this by updating UCI state here + if ( store.status == None ) and ( store.volume_id != None ): + uci.state = vl[0].status + self.sa_session.add( uci ) + self.sa_session.flush() + # If UCI was marked in state 'CREATING', update its status to reflect new status + elif ( uci.state == uci_states.CREATING ): + # Because Eucalyptus Public Cloud (EPC) deletes volumes immediately after they are created, artificially + # set status of given UCI to 'available' based on storage volume's availability zone (i.e., it's residing + # in EPC as opposed to some other Eucalyptus based cloud that allows creation of storage volumes. + if store.availability_zone == 'epc': + uci.state = uci_states.AVAILABLE + else: + uci.state = vl[0].status + + self.sa_session.add( uci ) + self.sa_session.flush() + + store.status = vl[0].status + self.sa_session.add( store ) + self.sa_session.flush() + if store.i_id != vl[0].instance_id: + store.i_id = vl[0].instance_id + self.sa_session.add( store ) + self.sa_session.flush() + if store.attach_time != vl[0].attach_time: + store.attach_time = vl[0].attach_time + self.sa_session.add( store ) + self.sa_session.flush() + if store.device != vl[0].device: + store.device = vl[0].device + self.sa_session.add( store ) + self.sa_session.flush() + except boto.exception.EC2ResponseError, e: + err = "Updating status of volume(s) from cloud failed for UCI '"+ uci.name + "' during general status update: " + str( e ) + log.error( err ) + uci.error = err + uci.state = uci_states.ERROR + self.sa_session.add( uci ) + self.sa_session.flush() + return None + else: + err = "No storage volumes returned by cloud provider on general update" + log.error( "%s for UCI '%s'" % ( err, uci.name ) ) + store.status = store_status.ERROR + store.error = err + uci.error = err + uci.state = uci_states.ERROR + self.sa_session.add( uci ) + self.sa_session.add( store ) + self.sa_session.flush() else: - err = "No storage volumes returned by cloud provider on general update" - log.error( "%s for UCI '%s'" % ( err, uci.name ) ) + err = "Missing storage volume ID in local database on general update. Manual check is needed to check " \ + "if storage volume was actually created by cloud provider." + log.error( "%s (for UCI '%s')" % ( err, uci.name ) ) store.status = store_status.ERROR store.error = err uci.error = err uci.state = uci_states.ERROR - uci.flush() - store.flush() - + self.sa_session.add( uci ) + self.sa_session.add( store ) + self.sa_session.flush() + def updateSnapshot( self, snapshot ): + """ + Update information in local database for given snapshot as it is obtained from cloud provider. + Along with updating information about given snapshot, information about the UCI controlling + this snapshot is also updated. + """ # Get credentials associated wit this store uci_id = snapshot.uci_id - uci = model.UCI.get( uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( uci_id ) + self.sa_session.refresh( uci ) conn = self.get_connection_from_uci( uci ) try: @@ -706,25 +774,28 @@ if len( snap ) > 0: log.debug( "Snapshot '%s' status: %s" % ( snapshot.snapshot_id, snap[0].status ) ) snapshot.status = snap[0].status - snapshot.flush() + self.sa_session.add( snapshot ) + self.sa_session.flush() else: - err = "No snapshots returned by cloud provider on general update" + err = "No snapshots returned by EC2 on general update" log.error( "%s for UCI '%s'" % ( err, uci.name ) ) snapshot.status = snapshot_status.ERROR snapshot.error = err uci.error = err uci.state = uci_states.ERROR - uci.flush() - snapshot.flush() + self.sa_session.add( uci ) + self.sa_session.add( snapshot ) + self.sa_session.flush() except boto.exception.EC2ResponseError, e: - err = "Cloud provider response error while updating snapshot status: " + str( e ) + err = "EC2 response error while updating snapshot status: " + str( e ) log.error( err ) snapshot.status = snapshot_status.ERROR snapshot.error = err uci.error = err uci.state = uci_states.ERROR - uci.flush() - snapshot.flush() + self.sa_session.add( uci ) + self.sa_session.add( snapshot ) + self.sa_session.flush() except Exception, ex: err = "Error while updating snapshot status: " + str( ex ) log.error( err ) @@ -732,15 +803,19 @@ snapshot.error = err uci.error = err uci.state = uci_states.ERROR - uci.flush() - snapshot.flush() - + self.sa_session.add( uci ) + self.sa_session.add( snapshot ) + self.sa_session.flush() + def delete_snapshot( self, snapshot ): + """ + Initiate deletion of given snapshot from cloud provider. + """ if snapshot.status == snapshot_status.DELETE: # Get credentials associated wit this store uci_id = snapshot.uci_id - uci = model.UCI.get( uci_id ) - uci.refresh() + uci = self.sa_session.query( model.UCI ).get( uci_id ) + self.sa_session.refresh( uci ) conn = self.get_connection_from_uci( uci ) try: @@ -749,7 +824,8 @@ if snap == True: snapshot.deleted = True snapshot.status = snapshot_status.DELETED - snapshot.flush() + self.sa_session.add( snapshot ) + self.sa_session.flush() return snap except boto.exception.EC2ResponseError, e: err = "EC2 response error while deleting snapshot: " + str( e ) @@ -758,8 +834,9 @@ snapshot.error = err uci.error = err uci.state = uci_states.ERROR - uci.flush() - snapshot.flush() + self.sa_session.add( uci ) + self.sa_session.add( snapshot ) + self.sa_session.flush() except Exception, ex: err = "Error while deleting snapshot: " + str( ex ) log.error( err ) @@ -767,21 +844,27 @@ snapshot.error = err uci.error = err uci.state = uci_states.ERROR - uci.flush() - snapshot.flush() + self.sa_session.add( uci ) + self.sa_session.add( snapshot ) + self.sa_session.flush() else: err = "Cannot delete snapshot '"+snapshot.snapshot_id+"' because its status is '"+snapshot.status+"'. Only snapshots with '" + \ snapshot_status.COMPLETED+"' status can be deleted." log.error( err ) snapshot.error = err - snapshot.flush() - + self.sa_session.add( snapshot ) + self.sa_session.flush() + def processZombie( self, inst ): """ - Attempt at discovering if starting an instance was successful but local database was not updated + Attempt at discovering if starting a cloud instance was successful but local database was not updated accordingly or if something else failed and instance was never started. Currently, no automatic repairs are being attempted; instead, appropriate error messages are set. """ + uci_id = inst.uci_id + uci = self.sa_session.query( model.UCI ).get( uci_id ) + self.sa_session.refresh( uci ) + # Check if any instance-specific information was written to local DB; if 'yes', set instance and UCI's error message # suggesting manual check. if inst.launch_time != None or inst.reservation_id != None or inst.instance_id != None: @@ -790,7 +873,7 @@ # report as error. # Fields attempting to be recovered are: reservation_id, instance status, and launch_time if inst.instance_id != None: - conn = self.get_connection_from_uci( inst.uci ) + conn = self.get_connection_from_uci( uci ) rl = conn.get_all_instances( [inst.instance_id] ) # reservation list # Update local DB with relevant data from instance if inst.reservation_id == None: @@ -802,9 +885,10 @@ try: state = rl[0].instances[0].update() inst.state = state - inst.uci.state = state - inst.flush() - inst.uci.flush() + uci.state = state + self.sa_session.add( inst ) + self.sa_session.add( uci ) + self.sa_session.flush() except: # something failed, so skip pass @@ -812,10 +896,12 @@ try: launch_time = self.format_time( rl[0].instances[0].launch_time ) inst.launch_time = launch_time - inst.flush() + self.sa_session.add( inst ) + self.sa_session.flush() if inst.uci.launch_time == None: - inst.uci.launch_time = launch_time - inst.uci.flush() + uci.launch_time = launch_time + self.sa_session.add( uci ) + self.sa_session.flush() except: # something failed, so skip pass else: @@ -826,8 +912,9 @@ inst.uci.error = err inst.uci.state = uci_states.ERROR log.error( err ) - inst.flush() - inst.uci.flush() + self.sa_session.add( inst ) + self.sa_session.add( uci ) + self.sa_session.flush() else: #Instance most likely never got processed, so set error message suggesting user to try starting instance again. err = "Starting a machine instance (DB id: '"+str(inst.id)+"') associated with this UCI '" + str(inst.uci.name) + \ @@ -835,11 +922,12 @@ "starting the instance again." inst.error = err inst.state = instance_states.ERROR - inst.uci.error = err - inst.uci.state = uci_states.ERROR + uci.error = err + uci.state = uci_states.ERROR log.error( err ) - inst.flush() - inst.uci.flush() + self.sa_session.add( inst ) + self.sa_session.add( uci ) + self.sa_session.flush() # uw = UCIwrapper( inst.uci ) # log.debug( "Try automatically re-submitting UCI '%s'." % uw.get_name() ) @@ -848,16 +936,23 @@ Establishes and returns connection to cloud provider. Information needed to do so is obtained directly from uci database object. """ - log.debug( 'Establishing %s cloud connection.' % self.type ) + log.debug( 'Establishing %s cloud connection' % self.type ) a_key = uci.credentials.access_key s_key = uci.credentials.secret_key # Get connection try: region = RegionInfo( None, uci.credentials.provider.region_name, uci.credentials.provider.region_endpoint ) + log.debug( "[%s] Using following command to connect to cloud provider: " + "conn = EC2Connection( aws_access_key_id=%s, " + "aws_secret_access_key=%s, " + "port=%s, " + "is_secure=%s, " + "region=region, " + "path=%s )" % ( self.type, a_key, s_key, uci.credentials.provider.is_secure, uci.credentials.provider.port, uci.credentials.provider.path ) ) conn = EC2Connection( aws_access_key_id=a_key, aws_secret_access_key=s_key, - is_secure=uci.credentials.provider.is_secure, - port=uci.credentials.provider.port, + is_secure=uci.credentials.provider.is_secure, + port=uci.credentials.provider.port, region=region, path=uci.credentials.provider.path ) except boto.exception.EC2ResponseError, e: @@ -865,7 +960,8 @@ log.error( err ) uci.error = err uci.state = uci_states.ERROR - uci.flush() + self.sa_session.add( uci ) + self.sa_session.flush() return None return conn @@ -878,7 +974,7 @@ # conn = self.get_connection( uci ) # # # Update status of storage volumes -# vl = model.CloudStore.filter( model.CloudInstance.c.uci_id == uci.id ).all() +# vl = model.CloudStore.filter( model.CloudInstance.table.c.uci_id == uci.id ).all() # vols = [] # for v in vl: # vols.append( v.volume_id ) @@ -894,7 +990,7 @@ # pass # # # Update status of instances -# il = model.CloudInstance.filter_by( uci_id=uci.id ).filter( model.CloudInstance.c.state != 'terminated' ).all() +# il = model.CloudInstance.filter_by( uci_id=uci.id ).filter( model.CloudInstance.table.c.state != 'terminated' ).all() # instanceList = [] # for i in il: # instanceList.append( i.instance_id ) diff -r 7d013eb98022 -r 39502dd3fd23 lib/galaxy/config.py --- a/lib/galaxy/config.py Thu Nov 12 16:36:07 2009 -0500 +++ b/lib/galaxy/config.py Mon Nov 16 20:37:47 2009 -0500 @@ -114,12 +114,11 @@ self.tool_runners = [] self.datatypes_config = kwargs.get( 'datatypes_config_file', 'datatypes_conf.xml' ) # Cloud configuration options - self.cloud_controller_instance = string_as_bool( kwargs.get( 'cloud_controller_instance', 'False' ) ) - self.cloud_provider = kwargs.get( 'cloud_provider', None ) - if self.cloud_controller_instance: - self.enable_cloud_execution = string_as_bool( kwargs.get( 'enable_cloud_execution', 'True' ) ) + self.cloud_controller_instance = string_as_bool( kwargs.get( 'cloud_controller_instance', False ) ) + if self.cloud_controller_instance == True: + self.enable_cloud_execution = string_as_bool( kwargs.get( 'enable_cloud_execution', True ) ) else: - self.enable_cloud_execution = string_as_bool( kwargs.get( 'enable_cloud_execution', 'False' ) ) + self.enable_cloud_execution = string_as_bool( kwargs.get( 'enable_cloud_execution', False ) ) def get( self, key, default ): return self.config_dict.get( key, default ) def get_bool( self, key, default ): diff -r 7d013eb98022 -r 39502dd3fd23 lib/galaxy/web/controllers/cloud.py --- a/lib/galaxy/web/controllers/cloud.py Thu Nov 12 16:36:07 2009 -0500 +++ b/lib/galaxy/web/controllers/cloud.py Mon Nov 16 20:37:47 2009 -0500 @@ -91,48 +91,48 @@ cloudCredentials = trans.sa_session.query( model.CloudUserCredentials ) \ .filter_by( user=user ) \ - .filter( model.CloudUserCredentials.c.deleted != True ) \ - .order_by( model.CloudUserCredentials.c.name ) \ + .filter( model.CloudUserCredentials.table.c.deleted != True ) \ + .order_by( model.CloudUserCredentials.table.c.name ) \ .all() cloudProviders = trans.sa_session.query( model.CloudProvider ) \ .filter_by( user=user ) \ - .filter( model.CloudProvider.c.deleted != True ) \ - .order_by( model.CloudProvider.c.name ) \ + .filter( model.CloudProvider.table.c.deleted != True ) \ + .order_by( model.CloudProvider.table.c.name ) \ .all() liveInstances = trans.sa_session.query( model.UCI ) \ .filter_by( user=user ) \ - .filter( or_( model.UCI.c.state==uci_states.RUNNING, - model.UCI.c.state==uci_states.PENDING, - model.UCI.c.state==uci_states.SUBMITTED, - model.UCI.c.state==uci_states.SUBMITTED_UCI, - model.UCI.c.state==uci_states.SHUTTING_DOWN, - model.UCI.c.state==uci_states.SHUTTING_DOWN_UCI ) ) \ - .order_by( desc( model.UCI.c.update_time ) ) \ + .filter( or_( model.UCI.table.c.state==uci_states.RUNNING, + model.UCI.table.c.state==uci_states.PENDING, + model.UCI.table.c.state==uci_states.SUBMITTED, + model.UCI.table.c.state==uci_states.SUBMITTED_UCI, + model.UCI.table.c.state==uci_states.SHUTTING_DOWN, + model.UCI.table.c.state==uci_states.SHUTTING_DOWN_UCI ) ) \ + .order_by( desc( model.UCI.table.c.update_time ) ) \ .all() prevInstances = trans.sa_session.query( model.UCI ) \ .filter_by( user=user, deleted=False ) \ - .filter( or_( model.UCI.c.state==uci_states.AVAILABLE, - model.UCI.c.state==uci_states.NEW, - model.UCI.c.state==uci_states.NEW_UCI, - model.UCI.c.state==uci_states.CREATING, - model.UCI.c.state==uci_states.ERROR, - model.UCI.c.state==uci_states.DELETED, - model.UCI.c.state==uci_states.DELETING, - model.UCI.c.state==uci_states.DELETING_UCI, - model.UCI.c.state==uci_states.SNAPSHOT, - model.UCI.c.state==uci_states.SNAPSHOT_UCI ) ) \ - .order_by( desc( model.UCI.c.update_time ) ) \ + .filter( or_( model.UCI.table.c.state==uci_states.AVAILABLE, + model.UCI.table.c.state==uci_states.NEW, + model.UCI.table.c.state==uci_states.NEW_UCI, + model.UCI.table.c.state==uci_states.CREATING, + model.UCI.table.c.state==uci_states.ERROR, + model.UCI.table.c.state==uci_states.DELETED, + model.UCI.table.c.state==uci_states.DELETING, + model.UCI.table.c.state==uci_states.DELETING_UCI, + model.UCI.table.c.state==uci_states.SNAPSHOT, + model.UCI.table.c.state==uci_states.SNAPSHOT_UCI ) ) \ + .order_by( desc( model.UCI.table.c.update_time ) ) \ .all() # Check after update there are instances in pending state; if so, display message pendingInstances = trans.sa_session.query( model.UCI ) \ .filter_by( user=user ) \ - .filter( or_( model.UCI.c.state==uci_states.PENDING, - model.UCI.c.state==uci_states.SUBMITTED, - model.UCI.c.state==uci_states.SUBMITTED_UCI ) ) \ + .filter( or_( model.UCI.table.c.state==uci_states.PENDING, + model.UCI.table.c.state==uci_states.SUBMITTED, + model.UCI.table.c.state==uci_states.SUBMITTED_UCI ) ) \ .all() if pendingInstances: trans.set_message( "Galaxy instance started. NOTE: Please wait about 5 minutes for the instance to " @@ -173,7 +173,7 @@ # Create new user configured instance try: - if trans.app.model.UCI \ + if trans.sa_session.query( model.UCI ) \ .filter_by (user=user, deleted=False, name=instanceName ) \ .first(): error['inst_error'] = "An instance with that name already exist." @@ -191,8 +191,8 @@ # Capture user configured instance information uci = model.UCI() uci.name = instanceName - uci.credentials = trans.app.model.CloudUserCredentials.filter( - trans.app.model.CloudUserCredentials.table.c.name==credName ).first() + uci.credentials = trans.sa_session.query( model.CloudUserCredentials ) \ + .filter( model.CloudUserCredentials.table.c.name==credName ).first() uci.user= user uci.total_size = volSize # This is OK now because new instance is being created and only one storage volume can be created at UCI creation time uci.state = uci_states.NEW_UCI @@ -204,8 +204,8 @@ storage.availability_zone = zone # Persist session = trans.sa_session - session.save_or_update( uci ) - session.save_or_update( storage ) + session.add( uci ) + session.add( storage ) session.flush() # Log and display the management page trans.log_event( "User configured new cloud instance: '%s'" % instanceName ) @@ -278,8 +278,8 @@ uci.state = uci_states.SUBMITTED_UCI # Persist session = trans.sa_session - session.save_or_update( instance ) - session.save_or_update( uci ) + session.add( instance ) + session.add( uci ) session.flush() # Log trans.log_event ("User initiated starting of UCI '%s'." % uci.name ) @@ -309,7 +309,7 @@ ( uci.state != uci_states.AVAILABLE ): uci.state = uci_states.SHUTTING_DOWN_UCI session = trans.sa_session - session.save_or_update( uci ) + session.add( uci ) session.flush() trans.log_event( "User stopped cloud instance '%s' (id: %s)" % ( uci.name, uci.id ) ) trans.set_message( "Stopping of Galaxy instance '%s' initiated." % uci.name ) @@ -378,7 +378,7 @@ prevInstances = trans.sa_session.query( model.CloudInstance ) \ .filter_by( user=user, state=instance_states.TERMINATED, uci_id=id ) \ - .order_by( desc( model.CloudInstance.c.update_time ) ) \ + .order_by( desc( model.CloudInstance.table.c.update_time ) ) \ .all() return trans.fill_template( "cloud/view_usage.mako", prevInstances = prevInstances ) @@ -396,7 +396,7 @@ name = uci.name uci.state = uci_states.DELETING_UCI session = trans.sa_session - session.save_or_update( uci ) + session.add( uci ) session.flush() trans.log_event( "User marked cloud instance '%s' for deletion." % name ) trans.set_message( "Galaxy instance '%s' marked for deletion." % name ) @@ -431,8 +431,8 @@ uci.state = uci_states.SNAPSHOT_UCI # Persist session = trans.sa_session - session.save_or_update( snapshot ) - session.save_or_update( uci ) + session.add( snapshot ) + session.add( uci ) session.flush() elif len( stores ) == 0: error( "No storage volumes found that are associated with this instance." ) @@ -455,7 +455,7 @@ snaps = trans.sa_session.query( model.CloudSnapshot ) \ .filter_by( user=user, uci_id=id, deleted=False ) \ - .order_by( desc( model.CloudSnapshot.c.update_time ) ) \ + .order_by( desc( model.CloudSnapshot.table.c.update_time ) ) \ .all() return trans.fill_template( "cloud/view_snapshots.mako", @@ -474,7 +474,8 @@ if snap.status == snapshot_status.COMPLETED: snap.status = snapshot_status.DELETE - snap.flush() + trans.sa_session.add( snap ) + trans.sa_session.flush() trans.set_message( "Snapshot '%s' is marked for deletion. Once the deletion is complete, it will no longer be visible in this list. " "Please note that this process may take up to a minute." % snap.snapshot_id ) else: @@ -485,7 +486,7 @@ uci_id = trans.security.decode_id( uci_id ) snaps = trans.sa_session.query( model.CloudSnapshot ) \ .filter_by( user=user, uci_id=uci_id, deleted=False ) \ - .order_by( desc( model.CloudSnapshot.c.update_time ) ) \ + .order_by( desc( model.CloudSnapshot.table.c.update_time ) ) \ .all() return trans.fill_template( "cloud/view_snapshots.mako", @@ -514,9 +515,9 @@ error['provider_error'] = "You must select cloud provider type for this machine image." elif image_id=='' or len( image_id ) > 255: error['id_error'] = "Image ID must be between 1 and 255 characters long." - elif trans.app.model.CloudUserCredentials \ + elif trans.sa_session.query( model.CloudUserCredentials ) \ .filter_by( deleted=False ) \ - .filter( trans.app.model.CloudImage.table.c.image_id == image_id ) \ + .filter( model.CloudImage.table.c.image_id == image_id ) \ .first(): error['id_error'] = "Image with ID '" + image_id + "' is already registered. \ Please choose another ID." @@ -531,7 +532,7 @@ image.architecture = architecture # Persist session = trans.sa_session - session.save_or_update( image ) + session.add( image ) session.flush() # Log and display the management page trans.log_event( "New cloud image added: '%s'" % image.image_id ) @@ -557,7 +558,7 @@ @web.expose @web.require_login( "use Galaxy cloud" ) def list_machine_images( self, trans ): - images = trans.sa_session.query( model.CloudImage ).filter( trans.app.model.CloudImage.table.c.deleted != True ).all() + images = trans.sa_session.query( model.CloudImage ).filter( model.CloudImage.table.c.deleted != True ).all() return trans.fill_template( '/cloud/list_images.mako', images=images ) @web.expose @@ -568,7 +569,8 @@ image = trans.sa_session.query( model.CloudImage ).get( id ) image.deleted = True - image.flush() + trans.sa_session.add( image ) + trans.sa_session.flush() return self.list_machine_images( trans ) @web.expose @@ -588,9 +590,9 @@ image = trans.sa_session.query( model.CloudImage ).get( id ) if image_id=='' or len( image_id ) > 255: error['id_error'] = "Image ID must be between 1 and 255 characters in length." - elif trans.app.model.CloudImage \ + elif trans.sa_session.query( model.CloudImage ) \ .filter_by( deleted=False ) \ - .filter( and_( trans.app.model.CloudImage.table.c.id != image.id, trans.app.model.CloudImage.table.c.image_id==image_id ) ) \ + .filter( and_( model.CloudImage.table.c.id != image.id, model.CloudImage.table.c.image_id==image_id ) ) \ .first(): error['id_error'] = "Image with ID '" + image_id + "' already exist. Please choose an alternative name." elif architecture=='' or len( architecture ) > 255: @@ -606,7 +608,7 @@ image.architecture = architecture # Persist session = trans.sa_session - session.save_or_update( image ) + session.add( image ) session.flush() # Log and display the management page trans.set_message( "Machine image '%s' edited." % image.image_id ) @@ -626,9 +628,9 @@ if credName or providerName or accessKey or secretKey: if credName=='' or len( credName ) > 255: error['cred_error'] = "Credentials name must be between 1 and 255 characters in length." - elif trans.app.model.CloudUserCredentials \ + elif trans.sa_session.query( model.CloudUserCredentials ) \ .filter_by( user=user, deleted=False ) \ - .filter( trans.app.model.CloudUserCredentials.table.c.name == credName ) \ + .filter( model.CloudUserCredentials.table.c.name == credName ) \ .first(): error['cred_error'] = "Credentials with that name already exist." elif providerName=='': @@ -648,7 +650,7 @@ credentials.provider = provider # Persist session = trans.sa_session - session.save_or_update( credentials ) + session.add( credentials ) session.flush() # Log and display the management page trans.log_event( "User added new credentials" ) @@ -680,9 +682,9 @@ credentials = get_stored_credentials( trans, id ) if credName=='' or len( credName ) > 255: error['cred_error'] = "Credentials name must be between 1 and 255 characters in length." - elif trans.app.model.CloudUserCredentials \ + elif trans.sa_session.query( model.CloudUserCredentials ) \ .filter_by( user=user ) \ - .filter( and_( trans.app.model.CloudUserCredentials.table.c.id != credentials.id, trans.app.model.CloudUserCredentials.table.c.name==credName ) ) \ + .filter( and_( model.CloudUserCredentials.table.c.id != credentials.id, model.CloudUserCredentials.table.c.name==credName ) ) \ .first(): error['cred_error'] = "Credentials with name '" + credName + "' already exist. Please choose an alternative name." elif accessKey=='' or len( accessKey ) > 255: @@ -702,7 +704,7 @@ credentials.secret_key = secretKey # Persist session = trans.sa_session - session.save_or_update( credentials ) + session.add( credentials ) session.flush() # Log and display the management page trans.set_message( "Credential '%s' edited." % credentials.name ) @@ -745,7 +747,8 @@ if UCI == None: # Delete and save stored.deleted = True - stored.flush() + trans.sa_session.add( stored ) + trans.sa_session.flush() # Display the management page trans.set_message( "Credentials '%s' deleted." % stored.name ) return self.list( trans ) @@ -769,9 +772,9 @@ except ValueError: error['is_secure_error'] = "Field 'is secure' can only take on an integer value '0' or '1'" - if trans.app.model.CloudProvider \ + if trans.sa_session.query( model.CloudProvider ) \ .filter_by (user=user, name=name) \ - .filter( model.CloudProvider.c.deleted != True ) \ + .filter( model.CloudProvider.table.c.deleted != True ) \ .first(): error['name_error'] = "A provider with that name already exist." elif name=='' or len( name ) > 255: @@ -843,7 +846,7 @@ provider.path = path # Persist session = trans.sa_session - session.save_or_update( provider ) + session.add( provider ) session.flush() # Log and display the management page trans.log_event( "User configured new cloud provider: '%s'" % name ) @@ -909,9 +912,9 @@ if name=='' or len( name ) > 255: error['name_error'] = "Cloud provider name must be between 1 and 255 characters in length." - elif trans.app.model.CloudProvider \ + elif trans.sa_session.query( model.CloudProvider ) \ .filter_by( user=user ) \ - .filter( and_( trans.app.model.CloudProvider.table.c.id != provider.id, trans.app.model.CloudProvider.table.c.name == name ) ) \ + .filter( and_( model.CloudProvider.table.c.id != provider.id, model.CloudProvider.table.c.name == name ) ) \ .first(): error['name_error'] = "Cloud provider with name '" + name + "' already exist. Please choose an alternative name." elif not ( is_secure == 0 or is_secure == 1): @@ -985,7 +988,7 @@ provider.path = None # Persist session = trans.sa_session - session.save_or_update( provider ) + session.add( provider ) session.flush() # Log and display the management page trans.log_event( "User edited cloud provider: '%s'" % name ) @@ -1003,14 +1006,15 @@ provider = get_provider_by_id( trans, id ) creds = trans.sa_session.query( model.CloudUserCredentials ) \ .filter_by( user=user, provider_id=provider.id ) \ - .filter( model.CloudUserCredentials.c.deleted != True ) \ + .filter( model.CloudUserCredentials.table.c.deleted != True ) \ .all() if len( creds ) == 0: # Delete and save #sess = trans.sa_session provider.deleted = True - provider.flush() + trans.sa_session.add( provider ) + trans.sa_session.flush() # Display the management page trans.set_message( "Cloud provider '%s' deleted." % provider.name ) return self.list( trans ) @@ -1024,7 +1028,7 @@ @web.json def json_update( self, trans ): user = trans.get_user() - UCIs = trans.sa_session.query( model.UCI ).filter_by( user=user ).filter( model.UCI.c.deleted != True ).all() + UCIs = trans.sa_session.query( model.UCI ).filter_by( user=user ).filter( model.UCI.table.c.deleted != True ).all() insd = {} # instance name-state dict for uci in UCIs: dict = {} @@ -1062,7 +1066,7 @@ def get_provider( trans, name ): user = trans.get_user() - return trans.app.model.CloudProvider \ + return trans.sa_session.query( model.CloudProvider ) \ .filter_by (user=user, name=name) \ .first() @@ -1126,19 +1130,6 @@ # Looks good return live -def get_mi( trans, uci, size='m1.small' ): - """ - Get appropriate machine image (mi) based on instance size. - TODO: Dummy method - need to implement logic - For valid sizes, see http://aws.amazon.com/ec2/instance-types/ - """ - if uci.credentials.provider.type == 'ec2': - return trans.app.model.CloudImage.filter( - trans.app.model.CloudImage.table.c.id==2).first() - else: - return trans.app.model.CloudImage.filter( - trans.app.model.CloudImage.table.c.id==1).first() - def get_stores( trans, uci ): """ Get stores objects that are connected to uci object @@ -1146,7 +1137,7 @@ user = trans.get_user() stores = trans.sa_session.query( model.CloudStore ) \ .filter_by( user=user, uci_id=uci.id ) \ - .filter( model.CloudStore.c.status != store_status.ERROR ) \ + .filter( model.CloudStore.table.c.status != store_status.ERROR ) \ .all() return stores @@ -1173,7 +1164,7 @@ # creds = trans.sa_session.query( model.CloudUserCredentials ) \ # .filter_by( user=user, name=credName ) \ # .first() - #.filter( model.CloudUserCredentials.c.deleted != True ) \ MOVE TO LINE ABOVE ONCE DELETE COLUMS ARE IMPLEMENTED + #.filter( model.CloudUserCredentials.table.c.deleted != True ) \ MOVE TO LINE ABOVE ONCE DELETE COLUMS ARE IMPLEMENTED if creds: a_key = creds.access_key diff -r 7d013eb98022 -r 39502dd3fd23 templates/cloud/configure_cloud.mako --- a/templates/cloud/configure_cloud.mako Thu Nov 12 16:36:07 2009 -0500 +++ b/templates/cloud/configure_cloud.mako Mon Nov 16 20:37:47 2009 -0500 @@ -90,8 +90,9 @@ // Update 'state' and 'time alive' fields $(elem + "-state").text( data[i].state ); - if ( ( prev_old_state.match('newUCI') && new_state=='new' ) || \ + if ( ( prev_old_state.match('newUCI') && new_state=='new' ) || ( prev_old_state.match('newUCI') && new_state=='available' ) || \ ( prev_old_state.match('newUCI') && new_state=='creating' ) || ( prev_old_state.match('new') && new_state=='creating' ) || \ + ( prev_old_state.match('new') && new_state=='available' ) || \ ( prev_old_state.match('deletingUCI') && new_state=='deleted' ) || ( prev_old_state.match('deleting') && new_state=='deleted' ) || \ ( prev_old_state.match('available') && new_state=='error' ) || ( prev_old_state.match('deleting') && new_state=='error' ) ) { // TODO: on state change from available->error and deleting->error page should be refreshed but that causes problems with
participants (1)
-
Greg Von Kuster