[hg] galaxy 3067: Added most of the functionality for interaction with Eucalyptus cloud
details: http://www.bx.psu.edu/hg/galaxy/rev/7c438fd3cf4a changeset: 3067:7c438fd3cf4a user: Enis Afgan <afgane@gmail.com> date: Fri Oct 02 18:31:32 2009 -0400 description: Added most of the functionality for interaction with Eucalyptus cloud diffstat: lib/galaxy/app.py | 4 +- lib/galaxy/cloud/__init__.py | 262 ++++++++++++--------- lib/galaxy/cloud/providers/eucalyptus.py | 275 +++++++++++++++++++--- lib/galaxy/model/mapping.py | 4 +- lib/galaxy/model/migrate/versions/0014_cloud_tables.py | 16 +- lib/galaxy/web/controllers/cloud.py | 160 ++++++------ 6 files changed, 465 insertions(+), 256 deletions(-) diffs (986 lines): diff -r c9c9adf06e9d -r 7c438fd3cf4a lib/galaxy/app.py --- a/lib/galaxy/app.py Wed Sep 30 17:57:11 2009 -0400 +++ b/lib/galaxy/app.py Fri Oct 02 18:31:32 2009 -0400 @@ -1,6 +1,6 @@ import sys, os, atexit -from galaxy import config, jobs, util, tools, web +from galaxy import config, jobs, util, tools, web, cloud from galaxy.tracks import store from galaxy.web import security import galaxy.model @@ -68,6 +68,8 @@ # FIXME: These are exposed directly for backward compatibility self.job_queue = self.job_manager.job_queue self.job_stop_queue = self.job_manager.job_stop_queue + # Start the cloud manager + self.cloud_manager = cloud.CloudManager( self ) # Track Store self.track_store = store.TrackStoreManager( self.config.track_store_path ) diff -r c9c9adf06e9d -r 7c438fd3cf4a lib/galaxy/cloud/__init__.py --- a/lib/galaxy/cloud/__init__.py Wed Sep 30 17:57:11 2009 -0400 +++ b/lib/galaxy/cloud/__init__.py Fri Oct 02 18:31:32 2009 -0400 @@ -6,7 +6,7 @@ from galaxy.datatypes.tabular import * from galaxy.datatypes.interval import * from galaxy.datatypes import metadata -#from util import Bunch +from galaxy.util.bunch import Bunch import pkg_resources pkg_resources.require( "PasteDeploy" ) @@ -34,7 +34,7 @@ # The dispatcher manager underlying cloud instances self.provider = DefaultCloudProvider( app ) # Monitor for updating status of cloud instances -# self.cloud_monitor = CloudMonitor( self.config, self.provider ) + self.cloud_monitor = CloudMonitor( self.app, self.provider ) # self.job_stop_queue = JobStopQueue( app, self.dispatcher ) else: self.job_queue = self.job_stop_queue = NoopCloudMonitor() @@ -43,32 +43,32 @@ self.cloud_monitor.shutdown() # self.job_stop_queue.shutdown() - def createUCI( self, user, name, storage_size, zone=None): - """ - Createse User Configured Instance (UCI). Essentially, creates storage volume. - """ - self.provider.createUCI( user, name, storage_size, zone ) - - def deleteUCI( self, name ): - """ - Deletes UCI. NOTE that this implies deletion of any and all data associated - with this UCI from the cloud. All data will be deleted. - """ - - def addStorageToUCI( self, name ): - """ Adds more storage to specified UCI """ - - def startUCI( self, name, type ): - """ - Starts an instance of named UCI on the cloud. This implies, mounting of - storage and starting Galaxy instance. - """ - - def stopUCI( self, name ): - """ - Stops cloud instance associated with named UCI. This also implies - stopping of Galaxy and unmounting of the file system. - """ +# def createUCI( self, user, name, storage_size, zone=None): +# """ +# Createse User Configured Instance (UCI). Essentially, creates storage volume. +# """ +# self.provider.createUCI( user, name, storage_size, zone ) +# +# def deleteUCI( self, name ): +# """ +# Deletes UCI. NOTE that this implies deletion of any and all data associated +# with this UCI from the cloud. All data will be deleted. 
+# """ +# +# def addStorageToUCI( self, name ): +# """ Adds more storage to specified UCI """ +# +# def startUCI( self, name, type ): +# """ +# Starts an instance of named UCI on the cloud. This implies, mounting of +# storage and starting Galaxy instance. +# """ +# +# def stopUCI( self, name ): +# """ +# Stops cloud instance associated with named UCI. This also implies +# stopping of Galaxy and unmounting of the file system. +# """ class Sleeper( object ): """ @@ -149,8 +149,8 @@ while self.running: try: - #self.__monitor_step() - log.debug ( "would be calling monitor_step" ) +# log.debug( "Calling monitor_step" ) + self.__monitor_step() except: log.exception( "Exception in cloud manager monitor_step" ) # Sleep @@ -171,88 +171,110 @@ session = mapping.Session() # Pull all new jobs from the queue at once new_jobs = [] - if self.track_jobs_in_database: - for j in session.query( model.Job ).options( lazyload( "external_output_metadata" ), lazyload( "parameters" ) ).filter( model.Job.c.state == model.Job.states.NEW ).all(): - job = JobWrapper( j, self.app.toolbox.tools_by_id[ j.tool_id ], self ) - new_jobs.append( job ) - else: - try: - while 1: - message = self.queue.get_nowait() - if message is self.STOP_SIGNAL: - return - # Unpack the message - job_id, tool_id = message - # Create a job wrapper from it - job_entity = session.query( model.Job ).get( job_id ) - job = JobWrapper( job_entity, self.app.toolbox.tools_by_id[ tool_id ], self ) - # Append to watch queue - new_jobs.append( job ) - except Empty: - pass - # Iterate over new and waiting jobs and look for any that are - # ready to run - new_waiting = [] - for job in ( new_jobs + self.waiting ): - try: - # Clear the session for each job so we get fresh states for - # job and all datasets - session.clear() - # Get the real job entity corresponding to the wrapper (if we - # are tracking in the database this is probably cached in - # the session from the origianl query above) - job_entity = session.query( model.Job ).get( job.job_id ) - # Check the job's dependencies, requeue if they're not done - job_state = self.__check_if_ready_to_run( job, job_entity ) - if job_state == JOB_WAIT: - if not self.track_jobs_in_database: - new_waiting.append( job ) - elif job_state == JOB_ERROR: - log.info( "job %d ended with an error" % job.job_id ) - elif job_state == JOB_INPUT_ERROR: - log.info( "job %d unable to run: one or more inputs in error state" % job.job_id ) - elif job_state == JOB_INPUT_DELETED: - log.info( "job %d unable to run: one or more inputs deleted" % job.job_id ) - elif job_state == JOB_READY: - # If special queuing is enabled, put the ready jobs in the special queue - if self.use_policy : - self.squeue.put( job ) - log.debug( "job %d put in policy queue" % job.job_id ) - else: # or dispatch the job directly - self.dispatcher.put( job ) - log.debug( "job %d dispatched" % job.job_id) - elif job_state == JOB_DELETED: - msg = "job %d deleted by user while still queued" % job.job_id - job.info = msg - log.debug( msg ) - elif job_state == JOB_ADMIN_DELETED: - job.fail( job_entity.info ) - log.info( "job %d deleted by admin while still queued" % job.job_id ) - else: - msg = "unknown job state '%s' for job %d" % ( job_state, job.job_id ) - job.info = msg - log.error( msg ) - except Exception, e: - job.info = "failure running job %d: %s" % ( job.job_id, str( e ) ) - log.exception( "failure running job %d" % job.job_id ) - # Update the waiting list - self.waiting = new_waiting - # If special (e.g. 
fair) scheduling is enabled, dispatch all jobs - # currently in the special queue - if self.use_policy : - while 1: - try: - sjob = self.squeue.get() - self.dispatcher.put( sjob ) - log.debug( "job %d dispatched" % sjob.job_id ) - except Empty: - # squeue is empty, so stop dispatching - break - except Exception, e: # if something else breaks while dispatching - job.fail( "failure running job %d: %s" % ( sjob.job_id, str( e ) ) ) - log.exception( "failure running job %d" % sjob.job_id ) - # Done with the session - mapping.Session.remove() + new_instances = [] + new_UCIs = [] + stop_UCIs = [] + +# for r in session.query( model.cloud_instance ).filter( model.cloud_instance.s.state == model.cloud_instance.states.NEW ).all(): +# new_instances + + for r in session.query( model.UCI ).filter( model.UCI.c.state == "new" ).all(): + new_UCIs.append( r ) + for r in new_UCIs: + self.provider.createUCI( r ) + + for r in session.query( model.UCI ).filter( model.UCI.c.state == "submitted" ).all(): + new_instances.append( r ) + for r in new_instances: + self.provider.startUCI( r ) + + for r in session.query( model.UCI ).filter( model.UCI.c.state == "terminating" ).all(): + stop_UCIs.append( r ) + for r in stop_UCIs: + self.provider.stopUCI( r ) + +# if self.track_jobs_in_database: +# for j in session.query( model.Job ).options( lazyload( "external_output_metadata" ), lazyload( "parameters" ) ).filter( model.Job.c.state == model.Job.states.NEW ).all(): +# job = JobWrapper( j, self.app.toolbox.tools_by_id[ j.tool_id ], self ) +# new_jobs.append( job ) +# else: +# try: +# while 1: +# message = self.queue.get_nowait() +# if message is self.STOP_SIGNAL: +# return +# # Unpack the message +# job_id, tool_id = message +# # Create a job wrapper from it +# job_entity = session.query( model.Job ).get( job_id ) +# job = JobWrapper( job_entity, self.app.toolbox.tools_by_id[ tool_id ], self ) +# # Append to watch queue +# new_jobs.append( job ) +# except Empty: +# pass +# # Iterate over new and waiting jobs and look for any that are +# # ready to run +# new_waiting = [] +# for job in ( new_jobs + self.waiting ): +# try: +# # Clear the session for each job so we get fresh states for +# # job and all datasets +# session.clear() +# # Get the real job entity corresponding to the wrapper (if we +# # are tracking in the database this is probably cached in +# # the session from the origianl query above) +# job_entity = session.query( model.Job ).get( job.job_id ) +# # Check the job's dependencies, requeue if they're not done +# job_state = self.__check_if_ready_to_run( job, job_entity ) +# if job_state == JOB_WAIT: +# if not self.track_jobs_in_database: +# new_waiting.append( job ) +# elif job_state == JOB_ERROR: +# log.info( "job %d ended with an error" % job.job_id ) +# elif job_state == JOB_INPUT_ERROR: +# log.info( "job %d unable to run: one or more inputs in error state" % job.job_id ) +# elif job_state == JOB_INPUT_DELETED: +# log.info( "job %d unable to run: one or more inputs deleted" % job.job_id ) +# elif job_state == JOB_READY: +# # If special queuing is enabled, put the ready jobs in the special queue +# if self.use_policy : +# self.squeue.put( job ) +# log.debug( "job %d put in policy queue" % job.job_id ) +# else: # or dispatch the job directly +# self.dispatcher.put( job ) +# log.debug( "job %d dispatched" % job.job_id) +# elif job_state == JOB_DELETED: +# msg = "job %d deleted by user while still queued" % job.job_id +# job.info = msg +# log.debug( msg ) +# elif job_state == JOB_ADMIN_DELETED: +# job.fail( 
job_entity.info ) +# log.info( "job %d deleted by admin while still queued" % job.job_id ) +# else: +# msg = "unknown job state '%s' for job %d" % ( job_state, job.job_id ) +# job.info = msg +# log.error( msg ) +# except Exception, e: +# job.info = "failure running job %d: %s" % ( job.job_id, str( e ) ) +# log.exception( "failure running job %d" % job.job_id ) +# # Update the waiting list +# self.waiting = new_waiting +# # If special (e.g. fair) scheduling is enabled, dispatch all jobs +# # currently in the special queue +# if self.use_policy : +# while 1: +# try: +# sjob = self.squeue.get() +# self.dispatcher.put( sjob ) +# log.debug( "job %d dispatched" % sjob.job_id ) +# except Empty: +# # squeue is empty, so stop dispatching +# break +# except Exception, e: # if something else breaks while dispatching +# job.fail( "failure running job %d: %s" % ( sjob.job_id, str( e ) ) ) +# log.exception( "failure running job %d" % sjob.job_id ) +# # Done with the session +# mapping.Session.remove() def __check_if_ready_to_run( self, job_wrapper, job ): """ @@ -653,12 +675,12 @@ else: log.error( "Unable to start unknown cloud provider: %s" %self.provider_name ) - def createUCI( self, user, uciName, storage_size, zone=None): + def createUCI( self, uci ): """ Createse User Configured Instance (UCI). Essentially, creates storage volume. """ - log.debug( "Creating UCI %s" % uciName ) - self.cloud_provider[self.provider_name].createUCI( user, uciName, storage_size, zone ) + log.debug( "Creating UCI '%s'" % uci.name ) + self.cloud_provider[self.provider_name].createUCI( uci ) def deleteUCI( self, uciName ): """ @@ -669,18 +691,22 @@ def addStorageToUCI( self, uciName ): """ Adds more storage to specified UCI """ - def startUCI( self, uciName, type ): + def startUCI( self, uci ): """ Starts an instance of named UCI on the cloud. This implies, mounting of storage and starting Galaxy instance. """ + log.debug( "Starting UCI '%s'" % uci.name ) + self.cloud_provider[self.provider_name].startUCI( uci ) - def stopUCI( self, uciName ): + def stopUCI( self, uci ): """ Stops cloud instance associated with named UCI. This also implies stopping of Galaxy and unmounting of the file system. """ - + log.debug( "Stopping UCI '%s'" % uci.name ) + self.cloud_provider[self.provider_name].stopUCI( uci ) + def put( self, job_wrapper ): runner_name = ( job_wrapper.tool.job_runner.split(":", 1) )[0] log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) ) diff -r c9c9adf06e9d -r 7c438fd3cf4a lib/galaxy/cloud/providers/eucalyptus.py --- a/lib/galaxy/cloud/providers/eucalyptus.py Wed Sep 30 17:57:11 2009 -0400 +++ b/lib/galaxy/cloud/providers/eucalyptus.py Fri Oct 02 18:31:32 2009 -0400 @@ -1,12 +1,10 @@ -import subprocess, threading, os, errno +import subprocess, threading, os, errno, time, datetime from Queue import Queue, Empty from datetime import datetime from galaxy import model # Database interaction class from galaxy.datatypes.data import nice_size -from time import sleep - import galaxy.eggs galaxy.eggs.require("boto") from boto.ec2.connection import EC2Connection @@ -19,64 +17,119 @@ """ Eucalyptus-based cloud provider implementation for managing instances. """ - def __init__( self, app, user ): + STOP_SIGNAL = object() + def __init__( self, app ): log.debug( "Using eucalyptus as default cloud provider." ) - self.conn = get_connection( user ) + self.zone = "epc" + self.key_pair = "galaxy-keypair" + + #TODO: Use multiple threads to process requests? 
- def get_connection( user ): + def get_connection( self, uci ): """ Establishes EC2 connection using user's default credentials """ log.debug( '##### Establishing cloud connection' ) - creds = model.CloudUserCredentials.filter_by( user=user, defaultCred=True ).first() - if creds: - a_key = creds.access_key - s_key = creds.secret_key - # Amazon EC2 - #conn = EC2Connection( a_key, s_key ) - # Eucalyptus Public Cloud - # TODO: Add option in Galaxy config file to specify these values (i.e., for locally manages Eucalyptus deployments) - euca_region = RegionInfo( None, "eucalyptus", "mayhem9.cs.ucsb.edu" ) - conn = EC2Connection( aws_access_key_id=a_key, aws_secret_access_key=s_key, is_secure=False, port=8773, region=euca_region, path="/services/Eucalyptus" ) - return conn - else: - log.debug( "User did not specify default credentials." ) - return 0 +# creds = model.CloudUserCredentials.filter_by( user=user, defaultCred=True ).first() + a_key = uci.credentials.access_key + s_key = uci.credentials.secret_key + # Amazon EC2 + #conn = EC2Connection( a_key, s_key ) + # Eucalyptus Public Cloud + # TODO: Add option in Galaxy config file to specify these values (i.e., for locally manages Eucalyptus deployments) + euca_region = RegionInfo( None, "eucalyptus", "mayhem9.cs.ucsb.edu" ) + conn = EC2Connection( aws_access_key_id=a_key, aws_secret_access_key=s_key, is_secure=False, port=8773, region=euca_region, path="/services/Eucalyptus" ) + return conn + + def get_keypair_name( self, uci, conn ): + """ + Generate keypair using user's default credentials + """ + log.debug( "Getting user's keypair" ) + kp = conn.get_key_pair( self.key_pair ) + + try: + for i, inst in enumerate( uci.instance ): + uci.instance[i].keypair_name = kp.name + return kp.name + except AttributeError: # No keypair under this name exists so create it + log.debug( "No keypair found, creating keypair '%s'" % self.key_pair ) + kp = conn.create_key_pair( self.key_pair ) + for i, inst in enumerate( uci.instance ): + uci.instance[i].keypair_name = kp.name + uci.instance[i].keypair_material = kp.material + uci.flush() + # TODO: Store key_pair.material into instance table - this is the only time private key can be retrieved + # Actually, probably return key_pair to calling method and store name & key from there... + + return kp.name + + def get_mi( self, type='small' ): + """ + Get appropriate machine image (mi) based on instance size. + TODO: Dummy method - need to implement logic + For valid sizes, see http://aws.amazon.com/ec2/instance-types/ + """ + return model.CloudImage.filter( model.CloudImage.table.c.id==1 ).first() + +# def get_instances( self, uci ): +# """ +# Get objects of instances that are pending or running and are connected to uci object +# """ +# instances = trans.sa_session.query( model.CloudInstance ) \ +# .filter_by( user=user, uci_id=uci.id ) \ +# .filter( or_(model.CloudInstance.table.c.state=="running", model.CloudInstance.table.c.state=="pending" ) ) \ +# .first() +# #.all() #TODO: return all but need to edit calling method(s) to handle list +# +# instances = uci.instance +# +# return instances - + def shutdown( self ): """Attempts to gracefully shut down the monitor thread""" log.info( "sending stop signal to worker threads in eucalyptus cloud manager" ) self.queue.put( self.STOP_SIGNAL ) log.info( "eucalyptus cloud manager stopped" ) - def createUCI( self, user, name, storage_size, zone=None): + def createUCI( self, uci ): """ Creates User Configured Instance (UCI). 
Essentially, creates storage volume on cloud provider and registers relevant information in Galaxy database. """ - conn = getConnection( user ) - # Capture user configured instance information - uci = model.UCI() - uci.name = name - uci.user = user - uci.state = "available" # Valid states include: "available", "running" or "pending" - uci.total_size = storage_size # This is OK now because a new instance is being created. - # Capture store related information - storage = model.CloudStore() - storage.user = user - storage.uci = uci - storage.size = storage_size - storage.availability_zone = "us-east-1a" # TODO: Give user choice here. Also, enable region selection. - #self.conn.create_volume( storage_size, storage.availability_zone, snapshot=None ) - # TODO: get correct value from Eucalyptus - storage.volume_id = "made up" - # Persist + conn = self.get_connection( uci ) + # Temporary code - need to ensure user selects zone at UCI creation time! + if uci.store[0].availability_zone=='': + log.info( "Availability zone for storage volume was not selected, using default zone: %s" % self.zone ) + uci.store[0].availability_zone = self.zone + uci.store[0].flush() + + #TODO: check if volume associated with UCI already exists (if server crashed for example) and don't recreate it + log.debug( "Creating volume in zone '%s'..." % uci.store[0].availability_zone ) + vol = conn.create_volume( uci.store[0].size, uci.store[0].availability_zone, snapshot=None ) + uci.store[0].volume_id = vol.id + + # Wait for a while to ensure volume was created +# vol_status = vol.status +# for i in range( 30 ): +# if vol_status is not "u'available'": +# log.debug( 'Updating volume status; current status: %s' % vol_status ) +# vol_status = vol.status +# time.sleep(3) +# if i is 29: +# log.debug( "Error while creating volume '%s'; stuck in state '%s'; deleting volume." % ( vol.id, vol_status ) ) +# conn.delete_volume( vol.id ) +# uci.state = 'error' +# uci.flush() +# return + + uci.state = 'available' + uci.store[0].status = vol.status + uci.store[0].flush() uci.flush() - storage.flush() - session.flush() - + def deleteUCI( self, name ): """ Deletes UCI. NOTE that this implies deletion of any and all data associated @@ -86,14 +139,142 @@ def addStorageToUCI( self, name ): """ Adds more storage to specified UCI """ - def startUCI( self, name, type ): + def startUCI( self, uci ): """ Starts an instance of named UCI on the cloud. This implies, mounting of storage and starting Galaxy instance. 
""" + conn = self.get_connection( uci ) - def stopUCI( self, name ): + uci.instance[0].keypair_name = self.get_keypair_name( uci, conn ) + mi = self.get_mi( uci.instance[0].type ) + +# log.debug( "mi: %s, mi.image_id: %s, uci.instance[0].keypair_name: %s" % ( mi, mi.image_id, uci.instance[0].keypair_name ) ) + uci.instance[0].image = mi + +# log.debug( '***** Setting up security group' ) + # If not existent, setup galaxy security group +# try: +# gSecurityGroup = conn.create_security_group('galaxy', 'Security group for Galaxy.') +# gSecurityGroup.authorize( 'tcp', 80, 80, '0.0.0.0/0' ) # Open HTTP port +# gSecurityGroup.authorize( 'tcp', 22, 22, '0.0.0.0/0' ) # Open SSH port +# except: +# pass +# sgs = conn.get_all_security_groups() +# for i in range( len( sgs ) ): +# if sgs[i].name == "galaxy": +# sg.append( sgs[i] ) +# break # only 1 security group w/ this name can exist, so continue + + log.debug( "***** Starting UCI instance '%s'" % uci.name ) +# log.debug( 'Using following command: conn.run_instances( image_id=%s, key_name=%s )' % ( uci.instance[0].image.image_id, uci.instance[0].keypair_name ) ) + reservation = conn.run_instances( image_id=uci.instance[0].image.image_id, key_name=uci.instance[0].keypair_name ) + #reservation = conn.run_instances( image_id=instance.image, key_name=instance.keypair_name, security_groups=['galaxy'], instance_type=instance.type, placement=instance.availability_zone ) + uci.instance[0].launch_time = datetime.utcnow() + uci.launch_time = uci.instance[0].launch_time + uci.instance[0].reservation_id = str( reservation ).split(":")[1] + uci.instance[0].instance_id = str( reservation.instances[0]).split(":")[1] + s = reservation.instances[0].state + uci.instance[0].state = s + uci.state = s + uci.instance[0].flush() + uci.flush() + + # Wait until instance gets running and then update the DB + while s!="running": + log.debug( "Waiting on instance '%s' to start up (reservation ID: %s); current state: %s" % ( uci.instance[0].instance_id, uci.instance[0].reservation_id, s ) ) + time.sleep( 15 ) + s = reservation.instances[0].update() + + uci.instance[0].state = s + uci.state = s + uci.instance[0].public_dns = reservation.instances[0].dns_name + uci.instance[0].private_dns = reservation.instances[0].private_dns_name + uci.instance[0].flush() + uci.flush() + + + def stopUCI( self, uci ): """ - Stops cloud instance associated with named UCI. This also implies - stopping of Galaxy and unmounting of the file system. - """ \ No newline at end of file + Stops all of cloud instances associated with named UCI. 
+ """ + conn = self.get_connection( uci ) + tl = [] # temination list + + for i, inst in enumerate( uci.instance ): + tl.append( uci.instance[i].instance_id ) + + instList = conn.get_all_instances( tl ) +# log.debug( 'instList: %s' % instList ) + + for i, inst in enumerate( instList ): +# log.debug( 'inst: %s' % inst ) + log.debug( 'Before stop - inst.instances[0].update(): %s' % inst.instances[0].update() ) + inst.instances[0].stop() + log.debug( 'After stop - inst.instances[0].update(): %s' % inst.instances[0].update() ) + uci.instance[i].stop_time = datetime.utcnow() + + terminated=0 + while terminated!=len( instList ): + for i, inst in enumerate( instList ): + log.debug( "inst state: %s" % inst.instances[0].state ) + state = inst.instances[0].update() + if state=='terminated': + uci.instance[i].state = state + uci.instance[i].flush() + terminated += 1 + time.sleep ( 5 ) + + uci.state = 'available' + uci.launch_time = None + uci.flush() + + log.debug( "All instances for UCI '%s' were terminated." % uci.name ) + + + +# dbInstances = get_instances( trans, uci ) #TODO: handle list! +# +# # Get actual cloud instance object +# cloudInstance = get_cloud_instance( conn, dbInstances.instance_id ) +# +# # TODO: Detach persistent storage volume(s) from instance and update volume data in local database +# stores = get_stores( trans, uci ) +# for i, store in enumerate( stores ): +# log.debug( "Detaching volume '%s' to instance '%s'." % ( store.volume_id, dbInstances.instance_id ) ) +# mntDevice = store.device +# volStat = None +## Detaching volume does not work with Eucalyptus Public Cloud, so comment it out +## try: +## volStat = conn.detach_volume( store.volume_id, dbInstances.instance_id, mntDevice ) +## except: +## log.debug ( 'Error detaching volume; still going to try and stop instance %s.' % dbInstances.instance_id ) +# store.attach_time = None +# store.device = None +# store.i_id = None +# store.status = volStat +# log.debug ( '***** volume status: %s' % volStat ) +# +# +# # Stop the instance and update status in local database +# cloudInstance.stop() +# dbInstances.stop_time = datetime.utcnow() +# while cloudInstance.state != 'terminated': +# log.debug( "Stopping instance %s state; current state: %s" % ( str( cloudInstance ).split(":")[1], cloudInstance.state ) ) +# time.sleep(3) +# cloudInstance.update() +# dbInstances.state = cloudInstance.state +# +# # Reset relevant UCI fields +# uci.state = 'available' +# uci.launch_time = None +# +# # Persist +# session = trans.sa_session +## session.save_or_update( stores ) +# session.save_or_update( dbInstances ) # TODO: Is this going to work w/ multiple instances stored in dbInstances variable? +# session.save_or_update( uci ) +# session.flush() +# trans.log_event( "User stopped cloud instance '%s'" % uci.name ) +# trans.set_message( "Galaxy instance '%s' stopped." 
% uci.name ) + \ No newline at end of file diff -r c9c9adf06e9d -r 7c438fd3cf4a lib/galaxy/model/mapping.py --- a/lib/galaxy/model/mapping.py Wed Sep 30 17:57:11 2009 -0400 +++ b/lib/galaxy/model/mapping.py Fri Oct 02 18:31:32 2009 -0400 @@ -432,9 +432,9 @@ Column( "attach_time", DateTime ), Column( "user_id", Integer, ForeignKey( "galaxy_user.id" ), index=True, nullable=False ), Column( "uci_id", Integer, ForeignKey( "uci.id" ), index=True, nullable=False ), - Column( "volume_id", TEXT, nullable=False ), + Column( "volume_id", TEXT ), Column( "size", Integer, nullable=False ), - Column( "availability_zone", TEXT, nullable=False ), + Column( "availability_zone", TEXT ), Column( "i_id", TEXT, ForeignKey( "cloud_instance.instance_id" ), index=True ), Column( "status", TEXT ), Column( "device", TEXT ), diff -r c9c9adf06e9d -r 7c438fd3cf4a lib/galaxy/model/migrate/versions/0014_cloud_tables.py --- a/lib/galaxy/model/migrate/versions/0014_cloud_tables.py Wed Sep 30 17:57:11 2009 -0400 +++ b/lib/galaxy/model/migrate/versions/0014_cloud_tables.py Fri Oct 02 18:31:32 2009 -0400 @@ -38,11 +38,11 @@ Column( "launch_time", DateTime ), Column( "stop_time", DateTime ), Column( "user_id", Integer, ForeignKey( "galaxy_user.id" ), index=True, nullable=False ), - Column( "uci_id", Integer, ForeignKey( "uci.id" ), index=True ), + Column( "uci_id", Integer, ForeignKey( "uci.id" ), index=True, nullable=False ), Column( "type", TEXT ), Column( "reservation_id", TEXT ), Column( "instance_id", TEXT ), - Column( "mi_id", TEXT, ForeignKey( "cloud_image.image_id" ), index=True, nullable=False ), + Column( "mi_id", TEXT, ForeignKey( "cloud_image.image_id" ), index=True ), Column( "state", TEXT ), Column( "public_dns", TEXT ), Column( "private_dns", TEXT ), @@ -57,9 +57,9 @@ Column( "attach_time", DateTime ), Column( "user_id", Integer, ForeignKey( "galaxy_user.id" ), index=True, nullable=False ), Column( "uci_id", Integer, ForeignKey( "uci.id" ), index=True, nullable=False ), - Column( "volume_id", TEXT, nullable=False ), + Column( "volume_id", TEXT ), Column( "size", Integer, nullable=False ), - Column( "availability_zone", TEXT, nullable=False ), + Column( "availability_zone", TEXT ), Column( "i_id", TEXT, ForeignKey( "cloud_instance.instance_id" ), index=True ), Column( "status", TEXT ), Column( "device", TEXT ), @@ -93,8 +93,8 @@ def downgrade(): metadata.reflect() try: -# log.debug( "Would drop cloud_image table." ) - CloudImage_table.drop() #Enable before release + log.debug( "Would drop cloud_image table." ) +# CloudImage_table.drop() #Enable before release except Exception, e: log.debug( "Dropping cloud_image table failed: %s" % str( e ) ) @@ -109,8 +109,8 @@ log.debug( "Dropping cloud_store table failed: %s" % str( e ) ) try: -# log.debug( "Would drop cloud_user_credentials table." ) - CloudUserCredentials_table.drop() #Enable before putting final version + log.debug( "Would drop cloud_user_credentials table." 
) +# CloudUserCredentials_table.drop() #Enable before putting final version except Exception, e: log.debug( "Dropping cloud_user_credentials table failed: %s" % str( e ) ) diff -r c9c9adf06e9d -r 7c438fd3cf4a lib/galaxy/web/controllers/cloud.py --- a/lib/galaxy/web/controllers/cloud.py Wed Sep 30 17:57:11 2009 -0400 +++ b/lib/galaxy/web/controllers/cloud.py Fri Oct 02 18:31:32 2009 -0400 @@ -42,12 +42,12 @@ Render cloud main page (management of cloud resources) """ user = trans.get_user() - pendingInstances = trans.sa_session.query( model.UCI ) \ - .filter_by( user=user, state="pending" ) \ - .all() - - for i in range( len ( pendingInstances ) ): - update_instance_state( trans, pendingInstances[i].id ) +# pendingInstances = trans.sa_session.query( model.UCI ) \ +# .filter_by( user=user, state="pending" ) \ +# .all() +# +# for i inupdate_in range( len ( pendingInstances ) ): +# stance_state( trans, pendingInstances[i].id ) cloudCredentials = trans.sa_session.query( model.CloudUserCredentials ) \ .filter_by( user=user ) \ @@ -56,12 +56,13 @@ liveInstances = trans.sa_session.query( model.UCI ) \ .filter_by( user=user ) \ - .filter( or_(model.UCI.c.state=="running", model.UCI.c.state=="pending") ) \ + .filter( or_( model.UCI.c.state=="running", model.UCI.c.state=="pending", model.UCI.c.state=="terminating" ) ) \ .order_by( desc( model.UCI.c.launch_time ) ) \ .all() prevInstances = trans.sa_session.query( model.UCI ) \ - .filter_by( user=user, state="available" ) \ + .filter_by( user=user ) \ + .filter( or_( model.UCI.c.state=="available", model.UCI.c.state=="new", model.UCI.c.state=="error", model.UCI.c.state=="submitted" ) ) \ .order_by( desc( model.UCI.c.update_time ) ) \ .all() @@ -101,26 +102,26 @@ @web.expose @web.require_login( "start Galaxy cloud instance" ) - def start( self, trans, id, size='small' ): + def start( self, trans, id, type='small' ): """ Start a new cloud resource instance """ - # TODO: Add choice of instance size before starting one - #if size: + # TODO: Add choice of instance type before starting one + #if type: user = trans.get_user() - mi = get_mi( trans, size ) + mi = get_mi( trans, type ) uci = get_uci( trans, id ) stores = get_stores( trans, uci ) - log.debug(self.app.config.job_working_directory) +# log.debug(self.app.config.job_working_directory) if len(stores) is not 0: instance = model.CloudInstance() instance.user = user instance.image = mi instance.uci = uci - instance.keypair_name = get_keypair_name( trans ) instance.availability_zone = stores[0].availability_zone # Bc. all EBS volumes need to be in the same avail. 
zone, just check 1st - instance.type = size - conn = get_connection( trans ) + instance.type = type +# instance.keypair_name = get_keypair_name( trans ) +# conn = get_connection( trans ) # log.debug( '***** Setting up security group' ) # If not existent, setup galaxy security group # try: @@ -135,26 +136,25 @@ # sg.append( sgs[i] ) # break # only 1 security group w/ this name can exist, so continue - log.debug( '***** Starting an instance' ) - log.debug( 'Using following command: conn.run_instances( image_id=%s, key_name=%s )' % ( instance.image.image_id, instance.keypair_name ) ) - reservation = conn.run_instances( image_id=instance.image.image_id, key_name=instance.keypair_name ) +# log.debug( '***** Starting an instance' ) +# log.debug( 'Using following command: conn.run_instances( image_id=%s, key_name=%s )' % ( instance.image.image_id, instance.keypair_name ) ) +# reservation = conn.run_instances( image_id=instance.image.image_id, key_name=instance.keypair_name ) #reservation = conn.run_instances( image_id=instance.image, key_name=instance.keypair_name, security_groups=['galaxy'], instance_type=instance.type, placement=instance.availability_zone ) - instance.launch_time = datetime.utcnow() - uci.launch_time = instance.launch_time - instance.reservation_id = str( reservation ).split(":")[1] - instance.instance_id = str( reservation.instances[0]).split(":")[1] +# instance.launch_time = datetime.utcnow() +# uci.launch_time = instance.launch_time +# instance.reservation_id = str( reservation ).split(":")[1] +# instance.instance_id = str( reservation.instances[0]).split(":")[1] # instance.state = "pending" - instance.state = reservation.instances[0].state - uci.state = instance.state - - # TODO: After instance boots up, need to update status, DNS and attach EBS +# instance.state = reservation.instances[0].state + uci.state = 'submitted' # Persist session = trans.sa_session session.save_or_update( instance ) + session.save_or_update( uci ) session.flush() - trans.log_event ("Started new instance. Reservation ID: '%s', Instance ID: '%s'" % (instance.reservation_id, instance.instance_id ) ) + trans.log_event ("User initiated starting of cloud instance '%s'." % uci.name ) return self.list( trans ) # return trans.show_form( @@ -167,55 +167,63 @@ @web.require_login( "stop Galaxy cloud instance" ) def stop( self, trans, id ): """ - Stop a cloud instance. This implies stopping Galaxy server and disconnecting/unmounting relevant file system(s). + Stop a cloud UCI instance. This implies stopping Galaxy server and disconnecting/unmounting relevant file system(s). """ uci = get_uci( trans, id ) - dbInstances = get_instances( trans, uci ) #TODO: handle list! - - conn = get_connection( trans ) - # Get actual cloud instance object - cloudInstance = get_cloud_instance( conn, dbInstances.instance_id ) - - # TODO: Detach persistent storage volume(s) from instance and update volume data in local database - stores = get_stores( trans, uci ) - for i, store in enumerate( stores ): - log.debug( "Detaching volume '%s' to instance '%s'." % ( store.volume_id, dbInstances.instance_id ) ) - mntDevice = store.device - volStat = None -# Detaching volume does not work with Eucalyptus Public Cloud, so comment it out -# try: -# volStat = conn.detach_volume( store.volume_id, dbInstances.instance_id, mntDevice ) -# except: -# log.debug ( 'Error detaching volume; still going to try and stop instance %s.' 
% dbInstances.instance_id ) - store.attach_time = None - store.device = None - store.i_id = None - store.status = volStat - log.debug ( '***** volume status: %s' % volStat ) - - - # Stop the instance and update status in local database - cloudInstance.stop() - dbInstances.stop_time = datetime.utcnow() - while cloudInstance.state != 'terminated': - log.debug( "Stopping instance %s state; current state: %s" % ( str( cloudInstance ).split(":")[1], cloudInstance.state ) ) - time.sleep(3) - cloudInstance.update() - dbInstances.state = cloudInstance.state - - # Reset relevant UCI fields - uci.state = 'available' - uci.launch_time = None - - # Persist + uci.state = 'terminating' session = trans.sa_session # session.save_or_update( stores ) - session.save_or_update( dbInstances ) # TODO: Is this going to work w/ multiple instances stored in dbInstances variable? session.save_or_update( uci ) session.flush() trans.log_event( "User stopped cloud instance '%s'" % uci.name ) trans.set_message( "Galaxy instance '%s' stopped." % uci.name ) - + +# dbInstances = get_instances( trans, uci ) #TODO: handle list! +# +# conn = get_connection( trans ) +# # Get actual cloud instance object +# cloudInstance = get_cloud_instance( conn, dbInstances.instance_id ) +# +# # TODO: Detach persistent storage volume(s) from instance and update volume data in local database +# stores = get_stores( trans, uci ) +# for i, store in enumerate( stores ): +# log.debug( "Detaching volume '%s' to instance '%s'." % ( store.volume_id, dbInstances.instance_id ) ) +# mntDevice = store.device +# volStat = None +## Detaching volume does not work with Eucalyptus Public Cloud, so comment it out +## try: +## volStat = conn.detach_volume( store.volume_id, dbInstances.instance_id, mntDevice ) +## except: +## log.debug ( 'Error detaching volume; still going to try and stop instance %s.' % dbInstances.instance_id ) +# store.attach_time = None +# store.device = None +# store.i_id = None +# store.status = volStat +# log.debug ( '***** volume status: %s' % volStat ) +# +# +# # Stop the instance and update status in local database +# cloudInstance.stop() +# dbInstances.stop_time = datetime.utcnow() +# while cloudInstance.state != 'terminated': +# log.debug( "Stopping instance %s state; current state: %s" % ( str( cloudInstance ).split(":")[1], cloudInstance.state ) ) +# time.sleep(3) +# cloudInstance.update() +# dbInstances.state = cloudInstance.state +# +# # Reset relevant UCI fields +# uci.state = 'available' +# uci.launch_time = None +# +# # Persist +# session = trans.sa_session +## session.save_or_update( stores ) +# session.save_or_update( dbInstances ) # TODO: Is this going to work w/ multiple instances stored in dbInstances variable? +# session.save_or_update( uci ) +# session.flush() +# trans.log_event( "User stopped cloud instance '%s'" % uci.name ) +# trans.set_message( "Galaxy instance '%s' stopped." % uci.name ) +# return self.list( trans ) @web.expose @@ -302,20 +310,12 @@ trans.app.model.CloudUserCredentials.table.c.name==credName ).first() uci.user= user uci.total_size = volSize # This is OK now because new instance is being created. - # Need to flush because connection object accesses uci table - uci.flush() - # Capture store related information + uci.state = "new" # Valid states include: "new, "available", "running", "pending", "submitted", "terminating", or "error" storage = model.CloudStore() storage.user = user storage.uci = uci storage.size = volSize - storage.availability_zone = "us-east-1a" # TODO: Give user choice here. 
Also, enable region selection. - conn = get_connection( trans, credName ) - #conn.create_volume( volSize, storage.availability_zone, snapshot=None ) - # TODO: get correct value from AWS - storage.volume_id = "made up" - # TODO: If volume creation was successfull, set state to available - uci.state = "available" # Valid states include: "available", "running" or "pending" + storage.availability_zone = zone # TODO: Give user choice here. Also, enable region selection. # Persist session = trans.sa_session session.save_or_update( uci ) @@ -323,7 +323,7 @@ session.flush() # Log and display the management page trans.log_event( "User configured new cloud instance" ) - trans.set_message( "New Galaxy instance '%s' configured." % instanceName ) + trans.set_message( "New Galaxy instance '%s' configured. Once instance status shows 'available' you will be able to start the instance." % instanceName ) return self.list( trans ) except ValueError: vol_error = "Volume size must be specified as an integer value only, between 1 and 1000."
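In brief, this changeset moves cloud operations behind the CloudMonitor thread: the web controller now only flips a UCI's state ("new", "submitted", "terminating"), and the monitor's __monitor_step() picks those rows up and calls the eucalyptus provider's createUCI(), startUCI(), and stopUCI(). The provider reaches the Eucalyptus Public Cloud through boto. The snippet below is a minimal, standalone sketch of that connection pattern; the helper name and the placeholder credentials are illustrative and not part of the changeset, while the host, port, and path are the values hard-coded in get_connection().

from boto.ec2.connection import EC2Connection
from boto.ec2.regioninfo import RegionInfo

def connect_to_eucalyptus( access_key, secret_key, host="mayhem9.cs.ucsb.edu" ):
    # Eucalyptus exposes the EC2 API on port 8773 under /services/Eucalyptus
    # and (on the public test cloud) without SSL, so an explicit RegionInfo
    # and endpoint settings are needed instead of the stock Amazon defaults.
    euca_region = RegionInfo( None, "eucalyptus", host )
    return EC2Connection( aws_access_key_id=access_key,
                          aws_secret_access_key=secret_key,
                          is_secure=False,
                          port=8773,
                          region=euca_region,
                          path="/services/Eucalyptus" )

# Example use (placeholder credentials, not real ones):
# conn = connect_to_eucalyptus( "my-access-key", "my-secret-key" )
# print conn.get_all_instances()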