commit/galaxy-central: 11 new changesets
11 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/a841d01847df/ Changeset: a841d01847df User: jmchilton Date: 2014-03-26 17:01:46 Summary: Unit tests to exercise job and task mapping. Affected #: 1 file diff -r 09985439d17f9bef026554938b05d0d6eedd06cb -r a841d01847dffac0f554f9730ec45829c38c05e8 test/unit/test_galaxy_mapping.py --- a/test/unit/test_galaxy_mapping.py +++ b/test/unit/test_galaxy_mapping.py @@ -210,6 +210,29 @@ assert hist1.name == "History 2b" # gvk TODO need to ad test for GalaxySessions, but not yet sure what they should look like. + def test_jobs( self ): + model = self.model + u = model.User( email="jobtest@foo.bar.baz", password="password" ) + job = model.Job() + job.user = u + job.tool_id = "cat1" + + self.persist( u, job ) + + loaded_job = model.session.query( model.Job ).filter( model.Job.user == u ).first() + assert loaded_job.tool_id == "cat1" + + def test_tasks( self ): + model = self.model + u = model.User( email="jobtest@foo.bar.baz", password="password" ) + job = model.Job() + task = model.Task( job=job, working_directory="/tmp", prepare_files_cmd="split.sh" ) + job.user = u + self.persist( u, job, task ) + + loaded_task = model.session.query( model.Task ).filter( model.Task.job == job ).first() + assert loaded_task.prepare_input_files_cmd == "split.sh" + def test_history_contents( self ): model = self.model u = model.User( email="contents@foo.bar.baz", password="password" ) https://bitbucket.org/galaxy/galaxy-central/commits/ad576bf6497e/ Changeset: ad576bf6497e User: jmchilton Date: 2014-03-26 17:01:46 Summary: Layout models and mapping for a job metrics plugin framework. Affected #: 4 files diff -r a841d01847dffac0f554f9730ec45829c38c05e8 -r ad576bf6497ec40b64d84802bda596bb63af63af lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -17,6 +17,7 @@ import json import socket import time +import numbers from uuid import UUID, uuid4 from string import Template from itertools import ifilter @@ -87,6 +88,21 @@ return name +class HasJobMetrics: + + def _init_metrics( self ): + self.text_metrics = [] + self.numeric_metrics = [] + + def add_metric( self, plugin, metric_name, metric_value ): + if isinstance( metric_value, numbers.Number ): + metric = self._numeric_metric( plugin, metric_name, metric_value ) + self.numeric_metrics.append( metric ) + else: + metric = self._text_metric( plugin, metric_name, metric_value ) + self.text_metrics.append( metric ) + + class User( object, Dictifiable ): use_pbkdf2 = True """ @@ -226,7 +242,31 @@ return Template( in_string ).safe_substitute( environment ) -class Job( object, Dictifiable ): +class BaseJobMetric( object ): + + def __init__( self, plugin, metric_name, metric_value ): + self.plugin = plugin + self.metric_name = metric_name + self.metric_value = metric_value + + +class JobMetricText( BaseJobMetric ): + pass + + +class JobMetricNumeric( BaseJobMetric ): + pass + + +class TaskMetricText( BaseJobMetric ): + pass + + +class TaskMetricNumeric( BaseJobMetric ): + pass + + +class Job( object, HasJobMetrics, Dictifiable ): dict_collection_visible_keys = [ 'id', 'state', 'exit_code', 'update_time', 'create_time' ] dict_element_visible_keys = [ 'id', 'state', 'exit_code', 'update_time', 'create_time' ] @@ -234,6 +274,9 @@ A job represents a request to run a tool given input datasets, tool parameters, and output datasets. 
""" + _numeric_metric = JobMetricNumeric + _text_metric = JobMetricText + states = Bunch( NEW = 'new', UPLOAD = 'upload', WAITING = 'waiting', @@ -267,6 +310,7 @@ self.imported = False self.handler = None self.exit_code = None + self._init_metrics() @property def finished( self ): @@ -472,10 +516,14 @@ return rval -class Task( object ): + +class Task( object, HasJobMetrics ): """ A task represents a single component of a job. """ + _numeric_metric = TaskMetricNumeric + _text_metric = TaskMetricText + states = Bunch( NEW = 'new', WAITING = 'waiting', QUEUED = 'queued', @@ -498,6 +546,7 @@ self.stderr = "" self.exit_code = None self.prepare_input_files_cmd = prepare_files_cmd + self._init_metrics() def get_param_values( self, app ): """ @@ -608,6 +657,7 @@ def set_prepare_input_files_cmd( self, prepare_input_files_cmd ): self.prepare_input_files_cmd = prepare_input_files_cmd + class JobParameter( object ): def __init__( self, name, value ): self.name = name diff -r a841d01847dffac0f554f9730ec45829c38c05e8 -r ad576bf6497ec40b64d84802bda596bb63af63af lib/galaxy/model/mapping.py --- a/lib/galaxy/model/mapping.py +++ b/lib/galaxy/model/mapping.py @@ -470,6 +470,52 @@ Column( "archive_dir", TEXT ) ) + +JOB_METRIC_MAX_LENGTH = 1023 + +model.JobMetricText.table = Table( + "job_metric_text", + metadata, + Column( "id", Integer, primary_key=True ), + Column( "job_id", Integer, ForeignKey( "job.id" ), index=True ), + Column( "plugin", Unicode(255), ), + Column( "metric_name", Unicode(255), ), + Column( "metric_value", Unicode(JOB_METRIC_MAX_LENGTH), ), +) + +model.TaskMetricText.table = Table( + "task_metric_text", + metadata, + Column( "id", Integer, primary_key=True ), + Column( "task_id", Integer, ForeignKey( "task.id" ), index=True ), + Column( "plugin", Unicode(255), ), + Column( "metric_name", Unicode(255), ), + Column( "metric_value", Unicode(JOB_METRIC_MAX_LENGTH), ), +) + + +model.JobMetricNumeric.table = Table( + "job_metric_numeric", + metadata, + Column( "id", Integer, primary_key=True ), + Column( "job_id", Integer, ForeignKey( "job.id" ), index=True ), + Column( "plugin", Unicode(255), ), + Column( "metric_name", Unicode(255), ), + Column( "metric_value", Numeric( 22, 7 ), ), +) + + +model.TaskMetricNumeric.table = Table( + "task_metric_numeric", + metadata, + Column( "id", Integer, primary_key=True ), + Column( "task_id", Integer, ForeignKey( "task.id" ), index=True ), + Column( "plugin", Unicode(255), ), + Column( "metric_name", Unicode(255), ), + Column( "metric_value", Numeric( 22, 7 ), ), +) + + model.GenomeIndexToolData.table = Table( "genome_index_tool_data", metadata, Column( "id", Integer, primary_key=True ), Column( "job_id", Integer, ForeignKey( "job.id" ), index=True ), @@ -1569,6 +1615,26 @@ job=relation( model.Job ), dataset=relation( model.LibraryDatasetDatasetAssociation, lazy=False ) ) ) +simple_mapping( + model.JobMetricText, + job=relation( model.Job, backref="text_metrics" ), +) + +simple_mapping( + model.TaskMetricText, + task=relation( model.Task, backref="text_metrics" ), +) + +simple_mapping( + model.JobMetricNumeric, + job=relation( model.Job, backref="numeric_metrics" ), +) + +simple_mapping( + model.TaskMetricNumeric, + task=relation( model.Task, backref="numeric_metrics" ), +) + mapper( model.JobParameter, model.JobParameter.table ) mapper( model.JobExternalOutputMetadata, model.JobExternalOutputMetadata.table, diff -r a841d01847dffac0f554f9730ec45829c38c05e8 -r ad576bf6497ec40b64d84802bda596bb63af63af lib/galaxy/model/migrate/versions/0119_job_metrics.py --- 
/dev/null +++ b/lib/galaxy/model/migrate/versions/0119_job_metrics.py @@ -0,0 +1,102 @@ +""" +Migration script for job metric plugins. +""" + +from sqlalchemy import * +from sqlalchemy.orm import * +from migrate import * +from migrate.changeset import * + +import datetime +now = datetime.datetime.utcnow + +import logging +log = logging.getLogger( __name__ ) + +metadata = MetaData() + +TEXT_METRIC_MAX_LENGTH = 1023 + +JobMetricText_table = Table( + "job_metric_text", + metadata, + Column( "id", Integer, primary_key=True ), + Column( "job_id", Integer, ForeignKey( "job.id" ), index=True ), + Column( "plugin", Unicode(255), ), + Column( "metric_name", Unicode(255), ), + Column( "metric_value", Unicode(TEXT_METRIC_MAX_LENGTH), ), +) + + +TaskMetricText_table = Table( + "task_metric_text", + metadata, + Column( "id", Integer, primary_key=True ), + Column( "task_id", Integer, ForeignKey( "task.id" ), index=True ), + Column( "plugin", Unicode(255), ), + Column( "metric_name", Unicode(255), ), + Column( "metric_value", Unicode(TEXT_METRIC_MAX_LENGTH), ), +) + + +JobMetricNumeric_table = Table( + "job_metric_numeric", + metadata, + Column( "id", Integer, primary_key=True ), + Column( "job_id", Integer, ForeignKey( "job.id" ), index=True ), + Column( "plugin", Unicode(255), ), + Column( "metric_name", Unicode(255), ), + Column( "metric_value", Numeric( 22, 7 ), ), +) + + +TaskMetricNumeric_table = Table( + "task_metric_numeric", + metadata, + Column( "id", Integer, primary_key=True ), + Column( "task_id", Integer, ForeignKey( "task.id" ), index=True ), + Column( "plugin", Unicode(255), ), + Column( "metric_name", Unicode(255), ), + Column( "metric_value", Numeric( 22, 7 ), ), +) + + +TABLES = [ + JobMetricText_table, + TaskMetricText_table, + JobMetricNumeric_table, + TaskMetricNumeric_table, +] + + +def upgrade( migrate_engine ): + metadata.bind = migrate_engine + print __doc__ + metadata.reflect() + + for table in TABLES: + __create(table) + + +def downgrade(migrate_engine): + metadata.bind = migrate_engine + metadata.reflect() + + for table in TABLES: + __drop(table) + + +def __create(table): + try: + table.create() + except Exception as e: + print str(e) + log.debug("Creating %s table failed: %s" % (table.name, str( e ) ) ) + + +def __drop(table): + try: + table.drop() + except Exception as e: + print str(e) + log.debug("Dropping %s table failed: %s" % (table.name, str( e ) ) ) diff -r a841d01847dffac0f554f9730ec45829c38c05e8 -r ad576bf6497ec40b64d84802bda596bb63af63af test/unit/test_galaxy_mapping.py --- a/test/unit/test_galaxy_mapping.py +++ b/test/unit/test_galaxy_mapping.py @@ -222,6 +222,18 @@ loaded_job = model.session.query( model.Job ).filter( model.Job.user == u ).first() assert loaded_job.tool_id == "cat1" + def test_job_metrics( self ): + model = self.model + u = model.User( email="jobtest@foo.bar.baz", password="password" ) + job = model.Job() + job.user = u + job.tool_id = "cat1" + + job.add_metric( "gx", "galaxy_slots", 5 ) + job.add_metric( "system", "system_name", "localhost" ) + + self.persist( u, job ) + def test_tasks( self ): model = self.model u = model.User( email="jobtest@foo.bar.baz", password="password" ) https://bitbucket.org/galaxy/galaxy-central/commits/3ffd1ca6ac44/ Changeset: 3ffd1ca6ac44 User: jmchilton Date: 2014-03-26 17:01:46 Summary: Implement plugin framework for collecting data about runtime job execution. An example job_metrics_conf.xml.sample is included that describes which plugins are enabled and how they are configured. 
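For illustration, individual destinations in job_conf.xml can then opt in or out per the options described below (the destination ids, runner, and metrics file path in this sketch are hypothetical):

<destination id="cluster_default" runner="drmaa"/>  <!-- uses the default job_metrics_conf.xml -->
<destination id="cluster_quiet" runner="drmaa" metrics="off"/>  <!-- disables metric collection -->
<destination id="cluster_custom" runner="drmaa" metrics="config/cluster_metrics.xml"/>  <!-- loads a separate metrics XML -->
<destination id="local_embedded" runner="local">
    <job_metrics>
        <core />
    </job_metrics>
</destination>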
The sample will be updated for each new plugin added. By default, no instrumentation or data collection occurs, but if a job_metrics_conf.xml file is present it will serve as the default for all job destinations. Additionally, individual job destinations may disable metrics, load a different job metrics file, or define metrics directly in job_conf.xml in an embedded fashion. See the comment at the top of job_metrics_conf.xml for more information. This commit includes an initial plugin (named 'core') to demonstrate the framework and capture the highest priority data - namely the number of cores allocated to the job and the runtime of the job on the cluster. These two pieces of information alone should provide a much clearer picture of what Galaxy is actually allocating cluster compute cycles to. Current limitations - This only works with job runners utilizing the job script module and the LWR (it utilizes the job script module on the remote server), hence it won't yet work with... - Local job runner - I do have a downstream fork of Galaxy where I have reworked the local job runner to use the common job script template. https://github.com/jmchilton/galaxy-central/commits/local_job_script https://github.com/jmchilton/galaxy-central/commit/949db2cd14c7191cedf1febeb... - CLI runner - CLI runner needs to be reworked to use the job script module anyway so GALAXY_SLOTS works - the LWR version of the CLI runner uses the job script module - this work just needs to be backported to Galaxy. If a job_metrics_conf.xml is present and some jobs route to the above destinations, the jobs won't fail but annoying errors will appear in the logs. Simply attach metrics="off" to these specific job destinations to disable any attempt to use metrics for those jobs and silence these errors. Affected #: 15 files diff -r ad576bf6497ec40b64d84802bda596bb63af63af -r 3ffd1ca6ac44736cba9e36abfaf3b1e607ddefb6 job_conf.xml.sample_advanced --- a/job_conf.xml.sample_advanced +++ b/job_conf.xml.sample_advanced @@ -62,6 +62,11 @@ <!-- Warning: Local slot count doesn't tie up additional worker threads, to prevent over allocating machine define a second local runner with different name and fewer workers to run this destination. --> + <job_metrics /> + <!-- Above element demonstrates embedded job metrics definition - see + job_metrics_conf.xml.sample for full documentation on possible nested + elements. This empty block will simply disable job metrics for the + corresponding destination. --></destination><destination id="pbs" runner="pbs" tags="mycluster"/><destination id="pbs_longjobs" runner="pbs" tags="mycluster,longjobs"> diff -r ad576bf6497ec40b64d84802bda596bb63af63af -r 3ffd1ca6ac44736cba9e36abfaf3b1e607ddefb6 job_metrics_conf.xml.sample --- /dev/null +++ b/job_metrics_conf.xml.sample @@ -0,0 +1,18 @@ +<?xml version="1.0"?> +<!-- If job_metrics.xml exists, this file will define the default job metric + plugin used for all jobs. Individual job_conf.xml destinations can + disable metric collection by setting metrics="off" on that destination. + The metrics attribute on destination definition elements can also be + a path - in which case that XML metrics file will be loaded and used for + that destination. Finally, the destination element may contain a job_metrics + child element (with all options defined below) to define job metrics in an + embedded manner directly in the job_conf.xml file.
+--> +<job_metrics> + <!-- Each element in this file corresponds to a job instrumentation plugin + used to generate metrics in lib/galaxy/jobs/metrics/instrumenters. --> + + <!-- Core plugin captures Galaxy slots, start and end of job (in seconds + since epoch) and computes runtime in seconds. --> + <core /> +</job_metrics> diff -r ad576bf6497ec40b64d84802bda596bb63af63af -r 3ffd1ca6ac44736cba9e36abfaf3b1e607ddefb6 lib/galaxy/app.py --- a/lib/galaxy/app.py +++ b/lib/galaxy/app.py @@ -15,7 +15,7 @@ from galaxy.sample_tracking import external_service_types from galaxy.openid.providers import OpenIDProviders from galaxy.tools.data_manager.manager import DataManagers - +from galaxy.jobs import metrics as job_metrics from galaxy.web.base import pluginframework import logging @@ -60,6 +60,10 @@ self._configure_tool_data_tables( from_shed_config=False ) + # Initialize job metrics manager, needs to be in place before + # config so per-destination modifications can be made. + self.job_metrics = job_metrics.JobMetrics( self.config.job_metrics_config_file, app=self ) + # Initialize the job management configuration self.job_config = jobs.JobConfiguration(self) diff -r ad576bf6497ec40b64d84802bda596bb63af63af -r 3ffd1ca6ac44736cba9e36abfaf3b1e607ddefb6 lib/galaxy/config.py --- a/lib/galaxy/config.py +++ b/lib/galaxy/config.py @@ -137,6 +137,7 @@ self.template_path = resolve_path( kwargs.get( "template_path", "templates" ), self.root ) self.template_cache = resolve_path( kwargs.get( "template_cache_path", "database/compiled_templates" ), self.root ) self.dependency_resolvers_config_file = resolve_path( kwargs.get( 'dependency_resolvers_config_file', 'dependency_resolvers_conf.xml' ), self.root ) + self.job_metrics_config_file = resolve_path( kwargs.get( 'job_metrics_config_file', 'job_metrics_conf.xml' ), self.root ) self.job_config_file = resolve_path( kwargs.get( 'job_config_file', 'job_conf.xml' ), self.root ) self.local_job_queue_workers = int( kwargs.get( "local_job_queue_workers", "5" ) ) self.cluster_job_queue_workers = int( kwargs.get( "cluster_job_queue_workers", "3" ) ) @@ -453,6 +454,11 @@ admin_users = [ x.strip() for x in self.get( "admin_users", "" ).split( "," ) ] return ( user is not None and user.email in admin_users ) + def resolve_path( self, path ): + """ Resolve a path relative to Galaxy's root. 
+ """ + return resolve_path( path, self.root ) + def get_database_engine_options( kwargs, model_prefix='' ): """ Allow options for the SQLAlchemy database engine to be passed by using diff -r ad576bf6497ec40b64d84802bda596bb63af63af -r 3ffd1ca6ac44736cba9e36abfaf3b1e607ddefb6 lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -162,8 +162,21 @@ # Parse destinations destinations = root.find('destinations') + job_metrics = self.app.job_metrics for destination in self.__findall_with_required(destinations, 'destination', ('id', 'runner')): id = destination.get('id') + destination_metrics = destination.get( "metrics", None ) + if destination_metrics: + if not util.asbool( destination_metrics ): + # disable + job_metrics.set_destination_instrumenter( id, None ) + else: + metrics_conf_path = self.app.config.resolve_path( destination_metrics ) + job_metrics.set_destination_conf_file( id, metrics_conf_path ) + else: + metrics_elements = self.__findall_with_required( destination, 'job_metrics', () ) + if metrics_elements: + job_metrics.set_destination_conf_element( id, metrics_elements[ 0 ] ) job_destination = JobDestination(**dict(destination.items())) job_destination['params'] = self.__get_params(destination) self.destinations[id] = (job_destination,) @@ -1068,8 +1081,10 @@ # Finally set the job state. This should only happen *after* all # dataset creation, and will allow us to eliminate force_history_refresh. job.state = final_job_state + if not job.tasks: + # If job was composed of tasks, don't attempt to recollect statisitcs + self._collect_metrics( job ) self.sa_session.flush() - log.debug( 'job %d ended' % self.job_id ) delete_files = self.app.config.cleanup_job == 'always' or ( job.state == job.states.OK and self.app.config.cleanup_job == 'onsuccess' ) self.cleanup( delete_files=delete_files ) @@ -1094,6 +1109,16 @@ except: log.exception( "Unable to cleanup job %d" % self.job_id ) + def _collect_metrics( self, has_metrics ): + job = has_metrics.get_job() + per_plugin_properties = self.app.job_metrics.collect_properties( job.destination_id, self.job_id, self.working_directory ) + if per_plugin_properties: + log.info( "Collecting job metrics for %s" % has_metrics ) + for plugin, properties in per_plugin_properties.iteritems(): + for metric_name, metric_value in properties.iteritems(): + if metric_value is not None: + has_metrics.add_metric( plugin, metric_name, metric_value ) + def get_output_sizes( self ): sizes = [] output_paths = self.get_output_fnames() @@ -1508,6 +1533,7 @@ task.stdout = util.shrink_string_by_size( stdout, DATABASE_MAX_STRING_SIZE, join_by="\n..\n", left_larger=True, beginning_on_size_error=True ) if len( stderr ) > DATABASE_MAX_STRING_SIZE: log.error( "stderr for task %d is greater than %s, only a portion will be logged to database" % ( task.id, DATABASE_MAX_STRING_SIZE_PRETTY ) ) + self._collect_metrics( task ) task.stderr = util.shrink_string_by_size( stderr, DATABASE_MAX_STRING_SIZE, join_by="\n..\n", left_larger=True, beginning_on_size_error=True ) task.exit_code = tool_exit_code task.command_line = self.command_line diff -r ad576bf6497ec40b64d84802bda596bb63af63af -r 3ffd1ca6ac44736cba9e36abfaf3b1e607ddefb6 lib/galaxy/jobs/metrics/__init__.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/__init__.py @@ -0,0 +1,134 @@ +import collections +import os + +from xml.etree import ElementTree + +from galaxy.util.submodules import submodules +from galaxy import util + +from ..metrics import formatting + +import logging +log = 
logging.getLogger( __name__ ) + + +DEFAULT_FORMATTER = formatting.JobMetricFormatter() + + +class JobMetrics( object ): + + def __init__( self, conf_file=None, **kwargs ): + """ + """ + self.plugin_classes = self.__plugins_dict() + self.default_job_instrumenter = JobInstrumenter.from_file( self.plugin_classes, conf_file, **kwargs ) + self.job_instrumenters = collections.defaultdict( lambda: self.default_job_instrumenter ) + + def format( self, plugin, key, value ): + if plugin in self.plugin_classes: + plugin_class = self.plugin_classes[ plugin ] + formatter = plugin_class.formatter + else: + formatter = DEFAULT_FORMATTER + return formatter.format( key, value ) + + def set_destination_conf_file( self, destination_id, conf_file ): + instrumenter = JobInstrumenter.from_file( self.plugin_classes, conf_file ) + self.set_destination_instrumenter( destination_id, instrumenter ) + + def set_destination_conf_element( self, destination_id, element ): + instrumenter = JobInstrumenter( self.plugin_classes, element ) + self.set_destination_instrumenter( destination_id, instrumenter ) + + def set_destination_instrumenter( self, destination_id, job_instrumenter=None ): + if job_instrumenter is None: + job_instrumenter = NULL_JOB_INSTRUMENTER + self.job_instrumenters[ destination_id ] = job_instrumenter + + def collect_properties( self, destination_id, job_id, job_directory ): + return self.job_instrumenters[ destination_id ].collect_properties( job_id, job_directory ) + + def __plugins_dict( self ): + plugin_dict = {} + for plugin_module in self.__plugin_modules(): + for clazz in plugin_module.__all__: + plugin_type = getattr( clazz, 'plugin_type', None ) + if plugin_type: + plugin_dict[ plugin_type ] = clazz + return plugin_dict + + def __plugin_modules( self ): + import galaxy.jobs.metrics.instrumenters + return submodules( galaxy.jobs.metrics.instrumenters ) + + +class NullJobInstrumenter( object ): + + def pre_execute_commands( self, job_directory ): + return None + + def post_execute_commands( self, job_directory ): + return None + + def collect_properties( self, job_id, job_directory ): + return {} + +NULL_JOB_INSTRUMENTER = NullJobInstrumenter() + + +class JobInstrumenter( object ): + + def __init__( self, plugin_classes, metrics_element, **kwargs ): + self.extra_kwargs = kwargs + self.plugin_classes = plugin_classes + self.plugins = self.__plugins_for_element( metrics_element ) + + def pre_execute_commands( self, job_directory ): + commands = [] + for plugin in self.plugins: + try: + plugin_commands = plugin.pre_execute_instrument( job_directory ) + if plugin_commands: + commands.extend( util.listify( plugin_commands ) ) + except Exception: + log.exception( "Failed to generate pre-execute commands for plugin %s" % plugin ) + return "\n".join( [ c for c in commands if c ] ) + + def post_execute_commands( self, job_directory ): + commands = [] + for plugin in self.plugins: + try: + plugin_commands = plugin.post_execute_instrument( job_directory ) + if plugin_commands: + commands.extend( util.listify( plugin_commands ) ) + except Exception: + log.exception( "Failed to generate post-execute commands for plugin %s" % plugin ) + return "\n".join( [ c for c in commands if c ] ) + + def collect_properties( self, job_id, job_directory ): + per_plugin_properites = {} + for plugin in self.plugins: + try: + properties = plugin.job_properties( job_id, job_directory ) + if properties: + per_plugin_properites[ plugin.plugin_type ] = properties + except Exception: + log.exception( "Failed to collect job 
properties for plugin %s" % plugin ) + return per_plugin_properites + + def __plugins_for_element( self, plugins_element ): + plugins = [] + for plugin_element in plugins_element.getchildren(): + plugin_type = plugin_element.tag + plugin_kwds = dict( plugin_element.items() ) + plugin_kwds.update( self.extra_kwargs ) + plugin = self.plugin_classes[ plugin_type ]( **plugin_kwds ) + plugins.append( plugin ) + return plugins + + @staticmethod + def from_file( plugin_classes, conf_file, **kwargs ): + if not conf_file or not os.path.exists( conf_file ): + return NULL_JOB_INSTRUMENTER + plugins_element = ElementTree.parse( conf_file ).getroot() + return JobInstrumenter( plugin_classes, plugins_element, **kwargs ) diff -r ad576bf6497ec40b64d84802bda596bb63af63af -r 3ffd1ca6ac44736cba9e36abfaf3b1e607ddefb6 lib/galaxy/jobs/metrics/formatting.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/formatting.py @@ -0,0 +1,18 @@ + + +class JobMetricFormatter( object ): + """ Format job metric key-value pairs for human consumption in Web UI. """ + + def format( self, key, value ): + return ( str( key ), str( value ) ) + + +## Formatting utilities + +def seconds_to_str( value ): + if value < 60: + return "%s seconds" % value + elif value < 3600: + return "%s minutes" % ( value / 60 ) + else: + return "%s days and %s minutes" % ( value / 3600, ( value % 3600 ) / 60 ) diff -r ad576bf6497ec40b64d84802bda596bb63af63af -r 3ffd1ca6ac44736cba9e36abfaf3b1e607ddefb6 lib/galaxy/jobs/metrics/instrumenters/__init__.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/instrumenters/__init__.py @@ -0,0 +1,53 @@ +from abc import ABCMeta +from abc import abstractmethod + +import os.path + +from ...metrics import formatting + + +INSTRUMENT_FILE_PREFIX = "__instrument" + + +class InstrumentPlugin( object ): + """ A plugin describing how to instrument Galaxy jobs and retrieve metrics + from this instrumentation. + """ + __metaclass__ = ABCMeta + formatter = formatting.JobMetricFormatter() + + @property + @abstractmethod + def plugin_type( self ): + """ Short string providing labelling this plugin """ + + def pre_execute_instrument( self, job_directory ): + """ Optionally return one or more commands to instrument job. These + commands will be executed on the compute server prior to the job + running. + """ + return None + + def post_execute_instrument( self, job_directory ): + """ Optionally return one or more commands to instrument job. These + commands will be executed on the compute server after the tool defined + command is ran. + """ + return None + + @abstractmethod + def job_properties( self, job_id, job_directory ): + """ Collect properties for this plugin from specified job directory. + This method will run on the Galaxy server and can assume files created + in job_directory with pre_execute_instrument and + post_execute_instrument are available. + """ + + def _instrument_file_name( self, name ): + """ Provide a common pattern for naming files used by instrumentation + plugins - to ease their staging out of remote job directories. 
+ """ + return "%s_%s_%s" % ( INSTRUMENT_FILE_PREFIX, self.plugin_type, name ) + + def _instrument_file_path( self, job_directory, name ): + return os.path.join( job_directory, self._instrument_file_name( name ) ) diff -r ad576bf6497ec40b64d84802bda596bb63af63af -r 3ffd1ca6ac44736cba9e36abfaf3b1e607ddefb6 lib/galaxy/jobs/metrics/instrumenters/core.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/instrumenters/core.py @@ -0,0 +1,85 @@ +from ..instrumenters import InstrumentPlugin +from ...metrics import formatting +import time + +import logging +log = logging.getLogger( __name__ ) + +GALAXY_SLOTS_KEY = "galaxy_slots" +START_EPOCH_KEY = "start_epoch" +END_EPOCH_KEY = "end_epoch" +RUNTIME_SECONDS_KEY = "runtime_seconds" + + +class CorePluginFormatter( formatting.JobMetricFormatter ): + + def format( self, key, value ): + value = int( value ) + if key == GALAXY_SLOTS_KEY: + return ( "Cores Allocated", "%d" % value ) + elif key == RUNTIME_SECONDS_KEY: + return ( "Job Runtime (Wall Clock)", formatting.seconds_to_str( value ) ) + else: + # TODO: Use localized version of this from universe_wsgi.ini + title = "Job Start Time" if key == START_EPOCH_KEY else "Job End Time" + return (title, time.strftime( '%Y-%m-%d %H:%M:%S', time.localtime( value ) ) ) + + +class CorePlugin( InstrumentPlugin ): + """ Simple plugin that collects data without external dependencies. In + particular it currently collects value set for Galaxy slots. + """ + plugin_type = "core" + formatter = CorePluginFormatter() + + def __init__( self, **kwargs ): + pass + + def pre_execute_instrument( self, job_directory ): + commands = [] + commands.append( self.__record_galaxy_slots_command( job_directory ) ) + commands.append( self.__record_seconds_since_epoch_to_file( job_directory, "start" ) ) + return commands + + def post_execute_instrument( self, job_directory ): + commands = [] + commands.append( self.__record_seconds_since_epoch_to_file( job_directory, "end" ) ) + return commands + + def job_properties( self, job_id, job_directory ): + galaxy_slots_file = self.__galaxy_slots_file( job_directory ) + + properties = {} + properties[ GALAXY_SLOTS_KEY ] = self.__read_integer( galaxy_slots_file ) + start = self.__read_seconds_since_epoch( job_directory, "start" ) + end = self.__read_seconds_since_epoch( job_directory, "end" ) + if start is not None and end is not None: + properties[ START_EPOCH_KEY ] = start + properties[ END_EPOCH_KEY ] = end + properties[ RUNTIME_SECONDS_KEY ] = end - start + return properties + + def __record_galaxy_slots_command( self, job_directory ): + galaxy_slots_file = self.__galaxy_slots_file( job_directory ) + return '''echo "$GALAXY_SLOTS" > '%s' ''' % galaxy_slots_file + + def __record_seconds_since_epoch_to_file( self, job_directory, name ): + path = self._instrument_file_path( job_directory, "epoch_%s" % name ) + return 'date +"%s" > ' + path + + def __read_seconds_since_epoch( self, job_directory, name ): + path = self._instrument_file_path( job_directory, "epoch_%s" % name ) + return self.__read_integer( path ) + + def __galaxy_slots_file( self, job_directory ): + return self._instrument_file_path( job_directory, "galaxy_slots" ) + + def __read_integer( self, path ): + value = None + try: + value = int( open( path, "r" ).read() ) + except Exception: + pass + return value + +__all__ = [ CorePlugin ] diff -r ad576bf6497ec40b64d84802bda596bb63af63af -r 3ffd1ca6ac44736cba9e36abfaf3b1e607ddefb6 lib/galaxy/jobs/runners/__init__.py --- a/lib/galaxy/jobs/runners/__init__.py +++ 
b/lib/galaxy/jobs/runners/__init__.py @@ -251,7 +251,10 @@ log.debug( 'execution of external set_meta for job %d finished' % job_wrapper.job_id ) def get_job_file(self, job_wrapper, **kwds): + job_metrics = job_wrapper.app.job_metrics + job_instrumenter = job_metrics.job_instrumenters[ job_wrapper.job_destination.id ] options = dict( + job_instrumenter=job_instrumenter, galaxy_lib=job_wrapper.galaxy_lib_dir, env_setup_commands=job_wrapper.get_env_setup_clause(), working_directory=os.path.abspath( job_wrapper.working_directory ), diff -r ad576bf6497ec40b64d84802bda596bb63af63af -r 3ffd1ca6ac44736cba9e36abfaf3b1e607ddefb6 lib/galaxy/jobs/runners/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh --- a/lib/galaxy/jobs/runners/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh +++ b/lib/galaxy/jobs/runners/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh @@ -12,6 +12,8 @@ export PYTHONPATH fi $env_setup_commands +$instrument_pre_commands cd $working_directory $command echo $? > $exit_code_path +$instrument_post_commands diff -r ad576bf6497ec40b64d84802bda596bb63af63af -r 3ffd1ca6ac44736cba9e36abfaf3b1e607ddefb6 lib/galaxy/jobs/runners/util/job_script/__init__.py --- a/lib/galaxy/jobs/runners/util/job_script/__init__.py +++ b/lib/galaxy/jobs/runners/util/job_script/__init__.py @@ -18,6 +18,8 @@ 'headers': '', 'env_setup_commands': '', 'slots_statement': SLOTS_STATEMENT_CLUSTER_DEFAULT, + 'instrument_pre_commands': '', + 'instrument_post_commands': '', } @@ -47,6 +49,13 @@ """ if any([param not in kwds for param in REQUIRED_TEMPLATE_PARAMS]): raise Exception("Failed to create job_script, a required parameter is missing.") + job_instrumenter = kwds.get("job_instrumenter", None) + if job_instrumenter: + del kwds[ "job_instrumenter" ] + working_directory = kwds["working_directory"] + kwds["instrument_pre_commands"] = job_instrumenter.pre_execute_commands(working_directory) or '' + kwds["instrument_post_commands"] = job_instrumenter.post_execute_commands(working_directory) or '' + template_params = OPTIONAL_TEMPLATE_PARAMS.copy() template_params.update(**kwds) if not isinstance(template, Template): diff -r ad576bf6497ec40b64d84802bda596bb63af63af -r 3ffd1ca6ac44736cba9e36abfaf3b1e607ddefb6 lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -102,6 +102,11 @@ metric = self._text_metric( plugin, metric_name, metric_value ) self.text_metrics.append( metric ) + @property + def metrics( self ): + # TODO: Make iterable, concatenate with chain + return self.text_metrics + self.numeric_metrics + class User( object, Dictifiable ): use_pbkdf2 = True @@ -414,6 +419,12 @@ self.info = info def set_runner_name( self, job_runner_name ): self.job_runner_name = job_runner_name + + def get_job( self ): + # Added so job and task have same interface (.get_job() ) to get at + # underlying job object. 
+ return self + def set_runner_external_id( self, job_runner_external_id ): self.job_runner_external_id = job_runner_external_id def set_post_job_actions( self, post_job_actions ): diff -r ad576bf6497ec40b64d84802bda596bb63af63af -r 3ffd1ca6ac44736cba9e36abfaf3b1e607ddefb6 templates/show_params.mako --- a/templates/show_params.mako +++ b/templates/show_params.mako @@ -126,6 +126,13 @@ %if job and job.command_line and trans.user_is_admin(): <tr><td>Job Command-Line:</td><td>${ job.command_line | h }</td></tr> %endif + %if job and trans.user_is_admin(): + <% job_metrics = trans.app.job_metrics %> + %for metric in job.metrics: + <% metric_title, metric_value = job_metrics.format( metric.plugin, metric.metric_name, metric.metric_value ) %> + <tr><td>${ metric_title | h }</td><td>${ metric_value | h }</td></tr> + %endfor + %endif </table><br /> diff -r ad576bf6497ec40b64d84802bda596bb63af63af -r 3ffd1ca6ac44736cba9e36abfaf3b1e607ddefb6 test/unit/test_galaxy_mapping.py --- a/test/unit/test_galaxy_mapping.py +++ b/test/unit/test_galaxy_mapping.py @@ -234,6 +234,11 @@ self.persist( u, job ) + task = model.Task( job=job, working_directory="/tmp", prepare_files_cmd="split.sh" ) + task.add_metric( "gx", "galaxy_slots", 5 ) + task.add_metric( "system", "system_name", "localhost" ) + self.persist( task ) + def test_tasks( self ): model = self.model u = model.User( email="jobtest@foo.bar.baz", password="password" ) https://bitbucket.org/galaxy/galaxy-central/commits/2a7ff2234880/ Changeset: 2a7ff2234880 User: jmchilton Date: 2014-03-26 17:01:46 Summary: Implement env job instrumentation plugin. The 'env' plugin can collect all environment variables set at the time of job execution or just targetted environment variables. Environment variables like PATH or PYTHONPATH may be useful for debugging and environment variables such as HOSTNAME, PID, and DRM-specific variables like SLURM_JOB_ID could be useful for integrating Galaxy's view of jobs and users with external accounting/metric systems. See job_metrics.xml.sample for how to configure environment variable collection. Affected #: 2 files diff -r 3ffd1ca6ac44736cba9e36abfaf3b1e607ddefb6 -r 2a7ff223488017279b3bdaf854821a6500ff688c job_metrics_conf.xml.sample --- a/job_metrics_conf.xml.sample +++ b/job_metrics_conf.xml.sample @@ -15,4 +15,10 @@ <!-- Core plugin captures Galaxy slots, start and end of job (in seconds since epoch) and computes runtime in seconds. --><core /> + <!-- Uncomment following to enable plugin dumping complete environment + for each job, potentially useful for debuging --> + <!-- <env /> --> + <!-- env plugin can also record more targetted, obviously useful variables + as well. --> + <!-- <env variables="HOSTNAME,SLURM_CPUS_ON_NODE,SLURM_JOBID" /> --></job_metrics> diff -r 3ffd1ca6ac44736cba9e36abfaf3b1e607ddefb6 -r 2a7ff223488017279b3bdaf854821a6500ff688c lib/galaxy/jobs/metrics/instrumenters/env.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/instrumenters/env.py @@ -0,0 +1,54 @@ +from ..instrumenters import InstrumentPlugin +from ...metrics import formatting + +import logging +log = logging.getLogger( __name__ ) + + +class EnvFormatter( formatting.JobMetricFormatter ): + + def format( self, key, value ): + return ( "%s (runtime environment variable)" % key, value ) + + +class EnvPlugin( InstrumentPlugin ): + """ Instrumentation plugin capable of recording all or specific environment + variables for a job at runtime. 
+ """ + plugin_type = "env" + formatter = EnvFormatter() + + def __init__( self, **kwargs ): + variables_str = kwargs.get( "variables", None ) + if variables_str: + variables = [ v.strip() for v in variables_str.split(",") ] + else: + variables = None + self.variables = variables + + def pre_execute_instrument( self, job_directory ): + """ Use env to dump all environment variables to a file. + """ + return "env > '%s'" % self.__env_file( job_directory ) + + def post_execute_instrument( self, job_directory ): + return None + + def job_properties( self, job_id, job_directory ): + """ Recover environment variables dumped out on compute server and filter + out specific variables if needed. + """ + variables = self.variables + + properties = {} + for line in open( self.__env_file( job_directory ) ).readlines(): + var, value = line.split( "=", 1 ) + if not variables or var in variables: + properties[ var ] = value + + return properties + + def __env_file( self, job_directory ): + return self._instrument_file_path( job_directory, "vars" ) + +__all__ = [ EnvPlugin ] https://bitbucket.org/galaxy/galaxy-central/commits/de4745c66e9d/ Changeset: de4745c66e9d User: jmchilton Date: 2014-03-26 17:01:46 Summary: Implement cpuinfo job instrumentation plugin. Affected #: 2 files diff -r 2a7ff223488017279b3bdaf854821a6500ff688c -r de4745c66e9d030a7df3a8eec9586fa9d65aa6c6 job_metrics_conf.xml.sample --- a/job_metrics_conf.xml.sample +++ b/job_metrics_conf.xml.sample @@ -15,6 +15,13 @@ <!-- Core plugin captures Galaxy slots, start and end of job (in seconds since epoch) and computes runtime in seconds. --><core /> + + <!-- Uncomment to dump processor count for each job - linux only. --> + <!-- <cpuinfo /> --> + <!-- Uncomment to dump information about all processors for for each + job - this is likely too much data. Linux only. --> + <!-- <cpuinfo verbose="true" /> --> + <!-- Uncomment following to enable plugin dumping complete environment for each job, potentially useful for debuging --><!-- <env /> --> diff -r 2a7ff223488017279b3bdaf854821a6500ff688c -r de4745c66e9d030a7df3a8eec9586fa9d65aa6c6 lib/galaxy/jobs/metrics/instrumenters/cpuinfo.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/instrumenters/cpuinfo.py @@ -0,0 +1,62 @@ +import re + +from galaxy import util + +from ..instrumenters import InstrumentPlugin +from ...metrics import formatting + +import logging +log = logging.getLogger( __name__ ) + +PROCESSOR_LINE = re.compile(r"processor\s*\:\s*(\d+)") + + +class CpuInfoFormatter( formatting.JobMetricFormatter ): + + def format( self, key, value ): + if key == "processor_count": + return "Processor Count", "%s" % int( value ) + else: + return key, value + + +class CpuInfoPlugin( InstrumentPlugin ): + """ Gather information about processor configuration from /proc/cpuinfo. + Linux only. 
+ """ + plugin_type = "cpuinfo" + formatter = CpuInfoFormatter() + + def __init__( self, **kwargs ): + self.verbose = util.asbool( kwargs.get( "verbose", False ) ) + + def pre_execute_instrument( self, job_directory ): + return "cat /proc/cpuinfo > '%s'" % self.__instrument_cpuinfo_path( job_directory ) + + def job_properties( self, job_id, job_directory ): + properties = {} + processor_count = 0 + with open( self.__instrument_cpuinfo_path( job_directory ) ) as f: + current_processor = None + for line in f: + line = line.strip().lower() + if not line: # Skip empty lines + continue + + processor_line_match = PROCESSOR_LINE.match( line ) + if processor_line_match: + processor_count += 1 + current_processor = processor_line_match.group( 1 ) + elif current_processor and self.verbose: + # If verbose, dump information about each processor + # into database... + key, value = line.split( ":", 1 ) + key = "processor_%s_%s" % ( current_processor, key.strip() ) + value = value + properties[ "processor_count" ] = processor_count + return properties + + def __instrument_cpuinfo_path( self, job_directory ): + return self._instrument_file_path( job_directory, "cpuinfo" ) + +__all__ = [ CpuInfoPlugin ] https://bitbucket.org/galaxy/galaxy-central/commits/2d9d06f5b39e/ Changeset: 2d9d06f5b39e User: jmchilton Date: 2014-03-26 17:01:46 Summary: Implement meminfo job instrumentation plugin. Affected #: 2 files diff -r de4745c66e9d030a7df3a8eec9586fa9d65aa6c6 -r 2d9d06f5b39eb5d7d2b926dc1d74d7720cf43e2f job_metrics_conf.xml.sample --- a/job_metrics_conf.xml.sample +++ b/job_metrics_conf.xml.sample @@ -22,6 +22,10 @@ job - this is likely too much data. Linux only. --><!-- <cpuinfo verbose="true" /> --> + <!-- Uncomment to dump system memory information for each job - linux + only. --> + <!-- <meminfo /> --> + <!-- Uncomment following to enable plugin dumping complete environment for each job, potentially useful for debuging --><!-- <env /> --> diff -r de4745c66e9d030a7df3a8eec9586fa9d65aa6c6 -r 2d9d06f5b39eb5d7d2b926dc1d74d7720cf43e2f lib/galaxy/jobs/metrics/instrumenters/meminfo.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/instrumenters/meminfo.py @@ -0,0 +1,59 @@ +import re + +from galaxy import util + +from ..instrumenters import InstrumentPlugin +from ...metrics import formatting + +MEMINFO_LINE = re.compile(r"(\w+)\s*\:\s*(\d+) kB") + +# Important (non-verbose) meminfo property titles. +MEMINFO_TITLES = { + "memtotal": "Total System Memory", + "swaptotal": "Total System Swap" +} + + +class MemInfoFormatter( formatting.JobMetricFormatter ): + + def format( self, key, value ): + title = MEMINFO_TITLES.get( key, key ) + return title, util.nice_size( value * 1000 ) # kB = *1000, KB = *1024 - wikipedia + + +class MemInfoPlugin( InstrumentPlugin ): + """ Gather information about processor configuration from /proc/cpuinfo. + Linux only. 
+ """ + plugin_type = "meminfo" + formatter = MemInfoFormatter() + + def __init__( self, **kwargs ): + self.verbose = util.asbool( kwargs.get( "verbose", False ) ) + + def pre_execute_instrument( self, job_directory ): + return "cat /proc/meminfo > '%s'" % self.__instrument_meminfo_path( job_directory ) + + def job_properties( self, job_id, job_directory ): + properties = {} + with open( self.__instrument_meminfo_path( job_directory ) ) as f: + for line in f: + line = line.strip() + if not line: # Skip empty lines + continue + line_match = MEMINFO_LINE.match( line ) + if not line_match: + continue + key = line_match.group( 1 ).lower() + # By default just grab important meminfo properties with titles + # defined for formatter. Grab everything in verbose mode for + # an arbitrary snapshot of memory at beginning of run. + if key in MEMINFO_TITLES or self.verbose: + value = long( line_match.group( 2 ) ) + properties[ key ] = value + return properties + + def __instrument_meminfo_path( self, job_directory ): + return self._instrument_file_path( job_directory, "meminfo" ) + +__all__ = [ MemInfoPlugin ] https://bitbucket.org/galaxy/galaxy-central/commits/54d890bb498d/ Changeset: 54d890bb498d User: jmchilton Date: 2014-03-26 17:01:46 Summary: Implement uname job instrumentation plugin. Gathers information on operating system configuration job is run on - linux only. Affected #: 2 files diff -r 2d9d06f5b39eb5d7d2b926dc1d74d7720cf43e2f -r 54d890bb498dc7bb90a1a61c5b4dc636daf8385a job_metrics_conf.xml.sample --- a/job_metrics_conf.xml.sample +++ b/job_metrics_conf.xml.sample @@ -26,6 +26,10 @@ only. --><!-- <meminfo /> --> + <!-- Uncomment to record operating system each job is executed on - linux + only. --> + <!-- <uname /> --> + <!-- Uncomment following to enable plugin dumping complete environment for each job, potentially useful for debuging --><!-- <env /> --> diff -r 2d9d06f5b39eb5d7d2b926dc1d74d7720cf43e2f -r 54d890bb498dc7bb90a1a61c5b4dc636daf8385a lib/galaxy/jobs/metrics/instrumenters/uname.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/instrumenters/uname.py @@ -0,0 +1,34 @@ +from ..instrumenters import InstrumentPlugin +from ...metrics import formatting + + +class UnameFormatter( formatting.JobMetricFormatter ): + + def format( self, key, value ): + return "Operating System", value + + +class UnamePlugin( InstrumentPlugin ): + """ Use uname to gather operating system information about remote system + job is running on. Linux only. + """ + plugin_type = "uname" + formatter = UnameFormatter() + + def __init__( self, **kwargs ): + self.uname_args = kwargs.get( "args", "-a" ) + + def pre_execute_instrument( self, job_directory ): + return "uname %s > '%s'" % ( self.uname_args, self.__instrument_uname_path( job_directory ) ) + + def job_properties( self, job_id, job_directory ): + properties = {} + with open( self.__instrument_uname_path( job_directory ) ) as f: + properties[ "uname" ] = f.read() + return properties + + def __instrument_uname_path( self, job_directory ): + return self._instrument_file_path( job_directory, "uname" ) + + +__all__ = [ UnamePlugin ] https://bitbucket.org/galaxy/galaxy-central/commits/8cd359ea420e/ Changeset: 8cd359ea420e User: jmchilton Date: 2014-03-26 17:01:46 Summary: Implement collectl job instrumentation plugin. This changeset documentation is an abridged form fo the documentation included in job_metrics_conf.xml.sample. 
Collectl (http://collectl.sourceforge.net/) is a powerful monitoring utility capable of gathering numerous system and process level statistics of running applications. By default, the Galaxy collectl job metrics plugin will grab a variety of process level metrics aggregated across all processes corresponding to a job; this behavior is highly customizable, either via the attributes documented below or by simply hacking up the code in lib/galaxy/jobs/metrics. Warning: In order to use this plugin, collectl must be available on the compute server the job runs on and on the local Galaxy server as well (unless, in the latter case, summarize_process_data is set to False). Attributes (the following describes attributes that can be used with the collectl job metrics element to modify its behavior). 'summarize_process_data': Boolean indicating whether to run collectl in playback mode after jobs complete and gather process level statistics for the job run. These statistics can be customized with the 'process_statistics' attribute. (defaults to True) 'saved_logs_path': If set (it is off by default), all collectl logs will be saved to the specified path after jobs complete. These logs can later be replayed using collectl offline to generate full time-series data corresponding to a job run. 'subsystems': Comma separated list of collectl subsystems to collect data for. The plugin doesn't currently expose all of them or offer summary data for any of them except 'process', but extensions would be welcome. It may seem pointless to include subsystems besides process since they won't be processed online by Galaxy - but if 'saved_logs_path' is set these files can be played back at any time. Available subsystems - 'process', 'cpu', 'memory', 'network', 'disk'. (Default 'process'). 'process_statistics': If 'summarize_process_data' is enabled, this attribute can be specified as a comma separated list to override the statistics that are gathered. Each statistic is of the form X_Y where X is one of 'min', 'max', 'count', 'avg', or 'sum' and Y is a value from 'S', 'VmSize', 'VmLck', 'VmRSS', 'VmData', 'VmStk', 'VmExe', 'VmLib', 'CPU', 'SysT', 'UsrT', 'PCT', 'AccumT', 'WKB', 'RKBC', 'WKBC', 'RSYS', 'WSYS', 'CNCL', 'MajF', 'MinF'. Consult lib/galaxy/jobs/metrics/collectl/processes.py for more details on what each of these resource types means. Defaults to 'max_VmSize,avg_VmSize,max_VmRSS,avg_VmRSS,sum_SysT,sum_UsrT,max_PCT,avg_PCT,max_AccumT,sum_RSYS,sum_WSYS' as a variety of statistics roughly describing CPU and memory usage of the program and VERY ROUGHLY describing I/O consumption. Affected #: 7 files diff -r 54d890bb498dc7bb90a1a61c5b4dc636daf8385a -r 8cd359ea420e3b0c3cbadcbc6fdf99ef614ad28a job_metrics_conf.xml.sample --- a/job_metrics_conf.xml.sample +++ b/job_metrics_conf.xml.sample @@ -36,4 +36,89 @@ <!-- env plugin can also record more targetted, obviously useful variables as well. --><!-- <env variables="HOSTNAME,SLURM_CPUS_ON_NODE,SLURM_JOBID" /> --> + + <!-- <collectl /> --> + <!-- Collectl (http://collectl.sourceforge.net/) is a powerful monitoring + utility capable of gathering numerous system and process level + statistics of running applications. The Galaxy collectl job metrics + plugin by default will grab a variety of process level metrics + aggregated across all processes corresponding to a job, this behavior + is highly customiziable - both using the attributes documented below + or simply hacking up the code in lib/galaxy/jobs/metrics.
+ + Warning: In order to use this plugin collectl must be available on the + compute server the job runs on and on the local Galaxy server as well + (unless in this latter case summarize_process_data is set to False). + + Attributes (the follow describes attributes that can be used with + the collectl job metrics element above to modify its behavior). + + 'summarize_process_data': Boolean indicating whether to run collectl + in playback mode after jobs complete and gather process level + statistics for the job run. These statistics can be customized + with the 'process_statistics' attribute. (defaults to True) + + 'saved_logs_path': If set (it is off by default), all collectl logs + will be saved to the specified path after jobs complete. These + logs can later be replayed using collectl offline to generate + full time-series data corresponding to a job run. + + 'subsystems': Comma separated list of collectl subystems to collect + data for. Plugin doesn't currently expose all of them or offer + summary data for any of them except 'process' but extensions + would be welcome. May seem pointless to include subsystems + beside process since they won't be processed online by Galaxy - + but if 'saved_logs_path' these files can be played back at anytime. + + Available subsystems - 'process', 'cpu', 'memory', 'network', + 'disk', 'network'. (Default 'process'). + + Warning: If you override this - be sure to include 'process' + unless 'summarize_process_data' is set to false. + + 'process_statistics': If 'summarize_process_data' this attribute can be + specified as a comma separated list to override the statistics + that are gathered. Each statistics is of the for X_Y where X + if one of 'min', 'max', 'count', 'avg', or 'sum' and Y is a + value from 'S', 'VmSize', 'VmLck', 'VmRSS', 'VmData', 'VmStk', + 'VmExe', 'VmLib', 'CPU', 'SysT', 'UsrT', 'PCT', 'AccumT' 'WKB', + 'RKBC', 'WKBC', 'RSYS', 'WSYS', 'CNCL', 'MajF', 'MinF'. Consult + lib/galaxy/jobs/metrics/collectl/processes.py for more details + on what each of these resource types means. + + Defaults to 'max_VmSize,avg_VmSize,max_VmRSS,avg_VmRSS,sum_SysT,sum_UsrT,max_PCT avg_PCT,max_AccumT,sum_RSYS,sum_WSYS' + as variety of statistics roughly describing CPU and memory + usage of the program and VERY ROUGHLY describing I/O consumption. + + 'procfilt_on': By default Galaxy will tell collectl to only collect + 'process' level data for the current user (as identified) + by 'username' (default) - this can be disabled by settting this + to 'none' - the plugin will still only aggregate process level + statistics for the jobs process tree - but the additional + information can still be used offline with 'saved_logs_path' + if set. Obsecurely, this can also be set 'uid' to identify + the current user to filter on by UID instead of username - + this may needed on some clusters(?). + + 'interval': The time (in seconds) between data collection points. + Collectl uses a variety of different defaults for different + subsystems if this is not set, but process information (likely + the most pertinent for Galaxy jobs will collect data every + 60 seconds). + + 'flush': Interval (in seconds I think) between when collectl will + flush its buffer to disk. Galaxy overrides this to disable + flushing by default if not set. 
+ + 'local_collectl_path', 'remote_collectl_path', 'collectl_path': + By default, jobs will just assume collectl is on the PATH, but + it can be overridden with 'local_collectl_path' and + 'remote_collectl_path' (or simply 'collectl_path' if it is not + on the path but installed in the same location both locally and + remotely). + + There are more and more increasingly obsecure options including - + log_collectl_program_output, interval2, and interval3. Consult + source code for more details. + --></job_metrics> diff -r 54d890bb498dc7bb90a1a61c5b4dc636daf8385a -r 8cd359ea420e3b0c3cbadcbc6fdf99ef614ad28a lib/galaxy/jobs/metrics/collectl/__init__.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/collectl/__init__.py @@ -0,0 +1,5 @@ +""" This module contains helper functions and data structures for interacting +with collectl and collectl generated data. More information on collectl can be +found at: http://collectl.sourceforge.net/. + +""" diff -r 54d890bb498dc7bb90a1a61c5b4dc636daf8385a -r 8cd359ea420e3b0c3cbadcbc6fdf99ef614ad28a lib/galaxy/jobs/metrics/collectl/cli.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/collectl/cli.py @@ -0,0 +1,128 @@ +from string import Template +import subprocess + +import logging +log = logging.getLogger( __name__ ) + +COMMAND_LINE_TEMPLATE = Template( + "$collectl_path $destination_arg $mode_arg $subsystems_arg $interval_arg $procfilt_arg $flush_arg $sep_arg" +) +MODE_RECORD = "record" +MODE_PLAYBACK = "playback" + + +class CollectlCli( object ): + """ Abstraction over (some of) the command-line arguments of collectl. + Ideally this will be useful for building up command line arguments for + remote execution as well as runnning directly on local host. + + This is meant to be a fairly generic utility - for interfacing with + collectl CLI - logic more directly related to the Galaxy job metric plugin + plugin should be placed in other modules. + + Keyword Arguments: + collectl_path: Path to collectl executable (defaults to collectl - i.e. + search the PATH). + + playback_path (defaults to None): If this is None collectl will run in + record mode, else it will playback specified file. + + Playback Mode Options: + + sep : Separator used in playback mode (set to 9 to produce tsv) + (defaults to None). + + Record Mode Options (some of these may work in playback mode also): + + destination_path: Location of path files to write to (defaults to None + and collectl will just use cwd). Really this is just to prefix - + collectl will append hostname and datetime to file. + interval: Setup polling interval (secs) for most subsystems (defaults + to None and when unspecified collectl will use default of 1 second). + interval2: Setup polling interval (secs) for process information + (defaults to None and when unspecified collectl will use default to + 60 seconds). + interval3: Setup polling interval (secs) for environment information + (defaults to None and when unspecified collectl will use default to + 300 seconds). + procfilt: Optional argument to procfilt. (defaults to None). + flush : Optional flush interval (defaults to None). 
+ """ + + def __init__( self, **kwargs ): + command_args = {} + command_args[ "collectl_path" ] = kwargs.get( "collectl_path", "collectl" ) + playback_path = kwargs.get( "playback_path", None ) + self.mode = MODE_RECORD if not playback_path else MODE_PLAYBACK + if self.mode == MODE_RECORD: + mode_arg = "" + elif self.mode == MODE_PLAYBACK: + mode_arg = "-P -p '%s'" % playback_path + else: + raise Exception( "Invalid mode supplied to CollectlCli - %s" % self.mode ) + command_args[ "mode_arg" ] = mode_arg + command_args[ "interval_arg" ] = self.__interval_arg( kwargs ) + destination = kwargs.get( "destination_path", None ) + if destination: + destination_arg = "-f '%s'" % destination + else: + destination_arg = "" + command_args[ "destination_arg" ] = destination_arg + procfilt = kwargs.get( "procfilt", None ) + command_args[ "procfilt_arg" ] = "" if not procfilt else "--procfilt %s" % procfilt + command_args[ "subsystems_arg" ] = self.__subsystems_arg( kwargs.get( "subsystems", [] ) ) + flush = kwargs.get( "flush", None ) + command_args[ "flush_arg"] = "--flush %s" % flush if flush else "" + sep = kwargs.get( "sep", None ) + command_args[ "sep_arg" ] = "--sep=%s" % sep if sep else "" + + self.command_args = command_args + + def __subsystems_arg( self, subsystems ): + if subsystems: + return "-s%s" % "".join( [ s.command_line_arg for s in subsystems ] ) + else: + return "" + + def __interval_arg( self, kwargs ): + if self.mode != MODE_RECORD: + return "" + + interval = kwargs.get( "interval", None ) + if not interval: + return "" + + self.__validate_interval_arg( interval ) + interval_arg = "-i %s" % interval + interval2 = kwargs.get( "interval2", None ) + if not interval2: + return interval_arg + self.__validate_interval_arg( interval2, multiple_of=int( interval ) ) + interval_arg = "%s:%s" % ( interval_arg, interval2 ) + + interval3 = kwargs.get( "interval3", None ) + if not interval3: + return interval_arg + self.__validate_interval_arg( interval3, multiple_of=int( interval ) ) + interval_arg = "%s:%s" % ( interval_arg, interval3 ) + return interval_arg + + def __validate_interval_arg( self, value, multiple_of=None ): + if value and not str(value).isdigit(): + raise Exception( "Invalid interval argument supplied, must be integer %s" % value ) + if multiple_of: + if int( value ) % multiple_of != 0: + raise Exception( "Invalid interval argument supplied, must multiple of %s" % multiple_of ) + + def build_command_line( self ): + return COMMAND_LINE_TEMPLATE.substitute( **self.command_args ) + + def run( self, stdout=subprocess.PIPE, stderr=subprocess.PIPE ): + command_line = self.build_command_line() + log.info( "Executing %s" % command_line ) + proc = subprocess.Popen( command_line, shell=True, stdout=stdout, stderr=stderr ) + return_code = proc.wait() + if return_code: + raise Exception( "Problem running collectl command." ) + +__all__ = [ CollectlCli ] diff -r 54d890bb498dc7bb90a1a61c5b4dc636daf8385a -r 8cd359ea420e3b0c3cbadcbc6fdf99ef614ad28a lib/galaxy/jobs/metrics/collectl/processes.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/collectl/processes.py @@ -0,0 +1,252 @@ +""" Modules will run collectl in playback mode and collect various process +statistics for a given pid's process and process ancestors. +""" +import collections +import csv +import tempfile + +from galaxy import util + +from ..collectl import stats + +import logging +log = logging.getLogger( __name__ ) + +# Collectl process information cheat sheet: +# +# Record process information for current user. 
+# % collectl -sZ -f./__instrument_collectl -i 10:10 --procfilt U$USER +# +# TSV Replay of processing information in plottable mode... +# +# % collectl -sZ -P --sep=9 -p __instrument_collectl-jlaptop13-20140322-120919.raw.gz +# +# Has following columns: +# Date Time PID User PR PPID THRD S VmSize VmLck VmRSS VmData VmStk VmExe VmLib CPU SysT UsrT PCT AccumT RKB WKB RKBC WKBC RSYS WSYS CNCL MajF MinF Command +# + +# Process data dumped one row per process per interval. +#http://collectl.sourceforge.net/Data-detail.html +PROCESS_COLUMNS = [ + "#Date", # Date of interval - e.g. 20140322 + "Time", # Time of interval - 12:18:58 + "PID", # Process pid. + "User", # Process user. + "PR", # Priority of process. + "PPID", # Parent PID of process. + "THRD", # Thread??? + "S", # Process state - S - Sleeping, D - Uninterruptable Sleep, R - Running, Z - Zombie or T - Stopped/Traced + ## Memory options - http://ewx.livejournal.com/579283.html + "VmSize", + "VmLck", + "VmRSS", + "VmData", + "VmStk", + "VmExe", + "VmLib", + "CPU", # CPU number of process + "SysT", # Amount of system time consumed during interval + "UsrT", # Amount user time consumed during interval + "PCT", # Percentage of current interval consumed by task + "AccumT", # Total accumulated System and User time since the process began execution + # kilobytes read/written - requires I/O level monitoring to be enabled in kernel. + "RKB", # kilobytes read by process - requires I/O monitoring in kernel + "WKB", + "RKBC", + "WKBC", + "RSYS", # Number of read system calls + "WSYS", # Number of write system calls + "CNCL", + "MajF", # Number of major page faults + "MinF", # Number of minor page faults + "Command", # Command executed +] + +# Types of statistics this module can summarize +STATISTIC_TYPES = [ "max", "min", "sum", "count", "avg" ] + +COLUMN_INDICES = dict( [ ( col, i ) for i, col in enumerate( PROCESS_COLUMNS ) ] ) +PID_INDEX = COLUMN_INDICES[ "PID" ] +PARENT_PID_INDEX = COLUMN_INDICES[ "PPID" ] + +DEFAULT_STATISTICS = [ + ("max", "VmSize"), + ("avg", "VmSize"), + ("max", "VmRSS"), + ("avg", "VmRSS"), + ("sum", "SysT"), + ("sum", "UsrT"), + ("max", "PCT"), + ("avg", "PCT"), + ("max", "AccumT"), + ("sum", "RSYS"), + ("sum", "WSYS"), +] + + +def parse_process_statistics( statistics ): + """ Turn string or list of strings into list of tuples in format ( stat, + resource ) where stat is a value from STATISTIC_TYPES and resource is a + value from PROCESS_COLUMNS. + """ + if statistics is None: + statistics = DEFAULT_STATISTICS + + statistics = util.listify( statistics ) + statistics = map( __tuplize_statistic, statistics ) + # Check for validity... + for statistic in statistics: + if statistic[ 0 ] not in STATISTIC_TYPES: + raise Exception( "Unknown statistic type encountered %s" % statistic[ 0 ] ) + if statistic[ 1 ] not in PROCESS_COLUMNS: + raise Exception( "Unknown process column encountered %s" % statistic[ 1 ] ) + return statistics + + +def generate_process_statistics( collectl_playback_cli, pid, statistics=DEFAULT_STATISTICS ): + """ Playback collectl file and generate summary statistics. 
+ """ + with tempfile.NamedTemporaryFile( ) as tmp_tsv: + collectl_playback_cli.run( stdout=tmp_tsv ) + with open( tmp_tsv.name, "r" ) as tsv_file: + return __read_process_statistics( tsv_file, pid, statistics ) + + +def __read_process_statistics( tsv_file, pid, statistics ): + process_summarizer = CollectlProcessSummarizer( pid, statistics ) + current_interval = None + + for row in csv.reader( tsv_file, dialect="excel-tab" ): + if current_interval is None: + for header, expected_header in zip( row, PROCESS_COLUMNS ): + if header.lower() != expected_header.lower(): + raise Exception( "Unknown header value encountered while processing collectl playback - %s" % header ) + + # First row, check contains correct header. + current_interval = CollectlProcessInterval() + continue + + if current_interval.row_is_in( row ): + current_interval.add_row( row ) + else: + process_summarizer.handle_interval( current_interval ) + current_interval = CollectlProcessInterval() + + # Do we have unsummarized rows... + if current_interval and current_interval.rows: + process_summarizer.handle_interval( current_interval ) + + return process_summarizer.get_statistics() + + +class CollectlProcessSummarizer( object ): + + def __init__( self, pid, statistics ): + self.pid = pid + self.statistics = statistics + self.columns_of_interest = set( [ s[ 1 ] for s in statistics ] ) + self.tree_statistics = collections.defaultdict( stats.StatisticsTracker ) + self.process_accum_statistics = collections.defaultdict( stats.StatisticsTracker ) + self.interval_count = 0 + + def handle_interval( self, interval ): + self.interval_count += 1 + rows = self.__rows_for_process( interval.rows, self.pid ) + for column_name in self.columns_of_interest: + column_index = COLUMN_INDICES[ column_name ] + + if column_name == "AccumT": + # Should not sum this across pids each interval, sum max at end... + for r in rows: + pid_seconds = self.__time_to_seconds( r[ column_index ] ) + self.process_accum_statistics[ r[ PID_INDEX ] ].track( pid_seconds ) + else: + # All other stastics should be summed across whole process tree + # at each interval I guess. 
+ if column_name in [ "SysT", "UsrT", "PCT" ]: + to_num = float + else: + to_num = long + + interval_stat = sum( to_num( r[ column_index ] ) for r in rows ) + self.tree_statistics[ column_name ].track( interval_stat ) + + def get_statistics( self ): + if self.interval_count == 0: + return [] + + computed_statistics = [] + for statistic in self.statistics: + statistic_type, column = statistic + if column == "AccumT": + # Only thing that makes sense is sum + if statistic_type != "max": + log.warn( "Only statistic max makes sense for AccumT" ) + continue + + value = sum( [ v.max for v in self.process_accum_statistics.itervalues() ] ) + else: + statistics_tracker = self.tree_statistics[ column ] + value = getattr( statistics_tracker, statistic_type ) + + computed_statistic = ( statistic, value ) + computed_statistics.append( computed_statistic ) + + return computed_statistics + + def __rows_for_process( self, rows, pid ): + process_rows = [] + pids = self.__all_child_pids( rows, pid ) + for row in rows: + if row[ PID_INDEX ] in pids: + process_rows.append( row ) + return process_rows + + def __all_child_pids( self, rows, pid ): + pids_in_process_tree = set( [ str( self.pid ) ] ) + added = True + while added: + added = False + for row in rows: + pid = row[ PID_INDEX ] + parent_pid = row[ PARENT_PID_INDEX ] + if parent_pid in pids_in_process_tree and pid not in pids_in_process_tree: + pids_in_process_tree.add( pid ) + added = True + return pids_in_process_tree + + def __time_to_seconds( self, minutes_str ): + parts = minutes_str.split( ":" ) + seconds = 0.0 + for i, val in enumerate( parts ): + seconds += float(val) * ( 60 ** ( len( parts ) - ( i + 1 ) ) ) + return seconds + + +class CollectlProcessInterval( object ): + """ Represent all rows in collectl playback file for given time slice with + ability to filter out just rows corresponding to the process tree + corresponding to a given pid. + """ + + def __init__( self ): + self.rows = [] + + def row_is_in( self, row ): + if not self.rows: # No rows, this row defines interval. + return True + first_row = self.rows[ 0 ] + return first_row[ 0 ] == row[ 0 ] and first_row[ 1 ] == row[ 1 ] + + def add_row( self, row ): + self.rows.append( row ) + + +def __tuplize_statistic( statistic ): + if not isinstance( statistic, tuple ): + statistic_split = statistic.split( "_", 1 ) + statistic = ( statistic_split[ 0 ].lower(), statistic_split[ 1 ] ) + return statistic + + +__all__ = [ generate_process_statistics ] diff -r 54d890bb498dc7bb90a1a61c5b4dc636daf8385a -r 8cd359ea420e3b0c3cbadcbc6fdf99ef614ad28a lib/galaxy/jobs/metrics/collectl/stats.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/collectl/stats.py @@ -0,0 +1,27 @@ +""" Primitive module for tracking running statistics without storing values in +memory. 
+""" + + +class StatisticsTracker( object ): + + def __init__( self ): + self.min = None + self.max = None + self.count = 0 + self.sum = 0 + + def track( self, value ): + if self.min is None or value < self.min: + self.min = value + if self.max is None or value > self.max: + self.max = value + self.count += 1 + self.sum += value + + @property + def avg( self ): + if self.count > 0: + return self.sum / self.count + else: + return None diff -r 54d890bb498dc7bb90a1a61c5b4dc636daf8385a -r 8cd359ea420e3b0c3cbadcbc6fdf99ef614ad28a lib/galaxy/jobs/metrics/collectl/subsystems.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/collectl/subsystems.py @@ -0,0 +1,72 @@ +from abc import ABCMeta +from abc import abstractmethod + + +class CollectlSubsystem( object ): + """ Class providing an abstraction of collectl subsytems. + """ + __metaclass__ = ABCMeta + + @property + @abstractmethod + def command_line_arg( self ): + """ Return single letter command-line argument used by collectl CLI. + """ + + @property + @abstractmethod + def name( self, job_directory ): + """ High-level name for subsystem as consumed by this module. + """ + + +class ProcessesSubsystem( CollectlSubsystem ): + command_line_arg = "Z" + name = "process" + + +class CpuSubsystem( CollectlSubsystem ): + command_line_arg = "C" + name = "cpu" + + +class DiskSubsystem( CollectlSubsystem ): + command_line_arg = "D" + name = "disk" + + +class NetworkSubsystem( CollectlSubsystem ): + command_line_arg = "N" + name = "network" + + +class EnvironmentSubsystem( CollectlSubsystem ): + command_line_arg = "E" + name = "environment" + + +class MemorySubsystem( CollectlSubsystem ): + command_line_arg = "M" + name = "memory" + + +SUBSYSTEMS = [ + ProcessesSubsystem(), + CpuSubsystem(), + DiskSubsystem(), + NetworkSubsystem(), + EnvironmentSubsystem(), + MemorySubsystem(), +] +SUBSYSTEM_DICT = dict( [ (s.name, s) for s in SUBSYSTEMS ] ) + + +def get_subsystem( name ): + """ + + >>> get_subsystem( "process" ).command_line_arg == "Z" + True + """ + return SUBSYSTEM_DICT[ name ] + +__all__ = [ get_subsystem ] diff -r 54d890bb498dc7bb90a1a61c5b4dc636daf8385a -r 8cd359ea420e3b0c3cbadcbc6fdf99ef614ad28a lib/galaxy/jobs/metrics/instrumenters/collectl.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/instrumenters/collectl.py @@ -0,0 +1,214 @@ +import os +import shutil + +from ..instrumenters import InstrumentPlugin +from ...metrics import formatting +from ..collectl import subsystems +from ..collectl import cli +from ..collectl import processes + +from galaxy import util +from galaxy.util import directory_hash + +import logging +log = logging.getLogger( __name__ ) + +DEFAULT_PROCFILT_ON = "username" # By default, only grab statistics for user + # processes (as identifiers by username). +DEFAULT_SUBSYSTEMS = "process" +DEFAULT_FLUSH_INTERVAL = "0" # Set to zero to flush every collection. + +FORMATTED_RESOURCE_TITLES = { + "PCT": "Percent CPU Usage", + "RSYS": "Disk Reads", + "WSYS": "Disk Writes", +} + +EMPTY_COLLECTL_FILE_MESSAGE = "Skipping process summary due to empty file... job probably did not run long enough for collectl to gather data." 
+ + +class CollectlFormatter( formatting.JobMetricFormatter ): + + def format( self, key, value ): + if key == "pid": + return ( "Process ID", int( value ) ) + elif key == "raw_log_path": + return ( "Relative Path of Full Collectl Log", value ) + elif key == "process_max_AccumT": + return ( "Job Runtime (System+User)", formatting.seconds_to_str( float( value ) ) ) + else: + _, stat_type, resource_type = key.split( "_", 2 ) + if resource_type.startswith( "Vm"): + value_str = "%s KB" % int( value ) + elif resource_type in [ "RSYS", "WSYS" ] and stat_type in [ "count", "max", "sum" ]: + value_str = "%d (# system calls)" % int( value ) + else: + value_str = str( value ) + resource_title = FORMATTED_RESOURCE_TITLES.get( resource_type, resource_type ) + return ( "%s (%s)" % ( resource_title, stat_type ), value_str ) + + +class CollectlPlugin( InstrumentPlugin ): + """ Run collectl along with job to capture system and/or process data + according to specified collectl subsystems. + """ + plugin_type = "collectl" + formatter = CollectlFormatter() + + def __init__( self, **kwargs ): + self.__configure_paths( kwargs ) + self.__configure_subsystems( kwargs ) + saved_logs_path = kwargs.get( "saved_logs_path", None ) + if "app" in kwargs: + saved_logs_path = kwargs[ "app" ].config.resolve_path( saved_logs_path ) + self.saved_logs_path = saved_logs_path + self.__configure_collectl_recorder_args( kwargs ) + self.summarize_process_data = util.asbool( kwargs.get( "summarize_process_data", True ) ) + self.log_collectl_program_output = util.asbool( kwargs.get( "log_collectl_program_output", False ) ) + if self.summarize_process_data: + if subsystems.get_subsystem( "process" ) not in self.subsystems: + raise Exception( "Collectl plugin misconfigured - cannot summarize_process_data without process subsystem being enabled." ) + + process_statistics = kwargs.get( "process_statistics", None ) + # None will let processes module use default set of statistics + # defined there. + self.process_statistics = processes.parse_process_statistics( process_statistics ) + + def pre_execute_instrument( self, job_directory ): + commands = [] + # Capture PID of process so we can walk its ancestors when building + # statistics for the whole job. + commands.append( '''echo "$$" > '%s' ''' % self.__pid_file( job_directory ) ) + # Run collectl in record mode to capture process and system level + # statistics according to supplied subsystems. + commands.append( self.__collectl_record_command( job_directory ) ) + return commands + + def post_execute_instrument( self, job_directory ): + commands = [] + # collectl dies when job script completes, perhaps capture pid of + # collectl above and check if it is still alive to allow tracking if + # collectl ran successfully through the whole job. 
+ return commands + + def job_properties( self, job_id, job_directory ): + pid = open( self.__pid_file( job_directory ), "r" ).read().strip() + contents = os.listdir( job_directory ) + try: + rel_path = filter( self._is_instrumented_collectl_log, contents )[ 0 ] + path = os.path.join( job_directory, rel_path ) + except IndexError: + message = "Failed to find collectl log in directory %s, files were %s" % ( job_directory, contents ) + raise Exception( message ) + + properties = dict( + pid=int( pid ), + ) + + if self.saved_logs_path: + destination_rel_dir = os.path.join( *directory_hash.directory_hash_id( job_id ) ) + destination_rel_path = os.path.join( destination_rel_dir, rel_path ) + destination_path = os.path.join( self.saved_logs_path, destination_rel_path ) + destination_dir = os.path.dirname( destination_path ) + if not os.path.isdir( destination_dir ): + os.makedirs( destination_dir ) + shutil.copyfile( path, destination_path ) + properties[ "raw_log_path" ] = destination_rel_path + + if self.summarize_process_data: + # Run collectl in playback and generate statistics of interest + summary_statistics = self.__summarize_process_data( pid, path ) + for statistic, value in summary_statistics: + properties[ "process_%s" % "_".join( statistic ) ] = value + + return properties + + def __configure_paths( self, kwargs ): + # 95% of time I would expect collectl to just be installed with apt or + # yum, but if it is manually installed on not on path, allow + # configuration of explicit path - and allow path to be different + # between galaxy job handler (local_collectl_path) and compute node + # (remote_collectl_path). + collectl_path = kwargs.get( "collectl_path", "collectl" ) + self.remote_collectl_path = kwargs.get( "remote_collectl_path", collectl_path ) + self.local_collectl_path = kwargs.get( "local_collectl_path", collectl_path ) + + def __configure_subsystems( self, kwargs ): + raw_subsystems_str = kwargs.get( "subsystems", DEFAULT_SUBSYSTEMS ) + raw_subsystems = util.listify( raw_subsystems_str, do_strip=True ) + self.subsystems = map( subsystems.get_subsystem, raw_subsystems ) + + def __configure_collectl_recorder_args( self, kwargs ): + collectl_recorder_args = kwargs.copy() + + # Allow deployer to configure separate system and process intervals, + # but if they specify just one - use it for both. Thinking here is this + # plugin's most useful feature is the process level information so + # this is likely what the deployer is attempting to configure. + if "interval" in kwargs and "interval2" not in kwargs: + collectl_recorder_args[ "interval2" ] = kwargs[ "interval"] + + if "flush" not in kwargs: + collectl_recorder_args[ "flush" ] = DEFAULT_FLUSH_INTERVAL + + procfilt_on = kwargs.get( "procfilt_on", DEFAULT_PROCFILT_ON ).lower() + # Calculate explicit arguments, rest can just be passed through from + # constructor arguments. 
+ explicit_args = dict( + collectl_path=self.remote_collectl_path, + procfilt=procfilt_argument( procfilt_on ), + subsystems=self.subsystems, + ) + collectl_recorder_args.update( explicit_args ) + self.collectl_recorder_args = collectl_recorder_args + + def __summarize_process_data( self, pid, collectl_log_path ): + playback_cli_args = dict( + collectl_path=self.local_collectl_path, + playback_path=collectl_log_path, + sep="9" + ) + if not os.stat( collectl_log_path ).st_size: + log.debug( EMPTY_COLLECTL_FILE_MESSAGE ) + return [ ] + + playback_cli = cli.CollectlCli( **playback_cli_args ) + return processes.generate_process_statistics( playback_cli, pid, self.process_statistics ) + + def __collectl_recorder_cli( self, job_directory ): + cli_args = self.collectl_recorder_args.copy() + cli_args[ "destination_path" ] = self._instrument_file_path( job_directory, "log" ) + return cli.CollectlCli( **cli_args ) + + def __collectl_record_command( self, job_directory ): + collectl_cli = self.__collectl_recorder_cli( job_directory ) + if self.log_collectl_program_output: + redirect_to = self._instrument_file_path( job_directory, "program_output" ) + else: + redirect_to = "/dev/null" + return "%s > %s 2>&1 &" % ( + collectl_cli.build_command_line(), + redirect_to, + ) + + def __pid_file( self, job_directory ): + return self._instrument_file_path( job_directory, "pid" ) + + def _is_instrumented_collectl_log( self, filename ): + prefix = self._instrument_file_name( "log" ) + return filename.startswith( prefix ) and filename.endswith( ".raw.gz" ) + + +def procfilt_argument( procfilt_on ): + if procfilt_on == "username": + return "U$USER" + elif procfilt_on == "uid": + return "u$UID" + else: + # Ensure it is empty of None + if procfilt_on or procfilt_on.lower() != "none": + raise Exception( "Invalid procfilt_on argument encountered") + return "" + + +__all__ = [ CollectlPlugin ] https://bitbucket.org/galaxy/galaxy-central/commits/b24137a3a980/ Changeset: b24137a3a980 User: jmchilton Date: 2014-04-23 04:06:03 Summary: Merge pull request #352. Affected #: 27 files diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 job_conf.xml.sample_advanced --- a/job_conf.xml.sample_advanced +++ b/job_conf.xml.sample_advanced @@ -62,6 +62,11 @@ <!-- Warning: Local slot count doesn't tie up additional worker threads, to prevent over allocating machine define a second local runner with different name and fewer workers to run this destination. --> + <job_metrics /> + <!-- Above element demonstrates embedded job metrics definition - see + job_metrics_conf.xml.sample for full documentation on possible nested + elements. This empty block will simply disable job metrics for the + corresponding destination. --></destination><destination id="pbs" runner="pbs" tags="mycluster"/><destination id="pbs_longjobs" runner="pbs" tags="mycluster,longjobs"> diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 job_metrics_conf.xml.sample --- /dev/null +++ b/job_metrics_conf.xml.sample @@ -0,0 +1,124 @@ +<?xml version="1.0"?> +<!-- If job_metrics.xml exists, this file will define the default job metric + plugin used for all jobs. Individual job_conf.xml destinations can + disable metric collection by setting metrics="off" on that destination. + The metrics attribute on destination definition elements can also be + a path - in which case that XML metrics file will be loaded and used for + that destination. 
Finally, the destination element may contain a job_metrics + child element (with all options defined below) to define job metrics in an + embedded manner directly in the job_conf.xml file. +--> +<job_metrics> + <!-- Each element in this file corresponds to a job instrumentation plugin + used to generate metrics in lib/galaxy/jobs/metrics/instrumenters. --> + + <!-- Core plugin captures Galaxy slots, start and end of job (in seconds + since epoch) and computes runtime in seconds. --> + <core /> + + <!-- Uncomment to dump processor count for each job - linux only. --> + <!-- <cpuinfo /> --> + <!-- Uncomment to dump information about all processors for for each + job - this is likely too much data. Linux only. --> + <!-- <cpuinfo verbose="true" /> --> + + <!-- Uncomment to dump system memory information for each job - linux + only. --> + <!-- <meminfo /> --> + + <!-- Uncomment to record operating system each job is executed on - linux + only. --> + <!-- <uname /> --> + + <!-- Uncomment following to enable plugin dumping complete environment + for each job, potentially useful for debuging --> + <!-- <env /> --> + <!-- env plugin can also record more targetted, obviously useful variables + as well. --> + <!-- <env variables="HOSTNAME,SLURM_CPUS_ON_NODE,SLURM_JOBID" /> --> + + <!-- <collectl /> --> + <!-- Collectl (http://collectl.sourceforge.net/) is a powerful monitoring + utility capable of gathering numerous system and process level + statistics of running applications. The Galaxy collectl job metrics + plugin by default will grab a variety of process level metrics + aggregated across all processes corresponding to a job, this behavior + is highly customiziable - both using the attributes documented below + or simply hacking up the code in lib/galaxy/jobs/metrics. + + Warning: In order to use this plugin collectl must be available on the + compute server the job runs on and on the local Galaxy server as well + (unless in this latter case summarize_process_data is set to False). + + Attributes (the follow describes attributes that can be used with + the collectl job metrics element above to modify its behavior). + + 'summarize_process_data': Boolean indicating whether to run collectl + in playback mode after jobs complete and gather process level + statistics for the job run. These statistics can be customized + with the 'process_statistics' attribute. (defaults to True) + + 'saved_logs_path': If set (it is off by default), all collectl logs + will be saved to the specified path after jobs complete. These + logs can later be replayed using collectl offline to generate + full time-series data corresponding to a job run. + + 'subsystems': Comma separated list of collectl subystems to collect + data for. Plugin doesn't currently expose all of them or offer + summary data for any of them except 'process' but extensions + would be welcome. May seem pointless to include subsystems + beside process since they won't be processed online by Galaxy - + but if 'saved_logs_path' these files can be played back at anytime. + + Available subsystems - 'process', 'cpu', 'memory', 'network', + 'disk', 'network'. (Default 'process'). + + Warning: If you override this - be sure to include 'process' + unless 'summarize_process_data' is set to false. + + 'process_statistics': If 'summarize_process_data' this attribute can be + specified as a comma separated list to override the statistics + that are gathered. 
Each statistics is of the for X_Y where X + if one of 'min', 'max', 'count', 'avg', or 'sum' and Y is a + value from 'S', 'VmSize', 'VmLck', 'VmRSS', 'VmData', 'VmStk', + 'VmExe', 'VmLib', 'CPU', 'SysT', 'UsrT', 'PCT', 'AccumT' 'WKB', + 'RKBC', 'WKBC', 'RSYS', 'WSYS', 'CNCL', 'MajF', 'MinF'. Consult + lib/galaxy/jobs/metrics/collectl/processes.py for more details + on what each of these resource types means. + + Defaults to 'max_VmSize,avg_VmSize,max_VmRSS,avg_VmRSS,sum_SysT,sum_UsrT,max_PCT avg_PCT,max_AccumT,sum_RSYS,sum_WSYS' + as variety of statistics roughly describing CPU and memory + usage of the program and VERY ROUGHLY describing I/O consumption. + + 'procfilt_on': By default Galaxy will tell collectl to only collect + 'process' level data for the current user (as identified) + by 'username' (default) - this can be disabled by settting this + to 'none' - the plugin will still only aggregate process level + statistics for the jobs process tree - but the additional + information can still be used offline with 'saved_logs_path' + if set. Obsecurely, this can also be set 'uid' to identify + the current user to filter on by UID instead of username - + this may needed on some clusters(?). + + 'interval': The time (in seconds) between data collection points. + Collectl uses a variety of different defaults for different + subsystems if this is not set, but process information (likely + the most pertinent for Galaxy jobs will collect data every + 60 seconds). + + 'flush': Interval (in seconds I think) between when collectl will + flush its buffer to disk. Galaxy overrides this to disable + flushing by default if not set. + + 'local_collectl_path', 'remote_collectl_path', 'collectl_path': + By default, jobs will just assume collectl is on the PATH, but + it can be overridden with 'local_collectl_path' and + 'remote_collectl_path' (or simply 'collectl_path' if it is not + on the path but installed in the same location both locally and + remotely). + + There are more and more increasingly obsecure options including - + log_collectl_program_output, interval2, and interval3. Consult + source code for more details. + --> +</job_metrics> diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 lib/galaxy/app.py --- a/lib/galaxy/app.py +++ b/lib/galaxy/app.py @@ -15,7 +15,7 @@ from galaxy.sample_tracking import external_service_types from galaxy.openid.providers import OpenIDProviders from galaxy.tools.data_manager.manager import DataManagers - +from galaxy.jobs import metrics as job_metrics from galaxy.web.base import pluginframework import logging @@ -60,6 +60,10 @@ self._configure_tool_data_tables( from_shed_config=False ) + # Initialize job metrics manager, needs to be in place before + # config so per-destination modifications can be made. 
+ self.job_metrics = job_metrics.JobMetrics( self.config.job_metrics_config_file, app=self ) + # Initialize the job management configuration self.job_config = jobs.JobConfiguration(self) diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 lib/galaxy/config.py --- a/lib/galaxy/config.py +++ b/lib/galaxy/config.py @@ -137,6 +137,7 @@ self.template_path = resolve_path( kwargs.get( "template_path", "templates" ), self.root ) self.template_cache = resolve_path( kwargs.get( "template_cache_path", "database/compiled_templates" ), self.root ) self.dependency_resolvers_config_file = resolve_path( kwargs.get( 'dependency_resolvers_config_file', 'dependency_resolvers_conf.xml' ), self.root ) + self.job_metrics_config_file = resolve_path( kwargs.get( 'job_metrics_config_file', 'job_metrics_conf.xml' ), self.root ) self.job_config_file = resolve_path( kwargs.get( 'job_config_file', 'job_conf.xml' ), self.root ) self.local_job_queue_workers = int( kwargs.get( "local_job_queue_workers", "5" ) ) self.cluster_job_queue_workers = int( kwargs.get( "cluster_job_queue_workers", "3" ) ) @@ -455,6 +456,11 @@ admin_users = [ x.strip() for x in self.get( "admin_users", "" ).split( "," ) ] return ( user is not None and user.email in admin_users ) + def resolve_path( self, path ): + """ Resolve a path relative to Galaxy's root. + """ + return resolve_path( path, self.root ) + def get_database_engine_options( kwargs, model_prefix='' ): """ Allow options for the SQLAlchemy database engine to be passed by using diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -162,8 +162,21 @@ # Parse destinations destinations = root.find('destinations') + job_metrics = self.app.job_metrics for destination in self.__findall_with_required(destinations, 'destination', ('id', 'runner')): id = destination.get('id') + destination_metrics = destination.get( "metrics", None ) + if destination_metrics: + if not util.asbool( destination_metrics ): + # disable + job_metrics.set_destination_instrumenter( id, None ) + else: + metrics_conf_path = self.app.config.resolve_path( destination_metrics ) + job_metrics.set_destination_conf_file( id, metrics_conf_path ) + else: + metrics_elements = self.__findall_with_required( destination, 'job_metrics', () ) + if metrics_elements: + job_metrics.set_destination_conf_element( id, metrics_elements[ 0 ] ) job_destination = JobDestination(**dict(destination.items())) job_destination['params'] = self.__get_params(destination) self.destinations[id] = (job_destination,) @@ -1068,8 +1081,10 @@ # Finally set the job state. This should only happen *after* all # dataset creation, and will allow us to eliminate force_history_refresh. 
job.state = final_job_state + if not job.tasks: + # If job was composed of tasks, don't attempt to recollect statisitcs + self._collect_metrics( job ) self.sa_session.flush() - log.debug( 'job %d ended' % self.job_id ) delete_files = self.app.config.cleanup_job == 'always' or ( job.state == job.states.OK and self.app.config.cleanup_job == 'onsuccess' ) self.cleanup( delete_files=delete_files ) @@ -1094,6 +1109,16 @@ except: log.exception( "Unable to cleanup job %d" % self.job_id ) + def _collect_metrics( self, has_metrics ): + job = has_metrics.get_job() + per_plugin_properties = self.app.job_metrics.collect_properties( job.destination_id, self.job_id, self.working_directory ) + if per_plugin_properties: + log.info( "Collecting job metrics for %s" % has_metrics ) + for plugin, properties in per_plugin_properties.iteritems(): + for metric_name, metric_value in properties.iteritems(): + if metric_value is not None: + has_metrics.add_metric( plugin, metric_name, metric_value ) + def get_output_sizes( self ): sizes = [] output_paths = self.get_output_fnames() @@ -1508,6 +1533,7 @@ task.stdout = util.shrink_string_by_size( stdout, DATABASE_MAX_STRING_SIZE, join_by="\n..\n", left_larger=True, beginning_on_size_error=True ) if len( stderr ) > DATABASE_MAX_STRING_SIZE: log.error( "stderr for task %d is greater than %s, only a portion will be logged to database" % ( task.id, DATABASE_MAX_STRING_SIZE_PRETTY ) ) + self._collect_metrics( task ) task.stderr = util.shrink_string_by_size( stderr, DATABASE_MAX_STRING_SIZE, join_by="\n..\n", left_larger=True, beginning_on_size_error=True ) task.exit_code = tool_exit_code task.command_line = self.command_line diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 lib/galaxy/jobs/metrics/__init__.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/__init__.py @@ -0,0 +1,134 @@ +import collections +import os + +from xml.etree import ElementTree + +from galaxy.util.submodules import submodules +from galaxy import util + +from ..metrics import formatting + +import logging +log = logging.getLogger( __name__ ) + + +DEFAULT_FORMATTER = formatting.JobMetricFormatter() + + +class JobMetrics( object ): + + def __init__( self, conf_file=None, **kwargs ): + """ + """ + self.plugin_classes = self.__plugins_dict() + self.default_job_instrumenter = JobInstrumenter.from_file( self.plugin_classes, conf_file, **kwargs ) + self.job_instrumenters = collections.defaultdict( lambda: self.default_job_instrumenter ) + + def format( self, plugin, key, value ): + if plugin in self.plugin_classes: + plugin_class = self.plugin_classes[ plugin ] + formatter = plugin_class.formatter + else: + formatter = DEFAULT_FORMATTER + return formatter.format( key, value ) + + def set_destination_conf_file( self, destination_id, conf_file ): + instrumenter = JobInstrumenter.from_file( self.plugin_classes, conf_file ) + self.set_destination_instrumenter( destination_id, instrumenter ) + + def set_destination_conf_element( self, destination_id, element ): + instrumenter = JobInstrumenter( self.plugin_classes, element ) + self.set_destination_instrumenter( destination_id, instrumenter ) + + def set_destination_instrumenter( self, destination_id, job_instrumenter=None ): + if job_instrumenter is None: + job_instrumenter = NULL_JOB_INSTRUMENTER + self.job_instrumenters[ destination_id ] = job_instrumenter + + def collect_properties( self, destination_id, job_id, job_directory ): + return self.job_instrumenters[ destination_id ].collect_properties( job_id, job_directory ) 
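To make the wiring above concrete, the JobMetrics manager is built from a metrics config file and queried per destination after a job finishes, roughly as below (an illustrative sketch only; the destination id, job id, and directory are hypothetical):

    from galaxy.jobs import metrics as job_metrics

    jm = job_metrics.JobMetrics("job_metrics_conf.xml")
    # Disable instrumentation entirely for one destination.
    jm.set_destination_instrumenter("pbs_longjobs", None)
    # After a job completes, gather per-plugin properties from its working directory.
    properties = jm.collect_properties("local", 42, "/path/to/job_working_directory/000/42")
    # properties maps plugin_type -> {metric_name: value},
    # e.g. {"core": {"galaxy_slots": 4, "runtime_seconds": 120}}
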
+ + def __plugins_dict( self ): + plugin_dict = {} + for plugin_module in self.__plugin_modules(): + for clazz in plugin_module.__all__: + plugin_type = getattr( clazz, 'plugin_type', None ) + if plugin_type: + plugin_dict[ plugin_type ] = clazz + return plugin_dict + + def __plugin_modules( self ): + import galaxy.jobs.metrics.instrumenters + return submodules( galaxy.jobs.metrics.instrumenters ) + + +class NullJobInstrumenter( object ): + + def pre_execute_commands( self, job_directory ): + return None + + def post_execute_commands( self, job_directory ): + return None + + def collect_properties( self, job_id, job_directory ): + return {} + +NULL_JOB_INSTRUMENTER = NullJobInstrumenter() + + +class JobInstrumenter( object ): + + def __init__( self, plugin_classes, metrics_element, **kwargs ): + self.extra_kwargs = kwargs + self.plugin_classes = plugin_classes + self.plugins = self.__plugins_for_element( metrics_element ) + + def pre_execute_commands( self, job_directory ): + commands = [] + for plugin in self.plugins: + try: + plugin_commands = plugin.pre_execute_instrument( job_directory ) + if plugin_commands: + commands.extend( util.listify( plugin_commands ) ) + except Exception: + log.exception( "Failed to generate pre-execute commands for plugin %s" % plugin ) + return "\n".join( [ c for c in commands if c ] ) + + def post_execute_commands( self, job_directory ): + commands = [] + for plugin in self.plugins: + try: + plugin_commands = plugin.post_execute_instrument( job_directory ) + if plugin_commands: + commands.extend( util.listify( plugin_commands ) ) + except Exception: + log.exception( "Failed to generate post-execute commands for plugin %s" % plugin ) + return "\n".join( [ c for c in commands if c ] ) + + def collect_properties( self, job_id, job_directory ): + per_plugin_properites = {} + for plugin in self.plugins: + try: + properties = plugin.job_properties( job_id, job_directory ) + if properties: + per_plugin_properites[ plugin.plugin_type ] = properties + except Exception: + log.exception( "Failed to collect job properties for plugin %s" % plugin ) + return per_plugin_properites + + def __plugins_for_element( self, plugins_element ): + plugins = [] + for plugin_element in plugins_element.getchildren(): + plugin_type = plugin_element.tag + plugin_kwds = dict( plugin_element.items() ) + plugin_kwds.update( self.extra_kwargs ) + plugin = self.plugin_classes[ plugin_type ]( **plugin_kwds ) + plugins.append( plugin ) + return plugins + + @staticmethod + def from_file( plugin_classes, conf_file, **kwargs ): + if not conf_file or not os.path.exists( conf_file ): + return NULL_JOB_INSTRUMENTER + plugins_element = ElementTree.parse( conf_file ).getroot() + return JobInstrumenter( plugin_classes, plugins_element, **kwargs ) diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 lib/galaxy/jobs/metrics/collectl/__init__.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/collectl/__init__.py @@ -0,0 +1,5 @@ +""" This module contains helper functions and data structures for interacting +with collectl and collectl generated data. More information on collectl can be +found at: http://collectl.sourceforge.net/. 
+ +""" diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 lib/galaxy/jobs/metrics/collectl/cli.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/collectl/cli.py @@ -0,0 +1,128 @@ +from string import Template +import subprocess + +import logging +log = logging.getLogger( __name__ ) + +COMMAND_LINE_TEMPLATE = Template( + "$collectl_path $destination_arg $mode_arg $subsystems_arg $interval_arg $procfilt_arg $flush_arg $sep_arg" +) +MODE_RECORD = "record" +MODE_PLAYBACK = "playback" + + +class CollectlCli( object ): + """ Abstraction over (some of) the command-line arguments of collectl. + Ideally this will be useful for building up command line arguments for + remote execution as well as runnning directly on local host. + + This is meant to be a fairly generic utility - for interfacing with + collectl CLI - logic more directly related to the Galaxy job metric plugin + plugin should be placed in other modules. + + Keyword Arguments: + collectl_path: Path to collectl executable (defaults to collectl - i.e. + search the PATH). + + playback_path (defaults to None): If this is None collectl will run in + record mode, else it will playback specified file. + + Playback Mode Options: + + sep : Separator used in playback mode (set to 9 to produce tsv) + (defaults to None). + + Record Mode Options (some of these may work in playback mode also): + + destination_path: Location of path files to write to (defaults to None + and collectl will just use cwd). Really this is just to prefix - + collectl will append hostname and datetime to file. + interval: Setup polling interval (secs) for most subsystems (defaults + to None and when unspecified collectl will use default of 1 second). + interval2: Setup polling interval (secs) for process information + (defaults to None and when unspecified collectl will use default to + 60 seconds). + interval3: Setup polling interval (secs) for environment information + (defaults to None and when unspecified collectl will use default to + 300 seconds). + procfilt: Optional argument to procfilt. (defaults to None). + flush : Optional flush interval (defaults to None). 
+ """ + + def __init__( self, **kwargs ): + command_args = {} + command_args[ "collectl_path" ] = kwargs.get( "collectl_path", "collectl" ) + playback_path = kwargs.get( "playback_path", None ) + self.mode = MODE_RECORD if not playback_path else MODE_PLAYBACK + if self.mode == MODE_RECORD: + mode_arg = "" + elif self.mode == MODE_PLAYBACK: + mode_arg = "-P -p '%s'" % playback_path + else: + raise Exception( "Invalid mode supplied to CollectlCli - %s" % self.mode ) + command_args[ "mode_arg" ] = mode_arg + command_args[ "interval_arg" ] = self.__interval_arg( kwargs ) + destination = kwargs.get( "destination_path", None ) + if destination: + destination_arg = "-f '%s'" % destination + else: + destination_arg = "" + command_args[ "destination_arg" ] = destination_arg + procfilt = kwargs.get( "procfilt", None ) + command_args[ "procfilt_arg" ] = "" if not procfilt else "--procfilt %s" % procfilt + command_args[ "subsystems_arg" ] = self.__subsystems_arg( kwargs.get( "subsystems", [] ) ) + flush = kwargs.get( "flush", None ) + command_args[ "flush_arg"] = "--flush %s" % flush if flush else "" + sep = kwargs.get( "sep", None ) + command_args[ "sep_arg" ] = "--sep=%s" % sep if sep else "" + + self.command_args = command_args + + def __subsystems_arg( self, subsystems ): + if subsystems: + return "-s%s" % "".join( [ s.command_line_arg for s in subsystems ] ) + else: + return "" + + def __interval_arg( self, kwargs ): + if self.mode != MODE_RECORD: + return "" + + interval = kwargs.get( "interval", None ) + if not interval: + return "" + + self.__validate_interval_arg( interval ) + interval_arg = "-i %s" % interval + interval2 = kwargs.get( "interval2", None ) + if not interval2: + return interval_arg + self.__validate_interval_arg( interval2, multiple_of=int( interval ) ) + interval_arg = "%s:%s" % ( interval_arg, interval2 ) + + interval3 = kwargs.get( "interval3", None ) + if not interval3: + return interval_arg + self.__validate_interval_arg( interval3, multiple_of=int( interval ) ) + interval_arg = "%s:%s" % ( interval_arg, interval3 ) + return interval_arg + + def __validate_interval_arg( self, value, multiple_of=None ): + if value and not str(value).isdigit(): + raise Exception( "Invalid interval argument supplied, must be integer %s" % value ) + if multiple_of: + if int( value ) % multiple_of != 0: + raise Exception( "Invalid interval argument supplied, must multiple of %s" % multiple_of ) + + def build_command_line( self ): + return COMMAND_LINE_TEMPLATE.substitute( **self.command_args ) + + def run( self, stdout=subprocess.PIPE, stderr=subprocess.PIPE ): + command_line = self.build_command_line() + log.info( "Executing %s" % command_line ) + proc = subprocess.Popen( command_line, shell=True, stdout=stdout, stderr=stderr ) + return_code = proc.wait() + if return_code: + raise Exception( "Problem running collectl command." ) + +__all__ = [ CollectlCli ] diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 lib/galaxy/jobs/metrics/collectl/processes.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/collectl/processes.py @@ -0,0 +1,252 @@ +""" Modules will run collectl in playback mode and collect various process +statistics for a given pid's process and process ancestors. +""" +import collections +import csv +import tempfile + +from galaxy import util + +from ..collectl import stats + +import logging +log = logging.getLogger( __name__ ) + +# Collectl process information cheat sheet: +# +# Record process information for current user. 
+# % collectl -sZ -f./__instrument_collectl -i 10:10 --procfilt U$USER +# +# TSV Replay of processing information in plottable mode... +# +# % collectl -sZ -P --sep=9 -p __instrument_collectl-jlaptop13-20140322-120919.raw.gz +# +# Has following columns: +# Date Time PID User PR PPID THRD S VmSize VmLck VmRSS VmData VmStk VmExe VmLib CPU SysT UsrT PCT AccumT RKB WKB RKBC WKBC RSYS WSYS CNCL MajF MinF Command +# + +# Process data dumped one row per process per interval. +#http://collectl.sourceforge.net/Data-detail.html +PROCESS_COLUMNS = [ + "#Date", # Date of interval - e.g. 20140322 + "Time", # Time of interval - 12:18:58 + "PID", # Process pid. + "User", # Process user. + "PR", # Priority of process. + "PPID", # Parent PID of process. + "THRD", # Thread??? + "S", # Process state - S - Sleeping, D - Uninterruptable Sleep, R - Running, Z - Zombie or T - Stopped/Traced + ## Memory options - http://ewx.livejournal.com/579283.html + "VmSize", + "VmLck", + "VmRSS", + "VmData", + "VmStk", + "VmExe", + "VmLib", + "CPU", # CPU number of process + "SysT", # Amount of system time consumed during interval + "UsrT", # Amount user time consumed during interval + "PCT", # Percentage of current interval consumed by task + "AccumT", # Total accumulated System and User time since the process began execution + # kilobytes read/written - requires I/O level monitoring to be enabled in kernel. + "RKB", # kilobytes read by process - requires I/O monitoring in kernel + "WKB", + "RKBC", + "WKBC", + "RSYS", # Number of read system calls + "WSYS", # Number of write system calls + "CNCL", + "MajF", # Number of major page faults + "MinF", # Number of minor page faults + "Command", # Command executed +] + +# Types of statistics this module can summarize +STATISTIC_TYPES = [ "max", "min", "sum", "count", "avg" ] + +COLUMN_INDICES = dict( [ ( col, i ) for i, col in enumerate( PROCESS_COLUMNS ) ] ) +PID_INDEX = COLUMN_INDICES[ "PID" ] +PARENT_PID_INDEX = COLUMN_INDICES[ "PPID" ] + +DEFAULT_STATISTICS = [ + ("max", "VmSize"), + ("avg", "VmSize"), + ("max", "VmRSS"), + ("avg", "VmRSS"), + ("sum", "SysT"), + ("sum", "UsrT"), + ("max", "PCT"), + ("avg", "PCT"), + ("max", "AccumT"), + ("sum", "RSYS"), + ("sum", "WSYS"), +] + + +def parse_process_statistics( statistics ): + """ Turn string or list of strings into list of tuples in format ( stat, + resource ) where stat is a value from STATISTIC_TYPES and resource is a + value from PROCESS_COLUMNS. + """ + if statistics is None: + statistics = DEFAULT_STATISTICS + + statistics = util.listify( statistics ) + statistics = map( __tuplize_statistic, statistics ) + # Check for validity... + for statistic in statistics: + if statistic[ 0 ] not in STATISTIC_TYPES: + raise Exception( "Unknown statistic type encountered %s" % statistic[ 0 ] ) + if statistic[ 1 ] not in PROCESS_COLUMNS: + raise Exception( "Unknown process column encountered %s" % statistic[ 1 ] ) + return statistics + + +def generate_process_statistics( collectl_playback_cli, pid, statistics=DEFAULT_STATISTICS ): + """ Playback collectl file and generate summary statistics. 
+ """ + with tempfile.NamedTemporaryFile( ) as tmp_tsv: + collectl_playback_cli.run( stdout=tmp_tsv ) + with open( tmp_tsv.name, "r" ) as tsv_file: + return __read_process_statistics( tsv_file, pid, statistics ) + + +def __read_process_statistics( tsv_file, pid, statistics ): + process_summarizer = CollectlProcessSummarizer( pid, statistics ) + current_interval = None + + for row in csv.reader( tsv_file, dialect="excel-tab" ): + if current_interval is None: + for header, expected_header in zip( row, PROCESS_COLUMNS ): + if header.lower() != expected_header.lower(): + raise Exception( "Unknown header value encountered while processing collectl playback - %s" % header ) + + # First row, check contains correct header. + current_interval = CollectlProcessInterval() + continue + + if current_interval.row_is_in( row ): + current_interval.add_row( row ) + else: + process_summarizer.handle_interval( current_interval ) + current_interval = CollectlProcessInterval() + + # Do we have unsummarized rows... + if current_interval and current_interval.rows: + process_summarizer.handle_interval( current_interval ) + + return process_summarizer.get_statistics() + + +class CollectlProcessSummarizer( object ): + + def __init__( self, pid, statistics ): + self.pid = pid + self.statistics = statistics + self.columns_of_interest = set( [ s[ 1 ] for s in statistics ] ) + self.tree_statistics = collections.defaultdict( stats.StatisticsTracker ) + self.process_accum_statistics = collections.defaultdict( stats.StatisticsTracker ) + self.interval_count = 0 + + def handle_interval( self, interval ): + self.interval_count += 1 + rows = self.__rows_for_process( interval.rows, self.pid ) + for column_name in self.columns_of_interest: + column_index = COLUMN_INDICES[ column_name ] + + if column_name == "AccumT": + # Should not sum this across pids each interval, sum max at end... + for r in rows: + pid_seconds = self.__time_to_seconds( r[ column_index ] ) + self.process_accum_statistics[ r[ PID_INDEX ] ].track( pid_seconds ) + else: + # All other stastics should be summed across whole process tree + # at each interval I guess. 
+ if column_name in [ "SysT", "UsrT", "PCT" ]: + to_num = float + else: + to_num = long + + interval_stat = sum( to_num( r[ column_index ] ) for r in rows ) + self.tree_statistics[ column_name ].track( interval_stat ) + + def get_statistics( self ): + if self.interval_count == 0: + return [] + + computed_statistics = [] + for statistic in self.statistics: + statistic_type, column = statistic + if column == "AccumT": + # Only thing that makes sense is sum + if statistic_type != "max": + log.warn( "Only statistic max makes sense for AccumT" ) + continue + + value = sum( [ v.max for v in self.process_accum_statistics.itervalues() ] ) + else: + statistics_tracker = self.tree_statistics[ column ] + value = getattr( statistics_tracker, statistic_type ) + + computed_statistic = ( statistic, value ) + computed_statistics.append( computed_statistic ) + + return computed_statistics + + def __rows_for_process( self, rows, pid ): + process_rows = [] + pids = self.__all_child_pids( rows, pid ) + for row in rows: + if row[ PID_INDEX ] in pids: + process_rows.append( row ) + return process_rows + + def __all_child_pids( self, rows, pid ): + pids_in_process_tree = set( [ str( self.pid ) ] ) + added = True + while added: + added = False + for row in rows: + pid = row[ PID_INDEX ] + parent_pid = row[ PARENT_PID_INDEX ] + if parent_pid in pids_in_process_tree and pid not in pids_in_process_tree: + pids_in_process_tree.add( pid ) + added = True + return pids_in_process_tree + + def __time_to_seconds( self, minutes_str ): + parts = minutes_str.split( ":" ) + seconds = 0.0 + for i, val in enumerate( parts ): + seconds += float(val) * ( 60 ** ( len( parts ) - ( i + 1 ) ) ) + return seconds + + +class CollectlProcessInterval( object ): + """ Represent all rows in collectl playback file for given time slice with + ability to filter out just rows corresponding to the process tree + corresponding to a given pid. + """ + + def __init__( self ): + self.rows = [] + + def row_is_in( self, row ): + if not self.rows: # No rows, this row defines interval. + return True + first_row = self.rows[ 0 ] + return first_row[ 0 ] == row[ 0 ] and first_row[ 1 ] == row[ 1 ] + + def add_row( self, row ): + self.rows.append( row ) + + +def __tuplize_statistic( statistic ): + if not isinstance( statistic, tuple ): + statistic_split = statistic.split( "_", 1 ) + statistic = ( statistic_split[ 0 ].lower(), statistic_split[ 1 ] ) + return statistic + + +__all__ = [ generate_process_statistics ] diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 lib/galaxy/jobs/metrics/collectl/stats.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/collectl/stats.py @@ -0,0 +1,27 @@ +""" Primitive module for tracking running statistics without storing values in +memory. 
+""" + + +class StatisticsTracker( object ): + + def __init__( self ): + self.min = None + self.max = None + self.count = 0 + self.sum = 0 + + def track( self, value ): + if self.min is None or value < self.min: + self.min = value + if self.max is None or value > self.max: + self.max = value + self.count += 1 + self.sum += value + + @property + def avg( self ): + if self.count > 0: + return self.sum / self.count + else: + return None diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 lib/galaxy/jobs/metrics/collectl/subsystems.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/collectl/subsystems.py @@ -0,0 +1,72 @@ +from abc import ABCMeta +from abc import abstractmethod + + +class CollectlSubsystem( object ): + """ Class providing an abstraction of collectl subsytems. + """ + __metaclass__ = ABCMeta + + @property + @abstractmethod + def command_line_arg( self ): + """ Return single letter command-line argument used by collectl CLI. + """ + + @property + @abstractmethod + def name( self, job_directory ): + """ High-level name for subsystem as consumed by this module. + """ + + +class ProcessesSubsystem( CollectlSubsystem ): + command_line_arg = "Z" + name = "process" + + +class CpuSubsystem( CollectlSubsystem ): + command_line_arg = "C" + name = "cpu" + + +class DiskSubsystem( CollectlSubsystem ): + command_line_arg = "D" + name = "disk" + + +class NetworkSubsystem( CollectlSubsystem ): + command_line_arg = "N" + name = "network" + + +class EnvironmentSubsystem( CollectlSubsystem ): + command_line_arg = "E" + name = "environment" + + +class MemorySubsystem( CollectlSubsystem ): + command_line_arg = "M" + name = "memory" + + +SUBSYSTEMS = [ + ProcessesSubsystem(), + CpuSubsystem(), + DiskSubsystem(), + NetworkSubsystem(), + EnvironmentSubsystem(), + MemorySubsystem(), +] +SUBSYSTEM_DICT = dict( [ (s.name, s) for s in SUBSYSTEMS ] ) + + +def get_subsystem( name ): + """ + + >>> get_subsystem( "process" ).command_line_arg == "Z" + True + """ + return SUBSYSTEM_DICT[ name ] + +__all__ = [ get_subsystem ] diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 lib/galaxy/jobs/metrics/formatting.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/formatting.py @@ -0,0 +1,18 @@ + + +class JobMetricFormatter( object ): + """ Format job metric key-value pairs for human consumption in Web UI. """ + + def format( self, key, value ): + return ( str( key ), str( value ) ) + + +## Formatting utilities + +def seconds_to_str( value ): + if value < 60: + return "%s seconds" % value + elif value < 3600: + return "%s minutes" % ( value / 60 ) + else: + return "%s days and %s minutes" % ( value / 3600, ( value % 3600 ) / 60 ) diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 lib/galaxy/jobs/metrics/instrumenters/__init__.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/instrumenters/__init__.py @@ -0,0 +1,53 @@ +from abc import ABCMeta +from abc import abstractmethod + +import os.path + +from ...metrics import formatting + + +INSTRUMENT_FILE_PREFIX = "__instrument" + + +class InstrumentPlugin( object ): + """ A plugin describing how to instrument Galaxy jobs and retrieve metrics + from this instrumentation. 
+ """ + __metaclass__ = ABCMeta + formatter = formatting.JobMetricFormatter() + + @property + @abstractmethod + def plugin_type( self ): + """ Short string providing labelling this plugin """ + + def pre_execute_instrument( self, job_directory ): + """ Optionally return one or more commands to instrument job. These + commands will be executed on the compute server prior to the job + running. + """ + return None + + def post_execute_instrument( self, job_directory ): + """ Optionally return one or more commands to instrument job. These + commands will be executed on the compute server after the tool defined + command is ran. + """ + return None + + @abstractmethod + def job_properties( self, job_id, job_directory ): + """ Collect properties for this plugin from specified job directory. + This method will run on the Galaxy server and can assume files created + in job_directory with pre_execute_instrument and + post_execute_instrument are available. + """ + + def _instrument_file_name( self, name ): + """ Provide a common pattern for naming files used by instrumentation + plugins - to ease their staging out of remote job directories. + """ + return "%s_%s_%s" % ( INSTRUMENT_FILE_PREFIX, self.plugin_type, name ) + + def _instrument_file_path( self, job_directory, name ): + return os.path.join( job_directory, self._instrument_file_name( name ) ) diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 lib/galaxy/jobs/metrics/instrumenters/collectl.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/instrumenters/collectl.py @@ -0,0 +1,214 @@ +import os +import shutil + +from ..instrumenters import InstrumentPlugin +from ...metrics import formatting +from ..collectl import subsystems +from ..collectl import cli +from ..collectl import processes + +from galaxy import util +from galaxy.util import directory_hash + +import logging +log = logging.getLogger( __name__ ) + +DEFAULT_PROCFILT_ON = "username" # By default, only grab statistics for user + # processes (as identifiers by username). +DEFAULT_SUBSYSTEMS = "process" +DEFAULT_FLUSH_INTERVAL = "0" # Set to zero to flush every collection. + +FORMATTED_RESOURCE_TITLES = { + "PCT": "Percent CPU Usage", + "RSYS": "Disk Reads", + "WSYS": "Disk Writes", +} + +EMPTY_COLLECTL_FILE_MESSAGE = "Skipping process summary due to empty file... job probably did not run long enough for collectl to gather data." + + +class CollectlFormatter( formatting.JobMetricFormatter ): + + def format( self, key, value ): + if key == "pid": + return ( "Process ID", int( value ) ) + elif key == "raw_log_path": + return ( "Relative Path of Full Collectl Log", value ) + elif key == "process_max_AccumT": + return ( "Job Runtime (System+User)", formatting.seconds_to_str( float( value ) ) ) + else: + _, stat_type, resource_type = key.split( "_", 2 ) + if resource_type.startswith( "Vm"): + value_str = "%s KB" % int( value ) + elif resource_type in [ "RSYS", "WSYS" ] and stat_type in [ "count", "max", "sum" ]: + value_str = "%d (# system calls)" % int( value ) + else: + value_str = str( value ) + resource_title = FORMATTED_RESOURCE_TITLES.get( resource_type, resource_type ) + return ( "%s (%s)" % ( resource_title, stat_type ), value_str ) + + +class CollectlPlugin( InstrumentPlugin ): + """ Run collectl along with job to capture system and/or process data + according to specified collectl subsystems. 
+ """ + plugin_type = "collectl" + formatter = CollectlFormatter() + + def __init__( self, **kwargs ): + self.__configure_paths( kwargs ) + self.__configure_subsystems( kwargs ) + saved_logs_path = kwargs.get( "saved_logs_path", None ) + if "app" in kwargs: + saved_logs_path = kwargs[ "app" ].config.resolve_path( saved_logs_path ) + self.saved_logs_path = saved_logs_path + self.__configure_collectl_recorder_args( kwargs ) + self.summarize_process_data = util.asbool( kwargs.get( "summarize_process_data", True ) ) + self.log_collectl_program_output = util.asbool( kwargs.get( "log_collectl_program_output", False ) ) + if self.summarize_process_data: + if subsystems.get_subsystem( "process" ) not in self.subsystems: + raise Exception( "Collectl plugin misconfigured - cannot summarize_process_data without process subsystem being enabled." ) + + process_statistics = kwargs.get( "process_statistics", None ) + # None will let processes module use default set of statistics + # defined there. + self.process_statistics = processes.parse_process_statistics( process_statistics ) + + def pre_execute_instrument( self, job_directory ): + commands = [] + # Capture PID of process so we can walk its ancestors when building + # statistics for the whole job. + commands.append( '''echo "$$" > '%s' ''' % self.__pid_file( job_directory ) ) + # Run collectl in record mode to capture process and system level + # statistics according to supplied subsystems. + commands.append( self.__collectl_record_command( job_directory ) ) + return commands + + def post_execute_instrument( self, job_directory ): + commands = [] + # collectl dies when job script completes, perhaps capture pid of + # collectl above and check if it is still alive to allow tracking if + # collectl ran successfully through the whole job. + return commands + + def job_properties( self, job_id, job_directory ): + pid = open( self.__pid_file( job_directory ), "r" ).read().strip() + contents = os.listdir( job_directory ) + try: + rel_path = filter( self._is_instrumented_collectl_log, contents )[ 0 ] + path = os.path.join( job_directory, rel_path ) + except IndexError: + message = "Failed to find collectl log in directory %s, files were %s" % ( job_directory, contents ) + raise Exception( message ) + + properties = dict( + pid=int( pid ), + ) + + if self.saved_logs_path: + destination_rel_dir = os.path.join( *directory_hash.directory_hash_id( job_id ) ) + destination_rel_path = os.path.join( destination_rel_dir, rel_path ) + destination_path = os.path.join( self.saved_logs_path, destination_rel_path ) + destination_dir = os.path.dirname( destination_path ) + if not os.path.isdir( destination_dir ): + os.makedirs( destination_dir ) + shutil.copyfile( path, destination_path ) + properties[ "raw_log_path" ] = destination_rel_path + + if self.summarize_process_data: + # Run collectl in playback and generate statistics of interest + summary_statistics = self.__summarize_process_data( pid, path ) + for statistic, value in summary_statistics: + properties[ "process_%s" % "_".join( statistic ) ] = value + + return properties + + def __configure_paths( self, kwargs ): + # 95% of time I would expect collectl to just be installed with apt or + # yum, but if it is manually installed on not on path, allow + # configuration of explicit path - and allow path to be different + # between galaxy job handler (local_collectl_path) and compute node + # (remote_collectl_path). 
+ collectl_path = kwargs.get( "collectl_path", "collectl" ) + self.remote_collectl_path = kwargs.get( "remote_collectl_path", collectl_path ) + self.local_collectl_path = kwargs.get( "local_collectl_path", collectl_path ) + + def __configure_subsystems( self, kwargs ): + raw_subsystems_str = kwargs.get( "subsystems", DEFAULT_SUBSYSTEMS ) + raw_subsystems = util.listify( raw_subsystems_str, do_strip=True ) + self.subsystems = map( subsystems.get_subsystem, raw_subsystems ) + + def __configure_collectl_recorder_args( self, kwargs ): + collectl_recorder_args = kwargs.copy() + + # Allow deployer to configure separate system and process intervals, + # but if they specify just one - use it for both. Thinking here is this + # plugin's most useful feature is the process level information so + # this is likely what the deployer is attempting to configure. + if "interval" in kwargs and "interval2" not in kwargs: + collectl_recorder_args[ "interval2" ] = kwargs[ "interval"] + + if "flush" not in kwargs: + collectl_recorder_args[ "flush" ] = DEFAULT_FLUSH_INTERVAL + + procfilt_on = kwargs.get( "procfilt_on", DEFAULT_PROCFILT_ON ).lower() + # Calculate explicit arguments, rest can just be passed through from + # constructor arguments. + explicit_args = dict( + collectl_path=self.remote_collectl_path, + procfilt=procfilt_argument( procfilt_on ), + subsystems=self.subsystems, + ) + collectl_recorder_args.update( explicit_args ) + self.collectl_recorder_args = collectl_recorder_args + + def __summarize_process_data( self, pid, collectl_log_path ): + playback_cli_args = dict( + collectl_path=self.local_collectl_path, + playback_path=collectl_log_path, + sep="9" + ) + if not os.stat( collectl_log_path ).st_size: + log.debug( EMPTY_COLLECTL_FILE_MESSAGE ) + return [ ] + + playback_cli = cli.CollectlCli( **playback_cli_args ) + return processes.generate_process_statistics( playback_cli, pid, self.process_statistics ) + + def __collectl_recorder_cli( self, job_directory ): + cli_args = self.collectl_recorder_args.copy() + cli_args[ "destination_path" ] = self._instrument_file_path( job_directory, "log" ) + return cli.CollectlCli( **cli_args ) + + def __collectl_record_command( self, job_directory ): + collectl_cli = self.__collectl_recorder_cli( job_directory ) + if self.log_collectl_program_output: + redirect_to = self._instrument_file_path( job_directory, "program_output" ) + else: + redirect_to = "/dev/null" + return "%s > %s 2>&1 &" % ( + collectl_cli.build_command_line(), + redirect_to, + ) + + def __pid_file( self, job_directory ): + return self._instrument_file_path( job_directory, "pid" ) + + def _is_instrumented_collectl_log( self, filename ): + prefix = self._instrument_file_name( "log" ) + return filename.startswith( prefix ) and filename.endswith( ".raw.gz" ) + + +def procfilt_argument( procfilt_on ): + if procfilt_on == "username": + return "U$USER" + elif procfilt_on == "uid": + return "u$UID" + else: + # Ensure it is empty of None + if procfilt_on or procfilt_on.lower() != "none": + raise Exception( "Invalid procfilt_on argument encountered") + return "" + + +__all__ = [ CollectlPlugin ] diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 lib/galaxy/jobs/metrics/instrumenters/core.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/instrumenters/core.py @@ -0,0 +1,85 @@ +from ..instrumenters import InstrumentPlugin +from ...metrics import formatting +import time + +import logging +log = logging.getLogger( __name__ ) + +GALAXY_SLOTS_KEY = "galaxy_slots" 
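As a rough usage sketch of the configuration handling in CollectlPlugin above, constructing the plugin directly with deployer-style keyword arguments could look like this; every value is hypothetical, and in practice the arguments arrive via the job configuration rather than direct instantiation:

    plugin = CollectlPlugin(
        subsystems="process",                   # listified and resolved through subsystems.get_subsystem
        interval="10",                          # also copied to interval2, since interval2 is not given
        procfilt_on="username",                 # translated by procfilt_argument() to "U$USER"
        saved_logs_path="/data/collectl_logs",  # raw logs copied here under a directory hash of the job id
        summarize_process_data="true",          # requires the process subsystem, as enforced in __init__
    )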
+START_EPOCH_KEY = "start_epoch" +END_EPOCH_KEY = "end_epoch" +RUNTIME_SECONDS_KEY = "runtime_seconds" + + +class CorePluginFormatter( formatting.JobMetricFormatter ): + + def format( self, key, value ): + value = int( value ) + if key == GALAXY_SLOTS_KEY: + return ( "Cores Allocated", "%d" % value ) + elif key == RUNTIME_SECONDS_KEY: + return ( "Job Runtime (Wall Clock)", formatting.seconds_to_str( value ) ) + else: + # TODO: Use localized version of this from universe_wsgi.ini + title = "Job Start Time" if key == START_EPOCH_KEY else "Job End Time" + return (title, time.strftime( '%Y-%m-%d %H:%M:%S', time.localtime( value ) ) ) + + +class CorePlugin( InstrumentPlugin ): + """ Simple plugin that collects data without external dependencies. In + particular it currently collects value set for Galaxy slots. + """ + plugin_type = "core" + formatter = CorePluginFormatter() + + def __init__( self, **kwargs ): + pass + + def pre_execute_instrument( self, job_directory ): + commands = [] + commands.append( self.__record_galaxy_slots_command( job_directory ) ) + commands.append( self.__record_seconds_since_epoch_to_file( job_directory, "start" ) ) + return commands + + def post_execute_instrument( self, job_directory ): + commands = [] + commands.append( self.__record_seconds_since_epoch_to_file( job_directory, "end" ) ) + return commands + + def job_properties( self, job_id, job_directory ): + galaxy_slots_file = self.__galaxy_slots_file( job_directory ) + + properties = {} + properties[ GALAXY_SLOTS_KEY ] = self.__read_integer( galaxy_slots_file ) + start = self.__read_seconds_since_epoch( job_directory, "start" ) + end = self.__read_seconds_since_epoch( job_directory, "end" ) + if start is not None and end is not None: + properties[ START_EPOCH_KEY ] = start + properties[ END_EPOCH_KEY ] = end + properties[ RUNTIME_SECONDS_KEY ] = end - start + return properties + + def __record_galaxy_slots_command( self, job_directory ): + galaxy_slots_file = self.__galaxy_slots_file( job_directory ) + return '''echo "$GALAXY_SLOTS" > '%s' ''' % galaxy_slots_file + + def __record_seconds_since_epoch_to_file( self, job_directory, name ): + path = self._instrument_file_path( job_directory, "epoch_%s" % name ) + return 'date +"%s" > ' + path + + def __read_seconds_since_epoch( self, job_directory, name ): + path = self._instrument_file_path( job_directory, "epoch_%s" % name ) + return self.__read_integer( path ) + + def __galaxy_slots_file( self, job_directory ): + return self._instrument_file_path( job_directory, "galaxy_slots" ) + + def __read_integer( self, path ): + value = None + try: + value = int( open( path, "r" ).read() ) + except Exception: + pass + return value + +__all__ = [ CorePlugin ] diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 lib/galaxy/jobs/metrics/instrumenters/cpuinfo.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/instrumenters/cpuinfo.py @@ -0,0 +1,62 @@ +import re + +from galaxy import util + +from ..instrumenters import InstrumentPlugin +from ...metrics import formatting + +import logging +log = logging.getLogger( __name__ ) + +PROCESSOR_LINE = re.compile(r"processor\s*\:\s*(\d+)") + + +class CpuInfoFormatter( formatting.JobMetricFormatter ): + + def format( self, key, value ): + if key == "processor_count": + return "Processor Count", "%s" % int( value ) + else: + return key, value + + +class CpuInfoPlugin( InstrumentPlugin ): + """ Gather information about processor configuration from /proc/cpuinfo. + Linux only. 
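A small worked example of the core plugin's bookkeeping above; the epoch values and slot count are invented, and <prefix> stands for the shared INSTRUMENT_FILE_PREFIX used by _instrument_file_name():

    # Written on the compute node by the generated shell commands:
    #   <prefix>_core_galaxy_slots -> "4"
    #   <prefix>_core_epoch_start  -> "1398211200"   (date +"%s" at pre-execute)
    #   <prefix>_core_epoch_end    -> "1398211321"   (date +"%s" at post-execute)
    start = 1398211200
    end = 1398211321
    properties = {
        GALAXY_SLOTS_KEY: 4,
        START_EPOCH_KEY: start,
        END_EPOCH_KEY: end,
        RUNTIME_SECONDS_KEY: end - start,   # 121, rendered as "Job Runtime (Wall Clock)" via seconds_to_str
    }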
+ """ + plugin_type = "cpuinfo" + formatter = CpuInfoFormatter() + + def __init__( self, **kwargs ): + self.verbose = util.asbool( kwargs.get( "verbose", False ) ) + + def pre_execute_instrument( self, job_directory ): + return "cat /proc/cpuinfo > '%s'" % self.__instrument_cpuinfo_path( job_directory ) + + def job_properties( self, job_id, job_directory ): + properties = {} + processor_count = 0 + with open( self.__instrument_cpuinfo_path( job_directory ) ) as f: + current_processor = None + for line in f: + line = line.strip().lower() + if not line: # Skip empty lines + continue + + processor_line_match = PROCESSOR_LINE.match( line ) + if processor_line_match: + processor_count += 1 + current_processor = processor_line_match.group( 1 ) + elif current_processor and self.verbose: + # If verbose, dump information about each processor + # into database... + key, value = line.split( ":", 1 ) + key = "processor_%s_%s" % ( current_processor, key.strip() ) + value = value + properties[ "processor_count" ] = processor_count + return properties + + def __instrument_cpuinfo_path( self, job_directory ): + return self._instrument_file_path( job_directory, "cpuinfo" ) + +__all__ = [ CpuInfoPlugin ] diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 lib/galaxy/jobs/metrics/instrumenters/env.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/instrumenters/env.py @@ -0,0 +1,54 @@ +from ..instrumenters import InstrumentPlugin +from ...metrics import formatting + +import logging +log = logging.getLogger( __name__ ) + + +class EnvFormatter( formatting.JobMetricFormatter ): + + def format( self, key, value ): + return ( "%s (runtime environment variable)" % key, value ) + + +class EnvPlugin( InstrumentPlugin ): + """ Instrumentation plugin capable of recording all or specific environment + variables for a job at runtime. + """ + plugin_type = "env" + formatter = EnvFormatter() + + def __init__( self, **kwargs ): + variables_str = kwargs.get( "variables", None ) + if variables_str: + variables = [ v.strip() for v in variables_str.split(",") ] + else: + variables = None + self.variables = variables + + def pre_execute_instrument( self, job_directory ): + """ Use env to dump all environment variables to a file. + """ + return "env > '%s'" % self.__env_file( job_directory ) + + def post_execute_instrument( self, job_directory ): + return None + + def job_properties( self, job_id, job_directory ): + """ Recover environment variables dumped out on compute server and filter + out specific variables if needed. + """ + variables = self.variables + + properties = {} + for line in open( self.__env_file( job_directory ) ).readlines(): + var, value = line.split( "=", 1 ) + if not variables or var in variables: + properties[ var ] = value + + return properties + + def __env_file( self, job_directory ): + return self._instrument_file_path( job_directory, "vars" ) + +__all__ = [ EnvPlugin ] diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 lib/galaxy/jobs/metrics/instrumenters/meminfo.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/instrumenters/meminfo.py @@ -0,0 +1,59 @@ +import re + +from galaxy import util + +from ..instrumenters import InstrumentPlugin +from ...metrics import formatting + +MEMINFO_LINE = re.compile(r"(\w+)\s*\:\s*(\d+) kB") + +# Important (non-verbose) meminfo property titles. 
+MEMINFO_TITLES = { + "memtotal": "Total System Memory", + "swaptotal": "Total System Swap" +} + + +class MemInfoFormatter( formatting.JobMetricFormatter ): + + def format( self, key, value ): + title = MEMINFO_TITLES.get( key, key ) + return title, util.nice_size( value * 1000 ) # kB = *1000, KB = *1024 - wikipedia + + +class MemInfoPlugin( InstrumentPlugin ): + """ Gather information about processor configuration from /proc/cpuinfo. + Linux only. + """ + plugin_type = "meminfo" + formatter = MemInfoFormatter() + + def __init__( self, **kwargs ): + self.verbose = util.asbool( kwargs.get( "verbose", False ) ) + + def pre_execute_instrument( self, job_directory ): + return "cat /proc/meminfo > '%s'" % self.__instrument_meminfo_path( job_directory ) + + def job_properties( self, job_id, job_directory ): + properties = {} + with open( self.__instrument_meminfo_path( job_directory ) ) as f: + for line in f: + line = line.strip() + if not line: # Skip empty lines + continue + line_match = MEMINFO_LINE.match( line ) + if not line_match: + continue + key = line_match.group( 1 ).lower() + # By default just grab important meminfo properties with titles + # defined for formatter. Grab everything in verbose mode for + # an arbitrary snapshot of memory at beginning of run. + if key in MEMINFO_TITLES or self.verbose: + value = long( line_match.group( 2 ) ) + properties[ key ] = value + return properties + + def __instrument_meminfo_path( self, job_directory ): + return self._instrument_file_path( job_directory, "meminfo" ) + +__all__ = [ MemInfoPlugin ] diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 lib/galaxy/jobs/metrics/instrumenters/uname.py --- /dev/null +++ b/lib/galaxy/jobs/metrics/instrumenters/uname.py @@ -0,0 +1,34 @@ +from ..instrumenters import InstrumentPlugin +from ...metrics import formatting + + +class UnameFormatter( formatting.JobMetricFormatter ): + + def format( self, key, value ): + return "Operating System", value + + +class UnamePlugin( InstrumentPlugin ): + """ Use uname to gather operating system information about remote system + job is running on. Linux only. 
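A brief illustration of the meminfo capture above, using one invented /proc/meminfo line (evaluated in the meminfo module's namespace):

    line_match = MEMINFO_LINE.match( "MemTotal:       16316504 kB" )
    key = line_match.group( 1 ).lower()     # "memtotal" - one of the MEMINFO_TITLES keys
    value = long( line_match.group( 2 ) )   # 16316504 kilobytes (factor 1000, not 1024)
    # MemInfoFormatter then reports ( "Total System Memory", util.nice_size( value * 1000 ) ).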
+ """ + plugin_type = "uname" + formatter = UnameFormatter() + + def __init__( self, **kwargs ): + self.uname_args = kwargs.get( "args", "-a" ) + + def pre_execute_instrument( self, job_directory ): + return "uname %s > '%s'" % ( self.uname_args, self.__instrument_uname_path( job_directory ) ) + + def job_properties( self, job_id, job_directory ): + properties = {} + with open( self.__instrument_uname_path( job_directory ) ) as f: + properties[ "uname" ] = f.read() + return properties + + def __instrument_uname_path( self, job_directory ): + return self._instrument_file_path( job_directory, "uname" ) + + +__all__ = [ UnamePlugin ] diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 lib/galaxy/jobs/runners/__init__.py --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -251,7 +251,10 @@ log.debug( 'execution of external set_meta for job %d finished' % job_wrapper.job_id ) def get_job_file(self, job_wrapper, **kwds): + job_metrics = job_wrapper.app.job_metrics + job_instrumenter = job_metrics.job_instrumenters[ job_wrapper.job_destination.id ] options = dict( + job_instrumenter=job_instrumenter, galaxy_lib=job_wrapper.galaxy_lib_dir, env_setup_commands=job_wrapper.get_env_setup_clause(), working_directory=os.path.abspath( job_wrapper.working_directory ), diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 lib/galaxy/jobs/runners/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh --- a/lib/galaxy/jobs/runners/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh +++ b/lib/galaxy/jobs/runners/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh @@ -12,6 +12,8 @@ export PYTHONPATH fi $env_setup_commands +$instrument_pre_commands cd $working_directory $command echo $? 
> $exit_code_path +$instrument_post_commands diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 lib/galaxy/jobs/runners/util/job_script/__init__.py --- a/lib/galaxy/jobs/runners/util/job_script/__init__.py +++ b/lib/galaxy/jobs/runners/util/job_script/__init__.py @@ -18,6 +18,8 @@ 'headers': '', 'env_setup_commands': '', 'slots_statement': SLOTS_STATEMENT_CLUSTER_DEFAULT, + 'instrument_pre_commands': '', + 'instrument_post_commands': '', } @@ -47,6 +49,13 @@ """ if any([param not in kwds for param in REQUIRED_TEMPLATE_PARAMS]): raise Exception("Failed to create job_script, a required parameter is missing.") + job_instrumenter = kwds.get("job_instrumenter", None) + if job_instrumenter: + del kwds[ "job_instrumenter" ] + working_directory = kwds["working_directory"] + kwds["instrument_pre_commands"] = job_instrumenter.pre_execute_commands(working_directory) or '' + kwds["instrument_post_commands"] = job_instrumenter.post_execute_commands(working_directory) or '' + template_params = OPTIONAL_TEMPLATE_PARAMS.copy() template_params.update(**kwds) if not isinstance(template, Template): diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -17,6 +17,7 @@ import json import socket import time +import numbers from uuid import UUID, uuid4 from string import Template from itertools import ifilter @@ -87,6 +88,26 @@ return name +class HasJobMetrics: + + def _init_metrics( self ): + self.text_metrics = [] + self.numeric_metrics = [] + + def add_metric( self, plugin, metric_name, metric_value ): + if isinstance( metric_value, numbers.Number ): + metric = self._numeric_metric( plugin, metric_name, metric_value ) + self.numeric_metrics.append( metric ) + else: + metric = self._text_metric( plugin, metric_name, metric_value ) + self.text_metrics.append( metric ) + + @property + def metrics( self ): + # TODO: Make iterable, concatenate with chain + return self.text_metrics + self.numeric_metrics + + class User( object, Dictifiable ): use_pbkdf2 = True """ @@ -226,7 +247,31 @@ return Template( in_string ).safe_substitute( environment ) -class Job( object, Dictifiable ): +class BaseJobMetric( object ): + + def __init__( self, plugin, metric_name, metric_value ): + self.plugin = plugin + self.metric_name = metric_name + self.metric_value = metric_value + + +class JobMetricText( BaseJobMetric ): + pass + + +class JobMetricNumeric( BaseJobMetric ): + pass + + +class TaskMetricText( BaseJobMetric ): + pass + + +class TaskMetricNumeric( BaseJobMetric ): + pass + + +class Job( object, HasJobMetrics, Dictifiable ): dict_collection_visible_keys = [ 'id', 'state', 'exit_code', 'update_time', 'create_time' ] dict_element_visible_keys = [ 'id', 'state', 'exit_code', 'update_time', 'create_time' ] @@ -234,6 +279,9 @@ A job represents a request to run a tool given input datasets, tool parameters, and output datasets. """ + _numeric_metric = JobMetricNumeric + _text_metric = JobMetricText + states = Bunch( NEW = 'new', UPLOAD = 'upload', WAITING = 'waiting', @@ -267,6 +315,7 @@ self.imported = False self.handler = None self.exit_code = None + self._init_metrics() @property def finished( self ): @@ -370,6 +419,12 @@ self.info = info def set_runner_name( self, job_runner_name ): self.job_runner_name = job_runner_name + + def get_job( self ): + # Added so job and task have same interface (.get_job() ) to get at + # underlying job object. 
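To show how the job_instrumenter keyword added to job_script/__init__.py above feeds the default template, here is a rough call sketch; the destination id, paths and command are hypothetical, and it assumes the surrounding function is the module's job_script( **kwds ) helper with command, working_directory and exit_code_path among its required template parameters (the variables the template itself references):

    from galaxy.jobs.runners.util.job_script import job_script

    # app is the Galaxy application object, as used by get_job_file() above.
    job_instrumenter = app.job_metrics.job_instrumenters[ "cluster_default" ]
    script = job_script(
        command="my_tool.sh input.dat output.dat",
        working_directory="/galaxy/database/job_working_directory/000/123",
        exit_code_path="/galaxy/database/job_working_directory/000/123/galaxy_123.ec",
        job_instrumenter=job_instrumenter,
    )
    # pre_execute_commands( working_directory ) fills $instrument_pre_commands, which
    # runs before cd/$command; post_execute_commands() fills $instrument_post_commands,
    # which runs after the exit code has been captured.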
+ return self + def set_runner_external_id( self, job_runner_external_id ): self.job_runner_external_id = job_runner_external_id def set_post_job_actions( self, post_job_actions ): @@ -472,10 +527,14 @@ return rval -class Task( object ): + +class Task( object, HasJobMetrics ): """ A task represents a single component of a job. """ + _numeric_metric = TaskMetricNumeric + _text_metric = TaskMetricText + states = Bunch( NEW = 'new', WAITING = 'waiting', QUEUED = 'queued', @@ -498,6 +557,7 @@ self.stderr = "" self.exit_code = None self.prepare_input_files_cmd = prepare_files_cmd + self._init_metrics() def get_param_values( self, app ): """ @@ -608,6 +668,7 @@ def set_prepare_input_files_cmd( self, prepare_input_files_cmd ): self.prepare_input_files_cmd = prepare_input_files_cmd + class JobParameter( object ): def __init__( self, name, value ): self.name = name diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 lib/galaxy/model/mapping.py --- a/lib/galaxy/model/mapping.py +++ b/lib/galaxy/model/mapping.py @@ -470,6 +470,52 @@ Column( "archive_dir", TEXT ) ) + +JOB_METRIC_MAX_LENGTH = 1023 + +model.JobMetricText.table = Table( + "job_metric_text", + metadata, + Column( "id", Integer, primary_key=True ), + Column( "job_id", Integer, ForeignKey( "job.id" ), index=True ), + Column( "plugin", Unicode(255), ), + Column( "metric_name", Unicode(255), ), + Column( "metric_value", Unicode(JOB_METRIC_MAX_LENGTH), ), +) + +model.TaskMetricText.table = Table( + "task_metric_text", + metadata, + Column( "id", Integer, primary_key=True ), + Column( "task_id", Integer, ForeignKey( "task.id" ), index=True ), + Column( "plugin", Unicode(255), ), + Column( "metric_name", Unicode(255), ), + Column( "metric_value", Unicode(JOB_METRIC_MAX_LENGTH), ), +) + + +model.JobMetricNumeric.table = Table( + "job_metric_numeric", + metadata, + Column( "id", Integer, primary_key=True ), + Column( "job_id", Integer, ForeignKey( "job.id" ), index=True ), + Column( "plugin", Unicode(255), ), + Column( "metric_name", Unicode(255), ), + Column( "metric_value", Numeric( 22, 7 ), ), +) + + +model.TaskMetricNumeric.table = Table( + "task_metric_numeric", + metadata, + Column( "id", Integer, primary_key=True ), + Column( "task_id", Integer, ForeignKey( "task.id" ), index=True ), + Column( "plugin", Unicode(255), ), + Column( "metric_name", Unicode(255), ), + Column( "metric_value", Numeric( 22, 7 ), ), +) + + model.GenomeIndexToolData.table = Table( "genome_index_tool_data", metadata, Column( "id", Integer, primary_key=True ), Column( "job_id", Integer, ForeignKey( "job.id" ), index=True ), @@ -1569,6 +1615,26 @@ job=relation( model.Job ), dataset=relation( model.LibraryDatasetDatasetAssociation, lazy=False ) ) ) +simple_mapping( + model.JobMetricText, + job=relation( model.Job, backref="text_metrics" ), +) + +simple_mapping( + model.TaskMetricText, + task=relation( model.Task, backref="text_metrics" ), +) + +simple_mapping( + model.JobMetricNumeric, + job=relation( model.Job, backref="numeric_metrics" ), +) + +simple_mapping( + model.TaskMetricNumeric, + task=relation( model.Task, backref="numeric_metrics" ), +) + mapper( model.JobParameter, model.JobParameter.table ) mapper( model.JobExternalOutputMetadata, model.JobExternalOutputMetadata.table, diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 lib/galaxy/model/migrate/versions/0119_job_metrics.py --- /dev/null +++ b/lib/galaxy/model/migrate/versions/0119_job_metrics.py @@ -0,0 +1,102 @@ +""" 
+Migration script for job metric plugins. +""" + +from sqlalchemy import * +from sqlalchemy.orm import * +from migrate import * +from migrate.changeset import * + +import datetime +now = datetime.datetime.utcnow + +import logging +log = logging.getLogger( __name__ ) + +metadata = MetaData() + +TEXT_METRIC_MAX_LENGTH = 1023 + +JobMetricText_table = Table( + "job_metric_text", + metadata, + Column( "id", Integer, primary_key=True ), + Column( "job_id", Integer, ForeignKey( "job.id" ), index=True ), + Column( "plugin", Unicode(255), ), + Column( "metric_name", Unicode(255), ), + Column( "metric_value", Unicode(TEXT_METRIC_MAX_LENGTH), ), +) + + +TaskMetricText_table = Table( + "task_metric_text", + metadata, + Column( "id", Integer, primary_key=True ), + Column( "task_id", Integer, ForeignKey( "task.id" ), index=True ), + Column( "plugin", Unicode(255), ), + Column( "metric_name", Unicode(255), ), + Column( "metric_value", Unicode(TEXT_METRIC_MAX_LENGTH), ), +) + + +JobMetricNumeric_table = Table( + "job_metric_numeric", + metadata, + Column( "id", Integer, primary_key=True ), + Column( "job_id", Integer, ForeignKey( "job.id" ), index=True ), + Column( "plugin", Unicode(255), ), + Column( "metric_name", Unicode(255), ), + Column( "metric_value", Numeric( 22, 7 ), ), +) + + +TaskMetricNumeric_table = Table( + "task_metric_numeric", + metadata, + Column( "id", Integer, primary_key=True ), + Column( "task_id", Integer, ForeignKey( "task.id" ), index=True ), + Column( "plugin", Unicode(255), ), + Column( "metric_name", Unicode(255), ), + Column( "metric_value", Numeric( 22, 7 ), ), +) + + +TABLES = [ + JobMetricText_table, + TaskMetricText_table, + JobMetricNumeric_table, + TaskMetricNumeric_table, +] + + +def upgrade( migrate_engine ): + metadata.bind = migrate_engine + print __doc__ + metadata.reflect() + + for table in TABLES: + __create(table) + + +def downgrade(migrate_engine): + metadata.bind = migrate_engine + metadata.reflect() + + for table in TABLES: + __drop(table) + + +def __create(table): + try: + table.create() + except Exception as e: + print str(e) + log.debug("Creating %s table failed: %s" % (table.name, str( e ) ) ) + + +def __drop(table): + try: + table.drop() + except Exception as e: + print str(e) + log.debug("Dropping %s table failed: %s" % (table.name, str( e ) ) ) diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 templates/show_params.mako --- a/templates/show_params.mako +++ b/templates/show_params.mako @@ -126,6 +126,13 @@ %if job and job.command_line and trans.user_is_admin(): <tr><td>Job Command-Line:</td><td>${ job.command_line | h }</td></tr> %endif + %if job and trans.user_is_admin(): + <% job_metrics = trans.app.job_metrics %> + %for metric in job.metrics: + <% metric_title, metric_value = job_metrics.format( metric.plugin, metric.metric_name, metric.metric_value ) %> + <tr><td>${ metric_title | h }</td><td>${ metric_value | h }</td></tr> + %endfor + %endif </table><br /> diff -r bc3fd881b07f5904ccad6d2d187e7d8e3c506652 -r b24137a3a980cf913ba1b5ae023623fb416b54e6 test/unit/test_galaxy_mapping.py --- a/test/unit/test_galaxy_mapping.py +++ b/test/unit/test_galaxy_mapping.py @@ -210,6 +210,46 @@ assert hist1.name == "History 2b" # gvk TODO need to ad test for GalaxySessions, but not yet sure what they should look like. 
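As a sketch of the display path added to show_params.mako above, and assuming trans.app.job_metrics.format() simply dispatches to the matching plugin's formatter by plugin name (the registry itself is outside this diff):

    title, value = trans.app.job_metrics.format( "core", "runtime_seconds", 121 )
    # With CorePluginFormatter this yields
    #   ( "Job Runtime (Wall Clock)", formatting.seconds_to_str( 121 ) )
    # and the template escapes both and renders them as one admin-only table row.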
+ def test_jobs( self ): + model = self.model + u = model.User( email="jobtest@foo.bar.baz", password="password" ) + job = model.Job() + job.user = u + job.tool_id = "cat1" + + self.persist( u, job ) + + loaded_job = model.session.query( model.Job ).filter( model.Job.user == u ).first() + assert loaded_job.tool_id == "cat1" + + def test_job_metrics( self ): + model = self.model + u = model.User( email="jobtest@foo.bar.baz", password="password" ) + job = model.Job() + job.user = u + job.tool_id = "cat1" + + job.add_metric( "gx", "galaxy_slots", 5 ) + job.add_metric( "system", "system_name", "localhost" ) + + self.persist( u, job ) + + task = model.Task( job=job, working_directory="/tmp", prepare_files_cmd="split.sh" ) + task.add_metric( "gx", "galaxy_slots", 5 ) + task.add_metric( "system", "system_name", "localhost" ) + self.persist( task ) + + def test_tasks( self ): + model = self.model + u = model.User( email="jobtest@foo.bar.baz", password="password" ) + job = model.Job() + task = model.Task( job=job, working_directory="/tmp", prepare_files_cmd="split.sh" ) + job.user = u + self.persist( u, job, task ) + + loaded_task = model.session.query( model.Task ).filter( model.Task.job == job ).first() + assert loaded_task.prepare_input_files_cmd == "split.sh" + def test_history_contents( self ): model = self.model u = model.User( email="contents@foo.bar.baz", password="password" ) https://bitbucket.org/galaxy/galaxy-central/commits/e58259b27d22/ Changeset: e58259b27d22 User: jmchilton Date: 2014-04-23 04:25:04 Summary: Job metrics - ensure all text values are unicode before persisting. Gets rid of the following warning: "SAWarning: Unicode type received non-unicode bind param value.". Affected #: 1 file diff -r b24137a3a980cf913ba1b5ae023623fb416b54e6 -r e58259b27d229b5bdfe3e70772cef94804dda03c lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -95,10 +95,18 @@ self.numeric_metrics = [] def add_metric( self, plugin, metric_name, metric_value ): + if isinstance( plugin, str ): + plugin = unicode( plugin, 'utf-8' ) + + if isinstance( metric_name, str ): + metric_name = unicode( metric_name, 'utf-8' ) + if isinstance( metric_value, numbers.Number ): metric = self._numeric_metric( plugin, metric_name, metric_value ) self.numeric_metrics.append( metric ) else: + if isinstance( metric_value, str ): + metric_value = unicode( metric_value, 'utf-8' ) metric = self._text_metric( plugin, metric_name, metric_value ) self.text_metrics.append( metric ) https://bitbucket.org/galaxy/galaxy-central/commits/26575805a77b/ Changeset: 26575805a77b User: jmchilton Date: 2014-04-23 04:40:08 Summary: Fix test_job_configuration.py for job metrics PR. Still need to go in and actually write some tests for the nested parsing that can happen. Affected #: 1 file diff -r e58259b27d229b5bdfe3e70772cef94804dda03c -r 26575805a77b61e893a6601401499c970f156a0c test/unit/jobs/test_job_configuration.py --- a/test/unit/jobs/test_job_configuration.py +++ b/test/unit/jobs/test_job_configuration.py @@ -22,7 +22,7 @@ use_tasked_jobs=False, ) self.__write_config_from( SIMPLE_JOB_CONF ) - self.app = bunch.Bunch( config=self.config ) + self.app = bunch.Bunch( config=self.config, job_metrics=MockJobMetrics() ) self.__job_configuration = None def tearDown( self ): @@ -111,6 +111,8 @@ assert limits.concurrent_jobs[ "longjobs" ] == 1 assert limits.walltime_delta == datetime.timedelta( 0, 0, 0, 0, 0, 24 ) + # TODO: Add job metrics parsing test. 
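A quick behavioural sketch of the coercion added in changeset e58259b27d22 above (Python 2 semantics, values invented):

    job = model.Job()
    job.add_metric( "uname", "uname", "Linux build-node 3.2.0-60-generic" )
    metric = job.text_metrics[ -1 ]
    assert isinstance( metric.plugin, unicode )        # coerced from str via unicode( ..., 'utf-8' )
    assert isinstance( metric.metric_value, unicode )  # Unicode columns now receive unicode values,
                                                       # silencing the SAWarning quoted above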
+ @property def job_config( self ): if not self.__job_configuration: @@ -126,3 +128,12 @@ def __write_config( self, contents ): with open( os.path.join( self.temp_directory, "job_conf.xml" ), "w" ) as f: f.write( contents ) + + +class MockJobMetrics( object ): + + def __init__( self ): + pass + + def set_destination_conf_element( self, id, element ): + pass Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.