Source code for chrysalio.scripts.ciopopulate

"""Populate the database and the attachment directory."""

from sys import exit as sys_exit
from os import makedirs, sep
from os.path import exists, join, abspath, basename
from shutil import rmtree
from logging import getLogger
from argparse import REMAINDER
from datetime import datetime

from transaction import manager

from ..lib.i18n import _
from ..lib.utils import scandir, copy_content
from ..lib.xml import load_xml2, relaxng4validation
from ..lib.config import config_get, config_get_namespace
from ..relaxng import RELAXNG
from ..models import get_tm_dbsession
from ..models.dbsettings import DBSettings
from ..models.dbuser import DBUser
from ..models.populate import xml2db
from . import Script


LOG = getLogger(__name__)


# =============================================================================
[docs]def main(args=None): """Main function.""" args = Populate.arguments(Populate.argument_parser(), args) if args is not None: sys_exit(Populate(args, DBUser, xml2db, RELAXNG).run(args.files))
# =============================================================================
[docs]class Populate(Script): """Class to populate database. :type args: argparse.Namespace :param args: Command line arguments. :param dbuser_class: Class to manage user SQL table. :param function _xml2db: Function to load XML configuration files. See :meth:`~chrysalio.models.populate.xml2db`. :param dict relaxng: A dictionary describing the main Relax NG with the following keys: ``'root'``, ``'version'`` and ``'file'``. :param list includes: (optional) List of hard coding `includes`. :type dbsession_factory: sqlalchemy.orm.session.sessionmaker :param dbsession_factory: (optional) Function to create session. """ # pylint: disable = too-many-arguments # ------------------------------------------------------------------------- def __init__(self, args, dbuser_class, _xml2db, relaxng, includes=None, dbsession_factory=None): """Constructor method.""" super(Populate, self).__init__(args, True, includes, dbsession_factory) self._dbuser_class = dbuser_class self._xml2db = _xml2db self._relaxng = relaxng # -------------------------------------------------------------------------
[docs] @classmethod def argument_parser(cls, description='Populate database.'): """Create an argument parser object to parse command line arguments. :param str description: (optional) Description of the script populating the database. :rtype: argparse.ArgumentParser """ parser = super(Populate, cls).argument_parser(description=description) parser.add_argument( 'files', nargs='*', help='optional XML configuration files to use') parser.add_argument( '--attachments', help='optional attachment directory to use') parser.add_argument( '--drop-tables', dest='drop_tables', help='drop existing tables', action='store_true') parser.add_argument( '--remove-locks', dest='remove_locks', action='store_true', help='remove locks directory') parser.add_argument( '--remove-builds', dest='remove_builds', action='store_true', help='remove builds directory') parser.add_argument( '--skip-refresh', dest='skip_refresh', action='store_true', help='skip refreshment step') parser.add_argument( '--recreate-thumbnails', dest='recreate_thumbnails', action='store_true', help='recreate thumbnails') parser.add_argument( '--reindex', dest='reindex', action='store_true', help='recreate indexes') parser.add_argument('extra', nargs=REMAINDER, help='extra options') return parser
# -------------------------------------------------------------------------
[docs] def run(self, files=None): """Check settings and initialize database. :param list files: (optional) List of files on command-line. :rtype: int :return: Exit code. """ if self.registry is None: return 1 with manager: dbsession = get_tm_dbsession( self.registry['dbsession_factory'], manager) # Load administrator LOG.info(self._translate(_('====== Adding administrator'))) error = self._dbuser_class.load_administrator( dbsession, config_get_namespace( self._config, 'Populate', 'admin')) if error: LOG.error(self._translate(error)) LOG.critical(self._translate(_('Incorrect administrator.'))) return 1 # Load XML configuration files self._populate_from_xml(dbsession, files or []) # Copy attachments self._copy_attachments(self._args.attachments) # Update settings dbsetting = dbsession.query(DBSettings).filter_by( key='populate').first() if dbsetting is None: dbsetting = DBSettings(key='populate') dbsession.add(dbsetting) dbsetting.value = datetime.now().isoformat(' ').split('.')[0] # Complete operation for module_id in self.registry['modules']: self.registry['modules'][module_id].activate( self.registry, dbsession) for module_id in self.registry['modules']: error = self.registry['modules'][module_id].populate( self._args, self.registry, dbsession) if error: # pragma: nocover LOG.error(self._translate(error)) return 1 return 0
# ------------------------------------------------------------------------- def _populate_from_xml(self, dbsession, extra_files): """Populate database with XML content. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: SQLAlchemy session. :param list extra_files: List of files on command-line. """ if self.registry is None: return # List of files to load files = config_get_namespace(self._config, 'Populate', 'data') files = [files[k] for k in sorted(files.keys())] + list(extra_files) if not files: # pragma: nocover return # Load main Relax NG and Relax NG of each module relaxngs = relaxng4validation(self._relaxng) for module_id in self.registry['modules']: if self.registry['modules'][module_id].relaxng is not None: relaxngs.update(relaxng4validation( self.registry['modules'][module_id].relaxng)) # Load files LOG.info(self._translate(_('====== Loading configurations'))) done = set() for filename in files: filename = abspath(filename) if filename in done: continue LOG.info(filename) tree, err = load_xml2(filename, relaxngs) if err is not None: LOG.error(self._translate(err)) continue errors = self._xml2db( dbsession, tree, error_if_exists=False, modules=self.registry['modules']) for error in errors: # pragma: nocover LOG.error(self._translate(error)) done.add(filename) # ------------------------------------------------------------------------- def _copy_attachments(self, extra_attachments): """Possibly copy attachment files. :param list extra_attachments: Attachment directory on command-line. """ if self.registry is None: return destination = self.registry.settings.get('attachments') if not destination: return if config_get(self._config, 'Populate', 'drop_tables') == 'true' and \ exists(destination): rmtree(destination) if not exists(destination): makedirs(destination) # List of attachments to copy attachments = config_get_namespace( self._config, 'Populate', 'attachment') attachments = [attachments[k] for k in sorted(attachments.keys())] if extra_attachments and exists(extra_attachments): if extra_attachments.endswith(sep): extra_attachments = extra_attachments[:-len(sep)] attachments += [ join(extra_attachments, k.name) for k in scandir(extra_attachments)] for attachment in attachments: if exists(attachment): name = basename(attachment) for entry in scandir(attachment): if entry.is_dir(): copy_content( entry.path, join(destination, name, entry.name))