Files
workdock-platform/backend/workflows/views.py
2026-03-28 08:56:43 +01:00

1240 lines
46 KiB
Python

from pathlib import Path
import re
from datetime import timedelta
from tempfile import NamedTemporaryFile
import json
from io import BytesIO
from functools import wraps
from celery import current_app
from django.conf import settings
from django.db import connection
from django.db import IntegrityError
from django.db.models import Q
from django.db.models import Count
from django.shortcuts import get_object_or_404, redirect, render
from django.contrib import messages
from django.contrib.auth import get_user_model, login as auth_login
from django.contrib.auth.decorators import login_required
from django.contrib.auth.tokens import default_token_generator
from django.http import JsonResponse
from django.views.decorators.http import require_POST
from django.views.decorators.csrf import ensure_csrf_cookie
from django.utils import timezone
from django.utils.encoding import force_bytes
from django.utils.http import urlsafe_base64_encode
from django.utils.translation import gettext as _, gettext_lazy
from django.utils.translation import get_language, override
from django.urls import reverse
from .app_registry import build_portal_app_sections, get_portal_app_registry_rows, normalize_portal_app_sort_orders
from .backup_ops import (
create_backup_bundle,
delete_backup_bundle,
latest_backup_health_snapshot,
list_backup_bundles,
verify_backup_bundle,
)
from .branding import get_branding_email_copy, get_company_email_domain, get_default_notification_templates, get_portal_trial_config, is_trial_expired
from . import account_views, admin_config_views, integrations_views, request_views
from .forms import AccountAvatarForm, AccountDetailsForm, AccountNotificationPreferencesForm, AccountTOTPDisableForm, AccountTOTPEnableForm, AccountTOTPRegenerateRecoveryCodesForm, AppLoginForm, AppTOTPChallengeForm, OffboardingRequestForm, OnboardingRequestForm, PortalBrandingForm, PortalCompanyConfigForm, PortalTrialConfigForm, UserManagementCreateForm
from .form_builder import (
DEFAULT_FIELD_ORDER,
DEFAULT_CONDITIONAL_RULES,
FORM_PRESETS,
LOCKED_FIELD_RULES,
LOCKED_SECTION_RULES,
OFFBOARDING_PAGE_LABELS,
OFFBOARDING_PAGE_ORDER,
ONBOARDING_DEFAULT_PAGE,
build_custom_field_key,
custom_field_target_key,
ensure_form_field_configs,
ensure_form_conditional_rule_configs,
ensure_form_section_configs,
get_custom_field_configs,
get_custom_section_configs,
get_default_page_map,
get_section_definitions,
get_section_labels,
get_section_order,
apply_form_preset,
)
from .form_builder_views import form_builder_page_impl
from .intro_builder_views import intro_builder_page_impl
from .observability_views import (
audit_log_page_impl,
backup_recovery_page_impl,
create_backup_from_admin_impl,
job_monitor_page_impl,
verify_backup_from_admin_impl,
)
from .models import AdminAuditLog, AsyncTaskLog, EmployeeProfile, FormConditionalRuleConfig, FormCustomFieldConfig, FormCustomSectionConfig, FormFieldConfig, FormOption, FormSectionConfig, NotificationRule, NotificationTemplate, OffboardingRequest, OnboardingIntroductionSession, OnboardingRequest, PortalAppConfig, PortalBranding, PortalCompanyConfig, PortalTrialConfig, ScheduledWelcomeEmail, SystemEmailConfig, UserNotification, UserProfile, WorkflowConfig
from .emailing import send_system_email
from .notifications import notify_user
from .roles import ROLE_GROUP_NAMES, ROLE_LABELS, ROLE_PLATFORM_OWNER, ROLE_SUPER_ADMIN, assign_user_role, get_user_role_key, get_user_role_label, user_has_capability
from .services import get_email_test_redirect, is_email_test_mode, is_nextcloud_enabled, upload_to_nextcloud
from .totp import build_otpauth_uri, generate_recovery_codes, generate_totp_secret
from .tasks import (
_generate_onboarding_intro_pdf,
_generate_onboarding_intro_session_pdf,
build_intro_sections_for_request,
process_offboarding_request,
process_onboarding_request,
send_scheduled_welcome_email,
)
def _redirect_back(request, fallback: str):
    """Redirect to the page the user came from, or to ``fallback``.

    Candidates are tried in this order:

    1. the ``next`` value from POST, then GET (must be a local path),
    2. the ``Referer`` header,
    3. ``fallback`` (anything ``redirect()`` accepts, e.g. a URL name).

    Both user-controlled candidates are validated against the host of the
    current request. A plain ``startswith('/')`` check is not enough, because
    ``//evil.example`` is a protocol-relative URL, and a prefix check such as
    ``startswith('http://localhost')`` also matches
    ``http://localhost.evil.example``.
    """
    # Local import: keeps this block independent of the module import list.
    from django.utils.http import url_has_allowed_host_and_scheme

    def _is_same_site(url: str) -> bool:
        return url_has_allowed_host_and_scheme(
            url,
            allowed_hosts={request.get_host()},
            require_https=request.is_secure(),
        )

    target = (request.POST.get('next') or request.GET.get('next') or '').strip()
    if target.startswith('/') and _is_same_site(target):
        return redirect(target)
    referer = (request.META.get('HTTP_REFERER') or '').strip()
    if referer and _is_same_site(referer):
        return redirect(referer)
    return redirect(fallback)
@login_required
@require_POST
def mark_notification_read(request, notification_id: int):
    """Mark a single notification of the current user as read, then redirect back.

    The lookup is restricted to ``user=request.user``; an id that does not
    match a notification of that user results in a 404.
    """
    notification = get_object_or_404(UserNotification, id=notification_id, user=request.user)
    notification.mark_read()
    # Falls back to the 'home' URL when no usable return target is available.
    return _redirect_back(request, 'home')
@login_required
@require_POST
def mark_all_notifications_read(request):
    """Set ``read_at`` to now on all unread notifications of the current user, then redirect back."""
    # Single bulk UPDATE limited to rows where read_at is still NULL.
    UserNotification.objects.filter(user=request.user, read_at__isnull=True).update(read_at=timezone.now())
    return _redirect_back(request, 'home')
# Group id -> onboarding form field names that belong together.
# _build_onboarding_layout() renders the fields of one group as a single
# 'group' block and uses the group id as the block id.
ONBOARDING_GROUPS = {
    'business-card-box': ['business_card_name', 'business_card_title', 'business_card_email', 'business_card_phone'],
    'employment-end-box': ['employment_end_date'],
    'group-mailboxes-box': ['group_mailboxes'],
    'extra-hardware-box': ['additional_hardware_multi', 'additional_hardware_other'],
    'extra-software-box': ['additional_software_multi', 'additional_software'],
    'extra-access-box': ['additional_access_text'],
    'successor-box': ['successor_name', 'inherit_phone_number_choice'],
}
# NOTE(review): not referenced in the visible part of this module; presumably
# consumed by templates or other views - confirm before changing.
ONBOARDING_INLINE_CHECKS = {'order_business_cards', 'agreement_confirm'}
# NOTE(review): not referenced in the visible part of this module either;
# the names suggest multi-select fields rendered as checkbox lists - confirm.
ONBOARDING_CHECKBOX_LISTS = {
    'needed_devices_multi',
    'additional_hardware_multi',
    'needed_software_multi',
    'additional_software_multi',
    'needed_accesses_multi',
    'needed_workspace_groups_multi',
    'needed_resources_multi',
}
# Section key -> lazily translated title/subtitle.
# _build_onboarding_sections() uses 'title' as a fallback when the section
# definitions provide none, and 'subtitle' for every section.
ONBOARDING_SECTION_META = {
    'stammdaten': {'title': gettext_lazy('Stammdaten'), 'subtitle': gettext_lazy('Person, Rolle, Abteilung')},
    'vertrag': {'title': gettext_lazy('Vertrag'), 'subtitle': gettext_lazy('Beschäftigung und Termine')},
    'itsetup': {'title': gettext_lazy('IT-Setup'), 'subtitle': gettext_lazy('Geräte, Software und Zugänge')},
    'abschluss': {'title': gettext_lazy('Abschluss'), 'subtitle': gettext_lazy('Notizen und Freigabe')},
}
# (operator value, label) pairs for conditional visibility rules.
# The labels are lazy because this list is built once at import time: eager
# gettext would freeze them in whatever language was active when the module
# was first loaded. Consumers that need plain strings can wrap labels in
# str(), as _translate_choice_list() does.
CONDITIONAL_RULE_OPERATOR_CHOICES = [
    ('checked', gettext_lazy('ist aktiviert')),
    ('equals', gettext_lazy('ist gleich')),
    ('not_equals', gettext_lazy('ist nicht gleich')),
]
def _field_rule_summary(*, is_visible: bool, is_required, locked: bool) -> str:
    """Return a one-sentence, translated description of a field's rule state.

    Precedence: locked > hidden > explicitly required > explicitly optional >
    default behaviour (``is_required`` is neither ``True`` nor ``False``).
    """
    if locked:
        summary = _('Fixes Kernfeld, immer sichtbar.')
    elif not is_visible:
        summary = _('Ausgeblendet, erscheint nicht im Formular.')
    elif is_required is True:
        summary = _('Sichtbar und als Pflichtfeld markiert.')
    elif is_required is False:
        summary = _('Sichtbar und optional.')
    else:
        summary = _('Sichtbar mit Standardverhalten.')
    return str(summary)
