Source code for chrysalio.views.backup

"""Backup view callables."""

from __future__ import annotations
from os import scandir, walk
from os.path import exists, join, relpath
from time import time
from tempfile import mkdtemp
from shutil import rmtree
from zipfile import ZIP_DEFLATED, ZipFile, LargeZipFile
from configparser import ConfigParser

from lxml import etree
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm.session import close_all_sessions
from transaction import manager

from pyramid.view import view_config
from pyramid.response import Response, FileResponse

from ..lib.i18n import _
from ..lib.utils import tounicode
from ..lib.form import get_action, Form
from ..lib.xml import create_entire_xml
from ..lib.config import config_get_namespace
from ..lib.log import log_info, log_error
from ..models import DB_METADATA, get_tm_dbsession
from ..models.dbuser import DBUser
from ..models.populate import db2xml, xml2db, web2db
from . import BaseView


# =============================================================================
[docs] class BackupView(BaseView): """Class to manage backup and restore operations. :type request: pyramid.request.Request :param request: Current request. """ _DBUser = DBUser _db2xml = (db2xml, ) _xml2db = (xml2db, ) # -------------------------------------------------------------------------
[docs] @view_config( route_name='backup', renderer='chrysalio:Templates/backup.pt', permission='backup-create') @view_config( route_name='backup', renderer='json', xhr=True, permission='backup-create') def index(self) -> dict | Response: """Choose between backup and restore.""" # Ajax if self._request.is_xhr: self._restore() return {} # Action action = get_action(self._request)[0] if action == 'bck!': response = self._backup() if response is not None: return response if action == 'rst!': self._restore() # Breadcrumbs & documentation self._request.breadcrumbs(_('Configuration Backup'), 1) self._request.documentation = '/backup' return { # yapf: disable 'form': Form(self._request), 'action': action, 'download_max_size': self._request.registry[ 'settings']['download-max-size'] }
# ------------------------------------------------------------------------- def _backup(self) -> Response | None: """Backup the configuration into a ZIP or an XML file. :rtype: pyramid.response.Response """ # Create the XML file elements = self._db2xml[0]( self._request.dbsession, modules=self._request.registry.get('modules')) root_elt = create_entire_xml( self._request.registry['relaxng'], elements) # pylint: disable = protected-access if not isinstance(root_elt, etree._Element): # pragma: nocover self._request.session.flash(root_elt, 'alert') return None # pylint: enable = protected-access # Return a XML file site_uid = self._request.registry.settings['site.uid'] attachments = self._request.registry.settings.get('attachments') if not attachments or not exists(attachments) \ or not tuple(scandir(attachments)): response = Response( body=etree.tostring( root_elt, pretty_print=True, xml_declaration=True, encoding='utf-8'), content_type='application/xml') response.headerlist.append(( # yapf: disable 'Content-Disposition', f'attachment; filename="{site_uid}.xml"')) log_info(self._request, 'backup without attachments') return response # Return a Zip # pylint: disable = consider-using-with tmp_dir = mkdtemp( prefix=site_uid, dir=self._request.registry.settings.get('temporary')) zip_file = ZipFile( join(tmp_dir, '{0}.zip'.format(site_uid)), 'w', ZIP_DEFLATED) zip_file.writestr( f'{site_uid}.xml', etree.tostring( root_elt, encoding='utf-8', xml_declaration=True, pretty_print=True)) for root, unused_, files in walk(attachments): for name in files: name = join(root, name) try: zip_file.write(name, relpath(name, attachments)) except LargeZipFile: # pragma: nocover zip_file.close() rmtree(tmp_dir) self._request.session.flash( _('This configuration is too big!'), 'alert') return None except IOError as error: # pragma: nocover zip_file.close() rmtree(tmp_dir) self._request.session.flash(error, 'alert') return None zip_file.close() filename = zip_file.filename response = FileResponse( filename, request=self._request, content_type='application/zip') response.headers['Content-Disposition'] = \ f'attachment; filename="{site_uid}.zip"' rmtree(tmp_dir) log_info(self._request, 'backup with attachments') return response # ------------------------------------------------------------------------- def _restore(self) -> bool: """Restore a configuration from a ZIP or an XML file. :rtype: bool """ # Reset the old configuration reset = self._request.params.get('reset') admin = None if reset: admin = self._administrator_retrieve() if not self._drop_dbtables(): # pragma: nocover return False attachments = self._request.registry.settings.get('attachments') if attachments and exists(attachments): rmtree(attachments) # Restore the new configuration with manager: self._request.dbsession = get_tm_dbsession( self._request.registry['dbsession_factory'], manager) if not self._administrator_restore(admin): return False web2db(self._request, self._xml2db[0], error_if_exists=reset) if self._request.session.peek_flash('alert'): return False # To be continued self._request.session.flash(_('Backup restored.')) log_info(self._request, 'restore with reset' if reset else 'restore') if reset: self._request.session.clear() self._request.session['_creation_time'] = time() self._request.session['_accessed_time'] = time() self._request.response.delete_cookie( self._request.registry.settings.get('auth.cookie', 'CIO_AUTH')) return True # ------------------------------------------------------------------------- def _administrator_retrieve(self) -> dict | None: """Retrieve the administrator record. :rtype: dict """ dbuser = self._request.dbsession.query(self._DBUser).filter_by( status='administrator').order_by('user_id').first() if dbuser is not None: return { k: dbuser.__dict__[k] for k in dbuser.__dict__ if k[0] != '_' and k != 'user_id' } return None # ------------------------------------------------------------------------- def _administrator_restore(self, record) -> bool: """Restore the administrator record. :param dict record: Dictionary representing the administrator configuration. :rtype: bool """ # Read administrator configuration from configuration file if record is None: config_file = self._request.registry.settings['__file__'] config = ConfigParser({'here': config_file}) config.read(tounicode(config_file), encoding='utf8') record = config_get_namespace(config, 'Populate', 'admin') error = self._DBUser.load_administrator( self._request.dbsession, record) if error: log_error(self._request, error) self._request.session.flash(error, 'alert') return False return True # ------------------------------------------------------------------------- def _drop_dbtables(self) -> bool: """Drop all tables of the database. :rtype: bool """ # Drop try: close_all_sessions() DB_METADATA.drop_all() except OperationalError as error: # pragma: nocover log_error(self._request, error.args[0]) self._request.session.flash(error.args[0], 'alert') return False # Recreate try: DB_METADATA.create_all() except OperationalError as error: # pragma: nocover log_error(self._request, error.args[0]) self._request.session.flash(error.args[0], 'alert') return False return True