commit/galaxy-central: natefoo: Add a dataset cleanup script that's optimized for (and compatible only with) PostgreSQL 9.1+. Also add a bunch of tables for logging all cleanup events, which makes reversing such events trivial.
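The cleanup_event_* association tables added by the migration below record which rows each cleanup run touched, keyed by a cleanup_event id, so reversing the flag-level changes of a run reduces to a single UPDATE joined against the logging table. A minimal sketch, assuming the schema from migration 0105 below; the event id 42 is a placeholder, and files already removed from disk are not restored:

    -- Hypothetical reversal sketch (not part of this commit): un-purge the
    -- HistoryDatasetAssociations recorded for one cleanup run.
    -- Table and column names come from the 0105 migration below.
    UPDATE history_dataset_association AS hda
       SET purged = false,
           update_time = NOW()
      FROM cleanup_event_hda_association AS ceha
     WHERE ceha.hda_id = hda.id
       AND ceha.cleanup_event_id = 42;  -- placeholder cleanup_event.id
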
1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/changeset/e0975c9b9bd2/ changeset: e0975c9b9bd2 user: natefoo date: 2012-09-10 18:03:23 summary: Add a dataset cleanup script that's optimized for (and compatible only with) PostgreSQL 9.1+. Also add a bunch of tables for logging all cleanup events, which makes reversing such events trivial. affected #: 2 files diff -r 9207ba35fa0caad1f8eac717afceb50f5c2a61c0 -r e0975c9b9bd25d44cfcacc202470bb8037da6d6f lib/galaxy/model/migrate/versions/0105_add_cleanup_event_table.py --- /dev/null +++ b/lib/galaxy/model/migrate/versions/0105_add_cleanup_event_table.py @@ -0,0 +1,83 @@ +""" +Migration script to add the cleanup_event* tables. +""" +from sqlalchemy import * +from sqlalchemy.orm import * +from migrate import * +from migrate.changeset import * +import sys, logging +from galaxy.model.custom_types import * +from sqlalchemy.exc import * +import datetime +now = datetime.datetime.utcnow + +log = logging.getLogger( __name__ ) +log.setLevel( logging.DEBUG ) +handler = logging.StreamHandler( sys.stdout ) +format = "%(name)s %(levelname)s %(asctime)s %(message)s" +formatter = logging.Formatter( format ) +handler.setFormatter( formatter ) +log.addHandler( handler ) + +metadata = MetaData( migrate_engine ) + +# New table to log cleanup events +CleanupEvent_table = Table( "cleanup_event", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "message", TrimmedString( 1024 ) ) ) + +CleanupEventDatasetAssociation_table = Table( "cleanup_event_dataset_association", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "cleanup_event_id", Integer, ForeignKey( "cleanup_event.id" ), index=True, nullable=True ), + Column( "dataset_id", Integer, ForeignKey( "dataset.id" ), index=True ) ) + +CleanupEventMetadataFileAssociation_table = Table( "cleanup_event_metadata_file_association", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "cleanup_event_id", Integer, ForeignKey( "cleanup_event.id" ), index=True, nullable=True ), + Column( "metadata_file_id", Integer, ForeignKey( "metadata_file.id" ), index=True ) ) + +CleanupEventHistoryAssociation_table = Table( "cleanup_event_history_association", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "cleanup_event_id", Integer, ForeignKey( "cleanup_event.id" ), index=True, nullable=True ), + Column( "history_id", Integer, ForeignKey( "history.id" ), index=True ) ) + +CleanupEventHistoryDatasetAssociationAssociation_table = Table( "cleanup_event_hda_association", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "cleanup_event_id", Integer, ForeignKey( "cleanup_event.id" ), index=True, nullable=True ), + Column( "hda_id", Integer, ForeignKey( "history_dataset_association.id" ), index=True ) ) + +CleanupEventImplicitlyConvertedDatasetAssociationAssociation_table = Table( "cleanup_event_icda_association", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "cleanup_event_id", Integer, ForeignKey( "cleanup_event.id" ), index=True, nullable=True ), + Column( "icda_id", Integer, ForeignKey( "implicitly_converted_dataset_association.id" ), index=True ) ) + +def upgrade(): + print __doc__ + metadata.reflect() + try: + 
CleanupEvent_table.create() + CleanupEventDatasetAssociation_table.create() + CleanupEventMetadataFileAssociation_table.create() + CleanupEventHistoryAssociation_table.create() + CleanupEventHistoryDatasetAssociationAssociation_table.create() + CleanupEventImplicitlyConvertedDatasetAssociationAssociation_table.create() + except Exception, e: + log.debug( "Creating table failed: %s" % str( e ) ) + +def downgrade(): + metadata.reflect() + try: + CleanupEventImplicitlyConvertedDatasetAssociationAssociation_table.drop() + CleanupEventHistoryDatasetAssociationAssociation_table.drop() + CleanupEventHistoryAssociation_table.drop() + CleanupEventMetadataFileAssociation_table.drop() + CleanupEventDatasetAssociation_table.drop() + CleanupEvent_table.drop() + except Exception, e: + log.debug( "Dropping table failed: %s" % str( e ) ) diff -r 9207ba35fa0caad1f8eac717afceb50f5c2a61c0 -r e0975c9b9bd25d44cfcacc202470bb8037da6d6f scripts/cleanup_datasets/pgcleanup.py --- /dev/null +++ b/scripts/cleanup_datasets/pgcleanup.py @@ -0,0 +1,707 @@ +#!/usr/bin/env python +""" +pgcleanup.py - A script for cleaning up datasets in Galaxy efficiently, by + bypassing the Galaxy model and operating directly on the database. + PostgreSQL 9.1 or greater is required. +""" + +import os +import sys +import shutil +import logging +import inspect +import datetime +from ConfigParser import ConfigParser +from optparse import OptionParser + +galaxy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +sys.path.insert(0, os.path.join(galaxy_root, 'lib')) + +from galaxy import eggs +eggs.require('psycopg2') +eggs.require('SQLAlchemy') +import psycopg2 +from sqlalchemy.engine.url import make_url + +import galaxy.config + +from galaxy.objectstore import build_object_store_from_config +from galaxy.util.bunch import Bunch + +log = logging.getLogger() + +class Cleanup(object): + def __init__(self): + self.options = None + self.args = None + self.config = None + self.conn = None + self.action_names = [] + self.logs = {} + self.disk_accounting_user_ids = [] + self.object_store = None + + self.__cache_action_names() + self.__parse_args() + self.__setup_logging() + self.__load_config() + self.__connect_db() + self.__load_object_store() + + def __cache_action_names(self): + for name, value in inspect.getmembers(self): + if not name.startswith('_') and inspect.ismethod(value): + self.action_names.append(name) + + def __parse_args(self): + default_config = os.path.abspath(os.path.join(galaxy_root, 'universe_wsgi.ini')) + + parser = OptionParser() + parser.add_option('-c', '--config', dest='config', help='Path to Galaxy config file (universe_wsgi.ini)', default=default_config) + parser.add_option('-d', '--debug', action='store_true', dest='debug', help='Enable debug logging', default=False) + parser.add_option('--dry-run', action='store_true', dest='dry_run', help="Dry run (rollback all transactions)", default=False) + parser.add_option('-o', '--older-than', type='int', dest='days', help='Only perform action(s) on objects that have not been updated since the specified number of days', default=14) + parser.add_option('-U', '--no-update-time', action='store_false', dest='update_time', help="Don't set update_time on updated objects", default=True) + parser.add_option('-s', '--sequence', dest='sequence', help='Comma-separated sequence of actions, chosen from: %s' % self.action_names, default='') + ( self.options, self.args ) = parser.parse_args() + + self.options.sequence = [ x.strip() for x in 
self.options.sequence.split(',') ] + + def __setup_logging(self): + if self.options.debug: + log.setLevel('DEBUG') + else: + log.setLevel('INFO') + + format = "%(funcName)s %(levelname)s %(asctime)s %(message)s" + formatter = logging.Formatter(format) + + handler = logging.StreamHandler(sys.stdout) + handler.setFormatter(formatter) + + log.addHandler(handler) + + def __load_config(self): + log.info('Reading config from %s' % self.options.config) + config_parser = ConfigParser(dict(here = os.getcwd(), + database_connection = 'sqlite:///database/universe.sqlite?isolation_level=IMMEDIATE')) + config_parser.read(self.options.config) + + config_dict = {} + for key, value in config_parser.items('app:main'): + config_dict[key] = value + + self.config = galaxy.config.Configuration(**config_dict) + + def __connect_db(self): + url = make_url(self.config.database_connection) + + log.info('Connecting to database with URL: %s' % url) + args = url.translate_connect_args() + args.update(url.query) + + assert url.get_dialect().name == 'postgres', 'This script can only be used with PostgreSQL.' + + self.conn = psycopg2.connect(**args) + + def __load_object_store(self): + self.object_store = build_object_store_from_config(self.config) + + def _open_logfile(self): + action_name = inspect.stack()[1][3] + logname = os.path.join(galaxy_root, 'scripts', 'cleanup_datasets', action_name + '.log') + + if self.options.dry_run: + log.debug('--dry-run specified, logging changes to stdout instead of log file: %s' % logname) + self.logs[action_name] = sys.stdout + else: + log.debug('Opening log file: %s' % logname) + self.logs[action_name] = open(logname, 'a') + + message = '==== Log opened: %s ' % datetime.datetime.now().isoformat() + self.logs[action_name].write(message.ljust(72, '=')) + self.logs[action_name].write('\n') + + def _log(self, message, action_name=None): + if action_name is None: + action_name = inspect.stack()[1][3] + if not message.endswith('\n'): + message += '\n' + self.logs[action_name].write(message) + + def _close_logfile(self): + action_name = inspect.stack()[1][3] + + message = '==== Log closed: %s ' % datetime.datetime.now().isoformat() + self.logs[action_name].write(message.ljust(72, '=')) + self.logs[action_name].write('\n') + + if self.options.dry_run: + log.debug('--dry-run specified, changes were logged to stdout insted of log file') + else: + log.debug('Closing log file: %s' % self.logs[action_name].name) + self.logs[action_name].close() + + del self.logs[action_name] + + def _run(self): + ok = True + for name in self.options.sequence: + if name not in self.action_names: + log.error('Unknown action in sequence: %s' % name) + ok = False + if not ok: + log.critical('Exiting due to previous error(s)') + sys.exit(1) + for name in self.options.sequence: + log.info('Calling %s' % name) + self.__getattribute__(name)() + log.info('Finished %s' % name) + + def _create_event(self, message): + """ + Create a new event in the cleanup_event table. 
+ """ + + sql = """ + INSERT INTO cleanup_event + (create_time, message) + VALUES (NOW(), %s) + RETURNING id; + """ + + log.debug("SQL is: %s" % sql % ("'" + message + "'")) + + args = (message,) + + cur = self.conn.cursor() + log.info("Executing SQL") + cur.execute(sql, args) + log.info('Database status: %s' % cur.statusmessage) + + return cur.fetchone()[0] + + def _update(self, sql, args): + if args is not None: + log.debug('SQL is: %s' % sql % args) + else: + log.debug('SQL is: %s' % sql) + + cur = self.conn.cursor() + log.info('Executing SQL') + cur.execute(sql, args) + log.info('Database status: %s' % cur.statusmessage) + + return cur + + def _flush(self): + if self.options.dry_run: + self.conn.rollback() + log.info("--dry-run specified, all changes rolled back") + else: + self.conn.commit() + log.info("All changes committed") + + def _remove_metadata_file(self, id, action_name): + metadata_file = Bunch(id=id) + filename = self.object_store.get_filename(metadata_file, extra_dir='_metadata_files', extra_dir_at_root=True, alt_name="metadata_%d.dat" % id) + + self._log('Removing from disk: %s' % filename, action_name) + if not self.options.dry_run: + try: + os.unlink(filename) + except Exception, e: + self._log('Removal of %s failed with error: %s' % (filename, e), action_name) + + def _update_user_disk_usage(self): + """ + Any operation that purges a HistoryDatasetAssociation may require + updating a user's disk usage. Rather than attempt to resolve dataset + copies at purge-time, simply maintain a list of users that have had + HDAs purged, and update their usages once all updates are complete. + + This could probably be done more efficiently. + """ + log.info('Recalculating disk usage for users whose HistoryDatasetAssociations were purged') + + for user_id in self.disk_accounting_user_ids: + + # TODO: h.purged = false should be unnecessary once all hdas in purged histories are purged. + sql = """ + UPDATE galaxy_user + SET disk_usage = (SELECT SUM(total_size) + FROM ( SELECT d.total_size + FROM history_dataset_association hda + JOIN history h ON h.id = hda.history_id + JOIN dataset d ON hda.dataset_id = d.id + WHERE h.user_id = %s + AND h.purged = false + AND hda.purged = false + AND d.purged = false + AND d.id NOT IN (SELECT dataset_id + FROM library_dataset_dataset_association) + GROUP BY d.id) sizes) + WHERE id = %s + RETURNING disk_usage; + """ + + args = (user_id, user_id) + cur = self._update(sql, args) + self._flush() + + for tup in cur: + # disk_usage might be None (e.g. user has purged all data) + log.debug('Updated disk usage for user id %i to %s bytes' % (user_id, tup[0])) + + def _shutdown(self): + self.object_store.shutdown() + self.conn.close() + for handle in self.logs.values(): + message = '==== Log closed at shutdown: %s ' % datetime.datetime.now().isoformat() + handle.write(message.ljust(72, '=')) + handle.write('\n') + handle.close() + + def update_hda_purged_flag(self): + """ + The old cleanup script does not mark HistoryDatasetAssociations as purged when deleted Histories are purged. This method can be used to rectify that situation. + """ + log.info('Marking purged all HistoryDatasetAssociations associated with purged Datasets') + + event_id = self._create_event(inspect.stack()[0][3]) + + # update_time is intentionally left unmodified. 
+ sql = """ + WITH purged_hda_ids + AS ( UPDATE history_dataset_association + SET purged = true + FROM dataset + WHERE history_dataset_association.dataset_id = dataset.id + AND dataset.purged + AND NOT history_dataset_association.purged + RETURNING history_dataset_association.id), + hda_events + AS (INSERT INTO cleanup_event_hda_association + (create_time, cleanup_event_id, hda_id) + SELECT NOW(), %s, id + FROM purged_hda_ids) + SELECT id + FROM purged_hda_ids + ORDER BY id; + """ + + args = (event_id,) + cur = self._update(sql, args) + self._flush() + + self._open_logfile() + for tup in cur: + self._log('Marked HistoryDatasetAssociation purged: %s' % tup[0]) + self._close_logfile() + + def delete_userless_histories(self): + """ + Mark deleted all "anonymous" Histories (not owned by a registered user) that are older than the specified number of days. + """ + log.info('Marking deleted all userless Histories older than %i days' % self.options.days) + + event_id = self._create_event(inspect.stack()[0][3]) + + sql = """ + WITH deleted_history_ids + AS ( UPDATE history + SET deleted = true%s + WHERE user_id is null + AND NOT deleted + AND update_time < (NOW() - interval '%s days') + RETURNING id), + history_events + AS (INSERT INTO cleanup_event_history_association + (create_time, cleanup_event_id, history_id) + SELECT NOW(), %s, id + FROM deleted_history_ids) + SELECT id + FROM deleted_history_ids + ORDER BY id; + """ + + update_time_sql = '' + if self.options.update_time: + update_time_sql = """, + update_time = NOW()'""" + + sql = sql % (update_time_sql, '%s', '%s') + args = (self.options.days, event_id) + cur = self._update(sql, args) + self._flush() + + self._open_logfile() + for tup in cur: + self._log('Marked userless History deleted: %s' % tup[0]) + self._close_logfile() + + def purge_deleted_hdas(self): + """ + Mark purged all HistoryDatasetAssociations currently marked deleted that are older than the specified number of days. + Mark deleted all MetadataFiles whose hda_id is purged in this step. + Mark deleted all ImplicitlyConvertedDatasetAssociations whose hda_parent_id is purged in this step. + Mark purged all HistoryDatasetAssociations for which an ImplicitlyConvertedDatasetAssociation with matching hda_id is deleted in this step. 
+ """ + log.info('Marking purged all deleted HistoryDatasetAssociations older than %i days' % self.options.days) + + event_id = self._create_event(inspect.stack()[0][3]) + + sql = """ + WITH purged_hda_ids + AS ( UPDATE history_dataset_association + SET purged = true%s + WHERE deleted + AND NOT purged + AND update_time < (NOW() - interval '%s days') + RETURNING id, + history_id), + deleted_metadata_file_ids + AS ( UPDATE metadata_file + SET deleted = true%s + FROM purged_hda_ids + WHERE purged_hda_ids.id = metadata_file.hda_id + RETURNING metadata_file.hda_id AS hda_id, + metadata_file.id AS id), + deleted_icda_ids + AS ( UPDATE implicitly_converted_dataset_association + SET deleted = true%s + FROM purged_hda_ids + WHERE purged_hda_ids.id = implicitly_converted_dataset_association.hda_parent_id + RETURNING implicitly_converted_dataset_association.hda_id AS hda_id, + implicitly_converted_dataset_association.hda_parent_id AS hda_parent_id, + implicitly_converted_dataset_association.id AS id), + deleted_icda_purged_child_hda_ids + AS ( UPDATE history_dataset_association + SET purged = true%s + FROM deleted_icda_ids where deleted_icda_ids.hda_id = history_dataset_association.id), + hda_events + AS (INSERT INTO cleanup_event_hda_association + (create_time, cleanup_event_id, hda_id) + SELECT NOW(), %s, id + FROM purged_hda_ids), + metadata_file_events + AS (INSERT INTO cleanup_event_metadata_file_association + (create_time, cleanup_event_id, metadata_file_id) + SELECT NOW(), %s, id + FROM deleted_metadata_file_ids), + icda_events + AS (INSERT INTO cleanup_event_icda_association + (create_time, cleanup_event_id, icda_id) + SELECT NOW(), %s, id + FROM deleted_icda_ids), + icda_hda_events + AS (INSERT INTO cleanup_event_hda_association + (create_time, cleanup_event_id, hda_id) + SELECT NOW(), %s, hda_id + FROM deleted_icda_ids) + SELECT purged_hda_ids.id, + history.user_id, + deleted_metadata_file_ids.id, + deleted_icda_ids.id, + deleted_icda_ids.hda_id + FROM purged_hda_ids + LEFT OUTER JOIN deleted_metadata_file_ids + ON deleted_metadata_file_ids.hda_id = purged_hda_ids.id + LEFT OUTER JOIN deleted_icda_ids + ON deleted_icda_ids.hda_parent_id = purged_hda_ids.id + LEFT OUTER JOIN history + ON purged_hda_ids.history_id = history.id; + """ + + update_time_sql = "" + if self.options.update_time: + update_time_sql += """, + update_time = NOW()""" + + sql = sql % (update_time_sql, '%s', update_time_sql, update_time_sql, update_time_sql, '%s', '%s', '%s', '%s') + args = (self.options.days, event_id, event_id, event_id, event_id) + cur = self._update(sql, args) + self._flush() + + self._open_logfile() + for tup in cur: + self._log('Marked HistoryDatasetAssociations purged: %s' % tup[0]) + if tup[1] is not None and tup[1] not in self.disk_accounting_user_ids: + self.disk_accounting_user_ids.append(int(tup[1])) + if tup[2] is not None: + self._log('Purge of HDA %s caused deletion of MetadataFile: %s' % (tup[0], tup[2])) + self._remove_metadata_file(tup[2], 'purge_deleted_hdas') + if tup[3] is not None: + self._log('Purge of HDA %s caused deletion of ImplicitlyConvertedDatasetAssociation: %s and converted HistoryDatasetAssociation: %s' % (tup[0], tup[3], tup[4])) + self._close_logfile() + + def purge_deleted_histories(self): + """ + Mark purged all Histories marked deleted that are older than the specified number of days. + Mark purged all HistoryDatasetAssociations in Histories marked purged in this step (if not already purged). 
+ """ + log.info('Marking purged all deleted histories that are older than the specified number of days.') + + event_id = self._create_event(inspect.stack()[0][3]) + + sql = """ + WITH purged_history_ids + AS ( UPDATE history + SET purged = true%s + WHERE deleted + AND NOT purged + AND update_time < (NOW() - interval '%s days') + RETURNING id, + user_id), + purged_hda_ids + AS ( UPDATE history_dataset_association + SET purged = true%s + FROM purged_history_ids + WHERE purged_history_ids.id = history_dataset_association.history_id + AND NOT history_dataset_association.purged + RETURNING history_dataset_association.history_id AS history_id, + history_dataset_association.id AS id), + deleted_metadata_file_ids + AS ( UPDATE metadata_file + SET deleted = true%s + FROM purged_hda_ids + WHERE purged_hda_ids.id = metadata_file.hda_id + RETURNING metadata_file.hda_id AS hda_id, + metadata_file.id AS id), + deleted_icda_ids + AS ( UPDATE implicitly_converted_dataset_association + SET deleted = true%s + FROM purged_hda_ids + WHERE purged_hda_ids.id = implicitly_converted_dataset_association.hda_parent_id + RETURNING implicitly_converted_dataset_association.hda_id AS hda_id, + implicitly_converted_dataset_association.hda_parent_id AS hda_parent_id, + implicitly_converted_dataset_association.id AS id), + deleted_icda_purged_child_hda_ids + AS ( UPDATE history_dataset_association + SET purged = true%s + FROM deleted_icda_ids where deleted_icda_ids.hda_id = history_dataset_association.id), + history_events + AS (INSERT INTO cleanup_event_history_association + (create_time, cleanup_event_id, history_id) + SELECT NOW(), %s, id + FROM purged_history_ids), + hda_events + AS (INSERT INTO cleanup_event_hda_association + (create_time, cleanup_event_id, hda_id) + SELECT NOW(), %s, id + FROM purged_hda_ids), + metadata_file_events + AS (INSERT INTO cleanup_event_metadata_file_association + (create_time, cleanup_event_id, metadata_file_id) + SELECT NOW(), %s, id + FROM deleted_metadata_file_ids), + icda_events + AS (INSERT INTO cleanup_event_icda_association + (create_time, cleanup_event_id, icda_id) + SELECT NOW(), %s, id + FROM deleted_icda_ids), + icda_hda_events + AS (INSERT INTO cleanup_event_hda_association + (create_time, cleanup_event_id, hda_id) + SELECT NOW(), %s, hda_id + FROM deleted_icda_ids) + SELECT purged_history_ids.id, + purged_history_ids.user_id, + purged_hda_ids.id, + deleted_metadata_file_ids.id, + deleted_icda_ids.id, + deleted_icda_ids.hda_id + FROM purged_history_ids + LEFT OUTER JOIN purged_hda_ids + ON purged_history_ids.id = purged_hda_ids.history_id + LEFT OUTER JOIN deleted_metadata_file_ids + ON deleted_metadata_file_ids.hda_id = purged_hda_ids.id + LEFT OUTER JOIN deleted_icda_ids + ON deleted_icda_ids.hda_parent_id = purged_hda_ids.id; + """ + + update_time_sql = "" + if self.options.update_time: + update_time_sql += """, + update_time = NOW()""" + + sql = sql % (update_time_sql, '%s', update_time_sql, update_time_sql, update_time_sql, update_time_sql, '%s', '%s', '%s', '%s', '%s') + args = (self.options.days, event_id, event_id, event_id, event_id, event_id) + cur = self._update(sql, args) + self._flush() + + self._open_logfile() + for tup in cur: + self._log('Marked History purged: %s' % tup[0]) + if tup[1] is not None and tup[1] not in self.disk_accounting_user_ids: + self.disk_accounting_user_ids.append(int(tup[1])) + if tup[2] is not None: + self._log('Purge of History %s caused deletion of HistoryDatasetAssociation: %s' % (tup[0], tup[2])) + if tup[3] is not None: + 
self._log('Purge of HDA %s caused deletion of MetadataFile: %s' % (tup[1], tup[3])) + self._remove_metadata_file(tup[3], 'purge_deleted_histories') + if tup[4] is not None: + self._log('Purge of HDA %s caused deletion of ImplicitlyConvertedDatasetAssociation: %s and converted HistoryDatasetAssociation: %s' % (tup[1], tup[4], tup[5])) + self._close_logfile() + + def delete_exported_histories(self): + """ + Mark deleted all Datasets that are derivative of JobExportHistoryArchives that are older than the specified number of days. + """ + log.info('Marking deleted all Datasets that are derivative of JobExportHistoryArchives that are older than the specified number of days.') + + event_id = self._create_event(inspect.stack()[0][3]) + + sql = """ + WITH deleted_dataset_ids + AS ( UPDATE dataset + SET deleted = true%s + FROM job_export_history_archive + WHERE job_export_history_archive.dataset_id = dataset.id + AND dataset.update_time <= (NOW() - interval '%s days') + RETURNING dataset.id), + dataset_events + AS (INSERT INTO cleanup_event_dataset_association + (create_time, cleanup_event_id, dataset_id) + SELECT NOW(), %s, id + FROM deleted_dataset_ids) + SELECT id + FROM deleted_dataset_ids + ORDER BY id; + """ + + update_time_sql = "" + if self.options.update_time: + update_time_sql += """, + update_time = NOW()""" + + sql = sql % (update_time_sql, '%s', '%s') + args = (self.options.days, event_id) + cur = self._update(sql, args) + self._flush() + + self._open_logfile() + for tup in cur: + self._log('Marked Dataset deleted: %s' % tup[0]) + self._close_logfile() + + def delete_datasets(self): + """ + Mark deleted all Datasets whose associations are all marked as deleted (LDDA) or purged (HDA) that are older than the specified number of days. + """ + log.info('Marking deleted all Datasets whose associations are all marked as deleted/purged that are older than the specified number of days.') + + event_id = self._create_event(inspect.stack()[0][3]) + + sql = """ + WITH deleted_dataset_ids + AS ( UPDATE dataset + SET deleted = true%s + WHERE NOT deleted + AND NOT EXISTS (SELECT true + FROM library_dataset_dataset_association + WHERE (NOT deleted + OR update_time >= (NOW() - interval '%s days')) + AND dataset.id = dataset_id) + AND NOT EXISTS (SELECT true + FROM history_dataset_association + WHERE (NOT purged + OR update_time >= (NOW() - interval '%s days')) + AND dataset.id = dataset_id) + RETURNING id), + dataset_events + AS (INSERT INTO cleanup_event_dataset_association + (create_time, cleanup_event_id, dataset_id) + SELECT NOW(), %s, id + FROM deleted_dataset_ids) + SELECT id + FROM deleted_dataset_ids + ORDER BY id; + """ + + update_time_sql = "" + if self.options.update_time: + update_time_sql += """, + update_time = NOW()""" + + sql = sql % (update_time_sql, '%s', '%s', '%s') + args = (self.options.days, self.options.days, event_id) + cur = self._update(sql, args) + self._flush() + + self._open_logfile() + for tup in cur: + self._log('Marked Dataset deleted: %s' % tup[0]) + self._close_logfile() + + def purge_datasets(self): + """ + Mark purged all Datasets marked deleted that are older than the specified number of days. 
+ """ + log.info('Marking purged all Datasets marked deleted that are older than the specified number of days.') + + # TODO: force retry option + + event_id = self._create_event(inspect.stack()[0][3]) + + sql = """ + WITH purged_dataset_ids + AS ( UPDATE dataset + SET purged = true%s + WHERE deleted = true + AND purged = false + AND update_time < (NOW() - interval '%s days') + RETURNING id), + dataset_events + AS (INSERT INTO cleanup_event_dataset_association + (create_time, cleanup_event_id, dataset_id) + SELECT NOW(), %s, id + FROM purged_dataset_ids) + SELECT id + FROM purged_dataset_ids + ORDER BY id; + """ + + update_time_sql = "" + if self.options.update_time: + update_time_sql += """, + update_time = NOW()""" + + sql = sql % (update_time_sql, '%s', '%s') + args = (self.options.days, event_id) + cur = self._update(sql, args) + self._flush() + + self._open_logfile() + for tup in cur: + self._log('Marked Dataset purged: %s' % tup[0]) + + # always try to remove the "object store path" - if it's at an external_filename, that file will be untouched anyway (which is what we want) + dataset = Bunch(id=tup[0]) + filename = self.object_store.get_filename(dataset) + extra_files_dir = self.object_store.get_filename(dataset, dir_only=True, extra_dir="dataset_%d_files" % tup[0]) + + # don't check for existence of the dataset, it should exist + self._log('Removing from disk: %s' % filename) + if not self.options.dry_run: + try: + os.unlink(filename) + except Exception, e: + self._log('Removal of %s failed with error: %s' % (filename, e)) + + # extra_files_dir is optional so it's checked first + if os.path.exists(extra_files_dir): + self._log('Removing from disk: %s' % extra_files_dir) + if not self.options.dry_run: + try: + shutil.rmtree(extra_files_dir) + except Exception, e: + self._log('Removal of %s failed with error: %s' % (extra_files_dir, e)) + + self._close_logfile() + +if __name__ == '__main__': + cleanup = Cleanup() + cleanup._run() + cleanup._update_user_disk_usage() + cleanup._shutdown() Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.