from pathlib import Path from datetime import timedelta from tempfile import NamedTemporaryFile import json from io import BytesIO from functools import wraps from celery import current_app from django.conf import settings from django.db import connection from django.db import IntegrityError from django.db.models import Q from django.shortcuts import get_object_or_404, redirect, render from django.contrib import messages from django.contrib.auth import get_user_model, login as auth_login from django.contrib.auth.decorators import login_required from django.contrib.auth.tokens import default_token_generator from django.http import JsonResponse from django.views.decorators.http import require_POST from django.views.decorators.csrf import ensure_csrf_cookie from django.utils import timezone from django.utils.encoding import force_bytes from django.utils.http import urlsafe_base64_encode from django.utils.translation import gettext as _, gettext_lazy from django.utils.translation import get_language, override from django.urls import reverse from .app_registry import build_portal_app_sections, get_portal_app_registry_rows, normalize_portal_app_sort_orders from .backup_ops import ( create_backup_bundle, delete_backup_bundle, latest_backup_health_snapshot, list_backup_bundles, verify_backup_bundle, ) from .branding import get_branding_email_copy, get_company_email_domain, get_default_notification_templates, get_portal_trial_config, is_trial_expired from .forms import AccountAvatarForm, AccountDetailsForm, AccountNotificationPreferencesForm, AccountTOTPDisableForm, AccountTOTPEnableForm, AccountTOTPRegenerateRecoveryCodesForm, AppLoginForm, AppTOTPChallengeForm, OffboardingRequestForm, OnboardingRequestForm, PortalBrandingForm, PortalCompanyConfigForm, PortalTrialConfigForm, UserManagementCreateForm from .form_builder import ( DEFAULT_FIELD_ORDER, DEFAULT_CONDITIONAL_RULES, FORM_PRESETS, LOCKED_FIELD_RULES, LOCKED_SECTION_RULES, OFFBOARDING_PAGE_LABELS, OFFBOARDING_PAGE_ORDER, ONBOARDING_DEFAULT_PAGE, ONBOARDING_PAGE_LABELS, ONBOARDING_PAGE_ORDER, ensure_form_field_configs, ensure_form_conditional_rule_configs, ensure_form_section_configs, get_default_page_map, get_section_labels, get_section_order, apply_form_preset, ) from .models import AdminAuditLog, AsyncTaskLog, EmployeeProfile, FormConditionalRuleConfig, FormFieldConfig, FormOption, FormSectionConfig, IntroChecklistItem, NotificationRule, NotificationTemplate, OffboardingRequest, OnboardingIntroductionSession, OnboardingRequest, PortalAppConfig, PortalBranding, PortalCompanyConfig, PortalTrialConfig, ScheduledWelcomeEmail, SystemEmailConfig, UserNotification, UserProfile, WorkflowConfig from .emailing import send_system_email from .notifications import notify_user from .roles import ROLE_GROUP_NAMES, ROLE_LABELS, ROLE_PLATFORM_OWNER, ROLE_SUPER_ADMIN, assign_user_role, get_user_role_key, get_user_role_label, user_has_capability from .services import get_email_test_redirect, is_email_test_mode, is_nextcloud_enabled, upload_to_nextcloud from .totp import build_otpauth_uri, generate_recovery_codes, generate_totp_secret from .tasks import ( _generate_onboarding_intro_pdf, _generate_onboarding_intro_session_pdf, build_intro_sections_for_request, process_offboarding_request, process_onboarding_request, send_scheduled_welcome_email, ) def _redirect_back(request, fallback: str): target = (request.POST.get('next') or request.GET.get('next') or '').strip() if target.startswith('/'): return redirect(target) referer = (request.META.get('HTTP_REFERER') or '').strip() if referer.startswith('http://127.0.0.1') or referer.startswith('http://localhost') or referer.startswith('/'): return redirect(referer) return redirect(fallback) @login_required @require_POST def mark_notification_read(request, notification_id: int): notification = get_object_or_404(UserNotification, id=notification_id, user=request.user) notification.mark_read() return _redirect_back(request, 'home') @login_required @require_POST def mark_all_notifications_read(request): UserNotification.objects.filter(user=request.user, read_at__isnull=True).update(read_at=timezone.now()) return _redirect_back(request, 'home') ONBOARDING_GROUPS = { 'business-card-box': ['business_card_name', 'business_card_title', 'business_card_email', 'business_card_phone'], 'employment-end-box': ['employment_end_date'], 'group-mailboxes-box': ['group_mailboxes'], 'extra-hardware-box': ['additional_hardware_multi', 'additional_hardware_other'], 'extra-software-box': ['additional_software_multi', 'additional_software'], 'extra-access-box': ['additional_access_text'], 'successor-box': ['successor_name', 'inherit_phone_number_choice'], 'phone-box': ['phone_number_choice'], } ONBOARDING_HIDDEN_BY_DEFAULT = set(DEFAULT_CONDITIONAL_RULES.get('onboarding', {}).keys()) ONBOARDING_INLINE_CHECKS = {'order_business_cards', 'agreement_confirm'} ONBOARDING_CHECKBOX_LISTS = { 'needed_devices_multi', 'additional_hardware_multi', 'needed_software_multi', 'additional_software_multi', 'needed_accesses_multi', 'needed_workspace_groups_multi', 'needed_resources_multi', } ONBOARDING_SECTION_ORDER = ['stammdaten', 'vertrag', 'itsetup', 'abschluss'] ONBOARDING_SECTION_META = { 'stammdaten': {'title': gettext_lazy('Stammdaten'), 'subtitle': gettext_lazy('Person, Rolle, Abteilung')}, 'vertrag': {'title': gettext_lazy('Vertrag'), 'subtitle': gettext_lazy('Beschäftigung und Termine')}, 'itsetup': {'title': gettext_lazy('IT-Setup'), 'subtitle': gettext_lazy('Geräte, Software und Zugänge')}, 'abschluss': {'title': gettext_lazy('Abschluss'), 'subtitle': gettext_lazy('Notizen und Freigabe')}, } CONDITIONAL_RULE_OPERATOR_CHOICES = [ ('checked', _('ist aktiviert')), ('equals', _('ist gleich')), ('not_equals', _('ist nicht gleich')), ] def _normalized_conditional_rule_payload(form_type: str) -> dict[str, dict]: configs = ensure_form_conditional_rule_configs(form_type) payload = {} for target_key, cfg in configs.items(): if not cfg.is_active: continue clauses = [clause for clause in (cfg.clauses or []) if clause.get('field') and clause.get('operator')] if clauses: payload[target_key] = {'all': clauses} return payload def healthz(request): db_ok = True try: with connection.cursor() as cursor: cursor.execute('SELECT 1') cursor.fetchone() except Exception: db_ok = False status_code = 200 if db_ok else 503 return JsonResponse( { 'status': 'ok' if db_ok else 'degraded', 'service': 'workdock', 'db': 'ok' if db_ok else 'error', 'time': timezone.now().isoformat(), }, status=status_code, ) def login_page(request): if getattr(request.user, 'is_authenticated', False): return redirect('home') next_target = (request.POST.get('next') or request.GET.get('next') or '').strip() form = AppLoginForm(request=request, data=request.POST or None) if request.method == 'POST' and form.is_valid(): user = form.get_user() profile, _ = UserProfile.objects.get_or_create(user=user) safe_next = next_target if next_target.startswith('/') else '' if profile.totp_enabled: request.session['login_pending_user_id'] = user.pk request.session['login_pending_backend'] = getattr(user, 'backend', '') request.session['login_pending_next'] = safe_next return redirect('login_totp') auth_login(request, user, backend=getattr(user, 'backend', None)) now_ts = int(timezone.now().timestamp()) request.session['auth_fresh_ts'] = now_ts request.session['last_activity_ts'] = now_ts return redirect(safe_next or reverse('home')) request.session.pop('login_pending_user_id', None) request.session.pop('login_pending_backend', None) request.session.pop('login_pending_next', None) return render( request, 'workflows/auth/login.html', { 'form': form, 'next': next_target, 'login_step': 'password', }, ) def login_totp_page(request): if getattr(request.user, 'is_authenticated', False): return redirect('home') pending_user_id = request.session.get('login_pending_user_id') backend_path = (request.session.get('login_pending_backend') or '').strip() next_target = (request.session.get('login_pending_next') or '').strip() if not pending_user_id: return redirect('login') user = get_object_or_404(get_user_model(), pk=pending_user_id) profile, _ = UserProfile.objects.get_or_create(user=user) if not profile.totp_enabled: request.session.pop('login_pending_user_id', None) request.session.pop('login_pending_backend', None) request.session.pop('login_pending_next', None) return redirect('login') show_recovery = request.method == 'POST' and bool((request.POST.get('recovery_code') or '').strip()) form = AppTOTPChallengeForm(data=request.POST or None, profile=profile) if request.method == 'POST' and form.is_valid(): auth_login(request, user, backend=backend_path or 'django.contrib.auth.backends.ModelBackend') request.session.pop('login_pending_user_id', None) request.session.pop('login_pending_backend', None) request.session.pop('login_pending_next', None) now_ts = int(timezone.now().timestamp()) request.session['auth_fresh_ts'] = now_ts request.session['last_activity_ts'] = now_ts return redirect(next_target if next_target.startswith('/') else reverse('home')) return render( request, 'workflows/auth/login.html', { 'form': form, 'next': next_target, 'login_step': 'totp', 'login_totp_user': user, 'show_recovery_code': show_recovery, }, ) @login_required def account_profile_page(request): session_secret_key = 'account_totp_pending_secret' session_codes_key = 'account_totp_recovery_codes' profile, created = UserProfile.objects.get_or_create(user=request.user) recovery_codes = request.session.pop(session_codes_key, []) pending_totp_secret = request.session.get(session_secret_key) or '' if profile.totp_enabled: pending_totp_secret = '' request.session.pop(session_secret_key, None) elif not pending_totp_secret: pending_totp_secret = generate_totp_secret() request.session[session_secret_key] = pending_totp_secret avatar_form = AccountAvatarForm(instance=profile) details_form = AccountDetailsForm(user=request.user, profile=profile) notification_preferences_form = AccountNotificationPreferencesForm(profile=profile, user=request.user) totp_enable_form = AccountTOTPEnableForm(user=request.user, secret=pending_totp_secret) totp_disable_form = AccountTOTPDisableForm(user=request.user, profile=profile) totp_regenerate_form = AccountTOTPRegenerateRecoveryCodesForm(user=request.user, profile=profile) account_edit_open = False notifications_edit_open = False totp_edit_open = False if request.method == 'POST': form_kind = (request.POST.get('account_form') or '').strip() if form_kind == 'avatar': avatar_form = AccountAvatarForm(request.POST, request.FILES, instance=profile) if avatar_form.is_valid(): avatar_form.save() messages.success(request, _('Profilbild gespeichert.')) return redirect('account_profile_page') messages.error(request, _('Profilbild konnte nicht gespeichert werden.')) elif form_kind == 'details': account_edit_open = True details_form = AccountDetailsForm(request.POST, user=request.user, profile=profile) if details_form.is_valid(): details_form.save() messages.success(request, _('Profildaten gespeichert.')) return redirect('account_profile_page') messages.error(request, _('Profildaten konnten nicht gespeichert werden.')) elif form_kind == 'notification_preferences': notifications_edit_open = True notification_preferences_form = AccountNotificationPreferencesForm(request.POST, profile=profile, user=request.user) if notification_preferences_form.is_valid(): notification_preferences_form.save() messages.success(request, _('Benachrichtigungseinstellungen gespeichert.')) return redirect('account_profile_page') messages.error(request, _('Benachrichtigungseinstellungen konnten nicht gespeichert werden.')) elif form_kind == 'totp_enable': totp_edit_open = True totp_enable_form = AccountTOTPEnableForm(request.POST, user=request.user, secret=pending_totp_secret) if totp_enable_form.is_valid(): recovery_codes = generate_recovery_codes() profile.enable_totp(pending_totp_secret, recovery_codes) request.session[session_codes_key] = recovery_codes request.session.pop(session_secret_key, None) messages.success(request, _('TOTP wurde aktiviert.')) return redirect('account_profile_page') messages.error(request, _('TOTP konnte nicht aktiviert werden.')) elif form_kind == 'totp_disable': totp_edit_open = True totp_disable_form = AccountTOTPDisableForm(request.POST, user=request.user, profile=profile) if totp_disable_form.is_valid(): profile.disable_totp() request.session.pop(session_secret_key, None) messages.success(request, _('TOTP wurde deaktiviert.')) return redirect('account_profile_page') messages.error(request, _('TOTP konnte nicht deaktiviert werden.')) elif form_kind == 'totp_regenerate_codes': totp_edit_open = True totp_regenerate_form = AccountTOTPRegenerateRecoveryCodesForm(request.POST, user=request.user, profile=profile) if totp_regenerate_form.is_valid(): recovery_codes = generate_recovery_codes() profile.set_recovery_codes(recovery_codes) profile.save(update_fields=['totp_recovery_codes', 'updated_at']) request.session[session_codes_key] = recovery_codes messages.success(request, _('Recovery-Codes wurden neu erzeugt.')) return redirect('account_profile_page') messages.error(request, _('Recovery-Codes konnten nicht neu erzeugt werden.')) branding_context = get_branding_email_copy() totp_account_name = (request.user.email or request.user.username or '').strip() totp_issuer = (branding_context.get('portal_title') or branding_context.get('company_name') or 'Workdock').strip() totp_otpauth_uri = '' if profile.totp_enabled else build_otpauth_uri(pending_totp_secret, account_name=totp_account_name, issuer=totp_issuer) totp_qr_svg = '' if totp_otpauth_uri: try: import qrcode import qrcode.image.svg qr_image = qrcode.make(totp_otpauth_uri, image_factory=qrcode.image.svg.SvgPathImage) stream = BytesIO() qr_image.save(stream) totp_qr_svg = stream.getvalue().decode('utf-8') except Exception: totp_qr_svg = '' return render( request, 'workflows/account_profile.html', { 'account_user': request.user, 'account_user_profile': profile, 'avatar_form': avatar_form, 'details_form': details_form, 'notification_preferences_form': notification_preferences_form, 'notification_preference_groups': notification_preferences_form.grouped_fields(), 'totp_enable_form': totp_enable_form, 'totp_disable_form': totp_disable_form, 'totp_regenerate_form': totp_regenerate_form, 'account_edit_open': account_edit_open, 'notifications_edit_open': notifications_edit_open, 'totp_edit_open': totp_edit_open, 'role_label': get_user_role_label(request.user), 'totp_pending_secret': pending_totp_secret, 'totp_otpauth_uri': totp_otpauth_uri, 'totp_qr_svg': totp_qr_svg, 'totp_recovery_codes': recovery_codes, }, ) def _require_capability(capability: str): def decorator(view_func): @wraps(view_func) @login_required def wrapped(request, *args, **kwargs): if not user_has_capability(request.user, capability): messages.error(request, _('Sie haben keine Berechtigung für diese Aktion.')) return redirect('home') return view_func(request, *args, **kwargs) return wrapped return decorator def _display_user_name(user) -> str: first_name = (getattr(user, 'first_name', '') or '').strip() last_name = (getattr(user, 'last_name', '') or '').strip() full_name = f'{first_name} {last_name}'.strip() if full_name: return full_name username = (getattr(user, 'username', '') or '').strip() if username: return username return (getattr(user, 'email', '') or '').strip() def _audit( request, action: str, *, target_type: str = '', target_id: int | None = None, target_label: str = '', details: dict | None = None, ) -> None: if not getattr(request, 'user', None) or not request.user.is_authenticated: return AdminAuditLog.objects.create( actor=request.user, actor_display=_display_user_name(request.user), action=action, target_type=target_type, target_id=target_id, target_label=target_label, details=details or {}, ) def _form_field_labels(form_type: str) -> dict[str, str]: if form_type == 'onboarding': return {name: str(field.label or name) for name, field in OnboardingRequestForm.base_fields.items()} if form_type == 'offboarding': return {name: str(field.label or name) for name, field in OffboardingRequestForm.base_fields.items()} return {} def _request_target_label(obj, kind: str | None = None) -> str: request_kind = (kind or '').strip() if not request_kind: request_kind = 'onboarding' if isinstance(obj, OnboardingRequest) else 'offboarding' name = (getattr(obj, 'full_name', '') or '').strip() or f'#{getattr(obj, "id", "?")}' email = (getattr(obj, 'work_email', '') or '').strip() created_at = getattr(obj, 'created_at', None) date_label = created_at.strftime('%Y-%m-%d') if created_at else '' parts = [request_kind.capitalize(), name] if email: parts.append(f'<{email}>') if date_label: parts.append(date_label) return ' | '.join(parts) def _request_status_label(status_key: str, language_code: str | None = None) -> str: lang = ((language_code or 'de').split('-')[0] or 'de').lower() with override(lang): labels = { 'submitted': _('Eingereicht'), 'processing': _('In Bearbeitung'), 'completed': _('Abgeschlossen'), 'failed': _('Fehlgeschlagen'), } return labels.get(status_key, status_key) def _audit_action_label(action: str) -> str: labels = { 'requests_deleted': _('Vorgänge gelöscht'), 'request_deleted': _('Vorgang gelöscht'), 'request_retried': _('Vorgang erneut angestoßen'), 'intro_pdf_generated': _('Einweisungs-PDF erzeugt'), 'intro_live_pdf_generated': _('Live-Protokoll erzeugt'), 'intro_session_reset': _('Einweisung zurückgesetzt'), 'intro_session_saved': _('Einweisung als Entwurf gespeichert'), 'intro_session_completed': _('Einweisung abgeschlossen'), 'form_option_deleted': _('Formularoption gelöscht'), 'form_options_saved': _('Formularoptionen gespeichert'), 'form_field_texts_saved': _('Feldtexte gespeichert'), 'form_layout_saved': _('Formularlayout gespeichert'), 'intro_checklist_item_deleted': _('Einweisungs-Checkpunkt gelöscht'), 'intro_checklist_item_added': _('Einweisungs-Checkpunkt hinzugefügt'), 'intro_checklist_saved': _('Einweisungs-Checkliste gespeichert'), 'welcome_email_triggered_now': _('Welcome E-Mail sofort ausgelöst'), 'welcome_email_settings_saved': _('Welcome E-Mail Einstellungen gespeichert'), 'welcome_email_bulk_action': _('Welcome E-Mail Sammelaktion ausgeführt'), 'welcome_email_paused': _('Welcome E-Mail pausiert'), 'welcome_email_resumed': _('Welcome E-Mail fortgesetzt'), 'welcome_email_cancelled': _('Welcome E-Mail abgebrochen'), 'smtp_test_sent': _('SMTP-Test gesendet'), 'nextcloud_test_upload': _('Nextcloud-Testupload ausgeführt'), 'nextcloud_mode_toggled': _('Nextcloud-Modus umgeschaltet'), 'email_mode_toggled': _('E-Mail-Modus umgeschaltet'), 'integrations_saved': _('Integrationen gespeichert'), 'nextcloud_settings_saved': _('Nextcloud-Einstellungen gespeichert'), 'mail_settings_saved': _('Mail-Einstellungen gespeichert'), 'email_routing_saved': _('E-Mail-Routing gespeichert'), 'notification_rules_saved': _('Benachrichtigungsregeln gespeichert'), 'user_created': _('Benutzer erstellt'), 'user_updated': _('Benutzer aktualisiert'), 'user_password_reset_sent': _('Passwort-Reset-Link versendet'), 'user_deleted': _('Benutzer gelöscht'), 'backup_created': _('Backup erstellt'), 'backup_verified': _('Backup verifiziert'), 'backup_deleted': _('Backup gelöscht'), 'backup_settings_saved': _('Backup-Einstellungen gespeichert'), 'portal_app_registry_saved': _('App-Registry gespeichert'), } return labels.get(action, action.replace('_', ' ').strip().capitalize()) def _translate_choice_list(choices): return [(value, str(label)) for value, label in choices] def _build_onboarding_layout(form) -> list[dict]: ordered_names = list(form.fields.keys()) group_by_field = {} for group_id, group_fields in ONBOARDING_GROUPS.items(): for name in group_fields: group_by_field[name] = group_id rendered_groups = set() consumed = set() blocks = [] for field_name in ordered_names: if field_name in consumed: continue group_id = group_by_field.get(field_name) if group_id: if group_id in rendered_groups: continue group_fields = [ form[name] for name in ONBOARDING_GROUPS[group_id] if name in form.fields ] if not group_fields: continue blocks.append( { 'kind': 'group', 'id': group_id, 'hidden_default': group_id in ONBOARDING_HIDDEN_BY_DEFAULT, 'fields': group_fields, } ) rendered_groups.add(group_id) consumed.update([f.name for f in group_fields]) continue blocks.append({'kind': 'field', 'field': form[field_name]}) consumed.add(field_name) return blocks def _section_for_block(block: dict, field_pages: dict[str, str]) -> str: if block['kind'] == 'field': return field_pages.get(block['field'].name, 'abschluss') fields = block.get('fields') or [] if not fields: return 'abschluss' return field_pages.get(fields[0].name, 'abschluss') def _build_onboarding_sections(blocks: list[dict], field_pages: dict[str, str], visible_section_keys: set[str] | None = None) -> list[dict]: grouped = {key: [] for key in ONBOARDING_SECTION_ORDER} for block in blocks: section_key = _section_for_block(block, field_pages) if section_key not in grouped: section_key = 'abschluss' grouped[section_key].append(block) visible_keys = visible_section_keys or set(ONBOARDING_SECTION_ORDER) return [ { 'key': key, 'title': ONBOARDING_SECTION_META[key]['title'], 'subtitle': ONBOARDING_SECTION_META[key]['subtitle'], 'blocks': grouped[key], } for key in ONBOARDING_SECTION_ORDER if key in visible_keys ] OFFBOARDING_SECTION_META = { 'mitarbeitende': {'title': gettext_lazy('Mitarbeitende'), 'subtitle': gettext_lazy('Person, Rolle und Bereich')}, 'austritt': {'title': gettext_lazy('Austritt'), 'subtitle': gettext_lazy('Letzter Arbeitstag')}, 'abschluss': {'title': gettext_lazy('Abschluss'), 'subtitle': gettext_lazy('Hinweise und Abschlussnotizen')}, } def _build_offboarding_sections(form, visible_section_keys: set[str] | None = None) -> list[dict]: field_pages = getattr(form, '_field_page_keys', {}) grouped = {key: [] for key in OFFBOARDING_PAGE_ORDER} for field_name in form.fields.keys(): section_key = field_pages.get(field_name, 'abschluss') if section_key not in grouped: section_key = 'abschluss' grouped[section_key].append(form[field_name]) visible_keys = visible_section_keys or set(OFFBOARDING_PAGE_ORDER) return [ { 'key': key, 'title': OFFBOARDING_SECTION_META[key]['title'], 'subtitle': OFFBOARDING_SECTION_META[key]['subtitle'], 'fields': grouped[key], } for key in OFFBOARDING_PAGE_ORDER if key in visible_keys and grouped[key] ] @login_required def home(request): config, _ = WorkflowConfig.objects.get_or_create(name='Default') role_key = get_user_role_key(request.user) return render( request, 'workflows/home.html', { 'nextcloud_enabled': is_nextcloud_enabled(), 'email_test_mode': is_email_test_mode(), 'workflow_config': config, 'role_label': get_user_role_label(request.user), 'role_key': role_key, 'portal_app_sections': build_portal_app_sections(request.user), }, ) @_require_capability('manage_app_registry') def portal_app_registry_page(request): return render( request, 'workflows/app_registry.html', { 'rows': get_portal_app_registry_rows(), 'section_choices': _translate_choice_list(PortalAppConfig.SECTION_CHOICES), }, ) @_require_capability('view_job_monitor') def job_monitor_page(request): status_filter = (request.GET.get('status') or '').strip() task_filter = (request.GET.get('task') or '').strip() logs = AsyncTaskLog.objects.all() if status_filter: logs = logs.filter(status=status_filter) if task_filter: logs = logs.filter(task_name=task_filter) logs = logs.order_by('-started_at', '-id')[:200] task_names = list(AsyncTaskLog.objects.order_by('task_name').values_list('task_name', flat=True).distinct()) return render( request, 'workflows/job_monitor.html', { 'logs': logs, 'status_filter': status_filter, 'task_filter': task_filter, 'task_names': task_names, 'status_choices': [('started', _('Gestartet')), ('succeeded', _('Erfolgreich')), ('failed', _('Fehlgeschlagen'))], }, ) @_require_capability('manage_app_registry') @require_POST def save_portal_app_registry(request): rows = get_portal_app_registry_rows() updated_configs = [] for row in rows: config = row['config'] key = config.key config.section = (request.POST.get(f'section__{key}') or config.section).strip() if config.section not in dict(PortalAppConfig.SECTION_CHOICES): config.section = row['default_section'] config.is_enabled = request.POST.get(f'is_enabled__{key}') == 'on' config.visible_to_super_admin = request.POST.get(f'visible_to_super_admin__{key}') == 'on' config.visible_to_admin = request.POST.get(f'visible_to_admin__{key}') == 'on' config.visible_to_it_staff = request.POST.get(f'visible_to_it_staff__{key}') == 'on' config.visible_to_staff = request.POST.get(f'visible_to_staff__{key}') == 'on' try: config.sort_order = int((request.POST.get(f'sort_order__{key}') or '').strip() or row['default_sort_order']) except ValueError: config.sort_order = row['default_sort_order'] config.title_override = (request.POST.get(f'title_override__{key}') or '').strip() config.title_override_en = (request.POST.get(f'title_override_en__{key}') or '').strip() config.description_override = (request.POST.get(f'description_override__{key}') or '').strip() config.description_override_en = (request.POST.get(f'description_override_en__{key}') or '').strip() config.action_label_override = (request.POST.get(f'action_label_override__{key}') or '').strip() config.action_label_override_en = (request.POST.get(f'action_label_override_en__{key}') or '').strip() config.save() updated_configs.append(config) normalize_portal_app_sort_orders() _audit( request, 'portal_app_registry_saved', target_type='portal_app_registry', target_label='Portal App Registry', details={'updated_apps': len(rows)}, ) messages.success(request, _('App-Registry gespeichert.')) return redirect('portal_app_registry_page') def _user_management_rows(): user_model = get_user_model() role_order = { ROLE_PLATFORM_OWNER: 0, ROLE_SUPER_ADMIN: 0, 'admin': 1, 'it_staff': 2, 'staff': 3, } rows = [] for user in user_model.objects.all().order_by('-is_active', 'username'): role_key = get_user_role_key(user) rows.append( { 'user': user, 'role_key': role_key, 'role_label': str(ROLE_LABELS[role_key]), 'role_sort': role_order.get(role_key, 99), 'display_name': _display_user_name(user), } ) rows.sort(key=lambda item: (not item['user'].is_active, item['role_sort'], item['user'].username.lower())) return rows def _render_user_management(request, create_form=None, status_code: int = 200): recent_user_events = list( AdminAuditLog.objects.select_related('actor') .filter(action__in=['user_created', 'user_updated', 'user_password_reset_sent', 'user_deleted']) .order_by('-created_at', '-id')[:12] ) for row in recent_user_events: row.action_label = _audit_action_label(row.action) role_key = (row.details or {}).get('role') row.role_label = str(ROLE_LABELS[role_key]) if role_key in ROLE_LABELS else role_key include_product_owner = get_user_role_key(request.user) == ROLE_PLATFORM_OWNER return render( request, 'workflows/user_management.html', { 'create_form': create_form or UserManagementCreateForm(include_product_owner=include_product_owner), 'rows': _user_management_rows(), 'role_choices': [ (key, str(ROLE_LABELS[key])) for key in ROLE_GROUP_NAMES if include_product_owner or key != ROLE_PLATFORM_OWNER ], 'include_product_owner': include_product_owner, 'recent_user_events': recent_user_events, }, status=status_code, ) def _platform_owner_user_count() -> int: user_model = get_user_model() return sum(1 for user in user_model.objects.all() if get_user_role_key(user) == ROLE_PLATFORM_OWNER and user.is_active) def _super_admin_user_count() -> int: user_model = get_user_model() return sum(1 for user in user_model.objects.all() if get_user_role_key(user) == ROLE_SUPER_ADMIN and user.is_active) def _would_remove_last_super_admin(user, new_role_key: str | None = None, new_is_active: bool | None = None, deleting: bool = False) -> bool: if get_user_role_key(user) != ROLE_SUPER_ADMIN or not user.is_active: return False if _super_admin_user_count() > 1: return False if deleting: return True if new_role_key is not None and new_role_key != ROLE_SUPER_ADMIN: return True if new_is_active is not None and not new_is_active: return True return False def _would_remove_last_platform_owner(user, new_role_key: str | None = None, new_is_active: bool | None = None, deleting: bool = False) -> bool: if get_user_role_key(user) != ROLE_PLATFORM_OWNER or not user.is_active: return False if _platform_owner_user_count() > 1: return False if deleting: return True if new_role_key is not None and new_role_key != ROLE_PLATFORM_OWNER: return True if new_is_active is not None and not new_is_active: return True return False def _send_user_access_email(request, target_user, *, invitation: bool) -> None: email = (target_user.email or '').strip() if not email: raise ValueError(_('Für diesen Benutzer ist keine E-Mail-Adresse hinterlegt.')) uid = urlsafe_base64_encode(force_bytes(target_user.pk)) token = default_token_generator.make_token(target_user) reset_path = reverse('password_reset_confirm', kwargs={'uidb64': uid, 'token': token}) reset_url = request.build_absolute_uri(reset_path) branding_copy = get_branding_email_copy() if invitation: subject = _('Zugangseinladung für %(username)s') % {'username': target_user.username} body = _( 'Hallo %(name)s,\n\n' 'für Sie wurde ein Benutzerkonto im %(portal_title)s angelegt.\n' 'Bitte öffnen Sie den folgenden Link, um Ihr Passwort zu setzen:\n' '%(url)s\n\n' 'Wenn Sie diese Einladung nicht erwartet haben, melden Sie sich bitte bei Ihrem Administrator.' ) % { 'name': _display_user_name(target_user), 'portal_title': branding_copy['portal_title'], 'url': reset_url, } else: subject = _('Passwort zurücksetzen für %(username)s') % {'username': target_user.username} body = _( 'Hallo %(name)s,\n\n' 'für Ihr Konto wurde ein Link zum Zurücksetzen des Passworts erstellt.\n' 'Bitte öffnen Sie den folgenden Link:\n' '%(url)s\n\n' 'Wenn Sie diese Anfrage nicht erwartet haben, können Sie diese E-Mail ignorieren.' ) % { 'name': _display_user_name(target_user), 'url': reset_url, } send_system_email(subject=subject, body=body, to=[email]) @_require_capability('manage_users') def user_management_page(request): return _render_user_management(request) @_require_capability('manage_product_branding') def portal_branding_page(request): branding, created = PortalBranding.objects.get_or_create(name='Default') form = PortalBrandingForm(instance=branding) return render( request, 'workflows/branding_settings.html', { 'form': form, 'branding': branding, 'branding_sections': _build_branding_sections(form, branding), 'editing_branding_section': '', }, ) @_require_capability('manage_product_branding') @require_POST def save_portal_branding(request): branding, created = PortalBranding.objects.get_or_create(name='Default') section_key = (request.POST.get('section_key') or '').strip() data = request.POST.copy() for field_name in PortalBrandingForm.Meta.fields: if field_name not in data: field = PortalBranding._meta.get_field(field_name) if getattr(field, 'many_to_many', False): continue if getattr(field, 'null', False) and getattr(branding, field_name, None) is None: data[field_name] = '' else: data[field_name] = getattr(branding, field_name, '') or '' form = PortalBrandingForm(data, request.FILES, instance=branding) if not form.is_valid(): messages.error(request, _('Branding konnte nicht gespeichert werden. Bitte prüfen Sie die Eingaben.')) return render( request, 'workflows/branding_settings.html', { 'form': form, 'branding': branding, 'branding_sections': _build_branding_sections(form, branding), 'editing_branding_section': section_key, }, status=400, ) branding = form.save() _audit( request, 'portal_branding_saved', target_type='portal_branding', target_id=branding.id, target_label=branding.portal_title, details={ 'company_name': branding.company_name, 'support_email': branding.support_email, 'default_language': branding.default_language, 'has_custom_logo': bool(branding.logo_image), 'has_custom_letterhead': bool(branding.pdf_letterhead), }, ) messages.success(request, _('Portal-Branding wurde gespeichert.')) return render( request, 'workflows/branding_settings.html', { 'form': PortalBrandingForm(instance=branding), 'branding': branding, 'branding_sections': _build_branding_sections(PortalBrandingForm(instance=branding), branding), 'editing_branding_section': '', }, ) def _build_branding_sections(form, branding): sections = [ { 'key': 'identity', 'title': _('Identität'), 'subtitle': _('Titel, Firmenname und zentrale Spracheinstellungen.'), 'fields': ['portal_title', 'company_name', 'company_domain', 'default_language', 'login_subtitle'], 'field_full': {'login_subtitle'}, 'hint_map': { 'company_domain': _('Wird für E-Mail-Vorschläge und Domain-bezogene Standardtexte verwendet, z. B. workdock.de.'), }, }, { 'key': 'appearance', 'title': _('Farben & Erscheinungsbild'), 'subtitle': _('Zentrale visuelle Markenwerte und Browser-Icon.'), 'fields': ['primary_color', 'secondary_color', 'logo_image', 'favicon_image'], 'field_full': set(), 'hint_map': { 'logo_image': _('Erlaubte Formate: SVG, PNG, JPG, JPEG, WEBP. Maximal 5 MB.'), 'favicon_image': _('Erlaubte Formate: ICO, PNG, SVG, WEBP. Maximal 2 MB.'), }, }, { 'key': 'communication', 'title': _('Kommunikation'), 'subtitle': _('Absender, Support und PDF-Branding für ausgehende Kommunikation.'), 'fields': ['support_email', 'sender_display_name', 'pdf_letterhead'], 'field_full': {'pdf_letterhead'}, 'hint_map': { 'sender_display_name': _('Wird für ausgehende System-E-Mails als Anzeigename verwendet.'), 'pdf_letterhead': _('Erlaubtes Format: PDF. Maximal 10 MB.'), }, }, { 'key': 'legal', 'title': _('Footer & Rechtliches'), 'subtitle': _('Gemeinsame Footer-Texte und rechtliche Hinweise für die Shell.'), 'fields': ['footer_text', 'legal_notice', 'footer_text_en', 'legal_notice_en'], 'field_full': {'legal_notice', 'legal_notice_en'}, 'hint_map': {}, }, ] for section in sections: rows = [] for field_name in section['fields']: field = form[field_name] value = getattr(branding, field_name, '') or '' is_file = bool(getattr(field.field.widget, 'input_type', '') == 'file') rows.append( { 'name': field_name, 'bound_field': field, 'label': field.label, 'value': value, 'is_file': is_file, 'is_full': field_name in section.get('field_full', set()), 'hint': section.get('hint_map', {}).get(field_name, ''), } ) section['rows'] = rows return sections @_require_capability('manage_company_config') def portal_company_config_page(request): company_config, created = PortalCompanyConfig.objects.get_or_create(name='Default') form = PortalCompanyConfigForm(instance=company_config) return render( request, 'workflows/company_config.html', { 'form': form, 'company_config': company_config, 'company_config_sections': _build_company_config_sections(form, company_config), 'editing_company_section': '', }, ) @_require_capability('manage_company_config') @require_POST def save_portal_company_config(request): company_config, created = PortalCompanyConfig.objects.get_or_create(name='Default') section_key = (request.POST.get('section_key') or '').strip() data = request.POST.copy() for field_name in PortalCompanyConfigForm.Meta.fields: if field_name not in data: data[field_name] = getattr(company_config, field_name, '') or '' form = PortalCompanyConfigForm(data, instance=company_config) if not form.is_valid(): messages.error(request, _('Firmenkonfiguration konnte nicht gespeichert werden. Bitte prüfen Sie die Eingaben.')) return render( request, 'workflows/company_config.html', { 'form': form, 'company_config': company_config, 'company_config_sections': _build_company_config_sections(form, company_config), 'editing_company_section': section_key, }, status=400, ) company_config = form.save() _audit( request, 'portal_company_config_saved', target_type='portal_company_config', target_id=company_config.id, target_label=company_config.legal_company_name or 'Default', details={ 'website_url': company_config.website_url, 'imprint_url': company_config.imprint_url, 'privacy_url': company_config.privacy_url, 'hr_contact_email': company_config.hr_contact_email, 'it_contact_email': company_config.it_contact_email, 'operations_contact_email': company_config.operations_contact_email, }, ) messages.success(request, _('Firmenkonfiguration wurde gespeichert.')) return render( request, 'workflows/company_config.html', { 'form': PortalCompanyConfigForm(instance=company_config), 'company_config': company_config, 'company_config_sections': _build_company_config_sections(PortalCompanyConfigForm(instance=company_config), company_config), 'editing_company_section': '', }, ) def _build_company_config_sections(form, company_config): sections = [ { 'key': 'profile', 'title': _('Firmenprofil'), 'subtitle': _('Rechtlicher Name und zentrale Stammdaten der Firma.'), 'fields': ['legal_company_name', 'phone_number', 'website_url', 'country'], }, { 'key': 'address', 'title': _('Adresse & Register'), 'subtitle': _('Anschrift sowie optionale Register- und Steuerangaben.'), 'fields': ['street_address', 'postal_code', 'city', 'registration_number', 'vat_id'], }, { 'key': 'contacts', 'title': _('Kontaktpunkte'), 'subtitle': _('Zentrale Ansprechpartner für HR, IT und Operations.'), 'fields': ['hr_contact_email', 'it_contact_email', 'operations_contact_email'], }, { 'key': 'public', 'title': _('Recht & Öffentlichkeit'), 'subtitle': _('Öffentliche Links für Website, Impressum und Datenschutz.'), 'fields': ['imprint_url', 'privacy_url'], 'hint': _('Diese Links können später im Portal-Footer oder in öffentlichen Seiten verwendet werden.'), }, ] for section in sections: rows = [] for field_name in section['fields']: field = form[field_name] rows.append( { 'name': field_name, 'bound_field': field, 'label': field.label, 'value': getattr(company_config, field_name, '') or '', } ) section['rows'] = rows return sections @_require_capability('manage_trial_lifecycle') def portal_trial_config_page(request): trial_config = get_portal_trial_config() form = PortalTrialConfigForm(instance=trial_config) return render( request, 'workflows/trial_management.html', { 'form': form, 'trial_config': trial_config, 'trial_is_expired': is_trial_expired(), }, ) @_require_capability('manage_trial_lifecycle') @require_POST def save_portal_trial_config(request): trial_config = get_portal_trial_config() form = PortalTrialConfigForm(request.POST, instance=trial_config) if not form.is_valid(): messages.error(request, _('Trial-Konfiguration konnte nicht gespeichert werden. Bitte prüfen Sie die Eingaben.')) return render( request, 'workflows/trial_management.html', { 'form': form, 'trial_config': trial_config, 'trial_is_expired': is_trial_expired(), }, status=400, ) trial_config = form.save() _audit( request, 'portal_trial_config_saved', target_type='portal_trial_config', target_id=trial_config.id, target_label='Default', details={ 'is_trial_mode': trial_config.is_trial_mode, 'trial_started_at': trial_config.trial_started_at.isoformat() if trial_config.trial_started_at else '', 'trial_expires_at': trial_config.trial_expires_at.isoformat() if trial_config.trial_expires_at else '', 'restrict_production_integrations': trial_config.restrict_production_integrations, 'auto_cleanup_enabled': trial_config.auto_cleanup_enabled, }, ) if trial_config.is_trial_mode and trial_config.trial_expires_at: remaining = trial_config.trial_expires_at - timezone.now() if remaining.total_seconds() <= 0: notify_user( user=request.user, title=_('Trial ist abgelaufen'), body=_('Der Trial-Zeitraum ist überschritten. Nicht-Platform-Owner werden jetzt blockiert.'), level=UserNotification.LEVEL_WARNING, link_url='/admin-tools/trial/', event_key=UserProfile.NOTIFICATION_TRIAL_ALERTS, ) elif remaining <= timedelta(days=7): notify_user( user=request.user, title=_('Trial läuft bald ab'), body=_('Der Trial endet am %(date)s.') % {'date': timezone.localtime(trial_config.trial_expires_at).strftime('%d.%m.%Y %H:%M')}, level=UserNotification.LEVEL_WARNING, link_url='/admin-tools/trial/', event_key=UserProfile.NOTIFICATION_TRIAL_ALERTS, ) elif not trial_config.is_trial_mode: notify_user( user=request.user, title=_('Trial-Modus deaktiviert'), body=_('Der Trial-Modus wurde ausgeschaltet.'), level=UserNotification.LEVEL_INFO, link_url='/admin-tools/trial/', event_key=UserProfile.NOTIFICATION_TRIAL_ALERTS, ) messages.success(request, _('Trial-Konfiguration wurde gespeichert.')) return render( request, 'workflows/trial_management.html', { 'form': PortalTrialConfigForm(instance=trial_config), 'trial_config': trial_config, 'trial_is_expired': is_trial_expired(), }, ) @_require_capability('manage_users') @require_POST def create_user_from_admin(request): form = UserManagementCreateForm(request.POST, include_product_owner=(get_user_role_key(request.user) == ROLE_PLATFORM_OWNER)) if not form.is_valid(): messages.error(request, _('Benutzer konnte nicht erstellt werden. Bitte prüfen Sie die Eingaben.')) return _render_user_management(request, create_form=form, status_code=400) user = form.save() _send_user_access_email(request, user, invitation=True) _audit( request, 'user_created', target_type='user', target_id=user.id, target_label=_display_user_name(user), details={'username': user.username, 'role': get_user_role_key(user), 'invitation_sent': True}, ) messages.success(request, _('Benutzer wurde erstellt und eingeladen: %(username)s') % {'username': user.username}) return redirect('user_management_page') @_require_capability('manage_users') @require_POST def update_user_from_admin(request, user_id: int): user_model = get_user_model() target_user = get_object_or_404(user_model, id=user_id) role_key = (request.POST.get('role_key') or '').strip() is_active = request.POST.get('is_active') == 'on' new_password = (request.POST.get('new_password') or '').strip() if role_key not in ROLE_GROUP_NAMES: messages.error(request, _('Ungültige Rolle.')) return redirect('user_management_page') if role_key == ROLE_PLATFORM_OWNER and get_user_role_key(request.user) != ROLE_PLATFORM_OWNER: messages.error(request, _('Nur Platform Owner dürfen diese Rolle vergeben.')) return redirect('user_management_page') current_role = get_user_role_key(request.user) if target_user == request.user and current_role == ROLE_PLATFORM_OWNER and (role_key != ROLE_PLATFORM_OWNER or not is_active): messages.error(request, _('Der aktuell angemeldete Platform Owner kann sich hier nicht selbst sperren oder herabstufen.')) return redirect('user_management_page') if target_user == request.user and current_role == ROLE_SUPER_ADMIN and (role_key != ROLE_SUPER_ADMIN or not is_active): messages.error(request, _('Der aktuell angemeldete Super Admin kann sich hier nicht selbst sperren oder herabstufen.')) return redirect('user_management_page') if _would_remove_last_platform_owner(target_user, new_role_key=role_key, new_is_active=is_active): messages.error(request, _('Der letzte aktive Platform Owner kann nicht deaktiviert oder herabgestuft werden.')) return redirect('user_management_page') if _would_remove_last_super_admin(target_user, new_role_key=role_key, new_is_active=is_active): messages.error(request, _('Der letzte aktive Super Admin kann nicht deaktiviert oder herabgestuft werden.')) return redirect('user_management_page') assign_user_role(target_user, role_key) target_user.is_active = is_active if new_password: target_user.set_password(new_password) target_user.save() _audit( request, 'user_updated', target_type='user', target_id=target_user.id, target_label=_display_user_name(target_user), details={'username': target_user.username, 'role': role_key, 'is_active': is_active, 'password_changed': bool(new_password)}, ) messages.success(request, _('Benutzer wurde aktualisiert: %(username)s') % {'username': target_user.username}) return redirect('user_management_page') @_require_capability('manage_users') @require_POST def send_password_reset_from_admin(request, user_id: int): user_model = get_user_model() target_user = get_object_or_404(user_model, id=user_id) try: _send_user_access_email(request, target_user, invitation=False) except ValueError as exc: messages.error(request, str(exc)) return redirect('user_management_page') _audit( request, 'user_password_reset_sent', target_type='user', target_id=target_user.id, target_label=_display_user_name(target_user), details={'username': target_user.username, 'email': target_user.email}, ) messages.success(request, _('Passwort-Reset-Link wurde versendet: %(username)s') % {'username': target_user.username}) return redirect('user_management_page') @_require_capability('manage_users') @require_POST def delete_user_from_admin(request, user_id: int): user_model = get_user_model() target_user = get_object_or_404(user_model, id=user_id) current_role = get_user_role_key(request.user) if target_user == request.user and current_role == ROLE_PLATFORM_OWNER: messages.error(request, _('Der aktuell angemeldete Platform Owner kann sich hier nicht selbst löschen.')) return redirect('user_management_page') if target_user == request.user: messages.error(request, _('Der aktuell angemeldete Super Admin kann sich hier nicht selbst löschen.')) return redirect('user_management_page') if _would_remove_last_platform_owner(target_user, deleting=True): messages.error(request, _('Der letzte aktive Platform Owner kann nicht gelöscht werden.')) return redirect('user_management_page') if _would_remove_last_super_admin(target_user, deleting=True): messages.error(request, _('Der letzte aktive Super Admin kann nicht gelöscht werden.')) return redirect('user_management_page') target_label = _display_user_name(target_user) username = target_user.username target_user.delete() _audit( request, 'user_deleted', target_type='user', target_label=target_label, details={'username': username}, ) messages.success(request, _('Benutzer wurde gelöscht: %(username)s') % {'username': username}) return redirect('user_management_page') @_require_capability('view_docs') def handbook_page(request): return render(request, 'workflows/handbook.html') @_require_capability('view_docs') def project_wiki_page(request): return render(request, 'workflows/project_wiki.html') @_require_capability('view_docs') def developer_handbook_page(request): return render(request, 'workflows/developer_handbook.html') @_require_capability('view_docs') def release_checklist_page(request): return render(request, 'workflows/release_checklist.html') @_require_capability('view_audit_log') def audit_log_page(request): action = (request.GET.get('action') or '').strip() user_query = (request.GET.get('user') or '').strip() date_from = (request.GET.get('date_from') or '').strip() date_to = (request.GET.get('date_to') or '').strip() rows_qs = AdminAuditLog.objects.select_related('actor').all() if action: rows_qs = rows_qs.filter(action=action) if user_query: rows_qs = rows_qs.filter( Q(actor_display__icontains=user_query) | Q(actor__username__icontains=user_query) | Q(actor__email__icontains=user_query) ) if date_from: rows_qs = rows_qs.filter(created_at__date__gte=date_from) if date_to: rows_qs = rows_qs.filter(created_at__date__lte=date_to) rows = list(rows_qs[:300]) action_choices = ( AdminAuditLog.objects.order_by('action').values_list('action', flat=True).distinct() ) return render( request, 'workflows/audit_log.html', { 'rows': rows, 'action_choices': action_choices, 'selected_action': action, 'user_query': user_query, 'date_from': date_from, 'date_to': date_to, }, ) @_require_capability('manage_backups') def backup_recovery_page(request): rows = list_backup_bundles() return render( request, 'workflows/backup_recovery.html', { 'rows': rows, 'backup_health': latest_backup_health_snapshot(), }, ) @_require_capability('manage_backups') @require_POST def create_backup_from_admin(request): try: result = create_backup_bundle() _audit( request, 'backup_created', target_type='backup_bundle', target_label=result['name'], details={'path': result['path']}, ) notify_user( user=request.user, title=_('Backup erstellt: %(name)s') % {'name': result['name']}, body=_('Das Backup-Bundle wurde erfolgreich erstellt.'), level=UserNotification.LEVEL_SUCCESS, link_url='/admin-tools/backups/', event_key=UserProfile.NOTIFICATION_BACKUP_SUCCESS, ) messages.success(request, _('Backup wurde erstellt: %(name)s') % {'name': result['name']}) except Exception as exc: notify_user( user=request.user, title=_('Backup fehlgeschlagen'), body=str(exc), level=UserNotification.LEVEL_ERROR, link_url='/admin-tools/backups/', event_key=UserProfile.NOTIFICATION_BACKUP_FAILURE, ) messages.error(request, _('Backup konnte nicht erstellt werden: %(error)s') % {'error': exc}) return redirect('backup_recovery_page') @_require_capability('manage_backups') @require_POST def verify_backup_from_admin(request, backup_name: str): try: result = verify_backup_bundle(backup_name) _audit( request, 'backup_verified', target_type='backup_bundle', target_label=backup_name, details={'summary': result['summary']}, ) notify_user( user=request.user, title=_('Backup verifiziert: %(name)s') % {'name': result['name']}, body=result.get('summary') or _('Das Backup wurde erfolgreich verifiziert.'), level=UserNotification.LEVEL_SUCCESS, link_url='/admin-tools/backups/', event_key=UserProfile.NOTIFICATION_BACKUP_SUCCESS, ) messages.success(request, _('Backup wurde verifiziert: %(name)s') % {'name': result['name']}) except Exception as exc: notify_user( user=request.user, title=_('Backup-Verifikation fehlgeschlagen'), body=str(exc), level=UserNotification.LEVEL_ERROR, link_url='/admin-tools/backups/', event_key=UserProfile.NOTIFICATION_BACKUP_FAILURE, ) messages.error(request, _('Backup-Verifikation fehlgeschlagen: %(error)s') % {'error': exc}) return redirect('backup_recovery_page') @_require_capability('manage_backups') @require_POST def delete_backup_from_admin(request, backup_name: str): try: result = delete_backup_bundle(backup_name) _audit( request, 'backup_deleted', target_type='backup_bundle', target_label=backup_name, details={}, ) messages.success(request, _('Backup wurde gelöscht: %(name)s') % {'name': result['name']}) except Exception as exc: messages.error(request, _('Backup konnte nicht gelöscht werden: %(error)s') % {'error': exc}) return redirect('backup_recovery_page') @_require_capability('view_request_timeline') def request_timeline_page(request, kind: str, request_id: int): if kind == 'onboarding': obj = get_object_or_404(OnboardingRequest, id=request_id) elif kind == 'offboarding': obj = get_object_or_404(OffboardingRequest, id=request_id) else: messages.error(request, f'Unbekannter Typ: {kind}') return redirect('requests_dashboard') request_label = _request_target_label(obj, kind) audit_rows = list( AdminAuditLog.objects.select_related('actor') .filter(target_type__in=[kind, 'request']) .filter(Q(target_id=request_id) | Q(target_label__icontains=(obj.full_name or '').strip())) .order_by('-created_at', '-id')[:200] ) timeline_rows = [ { 'created_at': obj.created_at, 'kind': 'system', 'title': _('Anfrage erstellt'), 'summary': request_label, 'meta': _('Status: %(status)s') % {'status': obj.get_processing_status_display()}, } ] contract_start = getattr(obj, 'contract_start', None) if contract_start: timeline_rows.append( { 'created_at': timezone.make_aware(timezone.datetime.combine(contract_start, timezone.datetime.min.time())), 'kind': 'milestone', 'title': _('Vertragsbeginn'), 'summary': str(contract_start), 'meta': _('Geplanter Start'), } ) handover_date = getattr(obj, 'handover_date', None) if handover_date: timeline_rows.append( { 'created_at': timezone.make_aware(timezone.datetime.combine(handover_date, timezone.datetime.min.time())), 'kind': 'milestone', 'title': _('Geräteübergabe / Hardware-Abholung'), 'summary': str(handover_date), 'meta': _('Geplanter Hardware-Termin'), } ) if getattr(obj, 'generated_pdf_path', ''): timeline_rows.append( { 'created_at': obj.created_at, 'kind': 'document', 'title': _('PDF verfügbar'), 'summary': Path(obj.generated_pdf_path).name, 'meta': '', 'url': f"/media/pdfs/{Path(obj.generated_pdf_path).name}", } ) for row in audit_rows: timeline_rows.append( { 'created_at': row.created_at, 'kind': 'audit', 'title': _audit_action_label(row.action), 'summary': row.target_label or row.target_type or '-', 'meta': row.actor_display or '-', 'details': row.details, } ) if kind == 'onboarding': intro_session = OnboardingIntroductionSession.objects.filter(onboarding_request=obj).first() if intro_session: timeline_rows.append( { 'created_at': intro_session.updated_at, 'kind': 'session', 'title': _('Einweisungssitzung'), 'summary': intro_session.get_status_display(), 'meta': intro_session.completed_by_name or '-', 'url': (f"/media/pdfs/{Path(intro_session.exported_pdf_path).name}" if intro_session.exported_pdf_path else ''), } ) welcome_email = ScheduledWelcomeEmail.objects.filter(onboarding_request=obj).first() if welcome_email: timeline_rows.append( { 'created_at': welcome_email.updated_at, 'kind': 'email', 'title': _('Welcome E-Mail'), 'summary': welcome_email.get_status_display(), 'meta': welcome_email.recipient_email, } ) timeline_rows.sort(key=lambda item: item['created_at']) return render( request, 'workflows/request_timeline.html', { 'request_kind': kind, 'request_obj': obj, 'request_label': request_label, 'timeline_rows': timeline_rows, 'contract_start': getattr(obj, 'contract_start', None), 'handover_date': getattr(obj, 'handover_date', None), }, ) @login_required def requests_dashboard(request): if not user_has_capability(request.user, 'access_requests_dashboard'): messages.error(request, _('Sie haben keine Berechtigung für diese Aktion.')) return redirect('home') if request.method == 'POST': if not user_has_capability(request.user, 'delete_requests'): messages.error(request, _('Sie haben keine Berechtigung für diese Aktion.')) return redirect('requests_dashboard') selected = request.POST.getlist('selected_requests') single_delete = (request.POST.get('single_delete') or '').strip() if single_delete: selected = [single_delete] if not selected: messages.warning(request, _('Keine Einträge ausgewählt.')) return redirect('requests_dashboard') deleted_count = 0 invalid_count = 0 deleted_labels = [] for token in selected: try: kind, raw_id = token.split(':', 1) request_id = int(raw_id) except (ValueError, TypeError): invalid_count += 1 continue model = None if kind == 'onboarding': model = OnboardingRequest elif kind == 'offboarding': model = OffboardingRequest else: invalid_count += 1 continue obj = model.objects.filter(id=request_id).first() if not obj: continue deleted_labels.append(_request_target_label(obj, kind)) obj.delete() deleted_count += 1 if deleted_count: _audit( request, 'requests_deleted', target_type='request', target_label='Dashboard bulk/single delete', details={ 'deleted_count': deleted_count, 'invalid_count': invalid_count, 'selected': selected, 'request_labels': deleted_labels, }, ) messages.success(request, _('%(count)s Eintrag/Einträge gelöscht.') % {'count': deleted_count}) if invalid_count: messages.warning(request, _('%(count)s Auswahl(en) konnten nicht verarbeitet werden.') % {'count': invalid_count}) if not deleted_count and not invalid_count: messages.info(request, _('Keine passenden Einträge gefunden.')) return redirect('requests_dashboard') search_query = request.GET.get('q', '').strip() type_filter = (request.GET.get('type') or '').strip().lower() status_filter = (request.GET.get('status') or '').strip().lower() department_filter = (request.GET.get('department') or '').strip() date_from = (request.GET.get('date_from') or '').strip() date_to = (request.GET.get('date_to') or '').strip() onboarding_qs = OnboardingRequest.objects.order_by('-created_at') offboarding_qs = OffboardingRequest.objects.order_by('-created_at') all_onboarding = OnboardingRequest.objects.all() all_offboarding = OffboardingRequest.objects.all() if search_query: onboarding_qs = onboarding_qs.filter(Q(full_name__icontains=search_query) | Q(work_email__icontains=search_query)) offboarding_qs = offboarding_qs.filter(Q(full_name__icontains=search_query) | Q(work_email__icontains=search_query)) if status_filter in {'submitted', 'processing', 'completed', 'failed'}: onboarding_qs = onboarding_qs.filter(processing_status=status_filter) offboarding_qs = offboarding_qs.filter(processing_status=status_filter) if department_filter: onboarding_qs = onboarding_qs.filter(department=department_filter) offboarding_qs = offboarding_qs.filter(department=department_filter) if date_from: onboarding_qs = onboarding_qs.filter(created_at__date__gte=date_from) offboarding_qs = offboarding_qs.filter(created_at__date__gte=date_from) if date_to: onboarding_qs = onboarding_qs.filter(created_at__date__lte=date_to) offboarding_qs = offboarding_qs.filter(created_at__date__lte=date_to) if type_filter == 'onboarding': offboarding_qs = offboarding_qs.none() elif type_filter == 'offboarding': onboarding_qs = onboarding_qs.none() onboarding_items = onboarding_qs[:50] offboarding_items = offboarding_qs[:50] language_code = ( request.COOKIES.get(settings.LANGUAGE_COOKIE_NAME) or getattr(request, 'LANGUAGE_CODE', '') or get_language() or 'de' ).split('-')[0].lower() rows = [] for obj in onboarding_items: intro_session = OnboardingIntroductionSession.objects.filter(onboarding_request=obj).first() if intro_session and intro_session.exported_pdf_path: intro_session.exported_pdf_url = f"/media/pdfs/{Path(intro_session.exported_pdf_path).name}" rows.append( { 'id': obj.id, 'kind': 'Onboarding', 'kind_slug': 'onboarding', 'name': obj.full_name, 'work_email': obj.work_email, 'created_at': obj.created_at, 'pdf_url': f"/media/pdfs/{Path(obj.generated_pdf_path).name}" if obj.generated_pdf_path else None, 'intro_pdf_url': f"/media/pdfs/{Path(obj.intro_pdf_path).name}" if obj.intro_pdf_path else None, 'intro_session': intro_session, 'status': _request_status_label(obj.processing_status, language_code), 'status_key': obj.processing_status, 'last_error': obj.last_error, } ) for obj in offboarding_items: rows.append( { 'id': obj.id, 'kind': 'Offboarding', 'kind_slug': 'offboarding', 'name': obj.full_name, 'work_email': obj.work_email, 'created_at': obj.created_at, 'pdf_url': f"/media/pdfs/{Path(obj.generated_pdf_path).name}" if obj.generated_pdf_path else None, 'intro_pdf_url': None, 'intro_session': None, 'status': _request_status_label(obj.processing_status, language_code), 'status_key': obj.processing_status, 'last_error': obj.last_error, } ) rows.sort(key=lambda x: x['created_at'], reverse=True) today = timezone.localdate() start_date = today - timedelta(days=13) onboarding_daily = {} offboarding_daily = {} for i in range(14): day = start_date + timedelta(days=i) onboarding_daily[day] = 0 offboarding_daily[day] = 0 for dt in onboarding_qs.filter(created_at__date__gte=start_date).values_list('created_at', flat=True): onboarding_daily[timezone.localtime(dt).date()] += 1 for dt in offboarding_qs.filter(created_at__date__gte=start_date).values_list('created_at', flat=True): offboarding_daily[timezone.localtime(dt).date()] += 1 chart_points = [] max_total = 1 for i in range(14): day = start_date + timedelta(days=i) on_count = onboarding_daily[day] off_count = offboarding_daily[day] total = on_count + off_count max_total = max(max_total, total) chart_points.append( { 'label': day.strftime('%d.%m'), 'onboarding': on_count, 'offboarding': off_count, 'total': total, } ) for point in chart_points: point['height'] = max(8, int((point['total'] / max_total) * 84)) onboarding_total = onboarding_qs.count() offboarding_total = offboarding_qs.count() departments = sorted( { value.strip() for value in list(all_onboarding.exclude(department='').values_list('department', flat=True)) + list(all_offboarding.exclude(department='').values_list('department', flat=True)) if value and value.strip() }, key=str.lower, ) status_choices = [ {'value': 'submitted', 'label': _request_status_label('submitted', language_code)}, {'value': 'processing', 'label': _request_status_label('processing', language_code)}, {'value': 'completed', 'label': _request_status_label('completed', language_code)}, {'value': 'failed', 'label': _request_status_label('failed', language_code)}, ] has_filters = any([search_query, type_filter, status_filter, department_filter, date_from, date_to]) column_count = 4 if user_has_capability(request.user, 'delete_requests'): column_count += 1 if user_has_capability(request.user, 'run_intro_session') or user_has_capability(request.user, 'generate_intro_pdfs'): column_count += 1 if user_has_capability(request.user, 'access_requests_dashboard'): column_count += 1 return render( request, 'workflows/requests_dashboard.html', { 'rows': rows[:60], 'search_query': search_query, 'selected_type': type_filter, 'selected_status': status_filter, 'selected_department': department_filter, 'date_from': date_from, 'date_to': date_to, 'departments': departments, 'status_choices': status_choices, 'has_filters': has_filters, 'column_count': column_count, 'onboarding_total': onboarding_total, 'offboarding_total': offboarding_total, 'combined_total': onboarding_total + offboarding_total, 'chart_points': chart_points, }, ) @login_required @ensure_csrf_cookie def onboarding_create(request): config = WorkflowConfig.objects.order_by('id').first() legal_text = ( config.legal_text if config and config.legal_text else 'Eine Ausrüstungsvereinbarung erlaubt es einem Mitarbeitenden, die Ausrüstung des Unternehmens im Außendienst oder zu Hause zu nutzen und mitzunehmen.' ) if request.method == 'POST': form = OnboardingRequestForm(request.POST, request.FILES, requester_email=request.user.email) if form.is_valid(): obj = form.save() obj.onboarded_by_name = _display_user_name(request.user) obj.preferred_language = ((getattr(request, 'LANGUAGE_CODE', '') or get_language() or 'de').split('-')[0]) obj.save(update_fields=['onboarded_by_name', 'preferred_language']) process_onboarding_request.delay(obj.id) return redirect(f"/onboarding/new/?saved=1&id={obj.id}") else: form = OnboardingRequestForm(requester_email=request.user.email) onboarding_blocks = _build_onboarding_layout(form) field_pages = getattr(form, '_field_page_keys', {}) section_configs = ensure_form_section_configs('onboarding') visible_section_keys = { key for key in ONBOARDING_SECTION_ORDER if key in LOCKED_SECTION_RULES.get('onboarding', set()) or section_configs.get(key, None) is None or section_configs[key].is_visible } onboarding_sections = _build_onboarding_sections(onboarding_blocks, field_pages, visible_section_keys=visible_section_keys) onboarding_conditional_rules = _normalized_conditional_rule_payload('onboarding') return render( request, 'workflows/onboarding_form.html', { 'form': form, 'onboarding_blocks': onboarding_blocks, 'onboarding_sections': onboarding_sections, 'onboarding_inline_checks': ONBOARDING_INLINE_CHECKS, 'onboarding_checkbox_lists': ONBOARDING_CHECKBOX_LISTS, 'onboarding_conditional_rules': onboarding_conditional_rules, 'legal_text': legal_text, 'saved': request.GET.get('saved') == '1', 'saved_request_id': request.GET.get('id', ''), 'portal_email_domain': get_company_email_domain(), }, ) @login_required def onboarding_success(request, request_id: int): obj = get_object_or_404(OnboardingRequest, id=request_id) pdf_url = None if obj.generated_pdf_path: pdf_url = f"/media/pdfs/{Path(obj.generated_pdf_path).name}" return render(request, 'workflows/onboarding_success.html', {'obj': obj, 'pdf_url': pdf_url}) @_require_capability('generate_intro_pdfs') @require_POST def generate_onboarding_intro_pdf(request, request_id: int): obj = get_object_or_404(OnboardingRequest, id=request_id) pdf_path = _generate_onboarding_intro_pdf(obj, language_code=get_language()) obj.intro_pdf_path = str(pdf_path) obj.save(update_fields=['intro_pdf_path']) _audit(request, 'intro_pdf_generated', target_type='onboarding', target_id=obj.id, target_label=obj.full_name) messages.success(request, _('Einweisungs- und Übergabeprotokoll wurde erzeugt.')) return redirect('requests_dashboard') @_require_capability('generate_intro_pdfs') @require_POST def generate_onboarding_intro_session_pdf(request, request_id: int): onboarding = get_object_or_404(OnboardingRequest, id=request_id) session, _created = OnboardingIntroductionSession.objects.get_or_create(onboarding_request=onboarding) pdf_path = _generate_onboarding_intro_session_pdf( session, admin_signature_name=_display_user_name(request.user), language_code=get_language(), ) session.exported_pdf_path = str(pdf_path) session.save(update_fields=['exported_pdf_path']) _audit(request, 'intro_live_pdf_generated', target_type='onboarding', target_id=onboarding.id, target_label=onboarding.full_name) messages.success(request, _('Einweisungsprotokoll aus Live-Status wurde erzeugt.')) return redirect('onboarding_intro_session_page', request_id=request_id) @_require_capability('run_intro_session') def onboarding_intro_session_page(request, request_id: int): onboarding = get_object_or_404(OnboardingRequest, id=request_id) session, _created = OnboardingIntroductionSession.objects.get_or_create(onboarding_request=onboarding) sections = build_intro_sections_for_request(onboarding, language_code=get_language()) if request.method == 'POST': checked_ids = set(request.POST.getlist('checked_items')) checklist_state = {} for section in sections: for item in section['items']: checklist_state[item['id']] = item['id'] in checked_ids action = (request.POST.get('session_action') or 'save').strip() session.checklist_state = checklist_state session.notes = (request.POST.get('notes') or '').strip() if action == 'reset': session.checklist_state = {} session.notes = '' session.status = 'draft' session.completed_at = None session.completed_by_name = '' session.exported_pdf_path = '' session.save(update_fields=['checklist_state', 'notes', 'status', 'completed_at', 'completed_by_name', 'exported_pdf_path']) _audit(request, 'intro_session_reset', target_type='onboarding', target_id=onboarding.id, target_label=onboarding.full_name) messages.success(request, _('Einweisung wurde zurückgesetzt.')) return redirect('onboarding_intro_session_page', request_id=request_id) if action == 'complete': session.status = 'completed' session.completed_at = timezone.now() session.completed_by_name = _display_user_name(request.user) _audit( request, 'intro_session_completed', target_type='onboarding', target_id=onboarding.id, target_label=onboarding.full_name, details={'checked_count': len([value for value in checklist_state.values() if value])}, ) messages.success(request, _('Einweisung wurde als abgeschlossen gespeichert.')) else: session.status = 'draft' session.completed_at = None session.completed_by_name = '' _audit( request, 'intro_session_saved', target_type='onboarding', target_id=onboarding.id, target_label=onboarding.full_name, details={'checked_count': len([value for value in checklist_state.values() if value])}, ) messages.success(request, _('Einweisung wurde als Entwurf gespeichert.')) session.save() return redirect('onboarding_intro_session_page', request_id=request_id) checked_map = session.checklist_state or {} checked_count = 0 total_count = 0 for section in sections: for item in section['items']: item['checked'] = bool(checked_map.get(item['id'])) total_count += 1 if item['checked']: checked_count += 1 salutation = (onboarding.get_gender_display() or '').strip() display_name = f"{salutation} {onboarding.full_name}".strip() if salutation else onboarding.full_name progress_percent = int((checked_count / total_count) * 100) if total_count else 0 return render( request, 'workflows/onboarding_intro_session.html', { 'onboarding': onboarding, 'session': session, 'display_name': display_name, 'sections': sections, 'checked_count': checked_count, 'total_count': total_count, 'progress_percent': progress_percent, 'session_pdf_url': f"/media/pdfs/{Path(session.exported_pdf_path).name}" if session.exported_pdf_path else None, }, ) @login_required @ensure_csrf_cookie def offboarding_create(request): profile_id = request.GET.get('profile') search_query = request.GET.get('q', '').strip() selected_profile = None if profile_id: selected_profile = EmployeeProfile.objects.filter(id=profile_id).first() search_results = [] if search_query: search_results = list( EmployeeProfile.objects.filter(full_name__icontains=search_query)[:10] ) + list( EmployeeProfile.objects.filter(work_email__icontains=search_query)[:10] ) # preserve order while removing duplicates seen = set() unique = [] for r in search_results: if r.id not in seen: unique.append(r) seen.add(r.id) search_results = unique[:10] if request.method == 'POST': form = OffboardingRequestForm(request.POST, prefill_profile=selected_profile) if form.is_valid(): obj = form.save(commit=False) if selected_profile: obj.employee_profile = selected_profile requester_email = (request.user.email or '').strip().lower() company_suffix = f"@{get_company_email_domain()}" if requester_email and requester_email.endswith(company_suffix): obj.requested_by_email = requester_email else: obj.requested_by_email = settings.DEFAULT_FROM_EMAIL obj.requested_by_name = _display_user_name(request.user) obj.preferred_language = ((getattr(request, 'LANGUAGE_CODE', '') or get_language() or 'de').split('-')[0]) obj.save() process_offboarding_request.delay(obj.id) return redirect(f"/offboarding/new/?saved=1&id={obj.id}") else: form = OffboardingRequestForm(prefill_profile=selected_profile, initial={'search_query': search_query}) field_pages = getattr(form, '_field_page_keys', {}) section_configs = ensure_form_section_configs('offboarding') visible_section_keys = { key for key in OFFBOARDING_PAGE_ORDER if key in LOCKED_SECTION_RULES.get('offboarding', set()) or section_configs.get(key, None) is None or section_configs[key].is_visible } offboarding_sections = _build_offboarding_sections(form, visible_section_keys=visible_section_keys) return render( request, 'workflows/offboarding_form.html', { 'form': form, 'search_results': search_results, 'selected_profile': selected_profile, 'search_query': search_query, 'saved': request.GET.get('saved') == '1', 'saved_request_id': request.GET.get('id', ''), 'portal_email_domain': get_company_email_domain(), 'offboarding_sections': offboarding_sections, }, ) @login_required def offboarding_success(request, request_id: int): obj = get_object_or_404(OffboardingRequest, id=request_id) pdf_url = None if obj.generated_pdf_path: pdf_url = f"/media/pdfs/{Path(obj.generated_pdf_path).name}" return render(request, 'workflows/offboarding_success.html', {'obj': obj, 'pdf_url': pdf_url}) @_require_capability('manage_builders') def form_builder_page(request): language_code = get_language() form_type = request.GET.get('form_type', 'onboarding') anchor = (request.GET.get('anchor') or '').strip() active_panel = (request.GET.get('panel') or '').strip() active_subpanel = (request.GET.get('subpanel') or '').strip() if form_type not in DEFAULT_FIELD_ORDER: form_type = 'onboarding' option_category = request.GET.get('option_category', 'department') option_categories = [c[0] for c in FormOption.CATEGORY_CHOICES] if option_category not in option_categories: option_category = option_categories[0] if request.method == 'POST': delete_option_id = request.POST.get('delete_option_id', '').strip() if delete_option_id: option = FormOption.objects.filter(id=delete_option_id).first() if not option: messages.error(request, 'Option nicht gefunden.') else: option_category = option.category deleted_label = option.label deleted_id = option.id option.delete() _audit(request, 'form_option_deleted', target_type='form_option', target_id=deleted_id, target_label=deleted_label) messages.success(request, 'Option wurde gelöscht.') return redirect(f"/admin-tools/form-builder/?form_type={form_type}&option_category={option_category}&panel=builder-content&subpanel=options#builder-content") action = request.POST.get('builder_action', '') if action == 'add_option': category = request.POST.get('category', '').strip() label = request.POST.get('label', '').strip() label_en = request.POST.get('label_en', '').strip() value = request.POST.get('value', '').strip() if category not in option_categories: messages.error(request, 'Ungültige Kategorie.') elif not label: messages.error(request, 'Bitte einen Namen für die Option angeben.') else: next_sort = ( FormOption.objects.filter(category=category).order_by('-sort_order').values_list('sort_order', flat=True).first() ) FormOption.objects.create( # Global form option catalog entry category=category, label=label, label_en=label_en, value=value or label, sort_order=(next_sort + 1) if next_sort is not None else 0, is_active=True, ) _audit( request, 'form_option_added', target_type='form_option', target_label=label, details={'category': category, 'label_en': label_en, 'value': value or label}, ) messages.success(request, 'Option wurde hinzugefügt.') option_category = category elif action == 'save_options': option_ids = request.POST.getlist('option_ids') for pos, raw_id in enumerate(option_ids): option = FormOption.objects.filter(id=raw_id).first() if not option: continue next_label = request.POST.get(f'label_{option.id}', '').strip() or option.label option.label = next_label option.label_en = request.POST.get(f'label_en_{option.id}', '').strip() option.value = request.POST.get(f'value_{option.id}', '').strip() or next_label option.is_active = request.POST.get(f'active_{option.id}') == 'on' option.sort_order = pos try: option.save(update_fields=['label', 'label_en', 'value', 'is_active', 'sort_order']) except IntegrityError: messages.error(request, f'Doppelte Bezeichnung in Kategorie: {next_label}') return redirect(f"/admin-tools/form-builder/?form_type={form_type}&option_category={option.category}&panel=builder-content&subpanel=options#builder-content") option_category = option.category _audit(request, 'form_options_saved', target_type='form_option', target_label=option_category, details={'count': len(option_ids)}) messages.success(request, 'Optionen wurden gespeichert.') elif action == 'save_field_texts': field_ids = request.POST.getlist('field_ids') for raw_id in field_ids: cfg = FormFieldConfig.objects.filter(id=raw_id, form_type=form_type).first() if not cfg: continue cfg.label_override = (request.POST.get(f'label_override_{cfg.id}') or '').strip() cfg.label_override_en = (request.POST.get(f'label_override_en_{cfg.id}') or '').strip() cfg.help_text_override = (request.POST.get(f'help_text_override_{cfg.id}') or '').strip() cfg.help_text_override_en = (request.POST.get(f'help_text_override_en_{cfg.id}') or '').strip() cfg.save(update_fields=['label_override', 'label_override_en', 'help_text_override', 'help_text_override_en']) _audit(request, 'form_field_texts_saved', target_type='form_config', target_label=form_type, details={'count': len(field_ids)}) messages.success(request, 'Feldtexte wurden gespeichert.') elif action == 'save_field_rules': field_ids = request.POST.getlist('field_rule_ids') locked_fields = LOCKED_FIELD_RULES.get(form_type, set()) updated = 0 for raw_id in field_ids: cfg = FormFieldConfig.objects.filter(id=raw_id, form_type=form_type).first() if not cfg: continue if cfg.field_name in locked_fields: cfg.is_visible = True cfg.is_required = None else: cfg.is_visible = request.POST.get(f'is_visible_{cfg.id}') == 'on' required_mode = (request.POST.get(f'is_required_{cfg.id}') or '').strip() cfg.is_required = True if required_mode == 'required' else False if required_mode == 'optional' else None cfg.save(update_fields=['is_visible', 'is_required']) updated += 1 _audit(request, 'form_field_rules_saved', target_type='form_config', target_label=form_type, details={'count': updated}) messages.success(request, 'Feldregeln wurden gespeichert.') elif action == 'save_section_rules' and form_type in {'onboarding', 'offboarding'}: section_configs = ensure_form_section_configs(form_type) locked_sections = LOCKED_SECTION_RULES.get(form_type, set()) updated = 0 for section_key, cfg in section_configs.items(): if section_key in locked_sections: if not cfg.is_visible: cfg.is_visible = True cfg.save(update_fields=['is_visible']) continue cfg.is_visible = request.POST.get(f'section_visible_{section_key}') == 'on' cfg.save(update_fields=['is_visible']) updated += 1 _audit(request, 'form_section_rules_saved', target_type='form_config', target_label=form_type, details={'count': updated}) messages.success(request, 'Abschnittsregeln wurden gespeichert.') elif action == 'save_conditional_rules' and form_type == 'onboarding': rule_configs = ensure_form_conditional_rule_configs(form_type) updated = 0 for target_key, cfg in rule_configs.items(): cfg.is_active = request.POST.get(f'conditional_active_{target_key}') == 'on' clauses = [] clause_total = 2 for index in range(clause_total): field_name = (request.POST.get(f'conditional_field_{target_key}_{index}') or '').strip() operator = (request.POST.get(f'conditional_operator_{target_key}_{index}') or '').strip() value = (request.POST.get(f'conditional_value_{target_key}_{index}') or '').strip() if not field_name or not operator: continue parsed_value = True if operator == 'checked' else value clauses.append({'field': field_name, 'operator': operator, 'value': parsed_value}) cfg.clauses = clauses cfg.save(update_fields=['is_active', 'clauses']) updated += 1 _audit(request, 'form_conditional_rules_saved', target_type='form_config', target_label=form_type, details={'count': updated}) messages.success(request, 'Bedingte Logik wurde gespeichert.') elif action == 'apply_preset': preset_key = (request.POST.get('preset_key') or '').strip() if apply_form_preset(form_type, preset_key): active_panel = 'builder-preview' _audit(request, 'form_preset_applied', target_type='form_config', target_label=form_type, details={'preset': preset_key}) messages.success(request, 'Preset wurde angewendet.') else: messages.error(request, 'Preset konnte nicht angewendet werden.') if action in {'add_option', 'save_options', 'save_field_texts'}: active_panel = 'builder-content' if action in {'add_option', 'save_options'}: active_subpanel = 'options' elif action == 'save_field_texts': active_subpanel = 'field-texts' elif action in {'save_field_rules', 'save_section_rules', 'save_conditional_rules'}: active_panel = 'builder-rules' redirect_target = f"/admin-tools/form-builder/?form_type={form_type}&option_category={option_category}" if active_panel: redirect_target += f"&panel={active_panel}" if active_subpanel: redirect_target += f"&subpanel={active_subpanel}" if anchor == 'builder-content' or active_panel == 'builder-content': redirect_target += "#builder-content" return redirect(redirect_target) default_names = list(DEFAULT_FIELD_ORDER.get(form_type, [])) existing_names = list( OnboardingRequestForm.base_fields.keys() if form_type == 'onboarding' else OffboardingRequestForm.base_fields.keys() ) for name in existing_names: if name not in default_names: default_names.append(name) ensure_form_field_configs(form_type, default_names) section_configs = ensure_form_section_configs(form_type) conditional_rule_configs = ensure_form_conditional_rule_configs(form_type) if form_type == 'onboarding' else {} section_order = get_section_order(form_type) section_labels = get_section_labels(form_type) default_page_map = get_default_page_map(form_type) configs = list( FormFieldConfig.objects.filter(form_type=form_type).order_by('sort_order', 'field_name') ) labels = _form_field_labels(form_type) locked = LOCKED_FIELD_RULES.get(form_type, set()) locked_sections = LOCKED_SECTION_RULES.get(form_type, set()) if form_type == 'onboarding': columns = [ { 'key': key, 'title': ONBOARDING_PAGE_LABELS.get(key, key), 'items': [], } for key in ONBOARDING_PAGE_ORDER ] column_by_key = {c['key']: c for c in columns} fallback = 'abschluss' for cfg in configs: page_key = cfg.page_key or ONBOARDING_DEFAULT_PAGE.get(cfg.field_name, fallback) if page_key not in column_by_key: page_key = fallback column_by_key[page_key]['items'].append( { 'field_name': cfg.field_name, 'label': cfg.translated_label_override(language_code) or labels.get(cfg.field_name, cfg.field_name), 'label_de': cfg.label_override or labels.get(cfg.field_name, cfg.field_name), 'label_en': cfg.label_override_en, 'is_visible': cfg.is_visible, 'is_required': cfg.is_required, 'locked': cfg.field_name in locked, 'page_key': page_key, } ) else: columns = [ { 'key': key, 'title': section_labels.get(key, key), 'items': [], } for key in section_order ] column_by_key = {c['key']: c for c in columns} fallback = section_order[-1] if section_order else 'all' for cfg in configs: page_key = cfg.page_key or default_page_map.get(cfg.field_name, fallback) if page_key not in column_by_key: page_key = fallback column_by_key[page_key]['items'].append( { 'field_name': cfg.field_name, 'label': cfg.translated_label_override(language_code) or labels.get(cfg.field_name, cfg.field_name), 'label_de': cfg.label_override or labels.get(cfg.field_name, cfg.field_name), 'label_en': cfg.label_override_en, 'is_visible': cfg.is_visible, 'is_required': cfg.is_required, 'locked': cfg.field_name in locked, 'page_key': page_key, } ) section_rule_items = [] if section_order: fallback_section = section_order[-1] if section_order else '' for key in section_order: cfg = section_configs.get(key) section_rule_items.append( { 'key': key, 'title': section_labels.get(key, key), 'is_visible': True if not cfg else cfg.is_visible, 'locked': key in locked_sections, 'field_count': len([c for c in configs if (c.page_key or default_page_map.get(c.field_name, fallback_section)) == key]), } ) field_rule_items = [] for cfg in configs: page_key = cfg.page_key or default_page_map.get(cfg.field_name, section_order[-1] if section_order else '') field_rule_items.append( { 'id': cfg.id, 'field_name': cfg.field_name, 'label': cfg.translated_label_override(language_code) or labels.get(cfg.field_name, cfg.field_name), 'page_key': page_key, 'page_label': section_labels.get(page_key, page_key) if section_order else '', 'is_visible': cfg.is_visible, 'is_required': cfg.is_required, 'locked': cfg.field_name in locked, } ) field_rule_groups = [] if section_order: grouped_rules = {key: [] for key in section_order} for item in field_rule_items: grouped_rules.setdefault(item['page_key'], []).append(item) for key in section_order: field_rule_groups.append( { 'key': key, 'title': section_labels.get(key, key), 'items': grouped_rules.get(key, []), } ) field_text_groups = [] if section_order: grouped_texts = {key: [] for key in section_order} for cfg in configs: page_key = cfg.page_key or default_page_map.get(cfg.field_name, section_order[-1] if section_order else '') grouped_texts.setdefault(page_key, []).append(cfg) for key in section_order: field_text_groups.append( { 'key': key, 'title': section_labels.get(key, key), 'items': grouped_texts.get(key, []), } ) conditional_rule_items = [] if form_type == 'onboarding': conditional_field_choices = [] for field_name in [ 'order_business_cards', 'employment_type', 'group_mailboxes_required_choice', 'additional_hardware_needed_choice', 'additional_software_needed_choice', 'additional_access_needed_choice', 'successor_required_choice', 'inherit_phone_number_choice', ]: conditional_field_choices.append((field_name, labels.get(field_name, field_name))) conditional_target_titles = { 'business-card-box': _('Visitenkarten-Details'), 'employment-end-box': _('Vertragsende'), 'group-mailboxes-box': _('Gruppenpostfächer'), 'extra-hardware-box': _('Zusätzliche Hardware'), 'extra-software-box': _('Zusätzliche Software'), 'extra-access-box': _('Zusätzliche Zugänge'), 'successor-box': _('Nachfolge'), 'phone-box': _('Direktwahl'), } conditional_target_descriptions = { 'business-card-box': _('Steuert die Detailfelder für Visitenkarten.'), 'employment-end-box': _('Steuert das Enddatum bei befristeter Beschäftigung.'), 'group-mailboxes-box': _('Steuert das Freitextfeld für Gruppenpostfächer.'), 'extra-hardware-box': _('Steuert zusätzliche Hardware-Felder.'), 'extra-software-box': _('Steuert zusätzliche Software-Felder.'), 'extra-access-box': _('Steuert zusätzliche Zugangsangaben.'), 'successor-box': _('Steuert Nachfolge- und Übernahmefelder.'), 'phone-box': _('Steuert die manuelle Direktwahl.'), } for target_key, cfg in conditional_rule_configs.items(): clauses = list(cfg.clauses or []) while len(clauses) < 2: clauses.append({'field': '', 'operator': 'equals', 'value': ''}) conditional_rule_items.append( { 'target_key': target_key, 'title': conditional_target_titles.get(target_key, target_key), 'description': conditional_target_descriptions.get(target_key, ''), 'is_active': cfg.is_active, 'clauses': clauses[:2], 'field_choices': conditional_field_choices, 'operator_choices': CONDITIONAL_RULE_OPERATOR_CHOICES, 'target_fields': [labels.get(name, name) for name in ONBOARDING_GROUPS.get(target_key, [])], } ) preview_sections = [] if section_order: field_rule_group_map = {group['key']: group['items'] for group in field_rule_groups} for key in section_order: section_cfg = section_configs.get(key) section_locked = key in locked_sections section_visible = True if section_locked or not section_cfg else section_cfg.is_visible visible_items = [ item for item in field_rule_group_map.get(key, []) if item['locked'] or item['is_visible'] ] if section_visible: preview_sections.append( { 'key': key, 'title': section_labels.get(key, key), 'items': visible_items, } ) locked_field_count = len([item for item in field_rule_items if item['locked']]) hidden_field_count = len([item for item in field_rule_items if not item['is_visible']]) configurable_field_count = len(field_rule_items) - locked_field_count hidden_section_count = len([item for item in section_rule_items if not item['is_visible']]) if section_rule_items else 0 builder_summary = { 'locked_field_count': locked_field_count, 'configurable_field_count': configurable_field_count, 'hidden_field_count': hidden_field_count, 'hidden_section_count': hidden_section_count, } return render( request, 'workflows/form_builder.html', { 'form_type': form_type, 'columns': columns, 'form_types': [('onboarding', 'Onboarding'), ('offboarding', 'Offboarding')], 'option_categories': _translate_choice_list(FormOption.CATEGORY_CHOICES), 'selected_option_category': option_category, 'option_items': FormOption.objects.filter(category=option_category).order_by('sort_order', 'label'), 'field_text_items': configs, 'field_rule_items': field_rule_items, 'field_rule_groups': field_rule_groups, 'field_text_groups': field_text_groups, 'preview_sections': preview_sections, 'section_rule_items': section_rule_items, 'builder_summary': builder_summary, 'conditional_rule_items': conditional_rule_items, 'active_panel': active_panel, 'active_subpanel': active_subpanel, 'available_presets': FORM_PRESETS.get(form_type, {}), }, ) @_require_capability('manage_builders') def intro_builder_page(request): if request.method == 'POST': delete_id = (request.POST.get('delete_item_id') or '').strip() if delete_id: item = IntroChecklistItem.objects.filter(id=delete_id).first() if item: deleted_label = item.label deleted_id_int = item.id item.delete() _audit(request, 'intro_checklist_item_deleted', target_type='intro_checklist_item', target_id=deleted_id_int, target_label=deleted_label) messages.success(request, 'Checklistenpunkt wurde gelöscht.') else: messages.error(request, 'Checklistenpunkt nicht gefunden.') return redirect('intro_builder_page') action = (request.POST.get('builder_action') or '').strip() if action == 'add_item': section = (request.POST.get('section') or '').strip() label = (request.POST.get('label') or '').strip() label_en = (request.POST.get('label_en') or '').strip() if section not in {k for k, _ in IntroChecklistItem.SECTION_CHOICES}: messages.error(request, 'Ungültiger Abschnitt.') return redirect('intro_builder_page') if not label: messages.error(request, 'Bitte eine Bezeichnung für den Checklistenpunkt angeben.') return redirect('intro_builder_page') next_sort = ( IntroChecklistItem.objects.filter(section=section).order_by('-sort_order').values_list('sort_order', flat=True).first() ) IntroChecklistItem.objects.create( section=section, label=label, label_en=label_en, sort_order=(next_sort + 1) if next_sort is not None else 0, is_active=True, condition_operator='always', ) _audit(request, 'intro_checklist_item_added', target_type='intro_checklist_item', target_label=label, details={'section': section, 'label_en': label_en}) messages.success(request, 'Checklistenpunkt wurde hinzugefügt.') return redirect('intro_builder_page') if action == 'save_items': item_ids = request.POST.getlist('item_ids') valid_sections = {k for k, _ in IntroChecklistItem.SECTION_CHOICES} valid_ops = {k for k, _ in IntroChecklistItem.OPERATOR_CHOICES} for pos, raw_id in enumerate(item_ids): item = IntroChecklistItem.objects.filter(id=raw_id).first() if not item: continue section = (request.POST.get(f'section_{item.id}') or item.section).strip() if section not in valid_sections: section = item.section operator = (request.POST.get(f'operator_{item.id}') or item.condition_operator).strip() if operator not in valid_ops: operator = 'always' item.section = section item.label = (request.POST.get(f'label_{item.id}') or item.label).strip() or item.label item.label_en = (request.POST.get(f'label_en_{item.id}') or '').strip() item.is_active = request.POST.get(f'active_{item.id}') == 'on' item.condition_field = (request.POST.get(f'field_{item.id}') or '').strip() item.condition_operator = operator item.condition_value = (request.POST.get(f'value_{item.id}') or '').strip() item.sort_order = pos item.save( update_fields=[ 'section', 'label', 'label_en', 'is_active', 'condition_field', 'condition_operator', 'condition_value', 'sort_order', ] ) _audit(request, 'intro_checklist_saved', target_type='intro_checklist_item', details={'count': len(item_ids)}) messages.success(request, 'Einweisungs-Checkliste wurde gespeichert.') return redirect('intro_builder_page') condition_field_choices = [ ('', 'Keine Bedingung'), ('needed_devices', 'Benötigte Geräte und Gegenstände'), ('needed_software', 'Benötigte Software'), ('needed_accesses', 'Benötigte Zugänge'), ('needed_workspace_groups', 'Benötigte Gruppen im Workspace'), ('needed_resources', 'Benötigte Ressourcen'), ('additional_hardware', 'Zusätzliche Hardware'), ('additional_software', 'Zusätzliche Software'), ('additional_access_text', 'Weitere Zugänge (Freitext)'), ('group_mailboxes_required', 'Gruppenpostfächer erforderlich'), ('order_business_cards', 'Visitenkarten bestellt'), ('phone_number', 'Direktwahl vorhanden'), ('successor_name', 'Nachfolge vorhanden'), ('department', 'Abteilung'), ] items = list(IntroChecklistItem.objects.all().order_by('section', 'sort_order', 'label')) return render( request, 'workflows/intro_builder.html', { 'items': items, 'section_choices': _translate_choice_list(IntroChecklistItem.SECTION_CHOICES), 'operator_choices': _translate_choice_list(IntroChecklistItem.OPERATOR_CHOICES), 'condition_field_choices': condition_field_choices, }, ) @_require_capability('manage_integrations') def integrations_setup_page(request): config, _ = WorkflowConfig.objects.get_or_create(name='Default') kind = (request.GET.get('kind') or 'nextcloud').strip().lower() if kind not in {'nextcloud', 'mail', 'emails', 'rules', 'backup'}: kind = 'nextcloud' templates = list(NotificationTemplate.objects.all().order_by('key')) system_email_config = ( SystemEmailConfig.objects.filter(is_active=True).order_by('-updated_at').first() or SystemEmailConfig.objects.filter(name='Default SMTP').first() ) return render( request, 'workflows/integrations_setup.html', { 'workflow_config': config, 'system_email_config': system_email_config, 'nextcloud_enabled': is_nextcloud_enabled(), 'email_test_mode': is_email_test_mode(), 'kind': kind, 'templates': templates, 'notification_rules': NotificationRule.objects.all().order_by('event_type', 'sort_order', 'id'), 'rule_event_choices': NotificationRule.EVENT_CHOICES, 'rule_operator_choices': NotificationRule.OPERATOR_CHOICES, 'template_choices': NotificationTemplate.TEMPLATE_CHOICES, 'remote_backup_target_choices': WorkflowConfig.REMOTE_BACKUP_TARGET_CHOICES, }, ) @_require_capability('manage_welcome_emails') def welcome_emails_page(request): rows = ScheduledWelcomeEmail.objects.select_related('onboarding_request').order_by('-send_at', '-id')[:200] config, _ = WorkflowConfig.objects.get_or_create(name='Default') welcome_template = NotificationTemplate.objects.filter(key='onboarding_welcome').first() default_welcome = get_default_notification_templates().get('onboarding_welcome', {}) default_subject = (default_welcome.get('subject') or '').strip() default_body = (default_welcome.get('body') or '').strip() default_subject_en = (default_welcome.get('subject_en') or '').strip() default_body_en = (default_welcome.get('body_en') or '').strip() subject_value = (welcome_template.subject_template if welcome_template else '').strip() or default_subject body_value = (welcome_template.body_template if welcome_template else '').strip() or default_body subject_value_en = (welcome_template.subject_template_en if welcome_template else '').strip() or default_subject_en body_value_en = (welcome_template.body_template_en if welcome_template else '').strip() or default_body_en return render( request, 'workflows/welcome_emails.html', { 'rows': rows, 'workflow_config': config, 'welcome_template': welcome_template, 'welcome_subject_value': subject_value, 'welcome_body_value': body_value, 'welcome_subject_value_en': subject_value_en, 'welcome_body_value_en': body_value_en, 'welcome_keywords': ['{{ FULL_NAME }}', '{{ VORNAME }}', '{{ NACHNAME }}', '{{ DEPARTMENT }}', '{{ CONTRACT_START }}', '{{ EMAIL }}', '{{ REQUESTED_BY }}'], }, ) @_require_capability('manage_welcome_emails') @require_POST def trigger_welcome_email_now(request, schedule_id: int): scheduled = ScheduledWelcomeEmail.objects.filter(id=schedule_id).first() if not scheduled: messages.error(request, f'Geplanter Welcome-Eintrag #{schedule_id} nicht gefunden.') return redirect('welcome_emails_page') if scheduled.status == 'cancelled': messages.error(request, f'Welcome E-Mail #{schedule_id} ist abgebrochen und kann nicht gesendet werden.') return redirect('welcome_emails_page') async_result = send_scheduled_welcome_email.delay(scheduled.id, True) scheduled.celery_task_id = async_result.id or scheduled.celery_task_id scheduled.status = 'scheduled' scheduled.last_error = '' scheduled.save(update_fields=['celery_task_id', 'status', 'last_error', 'updated_at']) _audit(request, 'welcome_email_triggered_now', target_type='welcome_email', target_id=scheduled.id, target_label=scheduled.recipient_email) messages.success(request, f'Welcome E-Mail #{schedule_id} wurde sofort angestoßen.') return redirect('welcome_emails_page') @_require_capability('manage_welcome_emails') @require_POST def save_welcome_email_settings(request): config, _ = WorkflowConfig.objects.get_or_create(name='Default') try: delay_days = int(request.POST.get('welcome_email_delay_days', config.welcome_email_delay_days or 5)) except ValueError: messages.error(request, 'Ungültige Zahl bei der Welcome-Verzögerung.') return redirect('welcome_emails_page') config.welcome_email_delay_days = max(0, delay_days) config.welcome_sender_email = request.POST.get('welcome_sender_email', '').strip() config.welcome_include_pdf = request.POST.get('welcome_include_pdf') == 'on' config.save(update_fields=['welcome_email_delay_days', 'welcome_sender_email', 'welcome_include_pdf']) subject = request.POST.get('welcome_subject') body = request.POST.get('welcome_body') subject_en = request.POST.get('welcome_subject_en') body_en = request.POST.get('welcome_body_en') if subject is not None or body is not None or subject_en is not None or body_en is not None: default_welcome = get_default_notification_templates().get('onboarding_welcome', {}) default_subject = (default_welcome.get('subject') or '').strip() default_body = (default_welcome.get('body') or '').strip() default_subject_en = (default_welcome.get('subject_en') or '').strip() default_body_en = (default_welcome.get('body_en') or '').strip() subject_clean = (subject or '').strip() or default_subject body_clean = (body or '').strip() or default_body subject_clean_en = (subject_en or '').strip() or default_subject_en body_clean_en = (body_en or '').strip() or default_body_en template, _ = NotificationTemplate.objects.get_or_create( key='onboarding_welcome', defaults={ 'subject_template': subject_clean, 'body_template': body_clean, 'subject_template_en': subject_clean_en, 'body_template_en': body_clean_en, 'is_active': True, }, ) changes = [] if template.subject_template != subject_clean: template.subject_template = subject_clean changes.append('subject_template') if template.body_template != body_clean: template.body_template = body_clean changes.append('body_template') if template.subject_template_en != subject_clean_en: template.subject_template_en = subject_clean_en changes.append('subject_template_en') if template.body_template_en != body_clean_en: template.body_template_en = body_clean_en changes.append('body_template_en') if not template.is_active: template.is_active = True changes.append('is_active') if changes: template.save(update_fields=changes) _audit( request, 'welcome_email_settings_saved', target_type='welcome_email_settings', target_label='onboarding_welcome', details={ 'delay_days': config.welcome_email_delay_days, 'sender_email': config.welcome_sender_email, 'include_pdf': config.welcome_include_pdf, }, ) messages.success(request, 'Welcome-E-Mail Einstellungen wurden gespeichert.') return redirect('welcome_emails_page') def _revoke_celery_task(task_id: str) -> None: if not task_id: return try: current_app.control.revoke(task_id, terminate=False) except Exception: return def _parse_selected_schedule_ids(raw: str) -> list[int]: if not raw: return [] parsed: list[int] = [] seen: set[int] = set() for token in raw.split(','): token = token.strip() if not token: continue try: schedule_id = int(token) except ValueError: continue if schedule_id in seen: continue seen.add(schedule_id) parsed.append(schedule_id) return parsed @_require_capability('manage_welcome_emails') @require_POST def bulk_welcome_email_action(request): action = (request.POST.get('bulk_action') or '').strip().lower() selected_ids = _parse_selected_schedule_ids(request.POST.get('selected_ids', '')) if action not in {'pause', 'send_now', 'delete'}: messages.error(request, 'Ungültige Bulk-Aktion.') return redirect('welcome_emails_page') if not selected_ids: messages.warning(request, 'Keine Welcome-Einträge ausgewählt.') return redirect('welcome_emails_page') rows = list(ScheduledWelcomeEmail.objects.filter(id__in=selected_ids).order_by('id')) if not rows: messages.warning(request, 'Keine passenden Welcome-Einträge gefunden.') return redirect('welcome_emails_page') success_count = 0 skipped_count = 0 for scheduled in rows: if action == 'pause': if scheduled.status in {'sent', 'cancelled'}: skipped_count += 1 continue _revoke_celery_task(scheduled.celery_task_id) scheduled.status = 'paused' scheduled.save(update_fields=['status', 'updated_at']) success_count += 1 continue if action == 'send_now': if scheduled.status == 'cancelled': skipped_count += 1 continue async_result = send_scheduled_welcome_email.delay(scheduled.id, True) scheduled.celery_task_id = async_result.id or scheduled.celery_task_id scheduled.status = 'scheduled' scheduled.last_error = '' scheduled.save(update_fields=['celery_task_id', 'status', 'last_error', 'updated_at']) success_count += 1 continue if action == 'delete': if scheduled.status == 'scheduled': _revoke_celery_task(scheduled.celery_task_id) scheduled.delete() success_count += 1 action_label = { 'pause': 'pausiert', 'send_now': 'sofort angestoßen', 'delete': 'gelöscht', }[action] if success_count: _audit( request, 'welcome_email_bulk_action', target_type='welcome_email', target_label=action, details={'selected_ids': selected_ids, 'success_count': success_count, 'skipped_count': skipped_count}, ) messages.success(request, f'{success_count} Welcome-Eintrag/Einträge {action_label}.') if skipped_count: messages.warning(request, f'{skipped_count} Eintrag/Einträge wurden übersprungen (Status nicht geeignet).') return redirect('welcome_emails_page') @_require_capability('manage_welcome_emails') @require_POST def pause_welcome_email(request, schedule_id: int): scheduled = ScheduledWelcomeEmail.objects.filter(id=schedule_id).first() if not scheduled: messages.error(request, f'Geplanter Welcome-Eintrag #{schedule_id} nicht gefunden.') return redirect('welcome_emails_page') if scheduled.status in {'sent', 'cancelled'}: messages.error(request, f'Welcome E-Mail #{schedule_id} kann nicht pausiert werden.') return redirect('welcome_emails_page') _revoke_celery_task(scheduled.celery_task_id) scheduled.status = 'paused' scheduled.save(update_fields=['status', 'updated_at']) _audit(request, 'welcome_email_paused', target_type='welcome_email', target_id=scheduled.id, target_label=scheduled.recipient_email) messages.success(request, f'Welcome E-Mail #{schedule_id} wurde pausiert.') return redirect('welcome_emails_page') @_require_capability('manage_welcome_emails') @require_POST def resume_welcome_email(request, schedule_id: int): scheduled = ScheduledWelcomeEmail.objects.filter(id=schedule_id).first() if not scheduled: messages.error(request, f'Geplanter Welcome-Eintrag #{schedule_id} nicht gefunden.') return redirect('welcome_emails_page') if scheduled.status != 'paused': messages.error(request, f'Welcome E-Mail #{schedule_id} ist nicht pausiert.') return redirect('welcome_emails_page') eta = scheduled.send_at if timezone.now() < scheduled.send_at else None async_result = send_scheduled_welcome_email.apply_async(args=[scheduled.id], eta=eta) scheduled.celery_task_id = async_result.id or scheduled.celery_task_id scheduled.status = 'scheduled' scheduled.last_error = '' scheduled.save(update_fields=['celery_task_id', 'status', 'last_error', 'updated_at']) _audit(request, 'welcome_email_resumed', target_type='welcome_email', target_id=scheduled.id, target_label=scheduled.recipient_email) messages.success(request, f'Welcome E-Mail #{schedule_id} wurde fortgesetzt.') return redirect('welcome_emails_page') @_require_capability('manage_welcome_emails') @require_POST def cancel_welcome_email(request, schedule_id: int): scheduled = ScheduledWelcomeEmail.objects.filter(id=schedule_id).first() if not scheduled: messages.error(request, f'Geplanter Welcome-Eintrag #{schedule_id} nicht gefunden.') return redirect('welcome_emails_page') if scheduled.status == 'sent': messages.error(request, f'Welcome E-Mail #{schedule_id} wurde bereits gesendet.') return redirect('welcome_emails_page') _revoke_celery_task(scheduled.celery_task_id) scheduled.status = 'cancelled' scheduled.last_error = '' scheduled.save(update_fields=['status', 'last_error', 'updated_at']) _audit(request, 'welcome_email_cancelled', target_type='welcome_email', target_id=scheduled.id, target_label=scheduled.recipient_email) messages.success(request, f'Welcome E-Mail #{schedule_id} wurde abgebrochen.') return redirect('welcome_emails_page') @_require_capability('manage_builders') @require_POST def form_builder_save_order(request): try: payload = json.loads(request.body.decode('utf-8')) except (json.JSONDecodeError, UnicodeDecodeError): return JsonResponse({'ok': False, 'error': 'Ungültige JSON-Daten.'}, status=400) form_type = payload.get('form_type') if form_type not in DEFAULT_FIELD_ORDER: return JsonResponse({'ok': False, 'error': 'Ungültiger Formulartyp.'}, status=400) columns = payload.get('columns') if not isinstance(columns, dict): return JsonResponse({'ok': False, 'error': 'Spalten-Daten fehlen.'}, status=400) configs = list(FormFieldConfig.objects.filter(form_type=form_type).order_by('sort_order', 'field_name')) allowed_names = {cfg.field_name for cfg in configs} seen = set() ordered_names = [] if form_type == 'onboarding': allowed_columns = ONBOARDING_PAGE_ORDER else: allowed_columns = ['all'] name_to_cfg = {cfg.field_name: cfg for cfg in configs} sort_order = 0 for column_key in allowed_columns: names = columns.get(column_key, []) if not isinstance(names, list): return JsonResponse({'ok': False, 'error': f'Ungültige Spalte: {column_key}'}, status=400) for name in names: if not isinstance(name, str): continue if name not in allowed_names or name in seen: continue seen.add(name) ordered_names.append(name) cfg = name_to_cfg[name] cfg.sort_order = sort_order sort_order += 1 if form_type == 'onboarding': cfg.page_key = column_key else: cfg.page_key = '' missing = [cfg.field_name for cfg in configs if cfg.field_name not in seen] for name in missing: cfg = name_to_cfg[name] cfg.sort_order = sort_order sort_order += 1 if form_type == 'onboarding': cfg.page_key = cfg.page_key or ONBOARDING_DEFAULT_PAGE.get(name, 'abschluss') else: cfg.page_key = '' FormFieldConfig.objects.bulk_update(configs, ['sort_order', 'page_key']) _audit(request, 'form_layout_saved', target_type='form_config', target_label=form_type, details={'saved_count': len(configs)}) return JsonResponse({'ok': True, 'saved_count': len(configs)}) @_require_capability('manage_integrations') @require_POST def send_test_email(request): mode = 'TEST_MODE_ON' if is_email_test_mode() else 'TEST_MODE_OFF' redirect_email = get_email_test_redirect() try: send_system_email( subject=f'SMTP test from onboarding/offboarding v2 ({mode})', body=( 'This is a test email. If you see this, SMTP is configured correctly.\n' f'EMAIL_TEST_MODE={is_email_test_mode()}\n' f'EMAIL_TEST_REDIRECT={redirect_email}\n' ), to=[settings.TEST_NOTIFICATION_EMAIL], ) _audit(request, 'smtp_test_sent', target_type='system_email', target_label=settings.TEST_NOTIFICATION_EMAIL, details={'email_test_mode': is_email_test_mode()}) notify_user( user=request.user, title=_('SMTP-Test erfolgreich'), body=_('Die SMTP-Testmail wurde erfolgreich gesendet.'), level=UserNotification.LEVEL_SUCCESS, link_url='/admin-tools/integrations/', event_key=UserProfile.NOTIFICATION_SYSTEM_ALERTS, ) messages.success(request, f'SMTP-Testmail wurde gesendet ({mode}).') except Exception as exc: notify_user( user=request.user, title=_('SMTP-Test fehlgeschlagen'), body=str(exc), level=UserNotification.LEVEL_ERROR, link_url='/admin-tools/integrations/', event_key=UserProfile.NOTIFICATION_SYSTEM_ALERTS, ) messages.error(request, _('SMTP-Testmail konnte nicht gesendet werden: %(error)s') % {'error': exc}) return _redirect_back(request, 'home') @_require_capability('manage_integrations') @require_POST def nextcloud_test_upload(request): filename = f"nextcloud_test_{timezone.now().strftime('%Y%m%d_%H%M%S')}.txt" content = ( "Nextcloud test upload from onboarding/offboarding system.\n" f"Time: {timezone.now().isoformat()}\n" f"User: {request.user.username}\n" ) temp_path = None try: with NamedTemporaryFile('w', suffix='.txt', delete=False, encoding='utf-8') as tf: tf.write(content) temp_path = Path(tf.name) ok = upload_to_nextcloud(temp_path, filename) if ok: _audit(request, 'nextcloud_test_upload', target_type='nextcloud', target_label=filename, details={'result': 'success'}) notify_user( user=request.user, title=_('Nextcloud-Test erfolgreich'), body=_('Der Testupload nach Nextcloud war erfolgreich.'), level=UserNotification.LEVEL_SUCCESS, link_url='/admin-tools/integrations/', event_key=UserProfile.NOTIFICATION_SYSTEM_ALERTS, ) messages.success(request, f'Nextcloud-Testupload erfolgreich: {filename}') else: _audit(request, 'nextcloud_test_upload', target_type='nextcloud', target_label=filename, details={'result': 'error'}) notify_user( user=request.user, title=_('Nextcloud-Test fehlgeschlagen'), body=_('Der Testupload nach Nextcloud ist fehlgeschlagen.'), level=UserNotification.LEVEL_ERROR, link_url='/admin-tools/integrations/', event_key=UserProfile.NOTIFICATION_SYSTEM_ALERTS, ) messages.error(request, 'Nextcloud-Testupload fehlgeschlagen. Bitte Konfiguration prüfen.') except Exception as exc: notify_user( user=request.user, title=_('Nextcloud-Test fehlgeschlagen'), body=str(exc), level=UserNotification.LEVEL_ERROR, link_url='/admin-tools/integrations/', event_key=UserProfile.NOTIFICATION_SYSTEM_ALERTS, ) messages.error(request, f'Nextcloud-Testupload fehlgeschlagen: {exc}') finally: if temp_path and temp_path.exists(): temp_path.unlink(missing_ok=True) return _redirect_back(request, 'home') @_require_capability('manage_integrations') @require_POST def toggle_nextcloud_enabled(request): config, _ = WorkflowConfig.objects.get_or_create(name='Default') currently_enabled = is_nextcloud_enabled() config.nextcloud_enabled_override = not currently_enabled config.save(update_fields=['nextcloud_enabled_override']) _audit(request, 'nextcloud_mode_toggled', target_type='workflow_config', target_label='nextcloud', details={'enabled': config.nextcloud_enabled_override}) state = 'aktiviert' if config.nextcloud_enabled_override else 'deaktiviert' messages.success(request, f'Nextcloud Upload wurde {state}.') return _redirect_back(request, 'home') @_require_capability('manage_integrations') @require_POST def toggle_email_mode(request): config, _ = WorkflowConfig.objects.get_or_create(name='Default') currently_test_mode = is_email_test_mode() config.email_test_mode_override = not currently_test_mode config.save(update_fields=['email_test_mode_override']) _audit(request, 'email_mode_toggled', target_type='workflow_config', target_label='email_mode', details={'test_mode': config.email_test_mode_override}) state = 'Testmodus (Umleitung)' if config.email_test_mode_override else 'Produktionsmodus' messages.success(request, f'E-Mail-Modus wurde auf {state} gesetzt.') return _redirect_back(request, 'home') @_require_capability('manage_integrations') @require_POST def save_integrations_settings(request): config, _ = WorkflowConfig.objects.get_or_create(name='Default') try: sync_interval = int(request.POST.get('sync_interval_seconds', config.sync_interval_seconds or 60)) smtp_port = int(request.POST.get('smtp_port', config.smtp_port or 465)) except ValueError: messages.error(request, 'Ungültige Zahl bei Sync-Intervall oder SMTP Port.') return redirect('home') config.nextcloud_base_url_override = request.POST.get('nextcloud_base_url_override', '').strip() config.nextcloud_username_override = request.POST.get('nextcloud_username_override', '').strip() config.nextcloud_directory_override = request.POST.get('nextcloud_directory_override', '').strip() config.sync_interval_seconds = max(10, sync_interval) config.imap_server = request.POST.get('imap_server', '').strip() config.mailbox = request.POST.get('mailbox', '').strip() or 'INBOX' config.smtp_server = request.POST.get('smtp_server', '').strip() config.smtp_port = max(1, smtp_port) config.email_account = request.POST.get('email_account', '').strip() config.smtp_use_ssl = request.POST.get('smtp_use_ssl') == 'on' config.smtp_use_tls = request.POST.get('smtp_use_tls') == 'on' nextcloud_password = request.POST.get('nextcloud_password_override', '').strip() if nextcloud_password: config.nextcloud_password_override = nextcloud_password email_password = request.POST.get('email_password', '').strip() if email_password: config.email_password = email_password config.save() _audit(request, 'integrations_saved', target_type='workflow_config', target_label='all_integrations') messages.success(request, 'Integrations-Einstellungen wurden gespeichert.') return redirect('home') @_require_capability('manage_integrations') @require_POST def save_nextcloud_settings(request): config, _ = WorkflowConfig.objects.get_or_create(name='Default') try: sync_interval = int(request.POST.get('sync_interval_seconds', config.sync_interval_seconds or 60)) except ValueError: messages.error(request, 'Ungültige Zahl beim Sync-Intervall.') return redirect('home') config.nextcloud_base_url_override = request.POST.get('nextcloud_base_url_override', '').strip() config.nextcloud_username_override = request.POST.get('nextcloud_username_override', '').strip() config.nextcloud_directory_override = request.POST.get('nextcloud_directory_override', '').strip() config.sync_interval_seconds = max(10, sync_interval) nextcloud_password = request.POST.get('nextcloud_password_override', '').strip() if nextcloud_password: config.nextcloud_password_override = nextcloud_password config.save() _audit(request, 'nextcloud_settings_saved', target_type='workflow_config', target_label='nextcloud') messages.success(request, 'Nextcloud-Einstellungen wurden gespeichert.') return redirect('/admin-tools/integrations/?kind=nextcloud') @_require_capability('manage_integrations') @require_POST def save_workflow_rules(request): config, _ = WorkflowConfig.objects.get_or_create(name='Default') try: handover_lead_days = int( request.POST.get( 'device_handover_lead_days', config.device_handover_lead_days or 5, ) ) except ValueError: messages.error(request, 'Ungültige Zahl beim Hardware-Vorlauf.') return redirect('/admin-tools/integrations/?kind=rules') config.device_handover_lead_days = max(0, handover_lead_days) config.save(update_fields=['device_handover_lead_days']) _audit( request, 'workflow_rules_saved', target_type='workflow_config', target_label='workflow_rules', details={ 'device_handover_lead_days': config.device_handover_lead_days, }, ) messages.success(request, 'Workflow-Regeln wurden gespeichert.') return redirect('/admin-tools/integrations/?kind=rules') @_require_capability('manage_integrations') @require_POST def save_backup_settings(request): config, _ = WorkflowConfig.objects.get_or_create(name='Default') target_type = (request.POST.get('remote_backup_target_type') or config.remote_backup_target_type or 'nextcloud').strip().lower() if target_type not in {choice for choice, _ in WorkflowConfig.REMOTE_BACKUP_TARGET_CHOICES}: target_type = 'nextcloud' remote_backup_enabled = request.POST.get('remote_backup_enabled') == 'on' remote_backup_nextcloud_directory = request.POST.get('remote_backup_nextcloud_directory', '').strip() primary_nextcloud_directory = ( (config.nextcloud_directory_override or '').strip() or settings.NEXTCLOUD_DIRECTORY.strip() ).strip('/') if remote_backup_enabled and target_type == 'nextcloud': if not remote_backup_nextcloud_directory: messages.error(request, 'Bitte ein separates Nextcloud Backup-Verzeichnis angeben.') return redirect('/admin-tools/integrations/?kind=backup') if remote_backup_nextcloud_directory.strip('/') == primary_nextcloud_directory: messages.error(request, 'Das Backup-Verzeichnis muss vom normalen Nextcloud Dokumentenordner getrennt sein.') return redirect('/admin-tools/integrations/?kind=backup') config.remote_backup_enabled = remote_backup_enabled config.remote_backup_target_type = target_type config.remote_backup_nextcloud_directory = remote_backup_nextcloud_directory config.remote_backup_s3_bucket = request.POST.get('remote_backup_s3_bucket', '').strip() config.remote_backup_nfs_path = request.POST.get('remote_backup_nfs_path', '').strip() config.save( update_fields=[ 'device_handover_lead_days', 'remote_backup_enabled', 'remote_backup_target_type', 'remote_backup_nextcloud_directory', 'remote_backup_s3_bucket', 'remote_backup_nfs_path', ] ) _audit( request, 'backup_settings_saved', target_type='workflow_config', target_label='backup_settings', details={ 'remote_backup_enabled': config.remote_backup_enabled, 'remote_backup_target_type': config.remote_backup_target_type, 'remote_backup_nextcloud_directory': config.remote_backup_nextcloud_directory, }, ) messages.success(request, 'Backup-Einstellungen wurden gespeichert.') return redirect('/admin-tools/integrations/?kind=backup') @_require_capability('manage_integrations') @require_POST def save_mail_settings(request): config, _ = WorkflowConfig.objects.get_or_create(name='Default') try: smtp_port = int(request.POST.get('smtp_port', config.smtp_port or 465)) except ValueError: messages.error(request, 'Ungültige Zahl beim SMTP Port.') return redirect('home') config.imap_server = request.POST.get('imap_server', '').strip() config.mailbox = request.POST.get('mailbox', '').strip() or 'INBOX' config.smtp_server = request.POST.get('smtp_server', '').strip() config.smtp_port = max(1, smtp_port) config.email_account = request.POST.get('email_account', '').strip() config.smtp_use_ssl = request.POST.get('smtp_use_ssl') == 'on' config.smtp_use_tls = request.POST.get('smtp_use_tls') == 'on' email_password = request.POST.get('email_password', '').strip() if email_password: config.email_password = email_password config.save() smtp_cfg, _ = SystemEmailConfig.objects.get_or_create(name='Default SMTP') SystemEmailConfig.objects.exclude(id=smtp_cfg.id).update(is_active=False) smtp_cfg.is_active = True smtp_cfg.host = config.smtp_server smtp_cfg.port = config.smtp_port smtp_cfg.username = config.email_account if email_password: smtp_cfg.password = email_password smtp_cfg.use_ssl = config.smtp_use_ssl smtp_cfg.use_tls = config.smtp_use_tls smtp_cfg.from_email = request.POST.get('from_email', '').strip() smtp_cfg.save() _audit(request, 'mail_settings_saved', target_type='workflow_config', target_label='mail') messages.success(request, 'Mail-Einstellungen wurden gespeichert.') return redirect('/admin-tools/integrations/?kind=mail') @_require_capability('manage_integrations') @require_POST def save_email_routing_settings(request): config, _ = WorkflowConfig.objects.get_or_create(name='Default') config.it_onboarding_email = request.POST.get('it_onboarding_email', '').strip() config.general_info_email = request.POST.get('general_info_email', '').strip() config.business_card_email = request.POST.get('business_card_email', '').strip() config.hr_works_email = request.POST.get('hr_works_email', '').strip() config.key_notification_email = request.POST.get('key_notification_email', '').strip() config.save( update_fields=[ 'it_onboarding_email', 'general_info_email', 'business_card_email', 'hr_works_email', 'key_notification_email', ] ) known_keys = {k for k, _ in NotificationTemplate.TEMPLATE_CHOICES} for key in known_keys: subject = request.POST.get(f'subject_{key}') body = request.POST.get(f'body_{key}') subject_en = request.POST.get(f'subject_en_{key}') body_en = request.POST.get(f'body_en_{key}') if subject is None and body is None and subject_en is None and body_en is None: continue subject = (subject or '').strip() body = (body or '').strip() subject_en = (subject_en or '').strip() body_en = (body_en or '').strip() if not subject and not body and not subject_en and not body_en: continue obj, _ = NotificationTemplate.objects.get_or_create( key=key, defaults={ 'subject_template': subject or f'[{key}]', 'body_template': body or '-', 'subject_template_en': subject_en, 'body_template_en': body_en, 'is_active': True, }, ) changed = [] if subject and obj.subject_template != subject: obj.subject_template = subject changed.append('subject_template') if body and obj.body_template != body: obj.body_template = body changed.append('body_template') if obj.subject_template_en != subject_en: obj.subject_template_en = subject_en changed.append('subject_template_en') if obj.body_template_en != body_en: obj.body_template_en = body_en changed.append('body_template_en') if not obj.is_active: obj.is_active = True changed.append('is_active') if changed: obj.save(update_fields=changed) _audit(request, 'email_routing_saved', target_type='workflow_config', target_label='email_routing') messages.success(request, 'E-Mail Routing und Vorlagen wurden gespeichert.') return redirect('/admin-tools/integrations/?kind=emails') @_require_capability('manage_integrations') @require_POST def save_notification_rules(request): rule_ids = request.POST.getlist('rule_ids') for position, raw_id in enumerate(rule_ids): rule = NotificationRule.objects.filter(id=raw_id).first() if not rule: continue if request.POST.get(f'delete_{rule.id}') == 'on': rule.delete() continue rule.name = request.POST.get(f'name_{rule.id}', '').strip() or rule.name event_type = request.POST.get(f'event_type_{rule.id}', '').strip() if event_type in {'onboarding', 'offboarding'}: rule.event_type = event_type operator = request.POST.get(f'operator_{rule.id}', '').strip() if operator in {x[0] for x in NotificationRule.OPERATOR_CHOICES}: rule.operator = operator rule.field_name = request.POST.get(f'field_name_{rule.id}', '').strip() rule.expected_value = request.POST.get(f'expected_value_{rule.id}', '').strip() rule.recipients = request.POST.get(f'recipients_{rule.id}', '').strip() rule.template_key = request.POST.get(f'template_key_{rule.id}', '').strip() rule.custom_subject = request.POST.get(f'custom_subject_{rule.id}', '').strip() rule.custom_body = request.POST.get(f'custom_body_{rule.id}', '').strip() rule.custom_subject_en = request.POST.get(f'custom_subject_en_{rule.id}', '').strip() rule.custom_body_en = request.POST.get(f'custom_body_en_{rule.id}', '').strip() rule.include_pdf_attachment = request.POST.get(f'include_pdf_{rule.id}') == 'on' rule.is_active = request.POST.get(f'active_{rule.id}') == 'on' rule.sort_order = position rule.save() new_name = request.POST.get('new_name', '').strip() new_recipients = request.POST.get('new_recipients', '').strip() if new_name and new_recipients: new_event = request.POST.get('new_event_type', 'onboarding').strip() if new_event not in {'onboarding', 'offboarding'}: new_event = 'onboarding' new_operator = request.POST.get('new_operator', 'always').strip() if new_operator not in {x[0] for x in NotificationRule.OPERATOR_CHOICES}: new_operator = 'always' NotificationRule.objects.create( name=new_name, event_type=new_event, field_name=request.POST.get('new_field_name', '').strip(), operator=new_operator, expected_value=request.POST.get('new_expected_value', '').strip(), recipients=new_recipients, template_key=request.POST.get('new_template_key', '').strip(), custom_subject=request.POST.get('new_custom_subject', '').strip(), custom_body=request.POST.get('new_custom_body', '').strip(), custom_subject_en=request.POST.get('new_custom_subject_en', '').strip(), custom_body_en=request.POST.get('new_custom_body_en', '').strip(), include_pdf_attachment=request.POST.get('new_include_pdf') == 'on', is_active=True, sort_order=NotificationRule.objects.filter(event_type=new_event).count() + 1, ) _audit(request, 'notification_rules_saved', target_type='notification_rule') messages.success(request, 'Benachrichtigungsregeln wurden gespeichert.') return redirect('/admin-tools/integrations/?kind=emails') @_require_capability('delete_requests') @require_POST def delete_request_from_dashboard(request, kind: str, request_id: int): if kind == 'onboarding': obj = get_object_or_404(OnboardingRequest, id=request_id) elif kind == 'offboarding': obj = get_object_or_404(OffboardingRequest, id=request_id) else: messages.error(request, f'Unbekannter Typ: {kind}') return redirect('requests_dashboard') target_label = _request_target_label(obj, kind) obj.delete() _audit(request, 'request_deleted', target_type=kind, target_id=request_id, target_label=target_label) messages.success(request, f'{kind.capitalize()}-Anfrage #{request_id} wurde gelöscht.') return redirect('requests_dashboard') @_require_capability('retry_requests') @require_POST def retry_request_from_dashboard(request, kind: str, request_id: int): if kind == 'onboarding': obj = get_object_or_404(OnboardingRequest, id=request_id) obj.processing_status = 'submitted' obj.last_error = '' obj.save(update_fields=['processing_status', 'last_error']) process_onboarding_request.delay(obj.id) _audit(request, 'request_retried', target_type='onboarding', target_id=obj.id, target_label=_request_target_label(obj, 'onboarding')) elif kind == 'offboarding': obj = get_object_or_404(OffboardingRequest, id=request_id) obj.processing_status = 'submitted' obj.last_error = '' obj.save(update_fields=['processing_status', 'last_error']) process_offboarding_request.delay(obj.id) _audit(request, 'request_retried', target_type='offboarding', target_id=obj.id, target_label=_request_target_label(obj, 'offboarding')) else: messages.error(request, f'Unbekannter Typ: {kind}') return redirect('requests_dashboard') messages.success(request, f'{kind.capitalize()}-Anfrage #{request_id} wurde erneut angestoßen.') return redirect('requests_dashboard')