Source code for chrysalio.lib.ftp

"""FTP library."""

from sys import version_info
from os import makedirs
from os.path import join, exists, getsize, splitext, basename
from ftplib import FTP, FTP_TLS, all_errors

from .i18n import _
from .utils import EXCLUDED_FILES, tounicode, tostr


# =============================================================================
[docs]class Ftp(object): """Class to manage FTP. :param log_error: Function to record errors. """ # ------------------------------------------------------------------------- def __init__(self, log_error): """Constructor method.""" self._log_error = log_error self.connection = None # -------------------------------------------------------------------------
[docs] def connect(self, values): """FTP connection using ``values`` to get FTP information. :param dict values: FTP values. :rtype: bool """ # Previous connection? self.quit() # Check FTP arguments host = values.get('ftp_host') user = values.get('ftp_user', 'anonymous') password = values.get('ftp_password', 'anonymous@') if host is None: self._log_error(_('FTP error: bad arguments!')) return False # Connection self.connection = FTP_TLS() if values.get('ftp_tls') else FTP() try: self.connection.connect(host, values.get('ftp_port', 21)) self.connection.login(user, password) except all_errors as error: self.connection = None self._log_ftp_error(error) return False self.connection.set_pasv(values.get('ftp_pasv', False)) # Change to root directory if values.get('ftp_path') and not self.cwd(values['ftp_path']): self.quit() return False return True
# -------------------------------------------------------------------------
[docs] def cwd(self, directory): """Change working directory. :param str directory: Directory. :rtype: bool """ if self.connection is None: return False try: self.connection.cwd(directory) except all_errors as error: self._log_ftp_error(error) return False return True
# -------------------------------------------------------------------------
[docs] def mkdir(self, directory): """Make a directory. :param str directory: Directory. :rtype: bool """ if self.connection is None: return False try: self.connection.mkd(directory) except all_errors as error: # pragma: nocover self._log_ftp_error(error) return False return True
# -------------------------------------------------------------------------
[docs] def list_directory(self): """Return the list of files in the current FTP directory. :rtype: tuple :return: A tuple such as ``(dirs_infos, files_infos)``. """ if self.connection is None: return {}, {} listing = [] try: self.connection.retrlines('MLSD', listing.append) except all_errors as error: # pragma: nocover self._log_ftp_error(error) return {}, {} dirs = {} files = {} for item in listing: item = item.partition(' ') infos = { k.split('=')[0].lower(): k.split('=')[1] for k in item[0].split(';') if k} name = item[2] if infos['type'] == 'dir': dirs[name] = infos elif infos['type'] == 'file': files[name] = infos return dirs, files
# -------------------------------------------------------------------------
[docs] def delete(self, filename): """Delete the file ``filename``. :rtype: bool """ if self.connection is None: return False try: self.connection.delete(filename) except all_errors as error: # pragma: nocover self._log_ftp_error(error) return False return True
# -------------------------------------------------------------------------
[docs] def rmtree(self, directory): """Remove the directory ``directory``. :rtype: bool """ if self.connection is None or not self.cwd(directory): return False dirs, files = self.list_directory() for name in files: if not self.delete(name): return False # pragma: nocover for name in dirs: if not self.rmtree(name): return False # pragma: nocover self.connection.cwd('..') try: self.connection.rmd(directory) except all_errors as error: # pragma: nocover self._log_ftp_error(error) return False return True
# -------------------------------------------------------------------------
[docs] def download(self, destination, exclude=None): """Transfer the current FTP directory into the ``destination`` directory. :rtype: bool :return: ``True`` if the download completed. :param list exclude: (optional) List of files to exclude. """ # pylint: disable = too-many-branches if self.connection is None: return False dirs, files = self.list_directory() if not dirs and not files: return True if not exists(destination): makedirs(destination) if exclude is None: exclude = EXCLUDED_FILES completed = True for name, infos in files.items(): if name in exclude: continue if splitext(name)[1] in ('.part', '.filepart'): completed = False continue try: fullname = join(destination, tounicode(name)) except UnicodeDecodeError as error: # pragma: nocover self._log_ftp_error(error) continue if not exists(fullname) or \ getsize(fullname) != int(infos['size']): with open(fullname, 'wb') as hdl: self.connection.retrbinary( tostr(f'RETR {tounicode(name)}'), hdl.write) completed = False for name in dirs: if name in exclude: continue try: self.connection.cwd(name) except all_errors as error: # pragma: nocover self._log_ftp_error(error) continue try: new_destination = join(destination, tounicode(name)) except UnicodeDecodeError as error: # pragma: nocover self._log_ftp_error(error) continue if not exists(new_destination): makedirs(new_destination) completed &= self.download(new_destination) self.connection.cwd('..') return completed
# -------------------------------------------------------------------------
[docs] def upload(self, filename, upload_name=None): """Upload a file in the current directory. :param str filename: Absolute path to the file to upload. :param str upload_name: (optional) Name of the uploaded file. """ if self.connection is None: return stor = 'STOR {0}'.format(upload_name or basename(filename)) stor = stor.encode('utf8') if version_info < (3, 0) else stor with open(filename, 'rb') as hdl: self.connection.storbinary(stor, hdl)
# -------------------------------------------------------------------------
[docs] def quit(self): """Quit the FTP connection.""" if self.connection is not None: try: self.connection.quit() self.connection.close() except all_errors as error: # pragma: nocover self._log_ftp_error(error) self.connection = None
# ------------------------------------------------------------------------- def _log_ftp_error(self, error): """Add an error message in the log cache. :param str text: Error. """ self._log_error(_('FTP error: ${e}', {'e': tounicode(error)}))