commit/galaxy-central: inithello: Improve display of job status, migration script to accomodate same
1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/changeset/b31863f9408d/ changeset: b31863f9408d user: inithello date: 2012-09-04 15:42:54 summary: Improve display of job status, migration script to accomodate same affected #: 5 files diff -r f7775cff6f6c8b3527557cae16151c9c38945679 -r b31863f9408d082a8dbd209af1eb4b8609faa9ac lib/galaxy/jobs/deferred/liftover_transfer.py --- a/lib/galaxy/jobs/deferred/liftover_transfer.py +++ b/lib/galaxy/jobs/deferred/liftover_transfer.py @@ -40,7 +40,7 @@ deferred = trans.app.model.DeferredJob( state = self.app.model.DeferredJob.states.NEW, plugin = 'LiftOverTransferPlugin', params = params ) self.sa_session.add( deferred ) self.sa_session.flush() - return job.id + return deferred.id def check_job( self, job ): if job.params['type'] == 'init_transfer': diff -r f7775cff6f6c8b3527557cae16151c9c38945679 -r b31863f9408d082a8dbd209af1eb4b8609faa9ac lib/galaxy/model/mapping.py --- a/lib/galaxy/model/mapping.py +++ b/lib/galaxy/model/mapping.py @@ -1539,7 +1539,7 @@ properties=dict( job = relation( Job ), history = relation( History ) ) ) assign_mapper( context, GenomeIndexToolData, GenomeIndexToolData.table, - properties=dict( job = relation( Job ), + properties=dict( job = relation( Job, backref='job' ), dataset = relation( Dataset ), user = relation( User ), deferred = relation( DeferredJob, backref='deferred_job' ), diff -r f7775cff6f6c8b3527557cae16151c9c38945679 -r b31863f9408d082a8dbd209af1eb4b8609faa9ac lib/galaxy/model/migrate/versions/0104_update_genome_downloader_job_parameters.py --- /dev/null +++ b/lib/galaxy/model/migrate/versions/0104_update_genome_downloader_job_parameters.py @@ -0,0 +1,82 @@ +""" +Migration script to update the deferred job parameters for liftover transfer jobs. +""" + +from sqlalchemy import * +from migrate import * +from sqlalchemy.orm import * + +import datetime +now = datetime.datetime.utcnow + +# Need our custom types, but don't import anything else from model +from galaxy.model.custom_types import * +from galaxy.model.orm.ext.assignmapper import assign_mapper + +import sys, logging +log = logging.getLogger( __name__ ) +log.setLevel(logging.DEBUG) +handler = logging.StreamHandler( sys.stdout ) +format = "%(name)s %(levelname)s %(asctime)s %(message)s" +formatter = logging.Formatter( format ) +handler.setFormatter( formatter ) +log.addHandler( handler ) + +context = scoped_session( sessionmaker( autoflush=False, autocommit=True ) ) + +metadata = MetaData( migrate_engine ) + +class DeferredJob( object ): + states = Bunch( NEW = 'new', + WAITING = 'waiting', + QUEUED = 'queued', + RUNNING = 'running', + OK = 'ok', + ERROR = 'error' ) + def __init__( self, state=None, plugin=None, params=None ): + self.state = state + self.plugin = plugin + self.params = params + +DeferredJob.table = Table( "deferred_job", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "update_time", DateTime, default=now, onupdate=now ), + Column( "state", String( 64 ), index=True ), + Column( "plugin", String( 128 ), index=True ), + Column( "params", JSONType ) ) + +assign_mapper( context, DeferredJob, DeferredJob.table, properties = {} ) + +def upgrade(): + print __doc__ + liftoverjobs = dict() + + jobs = context.query( DeferredJob ).filter_by( plugin='LiftOverTransferPlugin' ).all() + + for job in jobs: + if job.params[ 'parentjob' ] not in liftoverjobs: + liftoverjobs[ job.params[ 'parentjob' ] ] = [] + liftoverjobs[ job.params[ 'parentjob'] ].append( job.id ) + + for parent in liftoverjobs: + lifts = liftoverjobs[ parent ] + deferred = context.query( DeferredJob ).filter_by( id=parent ).first() + deferred.params[ 'liftover' ] = lifts + + context.flush() + +def downgrade(): + + jobs = context.query( DeferredJob ).filter_by( plugin='GenomeTransferPlugin' ).all() + + for job in jobs: + if len( job.params[ 'liftover' ] ) == 0: + continue + transfers = [] + for lift in job.params[ 'liftover' ]: + liftoverjob = context.query( DeferredJob ).filter_by( id=lift ).first() + transfers.append( liftoverjob.params[ 'transfer_job_id' ] ) + job.params[ 'liftover' ] = transfers + + context.flush() diff -r f7775cff6f6c8b3527557cae16151c9c38945679 -r b31863f9408d082a8dbd209af1eb4b8609faa9ac lib/galaxy/tools/actions/index_genome.py --- a/lib/galaxy/tools/actions/index_genome.py +++ b/lib/galaxy/tools/actions/index_genome.py @@ -37,7 +37,7 @@ # Setup job and job wrapper. # - # Add association for keeping track of job, history, archive relationship. + # Add association for keeping track of index jobs, transfer jobs, and so on. user = trans.sa_session.query( trans.app.model.User ).get( int( incoming['user'] ) ) assoc = trans.app.model.GenomeIndexToolData( job=job, dataset=temp_dataset, fasta_path=incoming['path'], \ indexer=incoming['indexer'], user=user, \ @@ -46,11 +46,10 @@ job_wrapper = GenomeIndexToolWrapper( job ) cmd_line = job_wrapper.setup_job( assoc ) + # # Add parameters to job_parameter table. # - - # Set additional parameters. incoming[ '__GENOME_INDEX_COMMAND__' ] = cmd_line for name, value in tool.params_to_strings( incoming, trans.app ).iteritems(): job.add_parameter( name, value ) diff -r f7775cff6f6c8b3527557cae16151c9c38945679 -r b31863f9408d082a8dbd209af1eb4b8609faa9ac lib/galaxy/web/controllers/data_admin.py --- a/lib/galaxy/web/controllers/data_admin.py +++ b/lib/galaxy/web/controllers/data_admin.py @@ -116,99 +116,22 @@ def download_build( self, trans, **kwd ): """Download a genome from a remote source and add it to the library.""" params = util.Params( kwd ) - source = params.get('source', '') - longname = params.get('longname', None) - if not isinstance( params.get( 'indexers', None ), list ): - indexers = [ params.get( 'indexers', None ) ] - else: - indexers = params.get( 'indexers', None ) - if indexers is not None: - if indexers == [None]: - indexers = None - url = None - liftover = None - newlift = [] - dbkey = params.get( 'dbkey', None ) + paramdict = build_param_dict( params, trans ) + if paramdict[ 'status' ] == 'error': + return trans.fill_template( '/admin/data_admin/generic_error.mako', message=paramdict[ 'message' ] ) + url = paramdict[ 'url' ] + liftover = paramdict[ 'liftover' ] + dbkey = paramdict[ 'dbkey' ] + indexers = paramdict[ 'indexers' ] + longname = paramdict[ 'longname' ] dbkeys = dict() protocol = 'http' - - if source == 'NCBI': - build = params.get('ncbi_name', '') - dbkey = build.split( ': ' )[0] - longname = build.split( ': ' )[-1] - url = 'http://togows.dbcls.jp/entry/ncbi-nucleotide/%s.fasta' % dbkey - elif source == 'URL': - dbkey = params.get( 'url_dbkey', '' ) - url = params.get( 'url', None ) - longname = params.get( 'longname', None ) - elif source == 'UCSC': - longname = None - for build in trans.ucsc_builds: - if dbkey == build[0]: - dbkey = build[0] - longname = build[1] - break - if dbkey == '?': - return trans.fill_template( '/admin/data_admin/generic_error.mako', message='An invalid build was specified.' ) - ftp = ftplib.FTP('hgdownload.cse.ucsc.edu') - ftp.login('anonymous', trans.get_user().email) - checker = [] - liftover = [] - newlift = [] - try: - ftp.retrlines('NLST /goldenPath/%s/liftOver/*.chain.gz' % dbkey, liftover.append) - for chain in liftover: - fname = chain.split( '/' )[-1] - target = fname.replace( '.over.chain.gz', '' ).split( 'To' )[1] - target = target[0].lower() + target[1:] - if not os.path.exists( os.path.join( trans.app.config.get( 'genome_data_path', 'tool-data/genome' ), dbkey, 'liftOver', fname ) ): - newlift.append( [ chain, dbkey, target ] ) - current = dbkey[0].upper() + dbkey[1:] - targetfile = '%sTo%s.over.chain.gz' % ( target, current ) - if not os.path.exists( os.path.join( trans.app.config.get( 'genome_data_path', 'tool-data/genome' ), target, 'liftOver', targetfile ) ): - newlift.append( [ '/goldenPath/%s/liftOver/%s' % ( target, targetfile ), target, dbkey ] ) - except: - newlift = None - pass - ftp.retrlines('NLST /goldenPath/%s/bigZips/' % dbkey, checker.append) - ftp.quit() - for filename in [ dbkey, 'chromFa' ]: - for extension in [ '.tar.gz', '.tar.bz2', '.zip', '.fa.gz', '.fa.bz2' ]: - testfile = '/goldenPath/%s/bigZips/%s%s' % ( dbkey, filename, extension ) - if testfile in checker: - url = 'ftp://hgdownload.cse.ucsc.edu%s' % testfile - break; - else: - continue - if url is None: - message = u'The genome %s was not found on the UCSC server.' % dbkey - status = u'error' - return trans.fill_template( '/admin/data_admin/data_form.mako', - message=message, - status=status, - ensembls=trans.ensembl_builds, - dbkeys=trans.ucsc_builds ) - elif source == 'Ensembl': - dbkey = params.get( 'ensembl_dbkey', None ) - if dbkey == '?': - return trans.fill_template( '/admin/data_admin/generic_error.mako', message='An invalid build was specified.' ) - for build in trans.ensembl_builds: - if build[ 'dbkey' ] == dbkey: - dbkey = build[ 'dbkey' ] - release = build[ 'release' ] - pathname = '_'.join( build[ 'name' ].split(' ')[0:2] ) - longname = build[ 'name' ].replace('_', ' ') - break - url = 'ftp://ftp.ensembl.org/pub/release-%s/fasta/%s/dna/%s.%s.%s.dna.toplevel.fa.gz' % ( release, pathname.lower(), pathname, dbkey, release ) - else: - return trans.fill_template( '/admin/data_admin/generic_error.mako', message='An invalid data source was specified.' ) if url is None: return trans.fill_template( '/admin/data_admin/generic_error.mako', message='Unable to generate a valid URL with the specified parameters.' ) - params = dict( protocol='http', name=dbkey, datatype='fasta', url=url, user=trans.user.id ) jobid = trans.app.job_manager.deferred_job_queue.plugins['GenomeTransferPlugin'].create_job( trans, url, dbkey, longname, indexers ) chainjob = [] - if newlift is not None: - for chain in newlift: + if liftover is not None: + for chain in liftover: liftover_url = u'ftp://hgdownload.cse.ucsc.edu%s' % chain[0] from_genome = chain[1] to_genome = chain[2] @@ -228,11 +151,10 @@ def monitor_status( self, trans, **kwd ): params = util.Params( kwd ) jobid = params.get( 'job', '' ) - gname = params.get( 'intname', '' ) deferred = trans.app.model.context.current.query( model.DeferredJob ).filter_by( id=jobid ).first() gname = deferred.params[ 'intname' ] indexers = ', '.join( deferred.params[ 'indexes' ] ) - jobs = self._get_jobs( jobid, trans ) + jobs = self._get_jobs( deferred, trans ) jsonjobs = json.dumps( jobs ) return trans.fill_template( '/admin/data_admin/download_status.mako', name=gname, indexers=indexers, mainjob=jobid, jobs=jobs, jsonjobs=jsonjobs ) @@ -243,15 +165,16 @@ jobs = [] params = util.Params( kwd ) jobid = params.get( 'jobid', '' ) - jobs = self._get_jobs( jobid, trans ) + job = sa_session.query( model.DeferredJob ).filter_by( id=jobid ).first() + jobs = self._get_jobs( job, trans ) return trans.fill_template( '/admin/data_admin/ajax_status.mako', json=json.dumps( jobs ) ) def _get_job( self, jobid, jobtype, trans ): sa = trans.app.model.context.current if jobtype == 'liftover': - job = sa.query( model.TransferJob ).filter_by( id=jobid ).first() - liftover = trans.app.job_manager.deferred_job_queue.plugins['LiftOverTransferPlugin'].get_job_status( jobid ) - joblabel = 'Download liftOver (%s to %s)' % ( liftover.params[ 'from_genome' ], liftover.params[ 'to_genome' ] ) + liftoverjob = sa.query( model.DeferredJob ).filter_by( id=jobid ).first() + job = sa.query( model.TransferJob ).filter_by( id=liftoverjob.params[ 'transfer_job_id' ] ).first() + joblabel = 'Download liftOver (%s to %s)' % ( liftoverjob.params[ 'from_genome' ], liftoverjob.params[ 'to_genome' ] ) elif jobtype == 'transfer': job = sa.query( model.TransferJob ).filter_by( id=jobid ).first() joblabel = 'Download Genome' @@ -259,21 +182,111 @@ job = sa.query( model.DeferredJob ).filter_by( id=jobid ).first() joblabel = 'Main Controller' elif jobtype == 'index': - job = sa.query( model.Job ).filter_by( id=jobid ).first() - joblabel = 'Index Genome' + job = sa.query( model.Job ).filter_by( id=jobid.job_id ).first() + joblabel = 'Index Genome (%s)' % jobid.indexer return dict( status=job.state, jobid=job.id, style=self.jobstyles[job.state], type=jobtype, label=joblabel ) - def _get_jobs( self, jobid, trans ): + def _get_jobs( self, deferredjob, trans ): jobs = [] - job = trans.app.job_manager.deferred_job_queue.plugins['GenomeTransferPlugin'].get_job_status( jobid ) + idxjobs = [] sa_session = trans.app.model.context.current - jobs.append( self._get_job( job.id, 'deferred', trans ) ) - if hasattr( job, 'transfer_job' ): # This is a transfer job, check for indexers - jobs.append( self._get_job( job.transfer_job.id, 'transfer', trans ) ) - idxjobs = sa_session.query( model.GenomeIndexToolData ).filter_by( deferred_job_id=job.id, transfer_job_id=job.transfer_job.id ).all() - if job.params.has_key( 'liftover' ) and job.params[ 'liftover' ] is not None: - for jobid in job.params[ 'liftover' ]: - jobs.append( self._get_job( jobid, 'liftover', trans ) ) - for idxjob in idxjobs: - jobs.append( self._get_job( idxjob.job_id, 'index', trans ) ) + job = sa_session.query( model.GenomeIndexToolData ).filter_by( deferred=deferredjob ).first() + jobs.append( self._get_job( deferredjob.id, 'deferred', trans ) ) + if 'transfer_job_id' in deferredjob.params: #hasattr( job, 'transfer' ) and job.transfer is not None: # This is a transfer job, check for indexers + jobs.append( self._get_job( deferredjob.params[ 'transfer_job_id' ], 'transfer', trans ) ) + if hasattr( job, 'deferred' ): + idxjobs = sa_session.query( model.GenomeIndexToolData ).filter_by( deferred=job.deferred, transfer=job.transfer ).all() + if deferredjob.params.has_key( 'liftover' ) and deferredjob.params[ 'liftover' ] is not None: + for jobid in deferredjob.params[ 'liftover' ]: + jobs.append( self._get_job( jobid, 'liftover', trans ) ) + for idxjob in idxjobs: + jobs.append( self._get_job( idxjob, 'index', trans ) ) return jobs + +def build_param_dict( params, trans ): + + source = params.get('source', '') + longname = params.get('longname', None) + if not isinstance( params.get( 'indexers', None ), list ): + indexers = [ params.get( 'indexers', None ) ] + else: + indexers = params.get( 'indexers', None ) + if indexers is not None: + if indexers == [None]: + indexers = None + url = None + liftover = None + newlift = [] + dbkey = params.get( 'dbkey', None ) + dbkeys = dict() + protocol = 'http' + + if source == 'NCBI': + build = params.get('ncbi_name', '') + dbkey = build.split( ': ' )[0] + longname = build.split( ': ' )[-1] + url = 'http://togows.dbcls.jp/entry/ncbi-nucleotide/%s.fasta' % dbkey + elif source == 'URL': + dbkey = params.get( 'url_dbkey', '' ) + url = params.get( 'url', None ) + longname = params.get( 'longname', None ) + elif source == 'UCSC': + longname = None + for build in trans.ucsc_builds: + if dbkey == build[0]: + dbkey = build[0] + longname = build[1] + break + if dbkey == '?': + return dict( status='error', message='An invalid build was specified.' ) + ftp = ftplib.FTP('hgdownload.cse.ucsc.edu') + ftp.login('anonymous', trans.get_user().email) + checker = [] + liftover = [] + newlift = [] + try: + ftp.retrlines('NLST /goldenPath/%s/liftOver/*.chain.gz' % dbkey, liftover.append) + for chain in liftover: + fname = chain.split( '/' )[-1] + target = fname.replace( '.over.chain.gz', '' ).split( 'To' )[1] + target = target[0].lower() + target[1:] + if not os.path.exists( os.path.join( trans.app.config.get( 'genome_data_path', 'tool-data/genome' ), dbkey, 'liftOver', fname ) ): + newlift.append( [ chain, dbkey, target ] ) + current = dbkey[0].upper() + dbkey[1:] + targetfile = '%sTo%s.over.chain.gz' % ( target, current ) + if not os.path.exists( os.path.join( trans.app.config.get( 'genome_data_path', 'tool-data/genome' ), target, 'liftOver', targetfile ) ): + newlift.append( [ '/goldenPath/%s/liftOver/%s' % ( target, targetfile ), target, dbkey ] ) + except: + newlift = None + pass + ftp.retrlines('NLST /goldenPath/%s/bigZips/' % dbkey, checker.append) + ftp.quit() + for filename in [ dbkey, 'chromFa' ]: + for extension in [ '.tar.gz', '.tar.bz2', '.zip', '.fa.gz', '.fa.bz2' ]: + testfile = '/goldenPath/%s/bigZips/%s%s' % ( dbkey, filename, extension ) + if testfile in checker: + url = 'ftp://hgdownload.cse.ucsc.edu%s' % testfile + break; + else: + continue + if url is None: + message = 'The genome %s was not found on the UCSC server.' % dbkey + status = 'error' + return dict( status=status, message=message ) + + elif source == 'Ensembl': + dbkey = params.get( 'ensembl_dbkey', None ) + if dbkey == '?': + return dict( status='error', message='An invalid build was specified.' ) + for build in trans.ensembl_builds: + if build[ 'dbkey' ] == dbkey: + dbkey = build[ 'dbkey' ] + release = build[ 'release' ] + pathname = '_'.join( build[ 'name' ].split(' ')[0:2] ) + longname = build[ 'name' ].replace('_', ' ') + break + url = 'ftp://ftp.ensembl.org/pub/release-%s/fasta/%s/dna/%s.%s.%s.dna.toplevel.fa.gz' % ( release, pathname.lower(), pathname, dbkey, release ) + + params = dict( status='ok', dbkey=dbkey, datatype='fasta', url=url, user=trans.user.id, liftover=newlift, longname=longname, indexers=indexers ) + + return params \ No newline at end of file Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
participants (1)
-
Bitbucket