Source code for chrysalio.scripts.cioupdate

#!/usr/bin/env python
"""Console script to backup and update Chrysalio sites."""

from sys import executable
from logging import getLogger
from os import chown, makedirs, setgid, setuid, environ, walk, scandir
from os.path import join, exists, dirname, expanduser, abspath, normpath
from os.path import relpath
from argparse import REMAINDER, ArgumentParser
from pwd import getpwnam
from getpass import getuser
from compileall import compile_dir
from configparser import ConfigParser
from zipfile import ZIP_DEFLATED, ZipFile

from git.repo import Repo
from git.exc import GitCommandError

from ..lib.i18n import _, translate
from ..lib.config import config_get, config_get_namespace
from ..lib.log import setup_logging
from ..lib.utils import tounicode, encrypt, decrypt, execute, full_url


LOG = getLogger(__name__)
SECTION_PREFIX = 'Update'


# =============================================================================
[docs]def main(args=None): """Main function.""" # Parse arguments parser = ArgumentParser('Backup and update Chrysalio sites.') parser.add_argument( 'conf_file', help='configuration file (e.g. cioupdate.ini)') parser.add_argument( 'sites', nargs='*', help='only update these sites (optional)') parser.add_argument('--lang', dest='lang', help='user language') parser.add_argument( '--encrypt', dest='password', help='encrypt password and exit') parser.add_argument( '--encrypt-key', dest='key', help='optional key for encryption') parser.add_argument( '--no-backup', dest='no_backup', help='do not backup instance', action='store_true') parser.add_argument( '--no-update', dest='no_update', help='do not update instance', action='store_true') parser.add_argument( '--drop-tables', dest='drop_tables', help='drop existing tables', action='store_true') parser.add_argument( '--remove-locks', dest='remove_locks', action='store_true', help='remove locks directory') parser.add_argument( '--remove-builds', dest='remove_builds', action='store_true', help='remove builds directory') parser.add_argument( '--skip-refresh', dest='skip_refresh', action='store_true', help='skip refreshment step') parser.add_argument( '--recreate-thumbnails', dest='recreate_thumbnails', action='store_true', help='recreate thumbnails') parser.add_argument( '--reindex', dest='reindex', action='store_true', help='recreate indexes') parser.add_argument( '--log-level', dest='log_level', help='log level', default='INFO', choices=('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')) parser.add_argument('--log-file', dest='log_file', help='log file') parser.add_argument('extra', nargs=REMAINDER, help='extra options') args = parser.parse_args(args) conf_file = expanduser(args.conf_file) if not exists(conf_file): parser.print_usage() return setup_logging(args.log_level, args.log_file) # Backup and update CioUpdate(args, conf_file).start(args.sites)
# =============================================================================
[docs]class CioUpdate(object): """Class to backup and update Chrysalio sites.""" # ------------------------------------------------------------------------- def __init__(self, args, conf_file): """Constructor method.""" self._args = args self._config = ConfigParser({'here': dirname(abspath(conf_file))}) self._config.read(tounicode(conf_file), encoding='utf8') self._key = args.key or self._config_get('Code', 'key', '-') self._cookiecutters = self._config_get('Code', 'cookiecutters') if self._cookiecutters: self._cookiecutters = expanduser(self._cookiecutters) # -------------------------------------------------------------------------
[docs] def start(self, sites=None): """Start backup and update. :param list sites: (optional) Chrysalio sites to backup and update. If empty, this method processes all Chrysalio sites defined in configuration file. :rtype: bool """ # Only encrypt password if self._args.password: LOG.info( '%s = %s', self._args.password, encrypt(self._args.password.encode('utf8'), self._key)) return True # List of sites to update if not sites: sites = [ k[len(SECTION_PREFIX):] for k in self._config.sections() if k.startswith(SECTION_PREFIX)] if 'Code' not in sites: sites.insert(0, 'Code') # Backup web sites is_ok = True if not self._args.no_backup: LOG.info('{0:=^60}'.format(self._translate(_(' Backup ')))) for site in sites: is_ok &= self._backup(site) if not is_ok or self._args.no_update: return is_ok # Update and compile sources LOG.info('{0:=^60}'.format(self._translate(_(' Update ')))) for site in sites: is_ok &= self._update_sources(site) if not is_ok: return is_ok # Populate web sites is_ok = True LOG.info('{0:=^60}'.format(self._translate(_(' Populating ')))) for site in sites: is_ok &= self._populate(site) if not is_ok: return is_ok # Restart Apache if self._config_get('Code', 'reload'): # pragma: nocover LOG.info('{0:-^60}'.format( self._translate(_(' Restarting server ')))) error = execute( self._config_get('Code', 'reload').split()) LOG.info(self._translate(error[0])) if error[1]: LOG.error(self._translate(error[1])) return False return True
# ------------------------------------------------------------------------- def _backup(self, site): """Backup a Chrysalio site. :param str site: Name of Chrysalio site to backup. :rtype: bool """ # Something to do? conf_uri = self._config_get(site, 'conf_uri') if conf_uri is None or not exists(conf_uri.partition('#')[0]): return True directory = self._config_get(site, 'backup.directory') command = self._config_get(site, 'backup.command') if directory is None or command is None: return True args = self._config_get(site, 'backup.args') user, env = self._user_ids_and_env(self._config_get(site, 'user')) # Backup LOG.info('%s %s', command, site) cmd = [executable, join(dirname(executable), command)] cmd = cmd + ['--lang', self._args.lang] if self._args.lang else cmd cmd = cmd + ['--log-file', self._args.log_file] \ if self._args.log_file else cmd cmd = cmd + args.split() if args else cmd cmd += ['--log-level', self._args.log_level, conf_uri, directory] cmd += self._args.extra output, error = execute(cmd, preexec_fn=self._demote(*user), env=env) if error: LOG.error(translate( error[33:] or _('"${c}" failed', {'c': command}))) return False if output: LOG.warning(output[33:]) return True # ------------------------------------------------------------------------- def _update_sources(self, site): """Update and possibly compile sources. :param str site: Name of Chrysalio site to backup. :rtype: bool """ sources = config_get_namespace( self._config, '{0}{1}'.format(SECTION_PREFIX, site), 'source') is_ok = True for source in sources: if not source.endswith('repository'): continue repository = normpath(self._config_get( site, f"source.{source.replace('_', '.')}")) LOG.info(repository) error = self._recursive_chown(repository, getuser()) if error: # pragma: nocover LOG.error(error) is_ok = False continue error = self._git_pull(repository, site) if error: LOG.error(self._translate(error)) is_ok = False continue code = self._config_get( site, 'source.{0}.code'.format(source[:-11])) if code is not None and exists(code): compile_dir(code, quiet=True) cookiecutters = self._config_get( site, 'source.{0}.cookiecutters'.format(source[:-11])) if self._cookiecutters and cookiecutters is not None \ and exists(cookiecutters): self._zip_cookiecutters(cookiecutters) error = self._recursive_chown( repository, self._config_get(site, 'user')) if error: LOG.error(error) is_ok = False if self._cookiecutters and exists(self._cookiecutters): self._recursive_chown( self._cookiecutters, self._config_get('Code', 'user')) return is_ok # ------------------------------------------------------------------------- def _populate(self, site): """Populate a Chrysalio site. :param str site: Name of Chrysalio site to populate. :rtype: bool """ # Something to do? conf_uri = self._config_get(site, 'conf_uri') if conf_uri is None or not exists(conf_uri.partition('#')[0]): return True command = self._config_get(site, 'populate.command') if command is None: return True args = self._config_get(site, 'populate.args') drop_tables = self._args.drop_tables or \ self._config_get(site, 'populate.drop_tables') == 'true' user, env = self._user_ids_and_env(self._config_get(site, 'user')) env['PWD'] = dirname(conf_uri.partition('#')[0]) # Populate LOG.info('###### %s %s ######', command, site) cmd = [executable, join(dirname(executable), command)] cmd = cmd + ['--drop-tables'] if drop_tables else cmd cmd = cmd + ['--remove-locks'] if self._args.remove_locks else cmd cmd = cmd + ['--remove-builds'] if self._args.remove_builds else cmd cmd = cmd + ['--skip-refresh'] if self._args.skip_refresh else cmd cmd = cmd + ['--recreate-thumbnails'] \ if self._args.recreate_thumbnails else cmd cmd = cmd + ['--reindex'] if self._args.reindex else cmd cmd = cmd + ['--lang', self._args.lang] if self._args.lang else cmd cmd = cmd + ['--log-file', self._args.log_file] \ if self._args.log_file else cmd cmd = cmd + args.split() if args else cmd cmd += ['--log-level', self._args.log_level, conf_uri] cmd += self._args.extra output, error = execute( cmd, cwd=env['PWD'], preexec_fn=self._demote(*user), env=env) if error: LOG.error(translate(output or _('"${c}" failed', {'c': command}))) return False LOG.info(output[33:]) return True # ------------------------------------------------------------------------- def _git_pull(self, repo_path, site): """Pull and update a Git repository. :param str repo_path: Absolute path to of the repository. :param str site: Name of Chrysalio site to backup. :rtype: pyramid.i18n.TranslationString :return: Error message or ``None``. """ if not exists(join(repo_path, '.git', 'config')): return _('This directory is not a Git repository.') gituser = self._config_get(site, 'gituser', 'gituser') gitpassword = decrypt( self._config_get(site, 'gitpassword', ''), self._key) repo = Repo(repo_path) if not repo.remotes: # pragma: nocover return _('No remote repository') url = repo.remotes.origin.url fullurl = full_url(url, gituser, gitpassword) try: repo.remotes.origin.set_url(fullurl) repo.remotes.origin.pull() except GitCommandError as error: # pragma: nocover return str(error).replace(fullurl, url) finally: repo.remotes.origin.set_url(url) return None # ------------------------------------------------------------------------- @classmethod def _recursive_chown(cls, path, user): """Change owner of directory ``path`` recursively. :param str path: Absolute path to directory to process. :param str user: User name. :rtype: str :return: Error message or ``None``. """ if not user: return None try: user = getpwnam(user) except KeyError as error: return error try: chown(path, user.pw_uid, user.pw_gid) except OSError as error: # pragma: nocover return error for root, dirs, files in walk(path.encode('utf8')): for name in dirs: if name == b'.tox': dirs.remove(name) # pragma: nocover else: chown(join(root, name), user.pw_uid, user.pw_gid) for name in files: chown(join(root, name), user.pw_uid, user.pw_gid) return None # ------------------------------------------------------------------------- def _zip_cookiecutters(self, cookiecutters): """Zip each cookiecutter found in ``cookiecutters`` directory. :param str cookiecutters: Absolute path to cookiecutters directory. """ if self._cookiecutters is None: return if not exists(self._cookiecutters): makedirs(self._cookiecutters) for entry in scandir(cookiecutters): if not exists(join(entry.path, 'cookiecutter.json')): continue # pragma: nocover filename = join(self._cookiecutters, '{0}.zip'.format(entry.name)) with ZipFile(filename, 'w', ZIP_DEFLATED) as zip_file: root = normpath(join(entry.path, '..')) zip_file.write(entry.path, entry.name) for path, unused_, files in walk(entry.path): for name in files: name = join(path, name) zip_file.write(name, relpath(name, root)) LOG.info(self._translate( _('${n} -> ${f}', {'n': entry.name, 'f': filename}))) # ------------------------------------------------------------------------- @classmethod def _user_ids_and_env(cls, user=None): """Return a tuple such as ``((uid, group), env)``. :param str user: (optional) User log name. :rtype: tuple """ log_name = user or getuser() try: pw_record = getpwnam(log_name) except KeyError: pw_record = getpwnam(getuser()) env = environ.copy() env.update({ 'HOME': pw_record.pw_dir, 'LOGNAME': pw_record.pw_name, 'USER': pw_record.pw_name}) return (pw_record.pw_uid, pw_record.pw_gid), env # ------------------------------------------------------------------------- @classmethod def _demote(cls, user_uid, user_gid): """Demote the current user. :param int user_uid: User UID. :param int user_gid: User group. """ def result(): # pragma: nocover """Change user.""" setgid(user_gid) setuid(user_uid) return result # ------------------------------------------------------------------------- def _config_get(self, section, option, default=None): """Retrieve a value from the `cioupdate` configuration file. :param str section: Section name. :param str option: Option name. :param str default: (optional) Default value :rtype: str """ return config_get( self._config, '{0}{1}'.format(SECTION_PREFIX, section), option, default) # ------------------------------------------------------------------------- def _translate(self, text): """Return ``text`` translated. :param str text: Text to translate. :rtype: str """ return translate(text, self._args.lang)