def _conditional_clause_sentence(clause: dict, field_label_map: dict[str, str]) -> str:
    """Render one rule clause as a translated sentence fragment.

    Returns ``''`` when the clause has no field or no operator after
    stripping. The field is shown by its label from ``field_label_map``
    (falling back to the raw field name). Unknown operators produce a
    generic "condition is met" fragment.
    """
    field_name = (clause.get('field') or '').strip()
    operator = (clause.get('operator') or '').strip()
    value = clause.get('value')
    if not (field_name and operator):
        return ''

    context = {'field': field_label_map.get(field_name, field_name)}

    if operator == 'checked':
        return _('%(field)s ist aktiviert') % context

    if operator == 'equals':
        if value in (None, ''):
            return _('%(field)s ist gleich') % context
        return _('%(field)s ist gleich %(value)s') % {**context, 'value': value}

    if operator == 'not_equals':
        if value in (None, ''):
            return _('%(field)s ist nicht gleich') % context
        return _('%(field)s ist nicht gleich %(value)s') % {**context, 'value': value}

    return _('%(field)s erfüllt die Bedingung') % context
def _conditional_rule_summary(clauses: list[dict], field_label_map: dict[str, str]) -> str:
    """Summarise a list of rule clauses as one translated sentence.

    Args:
        clauses: Clause dicts with ``field``, ``operator`` and optional ``value``.
        field_label_map: Field name -> display label.

    Returns:
        "Immer sichtbar." when no clause yields a sentence, otherwise
        "Sichtbar, wenn <clause> und <clause> ...".
    """
    sentences = []
    for clause in clauses:
        if not (clause.get('field') and clause.get('operator')):
            continue
        sentence = str(_conditional_clause_sentence(clause, field_label_map))
        # A whitespace-only field/operator passes the truthiness check above
        # but renders as '', which would leave a dangling conjunction.
        if sentence:
            sentences.append(sentence)
    if not sentences:
        return str(_('Immer sichtbar.'))
    # The conjunction is translatable like the rest of the sentence; without
    # a catalog entry it stays "und", so the default output is unchanged.
    conjunction = ' %s ' % _('und')
    return str(_('Sichtbar, wenn %(conditions)s.') % {'conditions': conjunction.join(sentences)})
def _normalized_conditional_rule_payload(form_type: str) -> dict[str, dict]:
    """Build ``{target_key: {'all': [clause, ...]}}`` for the active rules of a form.

    Inactive configs are skipped, as are targets that have no clause with
    both a ``field`` and an ``operator``.
    """
    payload = {}
    for target_key, cfg in ensure_form_conditional_rule_configs(form_type).items():
        if not cfg.is_active:
            continue
        usable_clauses = []
        for clause in cfg.clauses or []:
            if clause.get('field') and clause.get('operator'):
                usable_clauses.append(clause)
        if usable_clauses:
            payload[target_key] = {'all': usable_clauses}
    return payload
def _active_conditional_target_keys(form_type: str) -> set[str]:
    """Return the target keys that currently have at least one usable conditional clause."""
    active_payload = _normalized_conditional_rule_payload(form_type)
    return set(active_payload)
def healthz(request):
    """Health probe: report service status as JSON based on a trivial DB query.

    Returns HTTP 200 with ``status='ok'`` when ``SELECT 1`` succeeds, and
    HTTP 503 with ``status='degraded'`` / ``db='error'`` otherwise.
    """
    import logging  # local import: keeps this block self-contained

    db_ok = True
    try:
        with connection.cursor() as cursor:
            cursor.execute('SELECT 1')
            cursor.fetchone()
    except Exception:
        # Deliberately broad: any failure counts as "database unavailable"
        # and the probe must still answer. The cause is logged so a 503 can
        # be diagnosed instead of being swallowed silently.
        logging.getLogger(__name__).warning('healthz: database check failed', exc_info=True)
        db_ok = False
    status_code = 200 if db_ok else 503
    return JsonResponse(
        {
            'status': 'ok' if db_ok else 'degraded',
            'service': 'workdock',
            'db': 'ok' if db_ok else 'error',
            'time': timezone.now().isoformat(),
        },
        status=status_code,
    )
def login_page(request):
    """Login view entry point; delegates to ``account_views.login_page_impl``."""
    return account_views.login_page_impl(request)
def login_totp_page(request):
    """TOTP login step entry point; delegates to ``account_views.login_totp_page_impl``."""
    return account_views.login_totp_page_impl(request)
@login_required
def account_profile_page(request):
    """Account profile entry point (login required); delegates to ``account_views.account_profile_page_impl``."""
    return account_views.account_profile_page_impl(request)
def _require_capability(capability: str):
    """Decorator factory: require login and the given capability for a view.

    An authenticated user without ``capability`` gets an error message and is
    redirected to ``'home'``; otherwise the wrapped view runs unchanged.
    """
    def decorator(view_func):
        def _guarded(request, *args, **kwargs):
            if user_has_capability(request.user, capability):
                return view_func(request, *args, **kwargs)
            messages.error(request, _('Sie haben keine Berechtigung für diese Aktion.'))
            return redirect('home')

        # Same stacking as decorating with @wraps(view_func) above
        # @login_required: the login check runs first, and the result carries
        # the metadata of the original view.
        return wraps(view_func)(login_required(_guarded))

    return decorator
def _display_user_name(user) -> str:
    """Return the best available display name for ``user``.

    Preference order: "first last" (stripped), then username, then email.
    Missing or ``None`` attributes count as empty; the result may be ``''``.
    """
    def _clean(attribute: str) -> str:
        return (getattr(user, attribute, '') or '').strip()

    full_name = ' '.join((_clean('first_name'), _clean('last_name'))).strip()
    return full_name or _clean('username') or _clean('email')
def _audit(
    request,
    action: str,
    *,
    target_type: str = '',
    target_id: int | None = None,
    target_label: str = '',
    details: dict | None = None,
) -> None:
    """Write an ``AdminAuditLog`` row for the authenticated user of ``request``.

    Does nothing when the request has no user or the user is not
    authenticated. ``details`` defaults to an empty dict.
    """
    actor = getattr(request, 'user', None)
    if not actor or not actor.is_authenticated:
        return
    entry = {
        'actor': actor,
        'actor_display': _display_user_name(actor),
        'action': action,
        'target_type': target_type,
        'target_id': target_id,
        'target_label': target_label,
        'details': details or {},
    }
    AdminAuditLog.objects.create(**entry)
def _form_field_labels(form_type: str) -> dict[str, str]:
    """Map field name -> label (or the name when unlabeled) for the given form type.

    Only ``'onboarding'`` and ``'offboarding'`` are known; anything else
    yields an empty dict.
    """
    if form_type == 'onboarding':
        form_class = OnboardingRequestForm
    elif form_type == 'offboarding':
        form_class = OffboardingRequestForm
    else:
        return {}
    labels = {}
    for name, field in form_class.base_fields.items():
        labels[name] = str(field.label or name)
    return labels
def _request_target_label(obj, kind: str | None = None) -> str:
request_kind = (kind or '').strip()
if not request_kind:
request_kind = 'onboarding' if isinstance(obj, OnboardingRequest) else 'offboarding'
name = (getattr(obj, 'full_name', '') or '').strip() or f'#{getattr(obj, "id", "?")}'
email = (getattr(obj, 'work_email', '') or '').strip()
created_at = getattr(obj, 'created_at', None)
date_label = created_at.strftime('%Y-%m-%d') if created_at else ''
parts = [request_kind.capitalize(), name]
if email:
parts.append(f'<{email}>')
if date_label:
parts.append(date_label)
return ' | '.join(parts)
def _request_status_label(status_key: str, language_code: str | None = None) -> str:
    """Translate a request status key into the given language (default ``'de'``).

    Only the primary language subtag is used (``'en-US'`` -> ``'en'``).
    Unknown status keys are returned unchanged.
    """
    primary_subtag = (language_code or 'de').split('-')[0]
    lang = (primary_subtag or 'de').lower()
    # The labels are translated eagerly while the override is active.
    with override(lang):
        translated = {
            'submitted': _('Eingereicht'),
            'processing': _('In Bearbeitung'),
            'completed': _('Abgeschlossen'),
            'failed': _('Fehlgeschlagen'),
        }
    if status_key in translated:
        return translated[status_key]
    return status_key
