Source code for chrysalio.lib.restful

"""Functions to manage RESTful API."""

from datetime import datetime
from json import loads, dumps
from math import fabs
from http.client import BadStatusLine

from urllib.parse import urlencode
from urllib.request import urlopen
from urllib.error import URLError

from .i18n import _
from .utils import encrypt, decrypt
from ..models.dbuser import DBUser


TOKEN_TTL = 1.0


# =============================================================================
[docs]def restful_call(url, login, key, data=None): """RESTful call. :param str url: RESTful URL to call. :param str login: Login of an authorized user. :param str key: Key to decrypt the token. :param dict data: (optional) Data to transfer. :rtype: tuple :return: A tuple such as ``(response, error)``. """ if data is None: data = {} for k in data: if isinstance(data[k], (tuple, list, dict)): data[k] = dumps(data[k]) data['login'] = login data['token'] = encrypt(datetime.utcnow().isoformat(), key or '-') data = urlencode(data).encode('ascii') try: with urlopen(url, data) as hdl: response = loads(hdl.read().decode('utf8')) except (URLError, ValueError, BadStatusLine) as error: return None, str(error) if response['error'] is not None: return None, response['error'] return response, None
# =============================================================================
[docs]def restful_login(request, key, token_ttl=None): """Login during a RESTful call. :type request: pyramid.request.Request :param request: Current request. :param str key: Key to decrypt the token. :param float token_ttl: (optional) Validity period in seconds for the token. :rtype: str :return: Translated error message or ``None``. """ if 'user' in request.session: return None # Check the token token = request.params.get('token') if not token: return request.localizer.translate(_('Token is missing.')) token = decrypt(token, key or '-') if not token: return request.localizer.translate(_('Token is invalid.')) token = datetime.strptime(token, '%Y-%m-%dT%H:%M:%S.%f') delta = (datetime.utcnow() - token).total_seconds() if fabs(delta) > float(token_ttl or TOKEN_TTL): return request.localizer.translate(_('Token has expired.')) # Set up user session dbuser = DBUser.get(request, request.params.get('login'))[0] if dbuser is None: return request.localizer.translate(_('Access denied.')) dbuser.set_session(request) return None