Source code for chrysalio.lib.attachment

"""Various functions to manage attachments."""

from __future__ import annotations
from os import makedirs, remove, close
from os.path import join, exists, splitext, basename
from tempfile import mkdtemp, mkstemp

from webob.compat import cgi_FieldStorage as FieldStorage

from pyramid.request import Request


# =============================================================================
[docs] def attachment_path( request: Request, category: str, key: str, filename: str) -> str | None: """Return the absolute path of an attachement file or ``None`` if it does not exist or attachments are not activated. :type request: pyramid.request.Request :param request: Current request. :param str category: Category of attachement (e.g. Users). :param str key: Key to access to the attachments. :param str filename: Name of file to retrieve. :rtype: str """ if not key or not filename: return None attachments = request.registry.settings.get('attachments') if not attachments: return None attachment = join(attachments, category, key, filename) return attachment if exists(attachment) else None
# =============================================================================
[docs] def attachment_url( request: Request, category: str, key: str, filename: str) -> str | None: """Return the URL of an attachement file or ``None`` if it does not exist or attachments are not activated. :type request: pyramid.request.Request :param request: Current request. :param str category: Category of attachement (e.g. Users). :param str key: Key to access to the attachments. :param str filename: Name of file to retrieve. :rtype: str """ if not key or not filename: return None attachments = request.registry.settings.get('attachments') if not attachments: return None attachment = join(attachments, category, key, filename) if not exists(attachment): return None return request.route_path('attachment', path=(category, key, filename))
# =============================================================================
[docs] def attachment_update( request: Request, category: str, key: str, field: FieldStorage, replace: str | None = None, prefix: str = '') -> tuple[str, str | None]: """Update the content of the attachment directory. :type request: pyramid.request.Request :param request: Current request. :param str category: Category of attachement (e.g. Users). :param str key: Key to access to the attachments. :type field: webob.compat.cgi_FieldStorage :param field: File to retrieve. :param str replace: (optional) File to delete before adding the new file. :param str prefix: (optional) Prefix for newly created attachment key. :rtype: tuple :return: A tuple such as ``(attachments_key, file_path)``. """ # pylint: disable = too-many-return-statements if not isinstance(field, FieldStorage): return key, None # Directory for attachments attachments = request.registry.settings.get('attachments') if not attachments: return key, replace attachments = join(attachments, category) if not exists(attachments): try: makedirs(attachments) except (OSError, IOError): # pragma: nocover request.session.flash(('Permission refused.'), 'alert') return key, replace # Personal attachment directory try: if key is None: key = basename(mkdtemp(prefix=prefix, dir=attachments)) attachments = join(attachments, key) if not exists(attachments): makedirs(attachments) except (OSError, IOError): # pragma: nocover request.session.flash(('Permission refused.'), 'alert') return key, replace # Remove old file if replace and exists(join(attachments, replace)): try: remove(join(attachments, replace)) except (OSError, IOError): # pragma: nocover request.session.flash(('Permission refused.'), 'alert') return key, replace # Copy new file try: hdl1, filename = mkstemp( suffix=splitext(field.filename)[1], prefix='', dir=attachments) close(hdl1) with open(filename, 'wb') as hdl2: hdl2.write(field.file.read()) except (OSError, IOError): # pragma: nocover request.session.flash(('Permission refused.'), 'alert') return key, None return key, basename(filename)