"""User management view callables."""
from os.path import join, exists
from datetime import date, datetime
from hashlib import sha1
from shutil import rmtree
from sqlalchemy import desc
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import FlushError
from colander import Mapping, SchemaNode, String, All, Length, Email
from pyramid.httpexceptions import HTTPFound, HTTPNotFound, HTTPForbidden
from pyramid.view import view_config
from pyramid.security import NO_PERMISSION_REQUIRED
from ..lib.i18n import _
from ..lib.log import log_info
from ..lib.utils import make_id, age
from ..lib.form import get_action, SameAs, Form
from ..lib.paging import PAGE_SIZES, Paging
from ..lib.filter import Filter
from ..lib.tabset import Tabset
from ..lib.mailing import Mailing
from ..lib.attachment import attachment_url, attachment_update
from ..includes.themes import theme_static_prefix
from ..models import LABEL_LEN
from ..models.populate import xml2db, db2web, web2db
from ..models.dbprofile import DBProfile
from ..models.dbuser import DBUser, DBUserProfile
from ..models.dbgroup import DBGroup, DBGroupUser
from . import BaseView
# =============================================================================
[docs]
class UserView(BaseView):
"""Class to manage user views.
:type request: pyramid.request.Request
:param request:
Current request.
"""
_DBUser = DBUser
_xml2db = (xml2db,)
_invitation_html = 'chrysalio:Templates/email_invitation.pt'
_invitation_text = 'chrysalio:Templates/email_invitation.txt'
_reset_password_html = \
'chrysalio:Templates/email_reset_password.pt' # nosec
_reset_password_text = \
'chrysalio:Templates/email_reset_password.txt' # nosec
# -------------------------------------------------------------------------
[docs]
@view_config(
route_name='user_index', renderer='chrysalio:Templates/user_index.pt',
permission='user-view')
@view_config(route_name='user_index', renderer='json', xhr=True)
def index(self):
"""List all users."""
# Ajax
i_creator = self._request.has_permission('user-create')
if self._request.is_xhr:
if i_creator:
self._import_users()
return {}
# Action
action, items = get_action(self._request)
i_editor = self._request.has_permission('user-edit')
if action[:4] == 'mel!' and i_editor:
errors = self._send_invitation(items)
if not errors: # pragma: nocover
self._request.session.flash(_('The invitation has been sent.'))
else:
for error in errors:
self._request.session.flash(error, 'alert')
elif action[:4] == 'del!' and i_creator:
self._delete_users(items)
elif action == 'imp!' and i_creator:
self._import_users()
elif action[:4] == 'exp!' and i_editor:
action = self._users2response(items)
if action:
return action
# Filter
paging_id = 'users'
pfilter = Filter(
self._request, paging_id, (
('login', _('Login'), False, ''),
('last_name', _('Last name'), False, ''),
('email', _('Email'), False, ''),
# ('email_hidden', _('Hidden Email'), False, True),
('status', _('Status'), False,
[('', ' ')] + list(self._DBUser.status_labels.items()))),
(('status', '=', 'active'),),
remove=action[:4] == 'crm!' and action[4:] or None)
# Paging
defaults = Paging.params(self._request, paging_id, '+last_name')
dbquery = pfilter.sql(
self._request.dbsession.query(self._DBUser), 'users')
oby = getattr(self._DBUser, defaults['sort'][1:])
dbquery = dbquery.order_by(
desc(oby) if defaults['sort'][0] == '-' else oby)
user_paging = Paging(self._request, paging_id, dbquery, defaults)
user_paging.set_current_ids('user_id')
# Form & completed action
form = Form(self._request, defaults=defaults)
form.forget('filter_value')
if action and action[3] == '!':
action = ''
# Breadcrumbs & documentation
self._request.breadcrumbs(_('Users'), 1)
self._request.documentation = '/admin/user/index'
return {
'age': age, 'action': action, 'items': items, 'form': form,
'pfilter': pfilter, 'paging': user_paging,
'status_labels': self._DBUser.status_labels,
'PAGE_SIZES': PAGE_SIZES, 'i_creator': i_creator,
'i_editor': i_editor, 'has_attachments': bool(
self._request.registry.settings.get('attachments')),
'download_max_size': self._request.registry['settings'][
'download-max-size'],
'attachment_url': attachment_url}
# -------------------------------------------------------------------------
[docs]
@view_config(route_name='user_index_filter', renderer='json', xhr=True)
def index_filter(self):
"""Return a list to autocomplete a filter field."""
return Filter.sql_autocomplete(self._request, self._DBUser)
# -------------------------------------------------------------------------
[docs]
@view_config(
route_name='user_view', renderer='chrysalio:Templates/user_view.pt')
@view_config(
route_name='user_account', renderer='chrysalio:Templates/user_view.pt')
def view(self):
"""Show user settings."""
if not self.can_view():
raise HTTPForbidden()
dbuser = self._get_user(str(self._request.session['user']['user_id']))
is_me = dbuser.user_id == self._request.session['user']['user_id']
picture = self._request.registry.settings.get('attachments') and (
attachment_url(
self._request, dbuser.attachments_dir, dbuser.attachments_key,
dbuser.picture) or
(dbuser.honorific == 'Mrs' and
'{0}/images/user_picture_girl.svg'.format(
theme_static_prefix(self._request))) or
'{0}/images/user_picture_boy.svg'.format(
theme_static_prefix(self._request)))
navigator = Paging.navigator(
self._request, 'users', dbuser.user_id,
self._request.route_path('user_view', user_id='_ID_')) \
if self._request.matched_route.name == 'user_view' else ''
action = get_action(self._request)[0]
if action == 'exp!':
action = self._users2response((dbuser.user_id,))
if action:
return action
elif action == 'mel!':
errors = self._send_invitation((dbuser.user_id,))
if not errors: # pragma: nocover
self._request.session.flash(_('The invitation has been sent.'))
else:
self._request.session.flash(errors[0], 'alert')
action = ''
# Breadcrumbs & documentation
user_name = '{0} {1}'.format(
dbuser.first_name or '', dbuser.last_name).strip()
if self._request.referrer and \
'/user/view/' in self._request.referrer:
self._request.breadcrumbs.pop()
self._request.breadcrumbs(
_('My Account') if is_me else _('${n} Account', {'n': user_name}),
replace=self._request.route_path(
'user_edit', user_id=dbuser.user_id))
self._request.documentation = '/admin/user/view'
return {
'form': Form(self._request),
'tabset': Tabset(
self._request, 'tabUser', dbuser.settings_tabs(self._request)),
'dbuser': dbuser, 'is_me': is_me, 'user_name': user_name,
'navigator': navigator, 'picture': picture,
'i_editor': self._request.has_permission('user-edit')}
# -------------------------------------------------------------------------
[docs]
@view_config(
route_name='user_create', renderer='chrysalio:Templates/user_edit.pt')
@view_config(
route_name='user_edit', renderer='chrysalio:Templates/user_edit.pt')
@view_config(route_name='user_edit', renderer='json', xhr=True)
def edit(self):
"""Create or edit user settings."""
# Authorization
dbuser = self._get_user() if 'user_id' in self._request.matchdict \
else None
if (not dbuser and not self.can_create()) or \
(dbuser and not self.can_edit()):
raise HTTPForbidden()
# Ajax
if self._request.is_xhr:
if dbuser is not None:
dbuser.attachments_key, dbuser.picture = attachment_update(
self._request, dbuser.attachments_dir,
dbuser.attachments_key, self._request.POST['picture'],
replace=dbuser.picture,
prefix=dbuser.login.replace('@', '_')[:12])
dbuser.account_update = datetime.now()
log_info(self._request, 'user_update_picture', dbuser.login)
return {}
# Initialization
is_me = dbuser and \
dbuser.user_id == self._request.session['user']['user_id']
profiles = {
k.profile_id: (
k.label(self._request), k.description(self._request))
for k in self._request.dbsession.query(DBProfile)}
groups = {
k.group_id: (
k.label(self._request), k.description(self._request))
for k in self._request.dbsession.query(DBGroup)}
# Form and action
form = Form(
self._request,
*self._DBUser.settings_schema(
self._request, profiles, groups, dbuser),
obj=dbuser)
action = get_action(self._request)[0]
if action == 'pct!' and dbuser is not None:
dbuser.attachments_key, dbuser.picture = attachment_update(
self._request, dbuser.attachments_dir, dbuser.attachments_key,
self._request.POST['picture'], replace=dbuser.picture,
prefix=dbuser.login.replace('@', '_')[:12])
dbuser.account_update = datetime.now()
log_info(self._request, 'user_update_picture', dbuser.login)
elif action == 'sav!' and form.validate():
dbuser = self._save(dbuser, profiles, groups, form.values)
if dbuser is not None:
if is_me:
dbuser.set_session(self._request)
if 'user_id' not in self._request.matchdict:
self._request.breadcrumbs.pop()
log_info(
self._request, 'user_id' in self._request.matchdict and
'user_edit' or 'user_create', dbuser.login)
return HTTPFound(self._request.route_path(
'user_view', user_id=dbuser.user_id))
if form.has_error():
self._request.session.flash(_('Correct errors.'), 'alert')
# Picture
picture = \
dbuser and self._request.registry.settings.get('attachments') and (
attachment_url(
self._request, DBUser.attachments_dir,
dbuser.attachments_key, dbuser.picture) or
(dbuser.honorific == 'Mrs' and
'{0}/images/user_picture_girl.svg'.format(
theme_static_prefix(self._request))) or
'{0}/images/user_picture_boy.svg'.format(
theme_static_prefix(self._request)))
# Breadcrumbs & documentation
user_name = dbuser and '{0} {1}'.format(
dbuser.first_name or '', dbuser.last_name).strip()
if self._request.referrer and \
'/user/account' in self._request.referrer:
self._request.breadcrumbs.pop()
if not dbuser:
self._request.breadcrumbs(_('User Creation'))
else:
self._request.breadcrumbs(
_('My Account Edition') if is_me else _(
'${n} Edition', {'n': user_name}),
replace=self._request.route_path(
'user_view', user_id=dbuser.user_id))
self._request.documentation = '/admin/user/edit'
return {
'form': form, 'dbuser': dbuser or self._DBUser,
'user_name': user_name, 'action': action, 'profiles': profiles,
'groups': groups, 'is_me': is_me, 'picture': picture,
'tabset': Tabset(
self._request, 'tabUser',
self._DBUser.settings_tabs(self._request))}
# -------------------------------------------------------------------------
[docs]
@view_config(
route_name='user_password_forgot',
renderer='chrysalio:Templates/user_password_forgot.pt',
permission=NO_PERMISSION_REQUIRED)
def password_forgot(self):
"""Get email address to send a token to reset the password."""
schema = SchemaNode(Mapping())
schema.add(SchemaNode(
String(), name='email',
validator=All(Email(), Length(max=LABEL_LEN))))
form = Form(self._request, schema)
if form.validate():
dbuser = self._request.dbsession.query(self._DBUser).filter(
self._DBUser.email.ilike(form.values['email'])).first()
if dbuser is None:
self._request.session.flash(
_('Please use the email associated with your account.'),
'alert')
elif self._send_reset_password_link(dbuser): # pragma: nocover
self._request.session.flash(
_('A link to reset your password has been sent by mail.'))
log_info(
self._request, 'user_password_forgot', dbuser.login)
return HTTPFound(self._request.route_path('home'))
self._request.breadcrumbs(_('Forgot Password'), 1)
self._request.documentation = '/user/password/forgot'
return {'form': form}
# -------------------------------------------------------------------------
[docs]
@view_config(
route_name='user_password_reset',
renderer='chrysalio:Templates/user_password_reset.pt',
permission=NO_PERMISSION_REQUIRED)
def password_reset(self):
"""Reset the user password."""
if 'user' in self._request.session:
del self._request.session['user']
dbuser = self._get_user()
token = '{0}{1}'.format(dbuser.login, date.today().isoformat())
token = sha1(token.encode('utf8')).hexdigest() # nosec
if token != self._request.matchdict['token']:
raise HTTPNotFound(comment=_('This page does not exist anymore.'))
password_min_length = self._request.registry['settings'][
'password-min-length']
schema = SchemaNode(Mapping())
schema.add(SchemaNode(
String(), name='password1',
validator=All(
Length(min=password_min_length),
SameAs(self._request, 'password2'))))
schema.add(SchemaNode(String(), name='password2'))
form = Form(self._request, schema)
if form.validate():
dbuser.set_password(form.values['password1'])
dbuser.password_mustchange = False
self._request.session.flash(
_('Your password has been changed. You can use it.'))
log_info(self._request, 'user_password_reset')
return HTTPFound(self._request.route_path('home'))
self._request.breadcrumbs(_('Password Reset'), 1)
self._request.documentation = '/user/password/reset'
return {'form': form}
# -------------------------------------------------------------------------
[docs]
def can_view(self):
"""Check if the current user can view the user account.
:rtype: bool
"""
if self._request.has_permission('user-view'):
return True
user_id = self._request.matchdict.get('user_id')
if user_id is None or (
user_id.isdigit() and
int(user_id) == self._request.session['user']['user_id']):
return True
return False
# -------------------------------------------------------------------------
[docs]
def can_create(self):
"""Check if the current user can create a new user.
:rtype: bool
"""
if not self._request.has_permission('user-create'):
return False
return True
# -------------------------------------------------------------------------
[docs]
def can_edit(self):
"""Check if the current user can edit an user account.
:rtype: bool
"""
if self._request.has_permission('user-edit'):
return True
user_id = self._request.matchdict['user_id']
if user_id.isdigit() and \
int(user_id) == self._request.session['user']['user_id']:
return True
return False
# -------------------------------------------------------------------------
def _get_user(self, my_user_id=''):
"""Return the SqlAlchemy object of the selected user or raise an
HTTPNotFound exception.
:param str my_user_id: (optional)
User ID of current user.
:rtype: .models.dbuser.DBUser
"""
user_id = self._request.matchdict.get('user_id', my_user_id)
if not user_id.isdigit():
raise HTTPNotFound()
dbuser = self._request.dbsession.query(self._DBUser).filter_by(
user_id=int(user_id)).first()
if dbuser is None:
raise HTTPNotFound()
return dbuser
# -------------------------------------------------------------------------
def _delete_users(self, user_ids):
"""Delete users.
:param list user_ids:
List of user IDs to delete.
"""
user_ids = {int(k) for k in user_ids}
# Do not delete myself
if self._request.session['user']['user_id'] in user_ids:
self._request.session.flash(
_("You can't delete your own user."), 'alert')
return
# Do not delete an administrator
if 1 in user_ids or \
(not self._request.has_permission('system.administrator') and
self._request.dbsession.query(self._DBUser.user_id).filter(
self._DBUser.user_id.in_(user_ids)).filter_by(
status='administrator').first() is not None):
self._request.session.flash(
_("You can't delete an administrator."), 'alert')
return
# Delete
deleted = []
attachments = self._request.registry.settings.get('attachments')
for dbuser in self._request.dbsession.query(self._DBUser).filter(
self._DBUser.user_id.in_(user_ids)):
if attachments and dbuser.attachments_key:
attachment = join(
attachments, DBUser.attachments_dir,
dbuser.attachments_key)
if exists(attachment):
rmtree(attachment)
deleted.append(dbuser.login)
self._request.dbsession.delete(dbuser)
if deleted:
log_info(self._request, 'user_delete', ' '.join(deleted))
# -------------------------------------------------------------------------
def _import_users(self):
"""Import users."""
# Get current IDs
user_logins = {
k[0] for k in self._request.dbsession.query(DBUser.login)}
# Update database
web2db(self._request, self._xml2db[0], 'user')
# Get new IDs
user_logins = {k[0] for k in self._request.dbsession.query(
DBUser.login)} - user_logins
if user_logins:
log_info(self._request, 'user_import', ' '.join(user_logins))
# -------------------------------------------------------------------------
def _users2response(self, user_ids):
"""Export users as an XML file embedded in a Pyramid response.
:param list user_ids:
List of user IDs to export.
:rtype: :class:`pyramid.response.Response` or ``''``
"""
dbitems = [
k for k in self._request.dbsession.query(self._DBUser).filter(
self._DBUser.user_id.in_(user_ids))
if k.user_id != 1 or k.status != 'administrator']
if not dbitems:
self._request.session.flash(
_('You cannot export the main administrator.'), 'alert')
return ''
filename = '{0}.{1}.xml'.format(
len(dbitems) == 1 and dbitems[0].login or
make_id(self._request.registry['settings']['title'], 'token'),
self._DBUser.suffix)
log_info(
self._request, 'user_export',
' '.join([k.login for k in dbitems]))
return db2web(self._request, dbitems, filename)
# -------------------------------------------------------------------------
def _send_invitation(self, user_ids):
"""Send an e-mail invitation to each user.
:param list user_ids:
List of user IDs to invite.
:rtype: list
:return:
A list of errors.
"""
site_title = self._request.registry['settings']['title']
email_template = {
'subject': _('Invitation to join ${n}', {'n': site_title}),
'from': self._request.session['user']['email'],
'html_template': self._invitation_html,
'text_template': self._invitation_text}
recipients = []
for dbuser in self._request.dbsession.query(self._DBUser).filter(
self._DBUser.user_id.in_(user_ids)):
recipients.append({
'to': dbuser.email,
'site_title': site_title,
'login': dbuser.login,
'honorific': dbuser.honorific,
'first_name': dbuser.first_name or '',
'last_name': dbuser.last_name,
'url': self._request.route_url('home')})
log_info(
self._request, 'user_send_invitation',
' '.join([k['login'] for k in recipients]))
return Mailing(self._request).mailing(email_template, recipients)
# -------------------------------------------------------------------------
def _send_reset_password_link(self, dbuser):
"""Send an e-mail with a link to the page to reset the password.
:type dbuser: .models.dbuser.DBUser
:param dbuser:
The concerned user.
:rtype: bool
"""
site_title = self._request.registry['settings']['title']
from_field = self._request.registry['settings']['email']
email_template = {
'subject': _('Password Reset on ${n}', {'n': site_title}),
'from': from_field,
'html_template': self._reset_password_html,
'text_template': self._reset_password_text}
token = '{0}{1}'.format(dbuser.login, date.today().isoformat())
token = sha1(token.encode('utf8')).hexdigest() # nosec
recipient = {
'to': dbuser.email,
'site_title': site_title,
'login': dbuser.login,
'honorific': dbuser.honorific,
'first_name': dbuser.first_name or '',
'last_name': dbuser.last_name,
'url': self._request.route_url(
'user_password_reset', user_id=dbuser.user_id, token=token)}
error = Mailing(self._request).mailing(email_template, [recipient])
if error:
self._request.session.flash(error[0], 'alert')
return False
return True # pragma: nocover
# -------------------------------------------------------------------------
def _save(self, dbuser, profiles, groups, values):
"""Save a user settings.
:type dbuser: :class:`.models.dbuser.DBUser` or ``None``
:param dbuser:
User to save.
:param dict profiles:
A dictionary such as ``{profile_id: (label, description),...}``.
:param dict groups:
A dictionary such as ``{group_id: (label, description),...}``.
:param dict values:
Form values.
:rtype: :class:`~.models.dbuser.DBUser` instance or ``None``
"""
creation = dbuser is None
dbuser = dbuser or self._DBUser()
# Update user
record = {k: values[k] for k in values if hasattr(self._DBUser, k)}
record['password'] = values['password1'] or dbuser.password
error = dbuser.record_format(record)
if error: # pragma: nocover
self._request.session.flash(error, 'alert')
return None
modified = creation
record.update({
k: None for k in values
if not values[k] and hasattr(self._DBUser, k)})
for field in record:
if getattr(dbuser, field) != record[field]:
modified = True
setattr(dbuser, field, record[field])
if not creation and values['password1']:
modified = True
dbuser.password_update = datetime.now()
# Save
if creation:
try:
self._request.dbsession.add(dbuser)
self._request.dbsession.flush()
except (IntegrityError, FlushError):
self._request.session.flash(
_('This user already exists.'), 'alert')
return None
# Update extra informations
modified |= self._save_extra(dbuser, profiles, groups, values)
if modified:
dbuser.account_update = datetime.now()
return dbuser
# -------------------------------------------------------------------------
def _save_extra(self, dbuser, profiles, groups, values):
"""Save extra information on a user .
:type dbuser: :class:`.models.dbuser.DBUser` or ``None``
:param dbuser:
User to save.
:param dict profiles:
A dictionary such as ``{profile_id: (label, description),...}``.
:param dict groups:
A dictionary such as ``{group_id: (label, description),...}``.
:param dict values:
Form values.
:rtype: bool
"""
modified = False
if not self._request.has_permission('user-create'):
return False
# Update profiles
user_profiles = {k.profile_id: k for k in dbuser.profiles}
for profile_id in profiles:
value = values['pfl:{0}'.format(profile_id)]
if value and profile_id not in user_profiles:
modified = True
self._request.dbsession.add(DBUserProfile(
user_id=dbuser.user_id, profile_id=profile_id))
elif not value and profile_id in user_profiles:
modified = True
self._request.dbsession.delete(
self._request.dbsession.query(DBUserProfile).filter_by(
user_id=dbuser.user_id, profile_id=profile_id)
.first())
# Update groups
user_groups = {k.group_id: k for k in dbuser.groups}
for group_id in groups:
value = values['grp:{0}'.format(group_id)]
if value and group_id not in user_groups:
modified = True
self._request.dbsession.add(DBGroupUser(
group_id=group_id, user_id=dbuser.user_id))
elif not value and group_id in user_groups:
modified = True
self._request.dbsession.delete(
self._request.dbsession.query(DBGroupUser).filter_by(
group_id=group_id, user_id=dbuser.user_id)
.first())
return modified