[hg] galaxy 3094: Import cloud management module
details: http://www.bx.psu.edu/hg/galaxy/rev/7d013eb98022 changeset: 3094:7d013eb98022 user: Kanwei Li <kanwei@gmail.com> date: Thu Nov 12 16:36:07 2009 -0500 description: Import cloud management module diffstat: eggs.ini | 2 + lib/galaxy/app.py | 4 +- lib/galaxy/cloud/__init__.py | 620 +++++++++++ lib/galaxy/cloud/providers/ec2.py | 940 +++++++++++++++++ lib/galaxy/cloud/providers/eucalyptus.py | 923 ++++++++++++++++ lib/galaxy/config.py | 7 + lib/galaxy/model/__init__.py | 54 + lib/galaxy/model/mapping.py | 147 ++ lib/galaxy/model/migrate/versions/0026_cloud_tables.py | 152 ++ lib/galaxy/web/controllers/cloud.py | 1193 +++++++++++++++++++++ static/images/silk/resultset_previous.png | templates/admin/index.mako | 9 + templates/base_panels.mako | 14 +- templates/cloud/add_credentials.mako | 110 ++ templates/cloud/add_image.mako | 98 + templates/cloud/add_provider.mako | 252 ++++ templates/cloud/configure_cloud.mako | 367 ++++++ templates/cloud/configure_uci.mako | 116 ++ templates/cloud/edit_credentials.mako | 91 + templates/cloud/edit_image.mako | 92 + templates/cloud/edit_provider.mako | 261 ++++ templates/cloud/index.mako | 16 + templates/cloud/list_images.mako | 90 + templates/cloud/view_credentials.mako | 157 ++ templates/cloud/view_instance.mako | 140 ++ templates/cloud/view_provider.mako | 126 ++ templates/cloud/view_snapshots.mako | 90 + templates/cloud/view_usage.mako | 117 ++ templates/root/index.mako | 13 +- universe_wsgi.ini.sample | 5 + 30 files changed, 6197 insertions(+), 9 deletions(-) diffs (truncated from 6417 to 3000 lines): diff -r 0984c3800775 -r 7d013eb98022 eggs.ini --- a/eggs.ini Thu Nov 12 15:25:48 2009 -0500 +++ b/eggs.ini Thu Nov 12 16:36:07 2009 -0500 @@ -52,6 +52,7 @@ wsgiref = 0.1.2 Babel = 0.9.4 wchartype = 0.1 +boto = 1.8d ; extra version information [tags] @@ -102,3 +103,4 @@ wsgiref = http://pypi.python.org/packages/source/w/wsgiref/wsgiref-0.1.2.zip Babel = http://ftp.edgewall.com/pub/babel/Babel-0.9.4.zip wchartype = http://ginstrom.com/code/wchartype-0.1.zip +boto = http://boto.googlecode.com/files/boto-1.8d.tar.gz \ No newline at end of file diff -r 0984c3800775 -r 7d013eb98022 lib/galaxy/app.py --- a/lib/galaxy/app.py Thu Nov 12 15:25:48 2009 -0500 +++ b/lib/galaxy/app.py Thu Nov 12 16:36:07 2009 -0500 @@ -1,6 +1,6 @@ import sys, os, atexit -from galaxy import config, jobs, util, tools, web +from galaxy import config, jobs, util, tools, web, cloud ## from galaxy.tracks import store from galaxy.web import security import galaxy.model @@ -68,6 +68,8 @@ # FIXME: These are exposed directly for backward compatibility self.job_queue = self.job_manager.job_queue self.job_stop_queue = self.job_manager.job_stop_queue + # Start the cloud manager + self.cloud_manager = cloud.CloudManager( self ) # Track Store ## self.track_store = store.TrackStoreManager( self.config.track_store_path ) diff -r 0984c3800775 -r 7d013eb98022 lib/galaxy/cloud/__init__.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/galaxy/cloud/__init__.py Thu Nov 12 16:36:07 2009 -0500 @@ -0,0 +1,620 @@ +import logging, threading, sys, os, time, subprocess, string, tempfile, re, traceback, shutil + +from galaxy import util, model, config +from galaxy.model import mapping +from galaxy.model.orm import lazyload +from galaxy.datatypes.tabular import * +from galaxy.datatypes.interval import * +from galaxy.datatypes import metadata +from galaxy.util.bunch import Bunch +from sqlalchemy import or_ + +import pkg_resources +pkg_resources.require( "PasteDeploy" ) + +from paste.deploy.converters import asbool + +from Queue import Queue, Empty + +log = logging.getLogger( __name__ ) + +uci_states = Bunch( + NEW_UCI = "newUCI", + NEW = "new", + CREATING = "creating", + DELETING_UCI = "deletingUCI", + DELETING = "deleting", + DELETED = "deleted", + SUBMITTED_UCI = "submittedUCI", + SUBMITTED = "submitted", + SHUTTING_DOWN_UCI = "shutting-downUCI", + SHUTTING_DOWN = "shutting-down", + AVAILABLE = "available", + RUNNING = "running", + PENDING = "pending", + ERROR = "error", + SNAPSHOT_UCI = "snapshotUCI", + SNAPSHOT = "snapshot" +) +instance_states = Bunch( + TERMINATED = "terminated", + SUBMITTED = "submitted", + RUNNING = "running", + PENDING = "pending", + SHUTTING_DOWN = "shutting-down", + ERROR = "error" +) + +snapshot_status = Bunch( + SUBMITTED = 'submitted', + PENDING = 'pending', + COMPLETED = 'completed', + DELETE = 'delete', + DELETED= 'deleted', + ERROR = "error" +) + +class CloudManager( object ): + """ + Highest level interface to cloud management. + """ + def __init__( self, app ): + self.app = app + if self.app.config.get_bool( "enable_cloud_execution", True ): + # The dispatcher manager for underlying cloud instances - implements and contacts individual cloud providers + self.provider = CloudProvider( app ) + # Monitor for updating status of cloud instances + self.cloud_monitor = CloudMonitor( self.app, self.provider ) + else: + self.job_queue = self.job_stop_queue = NoopCloudMonitor() + + def shutdown( self ): + self.cloud_monitor.shutdown() + +class Sleeper( object ): + """ + Provides a 'sleep' method that sleeps for a number of seconds *unless* + the notify method is called (from a different thread). + """ + def __init__( self ): + self.condition = threading.Condition() + def sleep( self, seconds ): + self.condition.acquire() + self.condition.wait( seconds ) + self.condition.release() + def wake( self ): + self.condition.acquire() + self.condition.notify() + self.condition.release() + +class CloudMonitor( object ): + """ + Cloud manager, waits for user to instantiate a cloud instance and then invokes a + CloudProvider. + """ + STOP_SIGNAL = object() + def __init__( self, app, provider ): + """Start the cloud manager""" + self.app = app + # Keep track of the pid that started the cloud manager, only it + # has valid threads + self.parent_pid = os.getpid() + + # Contains requests that are waiting (only use from monitor thread) + self.waiting = [] + + # Helper for interruptable sleep + self.sleeper = Sleeper() + self.running = True + self.provider = provider + self.monitor_thread = threading.Thread( target=self.__monitor ) + self.monitor_thread.start() + log.info( "Cloud manager started" ) + + def __monitor( self ): + """ + Daemon that continuously monitors cloud instance requests as well as state + of running instances. + """ + # HACK: Delay until after forking, we need a way to do post fork notification!!! + time.sleep( 10 ) + + cnt = 0 # Run global update only periodically so keep counter variable + while self.running: + try: +# log.debug( "Calling monitor_step" ) + self.__monitor_step() + if cnt%30 == 0: # Run global update every 30 iterations (1 minute) + self.provider.update() + cnt = 0 + except: + log.exception( "Exception in cloud manager monitor_step" ) + # Sleep + cnt += 1 + self.sleeper.sleep( 2 ) + + def __monitor_step( self ): + """ + Called repeatedly by `monitor` to process cloud instance requests. + TODO: Update following description to match the code + Gets any new cloud instance requests from the database, then iterates + over all new and waiting jobs to check the state of the jobs each + depends on. If the job has dependencies that have not finished, it + it goes to the waiting queue. If the job has dependencies with errors, + it is marked as having errors and removed from the queue. Otherwise, + the job is dispatched. + """ + # Get an orm (object relational mapping) session + session = mapping.Session() + new_requests = [] + + for r in session.query( model.UCI ) \ + .filter( or_( model.UCI.c.state==uci_states.NEW_UCI, + model.UCI.c.state==uci_states.SUBMITTED_UCI, + model.UCI.c.state==uci_states.SHUTTING_DOWN_UCI, + model.UCI.c.state==uci_states.DELETING_UCI, + model.UCI.c.state==uci_states.SNAPSHOT_UCI ) ) \ + .all(): + uci_wrapper = UCIwrapper( r ) + new_requests.append( uci_wrapper ) + + for uci_wrapper in new_requests: + session.clear() + self.put( uci_wrapper ) + + # Done with the session + mapping.Session.remove() + + def put( self, uci_wrapper ): + """Add a request to the queue.""" + self.provider.put( uci_wrapper ) + self.sleeper.wake() + + def shutdown( self ): + """Attempts to gracefully shut down the worker thread""" + if self.parent_pid != os.getpid(): + # We're not the real queue, do nothing + return + else: + log.info( "Sending stop signal to worker thread" ) + self.running = False + self.sleeper.wake() + log.info( "cloud manager stopped" ) + self.dispatcher.shutdown() + +class UCIwrapper( object ): + """ + Wraps 'model.UCI' with convenience methods for state management + """ + def __init__( self, uci ): + self.uci_id = uci.id + + # --------- Setter methods ----------------- + + def change_state( self, uci_state=None, instance_id=None, i_state=None ): + """ + Sets state for UCI and/or UCI's instance with instance_id as provided by cloud provider and stored in local + Galaxy database. + Need to provide either: (1) state for the UCI, or (2) instance_id and it's state, or (3) all arguments. + """ +# log.debug( "Changing state - new uci_state: %s, instance_id: %s, i_state: %s" % ( uci_state, instance_id, i_state ) ) + if uci_state is not None: + uci = model.UCI.get( self.uci_id ) + uci.refresh() + uci.state = uci_state + uci.flush() + if ( instance_id is not None ) and ( i_state is not None ): + instance = model.CloudInstance.filter_by( uci_id=self.uci_id, instance_id=instance_id).first() + instance.state = i_state + instance.flush() + + def set_mi( self, i_index, mi_id ): + """ + Sets Machine Image (MI), e.g., 'ami-66fa190f', for UCI's instance with given index as it + is stored in local Galaxy database. + """ + mi = model.CloudImage.filter( model.CloudImage.c.image_id==mi_id ).first() + instance = model.CloudInstance.get( i_index ) + instance.image = mi + instance.flush() + + def set_key_pair( self, key_name, key_material=None ): + """ + Sets key pair value for current UCI. + """ + uci = model.UCI.get( self.uci_id ) + uci.refresh() + uci.key_pair_name = key_name + if key_material is not None: + uci.key_pair_material = key_material + uci.flush() + + def set_launch_time( self, launch_time, i_index=None, i_id=None ): + """ + Stores launch time in local database for instance with specified index - i_index (as it is stored in local + Galaxy database) or with specified instance ID - i_id (as obtained from the cloud provider AND stored + in local Galaxy Database). Either 'i_index' or 'i_id' needs to be provided. + """ + if i_index != None: + instance = model.CloudInstance.get( i_index ) + instance.launch_time = launch_time + instance.flush() + elif i_id != None: + instance = model.CloudInstance.filter_by( uci_id=self.uci_id, instance_id=i_id).first() + instance.launch_time = launch_time + instance.flush() + + def set_uci_launch_time( self, launch_time ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + uci.launch_time = launch_time + uci.flush() + + def set_stop_time( self, stop_time, i_index=None, i_id=None ): + """ + Stores stop time in local database for instance with specified index - i_index (as it is stored in local + Galaxy database) or with specified instance ID - i_id (as obtained from the cloud provider AND stored + in local Galaxy Database). Either 'i_index' or 'i_id' needs to be provided. + """ + if i_index != None: + instance = model.CloudInstance.get( i_index ) + instance.stop_time = stop_time + instance.flush() + elif i_id != None: + instance = model.CloudInstance.filter_by( uci_id=self.uci_id, instance_id=i_id).first() + instance.stop_time = stop_time + instance.flush() + + def reset_uci_launch_time( self ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + uci.launch_time = None + uci.flush() + + def set_security_group_name( self, security_group_name, i_index=None, i_id=None ): + """ + Stores security group name in local database for instance with specified index - i_index (as it is stored in local + Galaxy database) or with specified instance ID - i_id (as obtained from the cloud provider AND stored + in local Galaxy Database). Either 'i_index' or 'i_id' needs to be provided. + """ + if i_index != None: + instance = model.CloudInstance.get( i_index ) + instance.security_group = security_group_name + instance.flush() + elif i_id != None: + instance = model.CloudInstance.filter_by( uci_id=self.uci_id, instance_id=i_id).first() + instance.security_group = security_group_name + instance.flush() + + def set_reservation_id( self, i_index, reservation_id ): + instance = model.CloudInstance.get( i_index ) + instance.reservation_id = reservation_id + instance.flush() + + def set_instance_id( self, i_index, instance_id ): + """ + i_index refers to UCI's instance ID as stored in local database + instance_id refers to real-world, cloud resource ID (e.g., 'i-78hd823a') + """ + instance = model.CloudInstance.get( i_index ) + instance.instance_id = instance_id + instance.flush() + + def set_public_dns( self, instance_id, public_dns ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + uci.instance[instance_id].public_dns = public_dns + uci.instance[instance_id].flush() + + def set_private_dns( self, instance_id, private_dns ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + uci.instance[instance_id].private_dns = private_dns + uci.instance[instance_id].flush() + + def set_store_device( self, store_id, device ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + uci.store[store_id].device = device + uci.store[store_id].flush() + + def set_store_error( self, error, store_index=None, store_id=None ): + if store_index != None: + store = model.CloudStore.get( store_index ) + elif store_id != None: + store = model.CloudStore.filter_by( volume_id = store_id ).first() + else: + return None + + store.error = error + store.flush() + + def set_store_status( self, vol_id, status ): + vol = model.CloudStore.filter( model.CloudStore.c.volume_id == vol_id ).first() + vol.status = status + vol.flush() + + def set_snapshot_id( self, snap_index, id ): + snap = model.CloudSnapshot.get( snap_index ) + snap.snapshot_id = id + snap.flush() + + def set_snapshot_status( self, status, snap_index=None, snap_id=None ): + if snap_index != None: + snap = model.CloudSnapshot.get( snap_index ) + elif snap_id != None: + snap = model.CloudSnapshot.filter_by( snapshot_id = snap_id).first() + else: + return + snap.status = status + snap.flush() + + def set_snapshot_error( self, error, snap_index=None, snap_id=None, set_status=False ): + if snap_index != None: + snap = model.CloudSnapshot.get( snap_index ) + elif snap_id != None: + snap = model.CloudSnapshot.filter_by( snapshot_id = snap_id).first() + else: + return + snap.error = error + + if set_status: + snap.status = snapshot_status.ERROR + + snap.flush() + + def set_store_availability_zone( self, availability_zone, vol_id=None ): + """ + Sets availability zone of storage volumes for either ALL volumes associated with current + UCI or for the volume whose volume ID (e.g., 'vol-39F80512') is provided as argument. + """ + if vol_id is not None: + vol = model.CloudStore.filter( model.CloudStore.c.volume_id == vol_id ).all() + else: + vol = model.CloudStore.filter( model.CloudStore.c.uci_id == self.uci_id ).all() + + for v in vol: + v.availability_zone = availability_zone + v.flush() + + def set_store_volume_id( self, store_id, volume_id ): + """ + Given store ID associated with this UCI, set volume ID as it is registered + on the cloud provider (e.g., vol-39890501) + """ + uci = model.UCI.get( self.uci_id ) + uci.refresh() + uci.store[store_id].volume_id = volume_id + uci.store[store_id].flush() + + def set_store_instance( self, vol_id, instance_id ): + """ + Stores instance ID that given store volume is attached to. Store volume ID should + be given in following format: 'vol-78943248' + """ + vol = model.CloudStore.filter( model.CloudStore.c.volume_id == vol_id ).first() + vol.i_id = instance_id + vol.flush() + + def set_error( self, error, set_state=False ): + """ + Sets error field of given UCI in local Galaxy database as well as any instances associated with + this UCI whose state is 'None' or 'SUBMITTED'. If set_state is set to 'true', + method also sets state of give UCI and corresponding instances to 'error' + """ + uci = model.UCI.get( self.uci_id ) + uci.refresh() + uci.error = error + if set_state: + uci.state = uci_states.ERROR + instances = model.CloudInstance \ + .filter_by( uci=uci ) \ + .filter( or_( model.CloudInstance.c.state==None, model.CloudInstance.c.state==instance_states.SUBMITTED ) ) \ + .all() + for i in instances: + i.error = error + i.state = instance_states.ERROR + i.flush() + uci.flush() + + def set_deleted( self ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + uci.state = uci_states.DELETED # for bookkeeping reasons, mark as deleted but don't actually delete. + uci.deleted = True + uci.flush() + + # --------- Getter methods ----------------- + + def get_provider_type( self ): + """ Returns type of cloud provider associated with given UCI. """ + uci = model.UCI.get( self.uci_id ) + uci.refresh() +# cred_id = uci.credentials_id +# cred = model.CloudUserCredentials.get( cred_id ) + return uci.credentials.provider.type + + def get_type( self, i_index ): + instance = model.CloudInstance.get( i_index ) + return instance.type + + def get_state( self ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.state + + def get_instances_indexes( self, state=None ): + """ + Returns indexes of instances associated with given UCI as they are stored in local Galaxy database and + whose state corresponds to passed argument. Returned values enable indexing instances from local Galaxy database. + """ + uci = model.UCI.get( self.uci_id ) + uci.refresh() + instances = model.CloudInstance.filter_by( uci=uci ).filter( model.CloudInstance.c.state==state ).all() + il = [] + for i in instances: + il.append( i.id ) + + return il + + def get_instance_state( self, instance_id ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.instance[instance_id].state + + def get_instances_ids( self ): + """ + Returns list IDs of all instances' associated with this UCI that are not in 'terminated' state + (e.g., ['i-402906D2', 'i-q0290dsD2'] ). + """ + il = model.CloudInstance.filter_by( uci_id=self.uci_id ).filter( model.CloudInstance.c.state != 'terminated' ).all() + instanceList = [] + for i in il: + instanceList.append( i.instance_id ) + return instanceList + + def get_name( self ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.name + + def get_key_pair_name( self ): + """ + Returns keypair name associated with given UCI. + """ + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.key_pair_name + + def get_key_pair_material( self ): + """ + Returns keypair material (i.e., private key) associated with given UCI. + """ + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.key_pair_material + + def get_security_group_name( self, i_index=None, i_id=None ): + """ + Given EITHER instance index as it is stored in local Galaxy database OR instance ID as it is + obtained from cloud provider and stored in local Galaxy database, return security group name associated + with given instance. + """ + if i_index != None: + instance = model.CloudInstance.get( i_index ) + return instance.security_group + elif i_id != None: + instance = model.CloudInstance.filter_by( uci_id=self.uci_id, instance_id=i_id).first() + return instance.security_group + + def get_access_key( self ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.credentials.access_key + + def get_secret_key( self ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.credentials.secret_key + + def get_mi_id( self, instance_id=0 ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.instance[instance_id].mi_id + + def get_public_dns( self, instance_id=0 ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.instance[instance_id].public_dns + + def get_private_dns( self, instance_id=0 ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.instance[instance_id].private_dns + + def get_uci_availability_zone( self ): + """ + Returns UCI's availability zone. + Because all of storage volumes associated with a given UCI must be in the same + availability zone, availability of a UCI is determined by availability zone of + any one storage volume. + """ + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.store[0].availability_zone + + def get_store_size( self, store_id=0 ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.store[store_id].size + + def get_store_volume_id( self, store_id=0 ): + """ + Given store ID associated with this UCI, get volume ID as it is registered + on the cloud provider (e.g., 'vol-39890501') + """ + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.store[store_id].volume_id + + def get_all_stores( self ): + """ Returns all storage volumes' database objects associated with this UCI. """ + return model.CloudStore.filter( model.CloudStore.c.uci_id == self.uci_id ).all() + + def get_snapshots( self, status=None ): + """ Returns database objects for all snapshots associated with this UCI and in given status.""" + return model.CloudSnapshot.filter_by( uci_id=self.uci_id, status=status ).all() + + def get_uci( self ): + """ Returns database object for given UCI. """ + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci + + def get_provider( self ): + """ Returns database object of cloud provider associated with credentials of given UCI. """ + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.credentials.provider + + def uci_launch_time_set( self ): + uci = model.UCI.get( self.uci_id ) + uci.refresh() + return uci.launch_time + +class CloudProvider( object ): + def __init__( self, app ): + import providers.eucalyptus + import providers.ec2 + + self.app = app + self.cloud_provider = {} + self.cloud_provider["eucalyptus"] = providers.eucalyptus.EucalyptusCloudProvider( app ) + self.cloud_provider["ec2"] = providers.ec2.EC2CloudProvider( app ) + + def put( self, uci_wrapper ): + """ Put given request for UCI manipulation into provider's request queue.""" + self.cloud_provider[uci_wrapper.get_provider_type()].put( uci_wrapper ) + + def update( self ): + """ + Runs a global status update across all providers for all UCIs in state other than 'terminated' and 'available'. + Reason behind this method is to sync state of local DB and real world resources. + """ + for provider in self.cloud_provider.keys(): +# log.debug( "Running global update for provider: '%s'" % provider ) + self.cloud_provider[provider].update() + + def shutdown( self ): + for runner in self.cloud_provider.itervalues(): + runner.shutdown() + +class NoopCloudMonitor( object ): + """ + Implements the CloudMonitor interface but does nothing + """ + def put( self, *args ): + return + def shutdown( self ): + return + diff -r 0984c3800775 -r 7d013eb98022 lib/galaxy/cloud/providers/ec2.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/galaxy/cloud/providers/ec2.py Thu Nov 12 16:36:07 2009 -0500 @@ -0,0 +1,940 @@ +import subprocess, threading, os, errno, time, datetime +from Queue import Queue, Empty +from datetime import datetime + +from galaxy import model # Database interaction class +from galaxy.model import mapping +from galaxy.datatypes.data import nice_size +from galaxy.util.bunch import Bunch +from galaxy.cloud import UCIwrapper +from Queue import Queue +from sqlalchemy import or_, and_ + +import galaxy.eggs +galaxy.eggs.require("boto") +from boto.ec2.connection import EC2Connection +from boto.ec2.regioninfo import RegionInfo +import boto.exception +import boto + +import logging +log = logging.getLogger( __name__ ) + +uci_states = Bunch( + NEW_UCI = "newUCI", + NEW = "new", + CREATING = "creating", + DELETING_UCI = "deletingUCI", + DELETING = "deleting", + SUBMITTED_UCI = "submittedUCI", + SUBMITTED = "submitted", + SHUTTING_DOWN_UCI = "shutting-downUCI", + SHUTTING_DOWN = "shutting-down", + AVAILABLE = "available", + RUNNING = "running", + PENDING = "pending", + ERROR = "error", + DELETED = "deleted", + SNAPSHOT_UCI = "snapshotUCI", + SNAPSHOT = "snapshot" +) + +instance_states = Bunch( + TERMINATED = "terminated", + SUBMITTED = "submitted", + RUNNING = "running", + PENDING = "pending", + SHUTTING_DOWN = "shutting-down", + ERROR = "error" +) + +store_status = Bunch( + IN_USE = "in-use", + CREATING = "creating", + DELETED = 'deleted', + ERROR = "error" +) + +snapshot_status = Bunch( + SUBMITTED = 'submitted', + PENDING = 'pending', + COMPLETED = 'completed', + DELETE = 'delete', + DELETED= 'deleted', + ERROR = "error" +) + +class EC2CloudProvider( object ): + """ + Amazon EC2-based cloud provider implementation for managing instances. + """ + STOP_SIGNAL = object() + def __init__( self, app ): + self.type = "ec2" # cloud provider type (e.g., ec2, eucalyptus, opennebula) + self.zone = "us-east-1a" + self.security_group = "galaxyWeb" + self.queue = Queue() + + self.threads = [] + nworkers = 5 + log.info( "Starting EC2 cloud controller workers..." ) + for i in range( nworkers ): + worker = threading.Thread( target=self.run_next ) + worker.start() + self.threads.append( worker ) + log.debug( "%d EC2 cloud workers ready", nworkers ) + + def run_next( self ): + """Run the next job, waiting until one is available if necessary""" + cnt = 0 + while 1: + + uci_wrapper = self.queue.get() + uci_state = uci_wrapper.get_state() + if uci_state is self.STOP_SIGNAL: + return + try: + if uci_state==uci_states.NEW: + self.createUCI( uci_wrapper ) + elif uci_state==uci_states.DELETING: + self.deleteUCI( uci_wrapper ) + elif uci_state==uci_states.SUBMITTED: + self.startUCI( uci_wrapper ) + elif uci_state==uci_states.SHUTTING_DOWN: + self.stopUCI( uci_wrapper ) + elif uci_state==uci_states.SNAPSHOT: + self.snapshotUCI( uci_wrapper ) + except: + log.exception( "Uncaught exception executing cloud request." ) + cnt += 1 + + def get_connection( self, uci_wrapper ): + """ + Establishes EC2 cloud connection using user's credentials associated with given UCI + """ + log.debug( 'Establishing %s cloud connection.' % self.type ) + provider = uci_wrapper.get_provider() + try: + region = RegionInfo( None, provider.region_name, provider.region_endpoint ) + except Exception, ex: + err = "Selecting region with cloud provider failed: " + str( ex ) + log.error( err ) + uci_wrapper.set_error( err, True ) + return None + try: + conn = EC2Connection( aws_access_key_id=uci_wrapper.get_access_key(), + aws_secret_access_key=uci_wrapper.get_secret_key(), + is_secure=provider.is_secure, + region=region, + path=provider.path ) + except boto.exception.EC2ResponseError, e: + err = "Establishing connection with cloud failed: " + str( e ) + log.error( err ) + uci_wrapper.set_error( err, True ) + return None + + return conn + + def check_key_pair( self, uci_wrapper, conn ): + """ + Generate key pair using user's credentials + """ + kp = None + kp_name = uci_wrapper.get_name().replace(' ','_') + "_kp" + log.debug( "Checking user's key pair: '%s'" % kp_name ) + try: + kp = conn.get_key_pair( kp_name ) + uci_kp_name = uci_wrapper.get_key_pair_name() + uci_material = uci_wrapper.get_key_pair_material() + if kp != None: + if kp.name != uci_kp_name or uci_material == None: + # key pair exists on the cloud but not in local database, so re-generate it (i.e., delete and then create) + try: + conn.delete_key_pair( kp_name ) + kp = self.create_key_pair( conn, kp_name ) + uci_wrapper.set_key_pair( kp.name, kp.material ) + except boto.exception.EC2ResponseError, e: + err = "EC2 response error while deleting key pair: " + str( e ) + log.error( err ) + uci_wrapper.set_error( err, True ) + else: + try: + kp = self.create_key_pair( conn, kp_name ) + uci_wrapper.set_key_pair( kp.name, kp.material ) + except boto.exception.EC2ResponseError, e: + err = "EC2 response error while creating key pair: " + str( e ) + log.error( err ) + uci_wrapper.set_error( err, True ) + except Exception, ex: + err = "Exception while creating key pair: " + str( ex ) + log.error( err ) + uci_wrapper.set_error( err, True ) + except boto.exception.EC2ResponseError, e: # No keypair under this name exists so create it + if e.code == 'InvalidKeyPair.NotFound': + log.info( "No keypair found, creating keypair '%s'" % kp_name ) + kp = self.create_key_pair( conn, kp_name ) + uci_wrapper.set_key_pair( kp.name, kp.material ) + else: + err = "EC2 response error while retrieving key pair: " + str( e ) + log.error( err ) + uci_wrapper.set_error( err, True ) + + if kp != None: + return kp.name + else: + return None + + def create_key_pair( self, conn, kp_name ): + try: + return conn.create_key_pair( kp_name ) + except boto.exception.EC2ResponseError, e: + return None + + def get_mi_id( self, uci_wrapper, i_index ): + """ + Get appropriate machine image (mi) based on instance size. + """ + i_type = uci_wrapper.get_type( i_index ) + if i_type=='m1.small' or i_type=='c1.medium': + arch = 'i386' + else: + arch = 'x86_64' + + mi = model.CloudImage.filter_by( deleted=False, provider_type=self.type, architecture=arch ).first() + if mi: + return mi.image_id + else: + err = "Machine image could not be retrieved" + log.error( "%s for UCI '%s'." % (err, uci_wrapper.get_name() ) ) + uci_wrapper.set_error( err+". Contact site administrator to ensure needed machine image is registered.", True ) + return None + + def shutdown( self ): + """Attempts to gracefully shut down the monitor thread""" + log.info( "sending stop signal to worker threads in EC2 cloud manager" ) + for i in range( len( self.threads ) ): + self.queue.put( self.STOP_SIGNAL ) + log.info( "EC2 cloud manager stopped" ) + + def put( self, uci_wrapper ): + # Get rid of UCI from state description + state = uci_wrapper.get_state() + uci_wrapper.change_state( state.split('U')[0] ) # remove 'UCI' from end of state description (i.e., mark as accepted and ready for processing) + self.queue.put( uci_wrapper ) + + def createUCI( self, uci_wrapper ): + """ + Creates User Configured Instance (UCI). Essentially, creates storage volume on cloud provider + and registers relevant information in Galaxy database. + """ + conn = self.get_connection( uci_wrapper ) + if uci_wrapper.get_uci_availability_zone()=='': + log.info( "Availability zone for UCI (i.e., storage volume) was not selected, using default zone: %s" % self.zone ) + uci_wrapper.set_store_availability_zone( self.zone ) + + log.info( "Creating volume in zone '%s'..." % uci_wrapper.get_uci_availability_zone() ) + # Because only 1 storage volume may be created at UCI config time, index of this storage volume in local Galaxy DB w.r.t + # current UCI is 0, so reference it in following methods + vol = conn.create_volume( uci_wrapper.get_store_size( 0 ), uci_wrapper.get_uci_availability_zone(), snapshot=None ) + uci_wrapper.set_store_volume_id( 0, vol.id ) + + # Wait for a while to ensure volume was created +# vol_status = vol.status +# for i in range( 30 ): +# if vol_status is not "available": +# log.debug( 'Updating volume status; current status: %s' % vol_status ) +# vol_status = vol.status +# time.sleep(3) +# if i is 29: +# log.debug( "Error while creating volume '%s'; stuck in state '%s'; deleting volume." % ( vol.id, vol_status ) ) +# conn.delete_volume( vol.id ) +# uci_wrapper.change_state( uci_state='error' ) +# return + + # Retrieve created volume again to get updated status + try: + vl = conn.get_all_volumes( [vol.id] ) + except boto.exception.EC2ResponseError, e: + err = "EC2 response error while retrieving (i.e., updating status) of just created storage volume '" + vol.id + "': " + str( e ) + log.error( err ) + uci_wrapper.set_store_status( vol.id, uci_states.ERROR ) + uci_wrapper.set_error( err, True ) + return + except Exception, ex: + err = "Error while retrieving (i.e., updating status) of just created storage volume '" + vol.id + "': " + str( ex ) + log.error( err ) + uci_wrapper.set_error( err, True ) + return + + if len( vl ) > 0: + uci_wrapper.change_state( uci_state=vl[0].status ) + uci_wrapper.set_store_status( vol.id, vl[0].status ) + else: + err = "Volume '" + vol.id +"' not found by EC2 after being created." + log.error( err ) + uci_wrapper.set_store_status( vol.id, uci_states.ERROR ) + uci_wrapper.set_error( err, True ) + + def deleteUCI( self, uci_wrapper ): + """ + Deletes UCI. NOTE that this implies deletion of any and all data associated + with this UCI from the cloud. All data will be deleted. + """ + conn = self.get_connection( uci_wrapper ) + vl = [] # volume list + count = 0 # counter for checking if all volumes assoc. w/ UCI were deleted + + # Get all volumes assoc. w/ UCI, delete them from cloud as well as in local DB + vl = uci_wrapper.get_all_stores() + deletedList = [] + failedList = [] + for v in vl: + log.debug( "Deleting volume with id='%s'" % v.volume_id ) + try: + if conn.delete_volume( v.volume_id ): + deletedList.append( v.volume_id ) + v.deleted = True + v.flush() + count += 1 + else: + failedList.append( v.volume_id ) + except boto.exception.EC2ResponseError, e: + err = "EC2 response error while deleting storage volume '" + v.volume_id + "': " + str( e ) + log.error( err ) + uci_wrapper.set_store_error( err, store_id = v.volume_id ) + uci_wrapper.set_error( err, True ) + + # Delete UCI if all of associated + if count == len( vl ): + uci_wrapper.set_deleted() + else: + err = "Deleting following volume(s) failed: "+failedList+". However, these volumes were successfully deleted: "+deletedList+". \ + MANUAL intervention and processing needed." + log.error( err ) + uci_wrapper.set_error( err, True ) + + def snapshotUCI( self, uci_wrapper ): + """ + Creates snapshot of all storage volumes associated with this UCI. + """ + if uci_wrapper.get_state() != uci_states.ERROR: + conn = self.get_connection( uci_wrapper ) + + snapshots = uci_wrapper.get_snapshots( status = snapshot_status.SUBMITTED ) + for snapshot in snapshots: + log.debug( "Snapshot DB id: '%s', volume id: '%s'" % ( snapshot.id, snapshot.store.volume_id ) ) + try: + snap = conn.create_snapshot( volume_id=snapshot.store.volume_id ) + snap_id = str( snap ).split(':')[1] + uci_wrapper.set_snapshot_id( snapshot.id, snap_id ) + sh = conn.get_all_snapshots( snap_id ) # get updated status + uci_wrapper.set_snapshot_status( status=sh[0].status, snap_id=snap_id ) + except boto.exception.EC2ResponseError, e: + err = "EC2 response error while creating snapshot: " + str( e ) + log.error( err ) + uci_wrapper.set_snapshot_error( error=err, snap_index=snapshot.id, set_status=True ) + uci_wrapper.set_error( err, True ) + return + except Exception, ex: + err = "Error while creating snapshot: " + str( ex ) + log.error( err ) + uci_wrapper.set_snapshot_error( error=err, snap_index=snapshot.id, set_status=True ) + uci_wrapper.set_error( err, True ) + return + + uci_wrapper.change_state( uci_state=uci_states.AVAILABLE ) + + def addStorageToUCI( self, name ): + """ Adds more storage to specified UCI + TODO""" + + def dummyStartUCI( self, uci_wrapper ): + + uci = uci_wrapper.get_uci() + log.debug( "Would be starting instance '%s'" % uci.name ) + uci_wrapper.change_state( uci_state.PENDING ) +# log.debug( "Sleeping a bit... (%s)" % uci.name ) +# time.sleep(20) +# log.debug( "Woke up! (%s)" % uci.name ) + + def startUCI( self, uci_wrapper ): + """ + Starts instance(s) of given UCI on the cloud. + """ + if uci_wrapper.get_state() != uci_states.ERROR: + conn = self.get_connection( uci_wrapper ) + self.check_key_pair( uci_wrapper, conn ) + if uci_wrapper.get_key_pair_name() == None: + err = "Key pair not found" + log.error( "%s for UCI '%s'." % ( err, uci_wrapper.get_name() ) ) + uci_wrapper.set_error( err + ". Try resetting the state and starting the instance again.", True ) + return + + i_indexes = uci_wrapper.get_instances_indexes( state=instance_states.SUBMITTED ) # Get indexes of i_indexes associated with this UCI that are in 'submitted' state + log.debug( "Starting instances with IDs: '%s' associated with UCI '%s' " % ( i_indexes, uci_wrapper.get_name(), ) ) + if len( i_indexes ) > 0: + for i_index in i_indexes: + # Get machine image for current instance + mi_id = self.get_mi_id( uci_wrapper, i_index ) + log.debug( "mi_id: %s, uci_wrapper.get_key_pair_name(): %s" % ( mi_id, uci_wrapper.get_key_pair_name() ) ) + uci_wrapper.set_mi( i_index, mi_id ) + + # Check if galaxy security group exists (and create it if it does not) + log.debug( "Setting up '%s' security group." % self.security_group ) + try: + conn.get_all_security_groups( [self.security_group] ) # security groups + except boto.exception.EC2ResponseError, e: + if e.code == 'InvalidGroup.NotFound': + log.info( "No security group found, creating security group '%s'" % self.security_group ) + try: + gSecurityGroup = conn.create_security_group(self.security_group, 'Security group for Galaxy.') + gSecurityGroup.authorize( 'tcp', 80, 80, '0.0.0.0/0' ) # Open HTTP port + gSecurityGroup.authorize( 'tcp', 22, 22, '0.0.0.0/0' ) # Open SSH port + except boto.exception.EC2ResponseError, ee: + err = "EC2 response error while creating security group: " + str( ee ) + log.error( err ) + uci_wrapper.set_error( err, True ) + else: + err = "EC2 response error while retrieving security group: " + str( e ) + log.error( err ) + uci_wrapper.set_error( err, True ) + + + if uci_wrapper.get_state() != uci_states.ERROR: + # Start an instance + log.debug( "Starting instance for UCI '%s'" % uci_wrapper.get_name() ) + #TODO: Once multiple volumes can be attached to a single instance, update 'userdata' composition + userdata = uci_wrapper.get_store_volume_id()+"|"+uci_wrapper.get_access_key()+"|"+uci_wrapper.get_secret_key() + log.debug( "Using following command: conn.run_instances( image_id='%s', key_name='%s', security_groups=['%s'], user_data=[OMITTED], instance_type='%s', placement='%s' )" + % ( mi_id, uci_wrapper.get_key_pair_name(), self.security_group, uci_wrapper.get_type( i_index ), uci_wrapper.get_uci_availability_zone() ) ) + reservation = None + try: + reservation = conn.run_instances( image_id=mi_id, + key_name=uci_wrapper.get_key_pair_name(), + security_groups=[self.security_group], + user_data=userdata, + instance_type=uci_wrapper.get_type( i_index ), + placement=uci_wrapper.get_uci_availability_zone() ) + except boto.exception.EC2ResponseError, e: + err = "EC2 response error when starting UCI '"+ uci_wrapper.get_name() +"': " + str( e ) + log.error( err ) + uci_wrapper.set_error( err, True ) + except Exception, ex: + err = "Error when starting UCI '" + uci_wrapper.get_name() + "': " + str( ex ) + log.error( err ) + uci_wrapper.set_error( err, True ) + # Record newly available instance data into local Galaxy database + if reservation: + uci_wrapper.set_launch_time( self.format_time( reservation.instances[0].launch_time ), i_index=i_index ) + if not uci_wrapper.uci_launch_time_set(): + uci_wrapper.set_uci_launch_time( self.format_time( reservation.instances[0].launch_time ) ) + try: + uci_wrapper.set_reservation_id( i_index, str( reservation ).split(":")[1] ) + # TODO: if more than a single instance will be started through single reservation, change this reference to element [0] + i_id = str( reservation.instances[0]).split(":")[1] + uci_wrapper.set_instance_id( i_index, i_id ) + s = reservation.instances[0].state + uci_wrapper.change_state( s, i_id, s ) + uci_wrapper.set_security_group_name( self.security_group, i_id=i_id ) + log.debug( "Instance of UCI '%s' started, current state: '%s'" % ( uci_wrapper.get_name(), uci_wrapper.get_state() ) ) + except boto.exception.EC2ResponseError, e: + err = "EC2 response error when retrieving instance information for UCI '" + uci_wrapper.get_name() + "': " + str( e ) + log.error( err ) + uci_wrapper.set_error( err, True ) + else: + log.error( "UCI '%s' is in 'error' state, starting instance was aborted." % uci_wrapper.get_name() ) + else: + err = "No instances in state '"+ instance_states.SUBMITTED +"' found for UCI '" + uci_wrapper.get_name() + \ + "'. Nothing to start." + log.error( err ) + uci_wrapper.set_error( err, True ) + else: + log.error( "UCI '%s' is in 'error' state, starting instance was aborted." % uci_wrapper.get_name() ) + + def stopUCI( self, uci_wrapper): + """ + Stops all of cloud instances associated with given UCI. + """ + conn = self.get_connection( uci_wrapper ) + + # Get all instances associated with given UCI + il = uci_wrapper.get_instances_ids() # instance list + rl = conn.get_all_instances( il ) # Reservation list associated with given instances + + # Initiate shutdown of all instances under given UCI + cnt = 0 + stopped = [] + not_stopped = [] + for r in rl: + for inst in r.instances: + log.debug( "Sending stop signal to instance '%s' associated with reservation '%s'." % ( inst, r ) ) + try: + inst.stop() + uci_wrapper.set_stop_time( datetime.utcnow(), i_id=inst.id ) + uci_wrapper.change_state( instance_id=inst.id, i_state=inst.update() ) + stopped.append( inst ) + except boto.exception.EC2ResponseError, e: + not_stopped.append( inst ) + err = "EC2 response error when stopping instance '" + inst.instance_id + "': " + str(e) + log.error( err ) + uci_wrapper.set_error( err, True ) + + uci_wrapper.reset_uci_launch_time() + log.debug( "Termination was initiated for all instances of UCI '%s'." % uci_wrapper.get_name() ) + + +# dbInstances = get_instances( trans, uci ) #TODO: handle list! +# +# # Get actual cloud instance object +# cloudInstance = get_cloud_instance( conn, dbInstances.instance_id ) +# +# # TODO: Detach persistent storage volume(s) from instance and update volume data in local database +# stores = get_stores( trans, uci ) +# for i, store in enumerate( stores ): +# log.debug( "Detaching volume '%s' to instance '%s'." % ( store.volume_id, dbInstances.instance_id ) ) +# mntDevice = store.device +# volStat = None +## Detaching volume does not work with Eucalyptus Public Cloud, so comment it out +## try: +## volStat = conn.detach_volume( store.volume_id, dbInstances.instance_id, mntDevice ) +## except: +## log.debug ( 'Error detaching volume; still going to try and stop instance %s.' % dbInstances.instance_id ) +# store.attach_time = None +# store.device = None +# store.i_id = None +# store.status = volStat +# log.debug ( '***** volume status: %s' % volStat ) +# +# +# # Stop the instance and update status in local database +# cloudInstance.stop() +# dbInstances.stop_time = datetime.utcnow() +# while cloudInstance.state != 'terminated': +# log.debug( "Stopping instance %s state; current state: %s" % ( str( cloudInstance ).split(":")[1], cloudInstance.state ) ) +# time.sleep(3) +# cloudInstance.update() +# dbInstances.state = cloudInstance.state +# +# # Reset relevant UCI fields +# uci.state = 'available' +# uci.launch_time = None +# +# # Persist +# session = trans.sa_session +## session.save_or_update( stores ) +# session.save_or_update( dbInstances ) # TODO: Is this going to work w/ multiple instances stored in dbInstances variable? +# session.save_or_update( uci ) +# session.flush() +# trans.log_event( "User stopped cloud instance '%s'" % uci.name ) +# trans.set_message( "Galaxy instance '%s' stopped." % uci.name ) + + def update( self ): + """ + Runs a global status update on all instances that are in 'running', 'pending', or 'shutting-down' state. + Also, runs update on all storage volumes that are in 'in-use', 'creating', or 'None' state. + Reason behind this method is to sync state of local DB and real-world resources + """ + log.debug( "Running general status update for EC2 UCIs..." ) + # Update instances + instances = model.CloudInstance.filter( or_( model.CloudInstance.c.state==instance_states.RUNNING, + model.CloudInstance.c.state==instance_states.PENDING, + model.CloudInstance.c.state==instance_states.SHUTTING_DOWN ) ).all() + for inst in instances: + if self.type == inst.uci.credentials.provider.type: + log.debug( "[%s] Running general status update on instance '%s'" % ( inst.uci.credentials.provider.type, inst.instance_id ) ) + self.updateInstance( inst ) + + # Update storage volume(s) + stores = model.CloudStore.filter( or_( model.CloudStore.c.status==store_status.IN_USE, + model.CloudStore.c.status==store_status.CREATING, + model.CloudStore.c.status==None ) ).all() + for store in stores: + if self.type == store.uci.credentials.provider.type: # and store.volume_id != None: + log.debug( "[%s] Running general status update on store with local database ID: '%s'" % ( store.uci.credentials.provider.type, store.id ) ) + self.updateStore( store ) +# else: +# log.error( "[%s] There exists an entry for UCI (%s) storage volume without an ID. Storage volume might have been created with " +# "cloud provider though. Manual check is recommended." % ( store.uci.credentials.provider.type, store.uci.name ) ) +# store.uci.error = "There exists an entry in local database for a storage volume without an ID. Storage volume might have been created " \ +# "with cloud provider though. Manual check is recommended. After understanding what happened, local database entry for given " \ +# "storage volume should be updated." +# store.status = store_status.ERROR +# store.uci.state = uci_states.ERROR +# store.uci.flush() +# store.flush() + + # Update pending snapshots or delete ones marked for deletion + snapshots = model.CloudSnapshot.filter_by( status=snapshot_status.PENDING, status=snapshot_status.DELETE ).all() + for snapshot in snapshots: + if self.type == snapshot.uci.credentials.provider.type and snapshot.status == snapshot_status.PENDING: + log.debug( "[%s] Running general status update on snapshot '%s'" % ( snapshot.uci.credentials.provider.type, snapshot.snapshot_id ) ) + self.update_snapshot( snapshot ) + elif self.type == snapshot.uci.credentials.provider.type and snapshot.status == snapshot_status.DELETE: + log.debug( "[%s] Initiating deletion of snapshot '%s'" % ( snapshot.uci.credentials.provider.type, snapshot.snapshot_id ) ) + self.delete_snapshot( snapshot ) + + # Attempt at updating any zombie UCIs (i.e., instances that have been in SUBMITTED state for longer than expected - see below for exact time) + zombies = model.UCI.filter_by( state=uci_states.SUBMITTED ).all() + for zombie in zombies: + z_instances = model.CloudInstance.filter_by( uci_id=zombie.id) \ + .filter( or_( model.CloudInstance.c.state != instance_states.TERMINATED, + model.CloudInstance.c.state == None ) ) \ + .all() + for z_inst in z_instances: + if self.type == z_inst.uci.credentials.provider.type: +# log.debug( "z_inst.id: '%s', state: '%s'" % ( z_inst.id, z_inst.state ) ) + td = datetime.utcnow() - z_inst.update_time + if td.seconds > 180: # if instance has been in SUBMITTED state for more than 3 minutes + log.debug( "[%s] Running zombie repair update on instance with DB id '%s'" % ( z_inst.uci.credentials.provider.type, z_inst.id ) ) + self.processZombie( z_inst ) + + def updateInstance( self, inst ): + + # Get credentials associated wit this instance + uci_id = inst.uci_id + uci = model.UCI.get( uci_id ) + uci.refresh() + conn = self.get_connection_from_uci( uci ) + + # Get reservations handle for given instance + try: + rl= conn.get_all_instances( [inst.instance_id] ) + except boto.exception.EC2ResponseError, e: + err = "Retrieving instance(s) from cloud failed for UCI '"+ uci.name +"' during general status update: " + str( e ) + log.error( err ) + uci.error = err + uci.state = uci_states.ERROR + return None + + # Because references to reservations are deleted shortly after instances have been terminated, getting an empty list as a response to a query + # typically means the instance has successfully shut down but the check was not performed in short enough amount of time. Until an alternative solution + # is found, below code sets state of given UCI to 'error' to indicate to the user something out of ordinary happened. + if len( rl ) == 0: + err = "Instance ID '"+inst.instance_id+"' was not found by the cloud provider. Instance might have crashed or otherwise been terminated."+ \ + "Manual check is recommended." + log.error( err ) + inst.error = err + uci.error = err + inst.state = instance_states.TERMINATED + uci.state = uci_states.ERROR + uci.launch_time = None + inst.flush() + uci.flush() + # Update instance status in local DB with info from cloud provider + for r in rl: + for i, cInst in enumerate( r.instances ): + try: + s = cInst.update() + log.debug( "Checking state of cloud instance '%s' associated with UCI '%s' and reservation '%s'. State='%s'" % ( cInst, uci.name, r, s ) ) + if s != inst.state: + inst.state = s + inst.flush() + # After instance has shut down, ensure UCI is marked as 'available' + if s == instance_states.TERMINATED and uci.state != uci_states.ERROR: + uci.state = uci_states.AVAILABLE + uci.launch_time = None + uci.flush() + # Making sure state of UCI is updated. Once multiple instances become associated with single UCI, this will need to be changed. + if s != uci.state and s != instance_states.TERMINATED: + uci.state = s + uci.flush() + if cInst.public_dns_name != inst.public_dns: + inst.public_dns = cInst.public_dns_name + inst.flush() + if cInst.private_dns_name != inst.private_dns: + inst.private_dns = cInst.private_dns_name + inst.flush() + except boto.exception.EC2ResponseError, e: + err = "Updating instance status from cloud failed for UCI '"+ uci.name + "' during general status update: " + str( e ) + log.error( err ) + uci.error = err + uci.state = uci_states.ERROR + return None + + def updateStore( self, store ): + # Get credentials associated wit this store + uci_id = store.uci_id + uci = model.UCI.get( uci_id ) + uci.refresh() + conn = self.get_connection_from_uci( uci ) + + # Get reservations handle for given store + try: + vl = conn.get_all_volumes( [store.volume_id] ) + except boto.exception.EC2ResponseError, e: + err = "Retrieving volume(s) from cloud failed for UCI '"+ uci.name + "' during general status update: " + str( e ) + log.error( err ) + uci.error = err + uci.state = uci_states.ERROR + uci.flush() + return None + + # Update store status in local DB with info from cloud provider + if len(vl) > 0: + try: + if store.status != vl[0].status: + # In case something failed during creation of UCI but actual storage volume was created and yet + # UCI state remained as 'new', try to remedy this by updating UCI state here + if ( store.status == None ) and ( store.volume_id != None ): + uci.state = vl[0].status + uci.flush() + # If UCI was marked in state 'CREATING', update its status to reflect new status + elif ( uci.state == uci_states.CREATING ): + uci.state = vl[0].status + uci.flush() + + store.status = vl[0].status + store.flush() + if store.i_id != vl[0].instance_id: + store.i_id = vl[0].instance_id + store.flush() + if store.attach_time != vl[0].attach_time: + store.attach_time = vl[0].attach_time + store.flush() + if store.device != vl[0].device: + store.device = vl[0].device + store.flush() + except boto.exception.EC2ResponseError, e: + err = "Updating status of volume(s) from cloud failed for UCI '"+ uci.name + "' during general status update: " + str( e ) + log.error( err ) + uci.error = err + uci.state = uci_states.ERROR + uci.flush() + return None + else: + err = "No storage volumes returned by cloud provider on general update" + log.error( "%s for UCI '%s'" % ( err, uci.name ) ) + store.status = store_status.ERROR + store.error = err + uci.error = err + uci.state = uci_states.ERROR + uci.flush() + store.flush() + + def updateSnapshot( self, snapshot ): + # Get credentials associated wit this store + uci_id = snapshot.uci_id + uci = model.UCI.get( uci_id ) + uci.refresh() + conn = self.get_connection_from_uci( uci ) + + try: + log.debug( "Updating status of snapshot '%s'" % snapshot.snapshot_id ) + snap = conn.get_all_snapshots( [snapshot.snapshot_id] ) + if len( snap ) > 0: + log.debug( "Snapshot '%s' status: %s" % ( snapshot.snapshot_id, snap[0].status ) ) + snapshot.status = snap[0].status + snapshot.flush() + else: + err = "No snapshots returned by EC2 on general update" + log.error( "%s for UCI '%s'" % ( err, uci.name ) ) + snapshot.status = snapshot_status.ERROR + snapshot.error = err + uci.error = err + uci.state = uci_states.ERROR + uci.flush() + snapshot.flush() + except boto.exception.EC2ResponseError, e: + err = "EC2 response error while updating snapshot status: " + str( e ) + log.error( err ) + snapshot.status = snapshot_status.ERROR + snapshot.error = err + uci.error = err + uci.state = uci_states.ERROR + uci.flush() + snapshot.flush() + except Exception, ex: + err = "Error while updating snapshot status: " + str( ex ) + log.error( err ) + snapshot.status = snapshot_status.ERROR + snapshot.error = err + uci.error = err + uci.state = uci_states.ERROR + uci.flush() + snapshot.flush() + + def delete_snapshot( self, snapshot ): + if snapshot.status == snapshot_status.DELETE: + # Get credentials associated wit this store + uci_id = snapshot.uci_id + uci = model.UCI.get( uci_id ) + uci.refresh() + conn = self.get_connection_from_uci( uci ) + + try: + log.debug( "Deleting snapshot '%s'" % snapshot.snapshot_id ) + snap = conn.delete_snapshot( snapshot.snapshot_id ) + if snap == True: + snapshot.deleted = True + snapshot.status = snapshot_status.DELETED + snapshot.flush() + return snap + except boto.exception.EC2ResponseError, e: + err = "EC2 response error while deleting snapshot: " + str( e ) + log.error( err ) + snapshot.status = snapshot_status.ERROR + snapshot.error = err + uci.error = err + uci.state = uci_states.ERROR + uci.flush() + snapshot.flush() + except Exception, ex: + err = "Error while deleting snapshot: " + str( ex ) + log.error( err ) + snapshot.status = snapshot_status.ERROR + snapshot.error = err + uci.error = err + uci.state = uci_states.ERROR + uci.flush() + snapshot.flush() + else: + err = "Cannot delete snapshot '"+snapshot.snapshot_id+"' because its status is '"+snapshot.status+"'. Only snapshots with '" + \ + snapshot_status.COMPLETED+"' status can be deleted." + log.error( err ) + snapshot.error = err + snapshot.flush() + + def processZombie( self, inst ): + """ + Attempt at discovering if starting an instance was successful but local database was not updated + accordingly or if something else failed and instance was never started. Currently, no automatic + repairs are being attempted; instead, appropriate error messages are set. + """ + # Check if any instance-specific information was written to local DB; if 'yes', set instance and UCI's error message + # suggesting manual check. + if inst.launch_time != None or inst.reservation_id != None or inst.instance_id != None: + # Try to recover state - this is best-case effort, so if something does not work immediately, not + # recovery steps are attempted. Recovery is based on hope that instance_id is available in local DB; if not, + # report as error. + # Fields attempting to be recovered are: reservation_id, instance status, and launch_time + if inst.instance_id != None: + conn = self.get_connection_from_uci( uci ) + rl = conn.get_all_instances( [inst.instance_id] ) # reservation list + # Update local DB with relevant data from instance + if inst.reservation_id == None: + try: + inst.reservation_id = str(rl[0]).split(":")[1] + except: # something failed, so skip + pass + + try: + state = rl[0].instances[0].update() + inst.state = state + inst.uci.state = state + inst.flush() + inst.uci.flush() + except: # something failed, so skip + pass + + if inst.launch_time == None: + try: + launch_time = self.format_time( rl[0].instances[0].launch_time ) + inst.launch_time = launch_time + inst.flush() + if inst.uci.launch_time == None: + inst.uci.launch_time = launch_time + inst.uci.flush() + except: # something failed, so skip + pass + else: + err = "Starting a machine instance (DB id: '"+str(inst.id)+"') associated with this UCI '" + str(inst.uci.name) + \ + "' seems to have failed. Because it appears that cloud instance might have gotten started, manual check is recommended." + inst.error = err + inst.state = instance_states.ERROR + inst.uci.error = err + inst.uci.state = uci_states.ERROR + log.error( err ) + inst.flush() + inst.uci.flush() + + else: #Instance most likely never got processed, so set error message suggesting user to try starting instance again. + err = "Starting a machine instance (DB id: '"+str(inst.id)+"') associated with this UCI '" + str(inst.uci.name) + \ + "' seems to have failed. Because it appears that cloud instance never got started, it should be safe to reset state and try " \ + "starting the instance again." + inst.error = err + inst.state = instance_states.ERROR + inst.uci.error = err + inst.uci.state = uci_states.ERROR + log.error( err ) + inst.flush() + inst.uci.flush() +# uw = UCIwrapper( inst.uci ) +# log.debug( "Try automatically re-submitting UCI '%s'." % uw.get_name() ) + + def get_connection_from_uci( self, uci ): + """ + Establishes and returns connection to cloud provider. Information needed to do so is obtained + directly from uci database object. + """ + log.debug( 'Establishing %s cloud connection' % self.type ) + a_key = uci.credentials.access_key + s_key = uci.credentials.secret_key + # Get connection + try: + region = RegionInfo( None, uci.credentials.provider.region_name, uci.credentials.provider.region_endpoint ) + conn = EC2Connection( aws_access_key_id=a_key, + aws_secret_access_key=s_key, + is_secure=uci.credentials.provider.is_secure, + region=region, + path=uci.credentials.provider.path ) + except boto.exception.EC2ResponseError, e: + err = "Establishing connection with cloud failed: " + str( e ) + log.error( err ) + uci.error = err + uci.state = uci_states.ERROR + uci.flush() + return None + + return conn + +# def updateUCI( self, uci ): +# """ +# Runs a global status update on all storage volumes and all instances that are +# associated with specified UCI +# """ +# conn = self.get_connection( uci ) +# +# # Update status of storage volumes +# vl = model.CloudStore.filter( model.CloudInstance.c.uci_id == uci.id ).all() +# vols = [] +# for v in vl: +# vols.append( v.volume_id ) +# try: +# volumes = conn.get_all_volumes( vols ) +# for i, v in enumerate( volumes ): +# uci.store[i].i_id = v.instance_id +# uci.store[i].status = v.status +# uci.store[i].device = v.device +# uci.store[i].flush() +# except: +# log.debug( "Error updating status of volume(s) associated with UCI '%s'. Status was not updated." % uci.name ) +# pass +# +# # Update status of instances +# il = model.CloudInstance.filter_by( uci_id=uci.id ).filter( model.CloudInstance.c.state != 'terminated' ).all() +# instanceList = [] +# for i in il: +# instanceList.append( i.instance_id ) +# log.debug( 'instanceList: %s' % instanceList ) +# try: +# reservations = conn.get_all_instances( instanceList ) +# for i, r in enumerate( reservations ): +# uci.instance[i].state = r.instances[0].update() +# log.debug('updating instance %s; status: %s' % ( uci.instance[i].instance_id, uci.instance[i].state ) ) +# uci.state = uci.instance[i].state +# uci.instance[i].public_dns = r.instances[0].dns_name +# uci.instance[i].private_dns = r.instances[0].private_dns_name +# uci.instance[i].flush() +# uci.flush() +# except: +# log.debug( "Error updating status of instances associated with UCI '%s'. Instance status was not updated." % uci.name ) +# pass + + # --------- Helper methods ------------ + + def format_time( self, time ): + dict = {'T':' ', 'Z':''} + for i, j in dict.iteritems(): + time = time.replace(i, j) + return time + \ No newline at end of file diff -r 0984c3800775 -r 7d013eb98022 lib/galaxy/cloud/providers/eucalyptus.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/galaxy/cloud/providers/eucalyptus.py Thu Nov 12 16:36:07 2009 -0500 @@ -0,0 +1,923 @@ +import subprocess, threading, os, errno, time, datetime +from Queue import Queue, Empty +from datetime import datetime + +from galaxy import model # Database interaction class +from galaxy.model import mapping +from galaxy.datatypes.data import nice_size +from galaxy.util.bunch import Bunch +from galaxy.cloud import UCIwrapper +from Queue import Queue +from sqlalchemy import or_, and_ + +import galaxy.eggs +galaxy.eggs.require("boto") +from boto.ec2.connection import EC2Connection +from boto.ec2.regioninfo import RegionInfo +import boto.exception +import boto + +import logging +log = logging.getLogger( __name__ ) + +uci_states = Bunch( + NEW_UCI = "newUCI", + NEW = "new", + CREATING = "creating", + DELETING_UCI = "deletingUCI", + DELETING = "deleting", + SUBMITTED_UCI = "submittedUCI", + SUBMITTED = "submitted", + SHUTTING_DOWN_UCI = "shutting-downUCI", + SHUTTING_DOWN = "shutting-down", + AVAILABLE = "available", + RUNNING = "running", + PENDING = "pending", + ERROR = "error", + DELETED = "deleted", + SNAPSHOT_UCI = "snapshotUCI", + SNAPSHOT = "snapshot" +) + +instance_states = Bunch( + TERMINATED = "terminated", + SUBMITTED = "submitted", + RUNNING = "running", + PENDING = "pending", + SHUTTING_DOWN = "shutting-down", + ERROR = "error" +) + +store_status = Bunch( + IN_USE = "in-use", + CREATING = "creating", + DELETED = 'deleted', + ERROR = "error" +) + +snapshot_status = Bunch( + SUBMITTED = 'submitted', + PENDING = 'pending', + COMPLETED = 'completed', + DELETE = 'delete', + DELETED= 'deleted', + ERROR = "error" +) + +class EucalyptusCloudProvider( object ): + """ + Eucalyptus-based cloud provider implementation for managing instances. + """ + STOP_SIGNAL = object() + def __init__( self, app ): + self.type = "eucalyptus" # cloud provider type (e.g., ec2, eucalyptus, opennebula) + self.zone = "epc" + self.queue = Queue() + + self.threads = [] + nworkers = 5 + log.info( "Starting eucalyptus cloud controller workers..." ) + for i in range( nworkers ): + worker = threading.Thread( target=self.run_next ) + worker.start() + self.threads.append( worker ) + log.debug( "%d eucalyptus cloud workers ready", nworkers ) + + def run_next( self ): + """Run the next job, waiting until one is available if necessary""" + cnt = 0 + while 1: + uci_wrapper = self.queue.get() + uci_state = uci_wrapper.get_state() + if uci_state is self.STOP_SIGNAL: + return + try: + if uci_state==uci_states.NEW: + self.createUCI( uci_wrapper ) + elif uci_state==uci_states.DELETING: + self.deleteUCI( uci_wrapper ) + elif uci_state==uci_states.SUBMITTED: + self.startUCI( uci_wrapper ) + #self.dummyStartUCI( uci_wrapper ) + elif uci_state==uci_states.SHUTTING_DOWN: + self.stopUCI( uci_wrapper ) + elif uci_state==uci_states.SNAPSHOT: + self.snapshotUCI( uci_wrapper ) + except: + log.exception( "Uncaught exception executing cloud request." ) + cnt += 1 + + def get_connection( self, uci_wrapper ): + """ + Establishes eucalyptus cloud connection using user's credentials associated with given UCI + """ + log.debug( 'Establishing %s cloud connection.' % self.type ) + provider = uci_wrapper.get_provider() + try: + region = RegionInfo( None, provider.region_name, provider.region_endpoint ) + except Exception, ex: + err = "Selecting region with cloud provider failed: " + str( ex ) + log.error( err ) + uci_wrapper.set_error( err, True ) + return None + try: + conn = EC2Connection( aws_access_key_id=uci_wrapper.get_access_key(), + aws_secret_access_key=uci_wrapper.get_secret_key(), + is_secure=provider.is_secure, + port=provider.port, + region=region, + path=provider.path ) + except boto.exception.EC2ResponseError, e: + err = "Establishing connection with cloud failed: " + str( e ) + log.error( err ) + uci_wrapper.set_error( err, True ) + return None + + return conn + + def check_key_pair( self, uci_wrapper, conn ): + """ + Generate key pair using user's credentials + """ + kp = None + kp_name = uci_wrapper.get_name().replace(' ','_') + "_kp" + log.debug( "Checking user's key pair: '%s'" % kp_name ) + try: + kp = conn.get_key_pair( kp_name ) + uci_kp_name = uci_wrapper.get_key_pair_name() + uci_material = uci_wrapper.get_key_pair_material() + if kp != None: + if kp.name != uci_kp_name or uci_material == None: + # key pair exists on the cloud but not in local database, so re-generate it (i.e., delete and then create) + try: + conn.delete_key_pair( kp_name ) + kp = self.create_key_pair( conn, kp_name ) + uci_wrapper.set_key_pair( kp.name, kp.material ) + except boto.exception.EC2ResponseError, e: + err = "EC2 response error while deleting key pair: " + str( e ) + log.error( err ) + uci_wrapper.set_error( err, True ) + else: + try: + kp = self.create_key_pair( conn, kp_name ) + uci_wrapper.set_key_pair( kp.name, kp.material ) + except boto.exception.EC2ResponseError, e: + err = "EC2 response error while creating key pair: " + str( e ) + log.error( err ) + uci_wrapper.set_error( err, True ) + except Exception, ex: + err = "Exception while creating key pair: " + str( ex ) + log.error( err ) + uci_wrapper.set_error( err, True ) + except boto.exception.EC2ResponseError, e: # No keypair under this name exists so create it + if e.code == 'InvalidKeyPair.NotFound': + log.info( "No keypair found, creating keypair '%s'" % kp_name ) + kp = self.create_key_pair( conn, kp_name ) + uci_wrapper.set_key_pair( kp.name, kp.material ) + else: + err = "EC2 response error while retrieving key pair: " + str( e ) + log.error( err ) + uci_wrapper.set_error( err, True ) + + if kp != None: + return kp.name + else: + return None + + def create_key_pair( self, conn, kp_name ): + try: + return conn.create_key_pair( kp_name ) + except boto.exception.EC2ResponseError, e: + return None + + def get_mi_id( self, uci_wrapper, i_index ): + """ + Get appropriate machine image (mi) based on instance size. + """ + i_type = uci_wrapper.get_type( i_index ) + if i_type=='m1.small' or i_type=='c1.medium': + arch = 'i386' + else: + arch = 'x86_64' + + mi = model.CloudImage.filter_by( deleted=False, provider_type=self.type, architecture=arch ).first() + if mi: + return mi.image_id + else: + err = "Machine image could not be retrieved" + log.error( "%s for UCI '%s'." % (err, uci_wrapper.get_name() ) ) + uci_wrapper.set_error( err+". Contact site administrator to ensure needed machine image is registered.", True ) + return None + + def shutdown( self ): + """Attempts to gracefully shut down the monitor thread""" + log.info( "sending stop signal to worker threads in eucalyptus cloud manager" ) + for i in range( len( self.threads ) ): + self.queue.put( self.STOP_SIGNAL ) + log.info( "eucalyptus cloud manager stopped" ) + + def put( self, uci_wrapper ): + # Get rid of UCI from state description + state = uci_wrapper.get_state() + uci_wrapper.change_state( state.split('U')[0] ) # remove 'UCI' from end of state description (i.e., mark as accepted and ready for processing) + self.queue.put( uci_wrapper ) + + def createUCI( self, uci_wrapper ): + """ + Creates User Configured Instance (UCI). Essentially, creates storage volume on cloud provider + and registers relevant information in Galaxy database. + """ + conn = self.get_connection( uci_wrapper ) + + # Because only 1 storage volume may be created at UCI config time, index of this storage volume in local Galaxy DB w.r.t + # current UCI is 0; therefore, it can be referenced in following code + log.info( "Creating volume in zone '%s'..." % uci_wrapper.get_uci_availability_zone() ) + if uci_wrapper.get_uci_availability_zone()=='': + log.info( "Availability zone for UCI (i.e., storage volume) was not selected, using default zone: %s" % self.zone ) + uci_wrapper.set_store_availability_zone( self.zone ) + + log.debug( "Creating volume; using command: conn.create_volume( %s, '%s', snapshot=None )" % ( uci_wrapper.get_store_size( 0 ), uci_wrapper.get_uci_availability_zone() )) + vol = conn.create_volume( uci_wrapper.get_store_size( 0 ), uci_wrapper.get_uci_availability_zone(), snapshot=None ) + uci_wrapper.set_store_volume_id( 0, vol.id ) + + # Retrieve created volume again to get updated status + try: + vl = conn.get_all_volumes( [vol.id] ) + except boto.exception.EC2ResponseError, e: + err = "EC2 response error while retrieving (i.e., updating status) of just created storage volume '" + vol.id + "': " + str( e ) + log.error( err ) + uci_wrapper.set_store_status( vol.id, uci_states.ERROR ) + uci_wrapper.set_error( err, True ) + return + except Exception, ex: + err = "Error while retrieving (i.e., updating status) of just created storage volume '" + vol.id + "': " + str( ex ) + log.error( err ) + uci_wrapper.set_error( err, True ) + return + + if len( vl ) > 0: + # EPC does not allow creation of storage volumes (it deletes one as soon as it is created, so manually set uci_state here) + if vl[0].status == store_status.DELETED: + uci_wrapper.change_state( uci_state=uci_states.AVAILABLE ) + else: + uci_wrapper.change_state( uci_state=vl[0].status ) + uci_wrapper.set_store_status( vol.id, vl[0].status ) + else: + err = "Volume '" + vol.id +"' not found by EC2 after being created." + log.error( err ) + uci_wrapper.set_store_status( vol.id, uci_states.ERROR ) + uci_wrapper.set_error( err, True ) + + def deleteUCI( self, uci_wrapper ): + """ + Deletes UCI. NOTE that this implies deletion of any and all data associated + with this UCI from the cloud. All data will be deleted. + """ + conn = self.get_connection( uci_wrapper ) + vl = [] # volume list + count = 0 # counter for checking if all volumes assoc. w/ UCI were deleted + + # Get all volumes assoc. w/ UCI, delete them from cloud as well as in local DB + vl = uci_wrapper.get_all_stores() + deletedList = [] + failedList = [] + for v in vl: + log.debug( "Deleting volume with id='%s'" % v.volume_id ) + try: + if conn.delete_volume( v.volume_id ): + deletedList.append( v.volume_id ) + v.deleted = True + v.flush() + count += 1 + else: + failedList.append( v.volume_id ) + except boto.exception.EC2ResponseError, e: + err = "EC2 response error while deleting storage volume '" + v.volume_id + "': " + str( e ) + log.error( err ) + uci_wrapper.set_store_error( err, store_id = v.volume_id ) + uci_wrapper.set_error( err, True ) + + # Delete UCI if all of associated + if count == len( vl ): + uci_wrapper.set_deleted() + else: + err = "Deleting following volume(s) failed: "+failedList+". However, these volumes were successfully deleted: "+deletedList+". \ + MANUAL intervention and processing needed." + log.error( err ) + uci_wrapper.set_error( err, True ) + + def snapshotUCI( self, uci_wrapper ): + """ + Creates snapshot of all storage volumes associated with this UCI. + """ + if uci_wrapper.get_state() != uci_states.ERROR: + conn = self.get_connection( uci_wrapper ) + + snapshots = uci_wrapper.get_snapshots( status = snapshot_status.SUBMITTED ) + for snapshot in snapshots: + log.debug( "Snapshot DB id: '%s', volume id: '%s'" % ( snapshot.id, snapshot.store.volume_id ) ) + try: + snap = conn.create_snapshot( volume_id=snapshot.store.volume_id ) + snap_id = str( snap ).split(':')[1] + uci_wrapper.set_snapshot_id( snapshot.id, snap_id ) + sh = conn.get_all_snapshots( snap_id ) # get updated status + uci_wrapper.set_snapshot_status( status=sh[0].status, snap_id=snap_id ) + except boto.exception.EC2ResponseError, e: + err = "Cloud provider response error while creating snapshot: " + str( e ) + log.error( err ) + uci_wrapper.set_snapshot_error( error=err, snap_index=snapshot.id, set_status=True ) + uci_wrapper.set_error( err, True ) + return + except Exception, ex: + err = "Error while creating snapshot: " + str( ex ) + log.error( err ) + uci_wrapper.set_snapshot_error( error=err, snap_index=snapshot.id, set_status=True ) + uci_wrapper.set_error( err, True ) + return + + uci_wrapper.change_state( uci_state=uci_states.AVAILABLE ) + +# if uci_wrapper.get_state() != uci_states.ERROR: +# +# snapshots = uci_wrapper.get_snapshots( status = 'submitted' ) +# for snapshot in snapshots: +# uci_wrapper.set_snapshot_id( snapshot.id, None, 'euca_error' ) +# +# log.debug( "Eucalyptus snapshot attempted by user for UCI '%s'" % uci_wrapper.get_name() ) +# uci_wrapper.set_error( "Eucalyptus does not support creation of snapshots at this moment. No snapshot or other changes were performed. \ +# Feel free to resent state of this instance and use it normally.", True ) + + + def addStorageToUCI( self, uci_wrapper ): + """ Adds more storage to specified UCI """ + + def dummyStartUCI( self, uci_wrapper ): + + uci = uci_wrapper.get_uci() + log.debug( "Would be starting instance '%s'" % uci.name ) +# uci_wrapper.change_state( uci_states.SUBMITTED_UCI ) +# log.debug( "Set UCI state to SUBMITTED_UCI" ) + log.debug( "Sleeping a bit... (%s)" % uci.name ) + time.sleep(10) + log.debug( "Woke up! (%s)" % uci.name ) + + def startUCI( self, uci_wrapper ): + """ + Starts instance(s) of given UCI on the cloud. + """ + if uci_wrapper.get_state() != uci_states.ERROR: + conn = self.get_connection( uci_wrapper ) + self.check_key_pair( uci_wrapper, conn ) + if uci_wrapper.get_key_pair_name() == None: + err = "Key pair not found" + log.error( "%s for UCI '%s'." % ( err, uci_wrapper.get_name() ) ) + uci_wrapper.set_error( err + ". Try resetting the state and starting the instance again.", True ) + return + + i_indexes = uci_wrapper.get_instances_indexes( state=instance_states.SUBMITTED ) # Get indexes of i_indexes associated with this UCI that are in 'submitted' state + log.debug( "Starting instances with IDs: '%s' associated with UCI '%s' " % ( i_indexes, uci_wrapper.get_name(), ) ) + if len( i_indexes ) > 0: + for i_index in i_indexes: + # Get machine image for current instance + mi_id = self.get_mi_id( uci_wrapper, i_index ) + log.debug( "mi_id: %s, uci_wrapper.get_key_pair_name(): %s" % ( mi_id, uci_wrapper.get_key_pair_name() ) ) + uci_wrapper.set_mi( i_index, mi_id ) + + if uci_wrapper.get_state() != uci_states.ERROR: + # Start an instance + log.debug( "Starting UCI instance '%s'" % uci_wrapper.get_name() ) + log.debug( "Using following command: conn.run_instances( image_id='%s', key_name='%s', instance_type='%s' )" + % ( mi_id, uci_wrapper.get_key_pair_name(), uci_wrapper.get_type( i_index ) ) ) + reservation = None + try: + reservation = conn.run_instances( image_id=mi_id, + key_name=uci_wrapper.get_key_pair_name(), + instance_type=uci_wrapper.get_type( i_index ) ) + except boto.exception.EC2ResponseError, e: + err = "EC2 response error when starting UCI '"+ uci_wrapper.get_name() +"': " + str( e ) + log.error( err ) + uci_wrapper.set_error( err, True ) + except Exception, ex: + err = "Error when starting UCI '" + uci_wrapper.get_name() + "': " + str( ex ) + log.error( err ) + uci_wrapper.set_error( err, True ) + # Record newly available instance data into local Galaxy database + if reservation: + uci_wrapper.set_launch_time( self.format_time( reservation.instances[0].launch_time ), i_index=i_index ) + if not uci_wrapper.uci_launch_time_set(): + uci_wrapper.set_uci_launch_time( self.format_time( reservation.instances[0].launch_time ) ) + try: + uci_wrapper.set_reservation_id( i_index, str( reservation ).split(":")[1] ) + # TODO: if more than a single instance will be started through single reservation, change this reference from element [0] + i_id = str( reservation.instances[0]).split(":")[1] + uci_wrapper.set_instance_id( i_index, i_id ) + s = reservation.instances[0].state + uci_wrapper.change_state( s, i_id, s ) + log.debug( "Instance of UCI '%s' started, current state: '%s'" % ( uci_wrapper.get_name(), uci_wrapper.get_state() ) ) + except boto.exception.EC2ResponseError, e: + err = "EC2 response error when retrieving instance information for UCI '" + uci_wrapper.get_name() + "': " + str( e ) + log.error( err ) + uci_wrapper.set_error( err, True ) + else: + log.error( "UCI '%s' is in 'error' state, starting instance was aborted." % uci_wrapper.get_name() ) + else: + err = "No instances in state '"+ instance_states.SUBMITTED +"' found for UCI '" + uci_wrapper.get_name() + \ + "'. Nothing to start." + log.error( err ) + uci_wrapper.set_error( err, True ) + else: + log.error( "UCI '%s' is in 'error' state, starting instance was aborted." % uci_wrapper.get_name() ) + + def stopUCI( self, uci_wrapper): + """ + Stops all of cloud instances associated with given UCI. + """ + conn = self.get_connection( uci_wrapper ) + + # Get all instances associated with given UCI + il = uci_wrapper.get_instances_ids() # instance list + log.debug( 'List of instances being terminated: %s' % il ) + rl = conn.get_all_instances( il ) # Reservation list associated with given instances + + # Initiate shutdown of all instances under given UCI + cnt = 0 + stopped = [] + not_stopped = [] + for r in rl: + for inst in r.instances: + log.debug( "Sending stop signal to instance '%s' associated with reservation '%s' (UCI: %s)." % ( inst, r, uci_wrapper.get_name() ) ) + try: + inst.stop() + uci_wrapper.set_stop_time( datetime.utcnow(), i_id=inst.id ) + uci_wrapper.change_state( instance_id=inst.id, i_state=inst.update() ) + stopped.append( inst ) + except boto.exception.EC2ResponseError, e: + not_stopped.append( inst ) + err = "EC2 response error when stopping instance '" + inst.instance_id + "': " + str( e ) + log.error( err ) + uci_wrapper.set_error( err, True ) + + uci_wrapper.reset_uci_launch_time() + log.debug( "Termination was initiated for all instances of UCI '%s'." % uci_wrapper.get_name() ) + +# dbInstances = get_instances( trans, uci ) #TODO: handle list! +# +# # Get actual cloud instance object +# cloudInstance = get_cloud_instance( conn, dbInstances.instance_id ) +# +# # TODO: Detach persistent storage volume(s) from instance and update volume data in local database +# stores = get_stores( trans, uci ) +# for i, store in enumerate( stores ): +# log.debug( "Detaching volume '%s' to instance '%s'." % ( store.volume_id, dbInstances.instance_id ) ) +# mntDevice = store.device +# volStat = None +## Detaching volume does not work with Eucalyptus Public Cloud, so comment it out +## try: +## volStat = conn.detach_volume( store.volume_id, dbInstances.instance_id, mntDevice ) +## except: +## log.debug ( 'Error detaching volume; still going to try and stop instance %s.' % dbInstances.instance_id ) +# store.attach_time = None +# store.device = None +# store.i_id = None +# store.status = volStat +# log.debug ( '***** volume status: %s' % volStat ) +# +# # Stop the instance and update status in local database +# cloudInstance.stop() +# dbInstances.stop_time = datetime.utcnow() +# while cloudInstance.state != 'terminated': +# log.debug( "Stopping instance %s state; current state: %s" % ( str( cloudInstance ).split(":")[1], cloudInstance.state ) ) +# time.sleep(3) +# cloudInstance.update() +# dbInstances.state = cloudInstance.state +# +# # Reset relevant UCI fields +# uci.state = 'available' +# uci.launch_time = None +# +# # Persist +# session = trans.sa_session +## session.save_or_update( stores ) +# session.save_or_update( dbInstances ) # TODO: Is this going to work w/ multiple instances stored in dbInstances variable? +# session.save_or_update( uci ) +# session.flush() +# trans.log_event( "User stopped cloud instance '%s'" % uci.name ) +# trans.set_message( "Galaxy instance '%s' stopped." % uci.name ) + + def update( self ): + """ + Runs a global status update on all instances that are in 'running', 'pending', or 'shutting-down' state. + Also, runs update on all storage volumes that are in 'in-use', 'creating', or 'None' state. + Reason behind this method is to sync state of local DB and real-world resources + """ + log.debug( "Running general status update for EPC UCIs..." ) + # Update instances + instances = model.CloudInstance.filter( or_( model.CloudInstance.c.state==instance_states.RUNNING, + model.CloudInstance.c.state==instance_states.PENDING, + model.CloudInstance.c.state==instance_states.SHUTTING_DOWN ) ).all() + for inst in instances: + if self.type == inst.uci.credentials.provider.type: + log.debug( "[%s] Running general status update on instance '%s'" % ( inst.uci.credentials.provider.type, inst.instance_id ) ) + self.updateInstance( inst ) + + # Update storage volume(s) + stores = model.CloudStore.filter( or_( model.CloudStore.c.status==store_status.IN_USE, + model.CloudStore.c.status==store_status.CREATING, + model.CloudStore.c.status==None ) ).all() + for store in stores: + if self.type == store.uci.credentials.provider.type: # and store.volume_id != None: + log.debug( "[%s] Running general status update on store with local database ID: '%s'" % ( store.uci.credentials.provider.type, store.id ) ) + self.updateStore( store ) +# else: +# log.error( "[%s] There exists an entry for UCI (%s) storage volume without an ID. Storage volume might have been created with " +# "cloud provider though. Manual check is recommended." % ( store.uci.credentials.provider.type, store.uci.name ) ) +# store.uci.error = "There exists an entry in local database for a storage volume without an ID. Storage volume might have been created " \ +# "with cloud provider though. Manual check is recommended. After understanding what happened, local database entry for given " \ +# "storage volume should be updated." +# store.status = store_status.ERROR +# store.uci.state = uci_states.ERROR +# store.uci.flush() +# store.flush() + + # Update pending snapshots or delete ones marked for deletion + snapshots = model.CloudSnapshot.filter_by( status=snapshot_status.PENDING, status=snapshot_status.DELETE ).all() + for snapshot in snapshots: + if self.type == snapshot.uci.credentials.provider.type and snapshot.status == snapshot_status.PENDING: + log.debug( "[%s] Running general status update on snapshot '%s'" % ( snapshot.uci.credentials.provider.type, snapshot.snapshot_id ) ) + self.update_snapshot( snapshot ) + elif self.type == snapshot.uci.credentials.provider.type and snapshot.status == snapshot_status.DELETE: + log.debug( "[%s] Initiating deletion of snapshot '%s'" % ( snapshot.uci.credentials.provider.type, snapshot.snapshot_id ) ) + self.delete_snapshot( snapshot ) + + # Attempt at updating any zombie UCIs (i.e., instances that have been in SUBMITTED state for longer than expected - see below for exact time) + zombies = model.UCI.filter_by( state=uci_states.SUBMITTED ).all() + for zombie in zombies: + log.debug( "zombie UCI: %s" % zombie.name ) + z_instances = model.CloudInstance \ + .filter_by( uci_id=zombie.id, state=None ) \ + .all() + for z_inst in z_instances: + if self.type == z_inst.uci.credentials.provider.type: +# log.debug( "z_inst.id: '%s', state: '%s'" % ( z_inst.id, z_inst.state ) ) + td = datetime.utcnow() - z_inst.update_time + log.debug( "z_inst.id: %s, time delta is %s sec" % ( z_inst.id, td.seconds ) ) + if td.seconds > 180: # if instance has been in SUBMITTED state for more than 3 minutes + log.debug( "[%s](td=%s) Running zombie repair update on instance with DB id '%s'" % ( z_inst.uci.credentials.provider.type, td.seconds, z_inst.id ) ) + self.processZombie( z_inst ) + + def updateInstance( self, inst ): + + # Get credentials associated wit this instance + uci_id = inst.uci_id + uci = model.UCI.get( uci_id ) + uci.refresh() + conn = self.get_connection_from_uci( uci ) + + # Get reservations handle for given instance + try: + rl= conn.get_all_instances( [inst.instance_id] ) + except boto.exception.EC2ResponseError, e: + err = "Retrieving instance(s) from cloud failed for UCI '"+ uci.name +"' during general status update: " + str( e ) + log.error( err ) + uci.error = err + uci.state = uci_states.ERROR + return None + + # Because EPC deletes references to reservations after a short while after instances have terminated, getting an empty list as a response to a query + # typically means the instance has successfully shut down but the check was not performed in short enough amount of time. Until alternative solution + # is found, below code sets state of given UCI to 'error' to indicate to the user something out of ordinary happened. + if len( rl ) == 0: + err = "Instance ID '"+inst.instance_id+"' was not found by the cloud provider. Instance might have crashed or otherwise been terminated."+ \ + "Manual check is recommended." + log.error( err ) + inst.error = err + uci.error = err + inst.state = instance_states.TERMINATED + uci.state = uci_states.ERROR + uci.launch_time = None + inst.flush() + uci.flush() + # Update instance status in local DB with info from cloud provider + for r in rl: + for i, cInst in enumerate( r.instances ): + try: + s = cInst.update() + log.debug( "Checking state of cloud instance '%s' associated with reservation '%s'. State='%s'" % ( cInst, r, s ) ) + if s != inst.state: + inst.state = s + inst.flush() + # After instance has shut down, ensure UCI is marked as 'available' + if s == instance_states.TERMINATED and uci.state != uci_states.ERROR: + uci.state = uci_states.AVAILABLE + uci.launch_time = None + uci.flush() + # Making sure state of UCI is updated. Once multiple instances become associated with single UCI, this will need to be changed. + if s != uci.state and s != instance_states.TERMINATED: + uci.state = s + uci.flush() + if cInst.public_dns_name != inst.public_dns: + inst.public_dns = cInst.public_dns_name + inst.flush() + if cInst.private_dns_name != inst.private_dns: + inst.private_dns = cInst.private_dns_name + inst.flush() + except boto.exception.EC2ResponseError, e: + err = "Updating instance status from cloud failed for UCI '"+ uci.name + "' during general status update: " + str( e ) + log.error( err ) + uci.error = err + uci.state = uci_states.ERROR + return None + + def updateStore( self, store ): + # Get credentials associated wit this store + uci_id = store.uci_id + uci = model.UCI.get( uci_id ) + uci.refresh() + conn = self.get_connection_from_uci( uci ) + + try: + vl = conn.get_all_volumes( [store.volume_id] ) + except boto.exception.EC2ResponseError, e: + err = "Retrieving volume(s) from cloud failed for UCI '"+ uci.name + "' during general status update: " + str( e ) + log.error( err ) + uci.error = err + uci.state = uci_states.ERROR + uci.flush() + return None + + # Update store status in local DB with info from cloud provider + if len(vl) > 0: + try: + if store.status != vl[0].status: + # In case something failed during creation of UCI but actual storage volume was created and yet + # UCI state remained as 'new', try to remedy this by updating UCI state here + if ( store.status == None ) and ( store.volume_id != None ): + uci.state = vl[0].status + uci.flush() + # If UCI was marked in state 'CREATING', update its status to reflect new status + elif ( uci.state == uci_states.CREATING ): + # Because Eucalyptus Public Cloud (EPC) deletes volumes immediately after they are created, artificially + # set status of given UCI to 'available' based on storage volume's availability zone (i.e., it's residing + # in EPC as opposed to some other Eucalyptus based cloud that allows creation of storage volumes. + if store.availability_zone == 'epc': + uci.state = uci_states.AVAILABLE + else: + uci.state = vl[0].status + uci.flush() + + store.status = vl[0].status + store.flush() + if store.i_id != vl[0].instance_id: + store.i_id = vl[0].instance_id + store.flush() + if store.attach_time != vl[0].attach_time: + store.attach_time = vl[0].attach_time + store.flush() + if store.device != vl[0].device: + store.device = vl[0].device + store.flush() + except boto.exception.EC2ResponseError, e: + err = "Updating status of volume(s) from cloud failed for UCI '"+ uci.name + "' during general status update: " + str( e ) + log.error( err ) + uci.error = err + uci.state = uci_states.ERROR + uci.flush() + return None + else: + err = "No storage volumes returned by cloud provider on general update" + log.error( "%s for UCI '%s'" % ( err, uci.name ) ) + store.status = store_status.ERROR + store.error = err + uci.error = err + uci.state = uci_states.ERROR + uci.flush() + store.flush() + + def updateSnapshot( self, snapshot ): + # Get credentials associated wit this store + uci_id = snapshot.uci_id + uci = model.UCI.get( uci_id ) + uci.refresh() + conn = self.get_connection_from_uci( uci ) + + try: + log.debug( "Updating status of snapshot '%s'" % snapshot.snapshot_id ) + snap = conn.get_all_snapshots( [snapshot.snapshot_id] ) + if len( snap ) > 0: + log.debug( "Snapshot '%s' status: %s" % ( snapshot.snapshot_id, snap[0].status ) ) + snapshot.status = snap[0].status + snapshot.flush() + else: + err = "No snapshots returned by cloud provider on general update" + log.error( "%s for UCI '%s'" % ( err, uci.name ) ) + snapshot.status = snapshot_status.ERROR + snapshot.error = err + uci.error = err + uci.state = uci_states.ERROR + uci.flush() + snapshot.flush() + except boto.exception.EC2ResponseError, e: + err = "Cloud provider response error while updating snapshot status: " + str( e ) + log.error( err ) + snapshot.status = snapshot_status.ERROR + snapshot.error = err + uci.error = err + uci.state = uci_states.ERROR + uci.flush() + snapshot.flush() + except Exception, ex: + err = "Error while updating snapshot status: " + str( ex ) + log.error( err ) + snapshot.status = snapshot_status.ERROR + snapshot.error = err + uci.error = err + uci.state = uci_states.ERROR + uci.flush() + snapshot.flush() + + def delete_snapshot( self, snapshot ): + if snapshot.status == snapshot_status.DELETE: + # Get credentials associated wit this store + uci_id = snapshot.uci_id + uci = model.UCI.get( uci_id ) + uci.refresh() + conn = self.get_connection_from_uci( uci ) + + try: + log.debug( "Deleting snapshot '%s'" % snapshot.snapshot_id ) + snap = conn.delete_snapshot( snapshot.snapshot_id ) + if snap == True: + snapshot.deleted = True + snapshot.status = snapshot_status.DELETED + snapshot.flush() + return snap + except boto.exception.EC2ResponseError, e: + err = "EC2 response error while deleting snapshot: " + str( e ) + log.error( err ) + snapshot.status = snapshot_status.ERROR + snapshot.error = err + uci.error = err + uci.state = uci_states.ERROR + uci.flush() + snapshot.flush() + except Exception, ex: + err = "Error while deleting snapshot: " + str( ex ) + log.error( err ) + snapshot.status = snapshot_status.ERROR + snapshot.error = err + uci.error = err + uci.state = uci_states.ERROR + uci.flush() + snapshot.flush() + else: + err = "Cannot delete snapshot '"+snapshot.snapshot_id+"' because its status is '"+snapshot.status+"'. Only snapshots with '" + \ + snapshot_status.COMPLETED+"' status can be deleted." + log.error( err ) + snapshot.error = err + snapshot.flush() + + def processZombie( self, inst ): + """ + Attempt at discovering if starting an instance was successful but local database was not updated + accordingly or if something else failed and instance was never started. Currently, no automatic + repairs are being attempted; instead, appropriate error messages are set. + """ + # Check if any instance-specific information was written to local DB; if 'yes', set instance and UCI's error message + # suggesting manual check. + if inst.launch_time != None or inst.reservation_id != None or inst.instance_id != None: + # Try to recover state - this is best-case effort, so if something does not work immediately, not + # recovery steps are attempted. Recovery is based on hope that instance_id is available in local DB; if not, + # report as error. + # Fields attempting to be recovered are: reservation_id, instance status, and launch_time + if inst.instance_id != None: + conn = self.get_connection_from_uci( inst.uci ) + rl = conn.get_all_instances( [inst.instance_id] ) # reservation list + # Update local DB with relevant data from instance + if inst.reservation_id == None: + try: + inst.reservation_id = str(rl[0]).split(":")[1] + except: # something failed, so skip + pass + + try: + state = rl[0].instances[0].update() + inst.state = state + inst.uci.state = state + inst.flush() + inst.uci.flush() + except: # something failed, so skip + pass + + if inst.launch_time == None: + try: + launch_time = self.format_time( rl[0].instances[0].launch_time ) + inst.launch_time = launch_time + inst.flush() + if inst.uci.launch_time == None: + inst.uci.launch_time = launch_time + inst.uci.flush() + except: # something failed, so skip + pass + else: + err = "Starting a machine instance (DB id: '"+str(inst.id)+"') associated with this UCI '" + str(inst.uci.name) + \ + "' seems to have failed. Because it appears that cloud instance might have gotten started, manual check is recommended." + inst.error = err + inst.state = instance_states.ERROR + inst.uci.error = err + inst.uci.state = uci_states.ERROR + log.error( err ) + inst.flush() + inst.uci.flush() + + else: #Instance most likely never got processed, so set error message suggesting user to try starting instance again. + err = "Starting a machine instance (DB id: '"+str(inst.id)+"') associated with this UCI '" + str(inst.uci.name) + \ + "' seems to have failed. Because it appears that cloud instance never got started, it should be safe to reset state and try " \ + "starting the instance again." + inst.error = err + inst.state = instance_states.ERROR + inst.uci.error = err + inst.uci.state = uci_states.ERROR + log.error( err ) + inst.flush() + inst.uci.flush() +# uw = UCIwrapper( inst.uci ) +# log.debug( "Try automatically re-submitting UCI '%s'." % uw.get_name() ) + + def get_connection_from_uci( self, uci ): + """ + Establishes and returns connection to cloud provider. Information needed to do so is obtained + directly from uci database object. + """ + log.debug( 'Establishing %s cloud connection.' % self.type ) + a_key = uci.credentials.access_key + s_key = uci.credentials.secret_key + # Get connection + try: + region = RegionInfo( None, uci.credentials.provider.region_name, uci.credentials.provider.region_endpoint ) + conn = EC2Connection( aws_access_key_id=a_key, + aws_secret_access_key=s_key, + is_secure=uci.credentials.provider.is_secure, + port=uci.credentials.provider.port, + region=region, + path=uci.credentials.provider.path ) + except boto.exception.EC2ResponseError, e: + err = "Establishing connection with cloud failed: " + str( e ) + log.error( err ) + uci.error = err + uci.state = uci_states.ERROR + uci.flush() + return None + + return conn + +# def updateUCI( self, uci ): +# """ +# Runs a global status update on all storage volumes and all instances that are +# associated with specified UCI +# """ +# conn = self.get_connection( uci ) +# +# # Update status of storage volumes +# vl = model.CloudStore.filter( model.CloudInstance.c.uci_id == uci.id ).all() +# vols = [] +# for v in vl: +# vols.append( v.volume_id ) +# try: +# volumes = conn.get_all_volumes( vols ) +# for i, v in enumerate( volumes ): +# uci.store[i].i_id = v.instance_id +# uci.store[i].status = v.status +# uci.store[i].device = v.device +# uci.store[i].flush() +# except: +# log.debug( "Error updating status of volume(s) associated with UCI '%s'. Status was not updated." % uci.name ) +# pass +# +# # Update status of instances +# il = model.CloudInstance.filter_by( uci_id=uci.id ).filter( model.CloudInstance.c.state != 'terminated' ).all() +# instanceList = [] +# for i in il: +# instanceList.append( i.instance_id ) +# log.debug( 'instanceList: %s' % instanceList ) +# try: +# reservations = conn.get_all_instances( instanceList ) +# for i, r in enumerate( reservations ): +# uci.instance[i].state = r.instances[0].update() +# log.debug('updating instance %s; status: %s' % ( uci.instance[i].instance_id, uci.instance[i].state ) ) +# uci.state = uci.instance[i].state +# uci.instance[i].public_dns = r.instances[0].dns_name +# uci.instance[i].private_dns = r.instances[0].private_dns_name +# uci.instance[i].flush() +# uci.flush() +# except: +# log.debug( "Error updating status of instances associated with UCI '%s'. Instance status was not updated." % uci.name ) +# pass + + # --------- Helper methods ------------ + + def format_time( self, time ): + dict = {'T':' ', 'Z':''} + for i, j in dict.iteritems(): + time = time.replace(i, j) + return time + \ No newline at end of file diff -r 0984c3800775 -r 7d013eb98022 lib/galaxy/config.py --- a/lib/galaxy/config.py Thu Nov 12 15:25:48 2009 -0500 +++ b/lib/galaxy/config.py Thu Nov 12 16:36:07 2009 -0500 @@ -113,6 +113,13 @@ except ConfigParser.NoSectionError: self.tool_runners = [] self.datatypes_config = kwargs.get( 'datatypes_config_file', 'datatypes_conf.xml' ) + # Cloud configuration options + self.cloud_controller_instance = string_as_bool( kwargs.get( 'cloud_controller_instance', 'False' ) ) + self.cloud_provider = kwargs.get( 'cloud_provider', None ) + if self.cloud_controller_instance: + self.enable_cloud_execution = string_as_bool( kwargs.get( 'enable_cloud_execution', 'True' ) ) + else: + self.enable_cloud_execution = string_as_bool( kwargs.get( 'enable_cloud_execution', 'False' ) ) def get( self, key, default ): return self.config_dict.get( key, default ) def get_bool( self, key, default ): diff -r 0984c3800775 -r 7d013eb98022 lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py Thu Nov 12 15:25:48 2009 -0500 +++ b/lib/galaxy/model/__init__.py Thu Nov 12 16:36:07 2009 -0500 @@ -38,6 +38,7 @@ self.username = None # Relationships self.histories = [] + self.credentials = [] def set_password_cleartext( self, cleartext ): """Set 'self.password' to the digest of 'cleartext'.""" @@ -1049,7 +1050,60 @@ def __init__( self, galaxy_session, history ): self.galaxy_session = galaxy_session self.history = history + +class CloudImage( object ): + def __init__( self ): + self.id = None + self.instance_id = None + self.state = None +class UCI( object ): + def __init__( self ): + self.id = None + self.user = None + +class CloudInstance( object ): + def __init__( self ): + self.id = None + self.user = None + self.name = None + self.instance_id = None + self.mi = None + self.state = None + self.public_dns = None + self.availability_zone = None + +class CloudStore( object ): + def __init__( self ): + self.id = None + self.volume_id = None + self.i_id = None + self.user = None + self.size = None + self.availability_zone = None + +class CloudSnapshot( object ): + def __init__( self ): + self.id = None + self.user = None + self.store_id = None + self.snapshot_id = None + +class CloudProvider( object ): + def __init__( self ): + self.id = None + self.user = None + self.type = None + +class CloudUserCredentials( object ): + def __init__( self ): + self.id = None + self.user = None + self.name = None + self.accessKey = None + self.secretKey = None + self.credentials = [] + class StoredWorkflow( object ): def __init__( self ): self.id = None diff -r 0984c3800775 -r 7d013eb98022 lib/galaxy/model/mapping.py --- a/lib/galaxy/model/mapping.py Thu Nov 12 15:25:48 2009 -0500 +++ b/lib/galaxy/model/mapping.py Thu Nov 12 16:36:07 2009 -0500 @@ -390,6 +390,117 @@ Column( "session_id", Integer, ForeignKey( "galaxy_session.id" ), index=True ), Column( "history_id", Integer, ForeignKey( "history.id" ), index=True ) ) +# *************************** Start cloud tables*********************************** +CloudImage.table = Table( "cloud_image", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "update_time", DateTime, default=now, onupdate=now ), + Column( "provider_type", TEXT ), + Column( "image_id", TEXT, nullable=False ), + Column( "manifest", TEXT ), + Column( "state", TEXT ), + Column( "architecture", TEXT ), + Column( "deleted", Boolean, default=False ) ) + +""" UserConfiguredInstance (UCI) table """ +UCI.table = Table( "cloud_uci", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "update_time", DateTime, default=now, onupdate=now ), + Column( "user_id", Integer, ForeignKey( "galaxy_user.id" ), index=True, nullable=False ), + Column( "credentials_id", Integer, ForeignKey( "cloud_user_credentials.id" ), index=True ), + Column( "key_pair_name", TEXT ), + Column( "key_pair_material", TEXT ), + Column( "name", TEXT ), + Column( "state", TEXT ), + Column( "error", TEXT ), + Column( "total_size", Integer ), + Column( "launch_time", DateTime ), + Column( "deleted", Boolean, default=False ) ) + +CloudInstance.table = Table( "cloud_instance", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "update_time", DateTime, default=now, onupdate=now ), + Column( "launch_time", DateTime ), + Column( "stop_time", DateTime ), + Column( "user_id", Integer, ForeignKey( "galaxy_user.id" ), index=True, nullable=False ), + Column( "uci_id", Integer, ForeignKey( "cloud_uci.id" ), index=True ), + Column( "type", TEXT ), + Column( "reservation_id", TEXT ), + Column( "instance_id", TEXT ), + Column( "mi_id", TEXT, ForeignKey( "cloud_image.image_id" ), index=True ), + Column( "state", TEXT ), + Column( "error", TEXT ), + Column( "public_dns", TEXT ), + Column( "private_dns", TEXT ), + Column( "security_group", TEXT ), + Column( "availability_zone", TEXT ) ) + +CloudStore.table = Table( "cloud_store", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "update_time", DateTime, default=now, onupdate=now ), + Column( "attach_time", DateTime ), + Column( "user_id", Integer, ForeignKey( "galaxy_user.id" ), index=True, nullable=False ), + Column( "uci_id", Integer, ForeignKey( "cloud_uci.id" ), index=True, nullable=False ), + Column( "volume_id", TEXT ), + Column( "size", Integer, nullable=False ), + Column( "availability_zone", TEXT ), + Column( "i_id", TEXT, ForeignKey( "cloud_instance.instance_id" ) ), + Column( "status", TEXT ), + Column( "device", TEXT ), + Column( "space_consumed", Integer ), + Column( "error", TEXT ), + Column( "deleted", Boolean, default=False ) ) + +CloudSnapshot.table = Table( "cloud_snapshot", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "update_time", DateTime, default=now, onupdate=now ), + Column( "user_id", Integer, ForeignKey( "galaxy_user.id" ), index=True, nullable=False ), + Column( "uci_id", Integer, ForeignKey( "cloud_uci.id" ), index=True ), + Column( "store_id", Integer, ForeignKey( "cloud_store.id" ), index=True, nullable=False ), + Column( "snapshot_id", TEXT ), + Column( "status", TEXT ), + Column( "description", TEXT ), + Column( "error", TEXT ), + Column( "deleted", Boolean, default=False ) ) + +CloudUserCredentials.table = Table( "cloud_user_credentials", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "update_time", DateTime, default=now, onupdate=now ), + Column( "user_id", Integer, ForeignKey( "galaxy_user.id" ), index=True, nullable=False ), + Column( "provider_id", Integer, ForeignKey( "cloud_provider.id" ), index=True, nullable=False ), + Column( "name", TEXT ), + Column( "access_key", TEXT ), + Column( "secret_key", TEXT ), + Column( "deleted", Boolean, default=False ) ) + +CloudProvider.table = Table( "cloud_provider", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "update_time", DateTime, default=now, onupdate=now ), + Column( "user_id", Integer, ForeignKey( "galaxy_user.id" ), index=True, nullable=False ), + Column( "type", TEXT, nullable=False ), + Column( "name", TEXT ), + Column( "region_connection", TEXT ), + Column( "region_name", TEXT ), + Column( "region_endpoint", TEXT ), + Column( "is_secure", Boolean ), + Column( "host", TEXT ), + Column( "port", Integer ), + Column( "proxy", TEXT ), + Column( "proxy_port", TEXT ), + Column( "proxy_user", TEXT ), + Column( "proxy_pass", TEXT ), + Column( "debug", Integer ), + Column( "https_connection_factory", TEXT ), + Column( "path", TEXT ), + Column( "deleted", Boolean, default=False ) ) +# *************************** End cloud tables*********************************** + StoredWorkflow.table = Table( "stored_workflow", metadata, Column( "id", Integer, primary_key=True ), Column( "create_time", DateTime, default=now ), @@ -1004,6 +1115,42 @@ output_step=relation( WorkflowStep, backref="output_connections", cascade="all", primaryjoin=( WorkflowStepConnection.table.c.output_step_id == WorkflowStep.table.c.id ) ) ) ) +# vvvvvvvvvvvvvvvv Start cloud table mappings vvvvvvvvvvvvvvvv +assign_mapper( context, CloudImage, CloudImage.table ) + +assign_mapper( context, UCI, UCI.table, + properties=dict( user=relation( User ), + credentials=relation( CloudUserCredentials ), + instance=relation( CloudInstance, backref='uci' ), + store=relation( CloudStore, backref='uci', cascade='all, delete-orphan' ), + snapshot=relation( CloudSnapshot, backref='uci' ) + ) ) + +assign_mapper( context, CloudInstance, CloudInstance.table, + properties=dict( user=relation( User ), + image=relation( CloudImage ) + ) ) + +assign_mapper( context, CloudStore, CloudStore.table, + properties=dict( user=relation( User ), + i=relation( CloudInstance ), + snapshot=relation( CloudSnapshot, backref="store" ) + ) ) + +assign_mapper( context, CloudSnapshot, CloudSnapshot.table, + properties=dict( user=relation( User ) + ) ) + +assign_mapper( context, CloudProvider, CloudProvider.table, + properties=dict( user=relation( User ) + ) ) + +assign_mapper( context, CloudUserCredentials, CloudUserCredentials.table, + properties=dict( user=relation( User), + provider=relation( CloudProvider ) + ) ) +# ^^^^^^^^^^^^^^^ End cloud table mappings ^^^^^^^^^^^^^^^^^^ + assign_mapper( context, StoredWorkflow, StoredWorkflow.table, properties=dict( user=relation( User ), workflows=relation( Workflow, backref='stored_workflow', diff -r 0984c3800775 -r 7d013eb98022 lib/galaxy/model/migrate/versions/0026_cloud_tables.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/galaxy/model/migrate/versions/0026_cloud_tables.py Thu Nov 12 16:36:07 2009 -0500 @@ -0,0 +1,152 @@ +from sqlalchemy import * +from migrate import * + +import datetime +now = datetime.datetime.utcnow + +# Need our custom types, but don't import anything else from model +from galaxy.model.custom_types import * + +import logging +log = logging.getLogger( __name__ ) + +metadata = MetaData( migrate_engine ) + +def display_migration_details(): + print + print "========================================" + print "This script adds tables needed for Galaxy cloud functionality." + print "========================================" + +CloudImage_table = Table( "cloud_image", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "update_time", DateTime, default=now, onupdate=now ), + Column( "provider_type", TEXT ), + Column( "image_id", TEXT, nullable=False ), + Column( "manifest", TEXT ), + Column( "state", TEXT ), + Column( "architecture", TEXT ), + Column( "deleted", Boolean, default=False ) ) + +""" UserConfiguredInstance (UCI) table """ +UCI_table = Table( "cloud_uci", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "update_time", DateTime, default=now, onupdate=now ), + Column( "user_id", Integer, ForeignKey( "galaxy_user.id" ), index=True, nullable=False ), + Column( "credentials_id", Integer, ForeignKey( "cloud_user_credentials.id" ), index=True ), + Column( "key_pair_name", TEXT ), + Column( "key_pair_material", TEXT ), + Column( "name", TEXT ), + Column( "state", TEXT ), + Column( "error", TEXT ), + Column( "total_size", Integer ), + Column( "launch_time", DateTime ), + Column( "deleted", Boolean, default=False ) ) + +CloudInstance_table = Table( "cloud_instance", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "update_time", DateTime, default=now, onupdate=now ), + Column( "launch_time", DateTime ), + Column( "stop_time", DateTime ), + Column( "user_id", Integer, ForeignKey( "galaxy_user.id" ), index=True, nullable=False ), + Column( "uci_id", Integer, ForeignKey( "cloud_uci.id" ), index=True ), + Column( "type", TEXT ), + Column( "reservation_id", TEXT ), + Column( "instance_id", TEXT ), + Column( "mi_id", TEXT, ForeignKey( "cloud_image.image_id" ), index=True ), + Column( "state", TEXT ), + Column( "error", TEXT ), + Column( "public_dns", TEXT ), + Column( "private_dns", TEXT ), + Column( "security_group", TEXT ), + Column( "availability_zone", TEXT ) ) + +CloudStore_table = Table( "cloud_store", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "update_time", DateTime, default=now, onupdate=now ), + Column( "attach_time", DateTime ), + Column( "user_id", Integer, ForeignKey( "galaxy_user.id" ), index=True, nullable=False ), + Column( "uci_id", Integer, ForeignKey( "cloud_uci.id" ), index=True, nullable=False ), + Column( "volume_id", TEXT ), + Column( "size", Integer, nullable=False ), + Column( "availability_zone", TEXT ), + Column( "i_id", TEXT, ForeignKey( "cloud_instance.instance_id" ) ), + Column( "status", TEXT ), + Column( "device", TEXT ), + Column( "space_consumed", Integer ), + Column( "error", TEXT ), + Column( "deleted", Boolean, default=False ) ) + +CloudSnapshot_table = Table( "cloud_snapshot", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "update_time", DateTime, default=now, onupdate=now ), + Column( "user_id", Integer, ForeignKey( "galaxy_user.id" ), index=True, nullable=False ), + Column( "uci_id", Integer, ForeignKey( "cloud_uci.id" ), index=True ), + Column( "store_id", Integer, ForeignKey( "cloud_store.id" ), index=True, nullable=False ), + Column( "snapshot_id", TEXT ), + Column( "status", TEXT ), + Column( "description", TEXT ), + Column( "error", TEXT ), + Column( "deleted", Boolean, default=False ) ) + +CloudUserCredentials_table = Table( "cloud_user_credentials", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "update_time", DateTime, default=now, onupdate=now ), + Column( "user_id", Integer, ForeignKey( "galaxy_user.id" ), index=True, nullable=False ), + Column( "provider_id", Integer, ForeignKey( "cloud_provider.id" ), index=True, nullable=False ), + Column( "name", TEXT ), + Column( "access_key", TEXT ), + Column( "secret_key", TEXT ), + Column( "deleted", Boolean, default=False ) ) + +CloudProvider_table = Table( "cloud_provider", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "update_time", DateTime, default=now, onupdate=now ), + Column( "user_id", Integer, ForeignKey( "galaxy_user.id" ), index=True, nullable=False ), + Column( "type", TEXT, nullable=False ), + Column( "name", TEXT ), + Column( "region_connection", TEXT ), + Column( "region_name", TEXT ), + Column( "region_endpoint", TEXT ), + Column( "is_secure", Boolean ), + Column( "host", TEXT ), + Column( "port", Integer ), + Column( "proxy", TEXT ), + Column( "proxy_port", TEXT ), + Column( "proxy_user", TEXT ), + Column( "proxy_pass", TEXT ), + Column( "debug", Integer ), + Column( "https_connection_factory", TEXT ), + Column( "path", TEXT ), + Column( "deleted", Boolean, default=False ) ) + +def upgrade(): + display_migration_details() + # Load existing tables + metadata.reflect() + + CloudImage_table.create() + UCI_table.create() + CloudUserCredentials_table.create() + CloudStore_table.create() + CloudSnapshot_table.create() + CloudInstance_table.create() + CloudProvider_table.create() + +def downgrade(): + metadata.reflect() + + CloudImage_table.drop() + CloudInstance_table.drop() + CloudStore_table.drop() + CloudSnapshot_table.drop() + CloudUserCredentials_table.drop() + UCI_table.drop() + CloudProvider_table.drop() \ No newline at end of file diff -r 0984c3800775 -r 7d013eb98022 lib/galaxy/web/controllers/cloud.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/galaxy/web/controllers/cloud.py Thu Nov 12 16:36:07 2009 -0500 @@ -0,0 +1,1193 @@ +from galaxy.web.base.controller import * + +import pkg_resources +pkg_resources.require( "simplejson" ) +import simplejson +import urllib2 + +from galaxy.tools.parameters import * +from galaxy.tools import DefaultToolState +from galaxy.tools.parameters.grouping import Repeat, Conditional +from galaxy.datatypes.data import Data +from galaxy.util.odict import odict +from galaxy.util.bunch import Bunch +from galaxy.util.topsort import topsort, topsort_levels, CycleError +from galaxy.model.mapping import desc +from galaxy.model.orm import * +from datetime import datetime, timedelta + +pkg_resources.require( "WebHelpers" ) +from webhelpers import * + +# Required for Cloud tab +import galaxy.eggs +galaxy.eggs.require("boto") +from boto.ec2.connection import EC2Connection +from boto.ec2.regioninfo import RegionInfo +from galaxy.cloud import CloudManager +import boto.exception +import boto + +import logging +log = logging.getLogger( __name__ ) + +uci_states = Bunch( + NEW_UCI = "newUCI", + NEW = "new", + CREATING = "creating", + DELETING_UCI = "deletingUCI", + DELETING = "deleting", + SUBMITTED_UCI = "submittedUCI", + SUBMITTED = "submitted", + SHUTTING_DOWN_UCI = "shutting-downUCI", + SHUTTING_DOWN = "shutting-down", + AVAILABLE = "available", + RUNNING = "running", + PENDING = "pending", + ERROR = "error", + DELETED = "deleted", + SNAPSHOT_UCI = "snapshotUCI", + SNAPSHOT = "snapshot" +) +
participants (1)
-
Greg Von Kuster