galaxy-dev
November 2009
23 Nov '09
details: http://www.bx.psu.edu/hg/galaxy/rev/049083fee997
changeset: 3070:049083fee997
user: Enis Afgan <afgane@gmail.com>
date: Mon Oct 19 17:45:49 2009 -0400
description:
Verified functionality of the EC2 provider code and it WORKS! Note that there is one hard dependency on a user data file with EC2.
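The heart of this change is in lib/galaxy/cloud/__init__.py: CloudProvider no longer selects a single provider from app.config.cloud_provider at startup; it instantiates every known provider and routes each request by the provider name recorded with the UCI's credentials (via the new UCIwrapper.get_provider_name()). Below is a minimal, self-contained sketch of that dispatch pattern, assuming only the put()/update() interface visible in the diff; the stub classes and sample names are illustrative, not part of the changeset.

class StubProvider(object):
    # Stands in for EC2CloudProvider / EucalyptusCloudProvider.
    def __init__(self, name):
        self.name = name
    def put(self, uci_wrapper):
        print("[%s] queued request for UCI '%s'" % (self.name, uci_wrapper.name))
    def update(self):
        print("[%s] syncing instance/volume state with the cloud" % self.name)

class StubUCIWrapper(object):
    # Stands in for UCIwrapper; the provider name lives with the UCI's credentials.
    def __init__(self, name, provider_name):
        self.name = name
        self._provider_name = provider_name
    def get_provider_name(self):
        return self._provider_name

class CloudProvider(object):
    def __init__(self):
        # Every supported provider is registered unconditionally at startup,
        # replacing the old single-provider choice read from the config file.
        self.cloud_provider = {
            "eucalyptus": StubProvider("eucalyptus"),
            "ec2": StubProvider("ec2"),
        }
    def put(self, uci_wrapper):
        # Route each request by the UCI's own provider, not a global setting.
        self.cloud_provider[uci_wrapper.get_provider_name()].put(uci_wrapper)
    def update(self):
        # The periodic global update now touches every registered provider.
        for name in self.cloud_provider:
            self.cloud_provider[name].update()

cp = CloudProvider()
cp.put(StubUCIWrapper("test-uci", "ec2"))   # routed to the EC2 stub
cp.update()                                 # runs against both providers

The point of routing per request rather than per deployment is that one Galaxy instance can serve EC2 and Eucalyptus credentials side by side, which is what makes the EC2 verification in this changeset possible.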
diffstat:
lib/galaxy/cloud/__init__.py | 92 ++++++++++++------
lib/galaxy/cloud/providers/ec2.py | 152 ++++++++++++++++++++----------
lib/galaxy/cloud/providers/eucalyptus.py | 172 +++++++++++++++++++++-------------
lib/galaxy/web/controllers/cloud.py | 86 +++++++++++------
templates/cloud/configure_cloud.mako | 2 +-
5 files changed, 322 insertions(+), 182 deletions(-)
diffs (939 lines):
diff -r 9881b0df3252 -r 049083fee997 lib/galaxy/cloud/__init__.py
--- a/lib/galaxy/cloud/__init__.py Fri Oct 16 13:06:44 2009 -0400
+++ b/lib/galaxy/cloud/__init__.py Mon Oct 19 17:45:49 2009 -0400
@@ -18,11 +18,22 @@
log = logging.getLogger( __name__ )
-# States for running a job. These are NOT the same as data states
-#messages = {
-# JOB_WAIT
-#
-# }
+uci_states = Bunch(
+ NEW_UCI = "newUCI",
+ NEW = "new",
+ DELETING_UCI = "deletingUCI",
+ DELETING = "deleting",
+ SUBMITTED_UCI = "submittedUCI",
+ SUBMITTED = "submitted",
+ SHUTTING_DOWN_UCI = "shutting-downUCI",
+ SHUTTING_DOWN = "shutting-down",
+ AVAILABLE = "available",
+ RUNNING = "running",
+ PENDING = "pending",
+ ERROR = "error",
+ CREATING = "creating"
+)
+
JOB_WAIT, JOB_ERROR, JOB_INPUT_ERROR, JOB_INPUT_DELETED, JOB_OK, JOB_READY, JOB_DELETED, JOB_ADMIN_DELETED = 'wait', 'error', 'input_error', 'input_deleted', 'ok', 'ready', 'deleted', 'admin_deleted'
class CloudManager( object ):
@@ -32,10 +43,10 @@
def __init__( self, app ):
self.app = app
if self.app.config.get_bool( "enable_cloud_execution", True ):
- # The dispatcher manager underlying cloud instances
-# self.provider = CloudProvider( app )
+ # The dispatcher manager for underlying cloud instances - implements and contacts individual cloud providers
+ self.provider = CloudProvider( app )
# Monitor for updating status of cloud instances
- self.cloud_monitor = CloudMonitor( self.app )
+ self.cloud_monitor = CloudMonitor( self.app, self.provider )
# self.job_stop_queue = JobStopQueue( app, self.dispatcher )
else:
self.job_queue = self.job_stop_queue = NoopCloudMonitor()
@@ -93,7 +104,7 @@
CloudProvider.
"""
STOP_SIGNAL = object()
- def __init__( self, app ):
+ def __init__( self, app, provider ):
"""Start the cloud manager"""
self.app = app
# Keep track of the pid that started the cloud manager, only it
@@ -186,20 +197,20 @@
# new_instances
for r in session.query( model.UCI ) \
- .filter( or_( model.UCI.c.state=="newUCI",
- model.UCI.c.state=="submittedUCI",
- model.UCI.c.state=="shutting-downUCI",
- model.UCI.c.state=="deletingUCI" ) ) \
+ .filter( or_( model.UCI.c.state==uci_states.NEW_UCI, #"newUCI",
+ model.UCI.c.state==uci_states.SUBMITTED_UCI, #"submittedUCI",
+ model.UCI.c.state==uci_states.SHUTTING_DOWN_UCI, #"shutting-downUCI",
+ model.UCI.c.state==uci_states.DELETING_UCI ) ) \
.all():
- uci = UCIwrapper( r )
- new_requests.append( uci )
+ uci_wrapper = UCIwrapper( r )
+ new_requests.append( uci_wrapper )
# log.debug( 'new_requests: %s' % new_requests )
- for uci in new_requests:
+ for uci_wrapper in new_requests:
session.clear()
# log.debug( 'r.name: %s, state: %s' % ( r.name, r.state ) )
# session.save_or_update( r )
# session.flush()
- self.provider.put( uci )
+ self.provider.put( uci_wrapper )
# Done with the session
mapping.Session.remove()
@@ -511,6 +522,14 @@
# --------- Getter methods -----------------
+ def get_provider_name( self ):
+ """ Returns name of cloud provider associated with given UCI. """
+ uci = model.UCI.get( self.uci_id )
+ uci.refresh()
+ cred_id = uci.credentials_id
+ cred = model.CloudUserCredentials.get( cred_id )
+ return cred.provider_name
+
def get_instances_indexes( self, state=None ):
"""
Returns indexes of instances associated with given UCI as they are stored in local Galaxy database and
@@ -977,26 +996,35 @@
class CloudProvider( object ):
def __init__( self, app ):
+ import providers.eucalyptus
+ import providers.ec2
+
self.app = app
self.cloud_provider = {}
+ self.cloud_provider["eucalyptus"] = providers.eucalyptus.EucalyptusCloudProvider( app )
+ self.cloud_provider["ec2"] = providers.ec2.EC2CloudProvider( app )
+
# start_cloud_provider = None
# if app.config.start_job_runners is not None:
# start_cloud_provider.extend( app.config.start_job_runners.split(",") )
# for provider_name in start_cloud_provider:
- self.provider_name = app.config.cloud_provider
- if self.provider_name == "eucalyptus":
- import providers.eucalyptus
- self.cloud_provider[self.provider_name] = providers.eucalyptus.EucalyptusCloudProvider( app )
- elif self.provider_name == "ec2":
- import providers.ec2
- self.cloud_provider[self.provider_name] = providers.ec2.EC2CloudProvider( app )
- else:
- log.error( "Unable to start unknown cloud provider: %s" %self.provider_name )
+# self.provider_name = app.config.cloud_provider
+# if self.provider_name == "eucalyptus":
+# import providers.eucalyptus
+# self.cloud_provider[self.provider_name] = providers.eucalyptus.EucalyptusCloudProvider( app )
+# elif self.provider_name == "ec2":
+# import providers.ec2
+# self.cloud_provider[self.provider_name] = providers.ec2.EC2CloudProvider( app )
+# else:
+# log.error( "Unable to start unknown cloud provider: %s" %self.provider_name )
def put( self, uci_wrapper ):
""" Put given request for UCI manipulation into provider's request queue."""
# log.debug( "Adding UCI '%s' manipulation request into cloud manager's queue." % uci_wrapper.name )
- self.cloud_provider[self.provider_name].put( uci_wrapper )
+ self.cloud_provider[uci_wrapper.get_provider_name()].put( uci_wrapper )
+
+
+
def createUCI( self, uci ):
"""
@@ -1034,12 +1062,12 @@
def update( self ):
"""
- Runs a global status update on all storage volumes and all instances whose UCI is
- 'running' state.
- Reason behind this method is to sync state of local DB and real world resources
+ Runs a global status update across all providers for all UCIs in state other than 'terminated' and 'available'.
+ Reason behind this method is to sync state of local DB and real world resources.
"""
-# log.debug( "Running global update" )
- self.cloud_provider[self.provider_name].update()
+ for provider in self.cloud_provider.keys():
+# log.debug( "Running global update for provider: '%s'" % provider )
+ self.cloud_provider[provider].update()
def recover( self, job, job_wrapper ):
runner_name = ( job.job_runner_name.split(":", 1) )[0]
diff -r 9881b0df3252 -r 049083fee997 lib/galaxy/cloud/providers/ec2.py
--- a/lib/galaxy/cloud/providers/ec2.py Fri Oct 16 13:06:44 2009 -0400
+++ b/lib/galaxy/cloud/providers/ec2.py Mon Oct 19 17:45:49 2009 -0400
@@ -5,23 +5,53 @@
from galaxy import model # Database interaction class
from galaxy.model import mapping
from galaxy.datatypes.data import nice_size
+from galaxy.util.bunch import Bunch
from Queue import Queue
from sqlalchemy import or_
import galaxy.eggs
galaxy.eggs.require("boto")
from boto.ec2.connection import EC2Connection
-from boto.ec2.regioninfo import RegionInfo
+import boto.exception
import logging
log = logging.getLogger( __name__ )
+uci_states = Bunch(
+ NEW_UCI = "newUCI",
+ NEW = "new",
+ DELETING_UCI = "deletingUCI",
+ DELETING = "deleting",
+ SUBMITTED_UCI = "submittedUCI",
+ SUBMITTED = "submitted",
+ SHUTTING_DOWN_UCI = "shutting-downUCI",
+ SHUTTING_DOWN = "shutting-down",
+ AVAILABLE = "available",
+ RUNNING = "running",
+ PENDING = "pending",
+ ERROR = "error",
+ DELETED = "deleted"
+)
+
+instance_states = Bunch(
+ TERMINATED = "terminated",
+ RUNNING = "running",
+ PENDING = "pending",
+ SHUTTING_DOWN = "shutting-down"
+)
+
+store_states = Bunch(
+ IN_USE = "in-use",
+ CREATING = "creating"
+)
+
class EC2CloudProvider( object ):
"""
Amazon EC2-based cloud provider implementation for managing instances.
"""
STOP_SIGNAL = object()
def __init__( self, app ):
+ self.name = "ec2"
self.zone = "us-east-1a"
self.key_pair = "galaxy-keypair"
self.queue = Queue()
@@ -48,15 +78,13 @@
if uci_state is self.STOP_SIGNAL:
return
try:
- if uci_state=="new":
- log.debug( "Calling create UCI" )
+ if uci_state==uci_states.NEW: # "new":
self.createUCI( uci_wrapper )
- elif uci_state=="deleting":
+ elif uci_state==uci_states.DELETING: #"deleting":
self.deleteUCI( uci_wrapper )
- elif uci_state=="submitted":
- log.debug( "Calling start UCI" )
+ elif uci_state==uci_states.SUBMITTED: #"submitted":
self.startUCI( uci_wrapper )
- elif uci_state=="shutting-down":
+ elif uci_state==uci_states.SHUTTING_DOWN: #"shutting-down":
self.stopUCI( uci_wrapper )
except:
log.exception( "Uncaught exception executing request." )
@@ -75,19 +103,21 @@
Generate keypair using user's default credentials
"""
log.debug( "Getting user's keypair" )
- kp = conn.get_key_pair( self.key_pair )
instances = uci_wrapper.get_instances_indexes()
-
try:
+ kp = conn.get_key_pair( self.key_pair )
for inst in instances:
- log.debug("inst: '%s'" % inst )
+# log.debug("inst: '%s'" % inst )
uci_wrapper.set_key_pair( inst, kp.name )
return kp.name
- except AttributeError: # No keypair under this name exists so create it
- log.info( "No keypair found, creating keypair '%s'" % self.key_pair )
- kp = conn.create_key_pair( self.key_pair )
- for inst in instances:
- uci_wrapper.set_key_pair( inst, kp.name, kp.material )
+ except boto.exception.EC2ResponseError, e: # No keypair under this name exists so create it
+ if e.code == 'InvalidKeyPair.NotFound':
+ log.info( "No keypair found, creating keypair '%s'" % self.key_pair )
+ kp = conn.create_key_pair( self.key_pair )
+ for inst in instances:
+ uci_wrapper.set_key_pair( inst, kp.name, kp.material )
+ else:
+ log.error( "EC2 response error: '%s'" % e )
return kp.name
@@ -146,20 +176,24 @@
uci_wrapper.set_store_volume_id( 0, vol.id )
# Wait for a while to ensure volume was created
- vol_status = vol.status
- for i in range( 30 ):
- if vol_status is not "available":
- log.debug( 'Updating volume status; current status: %s' % vol_status )
- vol_status = vol.status
- time.sleep(3)
- if i is 29:
- log.debug( "Error while creating volume '%s'; stuck in state '%s'; deleting volume." % ( vol.id, vol_status ) )
- conn.delete_volume( vol.id )
- uci_wrapper.change_state( uci_state='error' )
- return
-
- uci_wrapper.change_state( uci_state='available' )
- uci_wrapper.set_store_status( vol.id, vol_status )
+# vol_status = vol.status
+# for i in range( 30 ):
+# if vol_status is not "available":
+# log.debug( 'Updating volume status; current status: %s' % vol_status )
+# vol_status = vol.status
+# time.sleep(3)
+# if i is 29:
+# log.debug( "Error while creating volume '%s'; stuck in state '%s'; deleting volume." % ( vol.id, vol_status ) )
+# conn.delete_volume( vol.id )
+# uci_wrapper.change_state( uci_state='error' )
+# return
+ vl = conn.get_all_volumes( [vol.id] )
+ if len( vl ) > 0:
+ uci_wrapper.change_state( uci_state=vl[0].status )
+ uci_wrapper.set_store_status( vol.id, vl[0].status )
+ else:
+ uci_wrapper.change_state( uci_state=uci_states.ERROR )
+ uci_wrapper.set_store_status( vol.id, uci_states.ERROR )
def deleteUCI( self, uci_wrapper ):
"""
@@ -191,7 +225,7 @@
else:
log.error( "Deleting following volume(s) failed: %s. However, these volumes were successfully deleted: %s. \
MANUAL intervention and processing needed." % ( failedList, deletedList ) )
- uci_wrapper.change_state( uci_state="error" )
+ uci_wrapper.change_state( uci_state=uci_states.ERROR )
def addStorageToUCI( self, name ):
""" Adds more storage to specified UCI
@@ -201,7 +235,7 @@
uci = uci_wrapper.get_uci()
log.debug( "Would be starting instance '%s'" % uci.name )
- uci_wrapper.change_state( 'pending' )
+ uci_wrapper.change_state( uci_states.PENDING )
# log.debug( "Sleeping a bit... (%s)" % uci.name )
# time.sleep(20)
# log.debug( "Woke up! (%s)" % uci.name )
@@ -213,16 +247,16 @@
conn = self.get_connection( uci_wrapper )
#
self.set_keypair( uci_wrapper, conn )
- i_indexes = uci_wrapper.get_instances_indexes() # Get indexes of *new* i_indexes associated with this UCI
+ i_indexes = uci_wrapper.get_instances_indexes() # Get indexes of instances associated with this UCI whose state is 'None'
log.debug( "Starting instances with IDs: '%s' associated with UCI '%s' " % ( i_indexes, uci_wrapper.get_name() ) )
for i_index in i_indexes:
mi_id = self.get_mi_id( uci_wrapper.get_type( i_index ) )
- log.debug( "mi_id: %s, uci_wrapper.get_key_pair_name( i_index ): %s" % ( mi_id, uci_wrapper.get_key_pair_name( i_index ) ) )
+# log.debug( "mi_id: %s, uci_wrapper.get_key_pair_name( i_index ): %s" % ( mi_id, uci_wrapper.get_key_pair_name( i_index ) ) )
uci_wrapper.set_mi( i_index, mi_id )
# Check if galaxy security group exists (and create it if it does not)
- log.debug( '***** Setting up security group' )
+# log.debug( '***** Setting up security group' )
security_group = 'galaxyWeb'
sgs = conn.get_all_security_groups() # security groups
gsgt = False # galaxy security group test
@@ -235,12 +269,15 @@
gSecurityGroup.authorize( 'tcp', 80, 80, '0.0.0.0/0' ) # Open HTTP port
gSecurityGroup.authorize( 'tcp', 22, 22, '0.0.0.0/0' ) # Open SSH port
# Start an instance
- log.debug( "***** Starting UCI instance '%s'" % uci_wrapper.get_name() )
+ log.debug( "***** Starting instance for UCI '%s'" % uci_wrapper.get_name() )
+ #TODO: Get customization scripts remotely and pass volID and user credential data only as user data from here.
+ userdata = open('/Users/afgane/Dropbox/Galaxy/EC2startupScripts/web/ec2autorun.zip', 'rb').read()
log.debug( 'Using following command: conn.run_instances( image_id=%s, key_name=%s, security_groups=[%s], instance_type=%s, placement=%s )'
% ( mi_id, uci_wrapper.get_key_pair_name( i_index ), [security_group], uci_wrapper.get_type( i_index ), uci_wrapper.get_uci_availability_zone() ) )
reservation = conn.run_instances( image_id=mi_id,
key_name=uci_wrapper.get_key_pair_name( i_index ),
security_groups=[security_group],
+ user_data=userdata,
instance_type=uci_wrapper.get_type( i_index ),
placement=uci_wrapper.get_uci_availability_zone() )
# Record newly available instance data into local Galaxy database
@@ -337,8 +374,7 @@
# uci.launch_time = None
# uci.flush()
#
- log.debug( "All instances for UCI '%s' were terminated." % uci_wrapper.get_name() )
-
+ log.debug( "Termination was initiated for all instances of UCI '%s'." % uci_wrapper.get_name() )
# dbInstances = get_instances( trans, uci ) #TODO: handle list!
@@ -388,20 +424,26 @@
def update( self ):
"""
- Runs a global status update on all storage volumes and all instances whose UCI is in
- 'running', 'pending', or 'shutting-down' state.
+ Runs a global status update on all instances that are in 'running', 'pending', "creating", or 'shutting-down' state.
+ Also, runs update on all storage volumes that are in "in-use", "creating", or 'None' state.
Reason behind this method is to sync state of local DB and real-world resources
"""
- log.debug( "Running general status update for EPC UCIs." )
- instances = model.CloudInstance.filter( or_( model.CloudInstance.c.state=="running", model.CloudInstance.c.state=="pending", model.CloudInstance.c.state=="shutting-down" ) ).all()
+ log.debug( "Running general status update for EC2 UCIs..." )
+ instances = model.CloudInstance.filter( or_( model.CloudInstance.c.state==instance_states.RUNNING, #"running",
+ model.CloudInstance.c.state==instance_states.PENDING, #"pending",
+ model.CloudInstance.c.state==instance_states.SHUTTING_DOWN ) ).all()
for inst in instances:
- log.debug( "Running general status update on instance '%s'" % inst.instance_id )
- self.updateInstance( inst )
+ if self.name == inst.uci.credentials.provider_name:
+ log.debug( "[%s] Running general status update on instance '%s'" % ( inst.uci.credentials.provider_name, inst.instance_id ) )
+ self.updateInstance( inst )
- stores = model.CloudStore.filter( or_( model.CloudStore.c.status=="in-use", model.CloudStore.c.status=="creating" ) ).all()
+ stores = model.CloudStore.filter( or_( model.CloudStore.c.status==store_states.IN_USE,
+ model.CloudStore.c.status==store_states.CREATING,
+ model.CloudStore.c.status==None ) ).all()
for store in stores:
- log.debug( "Running general status update on store '%s'" % store.volume_id )
- self.updateStore( store )
+ if self.name == store.uci.credentials.provider_name:
+ log.debug( "[%s] Running general status update on store '%s'" % ( store.uci.credentials.provider_name, store.volume_id ) )
+ self.updateStore( store )
def updateInstance( self, inst ):
@@ -420,8 +462,8 @@
# marks given instance as having terminated. Note that an instance might have also crashed and this code will not catch the difference...
if len( rl ) == 0:
log.info( "Instance ID '%s' was not found by the cloud provider. Instance might have crashed or otherwise been terminated." % inst.instance_id )
- inst.state = 'terminated'
- uci.state = 'available'
+ inst.state = instance_states.TERMINATED
+ uci.state = uci_states.AVAILABLE
uci.launch_time = None
inst.flush()
uci.flush()
@@ -429,14 +471,14 @@
for r in rl:
for i, cInst in enumerate( r.instances ):
s = cInst.update()
- log.debug( "Checking state of cloud instance '%s' associated with reservation '%s'. State='%s'" % ( cInst, r, s ) )
+ log.debug( "Checking state of cloud instance '%s' associated with UCI '%s' and reservation '%s'. State='%s'" % ( cInst, uci.name, r, s ) )
if s != inst.state:
inst.state = s
inst.flush()
- if s == 'terminated': # After instance has shut down, ensure UCI is marked as 'available'
- uci.state = 'available'
+ if s == instance_states.TERMINATED: # After instance has shut down, ensure UCI is marked as 'available'
+ uci.state = uci_states.AVAILABLE
uci.flush()
- if s != uci.state and s != 'terminated':
+ if s != uci.state and s != instance_states.TERMINATED:
# Making sure state of UCI is updated. Once multiple instances become associated with single UCI, this will need to be changed.
uci.state = s
uci.flush()
@@ -461,6 +503,12 @@
# log.debug( "Store '%s' vl: '%s'" % ( store.volume_id, vl ) )
# Update store status in local DB with info from cloud provider
if store.status != vl[0].status:
+ # In case something failed during creation of UCI but actual storage volume was created and yet
+ # UCI state remained as 'new', try to remedy this by updating UCI state here
+ if ( store.status == None ) and ( store.volume_id != None ):
+ uci.state = vl[0].status
+ uci.flush()
+
store.status = vl[0].status
store.flush()
if store.i_id != vl[0].instance_id:
diff -r 9881b0df3252 -r 049083fee997 lib/galaxy/cloud/providers/eucalyptus.py
--- a/lib/galaxy/cloud/providers/eucalyptus.py Fri Oct 16 13:06:44 2009 -0400
+++ b/lib/galaxy/cloud/providers/eucalyptus.py Mon Oct 19 17:45:49 2009 -0400
@@ -5,6 +5,7 @@
from galaxy import model # Database interaction class
from galaxy.model import mapping
from galaxy.datatypes.data import nice_size
+from galaxy.util.bunch import Bunch
from Queue import Queue
from sqlalchemy import or_
@@ -16,12 +17,41 @@
import logging
log = logging.getLogger( __name__ )
+uci_states = Bunch(
+ NEW_UCI = "newUCI",
+ NEW = "new",
+ DELETING_UCI = "deletingUCI",
+ DELETING = "deleting",
+ SUBMITTED_UCI = "submittedUCI",
+ SUBMITTED = "submitted",
+ SHUTTING_DOWN_UCI = "shutting-downUCI",
+ SHUTTING_DOWN = "shutting-down",
+ AVAILABLE = "available",
+ RUNNING = "running",
+ PENDING = "pending",
+ ERROR = "error",
+ DELETED = "deleted"
+)
+
+instance_states = Bunch(
+ TERMINATED = "terminated",
+ RUNNING = "running",
+ PENDING = "pending",
+ SHUTTING_DOWN = "shutting-down"
+)
+
+store_states = Bunch(
+ IN_USE = "in-use",
+ CREATING = "creating"
+)
+
class EucalyptusCloudProvider( object ):
"""
Eucalyptus-based cloud provider implementation for managing instances.
"""
STOP_SIGNAL = object()
def __init__( self, app ):
+ self.name = "eucalyptus"
self.zone = "epc"
self.key_pair = "galaxy-keypair"
self.queue = Queue()
@@ -48,15 +78,13 @@
if uci_state is self.STOP_SIGNAL:
return
try:
- if uci_state=="new":
- log.debug( "Calling create UCI" )
+ if uci_state==uci_states.NEW: # "new":
self.createUCI( uci_wrapper )
- elif uci_state=="deleting":
+ elif uci_state==uci_states.DELETING: #"deleting":
self.deleteUCI( uci_wrapper )
- elif uci_state=="submitted":
- log.debug( "Calling start UCI" )
+ elif uci_state==uci_states.SUBMITTED: #"submitted":
self.startUCI( uci_wrapper )
- elif uci_state=="shutting-down":
+ elif uci_state==uci_states.SHUTTING_DOWN: #"shutting-down":
self.stopUCI( uci_wrapper )
except:
log.exception( "Uncaught exception executing request." )
@@ -149,7 +177,7 @@
uci_wrapper.set_store_volume_id( 0, vol.id )
# EPC does not allow creation of storage volumes (it deletes one as soon as it is created, so manually set uci_state here)
- uci_wrapper.change_state( uci_state='available' )
+ uci_wrapper.change_state( uci_state=uci_states.AVAILABLE )
uci_wrapper.set_store_status( vol.id, vol.status )
def deleteUCI( self, uci_wrapper ):
@@ -182,7 +210,7 @@
else:
log.error( "Deleting following volume(s) failed: %s. However, these volumes were successfully deleted: %s. \
MANUAL intervention and processing needed." % ( failedList, deletedList ) )
- uci_wrapper.change_state( uci_state="error" )
+ uci_wrapper.change_state( uci_state=uci_states.ERROR )
def addStorageToUCI( self, name ):
""" Adds more storage to specified UCI """
@@ -191,7 +219,7 @@
uci = uci_wrapper.get_uci()
log.debug( "Would be starting instance '%s'" % uci.name )
- uci_wrapper.change_state( 'pending' )
+ uci_wrapper.change_state( uci_states.PENDING )
# log.debug( "Sleeping a bit... (%s)" % uci.name )
# time.sleep(20)
# log.debug( "Woke up! (%s)" % uci.name )
@@ -322,7 +350,7 @@
# uci.launch_time = None
# uci.flush()
#
- log.debug( "All instances for UCI '%s' were terminated." % uci_wrapper.get_name() )
+ log.debug( "Termination was initiated for all instances of UCI '%s'." % uci_wrapper.get_name() )
@@ -373,20 +401,26 @@
def update( self ):
"""
- Runs a global status update on all storage volumes and all instances whose UCI is in
- 'running', 'pending', or 'shutting-down' state.
+ Runs a global status update on all instances that are in 'running', 'pending', "creating", or 'shutting-down' state.
+ Also, runs update on all storage volumes that are in "in-use", "creating", or 'None' state.
Reason behind this method is to sync state of local DB and real-world resources
"""
- log.debug( "Running general status update for EPC UCIs." )
- instances = model.CloudInstance.filter( or_( model.CloudInstance.c.state=="running", model.CloudInstance.c.state=="pending", model.CloudInstance.c.state=="shutting-down" ) ).all()
+ log.debug( "Running general status update for EPC UCIs..." )
+ instances = model.CloudInstance.filter( or_( model.CloudInstance.c.state==instance_states.RUNNING, #"running",
+ model.CloudInstance.c.state==instance_states.PENDING, #"pending",
+ model.CloudInstance.c.state==instance_states.SHUTTING_DOWN ) ).all()
for inst in instances:
- log.debug( "Running general status update on instance '%s'" % inst.instance_id )
- self.updateInstance( inst )
+ if self.name == inst.uci.credentials.provider_name:
+ log.debug( "[%s] Running general status update on instance '%s'" % ( inst.uci.credentials.provider_name, inst.instance_id ) )
+ self.updateInstance( inst )
- stores = model.CloudStore.filter( or_( model.CloudStore.c.status=="in-use", model.CloudStore.c.status=="creating" ) ).all()
+ stores = model.CloudStore.filter( or_( model.CloudStore.c.status==store_states.IN_USE,
+ model.CloudStore.c.status==store_states.CREATING,
+ model.CloudStore.c.status==None ) ).all()
for store in stores:
- log.debug( "Running general status update on store '%s'" % store.volume_id )
- self.updateStore( store )
+ if self.name == store.uci.credentials.provider_name:
+ log.debug( "[%s] Running general status update on store '%s'" % ( store.uci.credentials.provider_name, store.volume_id ) )
+ self.updateStore( store )
def updateInstance( self, inst ):
@@ -406,8 +440,8 @@
# marks given instance as having terminated. Note that an instance might have also crashed and this code will not catch the difference...
if len( rl ) == 0:
log.info( "Instance ID '%s' was not found by the cloud provider. Instance might have crashed or otherwise been terminated." % inst.instance_id )
- inst.state = 'terminated'
- uci.state = 'available'
+ inst.state = instance_states.TERMINATED
+ uci.state = uci_states.AVAILABLE
uci.launch_time = None
inst.flush()
uci.flush()
@@ -419,10 +453,10 @@
if s != inst.state:
inst.state = s
inst.flush()
- if s == 'terminated': # After instance has shut down, ensure UCI is marked as 'available'
- uci.state = 'available'
+ if s == instance_states.TERMINATED: # After instance has shut down, ensure UCI is marked as 'available'
+ uci.state = uci_states.AVAILABLE
uci.flush()
- if s != uci.state and s != 'terminated':
+ if s != uci.state and s != instance_states.TERMINATED:
# Making sure state of UCI is updated. Once multiple instances become associated with single UCI, this will need to be changed.
uci.state = s
uci.flush()
@@ -448,6 +482,12 @@
# log.debug( "Store '%s' vl: '%s'" % ( store.volume_id, vl ) )
# Update store status in local DB with info from cloud provider
if store.status != vl[0].status:
+ # In case something failed during creation of UCI but actual storage volume was created and yet
+ # UCI state remained as 'new', try to remedy this by updating UCI state here
+ if ( store.status == None ) and ( store.volume_id != None ):
+ uci.state = vl[0].status
+ uci.flush()
+
store.status = vl[0].status
store.flush()
if store.i_id != vl[0].instance_id:
@@ -460,48 +500,48 @@
store.device = vl[0].device
store.flush()
- def updateUCI( self, uci ):
- """
- Runs a global status update on all storage volumes and all instances that are
- associated with specified UCI
- """
- conn = self.get_connection( uci )
-
- # Update status of storage volumes
- vl = model.CloudStore.filter( model.CloudInstance.c.uci_id == uci.id ).all()
- vols = []
- for v in vl:
- vols.append( v.volume_id )
- try:
- volumes = conn.get_all_volumes( vols )
- for i, v in enumerate( volumes ):
- uci.store[i].i_id = v.instance_id
- uci.store[i].status = v.status
- uci.store[i].device = v.device
- uci.store[i].flush()
- except:
- log.debug( "Error updating status of volume(s) associated with UCI '%s'. Status was not updated." % uci.name )
- pass
-
- # Update status of instances
- il = model.CloudInstance.filter_by( uci_id=uci.id ).filter( model.CloudInstance.c.state != 'terminated' ).all()
- instanceList = []
- for i in il:
- instanceList.append( i.instance_id )
- log.debug( 'instanceList: %s' % instanceList )
- try:
- reservations = conn.get_all_instances( instanceList )
- for i, r in enumerate( reservations ):
- uci.instance[i].state = r.instances[0].update()
- log.debug('updating instance %s; status: %s' % ( uci.instance[i].instance_id, uci.instance[i].state ) )
- uci.state = uci.instance[i].state
- uci.instance[i].public_dns = r.instances[0].dns_name
- uci.instance[i].private_dns = r.instances[0].private_dns_name
- uci.instance[i].flush()
- uci.flush()
- except:
- log.debug( "Error updating status of instances associated with UCI '%s'. Instance status was not updated." % uci.name )
- pass
+# def updateUCI( self, uci ):
+# """
+# Runs a global status update on all storage volumes and all instances that are
+# associated with specified UCI
+# """
+# conn = self.get_connection( uci )
+#
+# # Update status of storage volumes
+# vl = model.CloudStore.filter( model.CloudInstance.c.uci_id == uci.id ).all()
+# vols = []
+# for v in vl:
+# vols.append( v.volume_id )
+# try:
+# volumes = conn.get_all_volumes( vols )
+# for i, v in enumerate( volumes ):
+# uci.store[i].i_id = v.instance_id
+# uci.store[i].status = v.status
+# uci.store[i].device = v.device
+# uci.store[i].flush()
+# except:
+# log.debug( "Error updating status of volume(s) associated with UCI '%s'. Status was not updated." % uci.name )
+# pass
+#
+# # Update status of instances
+# il = model.CloudInstance.filter_by( uci_id=uci.id ).filter( model.CloudInstance.c.state != 'terminated' ).all()
+# instanceList = []
+# for i in il:
+# instanceList.append( i.instance_id )
+# log.debug( 'instanceList: %s' % instanceList )
+# try:
+# reservations = conn.get_all_instances( instanceList )
+# for i, r in enumerate( reservations ):
+# uci.instance[i].state = r.instances[0].update()
+# log.debug('updating instance %s; status: %s' % ( uci.instance[i].instance_id, uci.instance[i].state ) )
+# uci.state = uci.instance[i].state
+# uci.instance[i].public_dns = r.instances[0].dns_name
+# uci.instance[i].private_dns = r.instances[0].private_dns_name
+# uci.instance[i].flush()
+# uci.flush()
+# except:
+# log.debug( "Error updating status of instances associated with UCI '%s'. Instance status was not updated." % uci.name )
+# pass
# --------- Helper methods ------------
diff -r 9881b0df3252 -r 049083fee997 lib/galaxy/web/controllers/cloud.py
--- a/lib/galaxy/web/controllers/cloud.py Fri Oct 16 13:06:44 2009 -0400
+++ b/lib/galaxy/web/controllers/cloud.py Mon Oct 19 17:45:49 2009 -0400
@@ -29,6 +29,29 @@
import logging
log = logging.getLogger( __name__ )
+uci_states = Bunch(
+ NEW_UCI = "newUCI",
+ NEW = "new",
+ DELETING_UCI = "deletingUCI",
+ DELETING = "deleting",
+ SUBMITTED_UCI = "submittedUCI",
+ SUBMITTED = "submitted",
+ SHUTTING_DOWN_UCI = "shutting-downUCI",
+ SHUTTING_DOWN = "shutting-down",
+ AVAILABLE = "available",
+ RUNNING = "running",
+ PENDING = "pending",
+ ERROR = "error",
+ DELETED = "deleted"
+)
+
+instance_states = Bunch(
+ TERMINATED = "terminated",
+ RUNNING = "running",
+ PENDING = "pending",
+ SHUTTING_DOWN = "shutting-down"
+)
+
class CloudController( BaseController ):
# def __init__( self ):
@@ -59,23 +82,23 @@
liveInstances = trans.sa_session.query( model.UCI ) \
.filter_by( user=user ) \
- .filter( or_( model.UCI.c.state=="running",
- model.UCI.c.state=="pending",
- model.UCI.c.state=="submitted",
- model.UCI.c.state=="submittedUCI",
- model.UCI.c.state=="shutting-down",
- model.UCI.c.state=="shutting-downUCI" ) ) \
+ .filter( or_( model.UCI.c.state==uci_states.RUNNING, #"running",
+ model.UCI.c.state==uci_states.PENDING, #"pending",
+ model.UCI.c.state==uci_states.SUBMITTED, #"submitted",
+ model.UCI.c.state==uci_states.SUBMITTED_UCI, #"submittedUCI",
+ model.UCI.c.state==uci_states.SHUTTING_DOWN, #"shutting-down",
+ model.UCI.c.state==uci_states.SHUTTING_DOWN_UCI ) ) \
.order_by( desc( model.UCI.c.update_time ) ) \
.all()
prevInstances = trans.sa_session.query( model.UCI ) \
.filter_by( user=user ) \
- .filter( or_( model.UCI.c.state=="available",
- model.UCI.c.state=="new",
- model.UCI.c.state=="newUCI",
- model.UCI.c.state=="error",
- model.UCI.c.state=="deleting",
- model.UCI.c.state=="deletingUCI" ) ) \
+ .filter( or_( model.UCI.c.state==uci_states.AVAILABLE, #"available",
+ model.UCI.c.state==uci_states.NEW, #"new",
+ model.UCI.c.state==uci_states.NEW_UCI, #"newUCI",
+ model.UCI.c.state==uci_states.ERROR, #"error",
+ model.UCI.c.state==uci_states.DELETING, #"deleting",
+ model.UCI.c.state==uci_states.DELETING_UCI ) ) \
.order_by( desc( model.UCI.c.update_time ) ) \
.all()
@@ -83,9 +106,9 @@
# TODO: Auto-refresh once instance is running
pendingInstances = trans.sa_session.query( model.UCI ) \
.filter_by( user=user ) \
- .filter( or_( model.UCI.c.state=="pending" , \
- model.UCI.c.state=="submitted" , \
- model.UCI.c.state=="submittedUCI" ) ) \
+ .filter( or_( model.UCI.c.state==uci_states.PENDING, #"pending" , \
+ model.UCI.c.state==uci_states.SUBMITTED, #"submitted" , \
+ model.UCI.c.state==uci_states.SUBMITTED_UCI ) ) \
.all()
if pendingInstances:
trans.set_message( "Galaxy instance started. NOTE: Please wait about 3-5 minutes for the instance to "
@@ -131,12 +154,12 @@
stores = get_stores( trans, uci )
# log.debug(self.app.config.job_working_directory)
if ( len(stores) is not 0 ) and \
- ( uci.state != 'submitted' ) and \
- ( uci.state != 'submittedUCI' ) and \
- ( uci.state != 'pending' ) and \
- ( uci.state != 'deleting' ) and \
- ( uci.state != 'deletingUCI' ) and \
- ( uci.state != 'error' ):
+ ( uci.state != uci_states.SUBMITTED ) and \
+ ( uci.state != uci_states.SUBMITTED_UCI ) and \
+ ( uci.state != uci_states.PENDING ) and \
+ ( uci.state != uci_states.DELETING ) and \
+ ( uci.state != uci_states.DELETING_UCI ) and \
+ ( uci.state != uci_states.ERROR ):
instance = model.CloudInstance()
instance.user = user
instance.image = mi
@@ -169,7 +192,7 @@
# instance.instance_id = str( reservation.instances[0]).split(":")[1]
# instance.state = "pending"
# instance.state = reservation.instances[0].state
- uci.state = 'submittedUCI'
+ uci.state = uci_states.SUBMITTED_UCI
# Persist
session = trans.sa_session
@@ -181,6 +204,7 @@
trans.set_message( "Galaxy instance started. NOTE: Please wait about 3-5 minutes for the instance to "
"start up and then refresh this page. A button to connect to the instance will then appear alongside "
"instance description." )
+ time.sleep(1) # Wait for initial update to occur to avoid immediate page reload
return self.list( trans )
trans.show_error_message( "Cannot start instance that is in state '%s'." % uci.state )
@@ -199,7 +223,7 @@
Stop a cloud UCI instance. This implies stopping Galaxy server and disconnecting/unmounting relevant file system(s).
"""
uci = get_uci( trans, id )
- uci.state = 'shutting-downUCI'
+ uci.state = uci_states.SHUTTING_DOWN_UCI
session = trans.sa_session
# session.save_or_update( stores )
session.save_or_update( uci )
@@ -252,7 +276,6 @@
# session.flush()
# trans.log_event( "User stopped cloud instance '%s'" % uci.name )
# trans.set_message( "Galaxy instance '%s' stopped." % uci.name )
-#
return self.list( trans )
@web.expose
@@ -264,9 +287,9 @@
"""
uci = get_uci( trans, id )
- if ( uci.state != 'deletingUCI' ) and ( uci.state != 'deleting' ) and ( uci.state != 'error' ):
+ if ( uci.state != uci_states.DELETING_UCI ) and ( uci.state != uci_states.DELETING ) and ( uci.state != uci_states.ERROR ):
name = uci.name
- uci.state = "deletingUCI"
+ uci.state = uci_states.DELETING_UCI
# dbInstances = get_instances( trans, uci ) #TODO: handle list!
#
# conn = get_connection( trans )
@@ -332,7 +355,7 @@
if instanceName:
# Create new user configured instance
try:
- if trans.app.model.UCI.filter( and_( trans.app.model.UCI.table.c.name==instanceName, trans.app.model.UCI.table.c.state!='deleted' ) ).first():
+ if trans.app.model.UCI.filter( and_( trans.app.model.UCI.table.c.name==instanceName, trans.app.model.UCI.table.c.state!=uci_states.DELETED ) ).first():
error['inst_error'] = "An instance with that name already exist."
elif instanceName=='' or len( instanceName ) > 255:
error['inst_error'] = "Instance name must be between 1 and 255 characters long."
@@ -355,7 +378,7 @@
trans.app.model.CloudUserCredentials.table.c.name==credName ).first()
uci.user= user
uci.total_size = volSize # This is OK now because new instance is being created.
- uci.state = "newUCI"
+ uci.state = uci_states.NEW_UCI
storage = model.CloudStore()
storage.user = user
@@ -370,6 +393,7 @@
# Log and display the management page
trans.log_event( "User configured new cloud instance" )
trans.set_message( "New Galaxy instance '%s' configured. Once instance status shows 'available' you will be able to start the instance." % instanceName )
+ time.sleep(1) # Wait for initial update to occur to avoid immediate page reload
return self.list( trans )
except ValueError:
vol_error = "Volume size must be specified as an integer value only, between 1 and 1000."
@@ -993,7 +1017,7 @@
def get_UCIs_state( trans ):
user = trans.get_user()
- instances = trans.sa_session.query( model.UCI ).filter_by( user=user ).filter( model.UCI.c.state != "deleted" ).all()
+ instances = trans.sa_session.query( model.UCI ).filter_by( user=user ).filter( model.UCI.c.state != uci_states.DELETED ).all()
insd = {} # instance name-state dict
for inst in instances:
insd[inst.name] = inst.state
@@ -1088,7 +1112,7 @@
user = trans.get_user()
instances = trans.sa_session.query( model.CloudInstance ) \
.filter_by( user=user, uci_id=uci.id ) \
- .filter( or_(model.CloudInstance.table.c.state=="running", model.CloudInstance.table.c.state=="pending" ) ) \
+ .filter( or_(model.CloudInstance.table.c.state==instance_states.RUNNING, model.CloudInstance.table.c.state==instance_states.PENDING ) ) \
.first()
#.all() #TODO: return all but need to edit calling method(s) to handle list
@@ -1172,7 +1196,7 @@
session.flush()
# If instance is now running, update/process instance (i.e., mount file system, start Galaxy, update DB with DNS)
- if oldState=="pending" and dbInstances.state=="running":
+ if oldState==instance_states.PENDING and dbInstances.state==instance_states.RUNNING:
update_instance( trans, dbInstances, cloudInstance, conn, uci )
diff -r 9881b0df3252 -r 049083fee997 templates/cloud/configure_cloud.mako
--- a/templates/cloud/configure_cloud.mako Fri Oct 16 13:06:44 2009 -0400
+++ b/templates/cloud/configure_cloud.mako Mon Oct 19 17:45:49 2009 -0400
@@ -26,7 +26,7 @@
<li>
<a class="action-button" href="${h.url_for( action='add' )}">
<img src="${h.url_for('/static/images/silk/add.png')}" />
- <span>Add AWS credentials</span>
+ <span>Add credentials</span>
</a>
</li>
</ul>
23 Nov '09
details: http://www.bx.psu.edu/hg/galaxy/rev/9881b0df3252
changeset: 3069:9881b0df3252
user: Enis Afgan <afgane@gmail.com>
date: Fri Oct 16 13:06:44 2009 -0400
description:
Web UI functionality improved when adding credentials and configuring new UCIs. Added support for EC2 as a cloud provider; however, provider selection based on user credentials alone does not work yet, so EC2 functionality cannot be verified.
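One refactor worth noting in the diff below: the UCIwrapper store setters (set_store_status, set_store_instance) stop indexing positionally into uci.store and instead look the CloudStore row up by the provider-side volume ID (e.g. 'vol-39F80512'). A minimal sketch of that lookup pattern follows, with a plain dict standing in for the SQLAlchemy query; the stub class and the sample IDs are illustrative only.

class StubCloudStore(object):
    # Stands in for a model.CloudStore row.
    def __init__(self, volume_id):
        self.volume_id = volume_id
        self.status = None
        self.i_id = None

# Stands in for model.CloudStore.filter( CloudStore.c.volume_id == vol_id ).first()
stores_by_volume_id = {"vol-39F80512": StubCloudStore("vol-39F80512")}

def set_store_status(vol_id, status):
    # Keyed on the cloud provider's volume ID, so callers never depend on
    # the ordering of uci.store rows in the local database.
    stores_by_volume_id[vol_id].status = status

def set_store_instance(vol_id, instance_id):
    # Same pattern: record which instance the volume is attached to.
    stores_by_volume_id[vol_id].i_id = instance_id

set_store_status("vol-39F80512", "available")
set_store_instance("vol-39F80512", "i-0000001")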
diffstat:
lib/galaxy/cloud/__init__.py | 729 ++++++++++++++++---------------
lib/galaxy/cloud/providers/ec2.py | 526 ++++++++++++++++++++++
lib/galaxy/cloud/providers/eucalyptus.py | 49 +-
lib/galaxy/web/controllers/cloud.py | 87 ++-
lib/galaxy/web/framework/helpers/__init__.py | 1 +
templates/cloud/add_credentials.mako | 8 +-
templates/cloud/configure_uci.mako | 116 +++++
7 files changed, 1100 insertions(+), 416 deletions(-)
diffs (1762 lines):
diff -r 6aab50510e43 -r 9881b0df3252 lib/galaxy/cloud/__init__.py
--- a/lib/galaxy/cloud/__init__.py Wed Oct 14 19:20:11 2009 -0400
+++ b/lib/galaxy/cloud/__init__.py Fri Oct 16 13:06:44 2009 -0400
@@ -33,9 +33,9 @@
self.app = app
if self.app.config.get_bool( "enable_cloud_execution", True ):
# The dispatcher manager underlying cloud instances
- self.provider = DefaultCloudProvider( app )
+# self.provider = CloudProvider( app )
# Monitor for updating status of cloud instances
- self.cloud_monitor = CloudMonitor( self.app, self.provider )
+ self.cloud_monitor = CloudMonitor( self.app )
# self.job_stop_queue = JobStopQueue( app, self.dispatcher )
else:
self.job_queue = self.job_stop_queue = NoopCloudMonitor()
@@ -93,7 +93,7 @@
CloudProvider.
"""
STOP_SIGNAL = object()
- def __init__( self, app, provider ):
+ def __init__( self, app ):
"""Start the cloud manager"""
self.app = app
# Keep track of the pid that started the cloud manager, only it
@@ -153,7 +153,7 @@
try:
# log.debug( "Calling monitor_step" )
self.__monitor_step()
- if cnt%30 == 0: # Run global update every 30 seconds
+ if cnt%30 == 0: # Run global update every 30 iterations (~1 minute)
self.provider.update()
cnt = 0
except:
@@ -471,18 +471,25 @@
uci.store[store_id].device = device
uci.store[store_id].flush()
- def set_store_status( self, store_id, status ):
- uci = model.UCI.get( self.uci_id )
- uci.refresh()
- uci.store[store_id].status = status
- uci.store[store_id].flush()
-
- def set_store_availability_zone( self, store_id, availability_zone ):
- uci = model.UCI.get( self.uci_id )
- uci.refresh()
- uci.store[store_id].availability_zone = availability_zone
- uci.store[store_id].flush()
-
+ def set_store_status( self, vol_id, status ):
+ vol = model.CloudStore.filter( model.CloudStore.c.volume_id == vol_id ).first()
+ vol.status = status
+ vol.flush()
+
+ def set_store_availability_zone( self, availability_zone, vol_id=None ):
+ """
+ Sets availability zone of storage volumes for either ALL volumes associated with current
+ UCI or for the volume whose volume ID (e.g., 'vol-39F80512') is provided as argument.
+ """
+ if vol_id is not None:
+ vol = model.CloudStore.filter( model.CloudStore.c.volume_id == vol_id ).all()
+ else:
+ vol = model.CloudStore.filter( model.CloudStore.c.uci_id == self.uci_id ).all()
+
+ for v in vol:
+ v.availability_zone = availability_zone
+ v.flush()
+
def set_store_volume_id( self, store_id, volume_id ):
"""
Given store ID associated with this UCI, set volume ID as it is registered
@@ -493,13 +500,15 @@
uci.store[store_id].volume_id = volume_id
uci.store[store_id].flush()
- def set_store_instance( self, store_id, instance_id ):
- """ Stores instance ID that given store volume is attached to. """
- uci = model.UCI.get( self.uci_id )
- uci.refresh()
- uci.store[store_id].i_id = instance_id
- uci.store[store_id].flush()
-
+ def set_store_instance( self, vol_id, instance_id ):
+ """
+ Stores instance ID that given store volume is attached to. Store volume ID should
+ be given in following format: 'vol-78943248'
+ """
+ vol = model.CloudStore.filter( model.CloudStore.c.volume_id == vol_id ).first()
+ vol.i_id = instance_id
+ vol.flush()
+
# --------- Getter methods -----------------
def get_instances_indexes( self, state=None ):
@@ -584,17 +593,23 @@
uci.refresh()
return uci.instance[instance_id].private_dns
- def get_store_availability_zone( self, store_id ):
+ def get_uci_availability_zone( self ):
+ """
+ Returns UCI's availability zone.
+ Because all of storage volumes associated with a given UCI must be in the same
+ availability zone, availability of a UCI is determined by availability zone of
+ any one storage volume.
+ """
uci = model.UCI.get( self.uci_id )
uci.refresh()
- return uci.store[store_id].availability_zone
+ return uci.store[0].availability_zone
- def get_store_size( self, store_id ):
+ def get_store_size( self, store_id=0 ):
uci = model.UCI.get( self.uci_id )
uci.refresh()
return uci.store[store_id].size
- def get_store_volume_id( self, store_id ):
+ def get_store_volume_id( self, store_id=0 ):
"""
Given store ID associated with this UCI, get volume ID as it is registered
on the cloud provider (e.g., 'vol-39890501')
@@ -630,337 +645,337 @@
uci.state = 'deleted' # for bookkeeping reasons, mark as deleted but don't actually delete.
uci.flush()
-class JobWrapper( object ):
- """
- Wraps a 'model.Job' with convience methods for running processes and
- state management.
- """
- def __init__(self, job, tool, queue ):
- self.job_id = job.id
- # This is immutable, we cache it for the scheduling policy to use if needed
- self.session_id = job.session_id
- self.tool = tool
- self.queue = queue
- self.app = queue.app
- self.extra_filenames = []
- self.command_line = None
- self.galaxy_lib_dir = None
- # With job outputs in the working directory, we need the working
- # directory to be set before prepare is run, or else premature deletion
- # and job recovery fail.
- self.working_directory = \
- os.path.join( self.app.config.job_working_directory, str( self.job_id ) )
- self.output_paths = None
- self.external_output_metadata = metadata.JobExternalOutputMetadataWrapper( job ) #wrapper holding the info required to restore and clean up from files used for setting metadata externally
-
- def get_param_dict( self ):
- """
- Restore the dictionary of parameters from the database.
- """
- job = model.Job.get( self.job_id )
- param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] )
- param_dict = self.tool.params_from_strings( param_dict, self.app )
- return param_dict
-
- def prepare( self ):
- """
- Prepare the job to run by creating the working directory and the
- config files.
- """
- mapping.context.current.clear() #this prevents the metadata reverting that has been seen in conjunction with the PBS job runner
- if not os.path.exists( self.working_directory ):
- os.mkdir( self.working_directory )
- # Restore parameters from the database
- job = model.Job.get( self.job_id )
- incoming = dict( [ ( p.name, p.value ) for p in job.parameters ] )
- incoming = self.tool.params_from_strings( incoming, self.app )
- # Do any validation that could not be done at job creation
- self.tool.handle_unvalidated_param_values( incoming, self.app )
- # Restore input / output data lists
- inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] )
- out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] )
- # These can be passed on the command line if wanted as $userId $userEmail
- if job.history.user: # check for anonymous user!
- userId = '%d' % job.history.user.id
- userEmail = str(job.history.user.email)
- else:
- userId = 'Anonymous'
- userEmail = 'Anonymous'
- incoming['userId'] = userId
- incoming['userEmail'] = userEmail
- # Build params, done before hook so hook can use
- param_dict = self.tool.build_param_dict( incoming, inp_data, out_data, self.get_output_fnames(), self.working_directory )
- # Certain tools require tasks to be completed prior to job execution
- # ( this used to be performed in the "exec_before_job" hook, but hooks are deprecated ).
- if self.tool.tool_type is not None:
- out_data = self.tool.exec_before_job( self.queue.app, inp_data, out_data, param_dict )
- # Run the before queue ("exec_before_job") hook
- self.tool.call_hook( 'exec_before_job', self.queue.app, inp_data=inp_data,
- out_data=out_data, tool=self.tool, param_dict=incoming)
- mapping.context.current.flush()
- # Build any required config files
- config_filenames = self.tool.build_config_files( param_dict, self.working_directory )
- # FIXME: Build the param file (might return None, DEPRECATED)
- param_filename = self.tool.build_param_file( param_dict, self.working_directory )
- # Build the job's command line
- self.command_line = self.tool.build_command_line( param_dict )
- # FIXME: for now, tools get Galaxy's lib dir in their path
- if self.command_line and self.command_line.startswith( 'python' ):
- self.galaxy_lib_dir = os.path.abspath( "lib" ) # cwd = galaxy root
- # We need command_line persisted to the db in order for Galaxy to re-queue the job
- # if the server was stopped and restarted before the job finished
- job.command_line = self.command_line
- job.flush()
- # Return list of all extra files
- extra_filenames = config_filenames
- if param_filename is not None:
- extra_filenames.append( param_filename )
- self.param_dict = param_dict
- self.extra_filenames = extra_filenames
- return extra_filenames
+#class JobWrapper( object ):
+# """
+# Wraps a 'model.Job' with convenience methods for running processes and
+# state management.
+# """
+# def __init__(self, job, tool, queue ):
+# self.job_id = job.id
+# # This is immutable, we cache it for the scheduling policy to use if needed
+# self.session_id = job.session_id
+# self.tool = tool
+# self.queue = queue
+# self.app = queue.app
+# self.extra_filenames = []
+# self.command_line = None
+# self.galaxy_lib_dir = None
+# # With job outputs in the working directory, we need the working
+# # directory to be set before prepare is run, or else premature deletion
+# # and job recovery fail.
+# self.working_directory = \
+# os.path.join( self.app.config.job_working_directory, str( self.job_id ) )
+# self.output_paths = None
+# self.external_output_metadata = metadata.JobExternalOutputMetadataWrapper( job ) #wrapper holding the info required to restore and clean up from files used for setting metadata externally
+#
+# def get_param_dict( self ):
+# """
+# Restore the dictionary of parameters from the database.
+# """
+# job = model.Job.get( self.job_id )
+# param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] )
+# param_dict = self.tool.params_from_strings( param_dict, self.app )
+# return param_dict
+#
+# def prepare( self ):
+# """
+# Prepare the job to run by creating the working directory and the
+# config files.
+# """
+# mapping.context.current.clear() #this prevents the metadata reverting that has been seen in conjunction with the PBS job runner
+# if not os.path.exists( self.working_directory ):
+# os.mkdir( self.working_directory )
+# # Restore parameters from the database
+# job = model.Job.get( self.job_id )
+# incoming = dict( [ ( p.name, p.value ) for p in job.parameters ] )
+# incoming = self.tool.params_from_strings( incoming, self.app )
+# # Do any validation that could not be done at job creation
+# self.tool.handle_unvalidated_param_values( incoming, self.app )
+# # Restore input / output data lists
+# inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] )
+# out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] )
+# # These can be passed on the command line if wanted as $userId $userEmail
+# if job.history.user: # check for anonymous user!
+# userId = '%d' % job.history.user.id
+# userEmail = str(job.history.user.email)
+# else:
+# userId = 'Anonymous'
+# userEmail = 'Anonymous'
+# incoming['userId'] = userId
+# incoming['userEmail'] = userEmail
+# # Build params, done before hook so hook can use
+# param_dict = self.tool.build_param_dict( incoming, inp_data, out_data, self.get_output_fnames(), self.working_directory )
+# # Certain tools require tasks to be completed prior to job execution
+# # ( this used to be performed in the "exec_before_job" hook, but hooks are deprecated ).
+# if self.tool.tool_type is not None:
+# out_data = self.tool.exec_before_job( self.queue.app, inp_data, out_data, param_dict )
+# # Run the before queue ("exec_before_job") hook
+# self.tool.call_hook( 'exec_before_job', self.queue.app, inp_data=inp_data,
+# out_data=out_data, tool=self.tool, param_dict=incoming)
+# mapping.context.current.flush()
+# # Build any required config files
+# config_filenames = self.tool.build_config_files( param_dict, self.working_directory )
+# # FIXME: Build the param file (might return None, DEPRECATED)
+# param_filename = self.tool.build_param_file( param_dict, self.working_directory )
+# # Build the job's command line
+# self.command_line = self.tool.build_command_line( param_dict )
+# # FIXME: for now, tools get Galaxy's lib dir in their path
+# if self.command_line and self.command_line.startswith( 'python' ):
+# self.galaxy_lib_dir = os.path.abspath( "lib" ) # cwd = galaxy root
+# # We need command_line persisted to the db in order for Galaxy to re-queue the job
+# # if the server was stopped and restarted before the job finished
+# job.command_line = self.command_line
+# job.flush()
+# # Return list of all extra files
+# extra_filenames = config_filenames
+# if param_filename is not None:
+# extra_filenames.append( param_filename )
+# self.param_dict = param_dict
+# self.extra_filenames = extra_filenames
+# return extra_filenames
+#
+# def fail( self, message, exception=False ):
+# """
+# Indicate job failure by setting state and message on all output
+# datasets.
+# """
+# job = model.Job.get( self.job_id )
+# job.refresh()
+# # if the job was deleted, don't fail it
+# if not job.state == model.Job.states.DELETED:
+# # Check if the failure is due to an exception
+# if exception:
+# # Save the traceback immediately in case we generate another
+# # below
+# job.traceback = traceback.format_exc()
+# # Get the exception and let the tool attempt to generate
+# # a better message
+# etype, evalue, tb = sys.exc_info()
+# m = self.tool.handle_job_failure_exception( evalue )
+# if m:
+# message = m
+# if self.app.config.outputs_to_working_directory:
+# for dataset_path in self.get_output_fnames():
+# try:
+# shutil.move( dataset_path.false_path, dataset_path.real_path )
+# log.debug( "fail(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) )
+# except ( IOError, OSError ), e:
+# log.error( "fail(): Missing output file in working directory: %s" % e )
+# for dataset_assoc in job.output_datasets:
+# dataset = dataset_assoc.dataset
+# dataset.refresh()
+# dataset.state = dataset.states.ERROR
+# dataset.blurb = 'tool error'
+# dataset.info = message
+# dataset.set_size()
+# dataset.flush()
+# job.state = model.Job.states.ERROR
+# job.command_line = self.command_line
+# job.info = message
+# job.flush()
+# # If the job was deleted, just clean up
+# self.cleanup()
+#
+# def change_state( self, state, info = False ):
+# job = model.Job.get( self.job_id )
+# job.refresh()
+# for dataset_assoc in job.output_datasets:
+# dataset = dataset_assoc.dataset
+# dataset.refresh()
+# dataset.state = state
+# if info:
+# dataset.info = info
+# dataset.flush()
+# if info:
+# job.info = info
+# job.state = state
+# job.flush()
+#
+# def get_state( self ):
+# job = model.Job.get( self.job_id )
+# job.refresh()
+# return job.state
+#
+# def set_runner( self, runner_url, external_id ):
+# job = model.Job.get( self.job_id )
+# job.refresh()
+# job.job_runner_name = runner_url
+# job.job_runner_external_id = external_id
+# job.flush()
+#
+# def finish( self, stdout, stderr ):
+# """
+# Called to indicate that the associated command has been run. Updates
+# the output datasets based on stderr and stdout from the command, and
+# the contents of the output files.
+# """
+# # default post job setup
+# mapping.context.current.clear()
+# job = model.Job.get( self.job_id )
+# # if the job was deleted, don't finish it
+# if job.state == job.states.DELETED:
+# self.cleanup()
+# return
+# elif job.state == job.states.ERROR:
+# # Job was deleted by an administrator
+# self.fail( job.info )
+# return
+# if stderr:
+# job.state = "error"
+# else:
+# job.state = 'ok'
+# if self.app.config.outputs_to_working_directory:
+# for dataset_path in self.get_output_fnames():
+# try:
+# shutil.move( dataset_path.false_path, dataset_path.real_path )
+# log.debug( "finish(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) )
+# except ( IOError, OSError ):
+# self.fail( "Job %s's output dataset(s) could not be read" % job.id )
+# return
+# for dataset_assoc in job.output_datasets:
+# #should this also be checking library associations? - can a library item be added from a history before the job has ended? - lets not allow this to occur
+# for dataset in dataset_assoc.dataset.dataset.history_associations: #need to update all associated output hdas, i.e. history was shared with job running
+# dataset.blurb = 'done'
+# dataset.peek = 'no peek'
+# dataset.info = stdout + stderr
+# dataset.set_size()
+# if stderr:
+# dataset.blurb = "error"
+# elif dataset.has_data():
+# #if a dataset was copied, it won't appear in our dictionary:
+# #either use the metadata from originating output dataset, or call set_meta on the copies
+# #it would be quicker to just copy the metadata from the originating output dataset,
+# #but somewhat trickier (need to recurse up the copied_from tree), for now we'll call set_meta()
+# if not self.external_output_metadata.external_metadata_set_successfully( dataset ):
+# # Only set metadata values if they are missing...
+# dataset.set_meta( overwrite = False )
+# else:
+# #load metadata from file
+# #we need to no longer allow metadata to be edited while the job is still running,
+# #since if it is edited, the metadata changed on the running output will no longer match
+# #the metadata that was stored to disk for use via the external process,
+# #and the changes made by the user will be lost, without warning or notice
+# dataset.metadata.from_JSON_dict( self.external_output_metadata.get_output_filenames_by_dataset( dataset ).filename_out )
+# if self.tool.is_multi_byte:
+# dataset.set_multi_byte_peek()
+# else:
+# dataset.set_peek()
+# else:
+# dataset.blurb = "empty"
+# dataset.flush()
+# if stderr:
+# dataset_assoc.dataset.dataset.state = model.Dataset.states.ERROR
+# else:
+# dataset_assoc.dataset.dataset.state = model.Dataset.states.OK
+# dataset_assoc.dataset.dataset.flush()
+#
+# # Save stdout and stderr
+# if len( stdout ) > 32768:
+# log.error( "stdout for job %d is greater than 32K, only first part will be logged to database" % job.id )
+# job.stdout = stdout[:32768]
+# if len( stderr ) > 32768:
+# log.error( "stderr for job %d is greater than 32K, only first part will be logged to database" % job.id )
+# job.stderr = stderr[:32768]
+# # custom post process setup
+# inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] )
+# out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] )
+# param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] ) # why not re-use self.param_dict here? ##dunno...probably should, this causes tools.parameters.basic.UnvalidatedValue to be used in following methods instead of validated and transformed values during i.e. running workflows
+# param_dict = self.tool.params_from_strings( param_dict, self.app )
+# # Check for and move associated_files
+# self.tool.collect_associated_files(out_data, self.working_directory)
+# # Create generated output children and primary datasets and add to param_dict
+# collected_datasets = {'children':self.tool.collect_child_datasets(out_data),'primary':self.tool.collect_primary_datasets(out_data)}
+# param_dict.update({'__collected_datasets__':collected_datasets})
+# # Certain tools require tasks to be completed after job execution
+# # ( this used to be performed in the "exec_after_process" hook, but hooks are deprecated ).
+# if self.tool.tool_type is not None:
+# self.tool.exec_after_process( self.queue.app, inp_data, out_data, param_dict, job = job )
+# # Call 'exec_after_process' hook
+# self.tool.call_hook( 'exec_after_process', self.queue.app, inp_data=inp_data,
+# out_data=out_data, param_dict=param_dict,
+# tool=self.tool, stdout=stdout, stderr=stderr )
+# # TODO
+# # validate output datasets
+# job.command_line = self.command_line
+# mapping.context.current.flush()
+# log.debug( 'job %d ended' % self.job_id )
+# self.cleanup()
+#
+# def cleanup( self ):
+# # remove temporary files
+# try:
+# for fname in self.extra_filenames:
+# os.remove( fname )
+# if self.working_directory is not None:
+# shutil.rmtree( self.working_directory )
+# if self.app.config.set_metadata_externally:
+# self.external_output_metadata.cleanup_external_metadata()
+# except:
+# log.exception( "Unable to cleanup job %d" % self.job_id )
+#
+# def get_command_line( self ):
+# return self.command_line
+#
+# def get_session_id( self ):
+# return self.session_id
+#
+# def get_input_fnames( self ):
+# job = model.Job.get( self.job_id )
+# filenames = []
+# for da in job.input_datasets: #da is JobToInputDatasetAssociation object
+# if da.dataset:
+# filenames.append( da.dataset.file_name )
+# #we will need to stage in metadata file names also
+# #TODO: would be better to only stage in metadata files that are actually needed (found in command line, referenced in config files, etc.)
+# for key, value in da.dataset.metadata.items():
+# if isinstance( value, model.MetadataFile ):
+# filenames.append( value.file_name )
+# return filenames
+#
+# def get_output_fnames( self ):
+# if self.output_paths is not None:
+# return self.output_paths
+#
+# class DatasetPath( object ):
+# def __init__( self, real_path, false_path = None ):
+# self.real_path = real_path
+# self.false_path = false_path
+# def __str__( self ):
+# if self.false_path is None:
+# return self.real_path
+# else:
+# return self.false_path
+#
+# job = model.Job.get( self.job_id )
+# if self.app.config.outputs_to_working_directory:
+# self.output_paths = []
+# for name, data in [ ( da.name, da.dataset.dataset ) for da in job.output_datasets ]:
+# false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % data.id ) )
+# self.output_paths.append( DatasetPath( data.file_name, false_path ) )
+# else:
+# self.output_paths = [ DatasetPath( da.dataset.file_name ) for da in job.output_datasets ]
+# return self.output_paths
+#
+# def check_output_sizes( self ):
+# sizes = []
+# output_paths = self.get_output_fnames()
+# for outfile in [ str( o ) for o in output_paths ]:
+# sizes.append( ( outfile, os.stat( outfile ).st_size ) )
+# return sizes
+# def setup_external_metadata( self, exec_dir = None, tmp_dir = None, dataset_files_path = None, config_root = None, datatypes_config = None, **kwds ):
+# if tmp_dir is None:
+# #this dir should be relative to the exec_dir
+# tmp_dir = self.app.config.new_file_path
+# if dataset_files_path is None:
+# dataset_files_path = self.app.model.Dataset.file_path
+# if config_root is None:
+# config_root = self.app.config.root
+# if datatypes_config is None:
+# datatypes_config = self.app.config.datatypes_config
+# job = model.Job.get( self.job_id )
+# return self.external_output_metadata.setup_external_metadata( [ output_dataset_assoc.dataset for output_dataset_assoc in job.output_datasets ], exec_dir = exec_dir, tmp_dir = tmp_dir, dataset_files_path = dataset_files_path, config_root = config_root, datatypes_config = datatypes_config, **kwds )
- def fail( self, message, exception=False ):
- """
- Indicate job failure by setting state and message on all output
- datasets.
- """
- job = model.Job.get( self.job_id )
- job.refresh()
- # if the job was deleted, don't fail it
- if not job.state == model.Job.states.DELETED:
- # Check if the failure is due to an exception
- if exception:
- # Save the traceback immediately in case we generate another
- # below
- job.traceback = traceback.format_exc()
- # Get the exception and let the tool attempt to generate
- # a better message
- etype, evalue, tb = sys.exc_info()
- m = self.tool.handle_job_failure_exception( evalue )
- if m:
- message = m
- if self.app.config.outputs_to_working_directory:
- for dataset_path in self.get_output_fnames():
- try:
- shutil.move( dataset_path.false_path, dataset_path.real_path )
- log.debug( "fail(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) )
- except ( IOError, OSError ), e:
- log.error( "fail(): Missing output file in working directory: %s" % e )
- for dataset_assoc in job.output_datasets:
- dataset = dataset_assoc.dataset
- dataset.refresh()
- dataset.state = dataset.states.ERROR
- dataset.blurb = 'tool error'
- dataset.info = message
- dataset.set_size()
- dataset.flush()
- job.state = model.Job.states.ERROR
- job.command_line = self.command_line
- job.info = message
- job.flush()
- # If the job was deleted, just clean up
- self.cleanup()
-
- def change_state( self, state, info = False ):
- job = model.Job.get( self.job_id )
- job.refresh()
- for dataset_assoc in job.output_datasets:
- dataset = dataset_assoc.dataset
- dataset.refresh()
- dataset.state = state
- if info:
- dataset.info = info
- dataset.flush()
- if info:
- job.info = info
- job.state = state
- job.flush()
-
- def get_state( self ):
- job = model.Job.get( self.job_id )
- job.refresh()
- return job.state
-
- def set_runner( self, runner_url, external_id ):
- job = model.Job.get( self.job_id )
- job.refresh()
- job.job_runner_name = runner_url
- job.job_runner_external_id = external_id
- job.flush()
-
- def finish( self, stdout, stderr ):
- """
- Called to indicate that the associated command has been run. Updates
- the output datasets based on stderr and stdout from the command, and
- the contents of the output files.
- """
- # default post job setup
- mapping.context.current.clear()
- job = model.Job.get( self.job_id )
- # if the job was deleted, don't finish it
- if job.state == job.states.DELETED:
- self.cleanup()
- return
- elif job.state == job.states.ERROR:
- # Job was deleted by an administrator
- self.fail( job.info )
- return
- if stderr:
- job.state = "error"
- else:
- job.state = 'ok'
- if self.app.config.outputs_to_working_directory:
- for dataset_path in self.get_output_fnames():
- try:
- shutil.move( dataset_path.false_path, dataset_path.real_path )
- log.debug( "finish(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) )
- except ( IOError, OSError ):
- self.fail( "Job %s's output dataset(s) could not be read" % job.id )
- return
- for dataset_assoc in job.output_datasets:
- #should this also be checking library associations? - can a library item be added from a history before the job has ended? - lets not allow this to occur
- for dataset in dataset_assoc.dataset.dataset.history_associations: #need to update all associated output hdas, i.e. history was shared with job running
- dataset.blurb = 'done'
- dataset.peek = 'no peek'
- dataset.info = stdout + stderr
- dataset.set_size()
- if stderr:
- dataset.blurb = "error"
- elif dataset.has_data():
- #if a dataset was copied, it won't appear in our dictionary:
- #either use the metadata from originating output dataset, or call set_meta on the copies
- #it would be quicker to just copy the metadata from the originating output dataset,
- #but somewhat trickier (need to recurse up the copied_from tree), for now we'll call set_meta()
- if not self.external_output_metadata.external_metadata_set_successfully( dataset ):
- # Only set metadata values if they are missing...
- dataset.set_meta( overwrite = False )
- else:
- #load metadata from file
- #we need to no longer allow metadata to be edited while the job is still running,
- #since if it is edited, the metadata changed on the running output will no longer match
- #the metadata that was stored to disk for use via the external process,
- #and the changes made by the user will be lost, without warning or notice
- dataset.metadata.from_JSON_dict( self.external_output_metadata.get_output_filenames_by_dataset( dataset ).filename_out )
- if self.tool.is_multi_byte:
- dataset.set_multi_byte_peek()
- else:
- dataset.set_peek()
- else:
- dataset.blurb = "empty"
- dataset.flush()
- if stderr:
- dataset_assoc.dataset.dataset.state = model.Dataset.states.ERROR
- else:
- dataset_assoc.dataset.dataset.state = model.Dataset.states.OK
- dataset_assoc.dataset.dataset.flush()
-
- # Save stdout and stderr
- if len( stdout ) > 32768:
- log.error( "stdout for job %d is greater than 32K, only first part will be logged to database" % job.id )
- job.stdout = stdout[:32768]
- if len( stderr ) > 32768:
- log.error( "stderr for job %d is greater than 32K, only first part will be logged to database" % job.id )
- job.stderr = stderr[:32768]
- # custom post process setup
- inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] )
- out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] )
- param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] ) # why not re-use self.param_dict here? ##dunno...probably should, this causes tools.parameters.basic.UnvalidatedValue to be used in following methods instead of validated and transformed values during i.e. running workflows
- param_dict = self.tool.params_from_strings( param_dict, self.app )
- # Check for and move associated_files
- self.tool.collect_associated_files(out_data, self.working_directory)
- # Create generated output children and primary datasets and add to param_dict
- collected_datasets = {'children':self.tool.collect_child_datasets(out_data),'primary':self.tool.collect_primary_datasets(out_data)}
- param_dict.update({'__collected_datasets__':collected_datasets})
- # Certain tools require tasks to be completed after job execution
- # ( this used to be performed in the "exec_after_process" hook, but hooks are deprecated ).
- if self.tool.tool_type is not None:
- self.tool.exec_after_process( self.queue.app, inp_data, out_data, param_dict, job = job )
- # Call 'exec_after_process' hook
- self.tool.call_hook( 'exec_after_process', self.queue.app, inp_data=inp_data,
- out_data=out_data, param_dict=param_dict,
- tool=self.tool, stdout=stdout, stderr=stderr )
- # TODO
- # validate output datasets
- job.command_line = self.command_line
- mapping.context.current.flush()
- log.debug( 'job %d ended' % self.job_id )
- self.cleanup()
-
- def cleanup( self ):
- # remove temporary files
- try:
- for fname in self.extra_filenames:
- os.remove( fname )
- if self.working_directory is not None:
- shutil.rmtree( self.working_directory )
- if self.app.config.set_metadata_externally:
- self.external_output_metadata.cleanup_external_metadata()
- except:
- log.exception( "Unable to cleanup job %d" % self.job_id )
-
- def get_command_line( self ):
- return self.command_line
-
- def get_session_id( self ):
- return self.session_id
-
- def get_input_fnames( self ):
- job = model.Job.get( self.job_id )
- filenames = []
- for da in job.input_datasets: #da is JobToInputDatasetAssociation object
- if da.dataset:
- filenames.append( da.dataset.file_name )
- #we will need to stage in metadata file names also
- #TODO: would be better to only stage in metadata files that are actually needed (found in command line, referenced in config files, etc.)
- for key, value in da.dataset.metadata.items():
- if isinstance( value, model.MetadataFile ):
- filenames.append( value.file_name )
- return filenames
-
- def get_output_fnames( self ):
- if self.output_paths is not None:
- return self.output_paths
-
- class DatasetPath( object ):
- def __init__( self, real_path, false_path = None ):
- self.real_path = real_path
- self.false_path = false_path
- def __str__( self ):
- if self.false_path is None:
- return self.real_path
- else:
- return self.false_path
-
- job = model.Job.get( self.job_id )
- if self.app.config.outputs_to_working_directory:
- self.output_paths = []
- for name, data in [ ( da.name, da.dataset.dataset ) for da in job.output_datasets ]:
- false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % data.id ) )
- self.output_paths.append( DatasetPath( data.file_name, false_path ) )
- else:
- self.output_paths = [ DatasetPath( da.dataset.file_name ) for da in job.output_datasets ]
- return self.output_paths
-
- def check_output_sizes( self ):
- sizes = []
- output_paths = self.get_output_fnames()
- for outfile in [ str( o ) for o in output_paths ]:
- sizes.append( ( outfile, os.stat( outfile ).st_size ) )
- return sizes
- def setup_external_metadata( self, exec_dir = None, tmp_dir = None, dataset_files_path = None, config_root = None, datatypes_config = None, **kwds ):
- if tmp_dir is None:
- #this dir should should relative to the exec_dir
- tmp_dir = self.app.config.new_file_path
- if dataset_files_path is None:
- dataset_files_path = self.app.model.Dataset.file_path
- if config_root is None:
- config_root = self.app.config.root
- if datatypes_config is None:
- datatypes_config = self.app.config.datatypes_config
- job = model.Job.get( self.job_id )
- return self.external_output_metadata.setup_external_metadata( [ output_dataset_assoc.dataset for output_dataset_assoc in job.output_datasets ], exec_dir = exec_dir, tmp_dir = tmp_dir, dataset_files_path = dataset_files_path, config_root = config_root, datatypes_config = datatypes_config, **kwds )
-
-class DefaultCloudProvider( object ):
+class CloudProvider( object ):
def __init__( self, app ):
self.app = app
self.cloud_provider = {}
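
Only a fragment of the renamed CloudProvider class is visible in this hunk, but self.cloud_provider = {} suggests a registry keyed by provider name. A hedged sketch of how such a dispatcher might work; get_provider_name() and the EucalyptusCloudProvider class name are assumptions, since neither appears in the hunks shown:

from galaxy.cloud.providers.ec2 import EC2CloudProvider
from galaxy.cloud.providers.eucalyptus import EucalyptusCloudProvider  # assumed class name

class CloudProviderSketch( object ):
    def __init__( self, app ):
        self.app = app
        self.cloud_provider = {}  # provider name -> instantiated provider
        self.provider_classes = { 'ec2': EC2CloudProvider,
                                  'eucalyptus': EucalyptusCloudProvider }

    def get_provider( self, name ):
        # Instantiate each provider (and its worker threads) only once.
        if name not in self.cloud_provider:
            self.cloud_provider[ name ] = self.provider_classes[ name ]( self.app )
        return self.cloud_provider[ name ]

    def put( self, uci_wrapper ):
        # Route the request to the provider named by the UCI's credentials;
        # get_provider_name() is hypothetical.
        self.get_provider( uci_wrapper.get_provider_name() ).put( uci_wrapper )
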
diff -r 6aab50510e43 -r 9881b0df3252 lib/galaxy/cloud/providers/ec2.py
--- a/lib/galaxy/cloud/providers/ec2.py Wed Oct 14 19:20:11 2009 -0400
+++ b/lib/galaxy/cloud/providers/ec2.py Fri Oct 16 13:06:44 2009 -0400
@@ -0,0 +1,526 @@
+import subprocess, threading, os, errno, time, datetime
+from Queue import Queue, Empty
+from datetime import datetime
+
+from galaxy import model # Database interaction class
+from galaxy.model import mapping
+from galaxy.datatypes.data import nice_size
+from Queue import Queue
+from sqlalchemy import or_
+
+import galaxy.eggs
+galaxy.eggs.require("boto")
+from boto.ec2.connection import EC2Connection
+from boto.ec2.regioninfo import RegionInfo
+
+import logging
+log = logging.getLogger( __name__ )
+
+class EC2CloudProvider( object ):
+ """
+ Amazon EC2-based cloud provider implementation for managing instances.
+ """
+ STOP_SIGNAL = object()
+ def __init__( self, app ):
+ self.zone = "us-east-1a"
+ self.key_pair = "galaxy-keypair"
+ self.queue = Queue()
+
+ #TODO: Use multiple threads to process requests?
+ self.threads = []
+ nworkers = 5
+ log.info( "Starting EC2 cloud controller workers" )
+ for i in range( nworkers ):
+ worker = threading.Thread( target=self.run_next )
+ worker.start()
+ self.threads.append( worker )
+ log.debug( "%d EC2 cloud workers ready", nworkers )
+
+ def run_next( self ):
+ """Run the next job, waiting until one is available if necessary"""
+ cnt = 0
+ while 1:
+
+ uci_wrapper = self.queue.get()
+# uci = uci_wrapper.get_uci()
+ log.debug( '[%d] uci name: %s' % ( cnt, uci_wrapper.get_name() ) )
+ uci_state = uci_wrapper.get_state()
+ if uci_state is self.STOP_SIGNAL:
+ return
+ try:
+ if uci_state=="new":
+ log.debug( "Calling create UCI" )
+ self.createUCI( uci_wrapper )
+ elif uci_state=="deleting":
+ self.deleteUCI( uci_wrapper )
+ elif uci_state=="submitted":
+ log.debug( "Calling start UCI" )
+ self.startUCI( uci_wrapper )
+ elif uci_state=="shutting-down":
+ self.stopUCI( uci_wrapper )
+ except:
+ log.exception( "Uncaught exception executing request." )
+ cnt += 1
+
+ def get_connection( self, uci_wrapper ):
+ """
+ Establishes EC2 cloud connection using user's credentials associated with given UCI
+ """
+ log.debug( '##### Establishing EC2 cloud connection' )
+ conn = EC2Connection( uci_wrapper.get_access_key(), uci_wrapper.get_secret_key() )
+ return conn
+
+ def set_keypair( self, uci_wrapper, conn ):
+ """
+ Generate keypair using user's default credentials
+ """
+ log.debug( "Getting user's keypair" )
+ kp = conn.get_key_pair( self.key_pair )
+ instances = uci_wrapper.get_instances_indexes()
+
+ try:
+ for inst in instances:
+ log.debug("inst: '%s'" % inst )
+ uci_wrapper.set_key_pair( inst, kp.name )
+ return kp.name
+ except AttributeError: # No keypair under this name exists so create it
+ log.info( "No keypair found, creating keypair '%s'" % self.key_pair )
+ kp = conn.create_key_pair( self.key_pair )
+ for inst in instances:
+ uci_wrapper.set_key_pair( inst, kp.name, kp.material )
+
+ return kp.name
+
+ def get_mi_id( self, type ):
+ """
+ Get appropriate machine image (mi) based on instance size.
+ TODO: Dummy method - need to implement logic
+ For valid sizes, see http://aws.amazon.com/ec2/instance-types/
+ """
+ return model.CloudImage.filter( model.CloudImage.table.c.id==2 ).first().image_id
+
+# def get_instances( self, uci ):
+# """
+# Get objects of instances that are pending or running and are connected to uci object
+# """
+# instances = trans.sa_session.query( model.CloudInstance ) \
+# .filter_by( user=user, uci_id=uci.id ) \
+# .filter( or_(model.CloudInstance.table.c.state=="running", model.CloudInstance.table.c.state=="pending" ) ) \
+# .first()
+# #.all() #TODO: return all but need to edit calling method(s) to handle list
+#
+# instances = uci.instance
+#
+# return instances
+
+
+ def shutdown( self ):
+ """Attempts to gracefully shut down the monitor thread"""
+ log.info( "sending stop signal to worker threads in EC2 cloud manager" )
+ for i in range( len( self.threads ) ):
+ self.queue.put( self.STOP_SIGNAL )
+ log.info( "EC2 cloud manager stopped" )
+
+ def put( self, uci_wrapper ):
+ # Get rid of UCI from state description
+ state = uci_wrapper.get_state()
+ uci_wrapper.change_state( state.split('U')[0] ) # remove 'UCI' from end of state description (i.e., mark as accepted and ready for processing)
+ self.queue.put( uci_wrapper )
+
+ def createUCI( self, uci_wrapper ):
+ """
+ Creates User Configured Instance (UCI). Essentially, creates storage volume on cloud provider
+ and registers relevant information in Galaxy database.
+ """
+ conn = self.get_connection( uci_wrapper )
+ # Temporary code - need to ensure user selects zone at UCI creation time!
+ if uci_wrapper.get_uci_availability_zone()=='':
+ log.info( "Availability zone for UCI (i.e., storage volume) was not selected, using default zone: %s" % self.zone )
+ uci_wrapper.set_store_availability_zone( self.zone )
+
+ #TODO: check if volume associated with UCI already exists (if server crashed for example) and don't recreate it
+ log.info( "Creating volume in zone '%s'..." % uci_wrapper.get_uci_availability_zone() )
+ # Because only one storage volume may be created at UCI configuration time, the index of this volume in the
+ # local Galaxy DB w.r.t. the current UCI is 0, so it is referenced directly in the following methods
+ vol = conn.create_volume( uci_wrapper.get_store_size( 0 ), uci_wrapper.get_uci_availability_zone(), snapshot=None )
+ uci_wrapper.set_store_volume_id( 0, vol.id )
+
+ # Wait for a while to ensure volume was created
+ vol_status = vol.status
+ for i in range( 30 ):
+ if vol_status != "available": # 'is not' tested object identity here, not string equality
+ log.debug( 'Updating volume status; current status: %s' % vol_status )
+ vol_status = vol.update() # re-query the provider; vol.status alone is a cached attribute
+ time.sleep(3)
+ if i == 29:
+ log.error( "Error while creating volume '%s'; stuck in state '%s'; deleting volume." % ( vol.id, vol_status ) )
+ conn.delete_volume( vol.id )
+ uci_wrapper.change_state( uci_state='error' )
+ return
+
+ uci_wrapper.change_state( uci_state='available' )
+ uci_wrapper.set_store_status( vol.id, vol_status )
+
+ def deleteUCI( self, uci_wrapper ):
+ """
+ Deletes UCI. NOTE that this implies deletion of any and all data associated
+ with this UCI from the cloud. All data will be deleted.
+ """
+ conn = self.get_connection( uci_wrapper )
+ vl = [] # volume list
+ count = 0 # counter for checking if all volumes assoc. w/ UCI were deleted
+
+ # Get all volumes assoc. w/ UCI, delete them from cloud as well as in local DB
+ vl = uci_wrapper.get_all_stores()
+ deletedList = []
+ failedList = []
+ for v in vl:
+ log.debug( "Deleting volume with id='%s'" % v.volume_id )
+ if conn.delete_volume( v.volume_id ):
+ deletedList.append( v.volume_id )
+ v.delete()
+ v.flush()
+ count += 1
+ else:
+ failedList.append( v.volume_id )
+
+ # Delete UCI only if all associated volumes were deleted
+ log.debug( "count=%s, len(vl)=%s" % (count, len( vl ) ) )
+ if count == len( vl ):
+ uci_wrapper.delete()
+ else:
+ log.error( "Deleting following volume(s) failed: %s. However, these volumes were successfully deleted: %s. \
+ MANUAL intervention and processing needed." % ( failedList, deletedList ) )
+ uci_wrapper.change_state( uci_state="error" )
+
+ def addStorageToUCI( self, name ):
+ """ Adds more storage to specified UCI
+ TODO"""
+
+ def dummyStartUCI( self, uci_wrapper ):
+
+ uci = uci_wrapper.get_uci()
+ log.debug( "Would be starting instance '%s'" % uci.name )
+ uci_wrapper.change_state( 'pending' )
+# log.debug( "Sleeping a bit... (%s)" % uci.name )
+# time.sleep(20)
+# log.debug( "Woke up! (%s)" % uci.name )
+
+ def startUCI( self, uci_wrapper ):
+ """
+ Starts instance(s) of the given UCI on the cloud.
+ """
+ conn = self.get_connection( uci_wrapper )
+#
+ self.set_keypair( uci_wrapper, conn )
+ i_indexes = uci_wrapper.get_instances_indexes() # Get indexes of *new* instances associated with this UCI
+ log.debug( "Starting instances with indexes '%s' associated with UCI '%s'" % ( i_indexes, uci_wrapper.get_name() ) )
+
+ for i_index in i_indexes:
+ mi_id = self.get_mi_id( uci_wrapper.get_type( i_index ) )
+ log.debug( "mi_id: %s, uci_wrapper.get_key_pair_name( i_index ): %s" % ( mi_id, uci_wrapper.get_key_pair_name( i_index ) ) )
+ uci_wrapper.set_mi( i_index, mi_id )
+
+ # Check if galaxy security group exists (and create it if it does not)
+ log.debug( '***** Setting up security group' )
+ security_group = 'galaxyWeb'
+ sgs = conn.get_all_security_groups() # security groups
+ gsgt = False # galaxy security group test
+ for sg in sgs:
+ if sg.name == security_group:
+ gsgt = True
+ # If security group does not exist, create it
+ if not gsgt:
+ gSecurityGroup = conn.create_security_group(security_group, 'Security group for Galaxy.')
+ gSecurityGroup.authorize( 'tcp', 80, 80, '0.0.0.0/0' ) # Open HTTP port
+ gSecurityGroup.authorize( 'tcp', 22, 22, '0.0.0.0/0' ) # Open SSH port
+ # Start an instance
+ log.debug( "***** Starting UCI instance '%s'" % uci_wrapper.get_name() )
+ log.debug( 'Using following command: conn.run_instances( image_id=%s, key_name=%s, security_groups=[%s], instance_type=%s, placement=%s )'
+ % ( mi_id, uci_wrapper.get_key_pair_name( i_index ), [security_group], uci_wrapper.get_type( i_index ), uci_wrapper.get_uci_availability_zone() ) )
+ reservation = conn.run_instances( image_id=mi_id,
+ key_name=uci_wrapper.get_key_pair_name( i_index ),
+ security_groups=[security_group],
+ instance_type=uci_wrapper.get_type( i_index ),
+ placement=uci_wrapper.get_uci_availability_zone() )
+ # Record newly available instance data into local Galaxy database
+ l_time = datetime.utcnow()
+ uci_wrapper.set_launch_time( l_time, i_index=i_index ) # format_time( reservation.i_indexes[0].launch_time ) )
+ if not uci_wrapper.uci_launch_time_set():
+ uci_wrapper.set_uci_launch_time( l_time )
+ uci_wrapper.set_reservation_id( i_index, str( reservation ).split(":")[1] )
+ # TODO: if more than a single instance will be started through single reservation, change this reference to element [0]
+ i_id = str( reservation.instances[0]).split(":")[1]
+ uci_wrapper.set_instance_id( i_index, i_id )
+ s = reservation.instances[0].state
+ uci_wrapper.change_state( s, i_id, s )
+ log.debug( "Instance of UCI '%s' started, current state: %s" % ( uci_wrapper.get_name(), uci_wrapper.get_state() ) )
+
+
+
+# # Wait until instance gets running and then update the DB
+# while s!="running":
+# log.debug( "Waiting on instance '%s' to start up (reservation ID: %s); current state: %s" % ( uci.instance[0].instance_id, uci.instance[0].reservation_id, s ) )
+# time.sleep( 15 )
+# s = reservation.i_indexes[0].update()
+#
+# # Update instance data in local DB
+# uci.instance[0].state = s
+# uci.instance[0].public_dns = reservation.i_indexes[0].dns_name
+# uci.instance[0].private_dns = reservation.i_indexes[0].private_dns_name
+# uci.instance[0].flush()
+# # Update storage data in local DB w/ volume state info. NOTE that this only captures current volume state
+# # and does not connect or wait on connection between instance and volume to be established
+# vl = model.CloudStore.filter( model.CloudStore.c.uci_id == uci.id ).all()
+# vols = []
+# for v in vl:
+# vols.append( v.volume_id )
+# try:
+# volumes = conn.get_all_volumes( vols )
+# for i, v in enumerate( volumes ):
+# uci.store[i].i_id = v.instance_id
+# uci.store[i].status = v.status
+# uci.store[i].device = v.device
+# uci.store[i].flush()
+# except BotoServerError:
+# log.debug( "Error getting volume(s) attached to instance. Volume status was not updated." )
+#
+# uci.state = s
+# uci.flush()
+
+
+ def stopUCI( self, uci_wrapper):
+ """
+ Stops all cloud instances associated with the given UCI.
+ """
+ conn = self.get_connection( uci_wrapper )
+
+ # Get all instances associated with given UCI
+ il = uci_wrapper.get_instances_ids() # instance list
+# log.debug( 'List of instances being terminated: %s' % il )
+ rl = conn.get_all_instances( il ) # Reservation list associated with given instances
+
+# tState = conn.terminate_instances( il )
+# # TODO: Need to update instance stop time (for all individual instances)
+# stop_time = datetime.utcnow()
+# uci_wrapper.set_stop_time( stop_time )
+
+ # Initiate shutdown of all instances under given UCI
+ cnt = 0
+ stopped = []
+ notStopped = []
+ for r in rl:
+ for inst in r.instances:
+ log.debug( "Sending stop signal to instance '%s' associated with reservation '%s'." % ( inst, r ) )
+ inst.stop()
+ uci_wrapper.set_stop_time( datetime.utcnow(), i_id=inst.id )
+ uci_wrapper.change_state( instance_id=inst.id, i_state=inst.update() )
+ stopped.append( inst )
+
+# uci_wrapper.change_state( uci_state='available' )
+ uci_wrapper.reset_uci_launch_time()
+
+# # Wait for all instances to actually terminate and update local DB
+# terminated=0
+# while terminated!=len( rl ):
+# for i, r in enumerate( rl ):
+# log.debug( "r state: %s" % r.instances[0].state )
+# state = r.instances[0].update()
+# if state=='terminated':
+# uci.instance[i].state = state
+# uci.instance[i].flush()
+# terminated += 1
+# time.sleep ( 5 )
+#
+# # Reset UCI state
+# uci.state = 'available'
+# uci.launch_time = None
+# uci.flush()
+#
+ log.debug( "All instances for UCI '%s' were terminated." % uci_wrapper.get_name() )
+
+
+
+# dbInstances = get_instances( trans, uci ) #TODO: handle list!
+#
+# # Get actual cloud instance object
+# cloudInstance = get_cloud_instance( conn, dbInstances.instance_id )
+#
+# # TODO: Detach persistent storage volume(s) from instance and update volume data in local database
+# stores = get_stores( trans, uci )
+# for i, store in enumerate( stores ):
+# log.debug( "Detaching volume '%s' to instance '%s'." % ( store.volume_id, dbInstances.instance_id ) )
+# mntDevice = store.device
+# volStat = None
+## Detaching volume does not work with Eucalyptus Public Cloud, so comment it out
+## try:
+## volStat = conn.detach_volume( store.volume_id, dbInstances.instance_id, mntDevice )
+## except:
+## log.debug ( 'Error detaching volume; still going to try and stop instance %s.' % dbInstances.instance_id )
+# store.attach_time = None
+# store.device = None
+# store.i_id = None
+# store.status = volStat
+# log.debug ( '***** volume status: %s' % volStat )
+#
+#
+# # Stop the instance and update status in local database
+# cloudInstance.stop()
+# dbInstances.stop_time = datetime.utcnow()
+# while cloudInstance.state != 'terminated':
+# log.debug( "Stopping instance %s state; current state: %s" % ( str( cloudInstance ).split(":")[1], cloudInstance.state ) )
+# time.sleep(3)
+# cloudInstance.update()
+# dbInstances.state = cloudInstance.state
+#
+# # Reset relevant UCI fields
+# uci.state = 'available'
+# uci.launch_time = None
+#
+# # Persist
+# session = trans.sa_session
+## session.save_or_update( stores )
+# session.save_or_update( dbInstances ) # TODO: Is this going to work w/ multiple instances stored in dbInstances variable?
+# session.save_or_update( uci )
+# session.flush()
+# trans.log_event( "User stopped cloud instance '%s'" % uci.name )
+# trans.set_message( "Galaxy instance '%s' stopped." % uci.name )
+
+ def update( self ):
+ """
+ Runs a global status update on all storage volumes and all instances whose UCI is in
+ 'running', 'pending', or 'shutting-down' state.
+ The purpose of this method is to keep the local DB state in sync with the real-world cloud resources.
+ """
+ log.debug( "Running general status update for EPC UCIs." )
+ instances = model.CloudInstance.filter( or_( model.CloudInstance.c.state=="running", model.CloudInstance.c.state=="pending", model.CloudInstance.c.state=="shutting-down" ) ).all()
+ for inst in instances:
+ log.debug( "Running general status update on instance '%s'" % inst.instance_id )
+ self.updateInstance( inst )
+
+ stores = model.CloudStore.filter( or_( model.CloudStore.c.status=="in-use", model.CloudStore.c.status=="creating" ) ).all()
+ for store in stores:
+ log.debug( "Running general status update on store '%s'" % store.volume_id )
+ self.updateStore( store )
+
+ def updateInstance( self, inst ):
+
+ # Get credentials associated with this instance
+ uci_id = inst.uci_id
+ uci = model.UCI.get( uci_id )
+ uci.refresh()
+ a_key = uci.credentials.access_key
+ s_key = uci.credentials.secret_key
+ # Get connection
+ conn = EC2Connection( aws_access_key_id=a_key, aws_secret_access_key=s_key )
+ # Get reservations handle for given instance
+ rl = conn.get_all_instances( [inst.instance_id] )
+ # Because the provider deletes references to reservations shortly after instances terminate, an empty list in response to this query
+ # typically means the instance shut down successfully but the check was not performed soon enough. As a result, the code below simply
+ # marks the given instance as terminated. Note that an instance might also have crashed, and this code will not catch the difference...
+ if len( rl ) == 0:
+ log.info( "Instance ID '%s' was not found by the cloud provider. Instance might have crashed or otherwise been terminated." % inst.instance_id )
+ inst.state = 'terminated'
+ uci.state = 'available'
+ uci.launch_time = None
+ inst.flush()
+ uci.flush()
+ # Update instance status in local DB with info from cloud provider
+ for r in rl:
+ for i, cInst in enumerate( r.instances ):
+ s = cInst.update()
+ log.debug( "Checking state of cloud instance '%s' associated with reservation '%s'. State='%s'" % ( cInst, r, s ) )
+ if s != inst.state:
+ inst.state = s
+ inst.flush()
+ if s == 'terminated': # After instance has shut down, ensure UCI is marked as 'available'
+ uci.state = 'available'
+ uci.flush()
+ if s != uci.state and s != 'terminated':
+ # Making sure state of UCI is updated. Once multiple instances become associated with single UCI, this will need to be changed.
+ uci.state = s
+ uci.flush()
+ if cInst.public_dns_name != inst.public_dns:
+ inst.public_dns = cInst.public_dns_name
+ inst.flush()
+ if cInst.private_dns_name != inst.private_dns:
+ inst.private_dns = cInst.private_dns_name
+ inst.flush()
+
+ def updateStore( self, store ):
+ # Get credentials associated with this store
+ uci_id = store.uci_id
+ uci = model.UCI.get( uci_id )
+ uci.refresh()
+ a_key = uci.credentials.access_key
+ s_key = uci.credentials.secret_key
+ # Get connection
+ conn = EC2Connection( aws_access_key_id=a_key, aws_secret_access_key=s_key )
+ # Get reservations handle for given store
+ vl = conn.get_all_volumes( [store.volume_id] )
+# log.debug( "Store '%s' vl: '%s'" % ( store.volume_id, vl ) )
+ # Update store status in local DB with info from cloud provider
+ if store.status != vl[0].status:
+ store.status = vl[0].status
+ store.flush()
+ if store.i_id != vl[0].instance_id:
+ store.i_id = vl[0].instance_id
+ store.flush()
+ if store.attach_time != vl[0].attach_time:
+ store.attach_time = vl[0].attach_time
+ store.flush()
+ if store.device != vl[0].device:
+ store.device = vl[0].device
+ store.flush()
+
+# def updateUCI( self, uci ):
+# """
+# Runs a global status update on all storage volumes and all instances that are
+# associated with specified UCI
+# """
+# conn = self.get_connection( uci )
+#
+# # Update status of storage volumes
+# vl = model.CloudStore.filter( model.CloudInstance.c.uci_id == uci.id ).all()
+# vols = []
+# for v in vl:
+# vols.append( v.volume_id )
+# try:
+# volumes = conn.get_all_volumes( vols )
+# for i, v in enumerate( volumes ):
+# uci.store[i].i_id = v.instance_id
+# uci.store[i].status = v.status
+# uci.store[i].device = v.device
+# uci.store[i].flush()
+# except:
+# log.debug( "Error updating status of volume(s) associated with UCI '%s'. Status was not updated." % uci.name )
+# pass
+#
+# # Update status of instances
+# il = model.CloudInstance.filter_by( uci_id=uci.id ).filter( model.CloudInstance.c.state != 'terminated' ).all()
+# instanceList = []
+# for i in il:
+# instanceList.append( i.instance_id )
+# log.debug( 'instanceList: %s' % instanceList )
+# try:
+# reservations = conn.get_all_instances( instanceList )
+# for i, r in enumerate( reservations ):
+# uci.instance[i].state = r.instances[0].update()
+# log.debug('updating instance %s; status: %s' % ( uci.instance[i].instance_id, uci.instance[i].state ) )
+# uci.state = uci.instance[i].state
+# uci.instance[i].public_dns = r.instances[0].dns_name
+# uci.instance[i].private_dns = r.instances[0].private_dns_name
+# uci.instance[i].flush()
+# uci.flush()
+# except:
+# log.debug( "Error updating status of instances associated with UCI '%s'. Instance status was not updated." % uci.name )
+# pass
+
+ # --------- Helper methods ------------
+
+ def format_time( time ):
+ replacements = {'T':' ', 'Z':''} # avoid shadowing the built-in 'dict'
+ for old, new in replacements.iteritems():
+ time = time.replace(old, new)
+ return time
+
\ No newline at end of file
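
The createUCI() wait loop above polls the volume until it leaves its transient state. The same pattern as a standalone helper, shown here only as a sketch (the timeout and interval values are illustrative, and conn is assumed to be an already-established boto EC2Connection):

import time

def wait_for_volume( conn, volume_id, timeout=90, interval=3 ):
    """Poll the cloud provider until the volume becomes 'available'."""
    waited = 0
    while waited < timeout:
        # get_all_volumes() re-queries the provider, so the status is fresh
        vol = conn.get_all_volumes( [ volume_id ] )[0]
        if vol.status == 'available':
            return vol
        time.sleep( interval )
        waited += interval
    raise RuntimeError( "Volume '%s' not available after %s seconds" % ( volume_id, timeout ) )
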
diff -r 6aab50510e43 -r 9881b0df3252 lib/galaxy/cloud/providers/eucalyptus.py
--- a/lib/galaxy/cloud/providers/eucalyptus.py Wed Oct 14 19:20:11 2009 -0400
+++ b/lib/galaxy/cloud/providers/eucalyptus.py Fri Oct 16 13:06:44 2009 -0400
@@ -22,7 +22,6 @@
"""
STOP_SIGNAL = object()
def __init__( self, app ):
- log.debug( "Using eucalyptus as default cloud provider." )
self.zone = "epc"
self.key_pair = "galaxy-keypair"
self.queue = Queue()
@@ -35,7 +34,7 @@
worker = threading.Thread( target=self.run_next )
worker.start()
self.threads.append( worker )
- log.debug( "%d cloud workers ready", nworkers )
+ log.debug( "%d eucalyptus cloud workers ready", nworkers )
def run_next( self ):
"""Run the next job, waiting until one is available if necessary"""
@@ -65,12 +64,9 @@
def get_connection( self, uci_wrapper ):
"""
- Establishes EC2 connection using user's default credentials
+ Establishes eucalyptus cloud connection using user's credentials associated with given UCI
"""
- log.debug( '##### Establishing cloud connection' )
- # Amazon EC2
- #conn = EC2Connection( uci_wrapper.get_access_key(), uci_wrapper.get_secret_key() )
-
+ log.debug( '##### Establishing eucalyptus cloud connection' )
# Eucalyptus Public Cloud
# TODO: Add option in Galaxy config file to specify these values (i.e., for locally managed Eucalyptus deployments)
euca_region = RegionInfo( None, "eucalyptus", "mayhem9.cs.ucsb.edu" )
@@ -141,32 +137,20 @@
"""
conn = self.get_connection( uci_wrapper )
# Temporary code - need to ensure user selects zone at UCI creation time!
- if uci_wrapper.get_store_availability_zone( 0 )=='':
- log.info( "Availability zone for storage volume was not selected, using default zone: %s" % self.zone )
- uci_wrapper.set_store_availability_zone( 0, self.zone )
+ if uci_wrapper.get_uci_availability_zone()=='':
+ log.info( "Availability zone for UCI (i.e., storage volume) was not selected, using default zone: %s" % self.zone )
+ uci_wrapper.set_store_availability_zone( self.zone )
#TODO: check if volume associated with UCI already exists (if server crashed for example) and don't recreate it
- log.info( "Creating volume in zone '%s'..." % uci_wrapper.get_store_availability_zone( 0 ) )
- vol = conn.create_volume( uci_wrapper.get_store_size( 0 ), uci_wrapper.get_store_availability_zone( 0 ), snapshot=None )
- uci_wrapper.set_store_volume_id( 0, vol.id )
-
- # Wait for a while to ensure volume was created
-# vol_status = vol.status
-# for i in range( 30 ):
-# if vol_status is not "u'available'":
-# log.debug( 'Updating volume status; current status: %s' % vol_status )
-# vol_status = vol.status
-# time.sleep(3)
-# if i is 29:
-# log.debug( "Error while creating volume '%s'; stuck in state '%s'; deleting volume." % ( vol.id, vol_status ) )
-# conn.delete_volume( vol.id )
-# uci.state = 'error'
-# uci.flush()
-# return
+ log.info( "Creating volume in zone '%s'..." % uci_wrapper.get_uci_availability_zone() )
+ # Because only one storage volume may be created at UCI configuration time, the index of this volume in the
+ # local Galaxy DB w.r.t. the current UCI is 0, so it is referenced directly in the following methods
+ vol = conn.create_volume( uci_wrapper.get_store_size( 0 ), uci_wrapper.get_uci_availability_zone(), snapshot=None )
+ uci_wrapper.set_store_volume_id( 0, vol.id )
# EPC does not allow creation of storage volumes (it deletes one as soon as it is created, so manually set uci_state here)
uci_wrapper.change_state( uci_state='available' )
- uci_wrapper.set_store_status( 0, vol.status )
+ uci_wrapper.set_store_status( vol.id, vol.status )
def deleteUCI( self, uci_wrapper ):
"""
@@ -214,7 +198,7 @@
def startUCI( self, uci_wrapper ):
"""
- Starts an instance of named UCI on the cloud.
+ Starts instance(s) of the given UCI on the cloud.
"""
conn = self.get_connection( uci_wrapper )
#
@@ -250,11 +234,12 @@
if not uci_wrapper.uci_launch_time_set():
uci_wrapper.set_uci_launch_time( l_time )
uci_wrapper.set_reservation_id( i_index, str( reservation ).split(":")[1] )
+ # TODO: if more than a single instance will be started through single reservation, change this reference to element [0]
i_id = str( reservation.instances[0]).split(":")[1]
uci_wrapper.set_instance_id( i_index, i_id )
- s = reservation.instances[0].state # TODO: once more than a single instance will be started through single reservation, change this
+ s = reservation.instances[0].state
uci_wrapper.change_state( s, i_id, s )
- log.debug( "UCI '%s' started, current state: %s" % ( uci_wrapper.get_name(), uci_wrapper.get_state() ) )
+ log.debug( "Instance of UCI '%s' started, current state: %s" % ( uci_wrapper.get_name(), uci_wrapper.get_state() ) )
@@ -291,7 +276,7 @@
def stopUCI( self, uci_wrapper):
"""
- Stops all of cloud instances associated with named UCI.
+ Stops all cloud instances associated with the given UCI.
"""
conn = self.get_connection( uci_wrapper )
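
The TODO above notes that the hard-coded Eucalyptus endpoint (mayhem9.cs.ucsb.edu) should eventually come from Galaxy's config file. A hedged sketch of what a config-driven connection might look like; the option names, port, and path below are assumptions based on typical Eucalyptus deployments, not values this changeset defines:

import galaxy.eggs
galaxy.eggs.require("boto")
from boto.ec2.connection import EC2Connection
from boto.ec2.regioninfo import RegionInfo

def get_euca_connection( access_key, secret_key, config ):
    # Endpoint details would be read from Galaxy's config instead of being
    # hard-coded; these attribute names are hypothetical.
    region = RegionInfo( None,
                         getattr( config, 'euca_region_name', 'eucalyptus' ),
                         getattr( config, 'euca_region_endpoint', 'mayhem9.cs.ucsb.edu' ) )
    return EC2Connection( aws_access_key_id=access_key,
                          aws_secret_access_key=secret_key,
                          region=region,
                          is_secure=False,               # EPC typically speaks plain HTTP
                          port=8773,                     # common Eucalyptus port
                          path='/services/Eucalyptus' )  # common Eucalyptus service path
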
diff -r 6aab50510e43 -r 9881b0df3252 lib/galaxy/web/controllers/cloud.py
--- a/lib/galaxy/web/controllers/cloud.py Wed Oct 14 19:20:11 2009 -0400
+++ b/lib/galaxy/web/controllers/cloud.py Fri Oct 16 13:06:44 2009 -0400
@@ -14,7 +14,10 @@
from galaxy.workflow.modules import *
from galaxy.model.mapping import desc
from galaxy.model.orm import *
-from datetime import datetime
+from datetime import datetime, timedelta
+
+pkg_resources.require( "WebHelpers" )
+from webhelpers import *
# Required for Cloud tab
import galaxy.eggs
@@ -116,7 +119,7 @@
@web.expose
@web.require_login( "start Galaxy cloud instance" )
- def start( self, trans, id, type='small' ):
+ def start( self, trans, id, type='m1.small' ):
"""
Start a new cloud resource instance
"""
@@ -311,28 +314,39 @@
Configure and add new cloud instance to user's instance pool
"""
inst_error = vol_error = cred_error = None
+ error = {}
user = trans.get_user()
# TODO: Hack until present user w/ bullet list w/ registered credentials
- storedCreds = trans.sa_session.query( model.CloudUserCredentials ) \
- .filter_by( user=user ).all()
- credsMatch = False
- for cred in storedCreds:
- if cred.name == credName:
- credsMatch = True
+ storedCreds = trans.sa_session.query( model.CloudUserCredentials ).filter_by( user=user ).all()
+ if len( storedCreds ) == 0:
+ return trans.show_error_message( "You must register credentials before configuring a Galaxy instance." )
+
+ providersToZones = {}
+ for storedCred in storedCreds:
+ if storedCred.provider_name == 'ec2':
+ ec2_zones = ['us-east-1a', 'us-east-1b', 'us-east-1c', 'us-east-1d']
+ providersToZones[storedCred.name] = ec2_zones
+ elif storedCred.provider_name == 'eucalyptus':
+ providersToZones[storedCred.name] = ['epc']
if instanceName:
# Create new user configured instance
try:
- if len( instanceName ) > 255:
- inst_error = "Instance name exceeds maximum allowable length."
- elif trans.app.model.UCI.filter( and_( trans.app.model.UCI.table.c.name==instanceName, trans.app.model.UCI.table.c.state!='deleted' ) ).first():
- inst_error = "An instance with that name already exist."
- elif int( volSize ) > 1000:
- vol_error = "Volume size cannot exceed 1000GB. You must specify an integer between 1 and 1000."
- elif int( volSize ) < 1:
- vol_error = "Volume size cannot be less than 1GB. You must specify an integer between 1 and 1000."
- elif not credsMatch:
- cred_error = "You specified unknown credentials."
+ if trans.app.model.UCI.filter( and_( trans.app.model.UCI.table.c.name==instanceName, trans.app.model.UCI.table.c.state!='deleted' ) ).first():
+ error['inst_error'] = "An instance with that name already exist."
+ elif instanceName=='' or len( instanceName ) > 255:
+ error['inst_error'] = "Instance name must be between 1 and 255 characters long."
+ elif credName=='':
+ error['cred_error'] = "You must select credentials."
+ elif volSize == '':
+ error['vol_error'] = "You must specify volume size as an integer value between 1 and 1000."
+ elif ( int( volSize ) < 1 ) or ( int( volSize ) > 1000 ):
+ error['vol_error'] = "Volume size must be integer value between 1 and 1000."
+# elif type( volSize ) != type( 1 ): # Check if volSize is int
+# log.debug( "volSize='%s'" % volSize )
+# error['vol_error'] = "Volume size must be integer value between 1 and 1000."
+ elif zone=='':
+ error['zone_error'] = "You must select a zone where this UCI will be registered."
else:
# Capture user configured instance information
uci = model.UCI()
@@ -341,7 +355,8 @@
trans.app.model.CloudUserCredentials.table.c.name==credName ).first()
uci.user= user
uci.total_size = volSize # This is OK now because new instance is being created.
- uci.state = "newUCI"
+ uci.state = "newUCI"
+
storage = model.CloudStore()
storage.user = user
storage.uci = uci
@@ -361,6 +376,16 @@
except AttributeError, ae:
inst_error = "No registered cloud images. You must contact administrator to add some before proceeding."
log.debug("AttributeError: %s " % str( ae ) )
+
+ #TODO: based on user credentials (i.e., provider) selected, zone options will be different (e.g., EC2: us-east-1a vs EPC: epc)
+
+ return trans.fill_template( "cloud/configure_uci.mako",
+ instanceName = instanceName,
+ credName = storedCreds,
+ volSize = volSize,
+ zone = zone,
+ error = error,
+ providersToZones = providersToZones )
return trans.show_form(
web.FormBuilder( web.url_for(), "Configure new instance", submit_text="Add" )
@@ -445,9 +470,9 @@
elif ( ( providerName.lower()!='ec2' ) and ( providerName.lower()!='eucalyptus' ) ):
error['provider_error'] = "You specified an unsupported cloud provider."
elif accessKey=='' or len( accessKey ) > 255:
- error['access_key_error'] = "Access key much be between 1 and 255 characters long."
+ error['access_key_error'] = "Access key must be between 1 and 255 characters long."
elif secretKey=='' or len( secretKey ) > 255:
- error['secret_key_error'] = "Secret key much be between 1 and 255 characters long."
+ error['secret_key_error'] = "Secret key must be between 1 and 255 characters long."
else:
# Create new user stored credentials
credentials = model.CloudUserCredentials()
@@ -523,7 +548,7 @@
# Display the management page
trans.set_message( "Credentials '%s' deleted." % stored.name )
return self.list( trans )
-
+
@web.expose
@web.require_login( "edit workflows" )
def editor( self, trans, id=None ):
@@ -965,7 +990,23 @@
ids_in_menu=ids_in_menu )
## ---- Utility methods -------------------------------------------------------
+
+def get_UCIs_state( trans ):
+ user = trans.get_user()
+ instances = trans.sa_session.query( model.UCI ).filter_by( user=user ).filter( model.UCI.c.state != "deleted" ).all()
+ insd = {} # instance name-state dict
+ for inst in instances:
+ insd[inst.name] = inst.state
+ return insd
+
+def get_UCIs_time_ago( trans ):
+ user = trans.get_user()
+ instances = trans.sa_session.query( model.UCI ).filter_by( user=user ).all()
+ intad = {} # instance name-time-ago dict
+ for inst in instances:
+ if inst.launch_time != None:
+ intad[inst.name] = str( date.distance_of_time_in_words( inst.launch_time, date.datetime.utcnow() ) )
+ return intad
+
def get_stored_credentials( trans, id, check_ownership=True ):
"""
Get StoredUserCredentials from the database by id, verifying ownership.
@@ -1020,7 +1061,7 @@
# Looks good
return live
-def get_mi( trans, size='small' ):
+def get_mi( trans, size='m1.small' ):
"""
Get appropriate machine image (mi) based on instance size.
TODO: Dummy method - need to implement logic
diff -r 6aab50510e43 -r 9881b0df3252 lib/galaxy/web/framework/helpers/__init__.py
--- a/lib/galaxy/web/framework/helpers/__init__.py Wed Oct 14 19:20:11 2009 -0400
+++ b/lib/galaxy/web/framework/helpers/__init__.py Fri Oct 16 13:06:44 2009 -0400
@@ -4,6 +4,7 @@
from webhelpers import *
from datetime import datetime, timedelta
+from galaxy.util.json import to_json_string
# If the date is more than one week ago, then display the actual date instead of in words
def time_ago( x ):
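
With to_json_string exposed through the helpers module, Mako templates can serialize controller data for JavaScript as h.to_json_string(...), which is exactly how the new configure_uci.mako template (below) receives providersToZones. A small illustration; the dict contents are just an example:

from galaxy.util.json import to_json_string

providersToZones = { 'my-ec2-creds': [ 'us-east-1a', 'us-east-1b' ],
                     'my-epc-creds': [ 'epc' ] }
# In a template: var providers_zones = ${h.to_json_string(providersToZones)};
print to_json_string( providersToZones )  # emits the mapping as a JSON object literal
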
diff -r 6aab50510e43 -r 9881b0df3252 templates/cloud/add_credentials.mako
--- a/templates/cloud/add_credentials.mako Wed Oct 14 19:20:11 2009 -0400
+++ b/templates/cloud/add_credentials.mako Fri Oct 16 13:06:44 2009 -0400
@@ -28,7 +28,7 @@
<div class="${cls}">
<label>Credentials name:</label>
<div class="form-row-input">
- <input type="text" name="credName" value="Unnamed credentials" size="40">
+ <input type="text" name="credName" value="${credName}" size="40">
</div>
%if error.has_key('cred_error'):
<div class="form-row-error-message">${error['cred_error']}</div>
@@ -45,7 +45,7 @@
<div class="${cls}">
<label>Cloud provider name:</label>
<div class="form-row-input">
- <select name="providerName">
+ <select name="providerName" style="width:40em">
<option value="ec2">Amazon EC2</option>
<option value="eucalyptus">Eucalpytus Public Cloud (EPC)</option>
</select>
@@ -64,7 +64,7 @@
<div class="${cls}">
<label>Access key:</label>
<div class="form-row-input">
- <input type="text" name="accessKey" value="" size="40">
+ <input type="text" name="accessKey" value="${accessKey}" size="40">
</div>
%if error.has_key('access_key_error'):
<div class="form-row-error-message">${error['access_key_error']}</div>
@@ -81,7 +81,7 @@
<div class="${cls}">
<label>Secret key:</label>
<div class="form-row-input">
- <input type="password" name="secretKey" value="" size="40">
+ <input type="password" name="secretKey" value="${secretKey}" size="40">
</div>
%if error.has_key('secret_key_error'):
<div class="form-row-error-message">${error['secret_key_error']}</div>
diff -r 6aab50510e43 -r 9881b0df3252 templates/cloud/configure_uci.mako
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/templates/cloud/configure_uci.mako Fri Oct 16 13:06:44 2009 -0400
@@ -0,0 +1,116 @@
+<% _=n_ %>
+<%inherit file="/base.mako"/>
+<%def name="title()">Configure new UCI</%def>
+
+<%def name="javascripts()">
+${parent.javascripts()}
+<script type="text/javascript">
+
+var providers_zones = ${h.to_json_string(providersToZones)};
+
+$(function(){
+ $("input:text:first").focus();
+
+ $("#credName").change(function() {
+ var zones = providers_zones[ $(this).val() ];
+ var zones_select = $("#zones");
+
+ zones_select.children().remove();
+
+ for (var i in zones) {
+ var zone = zones[i];
+ var new_option = $('<option value="' + zone + '">' + zone + '</option>');
+ new_option.appendTo(zones_select);
+ }
+
+ });
+})
+</script>
+</%def>
+
+%if header:
+ ${header}
+%endif
+
+<div class="form">
+ <div class="form-title">Configure new Galaxy instance</div>
+ <div class="form-body">
+ <form name="Configure new UCI" action="${h.url_for( action='configureNew' )}" method="post" >
+
+ <%
+ cls = "form-row"
+ if error.has_key('inst_error'):
+ cls += " form-row-error"
+ %>
+ <div class="${cls}">
+ <label>Instance name:</label>
+ <div class="form-row-input">
+ <input type="text" name="instanceName" value="${instanceName}" size="40">
+ </div>
+ %if error.has_key('inst_error'):
+ <div class="form-row-error-message">${error['inst_error']}</div>
+ %endif
+ <div style="clear: both"></div>
+ </div>
+
+ <%
+ cls = "form-row"
+ if error.has_key('cred_error'):
+ cls += " form-row-error"
+ %>
+ <div class="${cls}">
+ <label>Name of registered credentials to use:</label>
+ <div class="form-row-input">
+ <select id="credName" name="credName" style="width:40em">
+ <option value="">Select Credential...</option>
+ % for cred in credName:
+ <option value="${cred.name}">${cred.name}</option>
+ %endfor
+ </select>
+ </div>
+ %if error.has_key('cred_error'):
+ <div class="form-row-error-message">${error['cred_error']}</div>
+ %endif
+ <div style="clear: both"></div>
+ </div>
+
+
+ <%
+ cls = "form-row"
+ if error.has_key('vol_error'):
+ cls += " form-row-error"
+ %>
+ <div class="${cls}">
+ <label>Permanent storage size (1-1000GB)<br/>NOTE: you will be able to add more storage later:</label>
+ <div class="form-row-input">
+ <input type="text" name="volSize" value="${volSize}" size="40">
+ </div>
+ %if error.has_key('vol_error'):
+ <div class="form-row-error-message">${error['vol_error']}</div>
+ %endif
+ <div style="clear: both"></div>
+ </div>
+
+ <%
+ cls = "form-row"
+ if error.has_key('zone_error'):
+ cls += " form-row-error"
+ %>
+ <div class="${cls}">
+ <label>Zone to create storage in</label>
+ <div class="form-row-input">
+ <select id="zones" name="zone" style="width:40em">
+ </select>
+ </div>
+ %if error.has_key('zone_error'):
+ <div class="form-row-error-message">${error['zone_error']}</div>
+ %endif
+ <div style="clear: both"></div>
+ </div>
+
+
+ <div class="form-row"><input type="submit" value="Add"></div>
+
+ </form>
+ </div>
+</div>
23 Nov '09
details: http://www.bx.psu.edu/hg/galaxy/rev/7f3054622e87
changeset: 3071:7f3054622e87
user: Enis Afgan <afgane(a)gmail.com>
date: Wed Oct 21 11:40:51 2009 -0400
description:
Updates instance startup for EC2 to include user data, as per the new AMI's requirements
diffstat:
lib/galaxy/cloud/providers/ec2.py | 6 +++---
1 files changed, 3 insertions(+), 3 deletions(-)
diffs (16 lines):
diff -r 049083fee997 -r 7f3054622e87 lib/galaxy/cloud/providers/ec2.py
--- a/lib/galaxy/cloud/providers/ec2.py Mon Oct 19 17:45:49 2009 -0400
+++ b/lib/galaxy/cloud/providers/ec2.py Wed Oct 21 11:40:51 2009 -0400
@@ -270,9 +270,9 @@
gSecurityGroup.authorize( 'tcp', 22, 22, '0.0.0.0/0' ) # Open SSH port
# Start an instance
log.debug( "***** Starting instance for UCI '%s'" % uci_wrapper.get_name() )
- #TODO: Get customization scripts remotley and pass volID and user credential data only as user data from here.
- userdata = open('/Users/afgane/Dropbox/Galaxy/EC2startupScripts/web/ec2autorun.zip', 'rb').read()
- log.debug( 'Using following command: conn.run_instances( image_id=%s, key_name=%s, security_groups=[%s], instance_type=%s, placement=%s )'
+ #TODO: Once multiple volumes can be attached to a single instance, update 'userdata' composition
+ userdata = uci_wrapper.get_store_volume_id()+"|"+uci_wrapper.get_access_key()+"|"+uci_wrapper.get_secret_key()
+ log.debug( 'Using following command: conn.run_instances( image_id=%s, key_name=%s, security_groups=[%s], user_data=[OMITTED], instance_type=%s, placement=%s )'
% ( mi_id, uci_wrapper.get_key_pair_name( i_index ), [security_group], uci_wrapper.get_type( i_index ), uci_wrapper.get_uci_availability_zone() ) )
reservation = conn.run_instances( image_id=mi_id,
key_name=uci_wrapper.get_key_pair_name( i_index ),
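
The commit message's "hard dependency on user data" refers to the instance side: the new AMI's startup script is expected to read the volume ID and credentials back out of EC2 user data. A minimal sketch of that instance-side read, assuming the same 'volume_id|access_key|secret_key' layout composed above (the metadata URL is EC2's standard user-data endpoint; the function itself is illustrative):

import urllib2

# From inside an instance, EC2 serves whatever was passed as user_data
# to run_instances() at this well-known address.
USER_DATA_URL = 'http://169.254.169.254/latest/user-data'

def read_user_data():
    raw = urllib2.urlopen( USER_DATA_URL ).read()
    # Mirror the composition in startUCI(): volume_id|access_key|secret_key
    volume_id, access_key, secret_key = raw.split( '|' )
    return volume_id, access_key, secret_key
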
23 Nov '09
details: http://www.bx.psu.edu/hg/galaxy/rev/5b18a134bf56
changeset: 3063:5b18a134bf56
user: Enis Afgan <afgane(a)gmail.com>
date: Tue Sep 15 16:53:01 2009 -0400
description:
Verified much of the code regarding cloud connectivity functionality w/ Eucalyptus.
diffstat:
lib/galaxy/web/controllers/cloud.py | 130 ++++++++++++++++++++++---------
templates/cloud/configure_cloud.mako | 2 +-
templates/root/index.mako | 16 ++--
3 files changed, 100 insertions(+), 48 deletions(-)
diffs (294 lines):
diff -r 52318cf73454 -r 5b18a134bf56 lib/galaxy/web/controllers/cloud.py
--- a/lib/galaxy/web/controllers/cloud.py Mon Sep 14 15:08:23 2009 -0400
+++ b/lib/galaxy/web/controllers/cloud.py Tue Sep 15 16:53:01 2009 -0400
@@ -42,7 +42,7 @@
.filter_by( user=user, state="pending" ) \
.all()
- for i in range( len( pendingInstances ) ):
+ for i in range( len ( pendingInstances ) ):
update_instance_state( trans, pendingInstances[i].id )
cloudCredentials = trans.sa_session.query( model.CloudUserCredentials ) \
@@ -60,7 +60,16 @@
.filter_by( user=user, state="available" ) \
.order_by( desc( model.UCI.c.update_time ) ) \
.all()
-
+
+ # Check after update there are instances in pending state; if so, display message
+ # TODO: Auto-refresh once instance is running
+ pendingInstances = trans.sa_session.query( model.UCI ) \
+ .filter_by( user=user, state="pending" ) \
+ .all()
+ if pendingInstances:
+ trans.set_message( "Galaxy instance started. NOTE: Please wait about 3-5 minutes for the instance to "
+ "start up and then refresh this page. A button to connect to the instance will then appear alongside "
+ "instance description." )
return trans.fill_template( "cloud/configure_cloud.mako",
cloudCredentials = cloudCredentials,
liveInstances = liveInstances,
@@ -88,7 +97,7 @@
@web.expose
@web.require_login( "start Galaxy cloud instance" )
- def start( self, trans, id, size='' ):
+ def start( self, trans, id, size='small' ):
"""
Start a new cloud resource instance
"""
@@ -108,26 +117,32 @@
instance.availability_zone = stores[0].availability_zone # Bc. all EBS volumes need to be in the same avail. zone, just check 1st
instance.type = size
conn = get_connection( trans )
+# log.debug( '***** Setting up security group' )
# If not existent, setup galaxy security group
- try:
- gSecurityGroup = conn.create_security_group('galaxy', 'Security group for Galaxy.')
- gSecurityGroup.authorize( 'tcp', 80, 80, '0.0.0.0/0' ) # Open HTTP port
- gSecurityGroup.authorize( 'tcp', 22, 22, '0.0.0.0/0' ) # Open SSH port
- except:
- pass
+# try:
+# gSecurityGroup = conn.create_security_group('galaxy', 'Security group for Galaxy.')
+# gSecurityGroup.authorize( 'tcp', 80, 80, '0.0.0.0/0' ) # Open HTTP port
+# gSecurityGroup.authorize( 'tcp', 22, 22, '0.0.0.0/0' ) # Open SSH port
+# except:
+# pass
# sgs = conn.get_all_security_groups()
# for i in range( len( sgs ) ):
# if sgs[i].name == "galaxy":
# sg.append( sgs[i] )
# break # only 1 security group w/ this name can exist, so continue
+ log.debug( '***** Starting an instance' )
+ log.debug( 'Using following command: conn.run_instances( image_id=%s, key_name=%s )' % ( instance.image.image_id, instance.keypair_name ) )
+ reservation = conn.run_instances( image_id=instance.image.image_id, key_name=instance.keypair_name )
#reservation = conn.run_instances( image_id=instance.image, key_name=instance.keypair_name, security_groups=['galaxy'], instance_type=instance.type, placement=instance.availability_zone )
instance.launch_time = datetime.utcnow()
uci.launch_time = instance.launch_time
- #instance.reservation = str( reservation.instances[0] )
- instance.state = "pending"
- #instance.state = reservation.instances[0].state
+ instance.reservation_id = str( reservation ).split(":")[1]
+ instance.instance_id = str( reservation.instances[0]).split(":")[1]
+# instance.state = "pending"
+ instance.state = reservation.instances[0].state
uci.state = instance.state
+
# TODO: After instance boots up, need to update status, DNS and attach EBS
# Persist
@@ -135,10 +150,7 @@
session.save_or_update( instance )
session.flush()
- trans.log_event( "User started cloud instance '%s'" % uci.name )
- trans.set_message( "Galaxy instance '%s' started. NOTE: Please wait about 2-3 minutes for the instance to "
- "start up and then refresh this page. A button to connect to the instance will then appear alongside "
- "instance description." % uci.name )
+ trans.log_event ("Started new instance. Reservation ID: '%s', Instance ID: '%s'" % (instance.reservation_id, instance.instance_id ) )
return self.list( trans )
# return trans.show_form(
@@ -151,21 +163,49 @@
@web.require_login( "stop Galaxy cloud instance" )
def stop( self, trans, id ):
"""
- Stop a cloud instance. This implies stopping Galaxy servcer and unmounting relevant file system(s)
+ Stop a cloud instance. This implies stopping Galaxy server and disconnecting/unmounting relevant file system(s).
"""
uci = get_uci( trans, id )
- instances = get_instances( trans, uci ) #TODO: handle list!
+ dbInstances = get_instances( trans, uci ) #TODO: handle list!
- instances.stop()
- instances.state = 'terminated'
- instances.stop_time = datetime.utcnow()
+ conn = get_connection( trans )
+ # Get actual cloud instance object
+ cloudInstance = get_cloud_instance( conn, dbInstances.instance_id )
+
+ # TODO: Detach persistent storage volume(s) from instance and update volume data in local database
+ stores = get_stores( trans, uci )
+ for i, store in enumerate( stores ):
+ log.debug( "Detaching volume '%s' from instance '%s'." % ( store.volume_id, dbInstances.instance_id ) )
+ mntDevice = store.device
+ volStat = None
+# try:
+# volStat = conn.detach_volume( store.volume_id, dbInstances.instance_id, mntDevice )
+# except:
+# log.debug ( 'Error detaching volume; still going to try and stop instance %s.' % dbInstances.instance_id )
+ store.attach_time = None
+ store.device = None
+ store.i_id = None
+ store.status = volStat
+ log.debug ( '***** volume status: %s' % volStat )
+
+
+ # Stop the instance and update status in local database
+ cloudInstance.stop()
+ dbInstances.stop_time = datetime.utcnow()
+ while cloudInstance.state != 'terminated':
+ log.debug( "Stopping instance %s; current state: %s" % ( str( cloudInstance ).split(":")[1], cloudInstance.state ) )
+ time.sleep(2)
+ cloudInstance.update()
+ dbInstances.state = cloudInstance.state
+
# Reset relevant UCI fields
uci.state = 'available'
uci.launch_time = None
# Persist
session = trans.sa_session
- session.save_or_update( instances )
+# session.save_or_update( stores )
+ session.save_or_update( dbInstances ) # TODO: Is this going to work w/ multiple instances stored in dbInstances variable?
session.save_or_update( uci )
session.flush()
trans.log_event( "User stopped cloud instance '%s'" % uci.name )
@@ -891,7 +931,7 @@
For valid sizes, see http://aws.amazon.com/ec2/instance-types/
"""
return trans.app.model.CloudImage.filter(
- trans.app.model.CloudImage.table.c.id==1).first()
+ trans.app.model.CloudImage.table.c.id==2).first()
def get_stores( trans, uci ):
"""
@@ -917,30 +957,33 @@
return instances
-
def get_cloud_instance( conn, instance_id ):
"""
- Returns a cloud instance representation of the instance id, i.e., cloud instance objects that cloud API can be invoked on
+ Returns a cloud instance representation of the instance id, i.e., cloud instance object that cloud API can be invoked on
"""
# get_all_instances func. takes a list of desired instance id's, so create a list first
idLst = list()
idLst.append( instance_id )
- # Retrieve cloud instance based on passed instance id
- cloudInstance = conn.get_all_instances( idLst )
- return cloudInstance[0]
+ # Retrieve cloud instance based on passed instance id. The get_all_instances( idLst ) method returns a list of reservations. Because
+ # we are passing only 1 ID, we can retrieve only the first element of the returned list. Furthermore, because (for now!)
+ # only 1 instance corresponds to each individual reservation, grab only the first element of the returned list of instances.
+ cloudInstance = conn.get_all_instances( idLst )[0].instances[0]
+ return cloudInstance
def get_connection( trans ):
"""
Establishes EC2 connection using user's default credentials
"""
+ log.debug( '##### Establishing cloud connection.' )
user = trans.get_user()
creds = trans.sa_session.query(model.CloudUserCredentials).filter_by(user=user, defaultCred=True).first()
if creds:
a_key = creds.access_key
s_key = creds.secret_key
+ # Amazon EC2
#conn = EC2Connection( a_key, s_key )
+ # Eucalyptus Public Cloud
euca_region = RegionInfo(None, "eucalyptus", "mayhem9.cs.ucsb.edu")
- #conn = EC2Connection(aws_access_key_id="2s42fQmcCu4WBpS3RJ9e5g", aws_secret_access_key="2iEzpThjZQttuvWYXL-0nRUuurzl2dump2drwg", is_secure=False, port=8773, region=euca_region, path="/services/Eucalyptus")
conn = EC2Connection(aws_access_key_id=a_key, aws_secret_access_key=s_key, is_secure=False, port=8773, region=euca_region, path="/services/Eucalyptus")
return conn
else:
@@ -952,11 +995,16 @@
Generate keypair using user's default credentials
"""
conn = get_connection( trans )
+
+ log.debug( "Getting user's keypair" )
+ key_pair = conn.get_key_pair( 'galaxy-keypair' )
+
try:
- key_pair = conn.get_key_pair( 'galaxy-keypair' )
- except: # No keypair under this name exists so create it
+ return key_pair.name
+ except AttributeError: # No keypair under this name exists so create it
+ log.debug( 'No keypair found, creating keypair' )
key_pair = conn.create_key_pair( 'galaxy-keypair' )
-
+
return key_pair.name
def update_instance_state( trans, id ):
@@ -970,12 +1018,12 @@
oldState = dbInstances.state
# Establish connection with cloud
conn = get_connection( trans )
- # Get actual instance from the cloud
+ # Get actual cloud instance object
cloudInstance = get_cloud_instance( conn, dbInstances.instance_id )
- # Update status of instance
+ # Update instance status
cloudInstance.update()
dbInstances.state = cloudInstance.state
- log.debug( "Processing instance %i, current instance state: %s" % i, cloudInstance.state )
+ log.debug( "Updating instance %s state; current state: %s" % ( str( cloudInstance ).split(":")[1], cloudInstance.state ) )
# Update state of UCI (TODO: once more than 1 instance is assoc. w/ 1 UCI, this will be need to be updated differently)
uci.state = dbInstances.state
# Persist
@@ -1004,13 +1052,17 @@
dbInstance.public_dns = cloudInstance.dns_name
dbInstance.private_dns = cloudInstance.private_dns_name
- # TODO: connect EBS volume to instance
# Attach storage volume(s) to instance
stores = get_stores( trans, uci )
for i, store in enumerate( stores ):
- log.debug( "Attaching volume %s to instance %s." % store.volume_id, store.uci_id )
- mtnDevice = '/dev/sdb'+str(i)
- conn.attach_volume( store.volume_id, store.uci_id, mntDevice )
+ log.debug( "Attaching volume '%s' to instance '%s'." % ( store.volume_id, dbInstance.instance_id ) )
+ mntDevice = '/dev/sdb'+str(i)
+ volStat = conn.attach_volume( store.volume_id, dbInstance.instance_id, mntDevice )
+ store.attach_time = datetime.utcnow()
+ store.device = mntDevice
+ store.i_id = dbInstance.instance_id
+ store.status = volStat
+ log.debug ( '***** volume status: %s' % volStat )
# Wait until instances have attached and add file system
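
Two patterns from the controller change above are worth pulling out on their own: unwrapping boto's reservation list to reach the instance object, and polling until the provider reports termination. A condensed sketch (stop_and_wait is an illustrative name; the two-second interval mirrors the hunk above):

import time

def get_cloud_instance( conn, instance_id ):
    # get_all_instances() returns a list of Reservations; with a single id
    # and (for now) a single instance per reservation, unwrap both levels.
    return conn.get_all_instances( [ instance_id ] )[0].instances[0]

def stop_and_wait( cloud_instance, poll_interval=2 ):
    # Request termination, then poll until the cloud reports 'terminated'.
    cloud_instance.stop()
    while cloud_instance.state != 'terminated':
        time.sleep( poll_interval )
        cloud_instance.update()  # refresh state from the cloud API
    return cloud_instance.state
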
diff -r 52318cf73454 -r 5b18a134bf56 templates/cloud/configure_cloud.mako
--- a/templates/cloud/configure_cloud.mako Mon Sep 14 15:08:23 2009 -0400
+++ b/templates/cloud/configure_cloud.mako Tue Sep 15 16:53:01 2009 -0400
@@ -136,7 +136,7 @@
</td>
<td>
<div popupmenu="li-${i}-popup">
- <a class="action-button" confirm="Are you sure you want to stop instance '${liveInstance.name}'?" href="${h.url_for( action='stop', id=trans.security.encode_id(liveInstance.id) )}">Stop</a>
+ <a class="action-button" confirm="Are you sure you want to stop instance '${liveInstance.name}'? Please note that this may take up to 1 minute during which time the page will not refresh." href="${h.url_for( action='stop', id=trans.security.encode_id(liveInstance.id) )}">Stop</a>
<a class="action-button" href="${h.url_for( action='renameInstance', id=trans.security.encode_id(liveInstance.id) )}">Rename</a>
<a class="action-button" href="${h.url_for( action='viewInstance', id=trans.security.encode_id(liveInstance.id) )}">View details</a>
</div>
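
The keypair helper earlier in this changeset moves from a blanket try/except to a lookup-then-create flow; as the commit's own comment notes, a missing keypair comes back as None here, so touching .name raises AttributeError. The essence, as a standalone sketch:

def get_or_create_key_pair( conn, name='galaxy-keypair' ):
    key_pair = conn.get_key_pair( name )
    try:
        return key_pair.name
    except AttributeError:  # key_pair was None, so no such keypair exists yet
        key_pair = conn.create_key_pair( name )
        return key_pair.name
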
diff -r 52318cf73454 -r 5b18a134bf56 templates/root/index.mako
--- a/templates/root/index.mako Mon Sep 14 15:08:23 2009 -0400
+++ b/templates/root/index.mako Tue Sep 15 16:53:01 2009 -0400
@@ -2,14 +2,14 @@
<%def name="init()">
<%
-if trans.app.config.cloud_controller_instance:
- self.has_left_panel=False
- self.has_right_panel=False
- self.active_view="cloud"
-else:
- self.has_left_panel=True
- self.has_right_panel=True
- self.active_view="analysis"
+ if trans.app.config.cloud_controller_instance:
+ self.has_left_panel=False
+ self.has_right_panel=False
+ self.active_view="cloud"
+ else:
+ self.has_left_panel=True
+ self.has_right_panel=True
+ self.active_view="analysis"
%>
%if trans.app.config.require_login and not trans.user:
<script type="text/javascript">
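
The most reusable piece of this changeset is the get_connection() change: boto is pointed at the Eucalyptus Public Cloud via a custom RegionInfo and service path instead of the AWS defaults. As a standalone sketch (host, port, and path are the EPC values hard-coded above):

from boto.ec2.connection import EC2Connection
from boto.ec2.regioninfo import RegionInfo

def connect_eucalyptus( a_key, s_key, host="mayhem9.cs.ucsb.edu" ):
    # Eucalyptus speaks the EC2 API but lives at a non-AWS endpoint,
    # so an explicit RegionInfo with the target host is required.
    euca_region = RegionInfo( None, "eucalyptus", host )
    return EC2Connection( aws_access_key_id=a_key,
                          aws_secret_access_key=s_key,
                          is_secure=False,
                          port=8773,
                          region=euca_region,
                          path="/services/Eucalyptus" )
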
23 Nov '09
details: http://www.bx.psu.edu/hg/galaxy/rev/cff66a171623
changeset: 3065:cff66a171623
user: Enis Afgan <afgane(a)gmail.com>
date: Thu Sep 17 13:45:01 2009 -0400
description:
Started coding CloudManager and decoupling the web controller from the back end.
diffstat:
lib/galaxy/app.py | 4 +-
lib/galaxy/cloud/__init__.py | 791 +++++++++++++++++++++++++++++++++++
lib/galaxy/cloud/providers/eucalyptus.py | 29 +
lib/galaxy/config.py | 5 +
templates/cloud/configure_cloud.mako | 2 +-
5 files changed, 829 insertions(+), 2 deletions(-)
diffs (876 lines):
diff -r 163ad9f94851 -r cff66a171623 lib/galaxy/app.py
--- a/lib/galaxy/app.py Wed Sep 16 16:40:09 2009 -0400
+++ b/lib/galaxy/app.py Thu Sep 17 13:45:01 2009 -0400
@@ -1,6 +1,6 @@
import sys, os, atexit
-from galaxy import config, jobs, util, tools, web
+from galaxy import config, jobs, util, tools, web, cloud
from galaxy.tracks import store
from galaxy.web import security
import galaxy.model
@@ -68,6 +68,8 @@
# FIXME: These are exposed directly for backward compatibility
self.job_queue = self.job_manager.job_queue
self.job_stop_queue = self.job_manager.job_stop_queue
+ # Start the cloud manager
+ self.cloud_manager = cloud.CloudManager( self )
# Track Store
self.track_store = store.TrackStoreManager( self.config.track_store_path )
diff -r 163ad9f94851 -r cff66a171623 lib/galaxy/cloud/__init__.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/galaxy/cloud/__init__.py Thu Sep 17 13:45:01 2009 -0400
@@ -0,0 +1,791 @@
+import logging, threading, sys, os, time, subprocess, string, tempfile, re, traceback, shutil
+
+from galaxy import util, model
+from galaxy.model import mapping
+from galaxy.model.orm import lazyload
+from galaxy.datatypes.tabular import *
+from galaxy.datatypes.interval import *
+from galaxy.datatypes import metadata
+
+import pkg_resources
+pkg_resources.require( "PasteDeploy" )
+
+from paste.deploy.converters import asbool
+
+from Queue import Queue, Empty
+
+log = logging.getLogger( __name__ )
+
+# States for running a job. These are NOT the same as data states
+JOB_WAIT, JOB_ERROR, JOB_INPUT_ERROR, JOB_INPUT_DELETED, JOB_OK, JOB_READY, JOB_DELETED, JOB_ADMIN_DELETED = 'wait', 'error', 'input_error', 'input_deleted', 'ok', 'ready', 'deleted', 'admin_deleted'
+
+class CloudManager( object ):
+ """
+ Highest level interface to cloud management.
+ """
+ def __init__( self, app ):
+ self.app = app
+ if self.app.config.get_bool( "enable_cloud_execution", True ):
+ # The dispatcher manager underlying cloud instances
+ self.provider = DefaultCloudProvider( app )
+ # Monitor for updating status of cloud instances
+ self.cloud_monitor = CloudMonitor( app, self.provider )
+# self.job_stop_queue = JobStopQueue( app, self.dispatcher )
+ else:
+ self.job_queue = self.job_stop_queue = NoopCloudMonitor()
+ def shutdown( self ):
+ self.cloud_monitor.shutdown()
+# self.job_stop_queue.shutdown()
+
+ def createUCI( self, name, storage_size, zone=None):
+ """
+ Creates User Configured Instance (UCI). Essentially, creates storage volume.
+ """
+ self.provider.createUCI( name, storage_size, zone )
+
+ def deleteUCI( self, name ):
+ """
+ Deletes UCI. NOTE that this implies deletion of any and all data associated
+ with this UCI from the cloud. All data will be deleted.
+ """
+
+ def addStorageToUCI( self, name ):
+ """ Adds more storage to specified UCI """
+
+ def startUCI( self, name, type ):
+ """
+ Starts an instance of the named UCI on the cloud. This implies mounting of
+ storage and starting Galaxy instance.
+ """
+
+ def stopUCI( self, name ):
+ """
+ Stops cloud instance associated with named UCI. This also implies
+ stopping of Galaxy and unmounting of the file system.
+ """
+
+class Sleeper( object ):
+ """
+ Provides a 'sleep' method that sleeps for a number of seconds *unless*
+ the notify method is called (from a different thread).
+ """
+ def __init__( self ):
+ self.condition = threading.Condition()
+ def sleep( self, seconds ):
+ self.condition.acquire()
+ self.condition.wait( seconds )
+ self.condition.release()
+ def wake( self ):
+ self.condition.acquire()
+ self.condition.notify()
+ self.condition.release()
+
+class CloudMonitor( object ):
+ """
+ Cloud manager that waits for the user to instantiate a cloud instance and then invokes a
+ CloudProvider.
+ """
+ STOP_SIGNAL = object()
+ def __init__( self, app, provider ):
+ """Start the cloud manager"""
+ self.app = app
+ # Keep track of the pid that started the cloud manager, only it
+ # has valid threads
+ self.parent_pid = os.getpid()
+ # Contains new jobs. Note this is not used if track_jobs_in_database is True
+# self.queue = Queue()
+
+ # Contains requests that are waiting (only use from monitor thread)
+ self.waiting = []
+
+ # Helper for interruptible sleep
+ self.sleeper = Sleeper()
+ self.running = True
+ self.provider = provider
+ self.monitor_thread = threading.Thread( target=self.__monitor )
+ self.monitor_thread.start()
+ log.info( "cloud manager started" )
+# if app.config.get_bool( 'enable_job_recovery', True ):
+# self.__check_jobs_at_startup()
+
+ def __check_jobs_at_startup( self ):
+ """
+ Checks all jobs that are in the 'new', 'queued' or 'running' state in
+ the database and requeues or cleans up as necessary. Only run as the
+ job manager starts.
+ """
+ model = self.app.model
+ for job in model.Job.filter( model.Job.c.state==model.Job.states.NEW ).all():
+ if job.tool_id not in self.app.toolbox.tools_by_id:
+ log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) )
+ JobWrapper( job, None, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator, or' )
+ else:
+ log.debug( "no runner: %s is still in new state, adding to the jobs queue" %job.id )
+ self.queue.put( ( job.id, job.tool_id ) )
+ for job in model.Job.filter( (model.Job.c.state == model.Job.states.RUNNING) | (model.Job.c.state == model.Job.states.QUEUED) ).all():
+ if job.tool_id not in self.app.toolbox.tools_by_id:
+ log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) )
+ JobWrapper( job, None, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator, or' )
+ elif job.job_runner_name is None:
+ log.debug( "no runner: %s is still in queued state, adding to the jobs queue" %job.id )
+ self.queue.put( ( job.id, job.tool_id ) )
+ else:
+ job_wrapper = JobWrapper( job, self.app.toolbox.tools_by_id[ job.tool_id ], self )
+ self.dispatcher.recover( job, job_wrapper )
+
+ def __monitor( self ):
+ """
+ Daemon that continuously monitors cloud instance requests as well as state
+ of running instances.
+ """
+ # HACK: Delay until after forking, we need a way to do post fork notification!!!
+ time.sleep( 10 )
+
+ while self.running:
+ try:
+ self.__monitor_step()
+ except:
+ log.exception( "Exception in cloud manager monitor_step" )
+ # Sleep
+ self.sleeper.sleep( 1 )
+
+ def __monitor_step( self ):
+ """
+ Called repeatedly by `monitor` to process cloud instance requests.
+ TODO: Update following description to match the code
+ Gets any new cloud instance requests from the database, then iterates
+ over all new and waiting jobs to check the state of the jobs each
+ depends on. If the job has dependencies that have not finished, it
+ goes to the waiting queue. If the job has dependencies with errors,
+ it is marked as having errors and removed from the queue. Otherwise,
+ the job is dispatched.
+ """
+ # Get an orm session
+ session = mapping.Session()
+ # Pull all new jobs from the queue at once
+ new_jobs = []
+ if self.track_jobs_in_database:
+ for j in session.query( model.Job ).options( lazyload( "external_output_metadata" ), lazyload( "parameters" ) ).filter( model.Job.c.state == model.Job.states.NEW ).all():
+ job = JobWrapper( j, self.app.toolbox.tools_by_id[ j.tool_id ], self )
+ new_jobs.append( job )
+ else:
+ try:
+ while 1:
+ message = self.queue.get_nowait()
+ if message is self.STOP_SIGNAL:
+ return
+ # Unpack the message
+ job_id, tool_id = message
+ # Create a job wrapper from it
+ job_entity = session.query( model.Job ).get( job_id )
+ job = JobWrapper( job_entity, self.app.toolbox.tools_by_id[ tool_id ], self )
+ # Append to watch queue
+ new_jobs.append( job )
+ except Empty:
+ pass
+ # Iterate over new and waiting jobs and look for any that are
+ # ready to run
+ new_waiting = []
+ for job in ( new_jobs + self.waiting ):
+ try:
+ # Clear the session for each job so we get fresh states for
+ # job and all datasets
+ session.clear()
+ # Get the real job entity corresponding to the wrapper (if we
+ # are tracking in the database this is probably cached in
+ # the session from the original query above)
+ job_entity = session.query( model.Job ).get( job.job_id )
+ # Check the job's dependencies, requeue if they're not done
+ job_state = self.__check_if_ready_to_run( job, job_entity )
+ if job_state == JOB_WAIT:
+ if not self.track_jobs_in_database:
+ new_waiting.append( job )
+ elif job_state == JOB_ERROR:
+ log.info( "job %d ended with an error" % job.job_id )
+ elif job_state == JOB_INPUT_ERROR:
+ log.info( "job %d unable to run: one or more inputs in error state" % job.job_id )
+ elif job_state == JOB_INPUT_DELETED:
+ log.info( "job %d unable to run: one or more inputs deleted" % job.job_id )
+ elif job_state == JOB_READY:
+ # If special queuing is enabled, put the ready jobs in the special queue
+ if self.use_policy :
+ self.squeue.put( job )
+ log.debug( "job %d put in policy queue" % job.job_id )
+ else: # or dispatch the job directly
+ self.dispatcher.put( job )
+ log.debug( "job %d dispatched" % job.job_id)
+ elif job_state == JOB_DELETED:
+ msg = "job %d deleted by user while still queued" % job.job_id
+ job.info = msg
+ log.debug( msg )
+ elif job_state == JOB_ADMIN_DELETED:
+ job.fail( job_entity.info )
+ log.info( "job %d deleted by admin while still queued" % job.job_id )
+ else:
+ msg = "unknown job state '%s' for job %d" % ( job_state, job.job_id )
+ job.info = msg
+ log.error( msg )
+ except Exception, e:
+ job.info = "failure running job %d: %s" % ( job.job_id, str( e ) )
+ log.exception( "failure running job %d" % job.job_id )
+ # Update the waiting list
+ self.waiting = new_waiting
+ # If special (e.g. fair) scheduling is enabled, dispatch all jobs
+ # currently in the special queue
+ if self.use_policy :
+ while 1:
+ try:
+ sjob = self.squeue.get()
+ self.dispatcher.put( sjob )
+ log.debug( "job %d dispatched" % sjob.job_id )
+ except Empty:
+ # squeue is empty, so stop dispatching
+ break
+ except Exception, e: # if something else breaks while dispatching
+ job.fail( "failure running job %d: %s" % ( sjob.job_id, str( e ) ) )
+ log.exception( "failure running job %d" % sjob.job_id )
+ # Done with the session
+ mapping.Session.remove()
+
+ def __check_if_ready_to_run( self, job_wrapper, job ):
+ """
+ Check if a job is ready to run by verifying that each of its input
+ datasets is ready (specifically in the OK state). If any input dataset
+ has an error, fail the job and return JOB_INPUT_ERROR. If any input
+ dataset is deleted, fail the job and return JOB_INPUT_DELETED. If all
+ input datasets are in OK state, return JOB_READY indicating that the
+ job can be dispatched. Otherwise, return JOB_WAIT indicating that input
+ datasets are still being prepared.
+ """
+ if job.state == model.Job.states.DELETED:
+ return JOB_DELETED
+ elif job.state == model.Job.states.ERROR:
+ return JOB_ADMIN_DELETED
+ for dataset_assoc in job.input_datasets:
+ idata = dataset_assoc.dataset
+ if not idata:
+ continue
+ # don't run jobs for which the input dataset was deleted
+ if idata.deleted:
+ job_wrapper.fail( "input data %d (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
+ return JOB_INPUT_DELETED
+ # an error in the input data causes us to bail immediately
+ elif idata.state == idata.states.ERROR:
+ job_wrapper.fail( "input data %d is in error state" % ( idata.hid ) )
+ return JOB_INPUT_ERROR
+ elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
+ # need to requeue
+ return JOB_WAIT
+ return JOB_READY
+
+ def put( self, job_id, tool ):
+ """Add a job to the queue (by job identifier)"""
+ if not self.track_jobs_in_database:
+ self.queue.put( ( job_id, tool.id ) )
+ self.sleeper.wake()
+
+ def shutdown( self ):
+ """Attempts to gracefully shut down the worker thread"""
+ if self.parent_pid != os.getpid():
+ # We're not the real job queue, do nothing
+ return
+ else:
+ log.info( "sending stop signal to worker thread" )
+ self.running = False
+# if not self.track_jobs_in_database:
+# self.queue.put( self.STOP_SIGNAL )
+ self.sleeper.wake()
+ log.info( "cloud manager stopped" )
+ self.dispatcher.shutdown()
+
+class JobWrapper( object ):
+ """
+ Wraps a 'model.Job' with convenience methods for running processes and
+ state management.
+ """
+ def __init__(self, job, tool, queue ):
+ self.job_id = job.id
+ # This is immutable, we cache it for the scheduling policy to use if needed
+ self.session_id = job.session_id
+ self.tool = tool
+ self.queue = queue
+ self.app = queue.app
+ self.extra_filenames = []
+ self.command_line = None
+ self.galaxy_lib_dir = None
+ # With job outputs in the working directory, we need the working
+ # directory to be set before prepare is run, or else premature deletion
+ # and job recovery fail.
+ self.working_directory = \
+ os.path.join( self.app.config.job_working_directory, str( self.job_id ) )
+ self.output_paths = None
+ self.external_output_metadata = metadata.JobExternalOutputMetadataWrapper( job ) #wrapper holding the info required to restore and clean up from files used for setting metadata externally
+
+ def get_param_dict( self ):
+ """
+ Restore the dictionary of parameters from the database.
+ """
+ job = model.Job.get( self.job_id )
+ param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] )
+ param_dict = self.tool.params_from_strings( param_dict, self.app )
+ return param_dict
+
+ def prepare( self ):
+ """
+ Prepare the job to run by creating the working directory and the
+ config files.
+ """
+ mapping.context.current.clear() #this prevents the metadata reverting that has been seen in conjunction with the PBS job runner
+ if not os.path.exists( self.working_directory ):
+ os.mkdir( self.working_directory )
+ # Restore parameters from the database
+ job = model.Job.get( self.job_id )
+ incoming = dict( [ ( p.name, p.value ) for p in job.parameters ] )
+ incoming = self.tool.params_from_strings( incoming, self.app )
+ # Do any validation that could not be done at job creation
+ self.tool.handle_unvalidated_param_values( incoming, self.app )
+ # Restore input / output data lists
+ inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] )
+ out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] )
+ # These can be passed on the command line if wanted as $userId $userEmail
+ if job.history.user: # check for anonymous user!
+ userId = '%d' % job.history.user.id
+ userEmail = str(job.history.user.email)
+ else:
+ userId = 'Anonymous'
+ userEmail = 'Anonymous'
+ incoming['userId'] = userId
+ incoming['userEmail'] = userEmail
+ # Build params, done before hook so hook can use
+ param_dict = self.tool.build_param_dict( incoming, inp_data, out_data, self.get_output_fnames(), self.working_directory )
+ # Certain tools require tasks to be completed prior to job execution
+ # ( this used to be performed in the "exec_before_job" hook, but hooks are deprecated ).
+ if self.tool.tool_type is not None:
+ out_data = self.tool.exec_before_job( self.queue.app, inp_data, out_data, param_dict )
+ # Run the before queue ("exec_before_job") hook
+ self.tool.call_hook( 'exec_before_job', self.queue.app, inp_data=inp_data,
+ out_data=out_data, tool=self.tool, param_dict=incoming)
+ mapping.context.current.flush()
+ # Build any required config files
+ config_filenames = self.tool.build_config_files( param_dict, self.working_directory )
+ # FIXME: Build the param file (might return None, DEPRECATED)
+ param_filename = self.tool.build_param_file( param_dict, self.working_directory )
+ # Build the job's command line
+ self.command_line = self.tool.build_command_line( param_dict )
+ # FIXME: for now, tools get Galaxy's lib dir in their path
+ if self.command_line and self.command_line.startswith( 'python' ):
+ self.galaxy_lib_dir = os.path.abspath( "lib" ) # cwd = galaxy root
+ # We need command_line persisted to the db in order for Galaxy to re-queue the job
+ # if the server was stopped and restarted before the job finished
+ job.command_line = self.command_line
+ job.flush()
+ # Return list of all extra files
+ extra_filenames = config_filenames
+ if param_filename is not None:
+ extra_filenames.append( param_filename )
+ self.param_dict = param_dict
+ self.extra_filenames = extra_filenames
+ return extra_filenames
+
+ def fail( self, message, exception=False ):
+ """
+ Indicate job failure by setting state and message on all output
+ datasets.
+ """
+ job = model.Job.get( self.job_id )
+ job.refresh()
+ # if the job was deleted, don't fail it
+ if not job.state == model.Job.states.DELETED:
+ # Check if the failure is due to an exception
+ if exception:
+ # Save the traceback immediately in case we generate another
+ # below
+ job.traceback = traceback.format_exc()
+ # Get the exception and let the tool attempt to generate
+ # a better message
+ etype, evalue, tb = sys.exc_info()
+ m = self.tool.handle_job_failure_exception( evalue )
+ if m:
+ message = m
+ if self.app.config.outputs_to_working_directory:
+ for dataset_path in self.get_output_fnames():
+ try:
+ shutil.move( dataset_path.false_path, dataset_path.real_path )
+ log.debug( "fail(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) )
+ except ( IOError, OSError ), e:
+ log.error( "fail(): Missing output file in working directory: %s" % e )
+ for dataset_assoc in job.output_datasets:
+ dataset = dataset_assoc.dataset
+ dataset.refresh()
+ dataset.state = dataset.states.ERROR
+ dataset.blurb = 'tool error'
+ dataset.info = message
+ dataset.set_size()
+ dataset.flush()
+ job.state = model.Job.states.ERROR
+ job.command_line = self.command_line
+ job.info = message
+ job.flush()
+ # If the job was deleted, just clean up
+ self.cleanup()
+
+ def change_state( self, state, info = False ):
+ job = model.Job.get( self.job_id )
+ job.refresh()
+ for dataset_assoc in job.output_datasets:
+ dataset = dataset_assoc.dataset
+ dataset.refresh()
+ dataset.state = state
+ if info:
+ dataset.info = info
+ dataset.flush()
+ if info:
+ job.info = info
+ job.state = state
+ job.flush()
+
+ def get_state( self ):
+ job = model.Job.get( self.job_id )
+ job.refresh()
+ return job.state
+
+ def set_runner( self, runner_url, external_id ):
+ job = model.Job.get( self.job_id )
+ job.refresh()
+ job.job_runner_name = runner_url
+ job.job_runner_external_id = external_id
+ job.flush()
+
+ def finish( self, stdout, stderr ):
+ """
+ Called to indicate that the associated command has been run. Updates
+ the output datasets based on stderr and stdout from the command, and
+ the contents of the output files.
+ """
+ # default post job setup
+ mapping.context.current.clear()
+ job = model.Job.get( self.job_id )
+ # if the job was deleted, don't finish it
+ if job.state == job.states.DELETED:
+ self.cleanup()
+ return
+ elif job.state == job.states.ERROR:
+ # Job was deleted by an administrator
+ self.fail( job.info )
+ return
+ if stderr:
+ job.state = "error"
+ else:
+ job.state = 'ok'
+ if self.app.config.outputs_to_working_directory:
+ for dataset_path in self.get_output_fnames():
+ try:
+ shutil.move( dataset_path.false_path, dataset_path.real_path )
+ log.debug( "finish(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) )
+ except ( IOError, OSError ):
+ self.fail( "Job %s's output dataset(s) could not be read" % job.id )
+ return
+ for dataset_assoc in job.output_datasets:
+ #should this also be checking library associations? - can a library item be added from a history before the job has ended? - let's not allow this to occur
+ for dataset in dataset_assoc.dataset.dataset.history_associations: #need to update all associated output hdas, i.e. history was shared with job running
+ dataset.blurb = 'done'
+ dataset.peek = 'no peek'
+ dataset.info = stdout + stderr
+ dataset.set_size()
+ if stderr:
+ dataset.blurb = "error"
+ elif dataset.has_data():
+ #if a dataset was copied, it won't appear in our dictionary:
+ #either use the metadata from originating output dataset, or call set_meta on the copies
+ #it would be quicker to just copy the metadata from the originating output dataset,
+ #but somewhat trickier (need to recurse up the copied_from tree), for now we'll call set_meta()
+ if not self.external_output_metadata.external_metadata_set_successfully( dataset ):
+ # Only set metadata values if they are missing...
+ dataset.set_meta( overwrite = False )
+ else:
+ #load metadata from file
+ #we need to no longer allow metadata to be edited while the job is still running,
+ #since if it is edited, the metadata changed on the running output will no longer match
+ #the metadata that was stored to disk for use via the external process,
+ #and the changes made by the user will be lost, without warning or notice
+ dataset.metadata.from_JSON_dict( self.external_output_metadata.get_output_filenames_by_dataset( dataset ).filename_out )
+ if self.tool.is_multi_byte:
+ dataset.set_multi_byte_peek()
+ else:
+ dataset.set_peek()
+ else:
+ dataset.blurb = "empty"
+ dataset.flush()
+ if stderr:
+ dataset_assoc.dataset.dataset.state = model.Dataset.states.ERROR
+ else:
+ dataset_assoc.dataset.dataset.state = model.Dataset.states.OK
+ dataset_assoc.dataset.dataset.flush()
+
+ # Save stdout and stderr
+ if len( stdout ) > 32768:
+ log.error( "stdout for job %d is greater than 32K, only first part will be logged to database" % job.id )
+ job.stdout = stdout[:32768]
+ if len( stderr ) > 32768:
+ log.error( "stderr for job %d is greater than 32K, only first part will be logged to database" % job.id )
+ job.stderr = stderr[:32768]
+ # custom post process setup
+ inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] )
+ out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] )
+ param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] ) # why not re-use self.param_dict here? ##dunno...probably should, this causes tools.parameters.basic.UnvalidatedValue to be used in following methods instead of validated and transformed values during i.e. running workflows
+ param_dict = self.tool.params_from_strings( param_dict, self.app )
+ # Check for and move associated_files
+ self.tool.collect_associated_files(out_data, self.working_directory)
+ # Create generated output children and primary datasets and add to param_dict
+ collected_datasets = {'children':self.tool.collect_child_datasets(out_data),'primary':self.tool.collect_primary_datasets(out_data)}
+ param_dict.update({'__collected_datasets__':collected_datasets})
+ # Certain tools require tasks to be completed after job execution
+ # ( this used to be performed in the "exec_after_process" hook, but hooks are deprecated ).
+ if self.tool.tool_type is not None:
+ self.tool.exec_after_process( self.queue.app, inp_data, out_data, param_dict, job = job )
+ # Call 'exec_after_process' hook
+ self.tool.call_hook( 'exec_after_process', self.queue.app, inp_data=inp_data,
+ out_data=out_data, param_dict=param_dict,
+ tool=self.tool, stdout=stdout, stderr=stderr )
+ # TODO
+ # validate output datasets
+ job.command_line = self.command_line
+ mapping.context.current.flush()
+ log.debug( 'job %d ended' % self.job_id )
+ self.cleanup()
+
+ def cleanup( self ):
+ # remove temporary files
+ try:
+ for fname in self.extra_filenames:
+ os.remove( fname )
+ if self.working_directory is not None:
+ shutil.rmtree( self.working_directory )
+ if self.app.config.set_metadata_externally:
+ self.external_output_metadata.cleanup_external_metadata()
+ except:
+ log.exception( "Unable to cleanup job %d" % self.job_id )
+
+ def get_command_line( self ):
+ return self.command_line
+
+ def get_session_id( self ):
+ return self.session_id
+
+ def get_input_fnames( self ):
+ job = model.Job.get( self.job_id )
+ filenames = []
+ for da in job.input_datasets: #da is JobToInputDatasetAssociation object
+ if da.dataset:
+ filenames.append( da.dataset.file_name )
+ #we will need to stage in metadata file names also
+ #TODO: would be better to only stage in metadata files that are actually needed (found in command line, referenced in config files, etc.)
+ for key, value in da.dataset.metadata.items():
+ if isinstance( value, model.MetadataFile ):
+ filenames.append( value.file_name )
+ return filenames
+
+ def get_output_fnames( self ):
+ if self.output_paths is not None:
+ return self.output_paths
+
+ class DatasetPath( object ):
+ def __init__( self, real_path, false_path = None ):
+ self.real_path = real_path
+ self.false_path = false_path
+ def __str__( self ):
+ if self.false_path is None:
+ return self.real_path
+ else:
+ return self.false_path
+
+ job = model.Job.get( self.job_id )
+ if self.app.config.outputs_to_working_directory:
+ self.output_paths = []
+ for name, data in [ ( da.name, da.dataset.dataset ) for da in job.output_datasets ]:
+ false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % data.id ) )
+ self.output_paths.append( DatasetPath( data.file_name, false_path ) )
+ else:
+ self.output_paths = [ DatasetPath( da.dataset.file_name ) for da in job.output_datasets ]
+ return self.output_paths
+
+ def check_output_sizes( self ):
+ sizes = []
+ output_paths = self.get_output_fnames()
+ for outfile in [ str( o ) for o in output_paths ]:
+ sizes.append( ( outfile, os.stat( outfile ).st_size ) )
+ return sizes
+ def setup_external_metadata( self, exec_dir = None, tmp_dir = None, dataset_files_path = None, config_root = None, datatypes_config = None, **kwds ):
+ if tmp_dir is None:
+ #this dir should be relative to the exec_dir
+ tmp_dir = self.app.config.new_file_path
+ if dataset_files_path is None:
+ dataset_files_path = self.app.model.Dataset.file_path
+ if config_root is None:
+ config_root = self.app.config.root
+ if datatypes_config is None:
+ datatypes_config = self.app.config.datatypes_config
+ job = model.Job.get( self.job_id )
+ return self.external_output_metadata.setup_external_metadata( [ output_dataset_assoc.dataset for output_dataset_assoc in job.output_datasets ], exec_dir = exec_dir, tmp_dir = tmp_dir, dataset_files_path = dataset_files_path, config_root = config_root, datatypes_config = datatypes_config, **kwds )
+
+class DefaultCloudProvider( object ):
+ def __init__( self, app ):
+ self.app = app
+ self.cloud_provider = {}
+# start_cloud_provider = None
+# if app.config.start_job_runners is not None:
+# start_cloud_provider.extend( app.config.start_job_runners.split(",") )
+# for provider_name in start_cloud_provider:
+ provider_name = app.config.cloud_provider
+ if provider_name == "eucalyptus":
+ import providers.eucalyptus
+ self.cloud_provider[provider_name] = providers.eucalyptus.EucalyptusCloudProvider( app )
+ elif provider_name == "ec2":
+ import providers.ec2
+ self.cloud_provider[provider_name] = providers.ec2.EC2CloudProvider( app )
+ else:
+ log.error( "Unable to start unknown cloud provider: %s" %provider_name )
+
+ def createUCI( self, name, storage_size, zone=None):
+ """
+ Creates User Configured Instance (UCI). Essentially, creates storage volume.
+ """
+ log.debug( "Creating UCI %s" % name )
+ self.cloud_provider[name].createUCI( name, storage_size, zone )
+
+ def deleteUCI( self, name ):
+ """
+ Deletes UCI. NOTE that this implies deletion of any and all data associated
+ with this UCI from the cloud. All data will be deleted.
+ """
+
+ def addStorageToUCI( self, name ):
+ """ Adds more storage to specified UCI """
+
+ def startUCI( self, name, type ):
+ """
+ Starts an instance of the named UCI on the cloud. This implies mounting of
+ storage and starting Galaxy instance.
+ """
+
+ def stopUCI( self, name ):
+ """
+ Stops cloud instance associated with named UCI. This also implies
+ stopping of Galaxy and unmounting of the file system.
+ """
+
+ def put( self, job_wrapper ):
+ runner_name = ( job_wrapper.tool.job_runner.split(":", 1) )[0]
+ log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) )
+ self.cloud_provider[runner_name].put( job_wrapper )
+
+ def stop( self, job ):
+ runner_name = ( job.job_runner_name.split(":", 1) )[0]
+ log.debug( "stopping job %d in %s runner" %( job.id, runner_name ) )
+ self.cloud_provider[runner_name].stop_job( job )
+
+ def recover( self, job, job_wrapper ):
+ runner_name = ( job.job_runner_name.split(":", 1) )[0]
+ log.debug( "recovering job %d in %s runner" %( job.id, runner_name ) )
+ self.cloud_provider[runner_name].recover( job, job_wrapper )
+
+ def shutdown( self ):
+ for runner in self.cloud_provider.itervalues():
+ runner.shutdown()
+
+class JobStopQueue( object ):
+ """
+ A queue for jobs which need to be terminated prematurely.
+ """
+ STOP_SIGNAL = object()
+ def __init__( self, app, dispatcher ):
+ self.app = app
+ self.dispatcher = dispatcher
+
+ # Keep track of the pid that started the job manager, only it
+ # has valid threads
+ self.parent_pid = os.getpid()
+ # Contains new jobs. Note this is not used if track_jobs_in_database is True
+ self.queue = Queue()
+
+ # Contains jobs that are waiting (only use from monitor thread)
+ self.waiting = []
+
+ # Helper for interruptible sleep
+ self.sleeper = Sleeper()
+ self.running = True
+ self.monitor_thread = threading.Thread( target=self.monitor )
+ self.monitor_thread.start()
+ log.info( "job stopper started" )
+
+ def monitor( self ):
+ """
+ Continually iterate the waiting jobs, stop any that are found.
+ """
+ # HACK: Delay until after forking, we need a way to do post fork notification!!!
+ time.sleep( 10 )
+ while self.running:
+ try:
+ self.monitor_step()
+ except:
+ log.exception( "Exception in monitor_step" )
+ # Sleep
+ self.sleeper.sleep( 1 )
+
+ def monitor_step( self ):
+ """
+ Called repeatedly by `monitor` to stop jobs.
+ """
+ # Pull all new jobs from the queue at once
+ jobs = []
+ try:
+ while 1:
+ ( job_id, error_msg ) = self.queue.get_nowait()
+ if job_id is self.STOP_SIGNAL:
+ return
+ # Append to watch queue
+ jobs.append( ( job_id, error_msg ) )
+ except Empty:
+ pass
+
+ for job_id, error_msg in jobs:
+ job = model.Job.get( job_id )
+ job.refresh()
+ # if desired, error the job so we can inform the user.
+ if error_msg is not None:
+ job.state = job.states.ERROR
+ job.info = error_msg
+ else:
+ job.state = job.states.DELETED
+ job.flush()
+ # if job is in JobQueue or FooJobRunner's put method,
+ # job_runner_name will be unset and the job will be dequeued due to
+ # state change above
+ if job.job_runner_name is not None:
+ # tell the dispatcher to stop the job
+ self.dispatcher.stop( job )
+
+ def put( self, job_id, error_msg=None ):
+ self.queue.put( ( job_id, error_msg ) )
+
+ def shutdown( self ):
+ """Attempts to gracefully shut down the worker thread"""
+ if self.parent_pid != os.getpid():
+ # We're not the real job queue, do nothing
+ return
+ else:
+ log.info( "sending stop signal to worker thread" )
+ self.running = False
+ self.queue.put( ( self.STOP_SIGNAL, None ) )
+ self.sleeper.wake()
+ log.info( "job stopper stopped" )
+
+class NoopCloudMonitor( object ):
+ """
+ Implements the CloudMonitor interface but does nothing
+ """
+ def put( self, *args ):
+ return
+ def shutdown( self ):
+ return
+
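
A note on the Sleeper class above: it is what keeps the monitor loop responsive to shutdown, because the thread waits on a Condition rather than calling time.sleep() directly, so wake() can cut a sleep short. A minimal, self-contained illustration of the same pattern (the 60-second sleep and 1-second delay are only for demonstration):

import threading, time

class Sleeper( object ):
    def __init__( self ):
        self.condition = threading.Condition()
    def sleep( self, seconds ):
        # Returns early if another thread calls wake()
        self.condition.acquire()
        self.condition.wait( seconds )
        self.condition.release()
    def wake( self ):
        self.condition.acquire()
        self.condition.notify()
        self.condition.release()

sleeper = Sleeper()
worker = threading.Thread( target=lambda: sleeper.sleep( 60 ) )
worker.start()
time.sleep( 1 )   # give the worker time to reach wait(); a notify with no waiter is lost
sleeper.wake()    # worker returns from sleep() ~59 seconds early
worker.join()
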
diff -r 163ad9f94851 -r cff66a171623 lib/galaxy/cloud/providers/eucalyptus.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/galaxy/cloud/providers/eucalyptus.py Thu Sep 17 13:45:01 2009 -0400
@@ -0,0 +1,29 @@
+import subprocess, threading, os, errno
+from Queue import Queue, Empty
+from datetime import datetime
+
+from galaxy import model
+from galaxy.datatypes.data import nice_size
+
+from time import sleep
+
+import galaxy.eggs
+galaxy.eggs.require("boto")
+from boto.ec2.connection import EC2Connection
+from boto.ec2.regioninfo import RegionInfo
+
+import logging
+log = logging.getLogger( __name__ )
+
+class EucalyptusCloudProvider( object ):
+ """
+ Eucalyptus-based cloud provider implementation for managing instances.
+ """
+ def __init__( self, app ):
+ log.debug( "In eucalyptus cloud provider." )
+
+ def shutdown( self ):
+ """Attempts to gracefully shut down the monitor thread"""
+ log.info( "sending stop signal to worker threads in eucalyptus cloud manager" )
+ self.queue.put( self.STOP_SIGNAL )
+ log.info( "eucalyptus cloud manager stopped" )
\ No newline at end of file
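
The provider skeleton's shutdown() relies on the sentinel-object convention used by Galaxy's queues: a unique object is put on the queue and the worker thread exits when it sees it, compared by identity rather than equality. Sketched in isolation (the worker here is illustrative, not the provider's actual monitor thread):

import threading
from Queue import Queue

STOP_SIGNAL = object()   # unique sentinel, compared with 'is'
queue = Queue()

def worker():
    while True:
        item = queue.get()
        if item is STOP_SIGNAL:
            break
        # ... process item ...

t = threading.Thread( target=worker )
t.start()
queue.put( STOP_SIGNAL )  # graceful shutdown
t.join()
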
diff -r 163ad9f94851 -r cff66a171623 lib/galaxy/config.py
--- a/lib/galaxy/config.py Wed Sep 16 16:40:09 2009 -0400
+++ b/lib/galaxy/config.py Thu Sep 17 13:45:01 2009 -0400
@@ -105,6 +105,11 @@
self.datatypes_config = kwargs.get( 'datatypes_config_file', 'datatypes_conf.xml' )
# Cloud configuration options
self.cloud_controller_instance = string_as_bool( kwargs.get( 'cloud_controller_instance', 'False' ) )
+ self.cloud_provider = kwargs.get( 'cloud_provider', None )
+ if self.cloud_controller_instance:
+ self.enable_cloud_execution = string_as_bool( kwargs.get( 'enable_cloud_execution', 'True' ) )
+ else:
+ self.enable_cloud_execution = string_as_bool( kwargs.get( 'enable_cloud_execution', 'False' ) )
def get( self, key, default ):
return self.config_dict.get( key, default )
def get_bool( self, key, default ):
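
The config hunk above is easy to misread at a glance: enable_cloud_execution defaults to True only when the host is a cloud controller instance, and to False everywhere else. The same decision in isolation, with a stand-in for galaxy.util's string_as_bool (assumed semantics):

def string_as_bool( value ):
    # Stand-in for galaxy.util.string_as_bool (assumption: these truthy strings).
    return str( value ).lower() in ( 'true', 'yes', 'on', '1' )

def cloud_execution_enabled( kwargs ):
    if string_as_bool( kwargs.get( 'cloud_controller_instance', 'False' ) ):
        return string_as_bool( kwargs.get( 'enable_cloud_execution', 'True' ) )
    else:
        return string_as_bool( kwargs.get( 'enable_cloud_execution', 'False' ) )
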
diff -r 163ad9f94851 -r cff66a171623 templates/cloud/configure_cloud.mako
--- a/templates/cloud/configure_cloud.mako Wed Sep 16 16:40:09 2009 -0400
+++ b/templates/cloud/configure_cloud.mako Thu Sep 17 13:45:01 2009 -0400
@@ -129,7 +129,7 @@
## TODO: Once more instances will be running under the same liveInstance, additional logic will need to be added to account for that
%if instance.state == "running":
<a class="action-button" href="http://${instance.public_dns}" target="_blank">
- <span>Connect to!</span>
+ <span>Access Galaxy</span>
<img src="${h.url_for('/static/images/silk/resultset_next.png')}" /></a></div>
%endif
%endfor
details: http://www.bx.psu.edu/hg/galaxy/rev/163ad9f94851
changeset: 3064:163ad9f94851
user: Enis Afgan <afgane(a)gmail.com>
date: Wed Sep 16 16:40:09 2009 -0400
description:
Added functionality to delete a UCI.
diffstat:
lib/galaxy/web/controllers/cloud.py | 37 ++++++++++++++++++++++++++++++++-----
templates/cloud/viewInstance.mako | 8 ++++++++
2 files changed, 40 insertions(+), 5 deletions(-)
diffs (82 lines):
diff -r 5b18a134bf56 -r 163ad9f94851 lib/galaxy/web/controllers/cloud.py
--- a/lib/galaxy/web/controllers/cloud.py Tue Sep 15 16:53:01 2009 -0400
+++ b/lib/galaxy/web/controllers/cloud.py Wed Sep 16 16:40:09 2009 -0400
@@ -178,6 +178,7 @@
log.debug( "Detaching volume '%s' from instance '%s'." % ( store.volume_id, dbInstances.instance_id ) )
mntDevice = store.device
volStat = None
+# Detaching volume does not work with Eucalyptus Public Cloud, so comment it out
# try:
# volStat = conn.detach_volume( store.volume_id, dbInstances.instance_id, mntDevice )
# except:
@@ -194,7 +195,7 @@
dbInstances.stop_time = datetime.utcnow()
while cloudInstance.state != 'terminated':
log.debug( "Stopping instance %s; current state: %s" % ( str( cloudInstance ).split(":")[1], cloudInstance.state ) )
- time.sleep(2)
+ time.sleep(3)
cloudInstance.update()
dbInstances.state = cloudInstance.state
@@ -214,13 +215,39 @@
return self.list( trans )
@web.expose
- @web.require_login( "delete Galaxy cloud instance" )
+ @web.require_login( "delete user configured Galaxy cloud instance" )
def deleteInstance( self, trans, id ):
- instance = get_uci( trans, id )
+ """
+ Deletes User Configured Instance (UCI) from the cloud and local database. NOTE that this implies deletion of
+ any and all storage associated with this UCI!
+ """
+ uci = get_uci( trans, id )
+ dbInstances = get_instances( trans, uci ) #TODO: handle list!
+ conn = get_connection( trans )
+ session = trans.sa_session
- error( "Deleting instance '%s' is not supported yet." % instance.name )
-
+ # Delete volume(s) associated with given uci
+ stores = get_stores( trans, uci )
+ for i, store in enumerate( stores ):
+ log.debug( "Deleting volume '%s' that is associated with UCI '%s'." % ( store.volume_id, uci.name ) )
+ volStat = None
+ try:
+ volStat = conn.delete_volume( store.volume_id )
+ except:
+ log.debug ( 'Error deleting volume %s' % store.volume_id )
+
+ if volStat:
+ session.delete( store )
+
+ # Delete UCI from table
+ uciName = uci.name # Store name for logging
+ session.delete( uci )
+
+ session.flush()
+ trans.log_event( "User deleted cloud instance '%s'" % uciName )
+ trans.set_message( "Galaxy instance '%s' deleted." % uciName )
+
return self.list( trans )
@web.expose
diff -r 5b18a134bf56 -r 163ad9f94851 templates/cloud/viewInstance.mako
--- a/templates/cloud/viewInstance.mako Tue Sep 15 16:53:01 2009 -0400
+++ b/templates/cloud/viewInstance.mako Wed Sep 16 16:40:09 2009 -0400
@@ -74,6 +74,14 @@
<td> ${liveInstance.state} </td>
</tr>
<tr>
+ <td> Type:</td>
+ <td> ${liveInstance.type} </td>
+ </tr>
+ <tr>
+ <td> Storage size:</td>
+ <td> ${liveInstance.uci.total_size} </td>
+ </tr>
+ <tr>
<td> Public DNS:</td>
<td> ${liveInstance.public_dns} </td>
</tr>
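
The deletion flow this changeset adds is deliberately ordered: cloud volumes are deleted first, a local store row is dropped only if the provider confirmed the volume deletion, and the UCI row goes last. Condensed into a sketch (delete_uci is an illustrative name; boto's delete_volume returns a truthy status on success):

import logging
log = logging.getLogger( __name__ )

def delete_uci( conn, session, uci, stores ):
    for store in stores:
        volStat = None
        try:
            volStat = conn.delete_volume( store.volume_id )
        except:
            log.debug( 'Error deleting volume %s' % store.volume_id )
        if volStat:
            session.delete( store )   # drop local record only on confirmed delete
    session.delete( uci )             # implies loss of any data on those volumes
    session.flush()
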
23 Nov '09
details: http://www.bx.psu.edu/hg/galaxy/rev/c9c9adf06e9d
changeset: 3066:c9c9adf06e9d
user: Enis Afgan <afgane(a)gmail.com>
date: Wed Sep 30 17:57:11 2009 -0400
description:
Modified a couple of DB tables to accommodate per-user cloud provider support
diffstat:
lib/galaxy/app.py | 4 +-
lib/galaxy/cloud/__init__.py | 47 ++++++-----
lib/galaxy/cloud/providers/eucalyptus.py | 78 ++++++++++++++++++-
lib/galaxy/model/mapping.py | 16 ++-
lib/galaxy/model/migrate/versions/0014_cloud_tables.py | 23 +++--
lib/galaxy/web/controllers/cloud.py | 66 +++++++++++-----
templates/cloud/configure_cloud.mako | 14 +--
templates/cloud/view.mako | 4 +
templates/cloud/viewInstance.mako | 20 +++++
9 files changed, 196 insertions(+), 76 deletions(-)
diffs (660 lines):
diff -r cff66a171623 -r c9c9adf06e9d lib/galaxy/app.py
--- a/lib/galaxy/app.py Thu Sep 17 13:45:01 2009 -0400
+++ b/lib/galaxy/app.py Wed Sep 30 17:57:11 2009 -0400
@@ -1,6 +1,6 @@
import sys, os, atexit
-from galaxy import config, jobs, util, tools, web, cloud
+from galaxy import config, jobs, util, tools, web
from galaxy.tracks import store
from galaxy.web import security
import galaxy.model
@@ -68,8 +68,6 @@
# FIXME: These are exposed directly for backward compatibility
self.job_queue = self.job_manager.job_queue
self.job_stop_queue = self.job_manager.job_stop_queue
- # Start the cloud manager
- self.cloud_manager = cloud.CloudManager( self )
# Track Store
self.track_store = store.TrackStoreManager( self.config.track_store_path )
diff -r cff66a171623 -r c9c9adf06e9d lib/galaxy/cloud/__init__.py
--- a/lib/galaxy/cloud/__init__.py Thu Sep 17 13:45:01 2009 -0400
+++ b/lib/galaxy/cloud/__init__.py Wed Sep 30 17:57:11 2009 -0400
@@ -1,11 +1,12 @@
import logging, threading, sys, os, time, subprocess, string, tempfile, re, traceback, shutil
-from galaxy import util, model
+from galaxy import util, model, config
from galaxy.model import mapping
from galaxy.model.orm import lazyload
from galaxy.datatypes.tabular import *
from galaxy.datatypes.interval import *
from galaxy.datatypes import metadata
+#from util import Bunch
import pkg_resources
pkg_resources.require( "PasteDeploy" )
@@ -17,6 +18,10 @@
log = logging.getLogger( __name__ )
# States for running a job. These are NOT the same as data states
+#messages = {
+# JOB_WAIT
+#
+# }
JOB_WAIT, JOB_ERROR, JOB_INPUT_ERROR, JOB_INPUT_DELETED, JOB_OK, JOB_READY, JOB_DELETED, JOB_ADMIN_DELETED = 'wait', 'error', 'input_error', 'input_deleted', 'ok', 'ready', 'deleted', 'admin_deleted'
class CloudManager( object ):
@@ -29,19 +34,20 @@
# The dispatcher manager underlying cloud instances
self.provider = DefaultCloudProvider( app )
# Monitor for updating status of cloud instances
- self.cloud_monitor = CloudMonitor( app, self.provider )
+# self.cloud_monitor = CloudMonitor( self.config, self.provider )
# self.job_stop_queue = JobStopQueue( app, self.dispatcher )
else:
self.job_queue = self.job_stop_queue = NoopCloudMonitor()
+
def shutdown( self ):
self.cloud_monitor.shutdown()
# self.job_stop_queue.shutdown()
- def createUCI( self, name, storage_size, zone=None):
+ def createUCI( self, user, name, storage_size, zone=None):
"""
Creates User Configured Instance (UCI). Essentially, creates storage volume.
"""
- self.provider.createUCI( name, storage_size, zone )
+ self.provider.createUCI( user, name, storage_size, zone )
def deleteUCI( self, name ):
"""
@@ -104,7 +110,7 @@
self.provider = provider
self.monitor_thread = threading.Thread( target=self.__monitor )
self.monitor_thread.start()
- log.info( "cloud manager started" )
+ log.info( "Cloud manager started" )
# if app.config.get_bool( 'enable_job_recovery', True ):
# self.__check_jobs_at_startup()
@@ -143,11 +149,12 @@
while self.running:
try:
- self.__monitor_step()
+ #self.__monitor_step()
+ log.debug ( "would be calling monitor_step" )
except:
log.exception( "Exception in cloud manager monitor_step" )
# Sleep
- self.sleeper.sleep( 1 )
+ self.sleeper.sleep( 2 )
def __monitor_step( self ):
"""
@@ -636,39 +643,39 @@
# if app.config.start_job_runners is not None:
# start_cloud_provider.extend( app.config.start_job_runners.split(",") )
# for provider_name in start_cloud_provider:
- provider_name = app.config.cloud_provider
- if provider_name == "eucalyptus":
+ self.provider_name = app.config.cloud_provider
+ if self.provider_name == "eucalyptus":
import providers.eucalyptus
- self.cloud_provider[provider_name] = providers.eucalyptus.EucalyptusCloudProvider( app )
- elif provider_name == "ec2":
+ self.cloud_provider[self.provider_name] = providers.eucalyptus.EucalyptusCloudProvider( app )
+ elif self.provider_name == "ec2":
import providers.ec2
- self.cloud_provider[provider_name] = providers.ec2.EC2CloudProvider( app )
+ self.cloud_provider[self.provider_name] = providers.ec2.EC2CloudProvider( app )
else:
- log.error( "Unable to start unknown cloud provider: %s" %provider_name )
+ log.error( "Unable to start unknown cloud provider: %s" %self.provider_name )
- def createUCI( self, name, storage_size, zone=None):
+ def createUCI( self, user, uciName, storage_size, zone=None):
"""
Creates User Configured Instance (UCI). Essentially, creates storage volume.
"""
- log.debug( "Creating UCI %s" % name )
- self.cloud_provider[name].createUCI( name, storage_size, zone )
+ log.debug( "Creating UCI %s" % uciName )
+ self.cloud_provider[self.provider_name].createUCI( user, uciName, storage_size, zone )
- def deleteUCI( self, name ):
+ def deleteUCI( self, uciName ):
"""
Deletes UCI. NOTE that this implies deletion of any and all data associated
with this UCI from the cloud. All data will be deleted.
"""
- def addStorageToUCI( self, name ):
+ def addStorageToUCI( self, uciName ):
""" Adds more storage to specified UCI """
- def startUCI( self, name, type ):
+ def startUCI( self, uciName, type ):
"""
Starts an instance of the named UCI on the cloud. This implies mounting of
storage and starting Galaxy instance.
"""
- def stopUCI( self, name ):
+ def stopUCI( self, uciName ):
"""
Stops cloud instance associated with named UCI. This also implies
stopping of Galaxy and unmounting of the file system.
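
With this change the provider registry is keyed by the configured provider name rather than by each UCI's name (which the earlier createUCI lookup indexed with by mistake), so every UCI operation routes through the one configured provider. The dispatch, reduced to its core:

class DefaultCloudProvider( object ):
    def __init__( self, app ):
        self.cloud_provider = {}
        self.provider_name = app.config.cloud_provider
        if self.provider_name == "eucalyptus":
            import providers.eucalyptus
            self.cloud_provider[ self.provider_name ] = providers.eucalyptus.EucalyptusCloudProvider( app )
        elif self.provider_name == "ec2":
            import providers.ec2
            self.cloud_provider[ self.provider_name ] = providers.ec2.EC2CloudProvider( app )

    def createUCI( self, user, uciName, storage_size, zone=None ):
        # Key by provider name, not by the UCI's name
        self.cloud_provider[ self.provider_name ].createUCI( user, uciName, storage_size, zone )
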
diff -r cff66a171623 -r c9c9adf06e9d lib/galaxy/cloud/providers/eucalyptus.py
--- a/lib/galaxy/cloud/providers/eucalyptus.py Thu Sep 17 13:45:01 2009 -0400
+++ b/lib/galaxy/cloud/providers/eucalyptus.py Wed Sep 30 17:57:11 2009 -0400
@@ -2,7 +2,7 @@
from Queue import Queue, Empty
from datetime import datetime
-from galaxy import model
+from galaxy import model # Database interaction class
from galaxy.datatypes.data import nice_size
from time import sleep
@@ -19,11 +19,81 @@
"""
Eucalyptus-based cloud provider implementation for managing instances.
"""
- def __init__( self, app ):
- log.debug( "In eucalyptus cloud provider." )
+ def __init__( self, app, user ):
+ log.debug( "Using eucalyptus as default cloud provider." )
+ self.conn = get_connection( user )
+
+
+ def get_connection( user ):
+ """
+ Establishes EC2 connection using user's default credentials
+ """
+ log.debug( '##### Establishing cloud connection' )
+ creds = model.CloudUserCredentials.filter_by( user=user, defaultCred=True ).first()
+ if creds:
+ a_key = creds.access_key
+ s_key = creds.secret_key
+ # Amazon EC2
+ #conn = EC2Connection( a_key, s_key )
+ # Eucalyptus Public Cloud
+ # TODO: Add option in Galaxy config file to specify these values (e.g., for locally managed Eucalyptus deployments)
+ euca_region = RegionInfo( None, "eucalyptus", "mayhem9.cs.ucsb.edu" )
+ conn = EC2Connection( aws_access_key_id=a_key, aws_secret_access_key=s_key, is_secure=False, port=8773, region=euca_region, path="/services/Eucalyptus" )
+ return conn
+ else:
+ log.debug( "User did not specify default credentials." )
+ return 0
+
def shutdown( self ):
"""Attempts to gracefully shut down the monitor thread"""
log.info( "sending stop signal to worker threads in eucalyptus cloud manager" )
self.queue.put( self.STOP_SIGNAL )
- log.info( "eucalyptus cloud manager stopped" )
\ No newline at end of file
+ log.info( "eucalyptus cloud manager stopped" )
+
+ def createUCI( self, user, name, storage_size, zone=None):
+ """
+        Creates a User Configured Instance (UCI). Essentially, creates a storage volume on the cloud provider
+        and registers the relevant information in the Galaxy database.
+ """
+        conn = self.get_connection( user )
+ # Capture user configured instance information
+ uci = model.UCI()
+ uci.name = name
+ uci.user = user
+ uci.state = "available" # Valid states include: "available", "running" or "pending"
+ uci.total_size = storage_size # This is OK now because a new instance is being created.
+ # Capture store related information
+ storage = model.CloudStore()
+ storage.user = user
+ storage.uci = uci
+ storage.size = storage_size
+ storage.availability_zone = "us-east-1a" # TODO: Give user choice here. Also, enable region selection.
+ #self.conn.create_volume( storage_size, storage.availability_zone, snapshot=None )
+ # TODO: get correct value from Eucalyptus
+ storage.volume_id = "made up"
+ # Persist
+ uci.flush()
+ storage.flush()
+
+ def deleteUCI( self, name ):
+ """
+        Deletes the UCI. NOTE that this implies deletion of any and all data
+        associated with this UCI from the cloud.
+ """
+
+ def addStorageToUCI( self, name ):
+ """ Adds more storage to specified UCI """
+
+ def startUCI( self, name, type ):
+ """
+        Starts an instance of the named UCI on the cloud. This implies mounting
+        the storage and starting a Galaxy instance.
+ """
+
+ def stopUCI( self, name ):
+ """
+        Stops the cloud instance associated with the named UCI. This also implies
+        stopping Galaxy and unmounting the file system.
+ """
\ No newline at end of file
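
[Editor's note: the connection logic above is plain boto. As a hedged usage sketch — not part of the changeset, keys are placeholders, host/port/path mirror the values in the diff — the same Eucalyptus endpoint can be reached outside Galaxy like this:]

    from boto.ec2.connection import EC2Connection
    from boto.ec2.regioninfo import RegionInfo

    a_key = "YOUR_ACCESS_KEY"   # placeholder
    s_key = "YOUR_SECRET_KEY"   # placeholder
    # Point boto at a Eucalyptus front end instead of AWS (host is the example from the diff).
    euca_region = RegionInfo( None, "eucalyptus", "mayhem9.cs.ucsb.edu" )
    conn = EC2Connection( aws_access_key_id=a_key, aws_secret_access_key=s_key,
                          is_secure=False, port=8773, region=euca_region,
                          path="/services/Eucalyptus" )
    print conn.get_all_zones()  # simple check that the endpoint responds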
diff -r cff66a171623 -r c9c9adf06e9d lib/galaxy/model/mapping.py
--- a/lib/galaxy/model/mapping.py Thu Sep 17 13:45:01 2009 -0400
+++ b/lib/galaxy/model/mapping.py Wed Sep 30 17:57:11 2009 -0400
@@ -400,6 +400,7 @@
Column( "create_time", DateTime, default=now ),
Column( "update_time", DateTime, default=now, onupdate=now ),
Column( "user_id", Integer, ForeignKey( "galaxy_user.id" ), index=True, nullable=False ),
+ Column( "credentials_id", Integer, ForeignKey( "cloud_user_credentials.id" ), index=True, nullable=False ),
Column( "name", TEXT ),
Column( "state", TEXT ),
Column( "total_size", Integer ),
@@ -421,6 +422,7 @@
Column( "public_dns", TEXT ),
Column( "private_dns", TEXT ),
Column( "keypair_name", TEXT ),
+ Column( "keypair_material", TEXT ),
Column( "availability_zone", TEXT ) )
CloudStore.table = Table( "cloud_store", metadata,
@@ -436,19 +438,18 @@
Column( "i_id", TEXT, ForeignKey( "cloud_instance.instance_id" ), index=True ),
Column( "status", TEXT ),
Column( "device", TEXT ),
- Column( "space_consumed", Integer )
- )
+ Column( "space_consumed", Integer ) )
CloudUserCredentials.table = Table( "cloud_user_credentials", metadata,
Column( "id", Integer, primary_key=True ),
Column( "create_time", DateTime, default=now ),
Column( "update_time", DateTime, default=now, onupdate=now ),
Column( "user_id", Integer, ForeignKey( "galaxy_user.id" ), index=True, nullable=False ),
- Column( "name", TEXT),
- Column( "access_key", TEXT),
- Column( "secret_key", TEXT),
- Column( "defaultCred", Boolean, default=False)
- )
+ Column( "name", TEXT ),
+ Column( "access_key", TEXT ),
+ Column( "secret_key", TEXT ),
+ Column( "defaultCred", Boolean, default=False ),
+ Column( "provider_name", TEXT ) )
# ***************************************************************************
StoredWorkflow.table = Table( "stored_workflow", metadata,
@@ -962,6 +963,7 @@
assign_mapper( context, UCI, UCI.table,
properties=dict( user=relation( User ),
+ credentials=relation( CloudUserCredentials ),
instance=relation( CloudInstance, backref='uci' ),
store=relation( CloudStore, backref='uci' )
) )
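
[Editor's note: a hedged sketch of how the new provider_name column and the UCI-to-credentials relation mapped above can be used from controller code. The session, user, and 'my-instance' name are placeholders; the query style follows the controller diffs below.]

    # Sketch only; assumes a SQLAlchemy session and the mappings above.
    ec2_creds = session.query( model.CloudUserCredentials ) \
        .filter_by( user=user, provider_name='ec2' ).all()
    uci = session.query( model.UCI ).filter_by( user=user, name='my-instance' ).first()
    if uci is not None:
        # The new 'credentials' relation resolves the provider without a hand-written join.
        print uci.credentials.provider_name, uci.credentials.access_key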
diff -r cff66a171623 -r c9c9adf06e9d lib/galaxy/model/migrate/versions/0014_cloud_tables.py
--- a/lib/galaxy/model/migrate/versions/0014_cloud_tables.py Thu Sep 17 13:45:01 2009 -0400
+++ b/lib/galaxy/model/migrate/versions/0014_cloud_tables.py Wed Sep 30 17:57:11 2009 -0400
@@ -25,6 +25,7 @@
Column( "create_time", DateTime, default=now ),
Column( "update_time", DateTime, default=now, onupdate=now ),
Column( "user_id", Integer, ForeignKey( "galaxy_user.id" ), index=True, nullable=False ),
+ Column( "credentials_id", Integer, ForeignKey( "cloud_user_credentials.id" ), index=True, nullable=False ),
Column( "name", TEXT ),
Column( "state", TEXT ),
Column( "total_size", Integer ),
@@ -46,6 +47,7 @@
Column( "public_dns", TEXT ),
Column( "private_dns", TEXT ),
Column( "keypair_name", TEXT ),
+ Column( "keypair_material", TEXT ),
Column( "availability_zone", TEXT ) )
CloudStore_table = Table( "cloud_store", metadata,
@@ -61,19 +63,18 @@
Column( "i_id", TEXT, ForeignKey( "cloud_instance.instance_id" ), index=True ),
Column( "status", TEXT ),
Column( "device", TEXT ),
- Column( "space_consumed", Integer )
- )
+ Column( "space_consumed", Integer ) )
CloudUserCredentials_table = Table( "cloud_user_credentials", metadata,
Column( "id", Integer, primary_key=True ),
Column( "create_time", DateTime, default=now ),
Column( "update_time", DateTime, default=now, onupdate=now ),
Column( "user_id", Integer, ForeignKey( "galaxy_user.id" ), index=True, nullable=False ),
- Column( "name", TEXT),
- Column( "access_key", TEXT),
- Column( "secret_key", TEXT),
- Column( "defaultCred", Boolean, default=False)
- )
+ Column( "name", TEXT ),
+ Column( "access_key", TEXT ),
+ Column( "secret_key", TEXT ),
+ Column( "defaultCred", Boolean, default=False ),
+ Column( "provider_name", TEXT ) )
def upgrade():
metadata.reflect()
@@ -92,8 +93,8 @@
def downgrade():
metadata.reflect()
try:
- log.debug( "Would drop cloud_image table." )
- CloudImage_table.drop() #Enable before putting final version
+# log.debug( "Would drop cloud_image table." )
+ CloudImage_table.drop() #Enable before release
except Exception, e:
log.debug( "Dropping cloud_image table failed: %s" % str( e ) )
@@ -108,8 +109,8 @@
log.debug( "Dropping cloud_store table failed: %s" % str( e ) )
try:
- log.debug( "Would drop cloud_user_credentials table." )
- #CloudUserCredentials_table.drop() #Enable before putting final version
+# log.debug( "Would drop cloud_user_credentials table." )
+ CloudUserCredentials_table.drop() #Enable before putting final version
except Exception, e:
log.debug( "Dropping cloud_user_credentials table failed: %s" % str( e ) )
diff -r cff66a171623 -r c9c9adf06e9d lib/galaxy/web/controllers/cloud.py
--- a/lib/galaxy/web/controllers/cloud.py Thu Sep 17 13:45:01 2009 -0400
+++ b/lib/galaxy/web/controllers/cloud.py Wed Sep 30 17:57:11 2009 -0400
@@ -21,12 +21,16 @@
galaxy.eggs.require("boto")
from boto.ec2.connection import EC2Connection
from boto.ec2.regioninfo import RegionInfo
+from galaxy.cloud import CloudManager
import logging
log = logging.getLogger( __name__ )
class CloudController( BaseController ):
+# def __init__( self ):
+# self.cloudManager = CloudManager()
+
@web.expose
def index( self, trans ):
return trans.fill_template( "cloud/index.mako" )
@@ -262,12 +266,20 @@
@web.expose
@web.require_login( "use Galaxy cloud" )
- def configureNew( self, trans, instanceName='', volSize=''):
+ def configureNew( self, trans, instanceName='', credName='', volSize='', zone=''):
"""
Configure and add new cloud instance to user's instance pool
"""
+ inst_error = vol_error = cred_error = None
user = trans.get_user()
- inst_error = vol_error = None
+ # TODO: Hack until present user w/ bullet list w/ registered credentials
+ storedCreds = trans.sa_session.query( model.CloudUserCredentials ) \
+ .filter_by( user=user ).all()
+ credsMatch = False
+ for cred in storedCreds:
+ if cred.name == credName:
+ credsMatch = True
+
if instanceName:
# Create new user configured instance
try:
@@ -280,23 +292,30 @@
vol_error = "Volume size cannot exceed 1000GB. You must specify an integer between 1 and 1000."
elif int( volSize ) < 1:
vol_error = "Volume size cannot be less than 1GB. You must specify an integer between 1 and 1000."
+ elif not credsMatch:
+ cred_error = "You specified unknown credentials."
else:
# Capture user configured instance information
uci = model.UCI()
uci.name = instanceName
+ uci.credentials = trans.app.model.CloudUserCredentials.filter(
+ trans.app.model.CloudUserCredentials.table.c.name==credName ).first()
uci.user= user
- uci.state = "available" # Valid states include: "available", "running" or "pending"
uci.total_size = volSize # This is OK now because new instance is being created.
+ # Need to flush because connection object accesses uci table
+ uci.flush()
# Capture store related information
storage = model.CloudStore()
storage.user = user
storage.uci = uci
storage.size = volSize
storage.availability_zone = "us-east-1a" # TODO: Give user choice here. Also, enable region selection.
- conn = get_connection( trans )
+ conn = get_connection( trans, credName )
#conn.create_volume( volSize, storage.availability_zone, snapshot=None )
# TODO: get correct value from AWS
storage.volume_id = "made up"
+            # TODO: If volume creation was successful, set state to available
+ uci.state = "available" # Valid states include: "available", "running" or "pending"
# Persist
session = trans.sa_session
session.save_or_update( uci )
@@ -304,7 +323,7 @@
session.flush()
# Log and display the management page
trans.log_event( "User configured new cloud instance" )
- trans.set_message( "New Galaxy instance '%s' configured." % uci.name )
+ trans.set_message( "New Galaxy instance '%s' configured." % instanceName )
return self.list( trans )
except ValueError:
vol_error = "Volume size must be specified as an integer value only, between 1 and 1000."
@@ -315,6 +334,7 @@
return trans.show_form(
web.FormBuilder( web.url_for(), "Configure new instance", submit_text="Add" )
.add_text( "instanceName", "Instance name", value="Unnamed instance", error=inst_error )
+ .add_text( "credName", "Name of registered credentials to use", value="", error=cred_error )
.add_text( "volSize", "Permanent storage size (1GB - 1000GB)"
"<br />Note: you will be able to add more storage later", value='', error=vol_error ) )
@@ -347,7 +367,7 @@
return trans.show_form(
web.FormBuilder( web.url_for(), "Add new cloud image", submit_text="Add" )
- .add_text( "image_id", "Image ID", value='', error=error )
+ .add_text( "image_id", "Machine Image ID (AMI or EMI)", value='', error=error )
.add_text( "manifest", "Manifest", value='', error=error ) )
@web.expose
@@ -378,18 +398,20 @@
@web.expose
@web.require_login( "add credentials" )
- def add( self, trans, credName='', accessKey='', secretKey='', defaultCred=True ):
+ def add( self, trans, credName='', accessKey='', secretKey='', providerName='' ):
"""
Add user's cloud credentials stored under name `credName`.
"""
user = trans.get_user()
- cred_error = accessKey_error = secretKey_error = None
+ cred_error = accessKey_error = secretKey_error = provider_error = None
if credName:
if len( credName ) > 255:
cred_error = "Credentials name exceeds maximum allowable length."
elif trans.app.model.CloudUserCredentials.filter(
trans.app.model.CloudUserCredentials.table.c.name==credName ).first():
cred_error = "Credentials with that name already exist."
+        elif providerName.lower() not in ( 'ec2', 'eucalyptus' ):
+ provider_error = "You specified an unsupported cloud provider."
else:
# Create new user stored credentials
credentials = model.CloudUserCredentials()
@@ -397,6 +419,7 @@
credentials.user = user
credentials.access_key = accessKey
credentials.secret_key = secretKey
+ credentials.provider_name = providerName.lower()
# Persist
session = trans.sa_session
session.save_or_update( credentials )
@@ -404,15 +427,15 @@
# Log and display the management page
trans.log_event( "User added new credentials" )
trans.set_message( "Credential '%s' created" % credentials.name )
- if defaultCred:
- self.makeDefault( trans, credentials.id)
+# if defaultCred:
+# self.makeDefault( trans, credentials.id)
return self.list( trans )
return trans.show_form(
web.FormBuilder( web.url_for(), "Add credentials", submit_text="Add" )
.add_text( "credName", "Credentials name", value="Unnamed credentials", error=cred_error )
+ .add_text( "providerName", "Cloud provider name", value="ec2 or eucalyptus", error=provider_error )
.add_text( "accessKey", "Access key", value='', error=accessKey_error )
- .add_password( "secretKey", "Secret key", value='', error=secretKey_error )
- .add_input( "checkbox","Make default credentials","defaultCred", value='defaultCred' ) )
+ .add_password( "secretKey", "Secret key", value='', error=secretKey_error ) )
@web.expose
@web.require_login( "view credentials" )
@@ -443,8 +466,9 @@
@web.require_login( "delete credentials" )
def delete( self, trans, id=None ):
"""
- Delete user's cloud credentials
- """
+ Delete user's cloud credentials
+        TODO: Because UCIs depend on specific credentials, we need to handle the case where given credentials are being used by a UCI
+ """
# Load credentials from database
stored = get_stored_credentials( trans, id )
# Delete and save
@@ -899,7 +923,7 @@
def get_stored_credentials( trans, id, check_ownership=True ):
"""
- Get a StoredUserCredntials from the database by id, verifying ownership.
+ Get StoredUserCredentials from the database by id, verifying ownership.
"""
    # Check if 'id' is an int (i.e., it was called from this program) or
# it was passed from the web (in which case decode it)
@@ -997,21 +1021,21 @@
cloudInstance = conn.get_all_instances( idLst )[0].instances[0]
return cloudInstance
-def get_connection( trans ):
+def get_connection( trans, credName ):
"""
- Establishes EC2 conncection using user's default credentials
+    Establishes EC2 connection using the user's stored credentials named by credName
"""
log.debug( '##### Establishing cloud connection.' )
user = trans.get_user()
- creds = trans.sa_session.query(model.CloudUserCredentials).filter_by(user=user, defaultCred=True).first()
+ creds = trans.sa_session.query( model.CloudUserCredentials ).filter_by( user=user, name=credName ).first()
if creds:
a_key = creds.access_key
s_key = creds.secret_key
# Amazon EC2
#conn = EC2Connection( a_key, s_key )
# Eucalyptus Public Cloud
- euca_region = RegionInfo(None, "eucalyptus", "mayhem9.cs.ucsb.edu")
- conn = EC2Connection(aws_access_key_id=a_key, aws_secret_access_key=s_key, is_secure=False, port=8773, region=euca_region, path="/services/Eucalyptus")
+ euca_region = RegionInfo( None, "eucalyptus", "mayhem9.cs.ucsb.edu" )
+ conn = EC2Connection( aws_access_key_id=a_key, aws_secret_access_key=s_key, is_secure=False, port=8773, region=euca_region, path="/services/Eucalyptus" )
return conn
else:
error( "You must specify default credentials before starting an instance." )
@@ -1031,6 +1055,8 @@
except AttributeError: # No keypair under this name exists so create it
log.debug( 'No keypair found, creating keypair' )
key_pair = conn.create_key_pair( 'galaxy-keypair' )
+ # TODO: Store key_pair.material into instance table - this is the only time private key can be retrieved
+ # Actually, probably return key_pair to calling method and store name & key from there...
return key_pair.name
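
[Editor's note: the two TODO lines above suggest returning the keypair object itself so the private key can be persisted. A hedged sketch of that approach — get_or_create_keypair is a hypothetical name, the boto calls are the ones used in the diff:]

    def get_or_create_keypair( conn ):
        # Return the whole boto KeyPair so the caller can persist both the
        # name and, on first creation, the private key material.
        key_pair = conn.get_key_pair( 'galaxy-keypair' )
        if key_pair is None:  # no keypair under this name exists, so create one
            key_pair = conn.create_key_pair( 'galaxy-keypair' )
            # key_pair.material (the private key) is populated only at creation
            # time; store it now or it cannot be retrieved again.
        return key_pair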
diff -r cff66a171623 -r c9c9adf06e9d templates/cloud/configure_cloud.mako
--- a/templates/cloud/configure_cloud.mako Thu Sep 17 13:45:01 2009 -0400
+++ b/templates/cloud/configure_cloud.mako Wed Sep 30 17:57:11 2009 -0400
@@ -34,7 +34,7 @@
<table class="mange-table colored" border="0" cellspacing="0" cellpadding="0" width="100%">
<tr class="header">
<th>Credentials name</th>
- <th>Default</th>
+ <th>Provider</th>
<th></th>
</tr>
%for i, cloudCredential in enumerate( cloudCredentials ):
@@ -43,15 +43,7 @@
${cloudCredential.name}
<a id="cr-${i}-popup" class="popup-arrow" style="display: none;">▼</a>
</td>
- ## Comment <td>${len(workflow.latest_workflow.steps)}</td>
- ##<td>${str(cloudCredential.update_time)[:19]}</td>
- <td>
- ##${str(cloudCredential.defaultCred)}
- <%
- if cloudCredential.defaultCred:
- context.write('*')
- %>
- </td>
+ <td>${cloudCredential.provider_name}</td>
<td>
<div popupmenu="cr-${i}-popup">
@@ -85,7 +77,7 @@
<colgroup width="25%"></colgroup>
<colgroup width="10%"></colgroup>
<tr class="header">
- <th>Live instances</th>
+ <th>Your instances</th>
<th>Storage size (GB)</th>
<th>State</th>
<th>Alive since</th>
diff -r cff66a171623 -r c9c9adf06e9d templates/cloud/view.mako
--- a/templates/cloud/view.mako Thu Sep 17 13:45:01 2009 -0400
+++ b/templates/cloud/view.mako Wed Sep 30 17:57:11 2009 -0400
@@ -38,6 +38,10 @@
</td>
</tr>
<tr>
+ <td> Cloud provider: </td>
+ <td> ${str(credDetails.provider_name)[:16]}</td>
+ </tr>
+ <tr>
<td> Access key: </td>
<td>
${credDetails.access_key}
diff -r cff66a171623 -r c9c9adf06e9d templates/cloud/viewInstance.mako
--- a/templates/cloud/viewInstance.mako Thu Sep 17 13:45:01 2009 -0400
+++ b/templates/cloud/viewInstance.mako Wed Sep 30 17:57:11 2009 -0400
@@ -103,6 +103,26 @@
<td> ${liveInstance.keypair_name} </td>
</tr>
%endif
+ %if liveInstance.keypair_material != None:
+ <tr>
+                        <td> Keypair material:</td>
+                        <td>
+                            <div id="shortComment2">
+                                <a onclick="document.getElementById('fullComment2').style.display = 'block';
+                                    document.getElementById('shortComment2').style.display = 'none'; return 0"
+                                    href="javascript:void(0)">
+                                    + Show
+                                </a>
+                            </div>
+                            <div id="fullComment2" style="DISPLAY: none">
+                                <nobr><b>${liveInstance.keypair_material}</b></nobr><br/>
+                                <a onclick="document.getElementById('shortComment2').style.display = 'block';
+                                    document.getElementById('fullComment2').style.display = 'none'; return 0;"
+                                    href="javascript:void(0)">
+                                    - Hide
+                                </a>
+                            </div>
+                        </td>
+ %endif
</table>
%else:
23 Nov '09
details: http://www.bx.psu.edu/hg/galaxy/rev/52318cf73454
changeset: 3062:52318cf73454
user: Enis Afgan <afgane(a)gmail.com>
date: Mon Sep 14 15:08:23 2009 -0400
description:
Cloud tab is now enabled/disabled through configuration option in universe_wsgi.ini file
diffstat:
lib/galaxy/config.py | 2 ++
lib/galaxy/web/controllers/cloud.py | 6 +++++-
templates/base_panels.mako | 18 ++++++++++--------
templates/root/index.mako | 34 ++++++++++++++++++++--------------
4 files changed, 37 insertions(+), 23 deletions(-)
diffs (121 lines):
diff -r 2d1b957b8448 -r 52318cf73454 lib/galaxy/config.py
--- a/lib/galaxy/config.py Fri Sep 11 15:25:21 2009 -0400
+++ b/lib/galaxy/config.py Mon Sep 14 15:08:23 2009 -0400
@@ -103,6 +103,8 @@
except ConfigParser.NoSectionError:
self.tool_runners = []
self.datatypes_config = kwargs.get( 'datatypes_config_file', 'datatypes_conf.xml' )
+ # Cloud configuration options
+ self.cloud_controller_instance = string_as_bool( kwargs.get( 'cloud_controller_instance', 'False' ) )
def get( self, key, default ):
return self.config_dict.get( key, default )
def get_bool( self, key, default ):
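
[Editor's note: in practice the option above maps to a single line in universe_wsgi.ini, parsed by string_as_bool as shown. A hedged example — the [app:main] section name is assumed to be the standard one; the exact placement in your copy may differ:]

    [app:main]
    # Hide the analysis/workflow/library tabs and expose only the Cloud tab
    cloud_controller_instance = True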
diff -r 2d1b957b8448 -r 52318cf73454 lib/galaxy/web/controllers/cloud.py
--- a/lib/galaxy/web/controllers/cloud.py Fri Sep 11 15:25:21 2009 -0400
+++ b/lib/galaxy/web/controllers/cloud.py Mon Sep 14 15:08:23 2009 -0400
@@ -20,6 +20,7 @@
import galaxy.eggs
galaxy.eggs.require("boto")
from boto.ec2.connection import EC2Connection
+from boto.ec2.regioninfo import RegionInfo
import logging
log = logging.getLogger( __name__ )
@@ -937,7 +938,10 @@
if creds:
a_key = creds.access_key
s_key = creds.secret_key
- conn = EC2Connection( a_key, s_key )
+ #conn = EC2Connection( a_key, s_key )
+ euca_region = RegionInfo(None, "eucalyptus", "mayhem9.cs.ucsb.edu")
+ #conn = EC2Connection(aws_access_key_id="2s42fQmcCu4WBpS3RJ9e5g", aws_secret_access_key="2iEzpThjZQttuvWYXL-0nRUuurzl2dump2drwg", is_secure=False, port=8773, region=euca_region, path="/services/Eucalyptus")
+ conn = EC2Connection(aws_access_key_id=a_key, aws_secret_access_key=s_key, is_secure=False, port=8773, region=euca_region, path="/services/Eucalyptus")
return conn
else:
error( "You must specify default credentials before starting an instance." )
diff -r 2d1b957b8448 -r 52318cf73454 templates/base_panels.mako
--- a/templates/base_panels.mako Fri Sep 11 15:25:21 2009 -0400
+++ b/templates/base_panels.mako Mon Sep 14 15:08:23 2009 -0400
@@ -16,7 +16,7 @@
</%def>
## Default title
-<%def name="title()">Galaxy Cloud</%def>
+<%def name="title()">Galaxy</%def>
## Default stylesheets
<%def name="stylesheets()">
@@ -135,13 +135,15 @@
<td class="${cls}" style="${style}"><a target="${target}" href="${href}">${display}</a></td>
</%def>
- ##${tab( "analysis", "Analyze Data", h.url_for( controller='root', action='index' ))}
-
- ##${tab( "workflow", "Workflow", h.url_for( controller='workflow', action='index' ))}
-
- ##${tab( "libraries", "Data Libraries", h.url_for( controller='library', action='index' ))}
-
- ${tab( "cloud", "Cloud", h.url_for( controller='cloud', action='index' ))}
+ %if app.config.cloud_controller_instance:
+ ${tab( "cloud", "Cloud", h.url_for( controller='cloud', action='index' ))}
+ %else:
+ ${tab( "analysis", "Analyze Data", h.url_for( controller='root', action='index' ))}
+
+ ${tab( "workflow", "Workflow", h.url_for( controller='workflow', action='index' ))}
+
+ ${tab( "libraries", "Data Libraries", h.url_for( controller='library', action='index' ))}
+ %endif
%if trans.request_types():
<td class="tab">
diff -r 2d1b957b8448 -r 52318cf73454 templates/root/index.mako
--- a/templates/root/index.mako Fri Sep 11 15:25:21 2009 -0400
+++ b/templates/root/index.mako Mon Sep 14 15:08:23 2009 -0400
@@ -2,9 +2,14 @@
<%def name="init()">
<%
- self.has_left_panel=False
- self.has_right_panel=False
- self.active_view="cloud"
+if trans.app.config.cloud_controller_instance:
+ self.has_left_panel=False
+ self.has_right_panel=False
+ self.active_view="cloud"
+else:
+ self.has_left_panel=True
+ self.has_right_panel=True
+ self.active_view="analysis"
%>
%if trans.app.config.require_login and not trans.user:
<script type="text/javascript">
@@ -28,17 +33,18 @@
## If a specific tool id was specified, load it in the middle frame
<%
- if trans.app.config.require_login and not trans.user:
- center_url = h.url_for( controller='user', action='login' )
- elif tool_id is not None:
- center_url = h.url_for( 'tool_runner', tool_id=tool_id, from_noframe=True )
- elif workflow_id is not None:
- center_url = h.url_for( controller='workflow', action='run', id=workflow_id )
- elif m_c is not None:
- center_url = h.url_for( controller=m_c, action=m_a )
- else:
- #center_url = h.url_for( '/static/welcome.html' )
- center_url = h.url_for( controller='cloud', action='list' )
+if trans.app.config.require_login and not trans.user:
+ center_url = h.url_for( controller='user', action='login' )
+elif tool_id is not None:
+ center_url = h.url_for( 'tool_runner', tool_id=tool_id, from_noframe=True )
+elif workflow_id is not None:
+ center_url = h.url_for( controller='workflow', action='run', id=workflow_id )
+elif m_c is not None:
+ center_url = h.url_for( controller=m_c, action=m_a )
+elif trans.app.config.cloud_controller_instance:
+ center_url = h.url_for( controller='cloud', action='list' )
+else:
+ center_url = h.url_for( '/static/welcome.html' )
%>
<iframe name="galaxy_main" id="galaxy_main" frameborder="0" style="position: absolute; width: 100%; height: 100%;" src="${center_url}"> </iframe>
23 Nov '09
details: http://www.bx.psu.edu/hg/galaxy/rev/81e8a5e42900
changeset: 3060:81e8a5e42900
user: afgane(a)enis-afgans-macbook-pro.local
date: Fri Sep 04 09:54:27 2009 -0400
description:
Added most of the functionality to connect to EC2. Have not tested it though.
diffstat:
lib/galaxy/model/migrate/versions/0014_cloud_tables.py | 6 +-
lib/galaxy/web/controllers/cloud.py | 337 ++++++++++++----------
static/june_2007_style/blue/base.css | 9 +
templates/base_panels.mako | 8 +-
templates/cloud/configure_cloud.mako | 56 ++-
templates/cloud/viewInstance.mako | 48 +--
templates/root/index.mako | 9 +-
7 files changed, 258 insertions(+), 215 deletions(-)
diffs (728 lines):
diff -r f87f1eed6e0c -r 81e8a5e42900 lib/galaxy/model/migrate/versions/0014_cloud_tables.py
--- a/lib/galaxy/model/migrate/versions/0014_cloud_tables.py Fri Aug 28 17:45:40 2009 -0400
+++ b/lib/galaxy/model/migrate/versions/0014_cloud_tables.py Fri Sep 04 09:54:27 2009 -0400
@@ -92,8 +92,8 @@
def downgrade():
metadata.reflect()
try:
- log.deboug( "Would drop cloud_image table." )
- #CloudImage_table.drop() #Enable before putting final version
+ log.debug( "Would drop cloud_image table." )
+ CloudImage_table.drop() #Enable before putting final version
except Exception, e:
log.debug( "Dropping cloud_image table failed: %s" % str( e ) )
@@ -108,7 +108,7 @@
log.debug( "Dropping cloud_store table failed: %s" % str( e ) )
try:
- log.deboug( "Would drop cloud_user_credentials table." )
+ log.debug( "Would drop cloud_user_credentials table." )
#CloudUserCredentials_table.drop() #Enable before putting final version
except Exception, e:
log.debug( "Dropping cloud_user_credentials table failed: %s" % str( e ) )
diff -r f87f1eed6e0c -r 81e8a5e42900 lib/galaxy/web/controllers/cloud.py
--- a/lib/galaxy/web/controllers/cloud.py Fri Aug 28 17:45:40 2009 -0400
+++ b/lib/galaxy/web/controllers/cloud.py Fri Sep 04 09:54:27 2009 -0400
@@ -37,31 +37,33 @@
Render cloud main page (management of cloud resources)
"""
user = trans.get_user()
+ pendingInstances = trans.sa_session.query( model.UCI ) \
+ .filter_by( user=user, state="pending" ) \
+ .all()
+
+ for i in range( len( pendingInstances ) ):
+ update_instance_state( trans, pendingInstances[i].id )
+
cloudCredentials = trans.sa_session.query( model.CloudUserCredentials ) \
+ .filter_by( user=user ) \
.order_by( desc( model.CloudUserCredentials.c.update_time ) ) \
.all()
- prevInstances = trans.sa_session.query( model.UCI ) \
- .filter_by( user=user, state="available" ) \
- .order_by( desc( model.UCI.c.update_time ) ) \
- .all() #TODO: diff between live and previous instances
-
- liveInstances = trans.sa_session.query( model.CloudInstance ) \
- .filter_by( user=user ) \
- .filter( or_(model.CloudInstance.c.state=="running", model.CloudInstance.c.state=="pending") ) \
- .order_by( desc( model.CloudInstance.c.launch_time ) ) \
- .all()
-
liveInstances = trans.sa_session.query( model.UCI ) \
.filter_by( user=user ) \
.filter( or_(model.UCI.c.state=="running", model.UCI.c.state=="pending") ) \
.order_by( desc( model.UCI.c.launch_time ) ) \
.all()
-
+
+ prevInstances = trans.sa_session.query( model.UCI ) \
+ .filter_by( user=user, state="available" ) \
+ .order_by( desc( model.UCI.c.update_time ) ) \
+ .all()
+
return trans.fill_template( "cloud/configure_cloud.mako",
cloudCredentials = cloudCredentials,
- prevInstances = prevInstances,
- liveInstances = liveInstances )
+ liveInstances = liveInstances,
+ prevInstances = prevInstances )
@web.expose
@web.require_login( "use Galaxy cloud" )
@@ -77,162 +79,94 @@
newDefault.defaultCred = True
trans.sa_session.flush()
trans.set_message( "Credentials '%s' set as default." % newDefault.name )
-
+
+ # TODO: Fix bug that when this function returns, top Galaxy tab bar is missing from the webpage
return self.list( trans ) #trans.fill_template( "cloud/configure_cloud.mako",
#awsCredentials = awsCredentials )
-
- @web.expose
- @web.require_login( "use Galaxy workflows" )
- def list_for_run( self, trans ):
- """
- Render workflow list for analysis view (just allows running workflow
- or switching to management view)
- """
- user = trans.get_user()
- workflows = trans.sa_session.query( model.StoredWorkflow ) \
- .filter_by( user=user, deleted=False ) \
- .order_by( desc( model.StoredWorkflow.c.update_time ) ) \
- .all()
- shared_by_others = trans.sa_session \
- .query( model.StoredWorkflowUserShareAssociation ) \
- .filter_by( user=user ) \
- .filter( model.StoredWorkflow.c.deleted == False ) \
- .order_by( desc( model.StoredWorkflow.c.update_time ) ) \
- .all()
- return trans.fill_template( "workflow/list_for_run.mako",
- workflows = workflows,
- shared_by_others = shared_by_others )
-
- @web.expose
- @web.require_login( "use Galaxy workflows" )
- def share( self, trans, id, email="" ):
- msg = mtype = None
- # Load workflow from database
- stored = get_stored_workflow( trans, id )
- if email:
- other = model.User.filter( and_( model.User.table.c.email==email,
- model.User.table.c.deleted==False ) ).first()
- if not other:
- mtype = "error"
- msg = ( "User '%s' does not exist" % email )
- elif other == trans.get_user():
- mtype = "error"
- msg = ( "You cannot share a workflow with yourself" )
- elif trans.sa_session.query( model.StoredWorkflowUserShareAssociation ) \
- .filter_by( user=other, stored_workflow=stored ).count() > 0:
- mtype = "error"
- msg = ( "Workflow already shared with '%s'" % email )
- else:
- share = model.StoredWorkflowUserShareAssociation()
- share.stored_workflow = stored
- share.user = other
- session = trans.sa_session
- session.save_or_update( share )
- session.flush()
- trans.set_message( "Workflow '%s' shared with user '%s'" % ( stored.name, other.email ) )
- return trans.response.send_redirect( url_for( controller='workflow', action='sharing', id=id ) )
- return trans.fill_template( "workflow/share.mako",
- message = msg,
- messagetype = mtype,
- stored=stored,
- email=email )
-
- @web.expose
- @web.require_login( "use Galaxy workflows" )
- def sharing( self, trans, id, **kwargs ):
- session = trans.sa_session
- stored = get_stored_workflow( trans, id )
- if 'enable_import_via_link' in kwargs:
- stored.importable = True
- stored.flush()
- elif 'disable_import_via_link' in kwargs:
- stored.importable = False
- stored.flush()
- elif 'unshare_user' in kwargs:
- user = session.query( model.User ).get( trans.security.decode_id( kwargs['unshare_user' ] ) )
- if not user:
- error( "User not found for provided id" )
- association = session.query( model.StoredWorkflowUserShareAssociation ) \
- .filter_by( user=user, stored_workflow=stored ).one()
- session.delete( association )
- session.flush()
- return trans.fill_template( "workflow/sharing.mako",
- stored=stored )
-
- @web.expose
- @web.require_login( "use Galaxy workflows" )
- def imp( self, trans, id, **kwargs ):
- session = trans.sa_session
- stored = get_stored_workflow( trans, id, check_ownership=False )
- if stored.importable == False:
- error( "The owner of this workflow has disabled imports via this link" )
- elif stored.user == trans.user:
- error( "You are already the owner of this workflow, can't import" )
- elif stored.deleted:
- error( "This workflow has been deleted, can't import" )
- elif session.query( model.StoredWorkflowUserShareAssociation ) \
- .filter_by( user=trans.user, stored_workflow=stored ).count() > 0:
- error( "This workflow is already shared with you" )
- else:
- share = model.StoredWorkflowUserShareAssociation()
- share.stored_workflow = stored
- share.user = trans.user
- session = trans.sa_session
- session.save_or_update( share )
- session.flush()
- # Redirect to load galaxy frames.
- return trans.response.send_redirect( url_for( controller='workflow' ) )
-
+
+
@web.expose
@web.require_login( "start Galaxy cloud instance" )
- def start( self, trans, id, size='small' ):
+ def start( self, trans, id, size='' ):
"""
Start a new cloud resource instance
"""
+ # TODO: Add choice of instance size before starting one
+ #if size:
user = trans.get_user()
mi = get_mi( trans, size )
uci = get_uci( trans, id )
- stores = get_stores( trans, uci ) #TODO: handle list!
+ stores = get_stores( trans, uci )
+ log.debug(self.app.config.job_working_directory)
+        if len(stores) != 0:
+ instance = model.CloudInstance()
+ instance.user = user
+ instance.image = mi
+ instance.uci = uci
+ instance.keypair_name = get_keypair_name( trans )
+ instance.availability_zone = stores[0].availability_zone # Bc. all EBS volumes need to be in the same avail. zone, just check 1st
+ instance.type = size
+ conn = get_connection( trans )
+ # Get or setup appropriate security group
+ sg = list() # security group list
+ try:
+ gSecurityGroup = conn.create_security_group('galaxy', 'Galaxy security group')
+ gSecurityGroup.authorize( 'tcp', 80, 80, '0.0.0.0/0' )
+ sg.append( gSecurityGroup )
+ except:
+ sgs = rs = conn.get_all_security_groups()
+ for i in range( len( sgs ) ):
+ if sgs[i].name == "galaxy":
+ sg.append( sgs[i] )
+                    break # only 1 security group w/ this name can exist, so stop searching
+
+ #reservation = conn.run_instances( image_id=instance.image, key_name=instance.keypair_name, security_groups=sg, instance_type=instance.type, placement=instance.availability_zone )
+ instance.launch_time = datetime.utcnow()
+ uci.launch_time = instance.launch_time
+ #instance.reservation = str( reservation.instances[0] )
+ instance.state = "pending"
+ #instance.state = reservation.instances[0].state
+ uci.state = instance.state
+ # TODO: After instance boots up, need to update status, DNS and attach EBS
+
+ # Persist
+ session = trans.sa_session
+ session.save_or_update( instance )
+ session.flush()
+ #error( "Starting instance '%s' is not supported yet." % uci.name )
+
+ trans.log_event( "User started cloud instance '%s'" % uci.name )
+ trans.set_message( "Galaxy instance '%s' started. NOTE: Please wait about 2-3 minutes for the instance to "
+ "start up and then refresh this page. A button to connect to the instance will then appear alongside "
+ "instance description." % uci.name )
+ return self.list( trans )
- instance = model.CloudInstance()
- instance.user = user
- instance.image = mi
- instance.uci = uci
- # TODO: get real value from AWS
- instance.state = "pending"
- uci.state = instance.state
- uci.launch_time = datetime.utcnow()
- instance.launch_time = datetime.utcnow()
- instance.availability_zone = stores.availability_zone
- instance.type = size
-
- # Persist
- session = trans.sa_session
- session.save_or_update( instance )
- session.flush()
- #error( "Starting instance '%s' is not supported yet." % uci.name )
+# return trans.show_form(
+# web.FormBuilder( web.url_for(), "Start instance size", submit_text="Start" )
+# .add_input( "radio","Small","size", value='small' )
+# .add_input( "radio","Medium","size", value='medium' ) )
- trans.log_event( "User started cloud instance '%s'" % uci.name )
- trans.set_message( "Galaxy instance '%s' started." % uci.name )
- return self.list( trans )
@web.expose
@web.require_login( "stop Galaxy cloud instance" )
def stop( self, trans, id ):
"""
- Stop a cloud resource instance
+        Stop a cloud instance. This implies stopping the Galaxy server and unmounting the relevant file system(s)
"""
uci = get_uci( trans, id )
instances = get_instances( trans, uci ) #TODO: handle list!
- instances.state = 'done'
+ instances.stop()
+ instances.state = 'terminated'
instances.stop_time = datetime.utcnow()
+ # Reset relevant UCI fields
uci.state = 'available'
uci.launch_time = None
+
# Persist
session = trans.sa_session
+ session.save_or_update( instances )
session.save_or_update( uci )
- session.save_or_update( instances )
session.flush()
trans.log_event( "User stopped cloud instance '%s'" % uci.name )
trans.set_message( "Galaxy instance '%s' stopped." % uci.name )
@@ -429,11 +363,11 @@
"""
View details about running instance
"""
- instance = get_uci( trans, id )
- log.debug ( instance.name )
+ uci = get_uci( trans, id )
+ instances = get_instances( trans, uci ) # TODO: Handle list (will probably need to be done in mako template)
return trans.fill_template( "cloud/viewInstance.mako",
- liveInstance = instance )
+ liveInstance = instances )
@web.expose
@@ -931,7 +865,10 @@
"""
Get a UCI from the database by id, verifying ownership.
"""
- id = trans.security.decode_id( id )
+    # Check if 'id' is an int (i.e., it was called from this program) or
+ # it was passed from the web (in which case decode it)
+ if not isinstance( id, int ):
+ id = trans.security.decode_id( id )
live = trans.sa_session.query( model.UCI ).get( id )
if not live:
@@ -956,28 +893,121 @@
def get_stores( trans, uci ):
"""
- Get store objects/tables that are connected to uci object/table
+    Get store objects that are connected to the uci object
"""
user = trans.get_user()
stores = trans.sa_session.query( model.CloudStore ) \
.filter_by( user=user, uci_id=uci.id ) \
- .first()
- #.all() #TODO: return all but need to edit calling method(s) to handle list
+ .all()
return stores
def get_instances( trans, uci ):
"""
- Get instance objects/tables that are connected to uci object/table
+    Get instance objects that are pending or running and are connected to the uci object
"""
user = trans.get_user()
instances = trans.sa_session.query( model.CloudInstance ) \
.filter_by( user=user, uci_id=uci.id ) \
+ .filter( or_(model.CloudInstance.table.c.state=="running", model.CloudInstance.table.c.state=="pending" ) ) \
.first()
#.all() #TODO: return all but need to edit calling method(s) to handle list
return instances
+
+def get_cloud_instance( conn, instance_id ):
+ """
+    Returns a cloud instance representation of the given instance id, i.e., a cloud instance object that the cloud API can be invoked on
+ """
+ # get_all_instances func. takes a list of desired instance id's, so create a list first
+ idLst = list()
+ idLst.append( instance_id )
+ # Retrieve cloud instance based on passed instance id
+    cloudInstance = conn.get_all_instances( idLst )
+    return cloudInstance[0].instances[0]
+
+def get_connection( trans ):
+ """
+    Establishes EC2 connection using user's default credentials
+ """
+ user = trans.get_user()
+ creds = trans.sa_session.query(model.CloudUserCredentials).filter_by(user=user, defaultCred=True).first()
+ if creds:
+ a_key = creds.access_key
+ s_key = creds.secret_key
+ conn = EC2Connection( a_key, s_key )
+ return conn
+ else:
+ error( "You must specify default credentials before starting an instance." )
+ return 0
+
+def get_keypair_name( trans ):
+ """
+ Generate X.509 Certificate (i.e., keypair) using user's default credentials
+ """
+ conn = get_connection( trans )
+ try:
+ key_pair = conn.get_key_pair( 'galaxy-keypair' )
+ except: # No keypair under this name exists so create it
+ key_pair = conn.create_key_pair( 'galaxy-keypair' )
+# log.debug( key_pair.name )
+
+ return key_pair.name
+
+def update_instance_state( trans, id ):
+ """
+ Update state of instances associated with given UCI id and store it in local database. Also update
+ state of the given UCI.
+ """
+ uci = get_uci( trans, id )
+ # Get list of instances associated with given uci as they are stored in local database
+ dbInstances = get_instances( trans, uci ) # TODO: handle list (currently only 1 instance can correspond to 1 UCI)
+ oldState = dbInstances.state
+ # Establish connection with cloud
+ conn = get_connection( trans )
+ # Get actual instance from the cloud
+ cloudInstance = get_cloud_instance( conn, dbInstances.instance_id )
+ # Update status of instance
+ cloudInstance.update()
+ dbInstances.state = cloudInstance.state
+ log.debug( "Processing instance %i, instance current state: %s" % i, cloudInstance.state )
+
+ # If instance is now running, update/process instance (i.e., mount file system, start Galaxy, update DB with DNS)
+ if oldState=="pending" and dbInstances.state=="running":
+ update_instance( trans, dbInstances, cloudInstance )
+
+    # Update state of UCI (TODO: once more than 1 instance is assoc. w/ 1 UCI, this will need to be updated differently)
+ uci.state = dbInstances.state
+
+ # Persist
+ session = trans.sa_session
+ session.save_or_update( dbInstances )
+ session.save_or_update( uci )
+ session.flush()
+
+
+def update_instance( trans, dbInstance, cloudInstance ):
+ """
+ Update instance: mount file system, start Galaxy and update local DB w/ DNS info
+
+ Keyword arguments:
+ trans -- current transaction
+ dbInstance -- object of 'instance' as it is stored in local database
+    cloudInstance -- object of 'instance' as it resides in the cloud. Functions supported by the cloud API can be
+        invoked directly on this object.
+ """
+ dbInstance.public_dns = cloudInstance.dns_name
+ dbInstance.private_dns = cloudInstance.private_dns_name
+
+ # TODO: mount storage
+ # TODO: start Galaxy
+
+ # Persist
+ session = trans.sa_session
+ session.save_or_update( dbInstance )
+ session.flush()
+
def attach_ordered_steps( workflow, steps ):
ordered_steps = order_workflow_steps( steps )
if ordered_steps:
@@ -1104,3 +1134,8 @@
cleanup( prefix, input.cases[current_case].inputs, group_values )
cleanup( "", inputs, values )
return associations
+
+
+
+
+
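
[Editor's note: taken together, update_instance_state() and update_instance() above form a poll-and-reconcile loop. A condensed, hedged sketch of the control flow — helper names as defined in this diff, not a drop-in replacement:]

    def reconcile_uci_state( trans, uci_id ):
        # Condensed sketch of the flow implemented above.
        uci = get_uci( trans, uci_id )
        db_instance = get_instances( trans, uci )          # state as recorded locally
        conn = get_connection( trans )
        cloud_instance = get_cloud_instance( conn, db_instance.instance_id )
        cloud_instance.update()                            # refresh state from the cloud
        old_state = db_instance.state
        db_instance.state = cloud_instance.state
        if old_state == 'pending' and db_instance.state == 'running':
            update_instance( trans, db_instance, cloud_instance )  # DNS, mounts, etc.
        uci.state = db_instance.state
        trans.sa_session.save_or_update( db_instance )
        trans.sa_session.save_or_update( uci )
        trans.sa_session.flush()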
diff -r f87f1eed6e0c -r 81e8a5e42900 static/june_2007_style/blue/base.css
--- a/static/june_2007_style/blue/base.css Fri Aug 28 17:45:40 2009 -0400
+++ b/static/june_2007_style/blue/base.css Fri Sep 04 09:54:27 2009 -0400
@@ -57,6 +57,15 @@
table.colored tr.header{background:#ebd9b2;background-image:url(form_title_bg.png);background-repeat:repeat-x;background-position:top;border-bottom:solid #d8b365 1px;font-weight:bold;}
table.colored tr{background:white;}
table.colored tr.odd_row{background:#DADFEF;}
+table.noBottomHR{border-top:solid #d8b365 1px;}
+table.noBottomHR td,table.noBottomHR th{text-align:left;padding:5px;}
+table.noBottomHR tr.header{background:#ebd9b2;background-image:url(form_title_bg.png);background-repeat:repeat-x;background-position:top;border-bottom:solid #d8b365 1px;font-weight:bold;}
+table.noBottomHR tr{background:white;}
+table.noBottomHR tr.odd_row{background:#DADFEF;}
+table.noHR td,table.noHR th{text-align:left;padding:5px;}
+table.noHR tr.header{background:#ebd9b2;background-image:url(form_title_bg.png);background-repeat:repeat-x;background-position:top;border-bottom:solid #d8b365 1px;font-weight:bold;}
+table.noHR tr{background:white;}
+table.noHR tr.odd_row{background:#DADFEF;}
div.debug{margin:10px;padding:5px;background:#FFFF99;border:solid #FFFF33 1px;color:black;}
div.odd_row{background:#DADFEF;}
#footer{display:none;}
diff -r f87f1eed6e0c -r 81e8a5e42900 templates/base_panels.mako
--- a/templates/base_panels.mako Fri Aug 28 17:45:40 2009 -0400
+++ b/templates/base_panels.mako Fri Sep 04 09:54:27 2009 -0400
@@ -16,7 +16,7 @@
</%def>
## Default title
-<%def name="title()">Galaxy</%def>
+<%def name="title()">Galaxy Cloud</%def>
## Default stylesheets
<%def name="stylesheets()">
@@ -135,11 +135,11 @@
<td class="${cls}" style="${style}"><a target="${target}" href="${href}">${display}</a></td>
</%def>
- ${tab( "analysis", "Analyze Data", h.url_for( controller='root', action='index' ))}
+ ##${tab( "analysis", "Analyze Data", h.url_for( controller='root', action='index' ))}
- ${tab( "workflow", "Workflow", h.url_for( controller='workflow', action='index' ))}
+ ##${tab( "workflow", "Workflow", h.url_for( controller='workflow', action='index' ))}
- ${tab( "libraries", "Data Libraries", h.url_for( controller='library', action='index' ))}
+ ##${tab( "libraries", "Data Libraries", h.url_for( controller='library', action='index' ))}
${tab( "cloud", "Cloud", h.url_for( controller='cloud', action='index' ))}
diff -r f87f1eed6e0c -r 81e8a5e42900 templates/cloud/configure_cloud.mako
--- a/templates/cloud/configure_cloud.mako Fri Aug 28 17:45:40 2009 -0400
+++ b/templates/cloud/configure_cloud.mako Fri Sep 04 09:54:27 2009 -0400
@@ -82,12 +82,14 @@
<colgroup width="40%"></colgroup>
<colgroup width="15%"></colgroup>
<colgroup width="10%"></colgroup>
- <colgroup width="35%"></colgroup>
+ <colgroup width="25%"></colgroup>
+ <colgroup width="10%"></colgroup>
<tr class="header">
<th>Live instances</th>
<th>Storage size (GB)</th>
<th>State</th>
<th>Alive since</th>
+ <th></th>
<th></th>
</tr>
%if liveInstances:
@@ -97,30 +99,40 @@
${liveInstance.name}
<a id="li-${i}-popup" class="popup-arrow" style="display: none;">▼</a>
</td>
- <td>${str(liveInstance.total_size)}</td> <!--TODO:Replace with vol size once available-->
+ <td>${str(liveInstance.total_size)}</td>
<td>${str(liveInstance.state)}</td>
<td>
- ${str(liveInstance.launch_time)[:16]}
+ ##${str(liveInstance.launch_time)[:16]}
<%
- from datetime import datetime
- from datetime import timedelta
+ #from datetime import datetime
+ #from datetime import timedelta
# DB stores all times in GMT, so adjust for difference (4 hours)
- adjustedStarttime = liveInstance.update_time - timedelta(hours=4)
+ #adjustedStarttime = liveInstance.update_time - timedelta(hours=4)
- # (NOT CURRENTLY USED BLOCK OF CODE) Calculate time difference from now
- delta = datetime.now() - adjustedStarttime
+ #delta = datetime.now() - adjustedStarttime
#context.write( str(datetime.utcnow() ) )
#context.write( str(delta) )
# This is where current time and since duration is calculated
- #context.write( str( liveInstance.launch_time ) )
- context.write( ' UTC (' )
- context.write( str(h.date.distance_of_time_in_words (liveInstance.launch_time, h.date.datetime.utcnow() ) ) )
-
-
- %>)
+ if liveInstance.launch_time is None:
+ context.write( 'N/A' )
+ else:
+ context.write( str( liveInstance.launch_time )[:16] )
+ context.write( ' UTC (' )
+ context.write( str(h.date.distance_of_time_in_words (liveInstance.launch_time, h.date.datetime.utcnow() ) ) )
+ context.write( ')' )
+ %>
</td>
+ <td><div align="right">
+ %for i, instance in enumerate( liveInstance.instance ):
+ %if instance.state == "running":
+ <a class="action-button" href="http://${instance.public_dns}" target="_blank">
+ <span>Connect to!</span>
+ <img src="${h.url_for('/static/images/silk/resultset_next.png')}" /></a></div>
+ %endif
+ %endfor
+ </td>
<td>
<div popupmenu="li-${i}-popup">
<a class="action-button" confirm="Are you sure you want to stop instance '${liveInstance.name}'?" href="${h.url_for( action='stop', id=trans.security.encode_id(liveInstance.id) )}">Stop</a>
@@ -139,21 +151,21 @@
## *****************************************************
## Manage previously configured instances
- <table class="mange-table colored" border="0" cellspacing="0" cellpadding="0" width="100%">
+ <table class="mange-table noHR" border="0" cellspacing="0" cellpadding="0" width="100%">
<colgroup width="40%"></colgroup>
<colgroup width="15%"></colgroup>
<colgroup width="10%"></colgroup>
<colgroup width="35%"></colgroup>
- <tr class="header">
- <th>Previously configured instances</th>
+ ##<tr class="header">
+ ##<th>Previously configured instances</th>
##<th>Storage size (GB)</th>
##<th>State</th>
##<th>Alive since</th>
- <th></th>
- <th></th>
- <th></th>
- <th></th>
- </tr>
+ ##<th></th>
+ ##<th></th>
+ ##<th></th>
+ ##<th></th>
+ ##</tr>
%if prevInstances:
%for i, prevInstance in enumerate( prevInstances ):
diff -r f87f1eed6e0c -r 81e8a5e42900 templates/cloud/viewInstance.mako
--- a/templates/cloud/viewInstance.mako Fri Aug 28 17:45:40 2009 -0400
+++ b/templates/cloud/viewInstance.mako Fri Sep 04 09:54:27 2009 -0400
@@ -1,6 +1,13 @@
<%inherit file="/base.mako"/>
<%def name="title()">Live instance details</%def>
+<%
+ # Because of the one-to-many relationship between liveInstance (i.e., UCI) and actual instances, need to know
+    # which one is currently active. Because only one instance of a UCI can be alive at any point in time, simply
+    # select the most recent one.
+    # TODO: Once individual UCIs can start more than one instance, this will need to be fixed
+ #i_id = len(liveInstance.instance) - 1
+%>
<h2>Live instance details</h2>
@@ -18,22 +25,22 @@
<tr>
<td> Instance name: </td>
<td>
- ${liveInstance.name}
+ ${liveInstance.uci.name}
<a id="li-popup" class="popup-arrow" style="display: none;">▼</a>
</td>
<td>
<div popupmenu="li-popup">
- <a class="action-button" href="${h.url_for( action='renameInstance', id=trans.security.encode_id(liveInstance.id) )}">Rename</a>
- <a class="action-button" confirm="Are you sure you want to stop instance '${liveInstance.name}'?" href="${h.url_for( action='stop', id=trans.security.encode_id(liveInstance.id) )}">Stop</a>
+ <a class="action-button" href="${h.url_for( action='renameInstance', id=trans.security.encode_id(liveInstance.uci.id) )}">Rename</a>
+ <a class="action-button" confirm="Are you sure you want to stop instance '${liveInstance.uci.name}'?" href="${h.url_for( action='stop', id=trans.security.encode_id(liveInstance.uci.id) )}">Stop</a>
</div>
</td>
</tr>
<tr>
<td> Date created: </td>
- <td> ${str(liveInstance.create_time)[:16]}
+ <td> ${str(liveInstance.uci.create_time)[:16]}
<%
context.write( ' UTC (' )
- context.write( str(h.date.distance_of_time_in_words (liveInstance.create_time, h.date.datetime.utcnow() ) ) )
+ context.write( str(h.date.distance_of_time_in_words (liveInstance.uci.create_time, h.date.datetime.utcnow() ) ) )
%> ago)
</td>
</tr>
@@ -60,7 +67,7 @@
%endif
<tr>
<td> AMI: </td>
- <td> ${liveInstance.ami} </td>
+ <td> ${liveInstance.mi_id} </td>
</tr>
<tr>
<td> State:</td>
@@ -82,34 +89,13 @@
<td> ${liveInstance.availability_zone} </td>
</tr>
%endif
- %if liveInstance.keypair_fingerprint != None:
+ %if liveInstance.keypair_name != None:
<tr>
- <td> Keypair fingerprint:</td>
- <td> ${liveInstance.keypair_fingerprint} </td>
+ <td> Keypair file name:</td>
+ <td> ${liveInstance.keypair_name} </td>
</tr>
%endif
- %if liveInstance.keypair_material != None:
- <tr>
- <td> Keypair private key:</td>
- <td>
- <div id="shortComment2">
- <a onclick="document.getElementById('fullComment2').style.display = 'block';
- document.getElementById('shortComment2').style.display = 'none'; return 0"
- href="javascript:void(0)">
- + Show
- </a>
- </div>
- <div id="fullComment2" style="DISPLAY: none">
- <nobr><b>${liveInstance.keypair_material}</b></nobr><br/>
- <a onclick="document.getElementById('shortComment2').style.display = 'block';
- document.getElementById('fullComment2').style.display = 'none'; return 0;"
- href="javascript:void(0)">
- - Hide
- </a>
- </div>
- </td>
- </tr>
- %endif
+
</table>
%else:
There is no live instance under that name.
diff -r f87f1eed6e0c -r 81e8a5e42900 templates/root/index.mako
--- a/templates/root/index.mako Fri Aug 28 17:45:40 2009 -0400
+++ b/templates/root/index.mako Fri Sep 04 09:54:27 2009 -0400
@@ -2,9 +2,9 @@
<%def name="init()">
<%
- self.has_left_panel=True
- self.has_right_panel=True
- self.active_view="analysis"
+ self.has_left_panel=False
+ self.has_right_panel=False
+ self.active_view="cloud"
%>
%if trans.app.config.require_login and not trans.user:
<script type="text/javascript">
@@ -37,7 +37,8 @@
elif m_c is not None:
center_url = h.url_for( controller=m_c, action=m_a )
else:
- center_url = h.url_for( '/static/welcome.html' )
+ #center_url = h.url_for( '/static/welcome.html' )
+ center_url = h.url_for( controller='cloud', action='list' )
%>
<iframe name="galaxy_main" id="galaxy_main" frameborder="0" style="position: absolute; width: 100%; height: 100%;" src="${center_url}"> </iframe>
details: http://www.bx.psu.edu/hg/galaxy/rev/2d1b957b8448
changeset: 3061:2d1b957b8448
user: Enis Afgan <afgane(a)gmail.com>
date: Fri Sep 11 15:25:21 2009 -0400
description:
Merge
diffstat:
lib/galaxy/web/controllers/cloud.py | 68 ++++++++++++++++++++--------------
templates/cloud/configure_cloud.mako | 3 +-
2 files changed, 42 insertions(+), 29 deletions(-)
diffs (161 lines):
diff -r 81e8a5e42900 -r 2d1b957b8448 lib/galaxy/web/controllers/cloud.py
--- a/lib/galaxy/web/controllers/cloud.py Fri Sep 04 09:54:27 2009 -0400
+++ b/lib/galaxy/web/controllers/cloud.py Fri Sep 11 15:25:21 2009 -0400
@@ -107,20 +107,20 @@
instance.availability_zone = stores[0].availability_zone # Bc. all EBS volumes need to be in the same avail. zone, just check 1st
instance.type = size
conn = get_connection( trans )
- # Get or setup appropriate security group
- sg = list() # security group list
+ # If not existent, setup galaxy security group
try:
- gSecurityGroup = conn.create_security_group('galaxy', 'Galaxy security group')
- gSecurityGroup.authorize( 'tcp', 80, 80, '0.0.0.0/0' )
- sg.append( gSecurityGroup )
+ gSecurityGroup = conn.create_security_group('galaxy', 'Security group for Galaxy.')
+ gSecurityGroup.authorize( 'tcp', 80, 80, '0.0.0.0/0' ) # Open HTTP port
+ gSecurityGroup.authorize( 'tcp', 22, 22, '0.0.0.0/0' ) # Open SSH port
except:
- sgs = rs = conn.get_all_security_groups()
- for i in range( len( sgs ) ):
- if sgs[i].name == "galaxy":
- sg.append( sgs[i] )
-                    break # only 1 security group w/ this name can exist, so stop searching
+ pass
+# sgs = conn.get_all_security_groups()
+# for i in range( len( sgs ) ):
+# if sgs[i].name == "galaxy":
+# sg.append( sgs[i] )
+# break # only 1 security group w/ this name can exist, so continue
- #reservation = conn.run_instances( image_id=instance.image, key_name=instance.keypair_name, security_groups=sg, instance_type=instance.type, placement=instance.availability_zone )
+ #reservation = conn.run_instances( image_id=instance.image, key_name=instance.keypair_name, security_groups=['galaxy'], instance_type=instance.type, placement=instance.availability_zone )
instance.launch_time = datetime.utcnow()
uci.launch_time = instance.launch_time
#instance.reservation = str( reservation.instances[0] )
@@ -133,7 +133,6 @@
session = trans.sa_session
session.save_or_update( instance )
session.flush()
- #error( "Starting instance '%s' is not supported yet." % uci.name )
trans.log_event( "User started cloud instance '%s'" % uci.name )
trans.set_message( "Galaxy instance '%s' started. NOTE: Please wait about 2-3 minutes for the instance to "
@@ -225,9 +224,11 @@
storage.user = user
storage.uci = uci
storage.size = volSize
- # TODO: get correct values from AWS
+ storage.availability_zone = "us-east-1a" # TODO: Give user choice here. Also, enable region selection.
+ conn = get_connection( trans )
+ #conn.create_volume( volSize, storage.availability_zone, snapshot=None )
+ # TODO: get correct value from AWS
storage.volume_id = "made up"
- storage.availability_zone = "avail zone"
# Persist
session = trans.sa_session
session.save_or_update( uci )
@@ -863,7 +864,7 @@
def get_uci( trans, id, check_ownership=True ):
"""
- Get a UCI from the database by id, verifying ownership.
+ Get a UCI object from the database by id, verifying ownership.
"""
# Check if 'id' is in int (i.e., it was called from this program) or
# it was passed from the web (in which case decode it)
@@ -944,20 +945,19 @@
def get_keypair_name( trans ):
"""
- Generate X.509 Certificate (i.e., keypair) using user's default credentials
+ Generate keypair using user's default credentials
"""
conn = get_connection( trans )
try:
key_pair = conn.get_key_pair( 'galaxy-keypair' )
except: # No keypair under this name exists so create it
key_pair = conn.create_key_pair( 'galaxy-keypair' )
-# log.debug( key_pair.name )
return key_pair.name
def update_instance_state( trans, id ):
"""
- Update state of instances associated with given UCI id and store it in local database. Also update
+ Update state of instances associated with given UCI id and store state in local database. Also update
state of the given UCI.
"""
uci = get_uci( trans, id )
@@ -971,36 +971,48 @@
# Update status of instance
cloudInstance.update()
dbInstances.state = cloudInstance.state
- log.debug( "Processing instance %i, instance current state: %s" % i, cloudInstance.state )
-
- # If instance is now running, update/process instance (i.e., mount file system, start Galaxy, update DB with DNS)
- if oldState=="pending" and dbInstances.state=="running":
- update_instance( trans, dbInstances, cloudInstance )
-
+ log.debug( "Processing instance %i, current instance state: %s" % i, cloudInstance.state )
# Update state of UCI (TODO: once more than 1 instance is assoc. w/ 1 UCI, this will be need to be updated differently)
uci.state = dbInstances.state
-
# Persist
session = trans.sa_session
session.save_or_update( dbInstances )
session.save_or_update( uci )
session.flush()
+ # If instance is now running, update/process instance (i.e., mount file system, start Galaxy, update DB with DNS)
+ if oldState=="pending" and dbInstances.state=="running":
+ update_instance( trans, dbInstances, cloudInstance, conn, uci )
-def update_instance( trans, dbInstance, cloudInstance ):
+
+def update_instance( trans, dbInstance, cloudInstance, conn, uci ):
"""
- Update instance: mount file system, start Galaxy and update local DB w/ DNS info
+ Update instance: connect EBS volume, mount file system, start Galaxy, and update local DB w/ DNS info
Keyword arguments:
trans -- current transaction
dbInstance -- object of 'instance' as it is stored in local database
cloudInstance -- object of 'instance' as it resides in the cloud. Functions supported by the cloud API can be
        invoked directly on this object.
+ conn -- cloud connection object
+ uci -- UCI object
"""
dbInstance.public_dns = cloudInstance.dns_name
dbInstance.private_dns = cloudInstance.private_dns_name
- # TODO: mount storage
+ # TODO: connect EBS volume to instance
+ # Attach storage volume(s) to instance
+ stores = get_stores( trans, uci )
+ for i, store in enumerate( stores ):
+ log.debug( "Attaching volume %s to instance %s." % store.volume_id, store.uci_id )
+ mtnDevice = '/dev/sdb'+str(i)
+ conn.attach_volume( store.volume_id, store.uci_id, mntDevice )
+
+    # Wait until the volume(s) have attached, then add the file system
+
+
+
+ # TODO: mount storage through ZFS
# TODO: start Galaxy
# Persist
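
[Editor's note: the "wait until attached" step above is left open in the diff. A hedged sketch of one way to poll attachment status with boto — wait_for_attach is a hypothetical name, and, as flagged in the edit above, the attach call really needs the cloud instance id rather than the UCI id:]

    import time

    def wait_for_attach( conn, volume_id, timeout=300, poll=5 ):
        # Poll the volume until its attachment reports 'attached' or we time out.
        waited = 0
        while waited < timeout:
            vol = conn.get_all_volumes( [ volume_id ] )[0]
            if vol.attachment_state() == 'attached':
                return True
            time.sleep( poll )
            waited += poll
        return False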
diff -r 81e8a5e42900 -r 2d1b957b8448 templates/cloud/configure_cloud.mako
--- a/templates/cloud/configure_cloud.mako Fri Sep 04 09:54:27 2009 -0400
+++ b/templates/cloud/configure_cloud.mako Fri Sep 11 15:25:21 2009 -0400
@@ -125,7 +125,8 @@
%>
</td>
<td><div align="right">
- %for i, instance in enumerate( liveInstance.instance ):
+ %for j, instance in enumerate( liveInstance.instance ):
+                        ## TODO: Once more than one instance can run under the same liveInstance, additional logic will need to be added to account for that
%if instance.state == "running":
<a class="action-button" href="http://${instance.public_dns}" target="_blank">
<span>Connect to!</span>
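
[Editor's note: the bare try/except in the security-group code above leans on create_security_group failing when the group already exists. A hedged sketch of an explicit get-or-create instead — the function name is hypothetical, the boto calls are the ones used in the diff:]

    def get_or_create_galaxy_sg( conn ):
        # Look the group up first rather than relying on the create call failing.
        for sg in conn.get_all_security_groups():
            if sg.name == 'galaxy':
                return sg
        sg = conn.create_security_group( 'galaxy', 'Security group for Galaxy.' )
        sg.authorize( 'tcp', 80, 80, '0.0.0.0/0' )  # open HTTP
        sg.authorize( 'tcp', 22, 22, '0.0.0.0/0' )  # open SSH
        return sg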