def _request_custom_field_details(obj, kind: str, language_code: str | None = None) -> list[dict[str, str]]:
    """Collect the filled-in custom field values of a request as display rows.

    Args:
        obj: Request object; ``custom_field_values`` and
            ``preferred_language`` are read via ``getattr``.
        kind: ``'onboarding'`` selects the onboarding field configs, any
            other value the offboarding ones.
        language_code: Optional language; falls back to the object's
            ``preferred_language``, then the active language, then ``'de'``.
            Only the primary subtag is used.

    Returns:
        Dicts with ``label``, ``value``, ``section`` and ``sort_order``,
        sorted by (section, sort_order, label). Empty values are skipped.
    """
    form_type = 'onboarding' if kind == 'onboarding' else 'offboarding'
    language_code = ((language_code or getattr(obj, 'preferred_language', '') or get_language() or 'de').split('-')[0]).lower()
    values = getattr(obj, 'custom_field_values', {}) or {}
    yes_label = 'Ja' if language_code == 'de' else 'Yes'
    rows = []
    for cfg in get_custom_field_configs(form_type, include_inactive=True):
        raw_value = values.get(cfg.field_key)
        # Emptiness is spelled out instead of `in (None, '', False, [])`:
        # that membership test compares with ==, and 0 == False in Python,
        # so a legitimate numeric 0 would be dropped as "empty".
        if raw_value is None or raw_value is False or raw_value == '' or raw_value == []:
            continue
        if isinstance(raw_value, bool):
            display_value = str(yes_label) if raw_value else ''
        elif isinstance(raw_value, list):
            display_value = ', '.join(str(item).strip() for item in raw_value if str(item).strip())
        else:
            display_value = str(raw_value).strip()
        if not display_value:
            continue
        rows.append(
            {
                'label': cfg.translated_label(language_code),
                'value': display_value,
                'section': cfg.section_key,
                'sort_order': cfg.sort_order,
            }
        )
    rows.sort(key=lambda item: (item['section'], item['sort_order'], item['label']))
    return rows
def _audit_action_label(action: str) -> str:
    """Return the translated label for an audit action key.

    Unknown keys are humanised: underscores become spaces and the result is
    stripped and capitalised.
    """
    known_labels = {
        # Requests and introduction sessions
        'requests_deleted': _('Vorgänge gelöscht'),
        'request_deleted': _('Vorgang gelöscht'),
        'request_retried': _('Vorgang erneut angestoßen'),
        'intro_pdf_generated': _('Einweisungs-PDF erzeugt'),
        'intro_live_pdf_generated': _('Live-Protokoll erzeugt'),
        'intro_session_reset': _('Einweisung zurückgesetzt'),
        'intro_session_saved': _('Einweisung als Entwurf gespeichert'),
        'intro_session_completed': _('Einweisung abgeschlossen'),
        # Form builder and intro checklist
        'form_option_deleted': _('Formularoption gelöscht'),
        'form_options_saved': _('Formularoptionen gespeichert'),
        'form_field_texts_saved': _('Feldtexte gespeichert'),
        'form_layout_saved': _('Formularlayout gespeichert'),
        'intro_checklist_item_deleted': _('Einweisungs-Checkpunkt gelöscht'),
        'intro_checklist_item_added': _('Einweisungs-Checkpunkt hinzugefügt'),
        'intro_checklist_saved': _('Einweisungs-Checkliste gespeichert'),
        # Welcome e-mails
        'welcome_email_triggered_now': _('Welcome E-Mail sofort ausgelöst'),
        'welcome_email_settings_saved': _('Welcome E-Mail Einstellungen gespeichert'),
        'welcome_email_bulk_action': _('Welcome E-Mail Sammelaktion ausgeführt'),
        'welcome_email_paused': _('Welcome E-Mail pausiert'),
        'welcome_email_resumed': _('Welcome E-Mail fortgesetzt'),
        'welcome_email_cancelled': _('Welcome E-Mail abgebrochen'),
        # Integrations and mail
        'smtp_test_sent': _('SMTP-Test gesendet'),
        'nextcloud_test_upload': _('Nextcloud-Testupload ausgeführt'),
        'nextcloud_mode_toggled': _('Nextcloud-Modus umgeschaltet'),
        'email_mode_toggled': _('E-Mail-Modus umgeschaltet'),
        'integrations_saved': _('Integrationen gespeichert'),
        'nextcloud_settings_saved': _('Nextcloud-Einstellungen gespeichert'),
        'mail_settings_saved': _('Mail-Einstellungen gespeichert'),
        'email_routing_saved': _('E-Mail-Routing gespeichert'),
        'notification_rules_saved': _('Benachrichtigungsregeln gespeichert'),
        # Users
        'user_created': _('Benutzer erstellt'),
        'user_updated': _('Benutzer aktualisiert'),
        'user_password_reset_sent': _('Passwort-Reset-Link versendet'),
        'user_deleted': _('Benutzer gelöscht'),
        # Backups and app registry
        'backup_created': _('Backup erstellt'),
        'backup_verified': _('Backup verifiziert'),
        'backup_deleted': _('Backup gelöscht'),
        'backup_settings_saved': _('Backup-Einstellungen gespeichert'),
        'portal_app_registry_saved': _('App-Registry gespeichert'),
    }
    humanised = action.replace('_', ' ').strip().capitalize()
    return known_labels.get(action, humanised)
def _translate_choice_list(choices):
    """Return ``choices`` as a list of ``(value, str(label))`` tuples.

    Converting each label with ``str()`` resolves lazy translation objects
    into plain strings.
    """
    resolved = []
    for value, label in choices:
        resolved.append((value, str(label)))
    return resolved
def _build_onboarding_layout(form) -> list[dict]:
    """Turn the form's fields into an ordered list of layout blocks.

    Block shapes:

    * ``{'kind': 'field', 'field': bound_field}`` for a plain field.
    * ``{'kind': 'group', 'id', 'hidden_default', 'fields'}`` for the fields
      of one ``ONBOARDING_GROUPS`` entry (emitted once, at the position of
      its first member in form order), and for a ``custom__`` field that is
      the target of an active conditional rule.

    ``hidden_default`` is true when the group id / custom field name is an
    active conditional target.
    """
    group_of = {
        member: group_id
        for group_id, members in ONBOARDING_GROUPS.items()
        for member in members
    }
    conditional_targets = _active_conditional_target_keys('onboarding')

    emitted_groups = set()
    handled = set()
    layout = []

    for field_name in list(form.fields.keys()):
        if field_name in handled:
            continue

        group_id = group_of.get(field_name)
        if group_id:
            if group_id in emitted_groups:
                continue
            members = [form[name] for name in ONBOARDING_GROUPS[group_id] if name in form.fields]
            if not members:
                continue
            layout.append(
                {
                    'kind': 'group',
                    'id': group_id,
                    'hidden_default': group_id in conditional_targets,
                    'fields': members,
                }
            )
            emitted_groups.add(group_id)
            handled.update(member.name for member in members)
            continue

        if field_name.startswith('custom__') and field_name in conditional_targets:
            # Conditional custom fields are wrapped in their own group block.
            block = {
                'kind': 'group',
                'id': field_name,
                'hidden_default': True,
                'fields': [form[field_name]],
            }
        else:
            block = {'kind': 'field', 'field': form[field_name]}
        layout.append(block)
        handled.add(field_name)

    return layout
def _section_for_block(block: dict, field_pages: dict[str, str]) -> str:
    """Return the section key of a layout block, defaulting to ``'abschluss'``.

    A field block is looked up by its own field name, a group block by the
    name of its first field. An empty group also maps to ``'abschluss'``.
    """
    if block['kind'] == 'field':
        anchor = block['field']
    else:
        members = block.get('fields') or []
        if not members:
            return 'abschluss'
        anchor = members[0]
    return field_pages.get(anchor.name, 'abschluss')
