# HG changeset patch -- Bitbucket.org
# Project galaxy-dist
# URL http://bitbucket.org/galaxy/galaxy-dist/overview
# User Dannon Baker <dannonbaker@me.com>
# Date 1289408765 18000
# Node ID d56f296aaa610a537d35cb7d235eac9d8f596464
# Parent d6a0dd8e55e2f7d2077bc282bca3e42d98fbb7f2

Minimally functional splitting.  Disabled by default; not advisable to enable on a production server (or main!) until scheduling issues have been looked at and performance analysis has been done.  Tested 'basic' splitting with many basic tools (bowtie, bwa, filter sam, others).  This style of splitting should work for most 'embarrassingly parallel' one-input, one-output tools (no dependence between parallel tasks).

--- a/tools/sr_mapping/bowtie_wrapper.xml
+++ b/tools/sr_mapping/bowtie_wrapper.xml
@@ -1,6 +1,7 @@
 <tool id="bowtie_wrapper" name="Map with Bowtie for Illumina" version="1.1.0">
   <requirements><requirement type='package'>bowtie</requirement></requirements>
   <description></description>
+  <parallelism method="basic"></parallelism>
   <command interpreter="python">
     bowtie_wrapper.py
       --threads="4"
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -1,10 +1,11 @@
-import logging, threading, sys, os, time, subprocess, string, tempfile, re, traceback, shutil
+import logging, threading, sys, os, time, traceback, shutil
 
 import galaxy
 from galaxy import util, model
 from galaxy.model.orm import lazyload
 from galaxy.datatypes.tabular import *
 from galaxy.datatypes.interval import *
+# tabular/interval imports appear to be unused. Clean up?
 from galaxy.datatypes import metadata
 from galaxy.util.json import from_json_string
 from galaxy.util.expressions import ExpressionContext
@@ -13,8 +14,6 @@ from galaxy.jobs.actions.post import Act
 import pkg_resources
 pkg_resources.require( "PasteDeploy" )
 
-from paste.deploy.converters import asbool
-
 from Queue import Queue, Empty
 
 log = logging.getLogger( __name__ )
@@ -83,11 +82,9 @@ class JobQueue( object ):
         self.parent_pid = os.getpid()
         # Contains new jobs. Note this is not used if track_jobs_in_database is True
         self.queue = Queue()
-        # Contains jobs that are waiting (only use from monitor thread)
         ## This and jobs_to_check[] are closest to a "Job Queue"
         self.waiting_jobs = []
-        # Helper for interruptable sleep
         self.sleeper = Sleeper()
         self.running = True
@@ -104,7 +101,7 @@ class JobQueue( object ):
         the database and requeues or cleans up as necessary. Only run as the
         job manager starts.
         """
-        model = self.app.model
+        model = self.app.model # DBTODO Why?
         for job in self.sa_session.query( model.Job ).filter( model.Job.state == model.Job.states.NEW ):
             if job.tool_id not in self.app.toolbox.tools_by_id:
                 log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) )
@@ -294,7 +291,10 @@ class JobWrapper( object ):
         self.tool_provided_job_metadata = None
         # Wrapper holding the info required to restore and clean up from files used for setting metadata externally
         self.external_output_metadata = metadata.JobExternalOutputMetadataWrapper( job )
-        
+
+    def get_job( self ):
+        return self.sa_session.query( model.Job ).get( self.job_id )
+
     def get_param_dict( self ):
         """
         Restore the dictionary of parameters from the database.
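
Before the TaskWrapper hunks below, one note on the tool side: the <parallelism method="basic"></parallelism> element added to the tool configs above is what a later hunk in lib/galaxy/tools/__init__.py reads to decide whether a job may be split.  A minimal standalone sketch of that lookup (illustrative helper name, not the Galaxy API):

    # Rough, self-contained sketch of the <parallelism> lookup; "read_parallelism"
    # is a hypothetical helper, not part of Galaxy.
    from xml.etree import ElementTree

    def read_parallelism(tool_xml_path):
        """Return the tool's declared parallelism method, or None if absent."""
        root = ElementTree.parse(tool_xml_path).getroot()
        elem = root.find("parallelism")
        if elem is not None and elem.get("method"):
            return elem.get("method")
        return None

    # With this patch applied, read_parallelism("tools/sr_mapping/bowtie_wrapper.xml")
    # would return "basic"; tools without the element keep parallelism = None.
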
@@ -479,9 +479,9 @@ class JobWrapper( object ):
             self.fail( job.info )
             return
         if stderr:
-            job.state = "error"
+            job.state = job.states.ERROR
         else:
-            job.state = 'ok'
+            job.state = job.states.OK
         if self.app.config.outputs_to_working_directory:
             for dataset_path in self.get_output_fnames():
                 try:
@@ -761,6 +761,215 @@ class JobWrapper( object ):
         else:
             return 'anonymous@unknown'
 
+class TaskWrapper(JobWrapper):
+    """
+    Extension of JobWrapper intended for running tasks.
+    Should be refactored into a generalized executable unit wrapper parent, then jobs and tasks.
+    """
+    # Abstract this to be more useful for running tasks that *don't* necessarily compose a job.
+
+    def __init__(self, task, queue):
+        super(TaskWrapper, self).__init__(task.job, queue)
+        self.task_id = task.id
+        self.parallelism = None
+        if task.part_file:
+            # do this better
+            self.working_directory = os.path.dirname(task.part_file)
+        else:
+            self.working_directory = None
+        self.status = task.states.NEW
+
+    def get_job( self ):
+        if self.job_id:
+            return self.sa_session.query( model.Job ).get( self.job_id )
+        else:
+            return None
+
+    def get_task( self ):
+        return self.sa_session.query(model.Task).get(self.task_id)
+
+    def get_param_dict( self ):
+        """
+        Restore the dictionary of parameters from the database.
+        """
+        job = self.sa_session.query( model.Job ).get( self.job_id )
+        param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] )
+        param_dict = self.tool.params_from_strings( param_dict, self.app )
+        return param_dict
+
+    def prepare( self ):
+        """
+        Prepare the job to run by creating the working directory and the
+        config files.
+        """
+        # Restore parameters from the database
+        job = self.get_job()
+        task = self.get_task()
+        if job.user is None and job.galaxy_session is None:
+            raise Exception( 'Job %s has no user and no session.' % job.id )
+        incoming = dict( [ ( p.name, p.value ) for p in job.parameters ] )
+        incoming = self.tool.params_from_strings( incoming, self.app )
+        # Do any validation that could not be done at job creation
+        self.tool.handle_unvalidated_param_values( incoming, self.app )
+        # Restore input / output data lists
+        inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] )
+        # DBTODO New method for generating command line for a task?
+        out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] )
+        out_data.update( [ ( da.name, da.dataset ) for da in job.output_library_datasets ] )
+        # These can be passed on the command line if wanted as $userId $userEmail
+        if job.history and job.history.user: # check for anonymous user!
+            userId = '%d' % job.history.user.id
+            userEmail = str(job.history.user.email)
+        else:
+            userId = 'Anonymous'
+            userEmail = 'Anonymous'
+        incoming['userId'] = userId
+        incoming['userEmail'] = userEmail
+        # Build params, done before hook so hook can use
+        param_dict = self.tool.build_param_dict( incoming, inp_data, out_data, self.get_output_fnames(), self.working_directory )
+        fnames = {}
+        for v in self.get_input_fnames():
+            fnames[v] = os.path.join(self.working_directory, os.path.basename(v))
+        for dp in [x.real_path for x in self.get_output_fnames()]:
+            fnames[dp] = os.path.join(self.working_directory, os.path.basename(dp))
+        # Certain tools require tasks to be completed prior to job execution
+        # ( this used to be performed in the "exec_before_job" hook, but hooks are deprecated ).
+        self.tool.exec_before_job( self.queue.app, inp_data, out_data, param_dict )
+        # Run the before queue ("exec_before_job") hook
+        self.tool.call_hook( 'exec_before_job', self.queue.app, inp_data=inp_data,
+                             out_data=out_data, tool=self.tool, param_dict=incoming)
+        self.sa_session.flush()
+        # Build any required config files
+        config_filenames = self.tool.build_config_files( param_dict, self.working_directory )
+        # FIXME: Build the param file (might return None, DEPRECATED)
+        param_filename = self.tool.build_param_file( param_dict, self.working_directory )
+        # Build the job's command line
+        self.command_line = self.tool.build_command_line( param_dict )
+        # HACK, Fix this when refactored.
+        for k, v in fnames.iteritems():
+            self.command_line = self.command_line.replace(k, v)
+        # FIXME: for now, tools get Galaxy's lib dir in their path
+        if self.command_line and self.command_line.startswith( 'python' ):
+            self.galaxy_lib_dir = os.path.abspath( "lib" ) # cwd = galaxy root
+        # Shell fragment to inject dependencies
+        if self.app.config.use_tool_dependencies:
+            self.dependency_shell_commands = self.tool.build_dependency_shell_commands()
+        else:
+            self.dependency_shell_commands = None
+        # We need command_line persisted to the db in order for Galaxy to re-queue the job
+        # if the server was stopped and restarted before the job finished
+        task.command_line = self.command_line
+        self.sa_session.add( task )
+        self.sa_session.flush()
+        # Return list of all extra files
+        extra_filenames = config_filenames
+        if param_filename is not None:
+            extra_filenames.append( param_filename )
+        self.param_dict = param_dict
+        self.extra_filenames = extra_filenames
+        self.status = 'prepared'
+        return extra_filenames
+
+    def fail( self, message, exception=False ):
+        log.error("TaskWrapper Failure %s" % message)
+        self.status = 'error'
+        # How do we want to handle task failure?  Fail the job and let it clean up?
+
+    def change_state( self, state, info = False ):
+        task = self.get_task()
+        self.sa_session.refresh( task )
+        if info:
+            task.info = info
+        task.state = state
+        self.sa_session.add( task )
+        self.sa_session.flush()
+
+    def get_state( self ):
+        task = self.get_task()
+        self.sa_session.refresh( task )
+        return task.state
+
+    def set_runner( self, runner_url, external_id ):
+        task = self.get_task()
+        self.sa_session.refresh( task )
+        task.task_runner_name = runner_url
+        task.task_runner_external_id = external_id
+        # DBTODO Check task job_runner_stuff
+        self.sa_session.add( task )
+        self.sa_session.flush()
+
+    def finish( self, stdout, stderr ):
+        # DBTODO integrate previous finish logic.
+        # Simple finish for tasks.  Just set the flag OK.
+        log.debug( 'task %s for job %d ended' % (self.task_id, self.job_id) )
+        """
+        Called to indicate that the associated command has been run. Updates
+        the output datasets based on stderr and stdout from the command, and
+        the contents of the output files.
+ """ + # default post job setup_external_metadata + self.sa_session.expunge_all() + task = self.get_task() + # if the job was deleted, don't finish it + if task.state == task.states.DELETED: + self.cleanup() + return + elif task.state == task.states.ERROR: + # Job was deleted by an administrator + self.fail( task.info ) + return + if stderr: + task.state = task.states.ERROR + else: + task.state = task.states.OK + # Save stdout and stderr + if len( stdout ) > 32768: + log.error( "stdout for task %d is greater than 32K, only first part will be logged to database" % task.id ) + task.stdout = stdout[:32768] + if len( stderr ) > 32768: + log.error( "stderr for job %d is greater than 32K, only first part will be logged to database" % task.id ) + task.stderr = stderr[:32768] + task.command_line = self.command_line + self.sa_session.flush() + log.debug( 'task %d ended' % self.task_id ) + + def cleanup( self ): + # There is no task cleanup. The job cleans up for all tasks. + pass + + def get_command_line( self ): + return self.command_line + + def get_session_id( self ): + return self.session_id + + def get_output_file_id( self, file ): + # There is no permanent output file for tasks. + return None + + def get_tool_provided_job_metadata( self ): + # DBTODO Handle this as applicable for tasks. + return None + + def get_dataset_finish_context( self, job_context, dataset ): + # Handled at the parent job level. Do nothing here. + pass + + def check_output_sizes( self ): + sizes = [] + output_paths = self.get_output_fnames() + for outfile in [ str( o ) for o in output_paths ]: + sizes.append( ( outfile, os.stat( outfile ).st_size ) ) + return sizes + + def setup_external_metadata( self, exec_dir = None, tmp_dir = None, dataset_files_path = None, config_root = None, datatypes_config = None, set_extension = True, **kwds ): + # There is no metadata setting for tasks. This is handled after the merge, at the job level. 
+        pass
+
+    @property
+    def user( self ):
+        pass
+
 class DefaultJobDispatcher( object ):
     def __init__( self, app ):
         self.app = app
@@ -768,10 +977,15 @@ class DefaultJobDispatcher( object ):
         start_job_runners = ["local"]
         if app.config.start_job_runners is not None:
             start_job_runners.extend( app.config.start_job_runners.split(",") )
+        if app.config.use_tasked_jobs:
+            start_job_runners.append("tasks")
         for runner_name in start_job_runners:
             if runner_name == "local":
                 import runners.local
                 self.job_runners[runner_name] = runners.local.LocalJobRunner( app )
+            elif runner_name == "tasks":
+                import runners.tasks
+                self.job_runners[runner_name] = runners.tasks.TaskedJobRunner( app )
             elif runner_name == "pbs":
                 import runners.pbs
                 self.job_runners[runner_name] = runners.pbs.PBSJobRunner( app )
@@ -782,12 +996,17 @@ class DefaultJobDispatcher( object ):
                 import runners.drmaa
                 self.job_runners[runner_name] = runners.drmaa.DRMAAJobRunner( app )
             else:
-                log.error( "Unable to start unknown job runner: %s" %runner_name )
+                log.error( "Unable to start unknown job runner: '%s'" %runner_name )
 
     def put( self, job_wrapper ):
-        runner_name = ( job_wrapper.tool.job_runner.split(":", 1) )[0]
-        log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) )
-        self.job_runners[runner_name].put( job_wrapper )
+        if self.app.config.use_tasked_jobs and job_wrapper.tool.parallelism is not None and not isinstance(job_wrapper, TaskWrapper):
+            runner_name = "tasks"
+            log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) )
+            self.job_runners[runner_name].put( job_wrapper )
+        else:
+            runner_name = ( job_wrapper.tool.job_runner.split(":", 1) )[0]
+            log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) )
+            self.job_runners[runner_name].put( job_wrapper )
 
     def stop( self, job ):
         runner_name = ( job.job_runner_name.split(":", 1) )[0]
--- a/lib/galaxy/model/__init__.py
+++ b/lib/galaxy/model/__init__.py
@@ -176,6 +176,44 @@ class Job( object ):
             dataset.peek = 'Job deleted'
             dataset.info = 'Job output deleted by user before job completed'
 
+class Task( object ):
+    """
+    A task represents a single component of a job.
+    """
+    states = Bunch( NEW = 'new',
+                    WAITING = 'waiting',
+                    QUEUED = 'queued',
+                    RUNNING = 'running',
+                    OK = 'ok',
+                    ERROR = 'error',
+                    DELETED = 'deleted' )
+
+    def __init__( self, job, part_file = None ):
+        self.command_line = None
+        self.parameters = []
+        self.state = Task.states.NEW
+        self.info = None
+        self.part_file = part_file
+        self.task_runner_name = None
+        self.task_runner_external_id = None
+        self.job = job
+        self.stdout = None
+        self.stderr = None
+
+    def set_state( self, state ):
+        self.state = state
+
+    def get_param_values( self, app ):
+        """
+        Read encoded parameter values from the database and turn back into a
+        dict of tool parameter values.
+ """ + param_dict = dict( [ ( p.name, p.value ) for p in self.parent_job.parameters ] ) + tool = app.toolbox.tools_by_id[self.tool_id] + param_dict = tool.params_from_strings( param_dict, app ) + return param_dict + + class JobParameter( object ): def __init__( self, name, value ): self.name = name --- a/tools/samtools/sam_bitwise_flag_filter.xml +++ b/tools/samtools/sam_bitwise_flag_filter.xml @@ -1,5 +1,6 @@ <tool id="sam_bw_filter" name="Filter SAM" version="1.0.0"><description>on bitwise flag values</description> + <parallelism method="basic"></parallelism><command interpreter="python"> sam_bitwise_flag_filter.py --input_sam_file=$input1 --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -367,7 +367,12 @@ class Tool: self.redirect_url_params = '' # Short description of the tool self.description = util.xml_text(root, "description") - # Job runner + # Parallelism for tasks, read from tool config. + parallelism = root.find("parallelism") + if parallelism is not None and parallelism.get("method"): + self.parallelism = parallelism.get("method") + else: + self.parallelism = None if self.app.config.start_job_runners is None: # Jobs are always local regardless of tool config if no additional # runners are started --- a/tools/sr_mapping/bwa_wrapper.xml +++ b/tools/sr_mapping/bwa_wrapper.xml @@ -1,5 +1,6 @@ <tool id="bwa_wrapper" name="Map with BWA" version="1.1.0"><description></description> + <parallelism method="basic"></parallelism><command interpreter="python"> bwa_wrapper.py --threads="4" --- /dev/null +++ b/lib/galaxy/jobs/runners/tasks.py @@ -0,0 +1,206 @@ +import logging +import subprocess +from Queue import Queue +import threading + +from galaxy import model + +import os, errno +from time import sleep + +from galaxy.jobs import TaskWrapper + +log = logging.getLogger( __name__ ) + +class TaskedJobRunner( object ): + """ + Job runner backed by a finite pool of worker threads. FIFO scheduling + """ + STOP_SIGNAL = object() + def __init__( self, app ): + """Start the job runner with 'nworkers' worker threads""" + self.app = app + self.sa_session = app.model.context + # start workers + self.queue = Queue() + self.threads = [] + nworkers = app.config.local_task_queue_workers + log.info( "Starting tasked-job runners" ) + for i in range( nworkers ): + worker = threading.Thread( target=self.run_next ) + worker.start() + self.threads.append( worker ) + log.debug( "%d workers ready", nworkers ) + + def run_next( self ): + """Run the next job, waiting until one is available if neccesary""" + while 1: + job_wrapper = self.queue.get() + if job_wrapper is self.STOP_SIGNAL: + return + try: + self.run_job( job_wrapper ) + except: + log.exception( "Uncaught exception running tasked job" ) + + def run_job( self, job_wrapper ): + job_wrapper.set_runner( 'tasks:///', None ) + stderr = stdout = command_line = '' + # Prepare the job to run + try: + job_wrapper.prepare() + command_line = job_wrapper.get_command_line() + except: + job_wrapper.fail( "failure preparing job", exception=True ) + log.exception("failure running job %d" % job_wrapper.job_id) + return + # If we were able to get a command line, run the job. ( must be passed to tasks ) + if command_line: + try: + # DBTODO read tool info and use the right kind of parallelism. + # For now, the only splitter is the 'basic' one, n-ways split on one input, one output. + # This is incredibly simplified. Parallelism ultimately needs to describe which inputs, how, etc. 
+                job_wrapper.change_state( model.Job.states.RUNNING )
+                self.sa_session.flush()
+                parent_job = job_wrapper.get_job()
+                # Split with the tool-defined method.
+                if job_wrapper.tool.parallelism == "basic":
+                    from galaxy.jobs.splitters import basic
+                    if len(job_wrapper.get_input_fnames()) > 1 or len(job_wrapper.get_output_fnames()) > 1:
+                        log.error("The basic splitter is not capable of handling jobs with multiple inputs or outputs.")
+                        job_wrapper.change_state( model.Job.states.ERROR )
+                        job_wrapper.fail("Job Splitting Failed, the basic splitter only handles tools with one input and one output")
+                        # Requeue as a standard job?
+                        return
+                    input_file = job_wrapper.get_input_fnames()[0]
+                    working_directory = job_wrapper.working_directory
+                    # DBTODO execute an external task to do the splitting, this should happen at refactor.
+                    # Regarding number of ways split, use "hints" in tool config?
+                    # If the number of tasks is sufficiently high, we can use it to calculate job completion % and give a running status.
+                    basic.split(input_file, working_directory,
+                                20, # Needs serious experimentation to find out what makes the most sense.
+                                parent_job.input_datasets[0].dataset.ext)
+                    # Tasks in this parts list are in alphabetical listdir order (15 before 5), but that should not matter.
+                    parts = [os.path.join(os.path.abspath(job_wrapper.working_directory), p, os.path.basename(input_file))
+                             for p in os.listdir(job_wrapper.working_directory)
+                             if p.startswith('task_')]
+                else:
+                    job_wrapper.change_state( model.Job.states.ERROR )
+                    job_wrapper.fail("Job Splitting Failed, no match for '%s'" % job_wrapper.tool.parallelism)
+                    return
+                # Assemble parts into task_wrappers
+
+                # Not an option for now.  Task objects don't *do* anything useful yet, but we'll want them tracked outside this thread to do anything.
+                # if track_tasks_in_database:
+                tasks = []
+                task_wrappers = []
+                for part in parts:
+                    task = model.Task(parent_job, part)
+                    self.sa_session.add(task)
+                    tasks.append(task)
+                self.sa_session.flush()
+                # Must flush prior to the creation and queueing of task wrappers.
+                for task in tasks:
+                    tw = TaskWrapper(task, job_wrapper.queue)
+                    task_wrappers.append(tw)
+                    self.app.job_manager.dispatcher.put(tw)
+                tasks_complete = False
+                sleep_time = 1
+                while tasks_complete is False:
+                    tasks_complete = True
+                    for tw in task_wrappers:
+                        if not tw.get_state() == model.Task.states.OK:
+                            tasks_complete = False
+                    sleep( sleep_time )
+                    if sleep_time < 8:
+                        sleep_time *= 2
+                output_filename = job_wrapper.get_output_fnames()[0].real_path
+                basic.merge(working_directory, output_filename)
+                log.debug('execution finished: %s' % command_line)
+                for tw in task_wrappers:
+                    stdout += tw.get_task().stdout
+                    stderr += tw.get_task().stderr
+            except Exception:
+                job_wrapper.fail( "failure running job", exception=True )
+                log.exception("failure running job %d" % job_wrapper.job_id)
+                return
+
+        # run the metadata setting script here
+        # this is terminate-able when output dataset/job is deleted
+        # so that long running set_meta()s can be canceled without having to reboot the server
+        if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ] and self.app.config.set_metadata_externally and job_wrapper.output_paths:
+            external_metadata_script = job_wrapper.setup_external_metadata( output_fnames = job_wrapper.get_output_fnames(),
+                                                                            set_extension = True,
+                                                                            kwds = { 'overwrite' : False } ) # we don't want to overwrite metadata that was copied over in init_meta(), as per established behavior
+            log.debug( 'executing external set_meta script for job %d: %s' % ( job_wrapper.job_id, external_metadata_script ) )
+            external_metadata_proc = subprocess.Popen( args = external_metadata_script,
+                                                       shell = True,
+                                                       env = os.environ,
+                                                       preexec_fn = os.setpgrp )
+            job_wrapper.external_output_metadata.set_job_runner_external_pid( external_metadata_proc.pid, self.sa_session )
+            external_metadata_proc.wait()
+            log.debug( 'execution of external set_meta finished for job %d' % job_wrapper.job_id )
+
+        # Finish the job
+        try:
+            job_wrapper.finish( stdout, stderr )
+        except:
+            log.exception("Job wrapper finish method failed")
+            job_wrapper.fail("Unable to finish job", exception=True)
+
+    def put( self, job_wrapper ):
+        """Add a job to the queue (by job identifier)"""
+        # Change to queued state before handing to worker thread so the runner won't pick it up again
+        job_wrapper.change_state( model.Job.states.QUEUED )
+        self.queue.put( job_wrapper )
+
+    def shutdown( self ):
+        """Attempts to gracefully shut down the worker threads"""
+        log.info( "sending stop signal to worker threads" )
+        for i in range( len( self.threads ) ):
+            self.queue.put( self.STOP_SIGNAL )
+        log.info( "tasked-job runner stopped" )
+
+    def check_pid( self, pid ):
+        # DBTODO Need to check all subtask pids and return some sort of cumulative result.
+        return True
+        try:
+            os.kill( pid, 0 )
+            return True
+        except OSError, e:
+            if e.errno == errno.ESRCH:
+                log.debug( "check_pid(): PID %d is dead" % pid )
+            else:
+                log.warning( "check_pid(): Got errno %s when attempting to check PID %d: %s" %( errno.errorcode[e.errno], pid, e.strerror ) )
+            return False
+
+    def stop_job( self, job ):
+        # DBTODO Call stop on all of the tasks.
+        # if our local job has JobExternalOutputMetadata associated, then our primary job has to have already finished
+        if job.external_output_metadata:
+            pid = job.external_output_metadata[0].job_runner_external_pid # every JobExternalOutputMetadata has a pid set, we just need to take from one of them
+        else:
+            pid = job.job_runner_external_id
+        if pid in [ None, '' ]:
+            log.warning( "stop_job(): %s: no PID in database for job, unable to stop" % job.id )
+            return
+        pid = int( pid )
+        if not self.check_pid( pid ):
+            log.warning( "stop_job(): %s: PID %d was already dead or can't be signaled" % ( job.id, pid ) )
+            return
+        for sig in [ 15, 9 ]:
+            try:
+                os.killpg( pid, sig )
+            except OSError, e:
+                log.warning( "stop_job(): %s: Got errno %s when attempting to signal %d to PID %d: %s" % ( job.id, errno.errorcode[e.errno], sig, pid, e.strerror ) )
+                return # give up
+            sleep( 2 )
+            if not self.check_pid( pid ):
+                log.debug( "stop_job(): %s: PID %d successfully killed with signal %d" %( job.id, pid, sig ) )
+                return
+        else:
+            log.warning( "stop_job(): %s: PID %d refuses to die after signaling TERM/KILL" %( job.id, pid ) )
+
+    def recover( self, job, job_wrapper ):
+        # DBTODO Task Recovery, this should be possible.
+        job_wrapper.change_state( model.Job.states.ERROR, info = "This job was killed when Galaxy was restarted.  Please retry the job." )
+
--- a/universe_wsgi.ini.sample
+++ b/universe_wsgi.ini.sample
@@ -347,6 +347,11 @@ use_interactive = True
 # Necessary if you're running the load balanced setup.
 #track_jobs_in_database = False
 
+# This enables splitting of jobs into tasks, if specified by the particular tool config.
+# This is a new feature and not recommended for production servers yet.
+#use_tasked_jobs = True
+#local_task_queue_workers = 2
+
 # Enable job recovery (if Galaxy is restarted while cluster jobs are running,
 # it can "recover" them when it starts). This is not safe to use if you are
 # running more than one Galaxy server using the same database.
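
The two new universe_wsgi.ini options above gate all of this: use_tasked_jobs turns the tasks runner on, and local_task_queue_workers sets its thread count (see lib/galaxy/config.py below).  With the feature enabled, the dispatcher routes parent jobs of parallel-capable tools to the "tasks" runner, while the TaskWrapper children it spawns fall through to the tool's normal runner.  A simplified sketch of that routing rule (illustrative function and parameter names, not the Galaxy API):

    # Simplified sketch of the rule added to DefaultJobDispatcher.put();
    # "choose_runner" and "is_task" are illustrative names only.
    def choose_runner(job_wrapper, config, is_task):
        if config.use_tasked_jobs and job_wrapper.tool.parallelism is not None and not is_task:
            return "tasks"  # split the parent job into tasks first
        # Tasks and non-parallel jobs keep the tool's configured runner.
        return job_wrapper.tool.job_runner.split(":", 1)[0]  # e.g. "local", "pbs", "drmaa"
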
--- a/lib/galaxy/model/mapping.py
+++ b/lib/galaxy/model/mapping.py
@@ -391,7 +391,24 @@ JobImportHistoryArchive.table = Table( "
     Column( "history_id", Integer, ForeignKey( "history.id" ), index=True ),
     Column( "archive_dir", TEXT ) )
-    
+
+Task.table = Table( "task", metadata,
+    Column( "id", Integer, primary_key=True ),
+    Column( "create_time", DateTime, default=now ),
+    Column( "execution_time", DateTime ),
+    Column( "update_time", DateTime, default=now, onupdate=now ),
+    Column( "state", String( 64 ), index=True ),
+    Column( "command_line", TEXT ),
+    Column( "param_filename", String( 1024 ) ),
+    Column( "runner_name", String( 255 ) ),
+    Column( "stdout", TEXT ),
+    Column( "stderr", TEXT ),
+    Column( "traceback", TEXT ),
+    Column( "job_id", Integer, ForeignKey( "job.id" ), index=True, nullable=False ),
+    Column( "part_file", String( 1024 ) ),
+    Column( "task_runner_name", String( 255 ) ),
+    Column( "task_runner_external_id", String( 255 ) ) )
+
 PostJobAction.table = Table("post_job_action", metadata,
     Column("id", Integer, primary_key=True),
     Column("workflow_step_id", Integer, ForeignKey( "workflow_step.id" ), index=True, nullable=False),
@@ -1251,6 +1268,10 @@ assign_mapper( context, Job, Job.table,
                      output_library_datasets=relation( JobToOutputLibraryDatasetAssociation, lazy=False ),
                      external_output_metadata = relation( JobExternalOutputMetadata, lazy = False ) ) )
 
+assign_mapper( context, Task, Task.table,
+    properties=dict( job = relation( Job )))
+
+
 assign_mapper( context, Event, Event.table,
     properties=dict( history=relation( History ),
                      galaxy_session=relation( GalaxySession ),
--- a/lib/galaxy/config.py
+++ b/lib/galaxy/config.py
@@ -4,7 +4,6 @@ Universe configuration builder.
 import sys, os
 import logging, logging.config
-from optparse import OptionParser
 import ConfigParser
 from galaxy.util import string_as_bool
@@ -76,6 +75,9 @@ class Configuration( object ):
         self.error_email_to = kwargs.get( 'error_email_to', None )
         self.smtp_server = kwargs.get( 'smtp_server', None )
         self.start_job_runners = kwargs.get( 'start_job_runners', None )
+        # Tasked job runner.
+        self.use_tasked_jobs = string_as_bool( kwargs.get( 'use_tasked_jobs', False ) )
+        self.local_task_queue_workers = int(kwargs.get("local_task_queue_workers", 2))
         self.default_cluster_job_runner = kwargs.get( 'default_cluster_job_runner', 'local:///' )
         self.pbs_application_server = kwargs.get('pbs_application_server', "" )
         self.pbs_dataset_server = kwargs.get('pbs_dataset_server', "" )
--- /dev/null
+++ b/lib/galaxy/model/migrate/versions/0061_tasks.py
@@ -0,0 +1,49 @@
+"""
+Migration script to create tables for task management.
+""" +from sqlalchemy import * +from sqlalchemy.orm import * +from migrate import * +from migrate.changeset import * + +import datetime + +import logging +log = logging.getLogger( __name__ ) + +metadata = MetaData( migrate_engine ) +db_session = scoped_session( sessionmaker( bind=migrate_engine, autoflush=False, autocommit=True ) ) +now = datetime.datetime.utcnow + +Task_table = Table( "task", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "execution_time", DateTime ), + Column( "update_time", DateTime, default=now, onupdate=now ), + Column( "state", String( 64 ), index=True ), + Column( "command_line", TEXT ), + Column( "param_filename", String( 1024 ) ), + Column( "runner_name", String( 255 ) ), + Column( "stdout", TEXT ), + Column( "stderr", TEXT ), + Column( "traceback", TEXT ), + Column( "job_id", Integer, ForeignKey( "job.id" ), index=True, nullable=False ), + Column( "part_file", String(1024)), + Column( "task_runner_name", String( 255 ) ), + Column( "task_runner_external_id", String( 255 ) ) ) + +tables = [Task_table] + +def upgrade(): + print __doc__ + metadata.reflect() + for table in tables: + try: + table.create() + except: + log.warn( "Failed to create table '%s', ignoring (might result in wrong schema)" % table.name ) + +def downgrade(): + metadata.reflect() + for table in tables: + table.drop() --- /dev/null +++ b/lib/galaxy/jobs/splitters/basic.py @@ -0,0 +1,85 @@ +import os, logging +log = logging.getLogger( __name__ ) + +def _file_len(fname): + i = 0 + with open(fname) as f: + for i, l in enumerate(f): + pass + return i + 1 + +def _fq_seq_count(fname): + count = 0 + with open(fname) as f: + for i, l in enumerate(f): + if l.startswith('@'): + count += 1 + return count + +def split_fq(input_file, working_directory, parts): + # Temporary, switch this to use the fq reader in lib/galaxy_utils/sequence. 
+    outputs = []
+    length = _fq_seq_count(input_file)
+    if length < 1:
+        return outputs
+    if length < parts:
+        parts = length
+    len_each, remainder = divmod(length, parts)
+    with open(input_file, 'rt') as f:
+        for p in range(0, parts):
+            part_dir = os.path.join( os.path.abspath(working_directory), 'task_%s' % p)
+            if not os.path.exists( part_dir ):
+                os.mkdir( part_dir )
+            part_path = os.path.join(part_dir, os.path.basename(input_file))
+            with open(part_path, 'w') as part_file:
+                for l in range(0, len_each):
+                    part_file.write(f.readline())
+                    part_file.write(f.readline())
+                    part_file.write(f.readline())
+                    part_file.write(f.readline())
+                if remainder > 0:
+                    part_file.write(f.readline())
+                    part_file.write(f.readline())
+                    part_file.write(f.readline())
+                    part_file.write(f.readline())
+                    remainder -= 1
+            outputs.append(part_path)
+    return outputs
+
+def split_txt(input_file, working_directory, parts):
+    outputs = []
+    length = _file_len(input_file)
+    if length < parts:
+        parts = length
+    len_each, remainder = divmod(length, parts)
+    with open(input_file, 'rt') as f:
+        for p in range(0, parts):
+            part_dir = os.path.join( os.path.abspath(working_directory), 'task_%s' % p)
+            if not os.path.exists( part_dir ):
+                os.mkdir( part_dir )
+            part_path = os.path.join(part_dir, os.path.basename(input_file))
+            with open(part_path, 'w') as part_file:
+                for l in range(0, len_each):
+                    part_file.write(f.readline())
+                if remainder > 0:
+                    part_file.write(f.readline())
+                    remainder -= 1
+            outputs.append(part_path)
+    return outputs
+
+def split( input_file, working_directory, parts, file_type = None):
+    # Implement a better method for determining how to split.
+    if file_type and file_type.startswith('fastq'):
+        return split_fq(input_file, working_directory, parts)
+    else:
+        return split_txt(input_file, working_directory, parts)
+
+def merge( working_directory, output_file ):
+    output_file_name = os.path.basename(output_file)
+    task_dirs = [os.path.join(working_directory, x) for x in os.listdir(working_directory) if x.startswith('task_')]
+    task_dirs.sort(key = lambda x: int(x.split('task_')[-1]))
+    for task_dir in task_dirs:
+        try:
+            os.system( 'cat %s >> %s' % ( os.path.join(task_dir, output_file_name), output_file ) )
+        except Exception, e:
+            log.error(str(e))
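
Finally, the splitter's contract as used by runners/tasks.py above: split() writes one slice of the input into each working_directory/task_<n>/ directory, and merge() concatenates the per-task outputs, in task order, into the job's real output file.  A rough usage sketch with made-up paths, datatype, and part count:

    # Rough usage sketch; the paths, extension, and part count are illustrative only.
    from galaxy.jobs.splitters import basic

    working_directory = "/tmp/job_42"                    # hypothetical job working dir
    input_file = "/tmp/job_42/input.fastqsanger"         # hypothetical input dataset
    parts = basic.split(input_file, working_directory, 20, file_type="fastqsanger")
    # ... each task then runs the tool on its own part under task_<n>/ and writes an
    # output file with the same basename as the job's real output ...
    basic.merge(working_directory, "/tmp/job_42/output.sam")

Note that merge() assumes every task directory contains a file named like the final output; the path rewriting in TaskWrapper.prepare() is what arranges that.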