from pathlib import Path import re from datetime import timedelta from tempfile import NamedTemporaryFile import json from io import BytesIO from functools import wraps from celery import current_app from django.conf import settings from django.db import connection from django.db import IntegrityError from django.db.models import Q from django.db.models import Count from django.shortcuts import get_object_or_404, redirect, render from django.contrib import messages from django.contrib.auth import get_user_model, login as auth_login from django.contrib.auth.decorators import login_required from django.contrib.auth.tokens import default_token_generator from django.http import JsonResponse from django.views.decorators.http import require_POST from django.views.decorators.csrf import ensure_csrf_cookie from django.utils import timezone from django.utils.encoding import force_bytes from django.utils.http import urlsafe_base64_encode from django.utils.translation import gettext as _, gettext_lazy from django.utils.translation import get_language, override from django.urls import reverse from .app_registry import build_portal_app_sections, get_portal_app_registry_rows, normalize_portal_app_sort_orders from .backup_ops import ( create_backup_bundle, delete_backup_bundle, latest_backup_health_snapshot, list_backup_bundles, verify_backup_bundle, ) from .branding import get_branding_email_copy, get_company_email_domain, get_default_notification_templates, get_portal_trial_config, is_trial_expired from .forms import AccountAvatarForm, AccountDetailsForm, AccountNotificationPreferencesForm, AccountTOTPDisableForm, AccountTOTPEnableForm, AccountTOTPRegenerateRecoveryCodesForm, AppLoginForm, AppTOTPChallengeForm, OffboardingRequestForm, OnboardingRequestForm, PortalBrandingForm, PortalCompanyConfigForm, PortalTrialConfigForm, UserManagementCreateForm from .form_builder import ( DEFAULT_FIELD_ORDER, DEFAULT_CONDITIONAL_RULES, FORM_PRESETS, LOCKED_FIELD_RULES, LOCKED_SECTION_RULES, OFFBOARDING_PAGE_LABELS, OFFBOARDING_PAGE_ORDER, ONBOARDING_DEFAULT_PAGE, build_custom_field_key, custom_field_target_key, ensure_form_field_configs, ensure_form_conditional_rule_configs, ensure_form_section_configs, get_custom_field_configs, get_custom_section_configs, get_default_page_map, get_section_definitions, get_section_labels, get_section_order, apply_form_preset, ) from .models import AdminAuditLog, AsyncTaskLog, EmployeeProfile, FormConditionalRuleConfig, FormCustomFieldConfig, FormCustomSectionConfig, FormFieldConfig, FormOption, FormSectionConfig, IntroChecklistItem, NotificationRule, NotificationTemplate, OffboardingRequest, OnboardingIntroductionSession, OnboardingRequest, PortalAppConfig, PortalBranding, PortalCompanyConfig, PortalTrialConfig, ScheduledWelcomeEmail, SystemEmailConfig, UserNotification, UserProfile, WorkflowConfig from .emailing import send_system_email from .notifications import notify_user from .roles import ROLE_GROUP_NAMES, ROLE_LABELS, ROLE_PLATFORM_OWNER, ROLE_SUPER_ADMIN, assign_user_role, get_user_role_key, get_user_role_label, user_has_capability from .services import get_email_test_redirect, is_email_test_mode, is_nextcloud_enabled, upload_to_nextcloud from .totp import build_otpauth_uri, generate_recovery_codes, generate_totp_secret from .tasks import ( _generate_onboarding_intro_pdf, _generate_onboarding_intro_session_pdf, build_intro_sections_for_request, process_offboarding_request, process_onboarding_request, send_scheduled_welcome_email, ) def _redirect_back(request, fallback: str): target = (request.POST.get('next') or request.GET.get('next') or '').strip() if target.startswith('/'): return redirect(target) referer = (request.META.get('HTTP_REFERER') or '').strip() if referer.startswith('http://127.0.0.1') or referer.startswith('http://localhost') or referer.startswith('/'): return redirect(referer) return redirect(fallback) @login_required @require_POST def mark_notification_read(request, notification_id: int): notification = get_object_or_404(UserNotification, id=notification_id, user=request.user) notification.mark_read() return _redirect_back(request, 'home') @login_required @require_POST def mark_all_notifications_read(request): UserNotification.objects.filter(user=request.user, read_at__isnull=True).update(read_at=timezone.now()) return _redirect_back(request, 'home') ONBOARDING_GROUPS = { 'business-card-box': ['business_card_name', 'business_card_title', 'business_card_email', 'business_card_phone'], 'employment-end-box': ['employment_end_date'], 'group-mailboxes-box': ['group_mailboxes'], 'extra-hardware-box': ['additional_hardware_multi', 'additional_hardware_other'], 'extra-software-box': ['additional_software_multi', 'additional_software'], 'extra-access-box': ['additional_access_text'], 'successor-box': ['successor_name', 'inherit_phone_number_choice'], } ONBOARDING_INLINE_CHECKS = {'order_business_cards', 'agreement_confirm'} ONBOARDING_CHECKBOX_LISTS = { 'needed_devices_multi', 'additional_hardware_multi', 'needed_software_multi', 'additional_software_multi', 'needed_accesses_multi', 'needed_workspace_groups_multi', 'needed_resources_multi', } ONBOARDING_SECTION_META = { 'stammdaten': {'title': gettext_lazy('Stammdaten'), 'subtitle': gettext_lazy('Person, Rolle, Abteilung')}, 'vertrag': {'title': gettext_lazy('Vertrag'), 'subtitle': gettext_lazy('Beschäftigung und Termine')}, 'itsetup': {'title': gettext_lazy('IT-Setup'), 'subtitle': gettext_lazy('Geräte, Software und Zugänge')}, 'abschluss': {'title': gettext_lazy('Abschluss'), 'subtitle': gettext_lazy('Notizen und Freigabe')}, } CONDITIONAL_RULE_OPERATOR_CHOICES = [ ('checked', _('ist aktiviert')), ('equals', _('ist gleich')), ('not_equals', _('ist nicht gleich')), ] def _field_rule_summary(*, is_visible: bool, is_required, locked: bool) -> str: if locked: return str(_('Fixes Kernfeld, immer sichtbar.')) if not is_visible: return str(_('Ausgeblendet, erscheint nicht im Formular.')) if is_required is True: return str(_('Sichtbar und als Pflichtfeld markiert.')) if is_required is False: return str(_('Sichtbar und optional.')) return str(_('Sichtbar mit Standardverhalten.')) def _conditional_clause_sentence(clause: dict, field_label_map: dict[str, str]) -> str: field_name = (clause.get('field') or '').strip() operator = (clause.get('operator') or '').strip() value = clause.get('value') if not field_name or not operator: return '' field_label = field_label_map.get(field_name, field_name) if operator == 'checked': return _('%(field)s ist aktiviert') % {'field': field_label} if operator == 'equals': if value not in (None, ''): return _('%(field)s ist gleich %(value)s') % {'field': field_label, 'value': value} return _('%(field)s ist gleich') % {'field': field_label} if operator == 'not_equals': if value not in (None, ''): return _('%(field)s ist nicht gleich %(value)s') % {'field': field_label, 'value': value} return _('%(field)s ist nicht gleich') % {'field': field_label} return _('%(field)s erfüllt die Bedingung') % {'field': field_label} def _conditional_rule_summary(clauses: list[dict], field_label_map: dict[str, str]) -> str: active_clauses = [clause for clause in clauses if clause.get('field') and clause.get('operator')] if not active_clauses: return str(_('Immer sichtbar.')) parts = [str(_conditional_clause_sentence(clause, field_label_map)) for clause in active_clauses] return str(_('Sichtbar, wenn %(conditions)s.') % {'conditions': ' und '.join(parts)}) def _normalized_conditional_rule_payload(form_type: str) -> dict[str, dict]: configs = ensure_form_conditional_rule_configs(form_type) payload = {} for target_key, cfg in configs.items(): if not cfg.is_active: continue clauses = [clause for clause in (cfg.clauses or []) if clause.get('field') and clause.get('operator')] if clauses: payload[target_key] = {'all': clauses} return payload def _active_conditional_target_keys(form_type: str) -> set[str]: return set(_normalized_conditional_rule_payload(form_type).keys()) def healthz(request): db_ok = True try: with connection.cursor() as cursor: cursor.execute('SELECT 1') cursor.fetchone() except Exception: db_ok = False status_code = 200 if db_ok else 503 return JsonResponse( { 'status': 'ok' if db_ok else 'degraded', 'service': 'workdock', 'db': 'ok' if db_ok else 'error', 'time': timezone.now().isoformat(), }, status=status_code, ) def login_page(request): if getattr(request.user, 'is_authenticated', False): return redirect('home') next_target = (request.POST.get('next') or request.GET.get('next') or '').strip() form = AppLoginForm(request=request, data=request.POST or None) if request.method == 'POST' and form.is_valid(): user = form.get_user() profile, _ = UserProfile.objects.get_or_create(user=user) safe_next = next_target if next_target.startswith('/') else '' if profile.totp_enabled: request.session['login_pending_user_id'] = user.pk request.session['login_pending_backend'] = getattr(user, 'backend', '') request.session['login_pending_next'] = safe_next return redirect('login_totp') auth_login(request, user, backend=getattr(user, 'backend', None)) now_ts = int(timezone.now().timestamp()) request.session['auth_fresh_ts'] = now_ts request.session['last_activity_ts'] = now_ts return redirect(safe_next or reverse('home')) request.session.pop('login_pending_user_id', None) request.session.pop('login_pending_backend', None) request.session.pop('login_pending_next', None) return render( request, 'workflows/auth/login.html', { 'form': form, 'next': next_target, 'login_step': 'password', }, ) def login_totp_page(request): if getattr(request.user, 'is_authenticated', False): return redirect('home') pending_user_id = request.session.get('login_pending_user_id') backend_path = (request.session.get('login_pending_backend') or '').strip() next_target = (request.session.get('login_pending_next') or '').strip() if not pending_user_id: return redirect('login') user = get_object_or_404(get_user_model(), pk=pending_user_id) profile, _ = UserProfile.objects.get_or_create(user=user) if not profile.totp_enabled: request.session.pop('login_pending_user_id', None) request.session.pop('login_pending_backend', None) request.session.pop('login_pending_next', None) return redirect('login') show_recovery = request.method == 'POST' and bool((request.POST.get('recovery_code') or '').strip()) form = AppTOTPChallengeForm(data=request.POST or None, profile=profile) if request.method == 'POST' and form.is_valid(): auth_login(request, user, backend=backend_path or 'django.contrib.auth.backends.ModelBackend') request.session.pop('login_pending_user_id', None) request.session.pop('login_pending_backend', None) request.session.pop('login_pending_next', None) now_ts = int(timezone.now().timestamp()) request.session['auth_fresh_ts'] = now_ts request.session['last_activity_ts'] = now_ts return redirect(next_target if next_target.startswith('/') else reverse('home')) return render( request, 'workflows/auth/login.html', { 'form': form, 'next': next_target, 'login_step': 'totp', 'login_totp_user': user, 'show_recovery_code': show_recovery, }, ) @login_required def account_profile_page(request): session_secret_key = 'account_totp_pending_secret' session_codes_key = 'account_totp_recovery_codes' profile, created = UserProfile.objects.get_or_create(user=request.user) recovery_codes = request.session.pop(session_codes_key, []) pending_totp_secret = request.session.get(session_secret_key) or '' if profile.totp_enabled: pending_totp_secret = '' request.session.pop(session_secret_key, None) elif not pending_totp_secret: pending_totp_secret = generate_totp_secret() request.session[session_secret_key] = pending_totp_secret avatar_form = AccountAvatarForm(instance=profile) details_form = AccountDetailsForm(user=request.user, profile=profile) notification_preferences_form = AccountNotificationPreferencesForm(profile=profile, user=request.user) totp_enable_form = AccountTOTPEnableForm(user=request.user, secret=pending_totp_secret) totp_disable_form = AccountTOTPDisableForm(user=request.user, profile=profile) totp_regenerate_form = AccountTOTPRegenerateRecoveryCodesForm(user=request.user, profile=profile) account_edit_open = False notifications_edit_open = False totp_edit_open = False if request.method == 'POST': form_kind = (request.POST.get('account_form') or '').strip() if form_kind == 'avatar': avatar_form = AccountAvatarForm(request.POST, request.FILES, instance=profile) if avatar_form.is_valid(): avatar_form.save() messages.success(request, _('Profilbild gespeichert.')) return redirect('account_profile_page') messages.error(request, _('Profilbild konnte nicht gespeichert werden.')) elif form_kind == 'details': account_edit_open = True details_form = AccountDetailsForm(request.POST, user=request.user, profile=profile) if details_form.is_valid(): details_form.save() messages.success(request, _('Profildaten gespeichert.')) return redirect('account_profile_page') messages.error(request, _('Profildaten konnten nicht gespeichert werden.')) elif form_kind == 'notification_preferences': notifications_edit_open = True notification_preferences_form = AccountNotificationPreferencesForm(request.POST, profile=profile, user=request.user) if notification_preferences_form.is_valid(): notification_preferences_form.save() messages.success(request, _('Benachrichtigungseinstellungen gespeichert.')) return redirect('account_profile_page') messages.error(request, _('Benachrichtigungseinstellungen konnten nicht gespeichert werden.')) elif form_kind == 'totp_enable': totp_edit_open = True totp_enable_form = AccountTOTPEnableForm(request.POST, user=request.user, secret=pending_totp_secret) if totp_enable_form.is_valid(): recovery_codes = generate_recovery_codes() profile.enable_totp(pending_totp_secret, recovery_codes) request.session[session_codes_key] = recovery_codes request.session.pop(session_secret_key, None) messages.success(request, _('TOTP wurde aktiviert.')) return redirect('account_profile_page') messages.error(request, _('TOTP konnte nicht aktiviert werden.')) elif form_kind == 'totp_disable': totp_edit_open = True totp_disable_form = AccountTOTPDisableForm(request.POST, user=request.user, profile=profile) if totp_disable_form.is_valid(): profile.disable_totp() request.session.pop(session_secret_key, None) messages.success(request, _('TOTP wurde deaktiviert.')) return redirect('account_profile_page') messages.error(request, _('TOTP konnte nicht deaktiviert werden.')) elif form_kind == 'totp_regenerate_codes': totp_edit_open = True totp_regenerate_form = AccountTOTPRegenerateRecoveryCodesForm(request.POST, user=request.user, profile=profile) if totp_regenerate_form.is_valid(): recovery_codes = generate_recovery_codes() profile.set_recovery_codes(recovery_codes) profile.save(update_fields=['totp_recovery_codes', 'updated_at']) request.session[session_codes_key] = recovery_codes messages.success(request, _('Recovery-Codes wurden neu erzeugt.')) return redirect('account_profile_page') messages.error(request, _('Recovery-Codes konnten nicht neu erzeugt werden.')) branding_context = get_branding_email_copy() totp_account_name = (request.user.email or request.user.username or '').strip() totp_issuer = (branding_context.get('portal_title') or branding_context.get('company_name') or 'Workdock').strip() totp_otpauth_uri = '' if profile.totp_enabled else build_otpauth_uri(pending_totp_secret, account_name=totp_account_name, issuer=totp_issuer) totp_qr_svg = '' if totp_otpauth_uri: try: import qrcode import qrcode.image.svg qr_image = qrcode.make(totp_otpauth_uri, image_factory=qrcode.image.svg.SvgPathImage) stream = BytesIO() qr_image.save(stream) totp_qr_svg = stream.getvalue().decode('utf-8') except Exception: totp_qr_svg = '' return render( request, 'workflows/account_profile.html', { 'account_user': request.user, 'account_user_profile': profile, 'avatar_form': avatar_form, 'details_form': details_form, 'notification_preferences_form': notification_preferences_form, 'notification_preference_groups': notification_preferences_form.grouped_fields(), 'totp_enable_form': totp_enable_form, 'totp_disable_form': totp_disable_form, 'totp_regenerate_form': totp_regenerate_form, 'account_edit_open': account_edit_open, 'notifications_edit_open': notifications_edit_open, 'totp_edit_open': totp_edit_open, 'role_label': get_user_role_label(request.user), 'totp_pending_secret': pending_totp_secret, 'totp_otpauth_uri': totp_otpauth_uri, 'totp_qr_svg': totp_qr_svg, 'totp_recovery_codes': recovery_codes, }, ) def _require_capability(capability: str): def decorator(view_func): @wraps(view_func) @login_required def wrapped(request, *args, **kwargs): if not user_has_capability(request.user, capability): messages.error(request, _('Sie haben keine Berechtigung für diese Aktion.')) return redirect('home') return view_func(request, *args, **kwargs) return wrapped return decorator def _display_user_name(user) -> str: first_name = (getattr(user, 'first_name', '') or '').strip() last_name = (getattr(user, 'last_name', '') or '').strip() full_name = f'{first_name} {last_name}'.strip() if full_name: return full_name username = (getattr(user, 'username', '') or '').strip() if username: return username return (getattr(user, 'email', '') or '').strip() def _audit( request, action: str, *, target_type: str = '', target_id: int | None = None, target_label: str = '', details: dict | None = None, ) -> None: if not getattr(request, 'user', None) or not request.user.is_authenticated: return AdminAuditLog.objects.create( actor=request.user, actor_display=_display_user_name(request.user), action=action, target_type=target_type, target_id=target_id, target_label=target_label, details=details or {}, ) def _form_field_labels(form_type: str) -> dict[str, str]: if form_type == 'onboarding': return {name: str(field.label or name) for name, field in OnboardingRequestForm.base_fields.items()} if form_type == 'offboarding': return {name: str(field.label or name) for name, field in OffboardingRequestForm.base_fields.items()} return {} def _request_target_label(obj, kind: str | None = None) -> str: request_kind = (kind or '').strip() if not request_kind: request_kind = 'onboarding' if isinstance(obj, OnboardingRequest) else 'offboarding' name = (getattr(obj, 'full_name', '') or '').strip() or f'#{getattr(obj, "id", "?")}' email = (getattr(obj, 'work_email', '') or '').strip() created_at = getattr(obj, 'created_at', None) date_label = created_at.strftime('%Y-%m-%d') if created_at else '' parts = [request_kind.capitalize(), name] if email: parts.append(f'<{email}>') if date_label: parts.append(date_label) return ' | '.join(parts) def _request_status_label(status_key: str, language_code: str | None = None) -> str: lang = ((language_code or 'de').split('-')[0] or 'de').lower() with override(lang): labels = { 'submitted': _('Eingereicht'), 'processing': _('In Bearbeitung'), 'completed': _('Abgeschlossen'), 'failed': _('Fehlgeschlagen'), } return labels.get(status_key, status_key) def _request_custom_field_details(obj, kind: str, language_code: str | None = None) -> list[dict[str, str]]: form_type = 'onboarding' if kind == 'onboarding' else 'offboarding' language_code = ((language_code or getattr(obj, 'preferred_language', '') or get_language() or 'de').split('-')[0]).lower() values = getattr(obj, 'custom_field_values', {}) or {} rows = [] yes_label = 'Ja' if language_code == 'de' else 'Yes' for cfg in get_custom_field_configs(form_type, include_inactive=True): raw_value = values.get(cfg.field_key) if raw_value in (None, '', False, []): continue if isinstance(raw_value, bool): display_value = str(yes_label) if raw_value else '' elif isinstance(raw_value, list): display_value = ', '.join(str(item).strip() for item in raw_value if str(item).strip()) else: display_value = str(raw_value).strip() if not display_value: continue rows.append( { 'label': cfg.translated_label(language_code), 'value': display_value, 'section': cfg.section_key, 'sort_order': cfg.sort_order, } ) rows.sort(key=lambda item: (item['section'], item['sort_order'], item['label'])) return rows def _audit_action_label(action: str) -> str: labels = { 'requests_deleted': _('Vorgänge gelöscht'), 'request_deleted': _('Vorgang gelöscht'), 'request_retried': _('Vorgang erneut angestoßen'), 'intro_pdf_generated': _('Einweisungs-PDF erzeugt'), 'intro_live_pdf_generated': _('Live-Protokoll erzeugt'), 'intro_session_reset': _('Einweisung zurückgesetzt'), 'intro_session_saved': _('Einweisung als Entwurf gespeichert'), 'intro_session_completed': _('Einweisung abgeschlossen'), 'form_option_deleted': _('Formularoption gelöscht'), 'form_options_saved': _('Formularoptionen gespeichert'), 'form_field_texts_saved': _('Feldtexte gespeichert'), 'form_layout_saved': _('Formularlayout gespeichert'), 'intro_checklist_item_deleted': _('Einweisungs-Checkpunkt gelöscht'), 'intro_checklist_item_added': _('Einweisungs-Checkpunkt hinzugefügt'), 'intro_checklist_saved': _('Einweisungs-Checkliste gespeichert'), 'welcome_email_triggered_now': _('Welcome E-Mail sofort ausgelöst'), 'welcome_email_settings_saved': _('Welcome E-Mail Einstellungen gespeichert'), 'welcome_email_bulk_action': _('Welcome E-Mail Sammelaktion ausgeführt'), 'welcome_email_paused': _('Welcome E-Mail pausiert'), 'welcome_email_resumed': _('Welcome E-Mail fortgesetzt'), 'welcome_email_cancelled': _('Welcome E-Mail abgebrochen'), 'smtp_test_sent': _('SMTP-Test gesendet'), 'nextcloud_test_upload': _('Nextcloud-Testupload ausgeführt'), 'nextcloud_mode_toggled': _('Nextcloud-Modus umgeschaltet'), 'email_mode_toggled': _('E-Mail-Modus umgeschaltet'), 'integrations_saved': _('Integrationen gespeichert'), 'nextcloud_settings_saved': _('Nextcloud-Einstellungen gespeichert'), 'mail_settings_saved': _('Mail-Einstellungen gespeichert'), 'email_routing_saved': _('E-Mail-Routing gespeichert'), 'notification_rules_saved': _('Benachrichtigungsregeln gespeichert'), 'user_created': _('Benutzer erstellt'), 'user_updated': _('Benutzer aktualisiert'), 'user_password_reset_sent': _('Passwort-Reset-Link versendet'), 'user_deleted': _('Benutzer gelöscht'), 'backup_created': _('Backup erstellt'), 'backup_verified': _('Backup verifiziert'), 'backup_deleted': _('Backup gelöscht'), 'backup_settings_saved': _('Backup-Einstellungen gespeichert'), 'portal_app_registry_saved': _('App-Registry gespeichert'), } return labels.get(action, action.replace('_', ' ').strip().capitalize()) def _translate_choice_list(choices): return [(value, str(label)) for value, label in choices] def _build_onboarding_layout(form) -> list[dict]: ordered_names = list(form.fields.keys()) group_by_field = {} for group_id, group_fields in ONBOARDING_GROUPS.items(): for name in group_fields: group_by_field[name] = group_id conditional_target_keys = _active_conditional_target_keys('onboarding') rendered_groups = set() consumed = set() blocks = [] for field_name in ordered_names: if field_name in consumed: continue group_id = group_by_field.get(field_name) if group_id: if group_id in rendered_groups: continue group_fields = [ form[name] for name in ONBOARDING_GROUPS[group_id] if name in form.fields ] if not group_fields: continue blocks.append( { 'kind': 'group', 'id': group_id, 'hidden_default': group_id in conditional_target_keys, 'fields': group_fields, } ) rendered_groups.add(group_id) consumed.update([f.name for f in group_fields]) continue if field_name.startswith('custom__') and field_name in conditional_target_keys: blocks.append( { 'kind': 'group', 'id': field_name, 'hidden_default': True, 'fields': [form[field_name]], } ) consumed.add(field_name) continue blocks.append({'kind': 'field', 'field': form[field_name]}) consumed.add(field_name) return blocks def _section_for_block(block: dict, field_pages: dict[str, str]) -> str: if block['kind'] == 'field': return field_pages.get(block['field'].name, 'abschluss') fields = block.get('fields') or [] if not fields: return 'abschluss' return field_pages.get(fields[0].name, 'abschluss') def _build_onboarding_sections(blocks: list[dict], field_pages: dict[str, str], visible_section_keys: set[str] | None = None) -> list[dict]: section_defs = get_section_definitions('onboarding') section_order = [item['key'] for item in section_defs] section_titles = {item['key']: item['title'] for item in section_defs} grouped = {key: [] for key in section_order} for block in blocks: section_key = _section_for_block(block, field_pages) if section_key not in grouped: section_key = 'abschluss' grouped[section_key].append(block) visible_keys = visible_section_keys or set(section_order) sections = [] custom_section_keys = {item['key'] for item in section_defs if item.get('is_custom')} for key in section_order: if key not in visible_keys: continue blocks_for_section = grouped[key] has_custom_checkbox_fields = False for block in blocks_for_section: candidate_fields = [block['field']] if block['kind'] == 'field' else (block.get('fields') or []) for bound_field in candidate_fields: widget_type = getattr(getattr(bound_field.field, 'widget', None), 'input_type', '') if bound_field.name.startswith('custom__') and widget_type == 'checkbox': has_custom_checkbox_fields = True break if has_custom_checkbox_fields: break sections.append( { 'key': key, 'title': section_titles.get(key, ONBOARDING_SECTION_META.get(key, {}).get('title', key)), 'subtitle': ONBOARDING_SECTION_META.get(key, {}).get('subtitle', ''), 'blocks': blocks_for_section, 'is_custom': key in custom_section_keys, 'has_custom_checkbox_fields': has_custom_checkbox_fields, } ) return sections OFFBOARDING_SECTION_META = { 'mitarbeitende': {'title': gettext_lazy('Mitarbeitende'), 'subtitle': gettext_lazy('Person, Rolle und Bereich')}, 'austritt': {'title': gettext_lazy('Austritt'), 'subtitle': gettext_lazy('Letzter Arbeitstag')}, 'abschluss': {'title': gettext_lazy('Abschluss'), 'subtitle': gettext_lazy('Hinweise und Abschlussnotizen')}, } def _build_offboarding_sections(form, visible_section_keys: set[str] | None = None) -> list[dict]: field_pages = getattr(form, '_field_page_keys', {}) grouped = {key: [] for key in OFFBOARDING_PAGE_ORDER} for field_name in form.fields.keys(): section_key = field_pages.get(field_name, 'abschluss') if section_key not in grouped: section_key = 'abschluss' grouped[section_key].append(form[field_name]) visible_keys = visible_section_keys or set(OFFBOARDING_PAGE_ORDER) return [ { 'key': key, 'title': OFFBOARDING_SECTION_META[key]['title'], 'subtitle': OFFBOARDING_SECTION_META[key]['subtitle'], 'fields': grouped[key], } for key in OFFBOARDING_PAGE_ORDER if key in visible_keys and grouped[key] ] def _ops_summary_for_user(user) -> dict[str, object]: can_view_jobs = user_has_capability(user, 'view_job_monitor') can_manage_backups = user_has_capability(user, 'manage_backups') summary: dict[str, object] = { 'show': can_view_jobs or can_manage_backups, 'can_view_jobs': can_view_jobs, 'can_manage_backups': can_manage_backups, 'failed_count_24h': 0, 'started_count_24h': 0, 'success_count_24h': 0, 'recent_failed_logs': [], 'backup_health': latest_backup_health_snapshot() if can_manage_backups else None, } if not can_view_jobs: return summary since = timezone.now() - timedelta(hours=24) logs = AsyncTaskLog.objects.filter(started_at__gte=since) counts = { row['status']: row['count'] for row in logs.values('status').annotate(count=Count('id')) } summary['failed_count_24h'] = counts.get('failed', 0) summary['started_count_24h'] = counts.get('started', 0) summary['success_count_24h'] = counts.get('succeeded', 0) summary['recent_failed_logs'] = list( AsyncTaskLog.objects.filter(status='failed').order_by('-started_at', '-id')[:5] ) return summary @login_required def home(request): config, _ = WorkflowConfig.objects.get_or_create(name='Default') role_key = get_user_role_key(request.user) return render( request, 'workflows/home.html', { 'nextcloud_enabled': is_nextcloud_enabled(), 'email_test_mode': is_email_test_mode(), 'workflow_config': config, 'role_label': get_user_role_label(request.user), 'role_key': role_key, 'portal_app_sections': build_portal_app_sections(request.user), }, ) @_require_capability('manage_app_registry') def portal_app_registry_page(request): return render( request, 'workflows/app_registry.html', { 'rows': get_portal_app_registry_rows(), 'section_choices': _translate_choice_list(PortalAppConfig.SECTION_CHOICES), }, ) @_require_capability('view_job_monitor') def job_monitor_page(request): status_filter = (request.GET.get('status') or '').strip() task_filter = (request.GET.get('task') or '').strip() logs = AsyncTaskLog.objects.all() if status_filter: logs = logs.filter(status=status_filter) if task_filter: logs = logs.filter(task_name=task_filter) logs = logs.order_by('-started_at', '-id')[:200] task_names = list(AsyncTaskLog.objects.order_by('task_name').values_list('task_name', flat=True).distinct()) since = timezone.now() - timedelta(hours=24) recent_logs = AsyncTaskLog.objects.filter(started_at__gte=since) counts = { row['status']: row['count'] for row in recent_logs.values('status').annotate(count=Count('id')) } recent_failed = list(AsyncTaskLog.objects.filter(status='failed').order_by('-started_at', '-id')[:5]) can_manage_backups = user_has_capability(request.user, 'manage_backups') return render( request, 'workflows/job_monitor.html', { 'logs': logs, 'status_filter': status_filter, 'task_filter': task_filter, 'task_names': task_names, 'status_choices': [('started', _('Gestartet')), ('succeeded', _('Erfolgreich')), ('failed', _('Fehlgeschlagen'))], 'job_summary': { 'started_count_24h': counts.get('started', 0), 'success_count_24h': counts.get('succeeded', 0), 'failed_count_24h': counts.get('failed', 0), 'recent_failed': recent_failed, 'can_manage_backups': can_manage_backups, 'backup_health': latest_backup_health_snapshot() if can_manage_backups else None, }, }, ) @_require_capability('manage_app_registry') @require_POST def save_portal_app_registry(request): rows = get_portal_app_registry_rows() updated_configs = [] for row in rows: config = row['config'] key = config.key config.section = (request.POST.get(f'section__{key}') or config.section).strip() if config.section not in dict(PortalAppConfig.SECTION_CHOICES): config.section = row['default_section'] config.is_enabled = request.POST.get(f'is_enabled__{key}') == 'on' config.visible_to_super_admin = request.POST.get(f'visible_to_super_admin__{key}') == 'on' config.visible_to_admin = request.POST.get(f'visible_to_admin__{key}') == 'on' config.visible_to_it_staff = request.POST.get(f'visible_to_it_staff__{key}') == 'on' config.visible_to_staff = request.POST.get(f'visible_to_staff__{key}') == 'on' try: config.sort_order = int((request.POST.get(f'sort_order__{key}') or '').strip() or row['default_sort_order']) except ValueError: config.sort_order = row['default_sort_order'] config.title_override = (request.POST.get(f'title_override__{key}') or '').strip() config.title_override_en = (request.POST.get(f'title_override_en__{key}') or '').strip() config.description_override = (request.POST.get(f'description_override__{key}') or '').strip() config.description_override_en = (request.POST.get(f'description_override_en__{key}') or '').strip() config.action_label_override = (request.POST.get(f'action_label_override__{key}') or '').strip() config.action_label_override_en = (request.POST.get(f'action_label_override_en__{key}') or '').strip() config.save() updated_configs.append(config) normalize_portal_app_sort_orders() _audit( request, 'portal_app_registry_saved', target_type='portal_app_registry', target_label='Portal App Registry', details={'updated_apps': len(rows)}, ) messages.success(request, _('App-Registry gespeichert.')) return redirect('portal_app_registry_page') def _user_management_rows(): user_model = get_user_model() role_order = { ROLE_PLATFORM_OWNER: 0, ROLE_SUPER_ADMIN: 0, 'admin': 1, 'it_staff': 2, 'staff': 3, } rows = [] for user in user_model.objects.all().order_by('-is_active', 'username'): role_key = get_user_role_key(user) rows.append( { 'user': user, 'role_key': role_key, 'role_label': str(ROLE_LABELS[role_key]), 'role_sort': role_order.get(role_key, 99), 'display_name': _display_user_name(user), } ) rows.sort(key=lambda item: (not item['user'].is_active, item['role_sort'], item['user'].username.lower())) return rows def _render_user_management(request, create_form=None, status_code: int = 200): recent_user_events = list( AdminAuditLog.objects.select_related('actor') .filter(action__in=['user_created', 'user_updated', 'user_password_reset_sent', 'user_deleted']) .order_by('-created_at', '-id')[:12] ) for row in recent_user_events: row.action_label = _audit_action_label(row.action) role_key = (row.details or {}).get('role') row.role_label = str(ROLE_LABELS[role_key]) if role_key in ROLE_LABELS else role_key include_product_owner = get_user_role_key(request.user) == ROLE_PLATFORM_OWNER return render( request, 'workflows/user_management.html', { 'create_form': create_form or UserManagementCreateForm(include_product_owner=include_product_owner), 'rows': _user_management_rows(), 'role_choices': [ (key, str(ROLE_LABELS[key])) for key in ROLE_GROUP_NAMES if include_product_owner or key != ROLE_PLATFORM_OWNER ], 'include_product_owner': include_product_owner, 'recent_user_events': recent_user_events, }, status=status_code, ) def _platform_owner_user_count() -> int: user_model = get_user_model() return sum(1 for user in user_model.objects.all() if get_user_role_key(user) == ROLE_PLATFORM_OWNER and user.is_active) def _super_admin_user_count() -> int: user_model = get_user_model() return sum(1 for user in user_model.objects.all() if get_user_role_key(user) == ROLE_SUPER_ADMIN and user.is_active) def _would_remove_last_super_admin(user, new_role_key: str | None = None, new_is_active: bool | None = None, deleting: bool = False) -> bool: if get_user_role_key(user) != ROLE_SUPER_ADMIN or not user.is_active: return False if _super_admin_user_count() > 1: return False if deleting: return True if new_role_key is not None and new_role_key != ROLE_SUPER_ADMIN: return True if new_is_active is not None and not new_is_active: return True return False def _would_remove_last_platform_owner(user, new_role_key: str | None = None, new_is_active: bool | None = None, deleting: bool = False) -> bool: if get_user_role_key(user) != ROLE_PLATFORM_OWNER or not user.is_active: return False if _platform_owner_user_count() > 1: return False if deleting: return True if new_role_key is not None and new_role_key != ROLE_PLATFORM_OWNER: return True if new_is_active is not None and not new_is_active: return True return False def _send_user_access_email(request, target_user, *, invitation: bool) -> None: email = (target_user.email or '').strip() if not email: raise ValueError(_('Für diesen Benutzer ist keine E-Mail-Adresse hinterlegt.')) uid = urlsafe_base64_encode(force_bytes(target_user.pk)) token = default_token_generator.make_token(target_user) reset_path = reverse('password_reset_confirm', kwargs={'uidb64': uid, 'token': token}) reset_url = request.build_absolute_uri(reset_path) branding_copy = get_branding_email_copy() if invitation: subject = _('Zugangseinladung für %(username)s') % {'username': target_user.username} body = _( 'Hallo %(name)s,\n\n' 'für Sie wurde ein Benutzerkonto im %(portal_title)s angelegt.\n' 'Bitte öffnen Sie den folgenden Link, um Ihr Passwort zu setzen:\n' '%(url)s\n\n' 'Wenn Sie diese Einladung nicht erwartet haben, melden Sie sich bitte bei Ihrem Administrator.' ) % { 'name': _display_user_name(target_user), 'portal_title': branding_copy['portal_title'], 'url': reset_url, } else: subject = _('Passwort zurücksetzen für %(username)s') % {'username': target_user.username} body = _( 'Hallo %(name)s,\n\n' 'für Ihr Konto wurde ein Link zum Zurücksetzen des Passworts erstellt.\n' 'Bitte öffnen Sie den folgenden Link:\n' '%(url)s\n\n' 'Wenn Sie diese Anfrage nicht erwartet haben, können Sie diese E-Mail ignorieren.' ) % { 'name': _display_user_name(target_user), 'url': reset_url, } send_system_email(subject=subject, body=body, to=[email]) @_require_capability('manage_users') def user_management_page(request): return _render_user_management(request) @_require_capability('manage_product_branding') def portal_branding_page(request): branding, created = PortalBranding.objects.get_or_create(name='Default') form = PortalBrandingForm(instance=branding) return render( request, 'workflows/branding_settings.html', { 'form': form, 'branding': branding, 'branding_sections': _build_branding_sections(form, branding), 'editing_branding_section': '', }, ) @_require_capability('manage_product_branding') @require_POST def save_portal_branding(request): branding, created = PortalBranding.objects.get_or_create(name='Default') section_key = (request.POST.get('section_key') or '').strip() data = request.POST.copy() for field_name in PortalBrandingForm.Meta.fields: if field_name not in data: field = PortalBranding._meta.get_field(field_name) if getattr(field, 'many_to_many', False): continue if getattr(field, 'null', False) and getattr(branding, field_name, None) is None: data[field_name] = '' else: data[field_name] = getattr(branding, field_name, '') or '' form = PortalBrandingForm(data, request.FILES, instance=branding) if not form.is_valid(): messages.error(request, _('Branding konnte nicht gespeichert werden. Bitte prüfen Sie die Eingaben.')) return render( request, 'workflows/branding_settings.html', { 'form': form, 'branding': branding, 'branding_sections': _build_branding_sections(form, branding), 'editing_branding_section': section_key, }, status=400, ) branding = form.save() _audit( request, 'portal_branding_saved', target_type='portal_branding', target_id=branding.id, target_label=branding.portal_title, details={ 'company_name': branding.company_name, 'support_email': branding.support_email, 'default_language': branding.default_language, 'has_custom_logo': bool(branding.logo_image), 'has_custom_letterhead': bool(branding.pdf_letterhead), }, ) messages.success(request, _('Portal-Branding wurde gespeichert.')) return render( request, 'workflows/branding_settings.html', { 'form': PortalBrandingForm(instance=branding), 'branding': branding, 'branding_sections': _build_branding_sections(PortalBrandingForm(instance=branding), branding), 'editing_branding_section': '', }, ) def _build_branding_sections(form, branding): sections = [ { 'key': 'identity', 'title': _('Identität'), 'subtitle': _('Titel, Firmenname und zentrale Spracheinstellungen.'), 'fields': ['portal_title', 'company_name', 'company_domain', 'default_language', 'login_subtitle'], 'field_full': {'login_subtitle'}, 'hint_map': { 'company_domain': _('Wird für E-Mail-Vorschläge und Domain-bezogene Standardtexte verwendet, z. B. workdock.de.'), }, }, { 'key': 'appearance', 'title': _('Farben & Erscheinungsbild'), 'subtitle': _('Zentrale visuelle Markenwerte und Browser-Icon.'), 'fields': ['primary_color', 'secondary_color', 'logo_image', 'favicon_image'], 'field_full': set(), 'hint_map': { 'logo_image': _('Erlaubte Formate: SVG, PNG, JPG, JPEG, WEBP. Maximal 5 MB.'), 'favicon_image': _('Erlaubte Formate: ICO, PNG, SVG, WEBP. Maximal 2 MB.'), }, }, { 'key': 'communication', 'title': _('Kommunikation'), 'subtitle': _('Absender, Support und PDF-Branding für ausgehende Kommunikation.'), 'fields': ['support_email', 'sender_display_name', 'pdf_letterhead'], 'field_full': {'pdf_letterhead'}, 'hint_map': { 'sender_display_name': _('Wird für ausgehende System-E-Mails als Anzeigename verwendet.'), 'pdf_letterhead': _('Erlaubtes Format: PDF. Maximal 10 MB.'), }, }, { 'key': 'legal', 'title': _('Footer & Rechtliches'), 'subtitle': _('Gemeinsame Footer-Texte und rechtliche Hinweise für die Shell.'), 'fields': ['footer_text', 'legal_notice', 'footer_text_en', 'legal_notice_en'], 'field_full': {'legal_notice', 'legal_notice_en'}, 'hint_map': {}, }, ] for section in sections: rows = [] for field_name in section['fields']: field = form[field_name] value = getattr(branding, field_name, '') or '' is_file = bool(getattr(field.field.widget, 'input_type', '') == 'file') rows.append( { 'name': field_name, 'bound_field': field, 'label': field.label, 'value': value, 'is_file': is_file, 'is_full': field_name in section.get('field_full', set()), 'hint': section.get('hint_map', {}).get(field_name, ''), } ) section['rows'] = rows return sections @_require_capability('manage_company_config') def portal_company_config_page(request): company_config, created = PortalCompanyConfig.objects.get_or_create(name='Default') form = PortalCompanyConfigForm(instance=company_config) return render( request, 'workflows/company_config.html', { 'form': form, 'company_config': company_config, 'company_config_sections': _build_company_config_sections(form, company_config), 'editing_company_section': '', }, ) @_require_capability('manage_company_config') @require_POST def save_portal_company_config(request): company_config, created = PortalCompanyConfig.objects.get_or_create(name='Default') section_key = (request.POST.get('section_key') or '').strip() data = request.POST.copy() for field_name in PortalCompanyConfigForm.Meta.fields: if field_name not in data: data[field_name] = getattr(company_config, field_name, '') or '' form = PortalCompanyConfigForm(data, instance=company_config) if not form.is_valid(): messages.error(request, _('Firmenkonfiguration konnte nicht gespeichert werden. Bitte prüfen Sie die Eingaben.')) return render( request, 'workflows/company_config.html', { 'form': form, 'company_config': company_config, 'company_config_sections': _build_company_config_sections(form, company_config), 'editing_company_section': section_key, }, status=400, ) company_config = form.save() _audit( request, 'portal_company_config_saved', target_type='portal_company_config', target_id=company_config.id, target_label=company_config.legal_company_name or 'Default', details={ 'website_url': company_config.website_url, 'imprint_url': company_config.imprint_url, 'privacy_url': company_config.privacy_url, 'hr_contact_email': company_config.hr_contact_email, 'it_contact_email': company_config.it_contact_email, 'operations_contact_email': company_config.operations_contact_email, }, ) messages.success(request, _('Firmenkonfiguration wurde gespeichert.')) return render( request, 'workflows/company_config.html', { 'form': PortalCompanyConfigForm(instance=company_config), 'company_config': company_config, 'company_config_sections': _build_company_config_sections(PortalCompanyConfigForm(instance=company_config), company_config), 'editing_company_section': '', }, ) def _build_company_config_sections(form, company_config): sections = [ { 'key': 'profile', 'title': _('Firmenprofil'), 'subtitle': _('Rechtlicher Name und zentrale Stammdaten der Firma.'), 'fields': ['legal_company_name', 'phone_number', 'website_url', 'country'], }, { 'key': 'address', 'title': _('Adresse & Register'), 'subtitle': _('Anschrift sowie optionale Register- und Steuerangaben.'), 'fields': ['street_address', 'postal_code', 'city', 'registration_number', 'vat_id'], }, { 'key': 'contacts', 'title': _('Kontaktpunkte'), 'subtitle': _('Zentrale Ansprechpartner für HR, IT und Operations.'), 'fields': ['hr_contact_email', 'it_contact_email', 'operations_contact_email'], }, { 'key': 'public', 'title': _('Recht & Öffentlichkeit'), 'subtitle': _('Öffentliche Links für Website, Impressum und Datenschutz.'), 'fields': ['imprint_url', 'privacy_url'], 'hint': _('Diese Links können später im Portal-Footer oder in öffentlichen Seiten verwendet werden.'), }, ] for section in sections: rows = [] for field_name in section['fields']: field = form[field_name] rows.append( { 'name': field_name, 'bound_field': field, 'label': field.label, 'value': getattr(company_config, field_name, '') or '', } ) section['rows'] = rows return sections @_require_capability('manage_trial_lifecycle') def portal_trial_config_page(request): trial_config = get_portal_trial_config() form = PortalTrialConfigForm(instance=trial_config) return render( request, 'workflows/trial_management.html', { 'form': form, 'trial_config': trial_config, 'trial_is_expired': is_trial_expired(), }, ) @_require_capability('manage_trial_lifecycle') @require_POST def save_portal_trial_config(request): trial_config = get_portal_trial_config() form = PortalTrialConfigForm(request.POST, instance=trial_config) if not form.is_valid(): messages.error(request, _('Trial-Konfiguration konnte nicht gespeichert werden. Bitte prüfen Sie die Eingaben.')) return render( request, 'workflows/trial_management.html', { 'form': form, 'trial_config': trial_config, 'trial_is_expired': is_trial_expired(), }, status=400, ) trial_config = form.save() _audit( request, 'portal_trial_config_saved', target_type='portal_trial_config', target_id=trial_config.id, target_label='Default', details={ 'is_trial_mode': trial_config.is_trial_mode, 'trial_started_at': trial_config.trial_started_at.isoformat() if trial_config.trial_started_at else '', 'trial_expires_at': trial_config.trial_expires_at.isoformat() if trial_config.trial_expires_at else '', 'restrict_production_integrations': trial_config.restrict_production_integrations, 'auto_cleanup_enabled': trial_config.auto_cleanup_enabled, }, ) if trial_config.is_trial_mode and trial_config.trial_expires_at: remaining = trial_config.trial_expires_at - timezone.now() if remaining.total_seconds() <= 0: notify_user( user=request.user, title=_('Trial ist abgelaufen'), body=_('Der Trial-Zeitraum ist überschritten. Nicht-Platform-Owner werden jetzt blockiert.'), level=UserNotification.LEVEL_WARNING, link_url='/admin-tools/trial/', event_key=UserProfile.NOTIFICATION_TRIAL_ALERTS, ) elif remaining <= timedelta(days=7): notify_user( user=request.user, title=_('Trial läuft bald ab'), body=_('Der Trial endet am %(date)s.') % {'date': timezone.localtime(trial_config.trial_expires_at).strftime('%d.%m.%Y %H:%M')}, level=UserNotification.LEVEL_WARNING, link_url='/admin-tools/trial/', event_key=UserProfile.NOTIFICATION_TRIAL_ALERTS, ) elif not trial_config.is_trial_mode: notify_user( user=request.user, title=_('Trial-Modus deaktiviert'), body=_('Der Trial-Modus wurde ausgeschaltet.'), level=UserNotification.LEVEL_INFO, link_url='/admin-tools/trial/', event_key=UserProfile.NOTIFICATION_TRIAL_ALERTS, ) messages.success(request, _('Trial-Konfiguration wurde gespeichert.')) return render( request, 'workflows/trial_management.html', { 'form': PortalTrialConfigForm(instance=trial_config), 'trial_config': trial_config, 'trial_is_expired': is_trial_expired(), }, ) @_require_capability('manage_users') @require_POST def create_user_from_admin(request): form = UserManagementCreateForm(request.POST, include_product_owner=(get_user_role_key(request.user) == ROLE_PLATFORM_OWNER)) if not form.is_valid(): messages.error(request, _('Benutzer konnte nicht erstellt werden. Bitte prüfen Sie die Eingaben.')) return _render_user_management(request, create_form=form, status_code=400) user = form.save() _send_user_access_email(request, user, invitation=True) _audit( request, 'user_created', target_type='user', target_id=user.id, target_label=_display_user_name(user), details={'username': user.username, 'role': get_user_role_key(user), 'invitation_sent': True}, ) messages.success(request, _('Benutzer wurde erstellt und eingeladen: %(username)s') % {'username': user.username}) return redirect('user_management_page') @_require_capability('manage_users') @require_POST def update_user_from_admin(request, user_id: int): user_model = get_user_model() target_user = get_object_or_404(user_model, id=user_id) role_key = (request.POST.get('role_key') or '').strip() is_active = request.POST.get('is_active') == 'on' new_password = (request.POST.get('new_password') or '').strip() if role_key not in ROLE_GROUP_NAMES: messages.error(request, _('Ungültige Rolle.')) return redirect('user_management_page') if role_key == ROLE_PLATFORM_OWNER and get_user_role_key(request.user) != ROLE_PLATFORM_OWNER: messages.error(request, _('Nur Platform Owner dürfen diese Rolle vergeben.')) return redirect('user_management_page') current_role = get_user_role_key(request.user) if target_user == request.user and current_role == ROLE_PLATFORM_OWNER and (role_key != ROLE_PLATFORM_OWNER or not is_active): messages.error(request, _('Der aktuell angemeldete Platform Owner kann sich hier nicht selbst sperren oder herabstufen.')) return redirect('user_management_page') if target_user == request.user and current_role == ROLE_SUPER_ADMIN and (role_key != ROLE_SUPER_ADMIN or not is_active): messages.error(request, _('Der aktuell angemeldete Super Admin kann sich hier nicht selbst sperren oder herabstufen.')) return redirect('user_management_page') if _would_remove_last_platform_owner(target_user, new_role_key=role_key, new_is_active=is_active): messages.error(request, _('Der letzte aktive Platform Owner kann nicht deaktiviert oder herabgestuft werden.')) return redirect('user_management_page') if _would_remove_last_super_admin(target_user, new_role_key=role_key, new_is_active=is_active): messages.error(request, _('Der letzte aktive Super Admin kann nicht deaktiviert oder herabgestuft werden.')) return redirect('user_management_page') assign_user_role(target_user, role_key) target_user.is_active = is_active if new_password: target_user.set_password(new_password) target_user.save() _audit( request, 'user_updated', target_type='user', target_id=target_user.id, target_label=_display_user_name(target_user), details={'username': target_user.username, 'role': role_key, 'is_active': is_active, 'password_changed': bool(new_password)}, ) messages.success(request, _('Benutzer wurde aktualisiert: %(username)s') % {'username': target_user.username}) return redirect('user_management_page') @_require_capability('manage_users') @require_POST def send_password_reset_from_admin(request, user_id: int): user_model = get_user_model() target_user = get_object_or_404(user_model, id=user_id) try: _send_user_access_email(request, target_user, invitation=False) except ValueError as exc: messages.error(request, str(exc)) return redirect('user_management_page') _audit( request, 'user_password_reset_sent', target_type='user', target_id=target_user.id, target_label=_display_user_name(target_user), details={'username': target_user.username, 'email': target_user.email}, ) messages.success(request, _('Passwort-Reset-Link wurde versendet: %(username)s') % {'username': target_user.username}) return redirect('user_management_page') @_require_capability('manage_users') @require_POST def delete_user_from_admin(request, user_id: int): user_model = get_user_model() target_user = get_object_or_404(user_model, id=user_id) current_role = get_user_role_key(request.user) if target_user == request.user and current_role == ROLE_PLATFORM_OWNER: messages.error(request, _('Der aktuell angemeldete Platform Owner kann sich hier nicht selbst löschen.')) return redirect('user_management_page') if target_user == request.user: messages.error(request, _('Der aktuell angemeldete Super Admin kann sich hier nicht selbst löschen.')) return redirect('user_management_page') if _would_remove_last_platform_owner(target_user, deleting=True): messages.error(request, _('Der letzte aktive Platform Owner kann nicht gelöscht werden.')) return redirect('user_management_page') if _would_remove_last_super_admin(target_user, deleting=True): messages.error(request, _('Der letzte aktive Super Admin kann nicht gelöscht werden.')) return redirect('user_management_page') target_label = _display_user_name(target_user) username = target_user.username target_user.delete() _audit( request, 'user_deleted', target_type='user', target_label=target_label, details={'username': username}, ) messages.success(request, _('Benutzer wurde gelöscht: %(username)s') % {'username': username}) return redirect('user_management_page') @_require_capability('view_docs') def handbook_page(request): return render(request, 'workflows/handbook.html') @_require_capability('view_docs') def project_wiki_page(request): return render(request, 'workflows/project_wiki.html') @_require_capability('view_docs') def developer_handbook_page(request): return render(request, 'workflows/developer_handbook.html') @_require_capability('view_docs') def release_checklist_page(request): return render(request, 'workflows/release_checklist.html') @_require_capability('view_audit_log') def audit_log_page(request): action = (request.GET.get('action') or '').strip() user_query = (request.GET.get('user') or '').strip() date_from = (request.GET.get('date_from') or '').strip() date_to = (request.GET.get('date_to') or '').strip() rows_qs = AdminAuditLog.objects.select_related('actor').all() if action: rows_qs = rows_qs.filter(action=action) if user_query: rows_qs = rows_qs.filter( Q(actor_display__icontains=user_query) | Q(actor__username__icontains=user_query) | Q(actor__email__icontains=user_query) ) if date_from: rows_qs = rows_qs.filter(created_at__date__gte=date_from) if date_to: rows_qs = rows_qs.filter(created_at__date__lte=date_to) rows = list(rows_qs[:300]) action_choices = ( AdminAuditLog.objects.order_by('action').values_list('action', flat=True).distinct() ) return render( request, 'workflows/audit_log.html', { 'rows': rows, 'action_choices': action_choices, 'selected_action': action, 'user_query': user_query, 'date_from': date_from, 'date_to': date_to, }, ) @_require_capability('manage_backups') def backup_recovery_page(request): rows = list_backup_bundles() return render( request, 'workflows/backup_recovery.html', { 'rows': rows, 'backup_health': latest_backup_health_snapshot(), }, ) @_require_capability('manage_backups') @require_POST def create_backup_from_admin(request): try: result = create_backup_bundle() _audit( request, 'backup_created', target_type='backup_bundle', target_label=result['name'], details={'path': result['path']}, ) notify_user( user=request.user, title=_('Backup erstellt: %(name)s') % {'name': result['name']}, body=_('Das Backup-Bundle wurde erfolgreich erstellt.'), level=UserNotification.LEVEL_SUCCESS, link_url='/admin-tools/backups/', event_key=UserProfile.NOTIFICATION_BACKUP_SUCCESS, ) messages.success(request, _('Backup wurde erstellt: %(name)s') % {'name': result['name']}) except Exception as exc: notify_user( user=request.user, title=_('Backup fehlgeschlagen'), body=str(exc), level=UserNotification.LEVEL_ERROR, link_url='/admin-tools/backups/', event_key=UserProfile.NOTIFICATION_BACKUP_FAILURE, ) messages.error(request, _('Backup konnte nicht erstellt werden: %(error)s') % {'error': exc}) return redirect('backup_recovery_page') @_require_capability('manage_backups') @require_POST def verify_backup_from_admin(request, backup_name: str): try: result = verify_backup_bundle(backup_name) _audit( request, 'backup_verified', target_type='backup_bundle', target_label=backup_name, details={'summary': result['summary']}, ) notify_user( user=request.user, title=_('Backup verifiziert: %(name)s') % {'name': result['name']}, body=result.get('summary') or _('Das Backup wurde erfolgreich verifiziert.'), level=UserNotification.LEVEL_SUCCESS, link_url='/admin-tools/backups/', event_key=UserProfile.NOTIFICATION_BACKUP_SUCCESS, ) messages.success(request, _('Backup wurde verifiziert: %(name)s') % {'name': result['name']}) except Exception as exc: notify_user( user=request.user, title=_('Backup-Verifikation fehlgeschlagen'), body=str(exc), level=UserNotification.LEVEL_ERROR, link_url='/admin-tools/backups/', event_key=UserProfile.NOTIFICATION_BACKUP_FAILURE, ) messages.error(request, _('Backup-Verifikation fehlgeschlagen: %(error)s') % {'error': exc}) return redirect('backup_recovery_page') @_require_capability('manage_backups') @require_POST def delete_backup_from_admin(request, backup_name: str): try: result = delete_backup_bundle(backup_name) _audit( request, 'backup_deleted', target_type='backup_bundle', target_label=backup_name, details={}, ) messages.success(request, _('Backup wurde gelöscht: %(name)s') % {'name': result['name']}) except Exception as exc: messages.error(request, _('Backup konnte nicht gelöscht werden: %(error)s') % {'error': exc}) return redirect('backup_recovery_page') @_require_capability('view_request_timeline') def request_timeline_page(request, kind: str, request_id: int): if kind == 'onboarding': obj = get_object_or_404(OnboardingRequest, id=request_id) elif kind == 'offboarding': obj = get_object_or_404(OffboardingRequest, id=request_id) else: messages.error(request, f'Unbekannter Typ: {kind}') return redirect('requests_dashboard') request_label = _request_target_label(obj, kind) custom_field_details = _request_custom_field_details(obj, kind, getattr(request, 'LANGUAGE_CODE', None)) audit_rows = list( AdminAuditLog.objects.select_related('actor') .filter(target_type__in=[kind, 'request']) .filter(Q(target_id=request_id) | Q(target_label__icontains=(obj.full_name or '').strip())) .order_by('-created_at', '-id')[:200] ) timeline_rows = [ { 'created_at': obj.created_at, 'kind': 'system', 'title': _('Anfrage erstellt'), 'summary': request_label, 'meta': _('Status: %(status)s') % {'status': obj.get_processing_status_display()}, 'details': {item['label']: item['value'] for item in custom_field_details}, } ] contract_start = getattr(obj, 'contract_start', None) if contract_start: timeline_rows.append( { 'created_at': timezone.make_aware(timezone.datetime.combine(contract_start, timezone.datetime.min.time())), 'kind': 'milestone', 'title': _('Vertragsbeginn'), 'summary': str(contract_start), 'meta': _('Geplanter Start'), } ) handover_date = getattr(obj, 'handover_date', None) if handover_date: timeline_rows.append( { 'created_at': timezone.make_aware(timezone.datetime.combine(handover_date, timezone.datetime.min.time())), 'kind': 'milestone', 'title': _('Geräteübergabe / Hardware-Abholung'), 'summary': str(handover_date), 'meta': _('Geplanter Hardware-Termin'), } ) if getattr(obj, 'generated_pdf_path', ''): timeline_rows.append( { 'created_at': obj.created_at, 'kind': 'document', 'title': _('PDF verfügbar'), 'summary': Path(obj.generated_pdf_path).name, 'meta': '', 'url': f"/media/pdfs/{Path(obj.generated_pdf_path).name}", } ) for row in audit_rows: timeline_rows.append( { 'created_at': row.created_at, 'kind': 'audit', 'title': _audit_action_label(row.action), 'summary': row.target_label or row.target_type or '-', 'meta': row.actor_display or '-', 'details': row.details, } ) if kind == 'onboarding': intro_session = OnboardingIntroductionSession.objects.filter(onboarding_request=obj).first() if intro_session: timeline_rows.append( { 'created_at': intro_session.updated_at, 'kind': 'session', 'title': _('Einweisungssitzung'), 'summary': intro_session.get_status_display(), 'meta': intro_session.completed_by_name or '-', 'url': (f"/media/pdfs/{Path(intro_session.exported_pdf_path).name}" if intro_session.exported_pdf_path else ''), } ) welcome_email = ScheduledWelcomeEmail.objects.filter(onboarding_request=obj).first() if welcome_email: timeline_rows.append( { 'created_at': welcome_email.updated_at, 'kind': 'email', 'title': _('Welcome E-Mail'), 'summary': welcome_email.get_status_display(), 'meta': welcome_email.recipient_email, } ) timeline_rows.sort(key=lambda item: item['created_at']) return render( request, 'workflows/request_timeline.html', { 'request_kind': kind, 'request_obj': obj, 'request_label': request_label, 'timeline_rows': timeline_rows, 'custom_field_details': custom_field_details, 'contract_start': getattr(obj, 'contract_start', None), 'handover_date': getattr(obj, 'handover_date', None), }, ) @login_required def requests_dashboard(request): if not user_has_capability(request.user, 'access_requests_dashboard'): messages.error(request, _('Sie haben keine Berechtigung für diese Aktion.')) return redirect('home') if request.method == 'POST': if not user_has_capability(request.user, 'delete_requests'): messages.error(request, _('Sie haben keine Berechtigung für diese Aktion.')) return redirect('requests_dashboard') selected = request.POST.getlist('selected_requests') single_delete = (request.POST.get('single_delete') or '').strip() if single_delete: selected = [single_delete] if not selected: messages.warning(request, _('Keine Einträge ausgewählt.')) return redirect('requests_dashboard') deleted_count = 0 invalid_count = 0 deleted_labels = [] for token in selected: try: kind, raw_id = token.split(':', 1) request_id = int(raw_id) except (ValueError, TypeError): invalid_count += 1 continue model = None if kind == 'onboarding': model = OnboardingRequest elif kind == 'offboarding': model = OffboardingRequest else: invalid_count += 1 continue obj = model.objects.filter(id=request_id).first() if not obj: continue deleted_labels.append(_request_target_label(obj, kind)) obj.delete() deleted_count += 1 if deleted_count: _audit( request, 'requests_deleted', target_type='request', target_label='Dashboard bulk/single delete', details={ 'deleted_count': deleted_count, 'invalid_count': invalid_count, 'selected': selected, 'request_labels': deleted_labels, }, ) messages.success(request, _('%(count)s Eintrag/Einträge gelöscht.') % {'count': deleted_count}) if invalid_count: messages.warning(request, _('%(count)s Auswahl(en) konnten nicht verarbeitet werden.') % {'count': invalid_count}) if not deleted_count and not invalid_count: messages.info(request, _('Keine passenden Einträge gefunden.')) return redirect('requests_dashboard') search_query = request.GET.get('q', '').strip() type_filter = (request.GET.get('type') or '').strip().lower() status_filter = (request.GET.get('status') or '').strip().lower() department_filter = (request.GET.get('department') or '').strip() date_from = (request.GET.get('date_from') or '').strip() date_to = (request.GET.get('date_to') or '').strip() onboarding_qs = OnboardingRequest.objects.order_by('-created_at') offboarding_qs = OffboardingRequest.objects.order_by('-created_at') all_onboarding = OnboardingRequest.objects.all() all_offboarding = OffboardingRequest.objects.all() if search_query: onboarding_qs = onboarding_qs.filter(Q(full_name__icontains=search_query) | Q(work_email__icontains=search_query)) offboarding_qs = offboarding_qs.filter(Q(full_name__icontains=search_query) | Q(work_email__icontains=search_query)) if status_filter in {'submitted', 'processing', 'completed', 'failed'}: onboarding_qs = onboarding_qs.filter(processing_status=status_filter) offboarding_qs = offboarding_qs.filter(processing_status=status_filter) if department_filter: onboarding_qs = onboarding_qs.filter(department=department_filter) offboarding_qs = offboarding_qs.filter(department=department_filter) if date_from: onboarding_qs = onboarding_qs.filter(created_at__date__gte=date_from) offboarding_qs = offboarding_qs.filter(created_at__date__gte=date_from) if date_to: onboarding_qs = onboarding_qs.filter(created_at__date__lte=date_to) offboarding_qs = offboarding_qs.filter(created_at__date__lte=date_to) if type_filter == 'onboarding': offboarding_qs = offboarding_qs.none() elif type_filter == 'offboarding': onboarding_qs = onboarding_qs.none() onboarding_items = onboarding_qs[:50] offboarding_items = offboarding_qs[:50] language_code = ( request.COOKIES.get(settings.LANGUAGE_COOKIE_NAME) or getattr(request, 'LANGUAGE_CODE', '') or get_language() or 'de' ).split('-')[0].lower() rows = [] for obj in onboarding_items: intro_session = OnboardingIntroductionSession.objects.filter(onboarding_request=obj).first() if intro_session and intro_session.exported_pdf_path: intro_session.exported_pdf_url = f"/media/pdfs/{Path(intro_session.exported_pdf_path).name}" rows.append( { 'id': obj.id, 'kind': 'Onboarding', 'kind_slug': 'onboarding', 'name': obj.full_name, 'work_email': obj.work_email, 'created_at': obj.created_at, 'pdf_url': f"/media/pdfs/{Path(obj.generated_pdf_path).name}" if obj.generated_pdf_path else None, 'intro_pdf_url': f"/media/pdfs/{Path(obj.intro_pdf_path).name}" if obj.intro_pdf_path else None, 'intro_session': intro_session, 'status': _request_status_label(obj.processing_status, language_code), 'status_key': obj.processing_status, 'last_error': obj.last_error, } ) for obj in offboarding_items: rows.append( { 'id': obj.id, 'kind': 'Offboarding', 'kind_slug': 'offboarding', 'name': obj.full_name, 'work_email': obj.work_email, 'created_at': obj.created_at, 'pdf_url': f"/media/pdfs/{Path(obj.generated_pdf_path).name}" if obj.generated_pdf_path else None, 'intro_pdf_url': None, 'intro_session': None, 'status': _request_status_label(obj.processing_status, language_code), 'status_key': obj.processing_status, 'last_error': obj.last_error, } ) rows.sort(key=lambda x: x['created_at'], reverse=True) today = timezone.localdate() start_date = today - timedelta(days=13) onboarding_daily = {} offboarding_daily = {} for i in range(14): day = start_date + timedelta(days=i) onboarding_daily[day] = 0 offboarding_daily[day] = 0 for dt in onboarding_qs.filter(created_at__date__gte=start_date).values_list('created_at', flat=True): onboarding_daily[timezone.localtime(dt).date()] += 1 for dt in offboarding_qs.filter(created_at__date__gte=start_date).values_list('created_at', flat=True): offboarding_daily[timezone.localtime(dt).date()] += 1 chart_points = [] max_total = 1 for i in range(14): day = start_date + timedelta(days=i) on_count = onboarding_daily[day] off_count = offboarding_daily[day] total = on_count + off_count max_total = max(max_total, total) chart_points.append( { 'label': day.strftime('%d.%m'), 'onboarding': on_count, 'offboarding': off_count, 'total': total, } ) for point in chart_points: point['height'] = max(8, int((point['total'] / max_total) * 84)) onboarding_total = onboarding_qs.count() offboarding_total = offboarding_qs.count() departments = sorted( { value.strip() for value in list(all_onboarding.exclude(department='').values_list('department', flat=True)) + list(all_offboarding.exclude(department='').values_list('department', flat=True)) if value and value.strip() }, key=str.lower, ) status_choices = [ {'value': 'submitted', 'label': _request_status_label('submitted', language_code)}, {'value': 'processing', 'label': _request_status_label('processing', language_code)}, {'value': 'completed', 'label': _request_status_label('completed', language_code)}, {'value': 'failed', 'label': _request_status_label('failed', language_code)}, ] has_filters = any([search_query, type_filter, status_filter, department_filter, date_from, date_to]) column_count = 4 if user_has_capability(request.user, 'delete_requests'): column_count += 1 if user_has_capability(request.user, 'run_intro_session') or user_has_capability(request.user, 'generate_intro_pdfs'): column_count += 1 if user_has_capability(request.user, 'access_requests_dashboard'): column_count += 1 return render( request, 'workflows/requests_dashboard.html', { 'rows': rows[:60], 'search_query': search_query, 'selected_type': type_filter, 'selected_status': status_filter, 'selected_department': department_filter, 'date_from': date_from, 'date_to': date_to, 'departments': departments, 'status_choices': status_choices, 'has_filters': has_filters, 'column_count': column_count, 'onboarding_total': onboarding_total, 'offboarding_total': offboarding_total, 'combined_total': onboarding_total + offboarding_total, 'chart_points': chart_points, }, ) @login_required @ensure_csrf_cookie def onboarding_create(request): config = WorkflowConfig.objects.order_by('id').first() legal_text = ( config.legal_text if config and config.legal_text else 'Eine Ausrüstungsvereinbarung erlaubt es einem Mitarbeitenden, die Ausrüstung des Unternehmens im Außendienst oder zu Hause zu nutzen und mitzunehmen.' ) if request.method == 'POST': form = OnboardingRequestForm(request.POST, request.FILES, requester_email=request.user.email) if form.is_valid(): obj = form.save() obj.onboarded_by_name = _display_user_name(request.user) obj.preferred_language = ((getattr(request, 'LANGUAGE_CODE', '') or get_language() or 'de').split('-')[0]) obj.save(update_fields=['onboarded_by_name', 'preferred_language']) process_onboarding_request.delay(obj.id) return redirect(f"/onboarding/new/?saved=1&id={obj.id}") else: form = OnboardingRequestForm(requester_email=request.user.email) onboarding_blocks = _build_onboarding_layout(form) field_pages = getattr(form, '_field_page_keys', {}) section_configs = ensure_form_section_configs('onboarding') visible_section_keys = set() for section in get_section_definitions('onboarding'): key = section['key'] if section.get('is_custom'): if section.get('is_active', True): visible_section_keys.add(key) elif key in LOCKED_SECTION_RULES.get('onboarding', set()) or section_configs.get(key, None) is None or section_configs[key].is_visible: visible_section_keys.add(key) onboarding_sections = _build_onboarding_sections(onboarding_blocks, field_pages, visible_section_keys=visible_section_keys) onboarding_conditional_rules = _normalized_conditional_rule_payload('onboarding') return render( request, 'workflows/onboarding_form.html', { 'form': form, 'onboarding_blocks': onboarding_blocks, 'onboarding_sections': onboarding_sections, 'onboarding_inline_checks': ONBOARDING_INLINE_CHECKS, 'onboarding_checkbox_lists': ONBOARDING_CHECKBOX_LISTS, 'onboarding_conditional_rules': onboarding_conditional_rules, 'legal_text': legal_text, 'saved': request.GET.get('saved') == '1', 'saved_request_id': request.GET.get('id', ''), 'portal_email_domain': get_company_email_domain(), }, ) @login_required def onboarding_success(request, request_id: int): obj = get_object_or_404(OnboardingRequest, id=request_id) pdf_url = None if obj.generated_pdf_path: pdf_url = f"/media/pdfs/{Path(obj.generated_pdf_path).name}" return render(request, 'workflows/onboarding_success.html', {'obj': obj, 'pdf_url': pdf_url}) @_require_capability('generate_intro_pdfs') @require_POST def generate_onboarding_intro_pdf(request, request_id: int): obj = get_object_or_404(OnboardingRequest, id=request_id) pdf_path = _generate_onboarding_intro_pdf(obj, language_code=get_language()) obj.intro_pdf_path = str(pdf_path) obj.save(update_fields=['intro_pdf_path']) _audit(request, 'intro_pdf_generated', target_type='onboarding', target_id=obj.id, target_label=obj.full_name) messages.success(request, _('Einweisungs- und Übergabeprotokoll wurde erzeugt.')) return redirect('requests_dashboard') @_require_capability('generate_intro_pdfs') @require_POST def generate_onboarding_intro_session_pdf(request, request_id: int): onboarding = get_object_or_404(OnboardingRequest, id=request_id) session, _created = OnboardingIntroductionSession.objects.get_or_create(onboarding_request=onboarding) pdf_path = _generate_onboarding_intro_session_pdf( session, admin_signature_name=_display_user_name(request.user), language_code=get_language(), ) session.exported_pdf_path = str(pdf_path) session.save(update_fields=['exported_pdf_path']) _audit(request, 'intro_live_pdf_generated', target_type='onboarding', target_id=onboarding.id, target_label=onboarding.full_name) messages.success(request, _('Einweisungsprotokoll aus Live-Status wurde erzeugt.')) return redirect('onboarding_intro_session_page', request_id=request_id) @_require_capability('run_intro_session') def onboarding_intro_session_page(request, request_id: int): onboarding = get_object_or_404(OnboardingRequest, id=request_id) session, _created = OnboardingIntroductionSession.objects.get_or_create(onboarding_request=onboarding) sections = build_intro_sections_for_request(onboarding, language_code=get_language()) if request.method == 'POST': checked_ids = set(request.POST.getlist('checked_items')) checklist_state = {} for section in sections: for item in section['items']: checklist_state[item['id']] = item['id'] in checked_ids action = (request.POST.get('session_action') or 'save').strip() session.checklist_state = checklist_state session.notes = (request.POST.get('notes') or '').strip() if action == 'reset': session.checklist_state = {} session.notes = '' session.status = 'draft' session.completed_at = None session.completed_by_name = '' session.exported_pdf_path = '' session.save(update_fields=['checklist_state', 'notes', 'status', 'completed_at', 'completed_by_name', 'exported_pdf_path']) _audit(request, 'intro_session_reset', target_type='onboarding', target_id=onboarding.id, target_label=onboarding.full_name) messages.success(request, _('Einweisung wurde zurückgesetzt.')) return redirect('onboarding_intro_session_page', request_id=request_id) if action == 'complete': session.status = 'completed' session.completed_at = timezone.now() session.completed_by_name = _display_user_name(request.user) _audit( request, 'intro_session_completed', target_type='onboarding', target_id=onboarding.id, target_label=onboarding.full_name, details={'checked_count': len([value for value in checklist_state.values() if value])}, ) messages.success(request, _('Einweisung wurde als abgeschlossen gespeichert.')) else: session.status = 'draft' session.completed_at = None session.completed_by_name = '' _audit( request, 'intro_session_saved', target_type='onboarding', target_id=onboarding.id, target_label=onboarding.full_name, details={'checked_count': len([value for value in checklist_state.values() if value])}, ) messages.success(request, _('Einweisung wurde als Entwurf gespeichert.')) session.save() return redirect('onboarding_intro_session_page', request_id=request_id) checked_map = session.checklist_state or {} checked_count = 0 total_count = 0 for section in sections: for item in section['items']: item['checked'] = bool(checked_map.get(item['id'])) total_count += 1 if item['checked']: checked_count += 1 salutation = (onboarding.get_gender_display() or '').strip() display_name = f"{salutation} {onboarding.full_name}".strip() if salutation else onboarding.full_name progress_percent = int((checked_count / total_count) * 100) if total_count else 0 return render( request, 'workflows/onboarding_intro_session.html', { 'onboarding': onboarding, 'session': session, 'display_name': display_name, 'sections': sections, 'checked_count': checked_count, 'total_count': total_count, 'progress_percent': progress_percent, 'session_pdf_url': f"/media/pdfs/{Path(session.exported_pdf_path).name}" if session.exported_pdf_path else None, }, ) @login_required @ensure_csrf_cookie def offboarding_create(request): profile_id = request.GET.get('profile') search_query = request.GET.get('q', '').strip() selected_profile = None if profile_id: selected_profile = EmployeeProfile.objects.filter(id=profile_id).first() search_results = [] if search_query: search_results = list( EmployeeProfile.objects.filter(full_name__icontains=search_query)[:10] ) + list( EmployeeProfile.objects.filter(work_email__icontains=search_query)[:10] ) # preserve order while removing duplicates seen = set() unique = [] for r in search_results: if r.id not in seen: unique.append(r) seen.add(r.id) search_results = unique[:10] if request.method == 'POST': form = OffboardingRequestForm(request.POST, prefill_profile=selected_profile) if form.is_valid(): obj = form.save(commit=False) if selected_profile: obj.employee_profile = selected_profile requester_email = (request.user.email or '').strip().lower() company_suffix = f"@{get_company_email_domain()}" if requester_email and requester_email.endswith(company_suffix): obj.requested_by_email = requester_email else: obj.requested_by_email = settings.DEFAULT_FROM_EMAIL obj.requested_by_name = _display_user_name(request.user) obj.preferred_language = ((getattr(request, 'LANGUAGE_CODE', '') or get_language() or 'de').split('-')[0]) obj.save() process_offboarding_request.delay(obj.id) return redirect(f"/offboarding/new/?saved=1&id={obj.id}") else: form = OffboardingRequestForm(prefill_profile=selected_profile, initial={'search_query': search_query}) field_pages = getattr(form, '_field_page_keys', {}) section_configs = ensure_form_section_configs('offboarding') visible_section_keys = { key for key in OFFBOARDING_PAGE_ORDER if key in LOCKED_SECTION_RULES.get('offboarding', set()) or section_configs.get(key, None) is None or section_configs[key].is_visible } offboarding_sections = _build_offboarding_sections(form, visible_section_keys=visible_section_keys) return render( request, 'workflows/offboarding_form.html', { 'form': form, 'search_results': search_results, 'selected_profile': selected_profile, 'search_query': search_query, 'saved': request.GET.get('saved') == '1', 'saved_request_id': request.GET.get('id', ''), 'portal_email_domain': get_company_email_domain(), 'offboarding_sections': offboarding_sections, }, ) @login_required def offboarding_success(request, request_id: int): obj = get_object_or_404(OffboardingRequest, id=request_id) pdf_url = None if obj.generated_pdf_path: pdf_url = f"/media/pdfs/{Path(obj.generated_pdf_path).name}" return render(request, 'workflows/offboarding_success.html', {'obj': obj, 'pdf_url': pdf_url}) @_require_capability('manage_builders') def form_builder_page(request): language_code = get_language() form_type = request.GET.get('form_type', 'onboarding') can_override_locked_builder_rules = get_user_role_key(request.user) == ROLE_PLATFORM_OWNER anchor = (request.GET.get('anchor') or '').strip() active_panel = (request.GET.get('panel') or '').strip() active_subpanel = (request.GET.get('subpanel') or '').strip() active_rules_panel = (request.GET.get('rules_panel') or '').strip() active_module = (request.GET.get('module') or '').strip() active_structure_section = (request.GET.get('structure_section') or '').strip() active_field_rules_section = ((request.POST.get('field_rules_section') if request.method == 'POST' else '') or request.GET.get('field_rules_section') or '').strip() active_field_texts_section = ((request.POST.get('field_texts_section') if request.method == 'POST' else '') or request.GET.get('field_texts_section') or '').strip() active_custom_fields_section = ((request.POST.get('custom_fields_section') if request.method == 'POST' else '') or request.GET.get('custom_fields_section') or '').strip() active_section_rules_section = ((request.POST.get('section_rules_section') if request.method == 'POST' else '') or request.GET.get('section_rules_section') or '').strip() active_conditional_target = ((request.POST.get('conditional_target') if request.method == 'POST' else '') or request.GET.get('conditional_target') or '').strip() if form_type not in DEFAULT_FIELD_ORDER: form_type = 'onboarding' option_category = request.GET.get('option_category', 'department') option_categories = [c[0] for c in FormOption.CATEGORY_CHOICES] if option_category not in option_categories: option_category = option_categories[0] valid_modules = { 'structure', 'section-rules', 'field-rules', 'conditional-rules', 'options', 'field-texts', 'custom-sections', 'custom-fields', 'preview', } if not active_module: if active_panel == 'builder-structure': active_module = 'structure' elif active_panel == 'builder-rules': active_module = active_rules_panel or 'section-rules' elif active_panel == 'builder-content': active_module = active_subpanel or 'options' else: active_module = 'structure' if active_module not in valid_modules: active_module = 'structure' if form_type != 'onboarding' and active_module == 'custom-sections': active_module = 'options' if form_type != 'onboarding' and active_module == 'conditional-rules': active_module = 'field-rules' if request.method == 'POST': delete_option_id = request.POST.get('delete_option_id', '').strip() delete_custom_field_id = request.POST.get('delete_custom_field_id', '').strip() delete_custom_section_id = request.POST.get('delete_custom_section_id', '').strip() if delete_option_id: option = FormOption.objects.filter(id=delete_option_id).first() if not option: messages.error(request, _('Option nicht gefunden.')) else: option_category = option.category deleted_label = option.label deleted_id = option.id option.delete() _audit(request, 'form_option_deleted', target_type='form_option', target_id=deleted_id, target_label=deleted_label) messages.success(request, _('Option wurde gelöscht.')) return redirect(f"/admin-tools/form-builder/?form_type={form_type}&option_category={option_category}&module=options") if delete_custom_field_id: custom_field = FormCustomFieldConfig.objects.filter(id=delete_custom_field_id, form_type=form_type).first() if not custom_field: messages.error(request, _('Benutzerdefiniertes Feld nicht gefunden.')) else: deleted_label = custom_field.label deleted_id = custom_field.id custom_field.delete() _audit(request, 'form_custom_field_deleted', target_type='form_custom_field', target_id=deleted_id, target_label=deleted_label) messages.success(request, _('Benutzerdefiniertes Feld wurde gelöscht.')) return redirect(f"/admin-tools/form-builder/?form_type={form_type}&option_category={option_category}&module=custom-fields") if delete_custom_section_id: custom_section = FormCustomSectionConfig.objects.filter(id=delete_custom_section_id, form_type=form_type).first() if not custom_section: messages.error(request, _('Benutzerdefinierter Abschnitt nicht gefunden.')) else: deleted_label = custom_section.title deleted_id = custom_section.id section_key = custom_section.section_key custom_fields = list(FormCustomFieldConfig.objects.filter(form_type=form_type, section_key=section_key)) deleted_field_count = len(custom_fields) if custom_fields: field_keys = [item.field_key for item in custom_fields] FormConditionalRuleConfig.objects.filter( form_type=form_type, target_key__in=[f'custom__{field_key}' for field_key in field_keys], ).delete() FormCustomFieldConfig.objects.filter(id__in=[item.id for item in custom_fields]).delete() custom_section.delete() _audit( request, 'form_custom_section_deleted', target_type='form_custom_section', target_id=deleted_id, target_label=deleted_label, details={'section_key': section_key, 'deleted_field_count': deleted_field_count}, ) messages.success(request, _('Benutzerdefinierter Abschnitt wurde gelöscht.')) return redirect(f"/admin-tools/form-builder/?form_type={form_type}&option_category={option_category}&module=custom-sections") action = request.POST.get('builder_action', '') if action == 'add_option': category = request.POST.get('category', '').strip() label = request.POST.get('label', '').strip() label_en = request.POST.get('label_en', '').strip() value = request.POST.get('value', '').strip() if category not in option_categories: messages.error(request, _('Ungültige Kategorie.')) elif not label: messages.error(request, _('Bitte einen Namen für die Option angeben.')) else: next_sort = ( FormOption.objects.filter(category=category).order_by('-sort_order').values_list('sort_order', flat=True).first() ) FormOption.objects.create( # Global form option catalog entry category=category, label=label, label_en=label_en, value=value or label, sort_order=(next_sort + 1) if next_sort is not None else 0, is_active=True, ) _audit( request, 'form_option_added', target_type='form_option', target_label=label, details={'category': category, 'label_en': label_en, 'value': value or label}, ) messages.success(request, _('Option wurde hinzugefügt.')) option_category = category elif action == 'save_options': option_ids = request.POST.getlist('option_ids') for pos, raw_id in enumerate(option_ids): option = FormOption.objects.filter(id=raw_id).first() if not option: continue next_label = request.POST.get(f'label_{option.id}', '').strip() or option.label option.label = next_label option.label_en = request.POST.get(f'label_en_{option.id}', '').strip() option.value = request.POST.get(f'value_{option.id}', '').strip() or next_label option.is_active = request.POST.get(f'active_{option.id}') == 'on' option.sort_order = pos try: option.save(update_fields=['label', 'label_en', 'value', 'is_active', 'sort_order']) except IntegrityError: messages.error(request, _('Doppelte Bezeichnung in Kategorie: %(label)s') % {'label': next_label}) return redirect(f"/admin-tools/form-builder/?form_type={form_type}&option_category={option.category}&module=options") option_category = option.category _audit(request, 'form_options_saved', target_type='form_option', target_label=option_category, details={'count': len(option_ids)}) messages.success(request, _('Optionen wurden gespeichert.')) elif action == 'save_field_texts': field_ids = request.POST.getlist('field_ids') for raw_id in field_ids: cfg = FormFieldConfig.objects.filter(id=raw_id, form_type=form_type).first() if not cfg: continue cfg.label_override = (request.POST.get(f'label_override_{cfg.id}') or '').strip() cfg.label_override_en = (request.POST.get(f'label_override_en_{cfg.id}') or '').strip() cfg.help_text_override = (request.POST.get(f'help_text_override_{cfg.id}') or '').strip() cfg.help_text_override_en = (request.POST.get(f'help_text_override_en_{cfg.id}') or '').strip() cfg.save(update_fields=['label_override', 'label_override_en', 'help_text_override', 'help_text_override_en']) _audit(request, 'form_field_texts_saved', target_type='form_config', target_label=form_type, details={'count': len(field_ids)}) messages.success(request, _('Feldtexte wurden gespeichert.')) elif action == 'add_custom_section' and form_type == 'onboarding': title = (request.POST.get('custom_section_title') or '').strip() title_en = (request.POST.get('custom_section_title_en') or '').strip() sort_order_raw = (request.POST.get('custom_section_sort_order') or '').strip() if not title: messages.error(request, _('Bitte einen Titel für den benutzerdefinierten Abschnitt angeben.')) else: section_key_base = build_custom_field_key(title) section_key = section_key_base suffix = 2 while FormCustomSectionConfig.objects.filter(form_type=form_type, section_key=section_key).exists(): section_key = f'{section_key_base}_{suffix}' suffix += 1 try: sort_order = int(sort_order_raw or 0) except ValueError: sort_order = 0 FormCustomSectionConfig.objects.create( form_type=form_type, section_key=section_key, sort_order=max(0, sort_order), title=title, title_en=title_en, is_active=True, ) _audit(request, 'form_custom_section_added', target_type='form_custom_section', target_label=title, details={'form_type': form_type, 'section_key': section_key}) messages.success(request, _('Benutzerdefinierter Abschnitt wurde hinzugefügt.')) elif action == 'save_custom_sections' and form_type == 'onboarding': section_ids = request.POST.getlist('custom_section_ids') updated = 0 for raw_id in section_ids: cfg = FormCustomSectionConfig.objects.filter(id=raw_id, form_type=form_type).first() if not cfg: continue try: sort_order = int((request.POST.get(f'custom_section_sort_order_{cfg.id}') or '').strip() or cfg.sort_order) except ValueError: sort_order = cfg.sort_order cfg.title = (request.POST.get(f'custom_section_title_{cfg.id}') or '').strip() or cfg.title cfg.title_en = (request.POST.get(f'custom_section_title_en_{cfg.id}') or '').strip() cfg.is_active = request.POST.get(f'custom_section_is_active_{cfg.id}') == 'on' cfg.sort_order = max(0, sort_order) cfg.save(update_fields=['title', 'title_en', 'is_active', 'sort_order']) updated += 1 _audit(request, 'form_custom_sections_saved', target_type='form_custom_section', target_label=form_type, details={'count': updated}) messages.success(request, _('Benutzerdefinierte Abschnitte wurden gespeichert.')) elif action == 'add_custom_field': label = (request.POST.get('custom_label') or '').strip() label_en = (request.POST.get('custom_label_en') or '').strip() section_key = (request.POST.get('custom_section_key') or '').strip() field_type = (request.POST.get('custom_field_type') or '').strip() sort_order_raw = (request.POST.get('custom_sort_order') or '').strip() help_text = (request.POST.get('custom_help_text') or '').strip() help_text_en = (request.POST.get('custom_help_text_en') or '').strip() select_options = (request.POST.get('custom_select_options') or '').strip() select_options_en = (request.POST.get('custom_select_options_en') or '').strip() section_choices = {key for key in get_section_order(form_type)} field_type_choices = {key for key, _ in FormCustomFieldConfig.FIELD_TYPE_CHOICES} if not label: messages.error(request, _('Bitte eine Bezeichnung für das benutzerdefinierte Feld angeben.')) elif section_key not in section_choices: messages.error(request, _('Ungültiger Abschnitt für das benutzerdefinierte Feld.')) elif field_type not in field_type_choices: messages.error(request, _('Ungültiger Feldtyp.')) elif field_type == FormCustomFieldConfig.FIELD_TYPE_SELECT and not select_options: messages.error(request, _('Auswahlfelder benötigen mindestens eine Option.')) else: field_key_base = build_custom_field_key(label) field_key = field_key_base suffix = 2 while FormCustomFieldConfig.objects.filter(form_type=form_type, field_key=field_key).exists(): field_key = f'{field_key_base}_{suffix}' suffix += 1 try: sort_order = int(sort_order_raw or 0) except ValueError: sort_order = 0 FormCustomFieldConfig.objects.create( form_type=form_type, field_key=field_key, section_key=section_key, sort_order=max(0, sort_order), field_type=field_type, is_active=True, is_required=request.POST.get('custom_is_required') == 'on', label=label, label_en=label_en, help_text=help_text, help_text_en=help_text_en, select_options=select_options, select_options_en=select_options_en, ) _audit(request, 'form_custom_field_added', target_type='form_custom_field', target_label=label, details={'form_type': form_type, 'field_type': field_type, 'section_key': section_key}) messages.success(request, _('Benutzerdefiniertes Feld wurde hinzugefügt.')) elif action == 'save_custom_fields': custom_ids = request.POST.getlist('custom_field_ids') updated = 0 section_choices = {key for key in get_section_order(form_type)} field_type_choices = {key for key, _ in FormCustomFieldConfig.FIELD_TYPE_CHOICES} for raw_id in custom_ids: cfg = FormCustomFieldConfig.objects.filter(id=raw_id, form_type=form_type).first() if not cfg: continue field_type = (request.POST.get(f'custom_field_type_{cfg.id}') or '').strip() section_key = (request.POST.get(f'custom_section_key_{cfg.id}') or '').strip() try: sort_order = int((request.POST.get(f'custom_sort_order_{cfg.id}') or '').strip() or cfg.sort_order) except ValueError: sort_order = cfg.sort_order cfg.label = (request.POST.get(f'custom_label_{cfg.id}') or '').strip() or cfg.label cfg.label_en = (request.POST.get(f'custom_label_en_{cfg.id}') or '').strip() cfg.help_text = (request.POST.get(f'custom_help_text_{cfg.id}') or '').strip() cfg.help_text_en = (request.POST.get(f'custom_help_text_en_{cfg.id}') or '').strip() cfg.is_required = request.POST.get(f'custom_is_required_{cfg.id}') == 'on' cfg.is_active = request.POST.get(f'custom_is_active_{cfg.id}') == 'on' if field_type in field_type_choices: cfg.field_type = field_type if section_key in section_choices: cfg.section_key = section_key cfg.sort_order = max(0, sort_order) cfg.select_options = (request.POST.get(f'custom_select_options_{cfg.id}') or '').strip() cfg.select_options_en = (request.POST.get(f'custom_select_options_en_{cfg.id}') or '').strip() if cfg.field_type == FormCustomFieldConfig.FIELD_TYPE_SELECT and not cfg.select_options: messages.error(request, _('Auswahlfeld "%(label)s" benötigt mindestens eine Option.') % {'label': cfg.label}) return redirect(f"/admin-tools/form-builder/?form_type={form_type}&option_category={option_category}&module=custom-fields") cfg.save() updated += 1 _audit(request, 'form_custom_fields_saved', target_type='form_custom_field', target_label=form_type, details={'count': updated}) messages.success(request, _('Benutzerdefinierte Felder wurden gespeichert.')) elif action == 'save_field_rules': field_ids = request.POST.getlist('field_rule_ids') locked_fields = LOCKED_FIELD_RULES.get(form_type, set()) updated = 0 for raw_id in field_ids: cfg = FormFieldConfig.objects.filter(id=raw_id, form_type=form_type).first() if not cfg: continue if cfg.field_name in locked_fields and not can_override_locked_builder_rules: cfg.is_visible = True cfg.is_required = None else: cfg.is_visible = request.POST.get(f'is_visible_{cfg.id}') == 'on' required_mode = (request.POST.get(f'is_required_{cfg.id}') or '').strip() cfg.is_required = True if required_mode == 'required' else False if required_mode == 'optional' else None cfg.save(update_fields=['is_visible', 'is_required']) updated += 1 _audit(request, 'form_field_rules_saved', target_type='form_config', target_label=form_type, details={'count': updated}) messages.success(request, _('Feldregeln wurden gespeichert.')) elif action == 'save_section_rules' and form_type in {'onboarding', 'offboarding'}: section_configs = ensure_form_section_configs(form_type) locked_sections = LOCKED_SECTION_RULES.get(form_type, set()) posted_order = request.POST.getlist('section_order') next_sort_order = 0 updated = 0 for section_key in posted_order: cfg = section_configs.get(section_key) if cfg is not None: if cfg.sort_order != next_sort_order: cfg.sort_order = next_sort_order cfg.save(update_fields=['sort_order']) updated += 1 next_sort_order += 1 continue if form_type == 'onboarding': custom_cfg = FormCustomSectionConfig.objects.filter(form_type=form_type, section_key=section_key).first() if custom_cfg and custom_cfg.sort_order != next_sort_order: custom_cfg.sort_order = next_sort_order custom_cfg.save(update_fields=['sort_order']) updated += 1 next_sort_order += 1 for section_key, cfg in section_configs.items(): if section_key in locked_sections and not can_override_locked_builder_rules: if not cfg.is_visible: cfg.is_visible = True cfg.save(update_fields=['is_visible']) continue cfg.is_visible = request.POST.get(f'section_visible_{section_key}') == 'on' cfg.save(update_fields=['is_visible']) updated += 1 if form_type == 'onboarding': for cfg in FormCustomSectionConfig.objects.filter(form_type=form_type): cfg.is_active = request.POST.get(f'section_visible_{cfg.section_key}') == 'on' cfg.save(update_fields=['is_active']) updated += 1 _audit(request, 'form_section_rules_saved', target_type='form_config', target_label=form_type, details={'count': updated}) messages.success(request, _('Abschnittsregeln wurden gespeichert.')) elif action == 'save_conditional_rules' and form_type == 'onboarding': rule_configs = ensure_form_conditional_rule_configs(form_type) updated = 0 for target_key, cfg in rule_configs.items(): cfg.is_active = request.POST.get(f'conditional_active_{target_key}') == 'on' clauses = [] clause_total = 2 for index in range(clause_total): field_name = (request.POST.get(f'conditional_field_{target_key}_{index}') or '').strip() operator = (request.POST.get(f'conditional_operator_{target_key}_{index}') or '').strip() value = (request.POST.get(f'conditional_value_{target_key}_{index}') or '').strip() if not field_name or not operator: continue parsed_value = True if operator == 'checked' else value clauses.append({'field': field_name, 'operator': operator, 'value': parsed_value}) cfg.clauses = clauses cfg.save(update_fields=['is_active', 'clauses']) updated += 1 _audit(request, 'form_conditional_rules_saved', target_type='form_config', target_label=form_type, details={'count': updated}) messages.success(request, _('Bedingte Logik wurde gespeichert.')) elif action == 'apply_preset': preset_key = (request.POST.get('preset_key') or '').strip() if apply_form_preset(form_type, preset_key): active_module = 'preview' _audit(request, 'form_preset_applied', target_type='form_config', target_label=form_type, details={'preset': preset_key}) messages.success(request, _('Preset wurde angewendet.')) else: messages.error(request, _('Preset konnte nicht angewendet werden.')) if action in {'add_option', 'save_options'}: active_module = 'options' elif action == 'save_field_texts': active_module = 'field-texts' elif action in {'add_custom_field', 'save_custom_fields'}: active_module = 'custom-fields' elif action in {'add_custom_section', 'save_custom_sections'}: active_module = 'custom-sections' elif action in {'save_field_rules', 'save_section_rules', 'save_conditional_rules'}: active_module = 'section-rules' if action == 'save_section_rules': active_module = 'section-rules' elif action == 'save_field_rules': active_module = 'field-rules' elif action == 'save_conditional_rules': active_module = 'conditional-rules' redirect_target = f"/admin-tools/form-builder/?form_type={form_type}&option_category={option_category}" if active_module: redirect_target += f"&module={active_module}" if active_structure_section: redirect_target += f"&structure_section={active_structure_section}" if active_section_rules_section and active_module == 'section-rules': redirect_target += f"§ion_rules_section={active_section_rules_section}" if active_field_rules_section and active_module == 'field-rules': redirect_target += f"&field_rules_section={active_field_rules_section}" if active_conditional_target and active_module == 'conditional-rules': redirect_target += f"&conditional_target={active_conditional_target}" if active_field_texts_section and active_module == 'field-texts': redirect_target += f"&field_texts_section={active_field_texts_section}" if active_custom_fields_section and active_module == 'custom-fields': redirect_target += f"&custom_fields_section={active_custom_fields_section}" return redirect(redirect_target) default_names = list(DEFAULT_FIELD_ORDER.get(form_type, [])) existing_names = list( OnboardingRequestForm.base_fields.keys() if form_type == 'onboarding' else OffboardingRequestForm.base_fields.keys() ) for name in existing_names: if name not in default_names: default_names.append(name) ensure_form_field_configs(form_type, default_names) section_configs = ensure_form_section_configs(form_type) conditional_rule_configs = ensure_form_conditional_rule_configs(form_type) if form_type == 'onboarding' else {} section_definitions = get_section_definitions(form_type, include_inactive_custom=True) section_order = [item['key'] for item in section_definitions] section_labels = {item['key']: item['title'] for item in section_definitions} default_page_map = get_default_page_map(form_type) configs = list( FormFieldConfig.objects.filter(form_type=form_type).order_by('sort_order', 'field_name') ) labels = _form_field_labels(form_type) locked = LOCKED_FIELD_RULES.get(form_type, set()) locked_sections = LOCKED_SECTION_RULES.get(form_type, set()) custom_field_configs = list(FormCustomFieldConfig.objects.filter(form_type=form_type).order_by('section_key', 'sort_order', 'field_key')) custom_section_configs = get_custom_section_configs(form_type, include_inactive=True) if form_type == 'onboarding': columns = [ { 'key': key, 'title': section_labels.get(key, key), 'items': [], } for key in section_order ] column_by_key = {c['key']: c for c in columns} fallback = 'abschluss' for cfg in configs: page_key = cfg.page_key or ONBOARDING_DEFAULT_PAGE.get(cfg.field_name, fallback) if page_key not in column_by_key: page_key = fallback column_by_key[page_key]['items'].append( { 'field_name': cfg.field_name, 'label': cfg.translated_label_override(language_code) or labels.get(cfg.field_name, cfg.field_name), 'label_de': cfg.label_override or labels.get(cfg.field_name, cfg.field_name), 'label_en': cfg.label_override_en, 'is_visible': cfg.is_visible, 'is_required': cfg.is_required, 'locked': cfg.field_name in locked, 'page_key': page_key, 'is_custom': False, 'sort_order': cfg.sort_order, } ) for cfg in custom_field_configs: page_key = cfg.section_key or fallback if page_key not in column_by_key: page_key = fallback column_by_key[page_key]['items'].append( { 'field_name': f'custom__{cfg.field_key}', 'label': cfg.translated_label(language_code), 'label_de': cfg.label, 'label_en': cfg.label_en, 'is_visible': cfg.is_active, 'is_required': cfg.is_required, 'locked': False, 'page_key': page_key, 'is_custom': True, 'sort_order': cfg.sort_order, } ) for column in columns: column['items'].sort(key=lambda item: (item.get('sort_order', 9999), item['field_name'])) else: columns = [ { 'key': key, 'title': section_labels.get(key, key), 'items': [], } for key in section_order ] column_by_key = {c['key']: c for c in columns} fallback = section_order[-1] if section_order else 'all' for cfg in configs: page_key = cfg.page_key or default_page_map.get(cfg.field_name, fallback) if page_key not in column_by_key: page_key = fallback column_by_key[page_key]['items'].append( { 'field_name': cfg.field_name, 'label': cfg.translated_label_override(language_code) or labels.get(cfg.field_name, cfg.field_name), 'label_de': cfg.label_override or labels.get(cfg.field_name, cfg.field_name), 'label_en': cfg.label_override_en, 'is_visible': cfg.is_visible, 'is_required': cfg.is_required, 'locked': cfg.field_name in locked, 'page_key': page_key, 'is_custom': False, 'sort_order': cfg.sort_order, } ) for cfg in custom_field_configs: page_key = cfg.section_key or fallback if page_key not in column_by_key: page_key = fallback column_by_key[page_key]['items'].append( { 'field_name': f'custom__{cfg.field_key}', 'label': cfg.translated_label(language_code), 'label_de': cfg.label, 'label_en': cfg.label_en, 'is_visible': cfg.is_active, 'is_required': cfg.is_required, 'locked': False, 'page_key': page_key, 'is_custom': True, 'sort_order': cfg.sort_order, } ) for column in columns: column['items'].sort(key=lambda item: (item.get('sort_order', 9999), item['field_name'])) section_rule_items = [] if section_order: if active_structure_section not in section_order: active_structure_section = section_order[0] fallback_section = section_order[-1] if section_order else '' custom_section_map = {cfg.section_key: cfg for cfg in custom_section_configs} for key in section_order: cfg = section_configs.get(key) custom_cfg = custom_section_map.get(key) is_custom = custom_cfg is not None raw_title = str(section_labels.get(key, key)) display_title = re.sub(r'^\d+\.\s*', '', raw_title) if not is_custom else raw_title section_rule_items.append( { 'key': key, 'title': raw_title, 'display_title': display_title, 'is_visible': bool(custom_cfg.is_active) if is_custom else (True if not cfg else cfg.is_visible), 'locked': False if is_custom else (key in locked_sections and not can_override_locked_builder_rules), 'is_custom': is_custom, 'sort_order': custom_cfg.sort_order if is_custom else (cfg.sort_order if cfg else 0), 'field_count': len([c for c in configs if (c.page_key or default_page_map.get(c.field_name, fallback_section)) == key]) + len([c for c in custom_field_configs if c.section_key == key]), } ) section_rule_keys = [item['key'] for item in section_rule_items] if section_rule_keys and active_section_rules_section not in section_rule_keys: active_section_rules_section = section_rule_keys[0] field_rule_items = [] for cfg in configs: page_key = cfg.page_key or default_page_map.get(cfg.field_name, section_order[-1] if section_order else '') field_rule_items.append( { 'id': cfg.id, 'field_name': cfg.field_name, 'label': cfg.translated_label_override(language_code) or labels.get(cfg.field_name, cfg.field_name), 'page_key': page_key, 'page_label': section_labels.get(page_key, page_key) if section_order else '', 'is_visible': cfg.is_visible, 'is_required': cfg.is_required, 'locked': cfg.field_name in locked and not can_override_locked_builder_rules, 'summary': _field_rule_summary( is_visible=cfg.is_visible, is_required=cfg.is_required, locked=cfg.field_name in locked and not can_override_locked_builder_rules, ), } ) custom_field_groups = [] if section_order: grouped_custom = {key: [] for key in section_order} for cfg in custom_field_configs: grouped_custom.setdefault(cfg.section_key, []).append(cfg) for key in section_order: custom_field_groups.append( { 'key': key, 'title': section_labels.get(key, key), 'items': grouped_custom.get(key, []), } ) custom_section_field_counts: dict[str, int] = {} for cfg in custom_field_configs: custom_section_field_counts[cfg.section_key] = custom_section_field_counts.get(cfg.section_key, 0) + 1 for cfg in custom_section_configs: cfg.custom_field_count = custom_section_field_counts.get(cfg.section_key, 0) field_rule_groups = [] if section_order: grouped_rules = {key: [] for key in section_order} for item in field_rule_items: grouped_rules.setdefault(item['page_key'], []).append(item) for key in section_order: field_rule_groups.append( { 'key': key, 'title': section_labels.get(key, key), 'items': grouped_rules.get(key, []), } ) field_rule_group_keys = [group['key'] for group in field_rule_groups] if field_rule_group_keys and active_field_rules_section not in field_rule_group_keys: active_field_rules_section = field_rule_group_keys[0] field_text_groups = [] if section_order: grouped_texts = {key: [] for key in section_order} for cfg in configs: page_key = cfg.page_key or default_page_map.get(cfg.field_name, section_order[-1] if section_order else '') grouped_texts.setdefault(page_key, []).append(cfg) for key in section_order: field_text_groups.append( { 'key': key, 'title': section_labels.get(key, key), 'items': grouped_texts.get(key, []), } ) field_text_group_keys = [group['key'] for group in field_text_groups] if field_text_group_keys and active_field_texts_section not in field_text_group_keys: active_field_texts_section = field_text_group_keys[0] custom_field_group_keys = [group['key'] for group in custom_field_groups] if custom_field_group_keys and active_custom_fields_section not in custom_field_group_keys: active_custom_fields_section = custom_field_group_keys[0] conditional_rule_items = [] if form_type == 'onboarding': conditional_field_choices = [] for field_name in [ 'order_business_cards', 'employment_type', 'group_mailboxes_required_choice', 'additional_hardware_needed_choice', 'additional_software_needed_choice', 'additional_access_needed_choice', 'successor_required_choice', 'inherit_phone_number_choice', ]: conditional_field_choices.append((field_name, labels.get(field_name, field_name))) for cfg in custom_field_configs: conditional_field_choices.append((f'custom__{cfg.field_key}', cfg.translated_label(language_code))) conditional_field_label_map = {value: label for value, label in conditional_field_choices} conditional_target_titles = { 'business-card-box': _('Visitenkarten-Details'), 'employment-end-box': _('Vertragsende'), 'group-mailboxes-box': _('Gruppenpostfächer'), 'extra-hardware-box': _('Zusätzliche Hardware'), 'extra-software-box': _('Zusätzliche Software'), 'extra-access-box': _('Zusätzliche Zugänge'), 'successor-box': _('Nachfolge'), } conditional_target_descriptions = { 'business-card-box': _('Steuert die Detailfelder für Visitenkarten.'), 'employment-end-box': _('Steuert das Enddatum bei befristeter Beschäftigung.'), 'group-mailboxes-box': _('Steuert das Freitextfeld für Gruppenpostfächer.'), 'extra-hardware-box': _('Steuert zusätzliche Hardware-Felder.'), 'extra-software-box': _('Steuert zusätzliche Software-Felder.'), 'extra-access-box': _('Steuert zusätzliche Zugangsangaben.'), 'successor-box': _('Steuert Nachfolge- und Übernahmefelder.'), } for target_key, cfg in conditional_rule_configs.items(): clauses = list(cfg.clauses or []) while len(clauses) < 2: clauses.append({'field': '', 'operator': 'equals', 'value': ''}) if target_key.startswith('custom__'): custom_field_key = target_key.replace('custom__', '', 1) custom_field = next((item for item in custom_field_configs if item.field_key == custom_field_key), None) target_title = custom_field.translated_label(language_code) if custom_field else target_key target_description = _('Steuert die Sichtbarkeit dieses benutzerdefinierten Feldes.') target_fields = [target_title] else: target_title = conditional_target_titles.get(target_key, target_key) target_description = conditional_target_descriptions.get(target_key, '') target_fields = [labels.get(name, name) for name in ONBOARDING_GROUPS.get(target_key, [])] conditional_rule_items.append( { 'target_key': target_key, 'title': target_title, 'description': target_description, 'is_active': cfg.is_active, 'clauses': clauses[:2], 'summary': _conditional_rule_summary(clauses[:2], conditional_field_label_map), 'field_choices': conditional_field_choices, 'operator_choices': CONDITIONAL_RULE_OPERATOR_CHOICES, 'target_fields': target_fields, } ) conditional_rule_keys = [item['target_key'] for item in conditional_rule_items] if conditional_rule_keys and active_conditional_target not in conditional_rule_keys: active_conditional_target = conditional_rule_keys[0] preview_sections = [] if section_order: field_rule_group_map = {group['key']: group['items'] for group in field_rule_groups} for key in section_order: section_cfg = section_configs.get(key) section_locked = key in locked_sections custom_section = next((cfg for cfg in custom_section_configs if cfg.section_key == key), None) section_visible = bool(custom_section.is_active) if custom_section else (True if section_locked or not section_cfg else section_cfg.is_visible) visible_items = [ item for item in field_rule_group_map.get(key, []) if item['locked'] or item['is_visible'] ] visible_items.extend( [ { 'label': cfg.translated_label(language_code), 'locked': False, } for cfg in custom_field_configs if cfg.section_key == key and cfg.is_active ] ) if section_visible: preview_sections.append( { 'key': key, 'title': section_labels.get(key, key), 'items': visible_items, } ) locked_field_count = len([item for item in field_rule_items if item['locked']]) hidden_field_count = len([item for item in field_rule_items if not item['is_visible']]) configurable_field_count = len(field_rule_items) - locked_field_count hidden_section_count = len([item for item in section_rule_items if not item['is_visible']]) if section_rule_items else 0 builder_summary = { 'locked_field_count': locked_field_count, 'configurable_field_count': configurable_field_count, 'hidden_field_count': hidden_field_count, 'hidden_section_count': hidden_section_count, 'custom_field_count': len([cfg for cfg in custom_field_configs if cfg.is_active]), 'custom_section_count': len([cfg for cfg in custom_section_configs if cfg.is_active]), } module_labels = { 'structure': _('Struktur & Reihenfolge'), 'section-rules': _('Abschnitte'), 'field-rules': _('Feldregeln'), 'conditional-rules': _('Bedingte Logik'), 'options': _('Optionen'), 'field-texts': _('Feldtexte'), 'custom-sections': _('Eigene Abschnitte'), 'custom-fields': _('Eigene Felder'), 'preview': _('Vorschau'), } option_category_labels = dict(_translate_choice_list(FormOption.CATEGORY_CHOICES)) form_type_labels = { 'onboarding': _('Onboarding'), 'offboarding': _('Offboarding'), } active_focus_label = '' if active_module == 'structure' and active_structure_section: active_focus_label = section_labels.get(active_structure_section, active_structure_section) elif active_module == 'section-rules' and section_rule_items: active_focus_label = _('Alle Abschnitte') elif active_module == 'field-rules' and active_field_rules_section: active_focus_label = section_labels.get(active_field_rules_section, active_field_rules_section) elif active_module == 'conditional-rules' and active_conditional_target: active_focus_label = next((item['title'] for item in conditional_rule_items if item['target_key'] == active_conditional_target), active_conditional_target) elif active_module == 'options': active_focus_label = option_category_labels.get(option_category, option_category) elif active_module == 'field-texts' and active_field_texts_section: active_focus_label = section_labels.get(active_field_texts_section, active_field_texts_section) elif active_module == 'custom-sections': active_focus_label = _('Onboarding') elif active_module == 'custom-fields' and active_custom_fields_section: active_focus_label = section_labels.get(active_custom_fields_section, active_custom_fields_section) elif active_module == 'preview': active_focus_label = _('Live-Vorschau') return render( request, 'workflows/form_builder.html', { 'form_type': form_type, 'columns': columns, 'form_types': [('onboarding', _('Onboarding')), ('offboarding', _('Offboarding'))], 'option_categories': _translate_choice_list(FormOption.CATEGORY_CHOICES), 'selected_option_category': option_category, 'option_items': FormOption.objects.filter(category=option_category).order_by('sort_order', 'label'), 'field_text_items': configs, 'field_rule_items': field_rule_items, 'field_rule_groups': field_rule_groups, 'field_text_groups': field_text_groups, 'preview_sections': preview_sections, 'section_rule_items': section_rule_items, 'builder_summary': builder_summary, 'conditional_rule_items': conditional_rule_items, 'custom_field_groups': custom_field_groups, 'custom_field_type_choices': _translate_choice_list(FormCustomFieldConfig.FIELD_TYPE_CHOICES), 'custom_section_items': custom_section_configs, 'active_panel': active_panel, 'active_subpanel': active_subpanel, 'active_rules_panel': active_rules_panel, 'active_module': active_module, 'active_form_type_label': form_type_labels.get(form_type, form_type), 'active_module_label': module_labels.get(active_module, active_module), 'active_focus_label': active_focus_label, 'active_structure_section': active_structure_section, 'active_field_rules_section': active_field_rules_section, 'active_field_texts_section': active_field_texts_section, 'active_custom_fields_section': active_custom_fields_section, 'active_section_rules_section': active_section_rules_section, 'active_conditional_target': active_conditional_target, 'available_presets': FORM_PRESETS.get(form_type, {}), 'can_override_locked_builder_rules': can_override_locked_builder_rules, }, ) @_require_capability('manage_builders') def intro_builder_page(request): if request.method == 'POST': delete_id = (request.POST.get('delete_item_id') or '').strip() if delete_id: item = IntroChecklistItem.objects.filter(id=delete_id).first() if item: deleted_label = item.label deleted_id_int = item.id item.delete() _audit(request, 'intro_checklist_item_deleted', target_type='intro_checklist_item', target_id=deleted_id_int, target_label=deleted_label) messages.success(request, 'Checklistenpunkt wurde gelöscht.') else: messages.error(request, 'Checklistenpunkt nicht gefunden.') return redirect('intro_builder_page') action = (request.POST.get('builder_action') or '').strip() if action == 'add_item': section = (request.POST.get('section') or '').strip() label = (request.POST.get('label') or '').strip() label_en = (request.POST.get('label_en') or '').strip() if section not in {k for k, _ in IntroChecklistItem.SECTION_CHOICES}: messages.error(request, 'Ungültiger Abschnitt.') return redirect('intro_builder_page') if not label: messages.error(request, 'Bitte eine Bezeichnung für den Checklistenpunkt angeben.') return redirect('intro_builder_page') next_sort = ( IntroChecklistItem.objects.filter(section=section).order_by('-sort_order').values_list('sort_order', flat=True).first() ) IntroChecklistItem.objects.create( section=section, label=label, label_en=label_en, sort_order=(next_sort + 1) if next_sort is not None else 0, is_active=True, condition_operator='always', ) _audit(request, 'intro_checklist_item_added', target_type='intro_checklist_item', target_label=label, details={'section': section, 'label_en': label_en}) messages.success(request, 'Checklistenpunkt wurde hinzugefügt.') return redirect('intro_builder_page') if action == 'save_items': item_ids = request.POST.getlist('item_ids') valid_sections = {k for k, _ in IntroChecklistItem.SECTION_CHOICES} valid_ops = {k for k, _ in IntroChecklistItem.OPERATOR_CHOICES} for pos, raw_id in enumerate(item_ids): item = IntroChecklistItem.objects.filter(id=raw_id).first() if not item: continue section = (request.POST.get(f'section_{item.id}') or item.section).strip() if section not in valid_sections: section = item.section operator = (request.POST.get(f'operator_{item.id}') or item.condition_operator).strip() if operator not in valid_ops: operator = 'always' item.section = section item.label = (request.POST.get(f'label_{item.id}') or item.label).strip() or item.label item.label_en = (request.POST.get(f'label_en_{item.id}') or '').strip() item.is_active = request.POST.get(f'active_{item.id}') == 'on' item.condition_field = (request.POST.get(f'field_{item.id}') or '').strip() item.condition_operator = operator item.condition_value = (request.POST.get(f'value_{item.id}') or '').strip() item.sort_order = pos item.save( update_fields=[ 'section', 'label', 'label_en', 'is_active', 'condition_field', 'condition_operator', 'condition_value', 'sort_order', ] ) _audit(request, 'intro_checklist_saved', target_type='intro_checklist_item', details={'count': len(item_ids)}) messages.success(request, 'Einweisungs-Checkliste wurde gespeichert.') return redirect('intro_builder_page') condition_field_choices = [ ('', 'Keine Bedingung'), ('needed_devices', 'Benötigte Geräte und Gegenstände'), ('needed_software', 'Benötigte Software'), ('needed_accesses', 'Benötigte Zugänge'), ('needed_workspace_groups', 'Benötigte Gruppen im Workspace'), ('needed_resources', 'Benötigte Ressourcen'), ('additional_hardware', 'Zusätzliche Hardware'), ('additional_software', 'Zusätzliche Software'), ('additional_access_text', 'Weitere Zugänge (Freitext)'), ('group_mailboxes_required', 'Gruppenpostfächer erforderlich'), ('order_business_cards', 'Visitenkarten bestellt'), ('phone_number', 'Direktwahl vorhanden'), ('successor_name', 'Nachfolge vorhanden'), ('department', 'Abteilung'), ] items = list(IntroChecklistItem.objects.all().order_by('section', 'sort_order', 'label')) return render( request, 'workflows/intro_builder.html', { 'items': items, 'section_choices': _translate_choice_list(IntroChecklistItem.SECTION_CHOICES), 'operator_choices': _translate_choice_list(IntroChecklistItem.OPERATOR_CHOICES), 'condition_field_choices': condition_field_choices, }, ) @_require_capability('manage_integrations') def integrations_setup_page(request): config, _ = WorkflowConfig.objects.get_or_create(name='Default') kind = (request.GET.get('kind') or 'nextcloud').strip().lower() if kind not in {'nextcloud', 'mail', 'emails', 'rules', 'backup'}: kind = 'nextcloud' templates = list(NotificationTemplate.objects.all().order_by('key')) system_email_config = ( SystemEmailConfig.objects.filter(is_active=True).order_by('-updated_at').first() or SystemEmailConfig.objects.filter(name='Default SMTP').first() ) return render( request, 'workflows/integrations_setup.html', { 'workflow_config': config, 'system_email_config': system_email_config, 'nextcloud_enabled': is_nextcloud_enabled(), 'email_test_mode': is_email_test_mode(), 'kind': kind, 'templates': templates, 'notification_rules': NotificationRule.objects.all().order_by('event_type', 'sort_order', 'id'), 'rule_event_choices': NotificationRule.EVENT_CHOICES, 'rule_operator_choices': NotificationRule.OPERATOR_CHOICES, 'template_choices': NotificationTemplate.TEMPLATE_CHOICES, 'remote_backup_target_choices': WorkflowConfig.REMOTE_BACKUP_TARGET_CHOICES, }, ) @_require_capability('manage_welcome_emails') def welcome_emails_page(request): rows = ScheduledWelcomeEmail.objects.select_related('onboarding_request').order_by('-send_at', '-id')[:200] config, _ = WorkflowConfig.objects.get_or_create(name='Default') welcome_template = NotificationTemplate.objects.filter(key='onboarding_welcome').first() default_welcome = get_default_notification_templates().get('onboarding_welcome', {}) default_subject = (default_welcome.get('subject') or '').strip() default_body = (default_welcome.get('body') or '').strip() default_subject_en = (default_welcome.get('subject_en') or '').strip() default_body_en = (default_welcome.get('body_en') or '').strip() subject_value = (welcome_template.subject_template if welcome_template else '').strip() or default_subject body_value = (welcome_template.body_template if welcome_template else '').strip() or default_body subject_value_en = (welcome_template.subject_template_en if welcome_template else '').strip() or default_subject_en body_value_en = (welcome_template.body_template_en if welcome_template else '').strip() or default_body_en return render( request, 'workflows/welcome_emails.html', { 'rows': rows, 'workflow_config': config, 'welcome_template': welcome_template, 'welcome_subject_value': subject_value, 'welcome_body_value': body_value, 'welcome_subject_value_en': subject_value_en, 'welcome_body_value_en': body_value_en, 'welcome_keywords': ['{{ FULL_NAME }}', '{{ VORNAME }}', '{{ NACHNAME }}', '{{ DEPARTMENT }}', '{{ CONTRACT_START }}', '{{ EMAIL }}', '{{ REQUESTED_BY }}'], }, ) @_require_capability('manage_welcome_emails') @require_POST def trigger_welcome_email_now(request, schedule_id: int): scheduled = ScheduledWelcomeEmail.objects.filter(id=schedule_id).first() if not scheduled: messages.error(request, f'Geplanter Welcome-Eintrag #{schedule_id} nicht gefunden.') return redirect('welcome_emails_page') if scheduled.status == 'cancelled': messages.error(request, f'Welcome E-Mail #{schedule_id} ist abgebrochen und kann nicht gesendet werden.') return redirect('welcome_emails_page') async_result = send_scheduled_welcome_email.delay(scheduled.id, True) scheduled.celery_task_id = async_result.id or scheduled.celery_task_id scheduled.status = 'scheduled' scheduled.last_error = '' scheduled.save(update_fields=['celery_task_id', 'status', 'last_error', 'updated_at']) _audit(request, 'welcome_email_triggered_now', target_type='welcome_email', target_id=scheduled.id, target_label=scheduled.recipient_email) messages.success(request, f'Welcome E-Mail #{schedule_id} wurde sofort angestoßen.') return redirect('welcome_emails_page') @_require_capability('manage_welcome_emails') @require_POST def save_welcome_email_settings(request): config, _ = WorkflowConfig.objects.get_or_create(name='Default') try: delay_days = int(request.POST.get('welcome_email_delay_days', config.welcome_email_delay_days or 5)) except ValueError: messages.error(request, 'Ungültige Zahl bei der Welcome-Verzögerung.') return redirect('welcome_emails_page') config.welcome_email_delay_days = max(0, delay_days) config.welcome_sender_email = request.POST.get('welcome_sender_email', '').strip() config.welcome_include_pdf = request.POST.get('welcome_include_pdf') == 'on' config.save(update_fields=['welcome_email_delay_days', 'welcome_sender_email', 'welcome_include_pdf']) subject = request.POST.get('welcome_subject') body = request.POST.get('welcome_body') subject_en = request.POST.get('welcome_subject_en') body_en = request.POST.get('welcome_body_en') if subject is not None or body is not None or subject_en is not None or body_en is not None: default_welcome = get_default_notification_templates().get('onboarding_welcome', {}) default_subject = (default_welcome.get('subject') or '').strip() default_body = (default_welcome.get('body') or '').strip() default_subject_en = (default_welcome.get('subject_en') or '').strip() default_body_en = (default_welcome.get('body_en') or '').strip() subject_clean = (subject or '').strip() or default_subject body_clean = (body or '').strip() or default_body subject_clean_en = (subject_en or '').strip() or default_subject_en body_clean_en = (body_en or '').strip() or default_body_en template, _ = NotificationTemplate.objects.get_or_create( key='onboarding_welcome', defaults={ 'subject_template': subject_clean, 'body_template': body_clean, 'subject_template_en': subject_clean_en, 'body_template_en': body_clean_en, 'is_active': True, }, ) changes = [] if template.subject_template != subject_clean: template.subject_template = subject_clean changes.append('subject_template') if template.body_template != body_clean: template.body_template = body_clean changes.append('body_template') if template.subject_template_en != subject_clean_en: template.subject_template_en = subject_clean_en changes.append('subject_template_en') if template.body_template_en != body_clean_en: template.body_template_en = body_clean_en changes.append('body_template_en') if not template.is_active: template.is_active = True changes.append('is_active') if changes: template.save(update_fields=changes) _audit( request, 'welcome_email_settings_saved', target_type='welcome_email_settings', target_label='onboarding_welcome', details={ 'delay_days': config.welcome_email_delay_days, 'sender_email': config.welcome_sender_email, 'include_pdf': config.welcome_include_pdf, }, ) messages.success(request, 'Welcome-E-Mail Einstellungen wurden gespeichert.') return redirect('welcome_emails_page') def _revoke_celery_task(task_id: str) -> None: if not task_id: return try: current_app.control.revoke(task_id, terminate=False) except Exception: return def _parse_selected_schedule_ids(raw: str) -> list[int]: if not raw: return [] parsed: list[int] = [] seen: set[int] = set() for token in raw.split(','): token = token.strip() if not token: continue try: schedule_id = int(token) except ValueError: continue if schedule_id in seen: continue seen.add(schedule_id) parsed.append(schedule_id) return parsed @_require_capability('manage_welcome_emails') @require_POST def bulk_welcome_email_action(request): action = (request.POST.get('bulk_action') or '').strip().lower() selected_ids = _parse_selected_schedule_ids(request.POST.get('selected_ids', '')) if action not in {'pause', 'send_now', 'delete'}: messages.error(request, 'Ungültige Bulk-Aktion.') return redirect('welcome_emails_page') if not selected_ids: messages.warning(request, 'Keine Welcome-Einträge ausgewählt.') return redirect('welcome_emails_page') rows = list(ScheduledWelcomeEmail.objects.filter(id__in=selected_ids).order_by('id')) if not rows: messages.warning(request, 'Keine passenden Welcome-Einträge gefunden.') return redirect('welcome_emails_page') success_count = 0 skipped_count = 0 for scheduled in rows: if action == 'pause': if scheduled.status in {'sent', 'cancelled'}: skipped_count += 1 continue _revoke_celery_task(scheduled.celery_task_id) scheduled.status = 'paused' scheduled.save(update_fields=['status', 'updated_at']) success_count += 1 continue if action == 'send_now': if scheduled.status == 'cancelled': skipped_count += 1 continue async_result = send_scheduled_welcome_email.delay(scheduled.id, True) scheduled.celery_task_id = async_result.id or scheduled.celery_task_id scheduled.status = 'scheduled' scheduled.last_error = '' scheduled.save(update_fields=['celery_task_id', 'status', 'last_error', 'updated_at']) success_count += 1 continue if action == 'delete': if scheduled.status == 'scheduled': _revoke_celery_task(scheduled.celery_task_id) scheduled.delete() success_count += 1 action_label = { 'pause': 'pausiert', 'send_now': 'sofort angestoßen', 'delete': 'gelöscht', }[action] if success_count: _audit( request, 'welcome_email_bulk_action', target_type='welcome_email', target_label=action, details={'selected_ids': selected_ids, 'success_count': success_count, 'skipped_count': skipped_count}, ) messages.success(request, f'{success_count} Welcome-Eintrag/Einträge {action_label}.') if skipped_count: messages.warning(request, f'{skipped_count} Eintrag/Einträge wurden übersprungen (Status nicht geeignet).') return redirect('welcome_emails_page') @_require_capability('manage_welcome_emails') @require_POST def pause_welcome_email(request, schedule_id: int): scheduled = ScheduledWelcomeEmail.objects.filter(id=schedule_id).first() if not scheduled: messages.error(request, f'Geplanter Welcome-Eintrag #{schedule_id} nicht gefunden.') return redirect('welcome_emails_page') if scheduled.status in {'sent', 'cancelled'}: messages.error(request, f'Welcome E-Mail #{schedule_id} kann nicht pausiert werden.') return redirect('welcome_emails_page') _revoke_celery_task(scheduled.celery_task_id) scheduled.status = 'paused' scheduled.save(update_fields=['status', 'updated_at']) _audit(request, 'welcome_email_paused', target_type='welcome_email', target_id=scheduled.id, target_label=scheduled.recipient_email) messages.success(request, f'Welcome E-Mail #{schedule_id} wurde pausiert.') return redirect('welcome_emails_page') @_require_capability('manage_welcome_emails') @require_POST def resume_welcome_email(request, schedule_id: int): scheduled = ScheduledWelcomeEmail.objects.filter(id=schedule_id).first() if not scheduled: messages.error(request, f'Geplanter Welcome-Eintrag #{schedule_id} nicht gefunden.') return redirect('welcome_emails_page') if scheduled.status != 'paused': messages.error(request, f'Welcome E-Mail #{schedule_id} ist nicht pausiert.') return redirect('welcome_emails_page') eta = scheduled.send_at if timezone.now() < scheduled.send_at else None async_result = send_scheduled_welcome_email.apply_async(args=[scheduled.id], eta=eta) scheduled.celery_task_id = async_result.id or scheduled.celery_task_id scheduled.status = 'scheduled' scheduled.last_error = '' scheduled.save(update_fields=['celery_task_id', 'status', 'last_error', 'updated_at']) _audit(request, 'welcome_email_resumed', target_type='welcome_email', target_id=scheduled.id, target_label=scheduled.recipient_email) messages.success(request, f'Welcome E-Mail #{schedule_id} wurde fortgesetzt.') return redirect('welcome_emails_page') @_require_capability('manage_welcome_emails') @require_POST def cancel_welcome_email(request, schedule_id: int): scheduled = ScheduledWelcomeEmail.objects.filter(id=schedule_id).first() if not scheduled: messages.error(request, f'Geplanter Welcome-Eintrag #{schedule_id} nicht gefunden.') return redirect('welcome_emails_page') if scheduled.status == 'sent': messages.error(request, f'Welcome E-Mail #{schedule_id} wurde bereits gesendet.') return redirect('welcome_emails_page') _revoke_celery_task(scheduled.celery_task_id) scheduled.status = 'cancelled' scheduled.last_error = '' scheduled.save(update_fields=['status', 'last_error', 'updated_at']) _audit(request, 'welcome_email_cancelled', target_type='welcome_email', target_id=scheduled.id, target_label=scheduled.recipient_email) messages.success(request, f'Welcome E-Mail #{schedule_id} wurde abgebrochen.') return redirect('welcome_emails_page') @_require_capability('manage_builders') @require_POST def form_builder_save_order(request): try: payload = json.loads(request.body.decode('utf-8')) except (json.JSONDecodeError, UnicodeDecodeError): return JsonResponse({'ok': False, 'error': _('Ungültige JSON-Daten.')}, status=400) form_type = payload.get('form_type') if form_type not in DEFAULT_FIELD_ORDER: return JsonResponse({'ok': False, 'error': _('Ungültiger Formulartyp.')}, status=400) default_page_map = get_default_page_map(form_type) columns = payload.get('columns') if not isinstance(columns, dict): return JsonResponse({'ok': False, 'error': _('Spalten-Daten fehlen.')}, status=400) configs = list(FormFieldConfig.objects.filter(form_type=form_type).order_by('sort_order', 'field_name')) custom_configs = list(FormCustomFieldConfig.objects.filter(form_type=form_type).order_by('sort_order', 'field_key')) allowed_names = {cfg.field_name for cfg in configs} | {f'custom__{cfg.field_key}' for cfg in custom_configs} seen = set() allowed_columns = get_section_order(form_type) fallback_section = allowed_columns[-1] if allowed_columns else '' name_to_cfg = {cfg.field_name: cfg for cfg in configs} custom_name_to_cfg = {f'custom__{cfg.field_key}': cfg for cfg in custom_configs} sort_order = 0 for column_key in allowed_columns: names = columns.get(column_key, []) if not isinstance(names, list): return JsonResponse({'ok': False, 'error': _('Ungültige Spalte: %(column)s') % {'column': column_key}}, status=400) for name in names: if not isinstance(name, str): continue if name not in allowed_names or name in seen: continue seen.add(name) if name in name_to_cfg: cfg = name_to_cfg[name] cfg.sort_order = sort_order cfg.page_key = column_key else: cfg = custom_name_to_cfg[name] cfg.sort_order = sort_order cfg.section_key = column_key sort_order += 1 missing = [cfg.field_name for cfg in configs if cfg.field_name not in seen] for name in missing: cfg = name_to_cfg[name] cfg.sort_order = sort_order sort_order += 1 cfg.page_key = cfg.page_key or default_page_map.get(name, fallback_section) missing_custom = [name for name in custom_name_to_cfg.keys() if name not in seen] for name in missing_custom: cfg = custom_name_to_cfg[name] cfg.sort_order = sort_order sort_order += 1 cfg.section_key = cfg.section_key or fallback_section FormFieldConfig.objects.bulk_update(configs, ['sort_order', 'page_key']) if custom_configs: FormCustomFieldConfig.objects.bulk_update(custom_configs, ['sort_order', 'section_key']) saved_count = len(configs) + len(custom_configs) _audit(request, 'form_layout_saved', target_type='form_config', target_label=form_type, details={'saved_count': saved_count}) return JsonResponse({'ok': True, 'saved_count': saved_count}) @_require_capability('manage_integrations') @require_POST def send_test_email(request): mode = 'TEST_MODE_ON' if is_email_test_mode() else 'TEST_MODE_OFF' redirect_email = get_email_test_redirect() try: send_system_email( subject=f'SMTP test from onboarding/offboarding v2 ({mode})', body=( 'This is a test email. If you see this, SMTP is configured correctly.\n' f'EMAIL_TEST_MODE={is_email_test_mode()}\n' f'EMAIL_TEST_REDIRECT={redirect_email}\n' ), to=[settings.TEST_NOTIFICATION_EMAIL], ) _audit(request, 'smtp_test_sent', target_type='system_email', target_label=settings.TEST_NOTIFICATION_EMAIL, details={'email_test_mode': is_email_test_mode()}) notify_user( user=request.user, title=_('SMTP-Test erfolgreich'), body=_('Die SMTP-Testmail wurde erfolgreich gesendet.'), level=UserNotification.LEVEL_SUCCESS, link_url='/admin-tools/integrations/', event_key=UserProfile.NOTIFICATION_SYSTEM_ALERTS, ) messages.success(request, f'SMTP-Testmail wurde gesendet ({mode}).') except Exception as exc: notify_user( user=request.user, title=_('SMTP-Test fehlgeschlagen'), body=str(exc), level=UserNotification.LEVEL_ERROR, link_url='/admin-tools/integrations/', event_key=UserProfile.NOTIFICATION_SYSTEM_ALERTS, ) messages.error(request, _('SMTP-Testmail konnte nicht gesendet werden: %(error)s') % {'error': exc}) return _redirect_back(request, 'home') @_require_capability('manage_integrations') @require_POST def nextcloud_test_upload(request): filename = f"nextcloud_test_{timezone.now().strftime('%Y%m%d_%H%M%S')}.txt" content = ( "Nextcloud test upload from onboarding/offboarding system.\n" f"Time: {timezone.now().isoformat()}\n" f"User: {request.user.username}\n" ) temp_path = None try: with NamedTemporaryFile('w', suffix='.txt', delete=False, encoding='utf-8') as tf: tf.write(content) temp_path = Path(tf.name) ok = upload_to_nextcloud(temp_path, filename) if ok: _audit(request, 'nextcloud_test_upload', target_type='nextcloud', target_label=filename, details={'result': 'success'}) notify_user( user=request.user, title=_('Nextcloud-Test erfolgreich'), body=_('Der Testupload nach Nextcloud war erfolgreich.'), level=UserNotification.LEVEL_SUCCESS, link_url='/admin-tools/integrations/', event_key=UserProfile.NOTIFICATION_SYSTEM_ALERTS, ) messages.success(request, f'Nextcloud-Testupload erfolgreich: {filename}') else: _audit(request, 'nextcloud_test_upload', target_type='nextcloud', target_label=filename, details={'result': 'error'}) notify_user( user=request.user, title=_('Nextcloud-Test fehlgeschlagen'), body=_('Der Testupload nach Nextcloud ist fehlgeschlagen.'), level=UserNotification.LEVEL_ERROR, link_url='/admin-tools/integrations/', event_key=UserProfile.NOTIFICATION_SYSTEM_ALERTS, ) messages.error(request, 'Nextcloud-Testupload fehlgeschlagen. Bitte Konfiguration prüfen.') except Exception as exc: notify_user( user=request.user, title=_('Nextcloud-Test fehlgeschlagen'), body=str(exc), level=UserNotification.LEVEL_ERROR, link_url='/admin-tools/integrations/', event_key=UserProfile.NOTIFICATION_SYSTEM_ALERTS, ) messages.error(request, f'Nextcloud-Testupload fehlgeschlagen: {exc}') finally: if temp_path and temp_path.exists(): temp_path.unlink(missing_ok=True) return _redirect_back(request, 'home') @_require_capability('manage_integrations') @require_POST def toggle_nextcloud_enabled(request): config, _ = WorkflowConfig.objects.get_or_create(name='Default') currently_enabled = is_nextcloud_enabled() config.nextcloud_enabled_override = not currently_enabled config.save(update_fields=['nextcloud_enabled_override']) _audit(request, 'nextcloud_mode_toggled', target_type='workflow_config', target_label='nextcloud', details={'enabled': config.nextcloud_enabled_override}) state = 'aktiviert' if config.nextcloud_enabled_override else 'deaktiviert' messages.success(request, f'Nextcloud Upload wurde {state}.') return _redirect_back(request, 'home') @_require_capability('manage_integrations') @require_POST def toggle_email_mode(request): config, _ = WorkflowConfig.objects.get_or_create(name='Default') currently_test_mode = is_email_test_mode() config.email_test_mode_override = not currently_test_mode config.save(update_fields=['email_test_mode_override']) _audit(request, 'email_mode_toggled', target_type='workflow_config', target_label='email_mode', details={'test_mode': config.email_test_mode_override}) state = 'Testmodus (Umleitung)' if config.email_test_mode_override else 'Produktionsmodus' messages.success(request, f'E-Mail-Modus wurde auf {state} gesetzt.') return _redirect_back(request, 'home') @_require_capability('manage_integrations') @require_POST def save_integrations_settings(request): config, _ = WorkflowConfig.objects.get_or_create(name='Default') try: sync_interval = int(request.POST.get('sync_interval_seconds', config.sync_interval_seconds or 60)) smtp_port = int(request.POST.get('smtp_port', config.smtp_port or 465)) except ValueError: messages.error(request, 'Ungültige Zahl bei Sync-Intervall oder SMTP Port.') return redirect('home') config.nextcloud_base_url_override = request.POST.get('nextcloud_base_url_override', '').strip() config.nextcloud_username_override = request.POST.get('nextcloud_username_override', '').strip() config.nextcloud_directory_override = request.POST.get('nextcloud_directory_override', '').strip() config.sync_interval_seconds = max(10, sync_interval) config.imap_server = request.POST.get('imap_server', '').strip() config.mailbox = request.POST.get('mailbox', '').strip() or 'INBOX' config.smtp_server = request.POST.get('smtp_server', '').strip() config.smtp_port = max(1, smtp_port) config.email_account = request.POST.get('email_account', '').strip() config.smtp_use_ssl = request.POST.get('smtp_use_ssl') == 'on' config.smtp_use_tls = request.POST.get('smtp_use_tls') == 'on' nextcloud_password = request.POST.get('nextcloud_password_override', '').strip() if nextcloud_password: config.nextcloud_password_override = nextcloud_password email_password = request.POST.get('email_password', '').strip() if email_password: config.email_password = email_password config.save() _audit(request, 'integrations_saved', target_type='workflow_config', target_label='all_integrations') messages.success(request, 'Integrations-Einstellungen wurden gespeichert.') return redirect('home') @_require_capability('manage_integrations') @require_POST def save_nextcloud_settings(request): config, _ = WorkflowConfig.objects.get_or_create(name='Default') try: sync_interval = int(request.POST.get('sync_interval_seconds', config.sync_interval_seconds or 60)) except ValueError: messages.error(request, 'Ungültige Zahl beim Sync-Intervall.') return redirect('home') config.nextcloud_base_url_override = request.POST.get('nextcloud_base_url_override', '').strip() config.nextcloud_username_override = request.POST.get('nextcloud_username_override', '').strip() config.nextcloud_directory_override = request.POST.get('nextcloud_directory_override', '').strip() config.sync_interval_seconds = max(10, sync_interval) nextcloud_password = request.POST.get('nextcloud_password_override', '').strip() if nextcloud_password: config.nextcloud_password_override = nextcloud_password config.save() _audit(request, 'nextcloud_settings_saved', target_type='workflow_config', target_label='nextcloud') messages.success(request, 'Nextcloud-Einstellungen wurden gespeichert.') return redirect('/admin-tools/integrations/?kind=nextcloud') @_require_capability('manage_integrations') @require_POST def save_workflow_rules(request): config, _ = WorkflowConfig.objects.get_or_create(name='Default') try: handover_lead_days = int( request.POST.get( 'device_handover_lead_days', config.device_handover_lead_days or 5, ) ) except ValueError: messages.error(request, 'Ungültige Zahl beim Hardware-Vorlauf.') return redirect('/admin-tools/integrations/?kind=rules') config.device_handover_lead_days = max(0, handover_lead_days) config.save(update_fields=['device_handover_lead_days']) _audit( request, 'workflow_rules_saved', target_type='workflow_config', target_label='workflow_rules', details={ 'device_handover_lead_days': config.device_handover_lead_days, }, ) messages.success(request, 'Workflow-Regeln wurden gespeichert.') return redirect('/admin-tools/integrations/?kind=rules') @_require_capability('manage_integrations') @require_POST def save_backup_settings(request): config, _ = WorkflowConfig.objects.get_or_create(name='Default') target_type = (request.POST.get('remote_backup_target_type') or config.remote_backup_target_type or 'nextcloud').strip().lower() if target_type not in {choice for choice, _ in WorkflowConfig.REMOTE_BACKUP_TARGET_CHOICES}: target_type = 'nextcloud' remote_backup_enabled = request.POST.get('remote_backup_enabled') == 'on' remote_backup_nextcloud_directory = request.POST.get('remote_backup_nextcloud_directory', '').strip() primary_nextcloud_directory = ( (config.nextcloud_directory_override or '').strip() or settings.NEXTCLOUD_DIRECTORY.strip() ).strip('/') if remote_backup_enabled and target_type == 'nextcloud': if not remote_backup_nextcloud_directory: messages.error(request, 'Bitte ein separates Nextcloud Backup-Verzeichnis angeben.') return redirect('/admin-tools/integrations/?kind=backup') if remote_backup_nextcloud_directory.strip('/') == primary_nextcloud_directory: messages.error(request, 'Das Backup-Verzeichnis muss vom normalen Nextcloud Dokumentenordner getrennt sein.') return redirect('/admin-tools/integrations/?kind=backup') config.remote_backup_enabled = remote_backup_enabled config.remote_backup_target_type = target_type config.remote_backup_nextcloud_directory = remote_backup_nextcloud_directory config.remote_backup_s3_bucket = request.POST.get('remote_backup_s3_bucket', '').strip() config.remote_backup_nfs_path = request.POST.get('remote_backup_nfs_path', '').strip() config.save( update_fields=[ 'device_handover_lead_days', 'remote_backup_enabled', 'remote_backup_target_type', 'remote_backup_nextcloud_directory', 'remote_backup_s3_bucket', 'remote_backup_nfs_path', ] ) _audit( request, 'backup_settings_saved', target_type='workflow_config', target_label='backup_settings', details={ 'remote_backup_enabled': config.remote_backup_enabled, 'remote_backup_target_type': config.remote_backup_target_type, 'remote_backup_nextcloud_directory': config.remote_backup_nextcloud_directory, }, ) messages.success(request, 'Backup-Einstellungen wurden gespeichert.') return redirect('/admin-tools/integrations/?kind=backup') @_require_capability('manage_integrations') @require_POST def save_mail_settings(request): config, _ = WorkflowConfig.objects.get_or_create(name='Default') try: smtp_port = int(request.POST.get('smtp_port', config.smtp_port or 465)) except ValueError: messages.error(request, 'Ungültige Zahl beim SMTP Port.') return redirect('home') config.imap_server = request.POST.get('imap_server', '').strip() config.mailbox = request.POST.get('mailbox', '').strip() or 'INBOX' config.smtp_server = request.POST.get('smtp_server', '').strip() config.smtp_port = max(1, smtp_port) config.email_account = request.POST.get('email_account', '').strip() config.smtp_use_ssl = request.POST.get('smtp_use_ssl') == 'on' config.smtp_use_tls = request.POST.get('smtp_use_tls') == 'on' email_password = request.POST.get('email_password', '').strip() if email_password: config.email_password = email_password config.save() smtp_cfg, _ = SystemEmailConfig.objects.get_or_create(name='Default SMTP') SystemEmailConfig.objects.exclude(id=smtp_cfg.id).update(is_active=False) smtp_cfg.is_active = True smtp_cfg.host = config.smtp_server smtp_cfg.port = config.smtp_port smtp_cfg.username = config.email_account if email_password: smtp_cfg.password = email_password smtp_cfg.use_ssl = config.smtp_use_ssl smtp_cfg.use_tls = config.smtp_use_tls smtp_cfg.from_email = request.POST.get('from_email', '').strip() smtp_cfg.save() _audit(request, 'mail_settings_saved', target_type='workflow_config', target_label='mail') messages.success(request, 'Mail-Einstellungen wurden gespeichert.') return redirect('/admin-tools/integrations/?kind=mail') @_require_capability('manage_integrations') @require_POST def save_email_routing_settings(request): config, _ = WorkflowConfig.objects.get_or_create(name='Default') config.it_onboarding_email = request.POST.get('it_onboarding_email', '').strip() config.general_info_email = request.POST.get('general_info_email', '').strip() config.business_card_email = request.POST.get('business_card_email', '').strip() config.hr_works_email = request.POST.get('hr_works_email', '').strip() config.key_notification_email = request.POST.get('key_notification_email', '').strip() config.save( update_fields=[ 'it_onboarding_email', 'general_info_email', 'business_card_email', 'hr_works_email', 'key_notification_email', ] ) known_keys = {k for k, _ in NotificationTemplate.TEMPLATE_CHOICES} for key in known_keys: subject = request.POST.get(f'subject_{key}') body = request.POST.get(f'body_{key}') subject_en = request.POST.get(f'subject_en_{key}') body_en = request.POST.get(f'body_en_{key}') if subject is None and body is None and subject_en is None and body_en is None: continue subject = (subject or '').strip() body = (body or '').strip() subject_en = (subject_en or '').strip() body_en = (body_en or '').strip() if not subject and not body and not subject_en and not body_en: continue obj, _ = NotificationTemplate.objects.get_or_create( key=key, defaults={ 'subject_template': subject or f'[{key}]', 'body_template': body or '-', 'subject_template_en': subject_en, 'body_template_en': body_en, 'is_active': True, }, ) changed = [] if subject and obj.subject_template != subject: obj.subject_template = subject changed.append('subject_template') if body and obj.body_template != body: obj.body_template = body changed.append('body_template') if obj.subject_template_en != subject_en: obj.subject_template_en = subject_en changed.append('subject_template_en') if obj.body_template_en != body_en: obj.body_template_en = body_en changed.append('body_template_en') if not obj.is_active: obj.is_active = True changed.append('is_active') if changed: obj.save(update_fields=changed) _audit(request, 'email_routing_saved', target_type='workflow_config', target_label='email_routing') messages.success(request, 'E-Mail Routing und Vorlagen wurden gespeichert.') return redirect('/admin-tools/integrations/?kind=emails') @_require_capability('manage_integrations') @require_POST def save_notification_rules(request): rule_ids = request.POST.getlist('rule_ids') for position, raw_id in enumerate(rule_ids): rule = NotificationRule.objects.filter(id=raw_id).first() if not rule: continue if request.POST.get(f'delete_{rule.id}') == 'on': rule.delete() continue rule.name = request.POST.get(f'name_{rule.id}', '').strip() or rule.name event_type = request.POST.get(f'event_type_{rule.id}', '').strip() if event_type in {'onboarding', 'offboarding'}: rule.event_type = event_type operator = request.POST.get(f'operator_{rule.id}', '').strip() if operator in {x[0] for x in NotificationRule.OPERATOR_CHOICES}: rule.operator = operator rule.field_name = request.POST.get(f'field_name_{rule.id}', '').strip() rule.expected_value = request.POST.get(f'expected_value_{rule.id}', '').strip() rule.recipients = request.POST.get(f'recipients_{rule.id}', '').strip() rule.template_key = request.POST.get(f'template_key_{rule.id}', '').strip() rule.custom_subject = request.POST.get(f'custom_subject_{rule.id}', '').strip() rule.custom_body = request.POST.get(f'custom_body_{rule.id}', '').strip() rule.custom_subject_en = request.POST.get(f'custom_subject_en_{rule.id}', '').strip() rule.custom_body_en = request.POST.get(f'custom_body_en_{rule.id}', '').strip() rule.include_pdf_attachment = request.POST.get(f'include_pdf_{rule.id}') == 'on' rule.is_active = request.POST.get(f'active_{rule.id}') == 'on' rule.sort_order = position rule.save() new_name = request.POST.get('new_name', '').strip() new_recipients = request.POST.get('new_recipients', '').strip() if new_name and new_recipients: new_event = request.POST.get('new_event_type', 'onboarding').strip() if new_event not in {'onboarding', 'offboarding'}: new_event = 'onboarding' new_operator = request.POST.get('new_operator', 'always').strip() if new_operator not in {x[0] for x in NotificationRule.OPERATOR_CHOICES}: new_operator = 'always' NotificationRule.objects.create( name=new_name, event_type=new_event, field_name=request.POST.get('new_field_name', '').strip(), operator=new_operator, expected_value=request.POST.get('new_expected_value', '').strip(), recipients=new_recipients, template_key=request.POST.get('new_template_key', '').strip(), custom_subject=request.POST.get('new_custom_subject', '').strip(), custom_body=request.POST.get('new_custom_body', '').strip(), custom_subject_en=request.POST.get('new_custom_subject_en', '').strip(), custom_body_en=request.POST.get('new_custom_body_en', '').strip(), include_pdf_attachment=request.POST.get('new_include_pdf') == 'on', is_active=True, sort_order=NotificationRule.objects.filter(event_type=new_event).count() + 1, ) _audit(request, 'notification_rules_saved', target_type='notification_rule') messages.success(request, 'Benachrichtigungsregeln wurden gespeichert.') return redirect('/admin-tools/integrations/?kind=emails') @_require_capability('delete_requests') @require_POST def delete_request_from_dashboard(request, kind: str, request_id: int): if kind == 'onboarding': obj = get_object_or_404(OnboardingRequest, id=request_id) elif kind == 'offboarding': obj = get_object_or_404(OffboardingRequest, id=request_id) else: messages.error(request, f'Unbekannter Typ: {kind}') return redirect('requests_dashboard') target_label = _request_target_label(obj, kind) obj.delete() _audit(request, 'request_deleted', target_type=kind, target_id=request_id, target_label=target_label) messages.success(request, f'{kind.capitalize()}-Anfrage #{request_id} wurde gelöscht.') return redirect('requests_dashboard') @_require_capability('retry_requests') @require_POST def retry_request_from_dashboard(request, kind: str, request_id: int): if kind == 'onboarding': obj = get_object_or_404(OnboardingRequest, id=request_id) obj.processing_status = 'submitted' obj.last_error = '' obj.save(update_fields=['processing_status', 'last_error']) process_onboarding_request.delay(obj.id) _audit(request, 'request_retried', target_type='onboarding', target_id=obj.id, target_label=_request_target_label(obj, 'onboarding')) elif kind == 'offboarding': obj = get_object_or_404(OffboardingRequest, id=request_id) obj.processing_status = 'submitted' obj.last_error = '' obj.save(update_fields=['processing_status', 'last_error']) process_offboarding_request.delay(obj.id) _audit(request, 'request_retried', target_type='offboarding', target_id=obj.id, target_label=_request_target_label(obj, 'offboarding')) else: messages.error(request, f'Unbekannter Typ: {kind}') return redirect('requests_dashboard') messages.success(request, f'{kind.capitalize()}-Anfrage #{request_id} wurde erneut angestoßen.') return redirect('requests_dashboard')