def _build_onboarding_sections(blocks: list[dict], field_pages: dict[str, str], visible_section_keys: set[str] | None = None) -> list[dict]:
    """Distribute layout blocks over the onboarding sections, in section order.

    Blocks whose section key is unknown go to ``'abschluss'``. When
    ``visible_section_keys`` is empty or ``None`` every section is visible.
    Each section dict carries ``key``, ``title``, ``subtitle``, ``blocks``,
    ``is_custom`` and ``has_custom_checkbox_fields`` (true when the section
    holds a ``custom__`` field rendered by a checkbox-type widget).
    """
    section_defs = get_section_definitions('onboarding')
    section_order = [item['key'] for item in section_defs]
    section_titles = {item['key']: item['title'] for item in section_defs}

    grouped = {key: [] for key in section_order}
    for block in blocks:
        key = _section_for_block(block, field_pages)
        grouped[key if key in grouped else 'abschluss'].append(block)

    visible_keys = visible_section_keys or set(section_order)
    custom_section_keys = {item['key'] for item in section_defs if item.get('is_custom')}

    def _fields_of(block):
        if block['kind'] == 'field':
            return [block['field']]
        return block.get('fields') or []

    def _is_custom_checkbox(bound_field):
        widget = getattr(bound_field.field, 'widget', None)
        widget_type = getattr(widget, 'input_type', '')
        return bound_field.name.startswith('custom__') and widget_type == 'checkbox'

    sections = []
    for key in section_order:
        if key not in visible_keys:
            continue
        section_blocks = grouped[key]
        fallback_meta = ONBOARDING_SECTION_META.get(key, {})
        sections.append(
            {
                'key': key,
                'title': section_titles.get(key, fallback_meta.get('title', key)),
                'subtitle': fallback_meta.get('subtitle', ''),
                'blocks': section_blocks,
                'is_custom': key in custom_section_keys,
                'has_custom_checkbox_fields': any(
                    _is_custom_checkbox(bound_field)
                    for block in section_blocks
                    for bound_field in _fields_of(block)
                ),
            }
        )
    return sections
# Section key -> lazily translated title/subtitle for the offboarding form.
# _build_offboarding_sections() indexes this dict directly for every key in
# OFFBOARDING_PAGE_ORDER, so each of those keys needs an entry here.
OFFBOARDING_SECTION_META = {
    'mitarbeitende': {'title': gettext_lazy('Mitarbeitende'), 'subtitle': gettext_lazy('Person, Rolle und Bereich')},
    'austritt': {'title': gettext_lazy('Austritt'), 'subtitle': gettext_lazy('Letzter Arbeitstag')},
    'abschluss': {'title': gettext_lazy('Abschluss'), 'subtitle': gettext_lazy('Hinweise und Abschlussnotizen')},
}
def _build_offboarding_sections(form, visible_section_keys: set[str] | None = None) -> list[dict]:
    """Group the form's bound fields into offboarding sections.

    The field -> section mapping is read from ``form._field_page_keys``;
    unmapped or unknown sections fall back to ``'abschluss'``. Sections are
    returned in ``OFFBOARDING_PAGE_ORDER``, skipping invisible and empty
    ones. An empty or ``None`` ``visible_section_keys`` means "all visible".
    """
    field_pages = getattr(form, '_field_page_keys', {})
    grouped = {key: [] for key in OFFBOARDING_PAGE_ORDER}
    for field_name in form.fields.keys():
        section_key = field_pages.get(field_name, 'abschluss')
        bucket = section_key if section_key in grouped else 'abschluss'
        grouped[bucket].append(form[field_name])

    visible_keys = visible_section_keys or set(OFFBOARDING_PAGE_ORDER)

    sections = []
    for key in OFFBOARDING_PAGE_ORDER:
        if key not in visible_keys or not grouped[key]:
            continue
        sections.append(
            {
                'key': key,
                'title': OFFBOARDING_SECTION_META[key]['title'],
                'subtitle': OFFBOARDING_SECTION_META[key]['subtitle'],
                'fields': grouped[key],
            }
        )
    return sections
def _ops_summary_for_user(user) -> dict[str, object]:
    """Build the operations summary (job counters, backup health) for ``user``.

    The backup health snapshot is only loaded with the ``manage_backups``
    capability. The job counters cover tasks started in the last 24 hours
    and are only filled with ``view_job_monitor``; ``recent_failed_logs``
    holds the five most recently started failed task logs.
    """
    can_view_jobs = user_has_capability(user, 'view_job_monitor')
    can_manage_backups = user_has_capability(user, 'manage_backups')
    summary: dict[str, object] = {
        'show': can_view_jobs or can_manage_backups,
        'can_view_jobs': can_view_jobs,
        'can_manage_backups': can_manage_backups,
        'failed_count_24h': 0,
        'started_count_24h': 0,
        'success_count_24h': 0,
        'recent_failed_logs': [],
        'backup_health': latest_backup_health_snapshot() if can_manage_backups else None,
    }
    if can_view_jobs:
        window_start = timezone.now() - timedelta(hours=24)
        per_status = (
            AsyncTaskLog.objects.filter(started_at__gte=window_start)
            .values('status')
            .annotate(count=Count('id'))
        )
        status_counts = {}
        for row in per_status:
            status_counts[row['status']] = row['count']
        summary.update(
            failed_count_24h=status_counts.get('failed', 0),
            started_count_24h=status_counts.get('started', 0),
            success_count_24h=status_counts.get('succeeded', 0),
            recent_failed_logs=list(
                AsyncTaskLog.objects.filter(status='failed').order_by('-started_at', '-id')[:5]
            ),
        )
    return summary
@login_required
def home(request):
    """Render the portal home page for the logged-in user.

    Ensures the ``'Default'`` ``WorkflowConfig`` row exists and passes it to
    the template together with integration flags, the user's role and the
    portal app sections.
    """
    # Index instead of tuple-unpacking into `_`, which would rebind the
    # gettext alias inside this function.
    workflow_config = WorkflowConfig.objects.get_or_create(name='Default')[0]
    role_key = get_user_role_key(request.user)
    context = {
        'nextcloud_enabled': is_nextcloud_enabled(),
        'email_test_mode': is_email_test_mode(),
        'workflow_config': workflow_config,
        'role_label': get_user_role_label(request.user),
        'role_key': role_key,
        'portal_app_sections': build_portal_app_sections(request.user),
    }
    return render(request, 'workflows/home.html', context)
@_require_capability('manage_app_registry')
def portal_app_registry_page(request):
    """Portal app registry page (capability ``manage_app_registry``).

    Delegates to ``admin_config_views.portal_app_registry_page_impl`` and
    injects ``_translate_choice_list`` for resolving choice labels.
    """
    return admin_config_views.portal_app_registry_page_impl(request, translate_choice_list=_translate_choice_list)
@_require_capability('view_job_monitor')
def job_monitor_page(request):
    """Job monitor page (capability ``view_job_monitor``); delegates to ``job_monitor_page_impl``."""
    return job_monitor_page_impl(request)
@_require_capability('manage_app_registry')
@require_POST
def save_portal_app_registry(request):
    """POST-only save of the portal app registry (capability ``manage_app_registry``).

    Delegates to ``admin_config_views.save_portal_app_registry_impl`` and
    injects ``_audit`` as the audit logger.
    """
    return admin_config_views.save_portal_app_registry_impl(request, audit_fn=_audit)
def _user_management_rows():
    """Return one row dict per user for the user management table.

    Rows are sorted by: active users first, then role rank (platform owner
    and super admin share rank 0, unknown roles rank 99), then lower-cased
    username.
    """
    role_order = {
        ROLE_PLATFORM_OWNER: 0,
        ROLE_SUPER_ADMIN: 0,
        'admin': 1,
        'it_staff': 2,
        'staff': 3,
    }

    def _row_for(user):
        role_key = get_user_role_key(user)
        return {
            'user': user,
            'role_key': role_key,
            'role_label': str(ROLE_LABELS[role_key]),
            'role_sort': role_order.get(role_key, 99),
            'display_name': _display_user_name(user),
        }

    def _sort_key(row):
        return (not row['user'].is_active, row['role_sort'], row['user'].username.lower())

    all_users = get_user_model().objects.all().order_by('-is_active', 'username')
    return sorted((_row_for(user) for user in all_users), key=_sort_key)
def _render_user_management(request, create_form=None, status_code: int = 200):
    """Render the user management page.

    Loads the 12 most recent user-related audit events and annotates each
    with ``action_label`` and ``role_label``. The platform owner role is
    only offered (in the form and in ``role_choices``) when the current user
    is a platform owner. ``create_form`` lets a caller pass in its own form
    instance; ``status_code`` sets the HTTP status.
    """
    user_event_actions = ['user_created', 'user_updated', 'user_password_reset_sent', 'user_deleted']
    recent_user_events = list(
        AdminAuditLog.objects.select_related('actor')
        .filter(action__in=user_event_actions)
        .order_by('-created_at', '-id')[:12]
    )
    for event in recent_user_events:
        event.action_label = _audit_action_label(event.action)
        event_role = (event.details or {}).get('role')
        if event_role in ROLE_LABELS:
            event.role_label = str(ROLE_LABELS[event_role])
        else:
            event.role_label = event_role

    include_product_owner = get_user_role_key(request.user) == ROLE_PLATFORM_OWNER

    if not create_form:
        create_form = UserManagementCreateForm(include_product_owner=include_product_owner)

    rows = _user_management_rows()

    role_choices = []
    for key in ROLE_GROUP_NAMES:
        if key == ROLE_PLATFORM_OWNER and not include_product_owner:
            continue
        role_choices.append((key, str(ROLE_LABELS[key])))

    context = {
        'create_form': create_form,
        'rows': rows,
        'role_choices': role_choices,
        'include_product_owner': include_product_owner,
        'recent_user_events': recent_user_events,
    }
    return render(request, 'workflows/user_management.html', context, status=status_code)
