"""User group management view callables."""
from sqlalchemy import desc
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import FlushError
from pyramid.view import view_config
from pyramid.httpexceptions import HTTPFound, HTTPNotFound
from ..lib.i18n import _
from ..lib.log import log_info
from ..lib.form import get_action, Form
from ..lib.filter import Filter
from ..lib.paging import PAGE_SIZES, Paging
from ..lib.utils import make_id
from ..lib.tabset import Tabset
from ..lib.attachment import attachment_url, attachment_update
from ..includes.themes import theme_static_prefix
from ..models.populate import xml2db, db2web, web2db
from ..models.dbprofile import DBProfile
from ..models.dbuser import DBUser
from ..models.dbgroup import DBGroup, DBGroupUser, DBGroupProfile
from . import BaseView
# =============================================================================
[docs]
class GroupView(BaseView):
"""Class to manage user group views.
:type request: pyramid.request.Request
:param request:
Current request.
"""
_DBGroup = DBGroup
_xml2db = (xml2db,)
# -------------------------------------------------------------------------
[docs]
@view_config(
route_name='group_index',
renderer='chrysalio:Templates/group_index.pt',
permission='group-view')
@view_config(route_name='group_index', renderer='json', xhr=True)
def index(self):
"""List all user groups."""
# Ajax
i_creator = self._request.has_permission('group-create')
if self._request.is_xhr:
if i_creator:
self._import_groups()
return {}
# Action
action, items = get_action(self._request)
if action[:4] == 'del!' and i_creator:
self._delete_groups(items)
elif action == 'imp!' and i_creator:
self._import_groups()
elif action[:4] == 'exp!':
action = self._groups2response(items)
if action:
return action
# Filter
paging_id = 'groups'
pfilter = Filter(
self._request, paging_id, (
('group_id', _('Identifier'), False, ''),
('i18n_label', _('Label'), False, '')),
remove=action[:4] == 'crm!' and action[4:] or None)
# Paging
defaults = Paging.params(self._request, paging_id, '+group_id')
dbquery = pfilter.sql(
self._request.dbsession.query(self._DBGroup), 'groups')
oby = getattr(self._DBGroup, defaults['sort'][1:])
dbquery = dbquery.order_by(
desc(oby) if defaults['sort'][0] == '-' else oby)
group_paging = Paging(self._request, paging_id, dbquery, defaults)
group_paging.set_current_ids('group_id')
# Form & completed action
form = Form(self._request, defaults=defaults)
form.forget('filter_value')
if action and action[3] == '!':
action = ''
# Breadcrumbs & documentation
self._request.breadcrumbs(_('Groups'), 1)
self._request.documentation = '/admin/group/index'
return {
'action': action, 'items': items, 'form': form, 'pfilter': pfilter,
'paging': group_paging, 'PAGE_SIZES': PAGE_SIZES,
'download_max_size': self._request.registry['settings'][
'download-max-size'],
'i_creator': i_creator,
'i_editor': self._request.has_permission('group-edit'),
'has_attachments': bool(
self._request.registry.settings.get('attachments')),
'attachment_url': attachment_url}
# -------------------------------------------------------------------------
[docs]
@view_config(route_name='group_index_filter', renderer='json', xhr=True)
def index_filter(self):
"""Return a list to autocomplete a filter field."""
return Filter.sql_autocomplete(self._request, self._DBGroup)
# -------------------------------------------------------------------------
[docs]
@view_config(
route_name='group_view',
renderer='chrysalio:Templates/group_view.pt',
permission='group-view')
def view(self):
"""Show user group."""
dbgroup = self._get_group()
picture = self._request.registry.settings.get('attachments') and (
attachment_url(
self._request, dbgroup.attachments_dir,
dbgroup.attachments_key, dbgroup.picture) or
'{0}/images/group_picture.svg'.format(
theme_static_prefix(self._request)))
action = get_action(self._request)[0]
if action == 'exp!':
action = self._groups2response((dbgroup.group_id,))
if action:
return action
# User paging
user_paging, defaults, user_filter = self._user_paging(
action, dbgroup)
# Form
form = Form(self._request, defaults=defaults)
form.forget('filter_value')
# Breadcrumbs & documentation
label = dbgroup.label(self._request)
self._request.breadcrumbs(
_('Group "${l}"', {'l': label}),
replace=self._request.route_path(
'group_edit', group_id=dbgroup.group_id))
self._request.documentation = '/admin/group/view'
return {
'form': form, 'label': label, 'user_filter': user_filter,
'user_paging': user_paging, 'tabset': Tabset(
self._request, 'tabGroup',
dbgroup.settings_tabs(self._request)),
'picture': picture, 'dbgroup': dbgroup,
'navigator': Paging.navigator(
self._request, 'groups', dbgroup.group_id,
self._request.route_path('group_view', group_id='_ID_')),
'i_editor': self._request.has_permission('group-edit')}
# -------------------------------------------------------------------------
[docs]
@view_config(
route_name='group_create',
renderer='chrysalio:Templates/group_edit.pt',
permission='group-create')
@view_config(
route_name='group_edit',
renderer='chrysalio:Templates/group_edit.pt',
permission='group-edit')
@view_config(
route_name='group_edit', renderer='json', xhr=True,
permission='group-edit')
def edit(self):
"""Create or edit a user group."""
# Initialization
dbgroup = self._get_group() \
if 'group_id' in self._request.matchdict else None
# Ajax
if self._request.is_xhr:
if dbgroup is not None:
dbgroup.attachments_key, dbgroup.picture = \
attachment_update(
self._request, dbgroup.attachments_dir,
dbgroup.attachments_key,
self._request.POST['picture'],
replace=dbgroup.picture,
prefix=dbgroup.group_id[:12])
log_info(
self._request, 'group_update_picture',
dbgroup.group_id)
return {}
# User paging
action = get_action(self._request)[0]
user_paging, defaults, user_filter = self._user_paging(action)
# Profiles
profiles = {
k.profile_id: (
k.label(self._request), k.description(self._request))
for k in self._request.dbsession.query(DBProfile)}
# Form and action
form = Form(
self._request,
*self._DBGroup.settings_schema(
self._request, defaults, profiles, dbgroup),
obj=dbgroup)
if action == 'pct!' and dbgroup is not None:
dbgroup.attachments_key, dbgroup.picture = \
attachment_update(
self._request, dbgroup.attachments_dir,
dbgroup.attachments_key, self._request.POST['picture'],
replace=dbgroup.picture,
prefix=dbgroup.group_id[:12])
log_info(
self._request, 'group_update_picture',
dbgroup.group_id)
elif action == 'sav!' and form.validate():
dbgroup = self._save(dbgroup, profiles, form.values)
if dbgroup is not None:
dbuser = self._request.dbsession.query(DBUser).filter_by(
user_id=self._request.session['user']['user_id']).first()
dbuser.set_session(self._request)
if 'group_id' not in self._request.matchdict:
self._request.breadcrumbs.pop()
log_info(
self._request,
'group_id' in self._request.matchdict and
'group_edit' or 'group_create', dbgroup.group_id)
return HTTPFound(self._request.route_path(
'group_view', group_id=dbgroup.group_id))
if form.has_error():
self._request.session.flash(_('Correct errors.'), 'alert')
# Picture
picture = \
dbgroup \
and self._request.registry.settings.get('attachments') \
and (attachment_url(
self._request, DBGroup.attachments_dir,
dbgroup.attachments_key, dbgroup.picture) or
'{0}/images/group_picture.svg'.format(
theme_static_prefix(self._request)))
# Breadcrumbs &documentation
label = dbgroup and dbgroup.label(self._request)
if not dbgroup:
self._request.breadcrumbs(_('Group Creation'))
else:
self._request.breadcrumbs(
_('Group "${l}" Edition', {'l': label}),
replace=self._request.route_path(
'group_view', group_id=dbgroup.group_id))
self._request.documentation = '/admin/group/edit'
return {
'form': form, 'dbgroup': dbgroup or self._DBGroup,
'action': action, 'label': label, 'profiles': profiles,
'user_filter': user_filter, 'user_paging': user_paging,
'picture': picture, 'tabset': Tabset(
self._request, 'tabGroup',
self._DBGroup.settings_tabs(self._request))}
# -------------------------------------------------------------------------
def _get_group(self):
"""Return the SqlAlchemy object of the selected user group or raise
an HTTPNotFound exception.
:rtype: .models.dbgroup.DBGroup
"""
group_id = self._request.matchdict['group_id']
dbgroup = self._request.dbsession.query(self._DBGroup).filter_by(
group_id=group_id).first()
if dbgroup is None:
raise HTTPNotFound()
return dbgroup
# -------------------------------------------------------------------------
def _delete_groups(self, group_ids):
"""Delete groups.
:param list group_ids:
List of group IDs to delete.
"""
deleted = []
for dbgroup in self._request.dbsession.query(self._DBGroup).filter(
self._DBGroup.group_id.in_(group_ids)):
deleted.append(dbgroup.group_id)
self._request.dbsession.delete(dbgroup)
if deleted:
log_info(self._request, 'group_delete', ' '.join(deleted))
# -------------------------------------------------------------------------
def _import_groups(self):
"""Import groups."""
# Get current IDs
group_ids = {k[0] for k in self._request.dbsession.query(
DBGroup.group_id)}
# Update database
web2db(self._request, self._xml2db[0], 'group')
# Get new IDs
group_ids = {k[0] for k in self._request.dbsession.query(
DBGroup.group_id)} - group_ids
if group_ids:
log_info(self._request, 'group_import', ' '.join(group_ids))
# -------------------------------------------------------------------------
def _groups2response(self, group_ids):
"""Export user groups as an XML file embedded in a Pyramid response.
:param list group_ids:
List of group IDs to export.
:rtype: :class:`pyramid.response.Response` or ``''``
"""
dbitems = tuple(self._request.dbsession.query(self._DBGroup).filter(
self._DBGroup.group_id.in_(group_ids)).order_by('group_id'))
if not dbitems:
return ''
filename = '{0}.{1}.xml'.format(
len(dbitems) == 1 and dbitems[0].group_id or
make_id(self._request.registry['settings']['title'], 'token'),
self._DBGroup.suffix)
log_info(
self._request, 'group_export',
' '.join([k.group_id for k in dbitems]))
return db2web(self._request, dbitems, filename)
# -------------------------------------------------------------------------
def _save(self, dbgroup, profiles, values):
"""Save a user group.
:type dbgroup: .models.dbgroup.DBGroup
:param dbgroup:
Group to save.
:param dict profiles:
A dictionary such as ``{profile_id: (label, description),...}``.
:param dict values:
Form values.
:rtype: :class:`~.models.dbgroup.DBGroup` instance or ``None``
"""
creation = dbgroup is None
dbgroup = dbgroup or self._DBGroup()
# Update group
record = {k: values[k] for k in values if k[:4] not in (
'mbr:', 'shw:', 'pfl:')}
if not creation:
record['group_id'] = dbgroup.group_id
error = dbgroup.record_format(record)
if error: # pragma: nocover
self._request.session.flash(error, 'alert')
return None
for field in record:
if getattr(dbgroup, field) != record[field]:
setattr(dbgroup, field, record[field])
# Save
if creation:
try:
self._request.dbsession.add(dbgroup)
self._request.dbsession.flush()
except (IntegrityError, FlushError):
self._request.session.flash(
_('This group already exists.'), 'alert')
return None
# Update users
self._users_update(dbgroup)
# Update profiles
self._profiles_update(dbgroup, profiles, values)
return dbgroup
# -------------------------------------------------------------------------
def _users_update(self, dbgroup):
"""Update the list of group users.
:type dbgroup: .models.dbwarhouse.DBGroup
:param dbgroup:
SQLAlchemy object for the current group.
"""
if not self._request.has_permission('user-create'):
return
is_set = {}
for value in self._request.POST:
if value[:4] == 'mbr:':
is_set[value[4:]] = True
elif value[:4] == 'shw:' and value[4:] not in is_set:
is_set[value[4:]] = False
for user_id, user_set in is_set.items():
dbgroup_user = self._request.dbsession.query(
DBGroupUser).filter_by(
group_id=dbgroup.group_id,
user_id=int(user_id)).first()
if dbgroup_user is not None and not user_set:
self._request.dbsession.delete(dbgroup_user)
elif dbgroup_user is None and user_set:
self._request.dbsession.add(DBGroupUser(
group_id=dbgroup.group_id, user_id=int(user_id)))
# -------------------------------------------------------------------------
def _user_paging(self, action, dbgroup=None):
"""Return a paging object for users.
:param str action:
Current action.
:type dbgroup: .models.dbwarhouse.DBGroup
:param dbgroup: (optional)
If not ``None``, users are only users of the group.
:rtype: tuple
:return:
A tuple such as ``(user_paging, defaults, user_filter)``.
"""
# Filter
paging_id = 'group_users'
ufilter = Filter(
self._request, paging_id, (
('login', _('Login'), False, None),
('last_name', _('Last name'), False, None),
('email', _('Email'), False, None),
('status', _('Status'), False,
[('', ' ')] + list(DBUser.status_labels.items()))),
(('status', '=', 'active'),),
remove=action[:4] == 'crm!' and action[4:] or None)
# Database query
defaults = Paging.params(
self._request, paging_id, '+last_name', default_display='list')
dbquery = ufilter.sql(
self._request.dbsession.query(
DBUser.user_id, DBUser.login,
DBUser.first_name, DBUser.last_name,
DBUser.honorific, DBUser.email,
DBUser.email_hidden, DBUser.status,
DBUser.last_login, DBUser.attachments_key,
DBUser.picture), 'users')
if dbgroup is not None:
dbquery = dbquery.filter(DBUser.user_id.in_(
[k.user_id for k in dbgroup.users]))
oby = getattr(DBUser, defaults['sort'][1:])
dbquery = dbquery.order_by(
desc(oby) if defaults['sort'][0] == '-' else oby)
return Paging(self._request, paging_id, dbquery, defaults), \
dict(defaults), ufilter
# -------------------------------------------------------------------------
def _profiles_update(self, dbgroup, profiles, values):
"""Update the list of profiles.
:type dbgroup: .models.dbwarhouse.DBGroup
:param dbgroup:
SQLAlchemy object for the current group.
:param dict profiles:
A dictionary such as ``{profile_id: (label, description),...}``.
:param dict values:
Form values.
"""
if not self._request.has_permission('user-create'):
return
group_profiles = {k.profile_id: k for k in dbgroup.profiles}
for profile_id in profiles:
value = values['pfl:{0}'.format(profile_id)]
if value and profile_id not in group_profiles:
self._request.dbsession.add(DBGroupProfile(
group_id=dbgroup.group_id, profile_id=profile_id))
elif not value and profile_id in group_profiles:
self._request.dbsession.delete(
self._request.dbsession.query(DBGroupProfile)
.filter_by(
group_id=dbgroup.group_id, profile_id=profile_id)
.first())