def _platform_owner_user_count() -> int:
    """Return the number of active users whose role key is ``ROLE_PLATFORM_OWNER``."""
    # Inactive users are excluded in the query, so they are neither loaded
    # nor passed through the role lookup.
    active_users = get_user_model().objects.filter(is_active=True)
    return sum(1 for user in active_users if get_user_role_key(user) == ROLE_PLATFORM_OWNER)
def _super_admin_user_count() -> int:
    """Return the number of active users whose role key is ``ROLE_SUPER_ADMIN``."""
    # Inactive users are excluded in the query, so they are neither loaded
    # nor passed through the role lookup.
    active_users = get_user_model().objects.filter(is_active=True)
    return sum(1 for user in active_users if get_user_role_key(user) == ROLE_SUPER_ADMIN)
def _would_remove_last_super_admin(user, new_role_key: str | None = None, new_is_active: bool | None = None, deleting: bool = False) -> bool:
    """Tell whether the planned change would leave no active super admin.

    Only relevant when ``user`` is an active super admin and the only one.
    In that case deleting, assigning a different role, or deactivating
    returns ``True``; everything else returns ``False``.
    """
    is_active_super_admin = get_user_role_key(user) == ROLE_SUPER_ADMIN and user.is_active
    if not is_active_super_admin:
        return False
    if _super_admin_user_count() > 1:
        return False
    loses_role = new_role_key is not None and new_role_key != ROLE_SUPER_ADMIN
    gets_deactivated = new_is_active is not None and not new_is_active
    return bool(deleting) or loses_role or gets_deactivated
def _would_remove_last_platform_owner(user, new_role_key: str | None = None, new_is_active: bool | None = None, deleting: bool = False) -> bool:
    """Tell whether the planned change would leave no active platform owner.

    Only relevant when ``user`` is an active platform owner and the only
    one. In that case deleting, assigning a different role, or deactivating
    returns ``True``; everything else returns ``False``.
    """
    is_active_owner = get_user_role_key(user) == ROLE_PLATFORM_OWNER and user.is_active
    if not is_active_owner:
        return False
    if _platform_owner_user_count() > 1:
        return False
    loses_role = new_role_key is not None and new_role_key != ROLE_PLATFORM_OWNER
    gets_deactivated = new_is_active is not None and not new_is_active
    return bool(deleting) or loses_role or gets_deactivated
def _send_user_access_email(request, target_user, *, invitation: bool) -> None:
    """Send ``target_user`` an e-mail with a link to set or reset the password.

    The link points at the ``password_reset_confirm`` URL, built from the
    user's encoded primary key and a fresh token, made absolute via the
    request. ``invitation=True`` selects the invitation wording, otherwise
    the password reset wording is used.

    Raises:
        ValueError: if the user has no e-mail address.
    """
    recipient = (target_user.email or '').strip()
    if not recipient:
        raise ValueError(_('Für diesen Benutzer ist keine E-Mail-Adresse hinterlegt.'))

    link_kwargs = {
        'uidb64': urlsafe_base64_encode(force_bytes(target_user.pk)),
        'token': default_token_generator.make_token(target_user),
    }
    reset_url = request.build_absolute_uri(reverse('password_reset_confirm', kwargs=link_kwargs))
    branding_copy = get_branding_email_copy()

    if not invitation:
        subject = _('Passwort zurücksetzen für %(username)s') % {'username': target_user.username}
        body_template = _(
            'Hallo %(name)s,\n\n'
            'für Ihr Konto wurde ein Link zum Zurücksetzen des Passworts erstellt.\n'
            'Bitte öffnen Sie den folgenden Link:\n'
            '%(url)s\n\n'
            'Wenn Sie diese Anfrage nicht erwartet haben, können Sie diese E-Mail ignorieren.'
        )
        body = body_template % {
            'name': _display_user_name(target_user),
            'url': reset_url,
        }
    else:
        subject = _('Zugangseinladung für %(username)s') % {'username': target_user.username}
        body_template = _(
            'Hallo %(name)s,\n\n'
            'für Sie wurde ein Benutzerkonto im %(portal_title)s angelegt.\n'
            'Bitte öffnen Sie den folgenden Link, um Ihr Passwort zu setzen:\n'
            '%(url)s\n\n'
            'Wenn Sie diese Einladung nicht erwartet haben, melden Sie sich bitte bei Ihrem Administrator.'
        )
        body = body_template % {
            'name': _display_user_name(target_user),
            'portal_title': branding_copy['portal_title'],
            'url': reset_url,
        }

    send_system_email(subject=subject, body=body, to=[recipient])
@_require_capability('manage_users')
def user_management_page(request):
    """User management page (capability ``manage_users``).

    Delegates to ``admin_config_views.user_management_page_impl`` and
    injects ``_render_user_management`` as the renderer.
    """
    return admin_config_views.user_management_page_impl(request, render_user_management_fn=_render_user_management)
@_require_capability('manage_product_branding')
def portal_branding_page(request):
    """Portal branding page (capability ``manage_product_branding``).

    Delegates to ``admin_config_views.portal_branding_page_impl`` and
    injects ``_build_branding_sections`` as the section builder.
    """
    return admin_config_views.portal_branding_page_impl(request, build_branding_sections_fn=_build_branding_sections)
@_require_capability('manage_product_branding')
@require_POST
def save_portal_branding(request):
    """POST-only save of the portal branding (capability ``manage_product_branding``).

    Delegates to ``admin_config_views.save_portal_branding_impl`` and
    injects ``_audit`` and ``_build_branding_sections``.
    """
    return admin_config_views.save_portal_branding_impl(request, audit_fn=_audit, build_branding_sections_fn=_build_branding_sections)
def _build_branding_sections(form, branding):
    """Describe the branding form as a list of sections with per-field rows.

    Each section dict has ``key``, ``title``, ``subtitle``, ``fields``,
    ``field_full`` (field names flagged ``is_full`` in their row),
    ``hint_map`` and ``rows``. A row carries the field name, the bound form
    field, its label, the current value read from ``branding``, whether the
    widget is a file input, the ``is_full`` flag and the hint text.
    """
    sections = [
        {
            'key': 'identity',
            'title': _('Identität'),
            'subtitle': _('Titel, Firmenname und zentrale Spracheinstellungen.'),
            'fields': ['portal_title', 'company_name', 'company_domain', 'default_language', 'login_subtitle'],
            'field_full': {'login_subtitle'},
            'hint_map': {
                'company_domain': _('Wird für E-Mail-Vorschläge und Domain-bezogene Standardtexte verwendet, z. B. workdock.de.'),
            },
        },
        {
            'key': 'appearance',
            'title': _('Farben & Erscheinungsbild'),
            'subtitle': _('Zentrale visuelle Markenwerte und Browser-Icon.'),
            'fields': ['primary_color', 'secondary_color', 'logo_image', 'favicon_image'],
            'field_full': set(),
            'hint_map': {
                'logo_image': _('Erlaubte Formate: SVG, PNG, JPG, JPEG, WEBP. Maximal 5 MB.'),
                'favicon_image': _('Erlaubte Formate: ICO, PNG, SVG, WEBP. Maximal 2 MB.'),
            },
        },
        {
            'key': 'communication',
            'title': _('Kommunikation'),
            'subtitle': _('Absender, Support und PDF-Branding für ausgehende Kommunikation.'),
            'fields': ['support_email', 'sender_display_name', 'pdf_letterhead'],
            'field_full': {'pdf_letterhead'},
            'hint_map': {
                'sender_display_name': _('Wird für ausgehende System-E-Mails als Anzeigename verwendet.'),
                'pdf_letterhead': _('Erlaubtes Format: PDF. Maximal 10 MB.'),
            },
        },
        {
            'key': 'legal',
            'title': _('Footer & Rechtliches'),
            'subtitle': _('Gemeinsame Footer-Texte und rechtliche Hinweise für die Shell.'),
            'fields': ['footer_text', 'legal_notice', 'footer_text_en', 'legal_notice_en'],
            'field_full': {'legal_notice', 'legal_notice_en'},
            'hint_map': {},
        },
    ]

    def _row(section, field_name):
        bound_field = form[field_name]
        widget_type = getattr(bound_field.field.widget, 'input_type', '')
        return {
            'name': field_name,
            'bound_field': bound_field,
            'label': bound_field.label,
            'value': getattr(branding, field_name, '') or '',
            'is_file': widget_type == 'file',
            'is_full': field_name in section.get('field_full', set()),
            'hint': section.get('hint_map', {}).get(field_name, ''),
        }

    for section in sections:
        section['rows'] = [_row(section, field_name) for field_name in section['fields']]
    return sections
@_require_capability('manage_company_config')
def portal_company_config_page(request):
    """Company configuration page (capability ``manage_company_config``).

    Delegates to ``admin_config_views.portal_company_config_page_impl`` and
    injects ``_build_company_config_sections`` as the section builder.
    """
    return admin_config_views.portal_company_config_page_impl(request, build_company_config_sections_fn=_build_company_config_sections)
@_require_capability('manage_company_config')
@require_POST
def save_portal_company_config(request):
    """POST-only save of the company configuration (capability ``manage_company_config``).

    Delegates to ``admin_config_views.save_portal_company_config_impl`` and
    injects ``_audit`` and ``_build_company_config_sections``.
    """
    return admin_config_views.save_portal_company_config_impl(request, audit_fn=_audit, build_company_config_sections_fn=_build_company_config_sections)
def _build_company_config_sections(form, company_config):
    """Describe the company configuration form as sections with per-field rows.

    Each section dict has ``key``, ``title``, ``subtitle``, ``fields`` and
    ``rows`` (the last section also a ``hint``). A row carries the field
    name, the bound form field, its label and the current value read from
    ``company_config`` (empty string when missing or falsy).
    """
    sections = [
        {
            'key': 'profile',
            'title': _('Firmenprofil'),
            'subtitle': _('Rechtlicher Name und zentrale Stammdaten der Firma.'),
            'fields': ['legal_company_name', 'phone_number', 'website_url', 'country'],
        },
        {
            'key': 'address',
            'title': _('Adresse & Register'),
            'subtitle': _('Anschrift sowie optionale Register- und Steuerangaben.'),
            'fields': ['street_address', 'postal_code', 'city', 'registration_number', 'vat_id'],
        },
        {
            'key': 'contacts',
            'title': _('Kontaktpunkte'),
            'subtitle': _('Zentrale Ansprechpartner für HR, IT und Operations.'),
            'fields': ['hr_contact_email', 'it_contact_email', 'operations_contact_email'],
        },
        {
            'key': 'public',
            'title': _('Recht & Öffentlichkeit'),
            'subtitle': _('Öffentliche Links für Website, Impressum und Datenschutz.'),
            'fields': ['imprint_url', 'privacy_url'],
            'hint': _('Diese Links können später im Portal-Footer oder in öffentlichen Seiten verwendet werden.'),
        },
    ]

    def _row(field_name):
        bound_field = form[field_name]
        return {
            'name': field_name,
            'bound_field': bound_field,
            'label': bound_field.label,
            'value': getattr(company_config, field_name, '') or '',
        }

    for section in sections:
        section['rows'] = [_row(field_name) for field_name in section['fields']]
    return sections
@_require_capability('manage_trial_lifecycle')
def portal_trial_config_page(request):
    """Trial configuration page (capability ``manage_trial_lifecycle``); delegates to ``admin_config_views.portal_trial_config_page_impl``."""
    return admin_config_views.portal_trial_config_page_impl(request)
@_require_capability('manage_trial_lifecycle')
@require_POST
def save_portal_trial_config(request):
    """POST-only save of the trial configuration (capability ``manage_trial_lifecycle``).

    Delegates to ``admin_config_views.save_portal_trial_config_impl`` and
    injects ``_audit`` as the audit logger.
    """
    return admin_config_views.save_portal_trial_config_impl(request, audit_fn=_audit)
@_require_capability('manage_users')
@require_POST
def create_user_from_admin(request):
    """POST-only user creation from the admin UI (capability ``manage_users``).

    Delegates to ``admin_config_views.create_user_from_admin_impl``; the
    renderer, access e-mail sender, audit logger and display-name helper of
    this module are injected as callables.
    """
    return admin_config_views.create_user_from_admin_impl(
        request,
        render_user_management_fn=_render_user_management,
        send_user_access_email_fn=_send_user_access_email,
        audit_fn=_audit,
        display_user_name_fn=_display_user_name,
    )
@_require_capability('manage_users')
@require_POST
def update_user_from_admin(request, user_id: int):
    """POST-only user update from the admin UI (capability ``manage_users``).

    Delegates to ``admin_config_views.update_user_from_admin_impl``; the
    "last platform owner" / "last super admin" guards, the audit logger and
    the display-name helper of this module are injected as callables.
    """
    return admin_config_views.update_user_from_admin_impl(
        request,
        user_id,
        would_remove_last_platform_owner_fn=_would_remove_last_platform_owner,
        would_remove_last_super_admin_fn=_would_remove_last_super_admin,
        audit_fn=_audit,
        display_user_name_fn=_display_user_name,
    )
@_require_capability('manage_users')
@require_POST
def send_password_reset_from_admin(request, user_id: int):
    """POST-only password reset trigger for a user (capability ``manage_users``).

    Delegates to ``admin_config_views.send_password_reset_from_admin_impl``;
    the access e-mail sender, audit logger and display-name helper of this
    module are injected as callables.
    """
    return admin_config_views.send_password_reset_from_admin_impl(
        request,
        user_id,
        send_user_access_email_fn=_send_user_access_email,
        audit_fn=_audit,
        display_user_name_fn=_display_user_name,
    )
@_require_capability('manage_users')
@require_POST
def delete_user_from_admin(request, user_id: int):
    """POST-only user deletion from the admin UI (capability ``manage_users``).

    Delegates to ``admin_config_views.delete_user_from_admin_impl``; the
    "last platform owner" / "last super admin" guards, the audit logger and
    the display-name helper of this module are injected as callables.
    """
    return admin_config_views.delete_user_from_admin_impl(
        request,
        user_id,
        would_remove_last_platform_owner_fn=_would_remove_last_platform_owner,
        would_remove_last_super_admin_fn=_would_remove_last_super_admin,
        audit_fn=_audit,
        display_user_name_fn=_display_user_name,
    )
@_require_capability('view_docs')
def handbook_page(request):
    """Handbook page (capability ``view_docs``); delegates to ``admin_config_views.handbook_page_impl``."""
    return admin_config_views.handbook_page_impl(request)
@_require_capability('view_docs')
def project_wiki_page(request):
    """Project wiki page (capability ``view_docs``); delegates to ``admin_config_views.project_wiki_page_impl``."""
    return admin_config_views.project_wiki_page_impl(request)
@_require_capability('view_docs')
def developer_handbook_page(request):
    """Developer handbook page (capability ``view_docs``); delegates to ``admin_config_views.developer_handbook_page_impl``."""
    return admin_config_views.developer_handbook_page_impl(request)
@_require_capability('view_docs')
def release_checklist_page(request):
    """Release checklist page (capability ``view_docs``); delegates to ``admin_config_views.release_checklist_page_impl``."""
    return admin_config_views.release_checklist_page_impl(request)
@_require_capability('view_audit_log')
def audit_log_page(request):
    """Serve the audit log page by calling ``audit_log_page_impl``.

    Gated by the ``view_audit_log`` capability.
    """
    response = audit_log_page_impl(request)
    return response
@_require_capability('manage_backups')
def backup_recovery_page(request):
    """Serve the backup/recovery page by calling ``backup_recovery_page_impl``.

    Gated by the ``manage_backups`` capability.
    """
    response = backup_recovery_page_impl(request)
    return response
@_require_capability('manage_backups')
@require_POST
def create_backup_from_admin(request):
    """View entry point for creating a backup from the admin area.

    POST-only and gated by the ``manage_backups`` capability. Forwards to
    ``create_backup_from_admin_impl`` with ``_audit`` as ``audit_fn``.
    """
    response = create_backup_from_admin_impl(
        request,
        audit_fn=_audit,
    )
    return response
@_require_capability('manage_backups')
@require_POST
def verify_backup_from_admin(request, backup_name: str):
    """View entry point for verifying the backup named ``backup_name``.

    POST-only and gated by the ``manage_backups`` capability. Forwards to
    ``verify_backup_from_admin_impl`` with ``_audit`` as ``audit_fn``.
    """
    response = verify_backup_from_admin_impl(
        request,
        backup_name,
        audit_fn=_audit,
    )
    return response
@_require_capability('manage_backups')
@require_POST
def delete_backup_from_admin(request, backup_name: str):
    """Delete the backup bundle ``backup_name`` and report the outcome.

    POST-only and gated by the ``manage_backups`` capability. On success an
    audit entry with action ``backup_deleted`` is written and a success
    message is queued; any exception raised along the way is turned into an
    error message instead of propagating. The caller is always redirected to
    ``backup_recovery_page``.
    """
    try:
        deleted_bundle = delete_backup_bundle(backup_name)
        _audit(
            request,
            'backup_deleted',
            target_type='backup_bundle',
            target_label=backup_name,
            details={},
        )
        success_text = _('Backup wurde gelöscht: %(name)s') % {'name': deleted_bundle['name']}
        messages.success(request, success_text)
    except Exception as error:
        # NOTE(review): this handler also covers failures in the audit call and
        # the message formatting above, not only the deletion itself.
        failure_text = _('Backup konnte nicht gelöscht werden: %(error)s') % {'error': error}
        messages.error(request, failure_text)
    return redirect('backup_recovery_page')
@_require_capability('view_request_timeline')
def request_timeline_page(request, kind: str, request_id: int):
    """Serve the timeline page for the request given by ``kind``/``request_id``.

    Gated by the ``view_request_timeline`` capability. Forwards to
    ``request_views.request_timeline_page_impl`` with the label and
    custom-field helper callables from this module.
    """
    injected_helpers = {
        'request_target_label_fn': _request_target_label,
        'request_custom_field_details_fn': _request_custom_field_details,
        'audit_action_label_fn': _audit_action_label,
    }
    return request_views.request_timeline_page_impl(
        request,
        kind,
        request_id,
        **injected_helpers,
    )
@login_required
def requests_dashboard(request):
    """Serve the requests dashboard for a logged-in user.

    Forwards to ``request_views.requests_dashboard_impl`` with the audit,
    target-label and status-label helpers from this module.
    """
    injected_helpers = {
        'audit_fn': _audit,
        'request_target_label_fn': _request_target_label,
        'request_status_label_fn': _request_status_label,
    }
    return request_views.requests_dashboard_impl(request, **injected_helpers)
@login_required
@ensure_csrf_cookie
def onboarding_create(request):
    """View entry point for the onboarding request form.

    Requires a logged-in user; ``ensure_csrf_cookie`` makes sure the CSRF
    cookie is sent with the response. Forwards to
    ``request_views.onboarding_create_impl`` with the layout/section builders,
    rule-payload normaliser, display-name helper and the onboarding constants
    defined in this module.
    """
    injected = {
        'build_onboarding_layout_fn': _build_onboarding_layout,
        'build_onboarding_sections_fn': _build_onboarding_sections,
        'normalized_conditional_rule_payload_fn': _normalized_conditional_rule_payload,
        'display_user_name_fn': _display_user_name,
        'onboarding_inline_checks': ONBOARDING_INLINE_CHECKS,
        'onboarding_checkbox_lists': ONBOARDING_CHECKBOX_LISTS,
    }
    return request_views.onboarding_create_impl(request, **injected)
@login_required
def onboarding_success(request, request_id: int):
    """Serve the onboarding success page for ``request_id``.

    Requires a logged-in user; the response comes from
    ``request_views.onboarding_success_impl``.
    """
    response = request_views.onboarding_success_impl(request, request_id)
    return response
@_require_capability('generate_intro_pdfs')
@require_POST
def generate_onboarding_intro_pdf(request, request_id: int):
    """View entry point for generating the intro PDF of ``request_id``.

    POST-only and gated by the ``generate_intro_pdfs`` capability. Forwards to
    ``request_views.generate_onboarding_intro_pdf_impl`` with ``_audit`` as
    ``audit_fn``.
    """
    response = request_views.generate_onboarding_intro_pdf_impl(
        request,
        request_id,
        audit_fn=_audit,
    )
    return response
@_require_capability('generate_intro_pdfs')
@require_POST
def generate_onboarding_intro_session_pdf(request, request_id: int):
    """View entry point for generating the intro-session PDF of ``request_id``.

    POST-only and gated by the ``generate_intro_pdfs`` capability. Forwards to
    ``request_views.generate_onboarding_intro_session_pdf_impl`` with the
    audit and display-name helpers from this module.
    """
    response = request_views.generate_onboarding_intro_session_pdf_impl(
        request,
        request_id,
        audit_fn=_audit,
        display_user_name_fn=_display_user_name,
    )
    return response
@_require_capability('run_intro_session')
def onboarding_intro_session_page(request, request_id: int):
    """Serve the intro-session page for the onboarding request ``request_id``.

    Gated by the ``run_intro_session`` capability. Forwards to
    ``request_views.onboarding_intro_session_page_impl`` with the audit and
    display-name helpers from this module.
    """
    response = request_views.onboarding_intro_session_page_impl(
        request,
        request_id,
        audit_fn=_audit,
        display_user_name_fn=_display_user_name,
    )
    return response
@login_required
@ensure_csrf_cookie
def offboarding_create(request):
    """View entry point for the offboarding request form.

    Requires a logged-in user; ``ensure_csrf_cookie`` makes sure the CSRF
    cookie is sent with the response. Forwards to
    ``request_views.offboarding_create_impl`` with the section builder and
    display-name helper from this module.
    """
    injected_helpers = {
        'build_offboarding_sections_fn': _build_offboarding_sections,
        'display_user_name_fn': _display_user_name,
    }
    return request_views.offboarding_create_impl(request, **injected_helpers)
@login_required
def offboarding_success(request, request_id: int):
    """Serve the offboarding success page for ``request_id``.

    Requires a logged-in user; the response comes from
    ``request_views.offboarding_success_impl``.
    """
    response = request_views.offboarding_success_impl(request, request_id)
    return response
@_require_capability('manage_builders')
def form_builder_page(request):
    """Serve the form builder page by calling ``form_builder_page_impl``.

    Gated by the ``manage_builders`` capability. The implementation receives
    the audit helper, the choice-list translator, the label and rule-summary
    helpers, and the ``ONBOARDING_GROUPS`` /
    ``CONDITIONAL_RULE_OPERATOR_CHOICES`` constants from this module.
    """
    injected = {
        'audit_fn': _audit,
        'translate_choice_list': _translate_choice_list,
        'form_field_labels_fn': _form_field_labels,
        'field_rule_summary_fn': _field_rule_summary,
        'conditional_rule_summary_fn': _conditional_rule_summary,
        'onboarding_groups': ONBOARDING_GROUPS,
        'conditional_rule_operator_choices': CONDITIONAL_RULE_OPERATOR_CHOICES,
    }
    return form_builder_page_impl(request, **injected)
@_require_capability('manage_builders')
def intro_builder_page(request):
    """Serve the intro builder page by calling ``intro_builder_page_impl``.

    Gated by the ``manage_builders`` capability. The implementation receives
    the audit helper and the choice-list translator from this module.
    """
    injected_helpers = {
        'audit_fn': _audit,
        'translate_choice_list': _translate_choice_list,
    }
    return intro_builder_page_impl(request, **injected_helpers)
@_require_capability('manage_integrations')
def integrations_setup_page(request):
    """Serve the integrations setup page.

    Gated by the ``manage_integrations`` capability; the response comes from
    ``integrations_views.integrations_setup_page_impl``.
    """
    response = integrations_views.integrations_setup_page_impl(request)
    return response
@_require_capability('manage_welcome_emails')
def welcome_emails_page(request):
    """Serve the welcome emails page.

    Gated by the ``manage_welcome_emails`` capability; the response comes from
    ``integrations_views.welcome_emails_page_impl``.
    """
    response = integrations_views.welcome_emails_page_impl(request)
    return response
@_require_capability('manage_welcome_emails')
@require_POST
def trigger_welcome_email_now(request, schedule_id: int):
    """View entry point for triggering the welcome email ``schedule_id`` now.

    POST-only and gated by the ``manage_welcome_emails`` capability. Forwards
    to ``integrations_views.trigger_welcome_email_now_impl`` with ``_audit``
    as ``audit_fn``.
    """
    response = integrations_views.trigger_welcome_email_now_impl(
        request,
        schedule_id,
        audit_fn=_audit,
    )
    return response
@_require_capability('manage_welcome_emails')
@require_POST
def save_welcome_email_settings(request):
    """View entry point for saving the welcome email settings.

    POST-only and gated by the ``manage_welcome_emails`` capability. Forwards
    to ``integrations_views.save_welcome_email_settings_impl`` with ``_audit``
    as ``audit_fn``.
    """
    response = integrations_views.save_welcome_email_settings_impl(
        request,
        audit_fn=_audit,
    )
    return response
def _revoke_celery_task(task_id: str) -> None:
if not task_id:
return
try:
current_app.control.revoke(task_id, terminate=False)
except Exception:
return
def _parse_selected_schedule_ids(raw: str) -> list[int]:
if not raw:
return []
parsed: list[int] = []
seen: set[int] = set()
for token in raw.split(','):
token = token.strip()
if not token:
continue
try:
schedule_id = int(token)
except ValueError:
continue
if schedule_id in seen:
continue
seen.add(schedule_id)
parsed.append(schedule_id)
return parsed
@_require_capability('manage_welcome_emails')
@require_POST
def bulk_welcome_email_action(request):
    """View entry point for bulk actions on welcome emails.

    POST-only and gated by the ``manage_welcome_emails`` capability. Forwards
    to ``integrations_views.bulk_welcome_email_action_impl`` with ``_audit``
    as ``audit_fn``.
    """
    response = integrations_views.bulk_welcome_email_action_impl(
        request,
        audit_fn=_audit,
    )
    return response
@_require_capability('manage_welcome_emails')
@require_POST
def pause_welcome_email(request, schedule_id: int):
    """View entry point for pausing the welcome email ``schedule_id``.

    POST-only and gated by the ``manage_welcome_emails`` capability. Forwards
    to ``integrations_views.pause_welcome_email_impl`` with ``_audit`` as
    ``audit_fn``.
    """
    response = integrations_views.pause_welcome_email_impl(
        request,
        schedule_id,
        audit_fn=_audit,
    )
    return response
@_require_capability('manage_welcome_emails')
@require_POST
def resume_welcome_email(request, schedule_id: int):
    """View entry point for resuming the welcome email ``schedule_id``.

    POST-only and gated by the ``manage_welcome_emails`` capability. Forwards
    to ``integrations_views.resume_welcome_email_impl`` with ``_audit`` as
    ``audit_fn``.
    """
    response = integrations_views.resume_welcome_email_impl(
        request,
        schedule_id,
        audit_fn=_audit,
    )
    return response
@_require_capability('manage_welcome_emails')
@require_POST
def cancel_welcome_email(request, schedule_id: int):
    """View entry point for cancelling the welcome email ``schedule_id``.

    POST-only and gated by the ``manage_welcome_emails`` capability. Forwards
    to ``integrations_views.cancel_welcome_email_impl`` with ``_audit`` as
    ``audit_fn``.
    """
    response = integrations_views.cancel_welcome_email_impl(
        request,
        schedule_id,
        audit_fn=_audit,
    )
    return response
@_require_capability('manage_builders')
@require_POST
def form_builder_save_order(request):
    """View entry point for saving the form builder ordering.

    POST-only and gated by the ``manage_builders`` capability. Forwards to
    ``integrations_views.form_builder_save_order_impl`` with ``_audit`` as
    ``audit_fn``.
    """
    response = integrations_views.form_builder_save_order_impl(
        request,
        audit_fn=_audit,
    )
    return response
@_require_capability('manage_integrations')
@require_POST
def send_test_email(request):
    """View entry point for sending a test email.

    POST-only and gated by the ``manage_integrations`` capability. Forwards to
    ``integrations_views.send_test_email_impl`` with the audit and
    redirect-back helpers from this module.
    """
    response = integrations_views.send_test_email_impl(
        request,
        audit_fn=_audit,
        redirect_back_fn=_redirect_back,
    )
    return response
@_require_capability('manage_integrations')
@require_POST
def nextcloud_test_upload(request):
    """View entry point for the Nextcloud test upload.

    POST-only and gated by the ``manage_integrations`` capability. Forwards to
    ``integrations_views.nextcloud_test_upload_impl`` with the audit and
    redirect-back helpers from this module.
    """
    response = integrations_views.nextcloud_test_upload_impl(
        request,
        audit_fn=_audit,
        redirect_back_fn=_redirect_back,
    )
    return response
@_require_capability('manage_integrations')
@require_POST
def toggle_nextcloud_enabled(request):
    """View entry point for toggling the Nextcloud-enabled setting.

    POST-only and gated by the ``manage_integrations`` capability. Forwards to
    ``integrations_views.toggle_nextcloud_enabled_impl`` with the audit and
    redirect-back helpers from this module.
    """
    response = integrations_views.toggle_nextcloud_enabled_impl(
        request,
        audit_fn=_audit,
        redirect_back_fn=_redirect_back,
    )
    return response
@_require_capability('manage_integrations')
@require_POST
def toggle_email_mode(request):
    """View entry point for toggling the email mode.

    POST-only and gated by the ``manage_integrations`` capability. Forwards to
    ``integrations_views.toggle_email_mode_impl`` with the audit and
    redirect-back helpers from this module.
    """
    response = integrations_views.toggle_email_mode_impl(
        request,
        audit_fn=_audit,
        redirect_back_fn=_redirect_back,
    )
    return response
@_require_capability('manage_integrations')
@require_POST
def save_integrations_settings(request):
    """View entry point for saving the integrations settings.

    POST-only and gated by the ``manage_integrations`` capability. Forwards to
    ``integrations_views.save_integrations_settings_impl`` with ``_audit`` as
    ``audit_fn``.
    """
    response = integrations_views.save_integrations_settings_impl(
        request,
        audit_fn=_audit,
    )
    return response
@_require_capability('manage_integrations')
@require_POST
def save_nextcloud_settings(request):
    """View entry point for saving the Nextcloud settings.

    POST-only and gated by the ``manage_integrations`` capability. Forwards to
    ``integrations_views.save_nextcloud_settings_impl`` with ``_audit`` as
    ``audit_fn``.
    """
    response = integrations_views.save_nextcloud_settings_impl(
        request,
        audit_fn=_audit,
    )
    return response
@_require_capability('manage_integrations')
@require_POST
def save_workflow_rules(request):
    """View entry point for saving the workflow rules.

    POST-only and gated by the ``manage_integrations`` capability. Forwards to
    ``integrations_views.save_workflow_rules_impl`` with ``_audit`` as
    ``audit_fn``.
    """
    response = integrations_views.save_workflow_rules_impl(
        request,
        audit_fn=_audit,
    )
    return response
@_require_capability('manage_integrations')
@require_POST
def save_backup_settings(request):
    """View entry point for saving the backup settings.

    POST-only and gated by the ``manage_integrations`` capability. Forwards to
    ``integrations_views.save_backup_settings_impl`` with ``_audit`` as
    ``audit_fn``.
    """
    response = integrations_views.save_backup_settings_impl(
        request,
        audit_fn=_audit,
    )
    return response
@_require_capability('manage_integrations')
@require_POST
def save_mail_settings(request):
    """View entry point for saving the mail settings.

    POST-only and gated by the ``manage_integrations`` capability. Forwards to
    ``integrations_views.save_mail_settings_impl`` with ``_audit`` as
    ``audit_fn``.
    """
    response = integrations_views.save_mail_settings_impl(
        request,
        audit_fn=_audit,
    )
    return response
@_require_capability('manage_integrations')
@require_POST
def save_email_routing_settings(request):
    """View entry point for saving the email routing settings.

    POST-only and gated by the ``manage_integrations`` capability. Forwards to
    ``integrations_views.save_email_routing_settings_impl`` with ``_audit`` as
    ``audit_fn``.
    """
    response = integrations_views.save_email_routing_settings_impl(
        request,
        audit_fn=_audit,
    )
    return response
@_require_capability('manage_integrations')
@require_POST
def save_notification_rules(request):
    """View entry point for saving the notification rules.

    POST-only and gated by the ``manage_integrations`` capability. Forwards to
    ``integrations_views.save_notification_rules_impl`` with ``_audit`` as
    ``audit_fn``.
    """
    response = integrations_views.save_notification_rules_impl(
        request,
        audit_fn=_audit,
    )
    return response
@_require_capability('delete_requests')
@require_POST
def delete_request_from_dashboard(request, kind: str, request_id: int):
    """View entry point for deleting the request given by ``kind``/``request_id``.

    POST-only and gated by the ``delete_requests`` capability. Forwards to
    ``request_views.delete_request_from_dashboard_impl`` with the audit and
    target-label helpers from this module.
    """
    response = request_views.delete_request_from_dashboard_impl(
        request,
        kind,
        request_id,
        audit_fn=_audit,
        request_target_label_fn=_request_target_label,
    )
    return response
@_require_capability('retry_requests')
@require_POST
def retry_request_from_dashboard(request, kind: str, request_id: int):
    """View entry point for retrying the request given by ``kind``/``request_id``.

    POST-only and gated by the ``retry_requests`` capability. Forwards to
    ``request_views.retry_request_from_dashboard_impl`` with the audit and
    target-label helpers from this module.
    """
    response = request_views.retry_request_from_dashboard_impl(
        request,
        kind,
        request_id,
        audit_fn=_audit,
        request_target_label_fn=_request_target_label,
    )
    return response