Files
workdock-platform/backend/workflows/views.py
2026-03-27 12:30:10 +01:00

3367 lines
143 KiB
Python

from pathlib import Path
from datetime import timedelta
from tempfile import NamedTemporaryFile
import json
from io import BytesIO
from functools import wraps
from celery import current_app
from django.conf import settings
from django.db import connection
from django.db import IntegrityError
from django.db.models import Q
from django.shortcuts import get_object_or_404, redirect, render
from django.contrib import messages
from django.contrib.auth import get_user_model, login as auth_login
from django.contrib.auth.decorators import login_required
from django.contrib.auth.tokens import default_token_generator
from django.http import JsonResponse
from django.views.decorators.http import require_POST
from django.views.decorators.csrf import ensure_csrf_cookie
from django.utils import timezone
from django.utils.encoding import force_bytes
from django.utils.http import urlsafe_base64_encode
from django.utils.translation import gettext as _, gettext_lazy
from django.utils.translation import get_language, override
from django.urls import reverse
from .app_registry import build_portal_app_sections, get_portal_app_registry_rows, normalize_portal_app_sort_orders
from .backup_ops import (
create_backup_bundle,
delete_backup_bundle,
latest_backup_health_snapshot,
list_backup_bundles,
verify_backup_bundle,
)
from .branding import get_branding_email_copy, get_company_email_domain, get_default_notification_templates, get_portal_trial_config, is_trial_expired
from .forms import AccountAvatarForm, AccountDetailsForm, AccountNotificationPreferencesForm, AccountTOTPDisableForm, AccountTOTPEnableForm, AccountTOTPRegenerateRecoveryCodesForm, AppLoginForm, AppTOTPChallengeForm, OffboardingRequestForm, OnboardingRequestForm, PortalBrandingForm, PortalCompanyConfigForm, PortalTrialConfigForm, UserManagementCreateForm
from .form_builder import (
DEFAULT_FIELD_ORDER,
FORM_PRESETS,
LOCKED_FIELD_RULES,
LOCKED_SECTION_RULES,
OFFBOARDING_PAGE_LABELS,
OFFBOARDING_PAGE_ORDER,
ONBOARDING_DEFAULT_PAGE,
ONBOARDING_PAGE_LABELS,
ONBOARDING_PAGE_ORDER,
ensure_form_field_configs,
ensure_form_section_configs,
get_default_page_map,
get_section_labels,
get_section_order,
apply_form_preset,
)
from .models import AdminAuditLog, AsyncTaskLog, EmployeeProfile, FormFieldConfig, FormOption, FormSectionConfig, IntroChecklistItem, NotificationRule, NotificationTemplate, OffboardingRequest, OnboardingIntroductionSession, OnboardingRequest, PortalAppConfig, PortalBranding, PortalCompanyConfig, PortalTrialConfig, ScheduledWelcomeEmail, SystemEmailConfig, UserNotification, UserProfile, WorkflowConfig
from .emailing import send_system_email
from .notifications import notify_user
from .roles import ROLE_GROUP_NAMES, ROLE_LABELS, ROLE_PLATFORM_OWNER, ROLE_SUPER_ADMIN, assign_user_role, get_user_role_key, get_user_role_label, user_has_capability
from .services import get_email_test_redirect, is_email_test_mode, is_nextcloud_enabled, upload_to_nextcloud
from .totp import build_otpauth_uri, generate_recovery_codes, generate_totp_secret
from .tasks import (
_generate_onboarding_intro_pdf,
_generate_onboarding_intro_session_pdf,
build_intro_sections_for_request,
process_offboarding_request,
process_onboarding_request,
send_scheduled_welcome_email,
)
def _redirect_back(request, fallback: str):
target = (request.POST.get('next') or request.GET.get('next') or '').strip()
if target.startswith('/'):
return redirect(target)
referer = (request.META.get('HTTP_REFERER') or '').strip()
if referer.startswith('http://127.0.0.1') or referer.startswith('http://localhost') or referer.startswith('/'):
return redirect(referer)
return redirect(fallback)
@login_required
@require_POST
def mark_notification_read(request, notification_id: int):
notification = get_object_or_404(UserNotification, id=notification_id, user=request.user)
notification.mark_read()
return _redirect_back(request, 'home')
@login_required
@require_POST
def mark_all_notifications_read(request):
UserNotification.objects.filter(user=request.user, read_at__isnull=True).update(read_at=timezone.now())
return _redirect_back(request, 'home')
ONBOARDING_GROUPS = {
'business-card-box': ['business_card_name', 'business_card_title', 'business_card_email', 'business_card_phone'],
'employment-end-box': ['employment_end_date'],
'group-mailboxes-box': ['group_mailboxes'],
'extra-hardware-box': ['additional_hardware_multi', 'additional_hardware_other'],
'extra-software-box': ['additional_software_multi', 'additional_software'],
'extra-access-box': ['additional_access_text'],
'successor-box': ['successor_name', 'inherit_phone_number_choice'],
'phone-box': ['phone_number_choice'],
}
ONBOARDING_HIDDEN_BY_DEFAULT = {
'business-card-box',
'employment-end-box',
'group-mailboxes-box',
'extra-hardware-box',
'extra-software-box',
'extra-access-box',
'successor-box',
}
ONBOARDING_INLINE_CHECKS = {'order_business_cards', 'agreement_confirm'}
ONBOARDING_CHECKBOX_LISTS = {
'needed_devices_multi',
'additional_hardware_multi',
'needed_software_multi',
'additional_software_multi',
'needed_accesses_multi',
'needed_workspace_groups_multi',
'needed_resources_multi',
}
ONBOARDING_SECTION_ORDER = ['stammdaten', 'vertrag', 'itsetup', 'abschluss']
ONBOARDING_SECTION_META = {
'stammdaten': {'title': gettext_lazy('Stammdaten'), 'subtitle': gettext_lazy('Person, Rolle, Abteilung')},
'vertrag': {'title': gettext_lazy('Vertrag'), 'subtitle': gettext_lazy('Beschäftigung und Termine')},
'itsetup': {'title': gettext_lazy('IT-Setup'), 'subtitle': gettext_lazy('Geräte, Software und Zugänge')},
'abschluss': {'title': gettext_lazy('Abschluss'), 'subtitle': gettext_lazy('Notizen und Freigabe')},
}
def healthz(request):
db_ok = True
try:
with connection.cursor() as cursor:
cursor.execute('SELECT 1')
cursor.fetchone()
except Exception:
db_ok = False
status_code = 200 if db_ok else 503
return JsonResponse(
{
'status': 'ok' if db_ok else 'degraded',
'service': 'workdock',
'db': 'ok' if db_ok else 'error',
'time': timezone.now().isoformat(),
},
status=status_code,
)
def login_page(request):
if getattr(request.user, 'is_authenticated', False):
return redirect('home')
next_target = (request.POST.get('next') or request.GET.get('next') or '').strip()
form = AppLoginForm(request=request, data=request.POST or None)
if request.method == 'POST' and form.is_valid():
user = form.get_user()
profile, _ = UserProfile.objects.get_or_create(user=user)
safe_next = next_target if next_target.startswith('/') else ''
if profile.totp_enabled:
request.session['login_pending_user_id'] = user.pk
request.session['login_pending_backend'] = getattr(user, 'backend', '')
request.session['login_pending_next'] = safe_next
return redirect('login_totp')
auth_login(request, user, backend=getattr(user, 'backend', None))
now_ts = int(timezone.now().timestamp())
request.session['auth_fresh_ts'] = now_ts
request.session['last_activity_ts'] = now_ts
return redirect(safe_next or reverse('home'))
request.session.pop('login_pending_user_id', None)
request.session.pop('login_pending_backend', None)
request.session.pop('login_pending_next', None)
return render(
request,
'workflows/auth/login.html',
{
'form': form,
'next': next_target,
'login_step': 'password',
},
)
def login_totp_page(request):
if getattr(request.user, 'is_authenticated', False):
return redirect('home')
pending_user_id = request.session.get('login_pending_user_id')
backend_path = (request.session.get('login_pending_backend') or '').strip()
next_target = (request.session.get('login_pending_next') or '').strip()
if not pending_user_id:
return redirect('login')
user = get_object_or_404(get_user_model(), pk=pending_user_id)
profile, _ = UserProfile.objects.get_or_create(user=user)
if not profile.totp_enabled:
request.session.pop('login_pending_user_id', None)
request.session.pop('login_pending_backend', None)
request.session.pop('login_pending_next', None)
return redirect('login')
show_recovery = request.method == 'POST' and bool((request.POST.get('recovery_code') or '').strip())
form = AppTOTPChallengeForm(data=request.POST or None, profile=profile)
if request.method == 'POST' and form.is_valid():
auth_login(request, user, backend=backend_path or 'django.contrib.auth.backends.ModelBackend')
request.session.pop('login_pending_user_id', None)
request.session.pop('login_pending_backend', None)
request.session.pop('login_pending_next', None)
now_ts = int(timezone.now().timestamp())
request.session['auth_fresh_ts'] = now_ts
request.session['last_activity_ts'] = now_ts
return redirect(next_target if next_target.startswith('/') else reverse('home'))
return render(
request,
'workflows/auth/login.html',
{
'form': form,
'next': next_target,
'login_step': 'totp',
'login_totp_user': user,
'show_recovery_code': show_recovery,
},
)
@login_required
def account_profile_page(request):
session_secret_key = 'account_totp_pending_secret'
session_codes_key = 'account_totp_recovery_codes'
profile, created = UserProfile.objects.get_or_create(user=request.user)
recovery_codes = request.session.pop(session_codes_key, [])
pending_totp_secret = request.session.get(session_secret_key) or ''
if profile.totp_enabled:
pending_totp_secret = ''
request.session.pop(session_secret_key, None)
elif not pending_totp_secret:
pending_totp_secret = generate_totp_secret()
request.session[session_secret_key] = pending_totp_secret
avatar_form = AccountAvatarForm(instance=profile)
details_form = AccountDetailsForm(user=request.user, profile=profile)
notification_preferences_form = AccountNotificationPreferencesForm(profile=profile, user=request.user)
totp_enable_form = AccountTOTPEnableForm(user=request.user, secret=pending_totp_secret)
totp_disable_form = AccountTOTPDisableForm(user=request.user, profile=profile)
totp_regenerate_form = AccountTOTPRegenerateRecoveryCodesForm(user=request.user, profile=profile)
account_edit_open = False
notifications_edit_open = False
totp_edit_open = False
if request.method == 'POST':
form_kind = (request.POST.get('account_form') or '').strip()
if form_kind == 'avatar':
avatar_form = AccountAvatarForm(request.POST, request.FILES, instance=profile)
if avatar_form.is_valid():
avatar_form.save()
messages.success(request, _('Profilbild gespeichert.'))
return redirect('account_profile_page')
messages.error(request, _('Profilbild konnte nicht gespeichert werden.'))
elif form_kind == 'details':
account_edit_open = True
details_form = AccountDetailsForm(request.POST, user=request.user, profile=profile)
if details_form.is_valid():
details_form.save()
messages.success(request, _('Profildaten gespeichert.'))
return redirect('account_profile_page')
messages.error(request, _('Profildaten konnten nicht gespeichert werden.'))
elif form_kind == 'notification_preferences':
notifications_edit_open = True
notification_preferences_form = AccountNotificationPreferencesForm(request.POST, profile=profile, user=request.user)
if notification_preferences_form.is_valid():
notification_preferences_form.save()
messages.success(request, _('Benachrichtigungseinstellungen gespeichert.'))
return redirect('account_profile_page')
messages.error(request, _('Benachrichtigungseinstellungen konnten nicht gespeichert werden.'))
elif form_kind == 'totp_enable':
totp_edit_open = True
totp_enable_form = AccountTOTPEnableForm(request.POST, user=request.user, secret=pending_totp_secret)
if totp_enable_form.is_valid():
recovery_codes = generate_recovery_codes()
profile.enable_totp(pending_totp_secret, recovery_codes)
request.session[session_codes_key] = recovery_codes
request.session.pop(session_secret_key, None)
messages.success(request, _('TOTP wurde aktiviert.'))
return redirect('account_profile_page')
messages.error(request, _('TOTP konnte nicht aktiviert werden.'))
elif form_kind == 'totp_disable':
totp_edit_open = True
totp_disable_form = AccountTOTPDisableForm(request.POST, user=request.user, profile=profile)
if totp_disable_form.is_valid():
profile.disable_totp()
request.session.pop(session_secret_key, None)
messages.success(request, _('TOTP wurde deaktiviert.'))
return redirect('account_profile_page')
messages.error(request, _('TOTP konnte nicht deaktiviert werden.'))
elif form_kind == 'totp_regenerate_codes':
totp_edit_open = True
totp_regenerate_form = AccountTOTPRegenerateRecoveryCodesForm(request.POST, user=request.user, profile=profile)
if totp_regenerate_form.is_valid():
recovery_codes = generate_recovery_codes()
profile.set_recovery_codes(recovery_codes)
profile.save(update_fields=['totp_recovery_codes', 'updated_at'])
request.session[session_codes_key] = recovery_codes
messages.success(request, _('Recovery-Codes wurden neu erzeugt.'))
return redirect('account_profile_page')
messages.error(request, _('Recovery-Codes konnten nicht neu erzeugt werden.'))
branding_context = get_branding_email_copy()
totp_account_name = (request.user.email or request.user.username or '').strip()
totp_issuer = (branding_context.get('portal_title') or branding_context.get('company_name') or 'Workdock').strip()
totp_otpauth_uri = '' if profile.totp_enabled else build_otpauth_uri(pending_totp_secret, account_name=totp_account_name, issuer=totp_issuer)
totp_qr_svg = ''
if totp_otpauth_uri:
try:
import qrcode
import qrcode.image.svg
qr_image = qrcode.make(totp_otpauth_uri, image_factory=qrcode.image.svg.SvgPathImage)
stream = BytesIO()
qr_image.save(stream)
totp_qr_svg = stream.getvalue().decode('utf-8')
except Exception:
totp_qr_svg = ''
return render(
request,
'workflows/account_profile.html',
{
'account_user': request.user,
'account_user_profile': profile,
'avatar_form': avatar_form,
'details_form': details_form,
'notification_preferences_form': notification_preferences_form,
'notification_preference_groups': notification_preferences_form.grouped_fields(),
'totp_enable_form': totp_enable_form,
'totp_disable_form': totp_disable_form,
'totp_regenerate_form': totp_regenerate_form,
'account_edit_open': account_edit_open,
'notifications_edit_open': notifications_edit_open,
'totp_edit_open': totp_edit_open,
'role_label': get_user_role_label(request.user),
'totp_pending_secret': pending_totp_secret,
'totp_otpauth_uri': totp_otpauth_uri,
'totp_qr_svg': totp_qr_svg,
'totp_recovery_codes': recovery_codes,
},
)
def _require_capability(capability: str):
def decorator(view_func):
@wraps(view_func)
@login_required
def wrapped(request, *args, **kwargs):
if not user_has_capability(request.user, capability):
messages.error(request, _('Sie haben keine Berechtigung für diese Aktion.'))
return redirect('home')
return view_func(request, *args, **kwargs)
return wrapped
return decorator
def _display_user_name(user) -> str:
first_name = (getattr(user, 'first_name', '') or '').strip()
last_name = (getattr(user, 'last_name', '') or '').strip()
full_name = f'{first_name} {last_name}'.strip()
if full_name:
return full_name
username = (getattr(user, 'username', '') or '').strip()
if username:
return username
return (getattr(user, 'email', '') or '').strip()
def _audit(
request,
action: str,
*,
target_type: str = '',
target_id: int | None = None,
target_label: str = '',
details: dict | None = None,
) -> None:
if not getattr(request, 'user', None) or not request.user.is_authenticated:
return
AdminAuditLog.objects.create(
actor=request.user,
actor_display=_display_user_name(request.user),
action=action,
target_type=target_type,
target_id=target_id,
target_label=target_label,
details=details or {},
)
def _form_field_labels(form_type: str) -> dict[str, str]:
if form_type == 'onboarding':
return {name: str(field.label or name) for name, field in OnboardingRequestForm.base_fields.items()}
if form_type == 'offboarding':
return {name: str(field.label or name) for name, field in OffboardingRequestForm.base_fields.items()}
return {}
def _request_target_label(obj, kind: str | None = None) -> str:
request_kind = (kind or '').strip()
if not request_kind:
request_kind = 'onboarding' if isinstance(obj, OnboardingRequest) else 'offboarding'
name = (getattr(obj, 'full_name', '') or '').strip() or f'#{getattr(obj, "id", "?")}'
email = (getattr(obj, 'work_email', '') or '').strip()
created_at = getattr(obj, 'created_at', None)
date_label = created_at.strftime('%Y-%m-%d') if created_at else ''
parts = [request_kind.capitalize(), name]
if email:
parts.append(f'<{email}>')
if date_label:
parts.append(date_label)
return ' | '.join(parts)
def _request_status_label(status_key: str, language_code: str | None = None) -> str:
lang = ((language_code or 'de').split('-')[0] or 'de').lower()
with override(lang):
labels = {
'submitted': _('Eingereicht'),
'processing': _('In Bearbeitung'),
'completed': _('Abgeschlossen'),
'failed': _('Fehlgeschlagen'),
}
return labels.get(status_key, status_key)
def _audit_action_label(action: str) -> str:
labels = {
'requests_deleted': _('Vorgänge gelöscht'),
'request_deleted': _('Vorgang gelöscht'),
'request_retried': _('Vorgang erneut angestoßen'),
'intro_pdf_generated': _('Einweisungs-PDF erzeugt'),
'intro_live_pdf_generated': _('Live-Protokoll erzeugt'),
'intro_session_reset': _('Einweisung zurückgesetzt'),
'intro_session_saved': _('Einweisung als Entwurf gespeichert'),
'intro_session_completed': _('Einweisung abgeschlossen'),
'form_option_deleted': _('Formularoption gelöscht'),
'form_options_saved': _('Formularoptionen gespeichert'),
'form_field_texts_saved': _('Feldtexte gespeichert'),
'form_layout_saved': _('Formularlayout gespeichert'),
'intro_checklist_item_deleted': _('Einweisungs-Checkpunkt gelöscht'),
'intro_checklist_item_added': _('Einweisungs-Checkpunkt hinzugefügt'),
'intro_checklist_saved': _('Einweisungs-Checkliste gespeichert'),
'welcome_email_triggered_now': _('Welcome E-Mail sofort ausgelöst'),
'welcome_email_settings_saved': _('Welcome E-Mail Einstellungen gespeichert'),
'welcome_email_bulk_action': _('Welcome E-Mail Sammelaktion ausgeführt'),
'welcome_email_paused': _('Welcome E-Mail pausiert'),
'welcome_email_resumed': _('Welcome E-Mail fortgesetzt'),
'welcome_email_cancelled': _('Welcome E-Mail abgebrochen'),
'smtp_test_sent': _('SMTP-Test gesendet'),
'nextcloud_test_upload': _('Nextcloud-Testupload ausgeführt'),
'nextcloud_mode_toggled': _('Nextcloud-Modus umgeschaltet'),
'email_mode_toggled': _('E-Mail-Modus umgeschaltet'),
'integrations_saved': _('Integrationen gespeichert'),
'nextcloud_settings_saved': _('Nextcloud-Einstellungen gespeichert'),
'mail_settings_saved': _('Mail-Einstellungen gespeichert'),
'email_routing_saved': _('E-Mail-Routing gespeichert'),
'notification_rules_saved': _('Benachrichtigungsregeln gespeichert'),
'user_created': _('Benutzer erstellt'),
'user_updated': _('Benutzer aktualisiert'),
'user_password_reset_sent': _('Passwort-Reset-Link versendet'),
'user_deleted': _('Benutzer gelöscht'),
'backup_created': _('Backup erstellt'),
'backup_verified': _('Backup verifiziert'),
'backup_deleted': _('Backup gelöscht'),
'backup_settings_saved': _('Backup-Einstellungen gespeichert'),
'portal_app_registry_saved': _('App-Registry gespeichert'),
}
return labels.get(action, action.replace('_', ' ').strip().capitalize())
def _translate_choice_list(choices):
return [(value, str(label)) for value, label in choices]
def _build_onboarding_layout(form) -> list[dict]:
ordered_names = list(form.fields.keys())
group_by_field = {}
for group_id, group_fields in ONBOARDING_GROUPS.items():
for name in group_fields:
group_by_field[name] = group_id
rendered_groups = set()
consumed = set()
blocks = []
for field_name in ordered_names:
if field_name in consumed:
continue
group_id = group_by_field.get(field_name)
if group_id:
if group_id in rendered_groups:
continue
group_fields = [
form[name]
for name in ONBOARDING_GROUPS[group_id]
if name in form.fields
]
if not group_fields:
continue
blocks.append(
{
'kind': 'group',
'id': group_id,
'hidden_default': group_id in ONBOARDING_HIDDEN_BY_DEFAULT,
'fields': group_fields,
}
)
rendered_groups.add(group_id)
consumed.update([f.name for f in group_fields])
continue
blocks.append({'kind': 'field', 'field': form[field_name]})
consumed.add(field_name)
return blocks
def _section_for_block(block: dict, field_pages: dict[str, str]) -> str:
if block['kind'] == 'field':
return field_pages.get(block['field'].name, 'abschluss')
fields = block.get('fields') or []
if not fields:
return 'abschluss'
return field_pages.get(fields[0].name, 'abschluss')
def _build_onboarding_sections(blocks: list[dict], field_pages: dict[str, str], visible_section_keys: set[str] | None = None) -> list[dict]:
grouped = {key: [] for key in ONBOARDING_SECTION_ORDER}
for block in blocks:
section_key = _section_for_block(block, field_pages)
if section_key not in grouped:
section_key = 'abschluss'
grouped[section_key].append(block)
visible_keys = visible_section_keys or set(ONBOARDING_SECTION_ORDER)
return [
{
'key': key,
'title': ONBOARDING_SECTION_META[key]['title'],
'subtitle': ONBOARDING_SECTION_META[key]['subtitle'],
'blocks': grouped[key],
}
for key in ONBOARDING_SECTION_ORDER
if key in visible_keys
]
OFFBOARDING_SECTION_META = {
'mitarbeitende': {'title': gettext_lazy('Mitarbeitende'), 'subtitle': gettext_lazy('Person, Rolle und Bereich')},
'austritt': {'title': gettext_lazy('Austritt'), 'subtitle': gettext_lazy('Letzter Arbeitstag')},
'abschluss': {'title': gettext_lazy('Abschluss'), 'subtitle': gettext_lazy('Hinweise und Abschlussnotizen')},
}
def _build_offboarding_sections(form, visible_section_keys: set[str] | None = None) -> list[dict]:
field_pages = getattr(form, '_field_page_keys', {})
grouped = {key: [] for key in OFFBOARDING_PAGE_ORDER}
for field_name in form.fields.keys():
section_key = field_pages.get(field_name, 'abschluss')
if section_key not in grouped:
section_key = 'abschluss'
grouped[section_key].append(form[field_name])
visible_keys = visible_section_keys or set(OFFBOARDING_PAGE_ORDER)
return [
{
'key': key,
'title': OFFBOARDING_SECTION_META[key]['title'],
'subtitle': OFFBOARDING_SECTION_META[key]['subtitle'],
'fields': grouped[key],
}
for key in OFFBOARDING_PAGE_ORDER
if key in visible_keys and grouped[key]
]
@login_required
def home(request):
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
role_key = get_user_role_key(request.user)
return render(
request,
'workflows/home.html',
{
'nextcloud_enabled': is_nextcloud_enabled(),
'email_test_mode': is_email_test_mode(),
'workflow_config': config,
'role_label': get_user_role_label(request.user),
'role_key': role_key,
'portal_app_sections': build_portal_app_sections(request.user),
},
)
@_require_capability('manage_app_registry')
def portal_app_registry_page(request):
return render(
request,
'workflows/app_registry.html',
{
'rows': get_portal_app_registry_rows(),
'section_choices': _translate_choice_list(PortalAppConfig.SECTION_CHOICES),
},
)
@_require_capability('view_job_monitor')
def job_monitor_page(request):
status_filter = (request.GET.get('status') or '').strip()
task_filter = (request.GET.get('task') or '').strip()
logs = AsyncTaskLog.objects.all()
if status_filter:
logs = logs.filter(status=status_filter)
if task_filter:
logs = logs.filter(task_name=task_filter)
logs = logs.order_by('-started_at', '-id')[:200]
task_names = list(AsyncTaskLog.objects.order_by('task_name').values_list('task_name', flat=True).distinct())
return render(
request,
'workflows/job_monitor.html',
{
'logs': logs,
'status_filter': status_filter,
'task_filter': task_filter,
'task_names': task_names,
'status_choices': [('started', _('Gestartet')), ('succeeded', _('Erfolgreich')), ('failed', _('Fehlgeschlagen'))],
},
)
@_require_capability('manage_app_registry')
@require_POST
def save_portal_app_registry(request):
rows = get_portal_app_registry_rows()
updated_configs = []
for row in rows:
config = row['config']
key = config.key
config.section = (request.POST.get(f'section__{key}') or config.section).strip()
if config.section not in dict(PortalAppConfig.SECTION_CHOICES):
config.section = row['default_section']
config.is_enabled = request.POST.get(f'is_enabled__{key}') == 'on'
config.visible_to_super_admin = request.POST.get(f'visible_to_super_admin__{key}') == 'on'
config.visible_to_admin = request.POST.get(f'visible_to_admin__{key}') == 'on'
config.visible_to_it_staff = request.POST.get(f'visible_to_it_staff__{key}') == 'on'
config.visible_to_staff = request.POST.get(f'visible_to_staff__{key}') == 'on'
try:
config.sort_order = int((request.POST.get(f'sort_order__{key}') or '').strip() or row['default_sort_order'])
except ValueError:
config.sort_order = row['default_sort_order']
config.title_override = (request.POST.get(f'title_override__{key}') or '').strip()
config.title_override_en = (request.POST.get(f'title_override_en__{key}') or '').strip()
config.description_override = (request.POST.get(f'description_override__{key}') or '').strip()
config.description_override_en = (request.POST.get(f'description_override_en__{key}') or '').strip()
config.action_label_override = (request.POST.get(f'action_label_override__{key}') or '').strip()
config.action_label_override_en = (request.POST.get(f'action_label_override_en__{key}') or '').strip()
config.save()
updated_configs.append(config)
normalize_portal_app_sort_orders()
_audit(
request,
'portal_app_registry_saved',
target_type='portal_app_registry',
target_label='Portal App Registry',
details={'updated_apps': len(rows)},
)
messages.success(request, _('App-Registry gespeichert.'))
return redirect('portal_app_registry_page')
def _user_management_rows():
user_model = get_user_model()
role_order = {
ROLE_PLATFORM_OWNER: 0,
ROLE_SUPER_ADMIN: 0,
'admin': 1,
'it_staff': 2,
'staff': 3,
}
rows = []
for user in user_model.objects.all().order_by('-is_active', 'username'):
role_key = get_user_role_key(user)
rows.append(
{
'user': user,
'role_key': role_key,
'role_label': str(ROLE_LABELS[role_key]),
'role_sort': role_order.get(role_key, 99),
'display_name': _display_user_name(user),
}
)
rows.sort(key=lambda item: (not item['user'].is_active, item['role_sort'], item['user'].username.lower()))
return rows
def _render_user_management(request, create_form=None, status_code: int = 200):
recent_user_events = list(
AdminAuditLog.objects.select_related('actor')
.filter(action__in=['user_created', 'user_updated', 'user_password_reset_sent', 'user_deleted'])
.order_by('-created_at', '-id')[:12]
)
for row in recent_user_events:
row.action_label = _audit_action_label(row.action)
role_key = (row.details or {}).get('role')
row.role_label = str(ROLE_LABELS[role_key]) if role_key in ROLE_LABELS else role_key
include_product_owner = get_user_role_key(request.user) == ROLE_PLATFORM_OWNER
return render(
request,
'workflows/user_management.html',
{
'create_form': create_form or UserManagementCreateForm(include_product_owner=include_product_owner),
'rows': _user_management_rows(),
'role_choices': [
(key, str(ROLE_LABELS[key]))
for key in ROLE_GROUP_NAMES
if include_product_owner or key != ROLE_PLATFORM_OWNER
],
'include_product_owner': include_product_owner,
'recent_user_events': recent_user_events,
},
status=status_code,
)
def _platform_owner_user_count() -> int:
user_model = get_user_model()
return sum(1 for user in user_model.objects.all() if get_user_role_key(user) == ROLE_PLATFORM_OWNER and user.is_active)
def _super_admin_user_count() -> int:
user_model = get_user_model()
return sum(1 for user in user_model.objects.all() if get_user_role_key(user) == ROLE_SUPER_ADMIN and user.is_active)
def _would_remove_last_super_admin(user, new_role_key: str | None = None, new_is_active: bool | None = None, deleting: bool = False) -> bool:
if get_user_role_key(user) != ROLE_SUPER_ADMIN or not user.is_active:
return False
if _super_admin_user_count() > 1:
return False
if deleting:
return True
if new_role_key is not None and new_role_key != ROLE_SUPER_ADMIN:
return True
if new_is_active is not None and not new_is_active:
return True
return False
def _would_remove_last_platform_owner(user, new_role_key: str | None = None, new_is_active: bool | None = None, deleting: bool = False) -> bool:
if get_user_role_key(user) != ROLE_PLATFORM_OWNER or not user.is_active:
return False
if _platform_owner_user_count() > 1:
return False
if deleting:
return True
if new_role_key is not None and new_role_key != ROLE_PLATFORM_OWNER:
return True
if new_is_active is not None and not new_is_active:
return True
return False
def _send_user_access_email(request, target_user, *, invitation: bool) -> None:
email = (target_user.email or '').strip()
if not email:
raise ValueError(_('Für diesen Benutzer ist keine E-Mail-Adresse hinterlegt.'))
uid = urlsafe_base64_encode(force_bytes(target_user.pk))
token = default_token_generator.make_token(target_user)
reset_path = reverse('password_reset_confirm', kwargs={'uidb64': uid, 'token': token})
reset_url = request.build_absolute_uri(reset_path)
branding_copy = get_branding_email_copy()
if invitation:
subject = _('Zugangseinladung für %(username)s') % {'username': target_user.username}
body = _(
'Hallo %(name)s,\n\n'
'für Sie wurde ein Benutzerkonto im %(portal_title)s angelegt.\n'
'Bitte öffnen Sie den folgenden Link, um Ihr Passwort zu setzen:\n'
'%(url)s\n\n'
'Wenn Sie diese Einladung nicht erwartet haben, melden Sie sich bitte bei Ihrem Administrator.'
) % {
'name': _display_user_name(target_user),
'portal_title': branding_copy['portal_title'],
'url': reset_url,
}
else:
subject = _('Passwort zurücksetzen für %(username)s') % {'username': target_user.username}
body = _(
'Hallo %(name)s,\n\n'
'für Ihr Konto wurde ein Link zum Zurücksetzen des Passworts erstellt.\n'
'Bitte öffnen Sie den folgenden Link:\n'
'%(url)s\n\n'
'Wenn Sie diese Anfrage nicht erwartet haben, können Sie diese E-Mail ignorieren.'
) % {
'name': _display_user_name(target_user),
'url': reset_url,
}
send_system_email(subject=subject, body=body, to=[email])
@_require_capability('manage_users')
def user_management_page(request):
return _render_user_management(request)
@_require_capability('manage_product_branding')
def portal_branding_page(request):
branding, created = PortalBranding.objects.get_or_create(name='Default')
form = PortalBrandingForm(instance=branding)
return render(
request,
'workflows/branding_settings.html',
{
'form': form,
'branding': branding,
'branding_sections': _build_branding_sections(form, branding),
'editing_branding_section': '',
},
)
@_require_capability('manage_product_branding')
@require_POST
def save_portal_branding(request):
branding, created = PortalBranding.objects.get_or_create(name='Default')
section_key = (request.POST.get('section_key') or '').strip()
data = request.POST.copy()
for field_name in PortalBrandingForm.Meta.fields:
if field_name not in data:
field = PortalBranding._meta.get_field(field_name)
if getattr(field, 'many_to_many', False):
continue
if getattr(field, 'null', False) and getattr(branding, field_name, None) is None:
data[field_name] = ''
else:
data[field_name] = getattr(branding, field_name, '') or ''
form = PortalBrandingForm(data, request.FILES, instance=branding)
if not form.is_valid():
messages.error(request, _('Branding konnte nicht gespeichert werden. Bitte prüfen Sie die Eingaben.'))
return render(
request,
'workflows/branding_settings.html',
{
'form': form,
'branding': branding,
'branding_sections': _build_branding_sections(form, branding),
'editing_branding_section': section_key,
},
status=400,
)
branding = form.save()
_audit(
request,
'portal_branding_saved',
target_type='portal_branding',
target_id=branding.id,
target_label=branding.portal_title,
details={
'company_name': branding.company_name,
'support_email': branding.support_email,
'default_language': branding.default_language,
'has_custom_logo': bool(branding.logo_image),
'has_custom_letterhead': bool(branding.pdf_letterhead),
},
)
messages.success(request, _('Portal-Branding wurde gespeichert.'))
return render(
request,
'workflows/branding_settings.html',
{
'form': PortalBrandingForm(instance=branding),
'branding': branding,
'branding_sections': _build_branding_sections(PortalBrandingForm(instance=branding), branding),
'editing_branding_section': '',
},
)
def _build_branding_sections(form, branding):
sections = [
{
'key': 'identity',
'title': _('Identität'),
'subtitle': _('Titel, Firmenname und zentrale Spracheinstellungen.'),
'fields': ['portal_title', 'company_name', 'company_domain', 'default_language', 'login_subtitle'],
'field_full': {'login_subtitle'},
'hint_map': {
'company_domain': _('Wird für E-Mail-Vorschläge und Domain-bezogene Standardtexte verwendet, z. B. workdock.de.'),
},
},
{
'key': 'appearance',
'title': _('Farben & Erscheinungsbild'),
'subtitle': _('Zentrale visuelle Markenwerte und Browser-Icon.'),
'fields': ['primary_color', 'secondary_color', 'logo_image', 'favicon_image'],
'field_full': set(),
'hint_map': {
'logo_image': _('Erlaubte Formate: SVG, PNG, JPG, JPEG, WEBP. Maximal 5 MB.'),
'favicon_image': _('Erlaubte Formate: ICO, PNG, SVG, WEBP. Maximal 2 MB.'),
},
},
{
'key': 'communication',
'title': _('Kommunikation'),
'subtitle': _('Absender, Support und PDF-Branding für ausgehende Kommunikation.'),
'fields': ['support_email', 'sender_display_name', 'pdf_letterhead'],
'field_full': {'pdf_letterhead'},
'hint_map': {
'sender_display_name': _('Wird für ausgehende System-E-Mails als Anzeigename verwendet.'),
'pdf_letterhead': _('Erlaubtes Format: PDF. Maximal 10 MB.'),
},
},
{
'key': 'legal',
'title': _('Footer & Rechtliches'),
'subtitle': _('Gemeinsame Footer-Texte und rechtliche Hinweise für die Shell.'),
'fields': ['footer_text', 'legal_notice', 'footer_text_en', 'legal_notice_en'],
'field_full': {'legal_notice', 'legal_notice_en'},
'hint_map': {},
},
]
for section in sections:
rows = []
for field_name in section['fields']:
field = form[field_name]
value = getattr(branding, field_name, '') or ''
is_file = bool(getattr(field.field.widget, 'input_type', '') == 'file')
rows.append(
{
'name': field_name,
'bound_field': field,
'label': field.label,
'value': value,
'is_file': is_file,
'is_full': field_name in section.get('field_full', set()),
'hint': section.get('hint_map', {}).get(field_name, ''),
}
)
section['rows'] = rows
return sections
@_require_capability('manage_company_config')
def portal_company_config_page(request):
company_config, created = PortalCompanyConfig.objects.get_or_create(name='Default')
form = PortalCompanyConfigForm(instance=company_config)
return render(
request,
'workflows/company_config.html',
{
'form': form,
'company_config': company_config,
'company_config_sections': _build_company_config_sections(form, company_config),
'editing_company_section': '',
},
)
@_require_capability('manage_company_config')
@require_POST
def save_portal_company_config(request):
company_config, created = PortalCompanyConfig.objects.get_or_create(name='Default')
section_key = (request.POST.get('section_key') or '').strip()
data = request.POST.copy()
for field_name in PortalCompanyConfigForm.Meta.fields:
if field_name not in data:
data[field_name] = getattr(company_config, field_name, '') or ''
form = PortalCompanyConfigForm(data, instance=company_config)
if not form.is_valid():
messages.error(request, _('Firmenkonfiguration konnte nicht gespeichert werden. Bitte prüfen Sie die Eingaben.'))
return render(
request,
'workflows/company_config.html',
{
'form': form,
'company_config': company_config,
'company_config_sections': _build_company_config_sections(form, company_config),
'editing_company_section': section_key,
},
status=400,
)
company_config = form.save()
_audit(
request,
'portal_company_config_saved',
target_type='portal_company_config',
target_id=company_config.id,
target_label=company_config.legal_company_name or 'Default',
details={
'website_url': company_config.website_url,
'imprint_url': company_config.imprint_url,
'privacy_url': company_config.privacy_url,
'hr_contact_email': company_config.hr_contact_email,
'it_contact_email': company_config.it_contact_email,
'operations_contact_email': company_config.operations_contact_email,
},
)
messages.success(request, _('Firmenkonfiguration wurde gespeichert.'))
return render(
request,
'workflows/company_config.html',
{
'form': PortalCompanyConfigForm(instance=company_config),
'company_config': company_config,
'company_config_sections': _build_company_config_sections(PortalCompanyConfigForm(instance=company_config), company_config),
'editing_company_section': '',
},
)
def _build_company_config_sections(form, company_config):
sections = [
{
'key': 'profile',
'title': _('Firmenprofil'),
'subtitle': _('Rechtlicher Name und zentrale Stammdaten der Firma.'),
'fields': ['legal_company_name', 'phone_number', 'website_url', 'country'],
},
{
'key': 'address',
'title': _('Adresse & Register'),
'subtitle': _('Anschrift sowie optionale Register- und Steuerangaben.'),
'fields': ['street_address', 'postal_code', 'city', 'registration_number', 'vat_id'],
},
{
'key': 'contacts',
'title': _('Kontaktpunkte'),
'subtitle': _('Zentrale Ansprechpartner für HR, IT und Operations.'),
'fields': ['hr_contact_email', 'it_contact_email', 'operations_contact_email'],
},
{
'key': 'public',
'title': _('Recht & Öffentlichkeit'),
'subtitle': _('Öffentliche Links für Website, Impressum und Datenschutz.'),
'fields': ['imprint_url', 'privacy_url'],
'hint': _('Diese Links können später im Portal-Footer oder in öffentlichen Seiten verwendet werden.'),
},
]
for section in sections:
rows = []
for field_name in section['fields']:
field = form[field_name]
rows.append(
{
'name': field_name,
'bound_field': field,
'label': field.label,
'value': getattr(company_config, field_name, '') or '',
}
)
section['rows'] = rows
return sections
@_require_capability('manage_trial_lifecycle')
def portal_trial_config_page(request):
trial_config = get_portal_trial_config()
form = PortalTrialConfigForm(instance=trial_config)
return render(
request,
'workflows/trial_management.html',
{
'form': form,
'trial_config': trial_config,
'trial_is_expired': is_trial_expired(),
},
)
@_require_capability('manage_trial_lifecycle')
@require_POST
def save_portal_trial_config(request):
trial_config = get_portal_trial_config()
form = PortalTrialConfigForm(request.POST, instance=trial_config)
if not form.is_valid():
messages.error(request, _('Trial-Konfiguration konnte nicht gespeichert werden. Bitte prüfen Sie die Eingaben.'))
return render(
request,
'workflows/trial_management.html',
{
'form': form,
'trial_config': trial_config,
'trial_is_expired': is_trial_expired(),
},
status=400,
)
trial_config = form.save()
_audit(
request,
'portal_trial_config_saved',
target_type='portal_trial_config',
target_id=trial_config.id,
target_label='Default',
details={
'is_trial_mode': trial_config.is_trial_mode,
'trial_started_at': trial_config.trial_started_at.isoformat() if trial_config.trial_started_at else '',
'trial_expires_at': trial_config.trial_expires_at.isoformat() if trial_config.trial_expires_at else '',
'restrict_production_integrations': trial_config.restrict_production_integrations,
'auto_cleanup_enabled': trial_config.auto_cleanup_enabled,
},
)
if trial_config.is_trial_mode and trial_config.trial_expires_at:
remaining = trial_config.trial_expires_at - timezone.now()
if remaining.total_seconds() <= 0:
notify_user(
user=request.user,
title=_('Trial ist abgelaufen'),
body=_('Der Trial-Zeitraum ist überschritten. Nicht-Platform-Owner werden jetzt blockiert.'),
level=UserNotification.LEVEL_WARNING,
link_url='/admin-tools/trial/',
event_key=UserProfile.NOTIFICATION_TRIAL_ALERTS,
)
elif remaining <= timedelta(days=7):
notify_user(
user=request.user,
title=_('Trial läuft bald ab'),
body=_('Der Trial endet am %(date)s.') % {'date': timezone.localtime(trial_config.trial_expires_at).strftime('%d.%m.%Y %H:%M')},
level=UserNotification.LEVEL_WARNING,
link_url='/admin-tools/trial/',
event_key=UserProfile.NOTIFICATION_TRIAL_ALERTS,
)
elif not trial_config.is_trial_mode:
notify_user(
user=request.user,
title=_('Trial-Modus deaktiviert'),
body=_('Der Trial-Modus wurde ausgeschaltet.'),
level=UserNotification.LEVEL_INFO,
link_url='/admin-tools/trial/',
event_key=UserProfile.NOTIFICATION_TRIAL_ALERTS,
)
messages.success(request, _('Trial-Konfiguration wurde gespeichert.'))
return render(
request,
'workflows/trial_management.html',
{
'form': PortalTrialConfigForm(instance=trial_config),
'trial_config': trial_config,
'trial_is_expired': is_trial_expired(),
},
)
@_require_capability('manage_users')
@require_POST
def create_user_from_admin(request):
form = UserManagementCreateForm(request.POST, include_product_owner=(get_user_role_key(request.user) == ROLE_PLATFORM_OWNER))
if not form.is_valid():
messages.error(request, _('Benutzer konnte nicht erstellt werden. Bitte prüfen Sie die Eingaben.'))
return _render_user_management(request, create_form=form, status_code=400)
user = form.save()
_send_user_access_email(request, user, invitation=True)
_audit(
request,
'user_created',
target_type='user',
target_id=user.id,
target_label=_display_user_name(user),
details={'username': user.username, 'role': get_user_role_key(user), 'invitation_sent': True},
)
messages.success(request, _('Benutzer wurde erstellt und eingeladen: %(username)s') % {'username': user.username})
return redirect('user_management_page')
@_require_capability('manage_users')
@require_POST
def update_user_from_admin(request, user_id: int):
user_model = get_user_model()
target_user = get_object_or_404(user_model, id=user_id)
role_key = (request.POST.get('role_key') or '').strip()
is_active = request.POST.get('is_active') == 'on'
new_password = (request.POST.get('new_password') or '').strip()
if role_key not in ROLE_GROUP_NAMES:
messages.error(request, _('Ungültige Rolle.'))
return redirect('user_management_page')
if role_key == ROLE_PLATFORM_OWNER and get_user_role_key(request.user) != ROLE_PLATFORM_OWNER:
messages.error(request, _('Nur Platform Owner dürfen diese Rolle vergeben.'))
return redirect('user_management_page')
current_role = get_user_role_key(request.user)
if target_user == request.user and current_role == ROLE_PLATFORM_OWNER and (role_key != ROLE_PLATFORM_OWNER or not is_active):
messages.error(request, _('Der aktuell angemeldete Platform Owner kann sich hier nicht selbst sperren oder herabstufen.'))
return redirect('user_management_page')
if target_user == request.user and current_role == ROLE_SUPER_ADMIN and (role_key != ROLE_SUPER_ADMIN or not is_active):
messages.error(request, _('Der aktuell angemeldete Super Admin kann sich hier nicht selbst sperren oder herabstufen.'))
return redirect('user_management_page')
if _would_remove_last_platform_owner(target_user, new_role_key=role_key, new_is_active=is_active):
messages.error(request, _('Der letzte aktive Platform Owner kann nicht deaktiviert oder herabgestuft werden.'))
return redirect('user_management_page')
if _would_remove_last_super_admin(target_user, new_role_key=role_key, new_is_active=is_active):
messages.error(request, _('Der letzte aktive Super Admin kann nicht deaktiviert oder herabgestuft werden.'))
return redirect('user_management_page')
assign_user_role(target_user, role_key)
target_user.is_active = is_active
if new_password:
target_user.set_password(new_password)
target_user.save()
_audit(
request,
'user_updated',
target_type='user',
target_id=target_user.id,
target_label=_display_user_name(target_user),
details={'username': target_user.username, 'role': role_key, 'is_active': is_active, 'password_changed': bool(new_password)},
)
messages.success(request, _('Benutzer wurde aktualisiert: %(username)s') % {'username': target_user.username})
return redirect('user_management_page')
@_require_capability('manage_users')
@require_POST
def send_password_reset_from_admin(request, user_id: int):
user_model = get_user_model()
target_user = get_object_or_404(user_model, id=user_id)
try:
_send_user_access_email(request, target_user, invitation=False)
except ValueError as exc:
messages.error(request, str(exc))
return redirect('user_management_page')
_audit(
request,
'user_password_reset_sent',
target_type='user',
target_id=target_user.id,
target_label=_display_user_name(target_user),
details={'username': target_user.username, 'email': target_user.email},
)
messages.success(request, _('Passwort-Reset-Link wurde versendet: %(username)s') % {'username': target_user.username})
return redirect('user_management_page')
@_require_capability('manage_users')
@require_POST
def delete_user_from_admin(request, user_id: int):
user_model = get_user_model()
target_user = get_object_or_404(user_model, id=user_id)
current_role = get_user_role_key(request.user)
if target_user == request.user and current_role == ROLE_PLATFORM_OWNER:
messages.error(request, _('Der aktuell angemeldete Platform Owner kann sich hier nicht selbst löschen.'))
return redirect('user_management_page')
if target_user == request.user:
messages.error(request, _('Der aktuell angemeldete Super Admin kann sich hier nicht selbst löschen.'))
return redirect('user_management_page')
if _would_remove_last_platform_owner(target_user, deleting=True):
messages.error(request, _('Der letzte aktive Platform Owner kann nicht gelöscht werden.'))
return redirect('user_management_page')
if _would_remove_last_super_admin(target_user, deleting=True):
messages.error(request, _('Der letzte aktive Super Admin kann nicht gelöscht werden.'))
return redirect('user_management_page')
target_label = _display_user_name(target_user)
username = target_user.username
target_user.delete()
_audit(
request,
'user_deleted',
target_type='user',
target_label=target_label,
details={'username': username},
)
messages.success(request, _('Benutzer wurde gelöscht: %(username)s') % {'username': username})
return redirect('user_management_page')
@_require_capability('view_docs')
def handbook_page(request):
return render(request, 'workflows/handbook.html')
@_require_capability('view_docs')
def project_wiki_page(request):
return render(request, 'workflows/project_wiki.html')
@_require_capability('view_docs')
def developer_handbook_page(request):
return render(request, 'workflows/developer_handbook.html')
@_require_capability('view_docs')
def release_checklist_page(request):
return render(request, 'workflows/release_checklist.html')
@_require_capability('view_audit_log')
def audit_log_page(request):
action = (request.GET.get('action') or '').strip()
user_query = (request.GET.get('user') or '').strip()
date_from = (request.GET.get('date_from') or '').strip()
date_to = (request.GET.get('date_to') or '').strip()
rows_qs = AdminAuditLog.objects.select_related('actor').all()
if action:
rows_qs = rows_qs.filter(action=action)
if user_query:
rows_qs = rows_qs.filter(
Q(actor_display__icontains=user_query)
| Q(actor__username__icontains=user_query)
| Q(actor__email__icontains=user_query)
)
if date_from:
rows_qs = rows_qs.filter(created_at__date__gte=date_from)
if date_to:
rows_qs = rows_qs.filter(created_at__date__lte=date_to)
rows = list(rows_qs[:300])
action_choices = (
AdminAuditLog.objects.order_by('action').values_list('action', flat=True).distinct()
)
return render(
request,
'workflows/audit_log.html',
{
'rows': rows,
'action_choices': action_choices,
'selected_action': action,
'user_query': user_query,
'date_from': date_from,
'date_to': date_to,
},
)
@_require_capability('manage_backups')
def backup_recovery_page(request):
rows = list_backup_bundles()
return render(
request,
'workflows/backup_recovery.html',
{
'rows': rows,
'backup_health': latest_backup_health_snapshot(),
},
)
@_require_capability('manage_backups')
@require_POST
def create_backup_from_admin(request):
try:
result = create_backup_bundle()
_audit(
request,
'backup_created',
target_type='backup_bundle',
target_label=result['name'],
details={'path': result['path']},
)
notify_user(
user=request.user,
title=_('Backup erstellt: %(name)s') % {'name': result['name']},
body=_('Das Backup-Bundle wurde erfolgreich erstellt.'),
level=UserNotification.LEVEL_SUCCESS,
link_url='/admin-tools/backups/',
event_key=UserProfile.NOTIFICATION_BACKUP_SUCCESS,
)
messages.success(request, _('Backup wurde erstellt: %(name)s') % {'name': result['name']})
except Exception as exc:
notify_user(
user=request.user,
title=_('Backup fehlgeschlagen'),
body=str(exc),
level=UserNotification.LEVEL_ERROR,
link_url='/admin-tools/backups/',
event_key=UserProfile.NOTIFICATION_BACKUP_FAILURE,
)
messages.error(request, _('Backup konnte nicht erstellt werden: %(error)s') % {'error': exc})
return redirect('backup_recovery_page')
@_require_capability('manage_backups')
@require_POST
def verify_backup_from_admin(request, backup_name: str):
try:
result = verify_backup_bundle(backup_name)
_audit(
request,
'backup_verified',
target_type='backup_bundle',
target_label=backup_name,
details={'summary': result['summary']},
)
notify_user(
user=request.user,
title=_('Backup verifiziert: %(name)s') % {'name': result['name']},
body=result.get('summary') or _('Das Backup wurde erfolgreich verifiziert.'),
level=UserNotification.LEVEL_SUCCESS,
link_url='/admin-tools/backups/',
event_key=UserProfile.NOTIFICATION_BACKUP_SUCCESS,
)
messages.success(request, _('Backup wurde verifiziert: %(name)s') % {'name': result['name']})
except Exception as exc:
notify_user(
user=request.user,
title=_('Backup-Verifikation fehlgeschlagen'),
body=str(exc),
level=UserNotification.LEVEL_ERROR,
link_url='/admin-tools/backups/',
event_key=UserProfile.NOTIFICATION_BACKUP_FAILURE,
)
messages.error(request, _('Backup-Verifikation fehlgeschlagen: %(error)s') % {'error': exc})
return redirect('backup_recovery_page')
@_require_capability('manage_backups')
@require_POST
def delete_backup_from_admin(request, backup_name: str):
try:
result = delete_backup_bundle(backup_name)
_audit(
request,
'backup_deleted',
target_type='backup_bundle',
target_label=backup_name,
details={},
)
messages.success(request, _('Backup wurde gelöscht: %(name)s') % {'name': result['name']})
except Exception as exc:
messages.error(request, _('Backup konnte nicht gelöscht werden: %(error)s') % {'error': exc})
return redirect('backup_recovery_page')
@_require_capability('view_request_timeline')
def request_timeline_page(request, kind: str, request_id: int):
if kind == 'onboarding':
obj = get_object_or_404(OnboardingRequest, id=request_id)
elif kind == 'offboarding':
obj = get_object_or_404(OffboardingRequest, id=request_id)
else:
messages.error(request, f'Unbekannter Typ: {kind}')
return redirect('requests_dashboard')
request_label = _request_target_label(obj, kind)
audit_rows = list(
AdminAuditLog.objects.select_related('actor')
.filter(target_type__in=[kind, 'request'])
.filter(Q(target_id=request_id) | Q(target_label__icontains=(obj.full_name or '').strip()))
.order_by('-created_at', '-id')[:200]
)
timeline_rows = [
{
'created_at': obj.created_at,
'kind': 'system',
'title': _('Anfrage erstellt'),
'summary': request_label,
'meta': _('Status: %(status)s') % {'status': obj.get_processing_status_display()},
}
]
contract_start = getattr(obj, 'contract_start', None)
if contract_start:
timeline_rows.append(
{
'created_at': timezone.make_aware(timezone.datetime.combine(contract_start, timezone.datetime.min.time())),
'kind': 'milestone',
'title': _('Vertragsbeginn'),
'summary': str(contract_start),
'meta': _('Geplanter Start'),
}
)
handover_date = getattr(obj, 'handover_date', None)
if handover_date:
timeline_rows.append(
{
'created_at': timezone.make_aware(timezone.datetime.combine(handover_date, timezone.datetime.min.time())),
'kind': 'milestone',
'title': _('Geräteübergabe / Hardware-Abholung'),
'summary': str(handover_date),
'meta': _('Geplanter Hardware-Termin'),
}
)
if getattr(obj, 'generated_pdf_path', ''):
timeline_rows.append(
{
'created_at': obj.created_at,
'kind': 'document',
'title': _('PDF verfügbar'),
'summary': Path(obj.generated_pdf_path).name,
'meta': '',
'url': f"/media/pdfs/{Path(obj.generated_pdf_path).name}",
}
)
for row in audit_rows:
timeline_rows.append(
{
'created_at': row.created_at,
'kind': 'audit',
'title': _audit_action_label(row.action),
'summary': row.target_label or row.target_type or '-',
'meta': row.actor_display or '-',
'details': row.details,
}
)
if kind == 'onboarding':
intro_session = OnboardingIntroductionSession.objects.filter(onboarding_request=obj).first()
if intro_session:
timeline_rows.append(
{
'created_at': intro_session.updated_at,
'kind': 'session',
'title': _('Einweisungssitzung'),
'summary': intro_session.get_status_display(),
'meta': intro_session.completed_by_name or '-',
'url': (f"/media/pdfs/{Path(intro_session.exported_pdf_path).name}" if intro_session.exported_pdf_path else ''),
}
)
welcome_email = ScheduledWelcomeEmail.objects.filter(onboarding_request=obj).first()
if welcome_email:
timeline_rows.append(
{
'created_at': welcome_email.updated_at,
'kind': 'email',
'title': _('Welcome E-Mail'),
'summary': welcome_email.get_status_display(),
'meta': welcome_email.recipient_email,
}
)
timeline_rows.sort(key=lambda item: item['created_at'])
return render(
request,
'workflows/request_timeline.html',
{
'request_kind': kind,
'request_obj': obj,
'request_label': request_label,
'timeline_rows': timeline_rows,
'contract_start': getattr(obj, 'contract_start', None),
'handover_date': getattr(obj, 'handover_date', None),
},
)
@login_required
def requests_dashboard(request):
if not user_has_capability(request.user, 'access_requests_dashboard'):
messages.error(request, _('Sie haben keine Berechtigung für diese Aktion.'))
return redirect('home')
if request.method == 'POST':
if not user_has_capability(request.user, 'delete_requests'):
messages.error(request, _('Sie haben keine Berechtigung für diese Aktion.'))
return redirect('requests_dashboard')
selected = request.POST.getlist('selected_requests')
single_delete = (request.POST.get('single_delete') or '').strip()
if single_delete:
selected = [single_delete]
if not selected:
messages.warning(request, _('Keine Einträge ausgewählt.'))
return redirect('requests_dashboard')
deleted_count = 0
invalid_count = 0
deleted_labels = []
for token in selected:
try:
kind, raw_id = token.split(':', 1)
request_id = int(raw_id)
except (ValueError, TypeError):
invalid_count += 1
continue
model = None
if kind == 'onboarding':
model = OnboardingRequest
elif kind == 'offboarding':
model = OffboardingRequest
else:
invalid_count += 1
continue
obj = model.objects.filter(id=request_id).first()
if not obj:
continue
deleted_labels.append(_request_target_label(obj, kind))
obj.delete()
deleted_count += 1
if deleted_count:
_audit(
request,
'requests_deleted',
target_type='request',
target_label='Dashboard bulk/single delete',
details={
'deleted_count': deleted_count,
'invalid_count': invalid_count,
'selected': selected,
'request_labels': deleted_labels,
},
)
messages.success(request, _('%(count)s Eintrag/Einträge gelöscht.') % {'count': deleted_count})
if invalid_count:
messages.warning(request, _('%(count)s Auswahl(en) konnten nicht verarbeitet werden.') % {'count': invalid_count})
if not deleted_count and not invalid_count:
messages.info(request, _('Keine passenden Einträge gefunden.'))
return redirect('requests_dashboard')
search_query = request.GET.get('q', '').strip()
type_filter = (request.GET.get('type') or '').strip().lower()
status_filter = (request.GET.get('status') or '').strip().lower()
department_filter = (request.GET.get('department') or '').strip()
date_from = (request.GET.get('date_from') or '').strip()
date_to = (request.GET.get('date_to') or '').strip()
onboarding_qs = OnboardingRequest.objects.order_by('-created_at')
offboarding_qs = OffboardingRequest.objects.order_by('-created_at')
all_onboarding = OnboardingRequest.objects.all()
all_offboarding = OffboardingRequest.objects.all()
if search_query:
onboarding_qs = onboarding_qs.filter(Q(full_name__icontains=search_query) | Q(work_email__icontains=search_query))
offboarding_qs = offboarding_qs.filter(Q(full_name__icontains=search_query) | Q(work_email__icontains=search_query))
if status_filter in {'submitted', 'processing', 'completed', 'failed'}:
onboarding_qs = onboarding_qs.filter(processing_status=status_filter)
offboarding_qs = offboarding_qs.filter(processing_status=status_filter)
if department_filter:
onboarding_qs = onboarding_qs.filter(department=department_filter)
offboarding_qs = offboarding_qs.filter(department=department_filter)
if date_from:
onboarding_qs = onboarding_qs.filter(created_at__date__gte=date_from)
offboarding_qs = offboarding_qs.filter(created_at__date__gte=date_from)
if date_to:
onboarding_qs = onboarding_qs.filter(created_at__date__lte=date_to)
offboarding_qs = offboarding_qs.filter(created_at__date__lte=date_to)
if type_filter == 'onboarding':
offboarding_qs = offboarding_qs.none()
elif type_filter == 'offboarding':
onboarding_qs = onboarding_qs.none()
onboarding_items = onboarding_qs[:50]
offboarding_items = offboarding_qs[:50]
language_code = (
request.COOKIES.get(settings.LANGUAGE_COOKIE_NAME)
or getattr(request, 'LANGUAGE_CODE', '')
or get_language()
or 'de'
).split('-')[0].lower()
rows = []
for obj in onboarding_items:
intro_session = OnboardingIntroductionSession.objects.filter(onboarding_request=obj).first()
if intro_session and intro_session.exported_pdf_path:
intro_session.exported_pdf_url = f"/media/pdfs/{Path(intro_session.exported_pdf_path).name}"
rows.append(
{
'id': obj.id,
'kind': 'Onboarding',
'kind_slug': 'onboarding',
'name': obj.full_name,
'work_email': obj.work_email,
'created_at': obj.created_at,
'pdf_url': f"/media/pdfs/{Path(obj.generated_pdf_path).name}" if obj.generated_pdf_path else None,
'intro_pdf_url': f"/media/pdfs/{Path(obj.intro_pdf_path).name}" if obj.intro_pdf_path else None,
'intro_session': intro_session,
'status': _request_status_label(obj.processing_status, language_code),
'status_key': obj.processing_status,
'last_error': obj.last_error,
}
)
for obj in offboarding_items:
rows.append(
{
'id': obj.id,
'kind': 'Offboarding',
'kind_slug': 'offboarding',
'name': obj.full_name,
'work_email': obj.work_email,
'created_at': obj.created_at,
'pdf_url': f"/media/pdfs/{Path(obj.generated_pdf_path).name}" if obj.generated_pdf_path else None,
'intro_pdf_url': None,
'intro_session': None,
'status': _request_status_label(obj.processing_status, language_code),
'status_key': obj.processing_status,
'last_error': obj.last_error,
}
)
rows.sort(key=lambda x: x['created_at'], reverse=True)
today = timezone.localdate()
start_date = today - timedelta(days=13)
onboarding_daily = {}
offboarding_daily = {}
for i in range(14):
day = start_date + timedelta(days=i)
onboarding_daily[day] = 0
offboarding_daily[day] = 0
for dt in onboarding_qs.filter(created_at__date__gte=start_date).values_list('created_at', flat=True):
onboarding_daily[timezone.localtime(dt).date()] += 1
for dt in offboarding_qs.filter(created_at__date__gte=start_date).values_list('created_at', flat=True):
offboarding_daily[timezone.localtime(dt).date()] += 1
chart_points = []
max_total = 1
for i in range(14):
day = start_date + timedelta(days=i)
on_count = onboarding_daily[day]
off_count = offboarding_daily[day]
total = on_count + off_count
max_total = max(max_total, total)
chart_points.append(
{
'label': day.strftime('%d.%m'),
'onboarding': on_count,
'offboarding': off_count,
'total': total,
}
)
for point in chart_points:
point['height'] = max(8, int((point['total'] / max_total) * 84))
onboarding_total = onboarding_qs.count()
offboarding_total = offboarding_qs.count()
departments = sorted(
{
value.strip()
for value in list(all_onboarding.exclude(department='').values_list('department', flat=True))
+ list(all_offboarding.exclude(department='').values_list('department', flat=True))
if value and value.strip()
},
key=str.lower,
)
status_choices = [
{'value': 'submitted', 'label': _request_status_label('submitted', language_code)},
{'value': 'processing', 'label': _request_status_label('processing', language_code)},
{'value': 'completed', 'label': _request_status_label('completed', language_code)},
{'value': 'failed', 'label': _request_status_label('failed', language_code)},
]
has_filters = any([search_query, type_filter, status_filter, department_filter, date_from, date_to])
column_count = 4
if user_has_capability(request.user, 'delete_requests'):
column_count += 1
if user_has_capability(request.user, 'run_intro_session') or user_has_capability(request.user, 'generate_intro_pdfs'):
column_count += 1
if user_has_capability(request.user, 'access_requests_dashboard'):
column_count += 1
return render(
request,
'workflows/requests_dashboard.html',
{
'rows': rows[:60],
'search_query': search_query,
'selected_type': type_filter,
'selected_status': status_filter,
'selected_department': department_filter,
'date_from': date_from,
'date_to': date_to,
'departments': departments,
'status_choices': status_choices,
'has_filters': has_filters,
'column_count': column_count,
'onboarding_total': onboarding_total,
'offboarding_total': offboarding_total,
'combined_total': onboarding_total + offboarding_total,
'chart_points': chart_points,
},
)
@login_required
@ensure_csrf_cookie
def onboarding_create(request):
config = WorkflowConfig.objects.order_by('id').first()
legal_text = (
config.legal_text
if config and config.legal_text
else 'Eine Ausrüstungsvereinbarung erlaubt es einem Mitarbeitenden, die Ausrüstung des Unternehmens im Außendienst oder zu Hause zu nutzen und mitzunehmen.'
)
if request.method == 'POST':
form = OnboardingRequestForm(request.POST, request.FILES, requester_email=request.user.email)
if form.is_valid():
obj = form.save()
obj.onboarded_by_name = _display_user_name(request.user)
obj.preferred_language = ((getattr(request, 'LANGUAGE_CODE', '') or get_language() or 'de').split('-')[0])
obj.save(update_fields=['onboarded_by_name', 'preferred_language'])
process_onboarding_request.delay(obj.id)
return redirect(f"/onboarding/new/?saved=1&id={obj.id}")
else:
form = OnboardingRequestForm(requester_email=request.user.email)
onboarding_blocks = _build_onboarding_layout(form)
field_pages = getattr(form, '_field_page_keys', {})
section_configs = ensure_form_section_configs('onboarding')
visible_section_keys = {
key for key in ONBOARDING_SECTION_ORDER
if key in LOCKED_SECTION_RULES.get('onboarding', set()) or section_configs.get(key, None) is None or section_configs[key].is_visible
}
onboarding_sections = _build_onboarding_sections(onboarding_blocks, field_pages, visible_section_keys=visible_section_keys)
return render(
request,
'workflows/onboarding_form.html',
{
'form': form,
'onboarding_blocks': onboarding_blocks,
'onboarding_sections': onboarding_sections,
'onboarding_inline_checks': ONBOARDING_INLINE_CHECKS,
'onboarding_checkbox_lists': ONBOARDING_CHECKBOX_LISTS,
'legal_text': legal_text,
'saved': request.GET.get('saved') == '1',
'saved_request_id': request.GET.get('id', ''),
'portal_email_domain': get_company_email_domain(),
},
)
@login_required
def onboarding_success(request, request_id: int):
obj = get_object_or_404(OnboardingRequest, id=request_id)
pdf_url = None
if obj.generated_pdf_path:
pdf_url = f"/media/pdfs/{Path(obj.generated_pdf_path).name}"
return render(request, 'workflows/onboarding_success.html', {'obj': obj, 'pdf_url': pdf_url})
@_require_capability('generate_intro_pdfs')
@require_POST
def generate_onboarding_intro_pdf(request, request_id: int):
obj = get_object_or_404(OnboardingRequest, id=request_id)
pdf_path = _generate_onboarding_intro_pdf(obj, language_code=get_language())
obj.intro_pdf_path = str(pdf_path)
obj.save(update_fields=['intro_pdf_path'])
_audit(request, 'intro_pdf_generated', target_type='onboarding', target_id=obj.id, target_label=obj.full_name)
messages.success(request, _('Einweisungs- und Übergabeprotokoll wurde erzeugt.'))
return redirect('requests_dashboard')
@_require_capability('generate_intro_pdfs')
@require_POST
def generate_onboarding_intro_session_pdf(request, request_id: int):
onboarding = get_object_or_404(OnboardingRequest, id=request_id)
session, _created = OnboardingIntroductionSession.objects.get_or_create(onboarding_request=onboarding)
pdf_path = _generate_onboarding_intro_session_pdf(
session,
admin_signature_name=_display_user_name(request.user),
language_code=get_language(),
)
session.exported_pdf_path = str(pdf_path)
session.save(update_fields=['exported_pdf_path'])
_audit(request, 'intro_live_pdf_generated', target_type='onboarding', target_id=onboarding.id, target_label=onboarding.full_name)
messages.success(request, _('Einweisungsprotokoll aus Live-Status wurde erzeugt.'))
return redirect('onboarding_intro_session_page', request_id=request_id)
@_require_capability('run_intro_session')
def onboarding_intro_session_page(request, request_id: int):
onboarding = get_object_or_404(OnboardingRequest, id=request_id)
session, _created = OnboardingIntroductionSession.objects.get_or_create(onboarding_request=onboarding)
sections = build_intro_sections_for_request(onboarding, language_code=get_language())
if request.method == 'POST':
checked_ids = set(request.POST.getlist('checked_items'))
checklist_state = {}
for section in sections:
for item in section['items']:
checklist_state[item['id']] = item['id'] in checked_ids
action = (request.POST.get('session_action') or 'save').strip()
session.checklist_state = checklist_state
session.notes = (request.POST.get('notes') or '').strip()
if action == 'reset':
session.checklist_state = {}
session.notes = ''
session.status = 'draft'
session.completed_at = None
session.completed_by_name = ''
session.exported_pdf_path = ''
session.save(update_fields=['checklist_state', 'notes', 'status', 'completed_at', 'completed_by_name', 'exported_pdf_path'])
_audit(request, 'intro_session_reset', target_type='onboarding', target_id=onboarding.id, target_label=onboarding.full_name)
messages.success(request, _('Einweisung wurde zurückgesetzt.'))
return redirect('onboarding_intro_session_page', request_id=request_id)
if action == 'complete':
session.status = 'completed'
session.completed_at = timezone.now()
session.completed_by_name = _display_user_name(request.user)
_audit(
request,
'intro_session_completed',
target_type='onboarding',
target_id=onboarding.id,
target_label=onboarding.full_name,
details={'checked_count': len([value for value in checklist_state.values() if value])},
)
messages.success(request, _('Einweisung wurde als abgeschlossen gespeichert.'))
else:
session.status = 'draft'
session.completed_at = None
session.completed_by_name = ''
_audit(
request,
'intro_session_saved',
target_type='onboarding',
target_id=onboarding.id,
target_label=onboarding.full_name,
details={'checked_count': len([value for value in checklist_state.values() if value])},
)
messages.success(request, _('Einweisung wurde als Entwurf gespeichert.'))
session.save()
return redirect('onboarding_intro_session_page', request_id=request_id)
checked_map = session.checklist_state or {}
checked_count = 0
total_count = 0
for section in sections:
for item in section['items']:
item['checked'] = bool(checked_map.get(item['id']))
total_count += 1
if item['checked']:
checked_count += 1
salutation = (onboarding.get_gender_display() or '').strip()
display_name = f"{salutation} {onboarding.full_name}".strip() if salutation else onboarding.full_name
progress_percent = int((checked_count / total_count) * 100) if total_count else 0
return render(
request,
'workflows/onboarding_intro_session.html',
{
'onboarding': onboarding,
'session': session,
'display_name': display_name,
'sections': sections,
'checked_count': checked_count,
'total_count': total_count,
'progress_percent': progress_percent,
'session_pdf_url': f"/media/pdfs/{Path(session.exported_pdf_path).name}" if session.exported_pdf_path else None,
},
)
@login_required
@ensure_csrf_cookie
def offboarding_create(request):
profile_id = request.GET.get('profile')
search_query = request.GET.get('q', '').strip()
selected_profile = None
if profile_id:
selected_profile = EmployeeProfile.objects.filter(id=profile_id).first()
search_results = []
if search_query:
search_results = list(
EmployeeProfile.objects.filter(full_name__icontains=search_query)[:10]
) + list(
EmployeeProfile.objects.filter(work_email__icontains=search_query)[:10]
)
# preserve order while removing duplicates
seen = set()
unique = []
for r in search_results:
if r.id not in seen:
unique.append(r)
seen.add(r.id)
search_results = unique[:10]
if request.method == 'POST':
form = OffboardingRequestForm(request.POST, prefill_profile=selected_profile)
if form.is_valid():
obj = form.save(commit=False)
if selected_profile:
obj.employee_profile = selected_profile
requester_email = (request.user.email or '').strip().lower()
company_suffix = f"@{get_company_email_domain()}"
if requester_email and requester_email.endswith(company_suffix):
obj.requested_by_email = requester_email
else:
obj.requested_by_email = settings.DEFAULT_FROM_EMAIL
obj.requested_by_name = _display_user_name(request.user)
obj.preferred_language = ((getattr(request, 'LANGUAGE_CODE', '') or get_language() or 'de').split('-')[0])
obj.save()
process_offboarding_request.delay(obj.id)
return redirect(f"/offboarding/new/?saved=1&id={obj.id}")
else:
form = OffboardingRequestForm(prefill_profile=selected_profile, initial={'search_query': search_query})
field_pages = getattr(form, '_field_page_keys', {})
section_configs = ensure_form_section_configs('offboarding')
visible_section_keys = {
key for key in OFFBOARDING_PAGE_ORDER
if key in LOCKED_SECTION_RULES.get('offboarding', set()) or section_configs.get(key, None) is None or section_configs[key].is_visible
}
offboarding_sections = _build_offboarding_sections(form, visible_section_keys=visible_section_keys)
return render(
request,
'workflows/offboarding_form.html',
{
'form': form,
'search_results': search_results,
'selected_profile': selected_profile,
'search_query': search_query,
'saved': request.GET.get('saved') == '1',
'saved_request_id': request.GET.get('id', ''),
'portal_email_domain': get_company_email_domain(),
'offboarding_sections': offboarding_sections,
},
)
@login_required
def offboarding_success(request, request_id: int):
obj = get_object_or_404(OffboardingRequest, id=request_id)
pdf_url = None
if obj.generated_pdf_path:
pdf_url = f"/media/pdfs/{Path(obj.generated_pdf_path).name}"
return render(request, 'workflows/offboarding_success.html', {'obj': obj, 'pdf_url': pdf_url})
@_require_capability('manage_builders')
def form_builder_page(request):
language_code = get_language()
form_type = request.GET.get('form_type', 'onboarding')
anchor = (request.GET.get('anchor') or '').strip()
active_panel = (request.GET.get('panel') or '').strip()
active_subpanel = (request.GET.get('subpanel') or '').strip()
if form_type not in DEFAULT_FIELD_ORDER:
form_type = 'onboarding'
option_category = request.GET.get('option_category', 'department')
option_categories = [c[0] for c in FormOption.CATEGORY_CHOICES]
if option_category not in option_categories:
option_category = option_categories[0]
if request.method == 'POST':
delete_option_id = request.POST.get('delete_option_id', '').strip()
if delete_option_id:
option = FormOption.objects.filter(id=delete_option_id).first()
if not option:
messages.error(request, 'Option nicht gefunden.')
else:
option_category = option.category
deleted_label = option.label
deleted_id = option.id
option.delete()
_audit(request, 'form_option_deleted', target_type='form_option', target_id=deleted_id, target_label=deleted_label)
messages.success(request, 'Option wurde gelöscht.')
return redirect(f"/admin-tools/form-builder/?form_type={form_type}&option_category={option_category}&panel=builder-content&subpanel=options#builder-content")
action = request.POST.get('builder_action', '')
if action == 'add_option':
category = request.POST.get('category', '').strip()
label = request.POST.get('label', '').strip()
label_en = request.POST.get('label_en', '').strip()
value = request.POST.get('value', '').strip()
if category not in option_categories:
messages.error(request, 'Ungültige Kategorie.')
elif not label:
messages.error(request, 'Bitte einen Namen für die Option angeben.')
else:
next_sort = (
FormOption.objects.filter(category=category).order_by('-sort_order').values_list('sort_order', flat=True).first()
)
FormOption.objects.create(
# Global form option catalog entry
category=category,
label=label,
label_en=label_en,
value=value or label,
sort_order=(next_sort + 1) if next_sort is not None else 0,
is_active=True,
)
_audit(
request,
'form_option_added',
target_type='form_option',
target_label=label,
details={'category': category, 'label_en': label_en, 'value': value or label},
)
messages.success(request, 'Option wurde hinzugefügt.')
option_category = category
elif action == 'save_options':
option_ids = request.POST.getlist('option_ids')
for pos, raw_id in enumerate(option_ids):
option = FormOption.objects.filter(id=raw_id).first()
if not option:
continue
next_label = request.POST.get(f'label_{option.id}', '').strip() or option.label
option.label = next_label
option.label_en = request.POST.get(f'label_en_{option.id}', '').strip()
option.value = request.POST.get(f'value_{option.id}', '').strip() or next_label
option.is_active = request.POST.get(f'active_{option.id}') == 'on'
option.sort_order = pos
try:
option.save(update_fields=['label', 'label_en', 'value', 'is_active', 'sort_order'])
except IntegrityError:
messages.error(request, f'Doppelte Bezeichnung in Kategorie: {next_label}')
return redirect(f"/admin-tools/form-builder/?form_type={form_type}&option_category={option.category}&panel=builder-content&subpanel=options#builder-content")
option_category = option.category
_audit(request, 'form_options_saved', target_type='form_option', target_label=option_category, details={'count': len(option_ids)})
messages.success(request, 'Optionen wurden gespeichert.')
elif action == 'save_field_texts':
field_ids = request.POST.getlist('field_ids')
for raw_id in field_ids:
cfg = FormFieldConfig.objects.filter(id=raw_id, form_type=form_type).first()
if not cfg:
continue
cfg.label_override = (request.POST.get(f'label_override_{cfg.id}') or '').strip()
cfg.label_override_en = (request.POST.get(f'label_override_en_{cfg.id}') or '').strip()
cfg.help_text_override = (request.POST.get(f'help_text_override_{cfg.id}') or '').strip()
cfg.help_text_override_en = (request.POST.get(f'help_text_override_en_{cfg.id}') or '').strip()
cfg.save(update_fields=['label_override', 'label_override_en', 'help_text_override', 'help_text_override_en'])
_audit(request, 'form_field_texts_saved', target_type='form_config', target_label=form_type, details={'count': len(field_ids)})
messages.success(request, 'Feldtexte wurden gespeichert.')
elif action == 'save_field_rules':
field_ids = request.POST.getlist('field_rule_ids')
locked_fields = LOCKED_FIELD_RULES.get(form_type, set())
updated = 0
for raw_id in field_ids:
cfg = FormFieldConfig.objects.filter(id=raw_id, form_type=form_type).first()
if not cfg:
continue
if cfg.field_name in locked_fields:
cfg.is_visible = True
cfg.is_required = None
else:
cfg.is_visible = request.POST.get(f'is_visible_{cfg.id}') == 'on'
required_mode = (request.POST.get(f'is_required_{cfg.id}') or '').strip()
cfg.is_required = True if required_mode == 'required' else False if required_mode == 'optional' else None
cfg.save(update_fields=['is_visible', 'is_required'])
updated += 1
_audit(request, 'form_field_rules_saved', target_type='form_config', target_label=form_type, details={'count': updated})
messages.success(request, 'Feldregeln wurden gespeichert.')
elif action == 'save_section_rules' and form_type in {'onboarding', 'offboarding'}:
section_configs = ensure_form_section_configs(form_type)
locked_sections = LOCKED_SECTION_RULES.get(form_type, set())
updated = 0
for section_key, cfg in section_configs.items():
if section_key in locked_sections:
if not cfg.is_visible:
cfg.is_visible = True
cfg.save(update_fields=['is_visible'])
continue
cfg.is_visible = request.POST.get(f'section_visible_{section_key}') == 'on'
cfg.save(update_fields=['is_visible'])
updated += 1
_audit(request, 'form_section_rules_saved', target_type='form_config', target_label=form_type, details={'count': updated})
messages.success(request, 'Abschnittsregeln wurden gespeichert.')
elif action == 'apply_preset':
preset_key = (request.POST.get('preset_key') or '').strip()
if apply_form_preset(form_type, preset_key):
active_panel = 'builder-preview'
_audit(request, 'form_preset_applied', target_type='form_config', target_label=form_type, details={'preset': preset_key})
messages.success(request, 'Preset wurde angewendet.')
else:
messages.error(request, 'Preset konnte nicht angewendet werden.')
if action in {'add_option', 'save_options', 'save_field_texts'}:
active_panel = 'builder-content'
if action in {'add_option', 'save_options'}:
active_subpanel = 'options'
elif action == 'save_field_texts':
active_subpanel = 'field-texts'
elif action in {'save_field_rules', 'save_section_rules'}:
active_panel = 'builder-rules'
redirect_target = f"/admin-tools/form-builder/?form_type={form_type}&option_category={option_category}"
if active_panel:
redirect_target += f"&panel={active_panel}"
if active_subpanel:
redirect_target += f"&subpanel={active_subpanel}"
if anchor == 'builder-content' or active_panel == 'builder-content':
redirect_target += "#builder-content"
return redirect(redirect_target)
default_names = list(DEFAULT_FIELD_ORDER.get(form_type, []))
existing_names = list(
OnboardingRequestForm.base_fields.keys()
if form_type == 'onboarding'
else OffboardingRequestForm.base_fields.keys()
)
for name in existing_names:
if name not in default_names:
default_names.append(name)
ensure_form_field_configs(form_type, default_names)
section_configs = ensure_form_section_configs(form_type)
section_order = get_section_order(form_type)
section_labels = get_section_labels(form_type)
default_page_map = get_default_page_map(form_type)
configs = list(
FormFieldConfig.objects.filter(form_type=form_type).order_by('sort_order', 'field_name')
)
labels = _form_field_labels(form_type)
locked = LOCKED_FIELD_RULES.get(form_type, set())
locked_sections = LOCKED_SECTION_RULES.get(form_type, set())
if form_type == 'onboarding':
columns = [
{
'key': key,
'title': ONBOARDING_PAGE_LABELS.get(key, key),
'items': [],
}
for key in ONBOARDING_PAGE_ORDER
]
column_by_key = {c['key']: c for c in columns}
fallback = 'abschluss'
for cfg in configs:
page_key = cfg.page_key or ONBOARDING_DEFAULT_PAGE.get(cfg.field_name, fallback)
if page_key not in column_by_key:
page_key = fallback
column_by_key[page_key]['items'].append(
{
'field_name': cfg.field_name,
'label': cfg.translated_label_override(language_code) or labels.get(cfg.field_name, cfg.field_name),
'label_de': cfg.label_override or labels.get(cfg.field_name, cfg.field_name),
'label_en': cfg.label_override_en,
'is_visible': cfg.is_visible,
'is_required': cfg.is_required,
'locked': cfg.field_name in locked,
'page_key': page_key,
}
)
else:
columns = [
{
'key': key,
'title': section_labels.get(key, key),
'items': [],
}
for key in section_order
]
column_by_key = {c['key']: c for c in columns}
fallback = section_order[-1] if section_order else 'all'
for cfg in configs:
page_key = cfg.page_key or default_page_map.get(cfg.field_name, fallback)
if page_key not in column_by_key:
page_key = fallback
column_by_key[page_key]['items'].append(
{
'field_name': cfg.field_name,
'label': cfg.translated_label_override(language_code) or labels.get(cfg.field_name, cfg.field_name),
'label_de': cfg.label_override or labels.get(cfg.field_name, cfg.field_name),
'label_en': cfg.label_override_en,
'is_visible': cfg.is_visible,
'is_required': cfg.is_required,
'locked': cfg.field_name in locked,
'page_key': page_key,
}
)
section_rule_items = []
if section_order:
fallback_section = section_order[-1] if section_order else ''
for key in section_order:
cfg = section_configs.get(key)
section_rule_items.append(
{
'key': key,
'title': section_labels.get(key, key),
'is_visible': True if not cfg else cfg.is_visible,
'locked': key in locked_sections,
'field_count': len([c for c in configs if (c.page_key or default_page_map.get(c.field_name, fallback_section)) == key]),
}
)
field_rule_items = []
for cfg in configs:
page_key = cfg.page_key or default_page_map.get(cfg.field_name, section_order[-1] if section_order else '')
field_rule_items.append(
{
'id': cfg.id,
'field_name': cfg.field_name,
'label': cfg.translated_label_override(language_code) or labels.get(cfg.field_name, cfg.field_name),
'page_key': page_key,
'page_label': section_labels.get(page_key, page_key) if section_order else '',
'is_visible': cfg.is_visible,
'is_required': cfg.is_required,
'locked': cfg.field_name in locked,
}
)
field_rule_groups = []
if section_order:
grouped_rules = {key: [] for key in section_order}
for item in field_rule_items:
grouped_rules.setdefault(item['page_key'], []).append(item)
for key in section_order:
field_rule_groups.append(
{
'key': key,
'title': section_labels.get(key, key),
'items': grouped_rules.get(key, []),
}
)
field_text_groups = []
if section_order:
grouped_texts = {key: [] for key in section_order}
for cfg in configs:
page_key = cfg.page_key or default_page_map.get(cfg.field_name, section_order[-1] if section_order else '')
grouped_texts.setdefault(page_key, []).append(cfg)
for key in section_order:
field_text_groups.append(
{
'key': key,
'title': section_labels.get(key, key),
'items': grouped_texts.get(key, []),
}
)
preview_sections = []
if section_order:
field_rule_group_map = {group['key']: group['items'] for group in field_rule_groups}
for key in section_order:
section_cfg = section_configs.get(key)
section_locked = key in locked_sections
section_visible = True if section_locked or not section_cfg else section_cfg.is_visible
visible_items = [
item for item in field_rule_group_map.get(key, [])
if item['locked'] or item['is_visible']
]
if section_visible:
preview_sections.append(
{
'key': key,
'title': section_labels.get(key, key),
'items': visible_items,
}
)
locked_field_count = len([item for item in field_rule_items if item['locked']])
hidden_field_count = len([item for item in field_rule_items if not item['is_visible']])
configurable_field_count = len(field_rule_items) - locked_field_count
hidden_section_count = len([item for item in section_rule_items if not item['is_visible']]) if section_rule_items else 0
builder_summary = {
'locked_field_count': locked_field_count,
'configurable_field_count': configurable_field_count,
'hidden_field_count': hidden_field_count,
'hidden_section_count': hidden_section_count,
}
return render(
request,
'workflows/form_builder.html',
{
'form_type': form_type,
'columns': columns,
'form_types': [('onboarding', 'Onboarding'), ('offboarding', 'Offboarding')],
'option_categories': _translate_choice_list(FormOption.CATEGORY_CHOICES),
'selected_option_category': option_category,
'option_items': FormOption.objects.filter(category=option_category).order_by('sort_order', 'label'),
'field_text_items': configs,
'field_rule_items': field_rule_items,
'field_rule_groups': field_rule_groups,
'field_text_groups': field_text_groups,
'preview_sections': preview_sections,
'section_rule_items': section_rule_items,
'builder_summary': builder_summary,
'active_panel': active_panel,
'active_subpanel': active_subpanel,
'available_presets': FORM_PRESETS.get(form_type, {}),
},
)
@_require_capability('manage_builders')
def intro_builder_page(request):
if request.method == 'POST':
delete_id = (request.POST.get('delete_item_id') or '').strip()
if delete_id:
item = IntroChecklistItem.objects.filter(id=delete_id).first()
if item:
deleted_label = item.label
deleted_id_int = item.id
item.delete()
_audit(request, 'intro_checklist_item_deleted', target_type='intro_checklist_item', target_id=deleted_id_int, target_label=deleted_label)
messages.success(request, 'Checklistenpunkt wurde gelöscht.')
else:
messages.error(request, 'Checklistenpunkt nicht gefunden.')
return redirect('intro_builder_page')
action = (request.POST.get('builder_action') or '').strip()
if action == 'add_item':
section = (request.POST.get('section') or '').strip()
label = (request.POST.get('label') or '').strip()
label_en = (request.POST.get('label_en') or '').strip()
if section not in {k for k, _ in IntroChecklistItem.SECTION_CHOICES}:
messages.error(request, 'Ungültiger Abschnitt.')
return redirect('intro_builder_page')
if not label:
messages.error(request, 'Bitte eine Bezeichnung für den Checklistenpunkt angeben.')
return redirect('intro_builder_page')
next_sort = (
IntroChecklistItem.objects.filter(section=section).order_by('-sort_order').values_list('sort_order', flat=True).first()
)
IntroChecklistItem.objects.create(
section=section,
label=label,
label_en=label_en,
sort_order=(next_sort + 1) if next_sort is not None else 0,
is_active=True,
condition_operator='always',
)
_audit(request, 'intro_checklist_item_added', target_type='intro_checklist_item', target_label=label, details={'section': section, 'label_en': label_en})
messages.success(request, 'Checklistenpunkt wurde hinzugefügt.')
return redirect('intro_builder_page')
if action == 'save_items':
item_ids = request.POST.getlist('item_ids')
valid_sections = {k for k, _ in IntroChecklistItem.SECTION_CHOICES}
valid_ops = {k for k, _ in IntroChecklistItem.OPERATOR_CHOICES}
for pos, raw_id in enumerate(item_ids):
item = IntroChecklistItem.objects.filter(id=raw_id).first()
if not item:
continue
section = (request.POST.get(f'section_{item.id}') or item.section).strip()
if section not in valid_sections:
section = item.section
operator = (request.POST.get(f'operator_{item.id}') or item.condition_operator).strip()
if operator not in valid_ops:
operator = 'always'
item.section = section
item.label = (request.POST.get(f'label_{item.id}') or item.label).strip() or item.label
item.label_en = (request.POST.get(f'label_en_{item.id}') or '').strip()
item.is_active = request.POST.get(f'active_{item.id}') == 'on'
item.condition_field = (request.POST.get(f'field_{item.id}') or '').strip()
item.condition_operator = operator
item.condition_value = (request.POST.get(f'value_{item.id}') or '').strip()
item.sort_order = pos
item.save(
update_fields=[
'section',
'label',
'label_en',
'is_active',
'condition_field',
'condition_operator',
'condition_value',
'sort_order',
]
)
_audit(request, 'intro_checklist_saved', target_type='intro_checklist_item', details={'count': len(item_ids)})
messages.success(request, 'Einweisungs-Checkliste wurde gespeichert.')
return redirect('intro_builder_page')
condition_field_choices = [
('', 'Keine Bedingung'),
('needed_devices', 'Benötigte Geräte und Gegenstände'),
('needed_software', 'Benötigte Software'),
('needed_accesses', 'Benötigte Zugänge'),
('needed_workspace_groups', 'Benötigte Gruppen im Workspace'),
('needed_resources', 'Benötigte Ressourcen'),
('additional_hardware', 'Zusätzliche Hardware'),
('additional_software', 'Zusätzliche Software'),
('additional_access_text', 'Weitere Zugänge (Freitext)'),
('group_mailboxes_required', 'Gruppenpostfächer erforderlich'),
('order_business_cards', 'Visitenkarten bestellt'),
('phone_number', 'Direktwahl vorhanden'),
('successor_name', 'Nachfolge vorhanden'),
('department', 'Abteilung'),
]
items = list(IntroChecklistItem.objects.all().order_by('section', 'sort_order', 'label'))
return render(
request,
'workflows/intro_builder.html',
{
'items': items,
'section_choices': _translate_choice_list(IntroChecklistItem.SECTION_CHOICES),
'operator_choices': _translate_choice_list(IntroChecklistItem.OPERATOR_CHOICES),
'condition_field_choices': condition_field_choices,
},
)
@_require_capability('manage_integrations')
def integrations_setup_page(request):
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
kind = (request.GET.get('kind') or 'nextcloud').strip().lower()
if kind not in {'nextcloud', 'mail', 'emails', 'rules', 'backup'}:
kind = 'nextcloud'
templates = list(NotificationTemplate.objects.all().order_by('key'))
system_email_config = (
SystemEmailConfig.objects.filter(is_active=True).order_by('-updated_at').first()
or SystemEmailConfig.objects.filter(name='Default SMTP').first()
)
return render(
request,
'workflows/integrations_setup.html',
{
'workflow_config': config,
'system_email_config': system_email_config,
'nextcloud_enabled': is_nextcloud_enabled(),
'email_test_mode': is_email_test_mode(),
'kind': kind,
'templates': templates,
'notification_rules': NotificationRule.objects.all().order_by('event_type', 'sort_order', 'id'),
'rule_event_choices': NotificationRule.EVENT_CHOICES,
'rule_operator_choices': NotificationRule.OPERATOR_CHOICES,
'template_choices': NotificationTemplate.TEMPLATE_CHOICES,
'remote_backup_target_choices': WorkflowConfig.REMOTE_BACKUP_TARGET_CHOICES,
},
)
@_require_capability('manage_welcome_emails')
def welcome_emails_page(request):
rows = ScheduledWelcomeEmail.objects.select_related('onboarding_request').order_by('-send_at', '-id')[:200]
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
welcome_template = NotificationTemplate.objects.filter(key='onboarding_welcome').first()
default_welcome = get_default_notification_templates().get('onboarding_welcome', {})
default_subject = (default_welcome.get('subject') or '').strip()
default_body = (default_welcome.get('body') or '').strip()
default_subject_en = (default_welcome.get('subject_en') or '').strip()
default_body_en = (default_welcome.get('body_en') or '').strip()
subject_value = (welcome_template.subject_template if welcome_template else '').strip() or default_subject
body_value = (welcome_template.body_template if welcome_template else '').strip() or default_body
subject_value_en = (welcome_template.subject_template_en if welcome_template else '').strip() or default_subject_en
body_value_en = (welcome_template.body_template_en if welcome_template else '').strip() or default_body_en
return render(
request,
'workflows/welcome_emails.html',
{
'rows': rows,
'workflow_config': config,
'welcome_template': welcome_template,
'welcome_subject_value': subject_value,
'welcome_body_value': body_value,
'welcome_subject_value_en': subject_value_en,
'welcome_body_value_en': body_value_en,
'welcome_keywords': ['{{ FULL_NAME }}', '{{ VORNAME }}', '{{ NACHNAME }}', '{{ DEPARTMENT }}', '{{ CONTRACT_START }}', '{{ EMAIL }}', '{{ REQUESTED_BY }}'],
},
)
@_require_capability('manage_welcome_emails')
@require_POST
def trigger_welcome_email_now(request, schedule_id: int):
scheduled = ScheduledWelcomeEmail.objects.filter(id=schedule_id).first()
if not scheduled:
messages.error(request, f'Geplanter Welcome-Eintrag #{schedule_id} nicht gefunden.')
return redirect('welcome_emails_page')
if scheduled.status == 'cancelled':
messages.error(request, f'Welcome E-Mail #{schedule_id} ist abgebrochen und kann nicht gesendet werden.')
return redirect('welcome_emails_page')
async_result = send_scheduled_welcome_email.delay(scheduled.id, True)
scheduled.celery_task_id = async_result.id or scheduled.celery_task_id
scheduled.status = 'scheduled'
scheduled.last_error = ''
scheduled.save(update_fields=['celery_task_id', 'status', 'last_error', 'updated_at'])
_audit(request, 'welcome_email_triggered_now', target_type='welcome_email', target_id=scheduled.id, target_label=scheduled.recipient_email)
messages.success(request, f'Welcome E-Mail #{schedule_id} wurde sofort angestoßen.')
return redirect('welcome_emails_page')
@_require_capability('manage_welcome_emails')
@require_POST
def save_welcome_email_settings(request):
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
try:
delay_days = int(request.POST.get('welcome_email_delay_days', config.welcome_email_delay_days or 5))
except ValueError:
messages.error(request, 'Ungültige Zahl bei der Welcome-Verzögerung.')
return redirect('welcome_emails_page')
config.welcome_email_delay_days = max(0, delay_days)
config.welcome_sender_email = request.POST.get('welcome_sender_email', '').strip()
config.welcome_include_pdf = request.POST.get('welcome_include_pdf') == 'on'
config.save(update_fields=['welcome_email_delay_days', 'welcome_sender_email', 'welcome_include_pdf'])
subject = request.POST.get('welcome_subject')
body = request.POST.get('welcome_body')
subject_en = request.POST.get('welcome_subject_en')
body_en = request.POST.get('welcome_body_en')
if subject is not None or body is not None or subject_en is not None or body_en is not None:
default_welcome = get_default_notification_templates().get('onboarding_welcome', {})
default_subject = (default_welcome.get('subject') or '').strip()
default_body = (default_welcome.get('body') or '').strip()
default_subject_en = (default_welcome.get('subject_en') or '').strip()
default_body_en = (default_welcome.get('body_en') or '').strip()
subject_clean = (subject or '').strip() or default_subject
body_clean = (body or '').strip() or default_body
subject_clean_en = (subject_en or '').strip() or default_subject_en
body_clean_en = (body_en or '').strip() or default_body_en
template, _ = NotificationTemplate.objects.get_or_create(
key='onboarding_welcome',
defaults={
'subject_template': subject_clean,
'body_template': body_clean,
'subject_template_en': subject_clean_en,
'body_template_en': body_clean_en,
'is_active': True,
},
)
changes = []
if template.subject_template != subject_clean:
template.subject_template = subject_clean
changes.append('subject_template')
if template.body_template != body_clean:
template.body_template = body_clean
changes.append('body_template')
if template.subject_template_en != subject_clean_en:
template.subject_template_en = subject_clean_en
changes.append('subject_template_en')
if template.body_template_en != body_clean_en:
template.body_template_en = body_clean_en
changes.append('body_template_en')
if not template.is_active:
template.is_active = True
changes.append('is_active')
if changes:
template.save(update_fields=changes)
_audit(
request,
'welcome_email_settings_saved',
target_type='welcome_email_settings',
target_label='onboarding_welcome',
details={
'delay_days': config.welcome_email_delay_days,
'sender_email': config.welcome_sender_email,
'include_pdf': config.welcome_include_pdf,
},
)
messages.success(request, 'Welcome-E-Mail Einstellungen wurden gespeichert.')
return redirect('welcome_emails_page')
def _revoke_celery_task(task_id: str) -> None:
if not task_id:
return
try:
current_app.control.revoke(task_id, terminate=False)
except Exception:
return
def _parse_selected_schedule_ids(raw: str) -> list[int]:
if not raw:
return []
parsed: list[int] = []
seen: set[int] = set()
for token in raw.split(','):
token = token.strip()
if not token:
continue
try:
schedule_id = int(token)
except ValueError:
continue
if schedule_id in seen:
continue
seen.add(schedule_id)
parsed.append(schedule_id)
return parsed
@_require_capability('manage_welcome_emails')
@require_POST
def bulk_welcome_email_action(request):
action = (request.POST.get('bulk_action') or '').strip().lower()
selected_ids = _parse_selected_schedule_ids(request.POST.get('selected_ids', ''))
if action not in {'pause', 'send_now', 'delete'}:
messages.error(request, 'Ungültige Bulk-Aktion.')
return redirect('welcome_emails_page')
if not selected_ids:
messages.warning(request, 'Keine Welcome-Einträge ausgewählt.')
return redirect('welcome_emails_page')
rows = list(ScheduledWelcomeEmail.objects.filter(id__in=selected_ids).order_by('id'))
if not rows:
messages.warning(request, 'Keine passenden Welcome-Einträge gefunden.')
return redirect('welcome_emails_page')
success_count = 0
skipped_count = 0
for scheduled in rows:
if action == 'pause':
if scheduled.status in {'sent', 'cancelled'}:
skipped_count += 1
continue
_revoke_celery_task(scheduled.celery_task_id)
scheduled.status = 'paused'
scheduled.save(update_fields=['status', 'updated_at'])
success_count += 1
continue
if action == 'send_now':
if scheduled.status == 'cancelled':
skipped_count += 1
continue
async_result = send_scheduled_welcome_email.delay(scheduled.id, True)
scheduled.celery_task_id = async_result.id or scheduled.celery_task_id
scheduled.status = 'scheduled'
scheduled.last_error = ''
scheduled.save(update_fields=['celery_task_id', 'status', 'last_error', 'updated_at'])
success_count += 1
continue
if action == 'delete':
if scheduled.status == 'scheduled':
_revoke_celery_task(scheduled.celery_task_id)
scheduled.delete()
success_count += 1
action_label = {
'pause': 'pausiert',
'send_now': 'sofort angestoßen',
'delete': 'gelöscht',
}[action]
if success_count:
_audit(
request,
'welcome_email_bulk_action',
target_type='welcome_email',
target_label=action,
details={'selected_ids': selected_ids, 'success_count': success_count, 'skipped_count': skipped_count},
)
messages.success(request, f'{success_count} Welcome-Eintrag/Einträge {action_label}.')
if skipped_count:
messages.warning(request, f'{skipped_count} Eintrag/Einträge wurden übersprungen (Status nicht geeignet).')
return redirect('welcome_emails_page')
@_require_capability('manage_welcome_emails')
@require_POST
def pause_welcome_email(request, schedule_id: int):
scheduled = ScheduledWelcomeEmail.objects.filter(id=schedule_id).first()
if not scheduled:
messages.error(request, f'Geplanter Welcome-Eintrag #{schedule_id} nicht gefunden.')
return redirect('welcome_emails_page')
if scheduled.status in {'sent', 'cancelled'}:
messages.error(request, f'Welcome E-Mail #{schedule_id} kann nicht pausiert werden.')
return redirect('welcome_emails_page')
_revoke_celery_task(scheduled.celery_task_id)
scheduled.status = 'paused'
scheduled.save(update_fields=['status', 'updated_at'])
_audit(request, 'welcome_email_paused', target_type='welcome_email', target_id=scheduled.id, target_label=scheduled.recipient_email)
messages.success(request, f'Welcome E-Mail #{schedule_id} wurde pausiert.')
return redirect('welcome_emails_page')
@_require_capability('manage_welcome_emails')
@require_POST
def resume_welcome_email(request, schedule_id: int):
scheduled = ScheduledWelcomeEmail.objects.filter(id=schedule_id).first()
if not scheduled:
messages.error(request, f'Geplanter Welcome-Eintrag #{schedule_id} nicht gefunden.')
return redirect('welcome_emails_page')
if scheduled.status != 'paused':
messages.error(request, f'Welcome E-Mail #{schedule_id} ist nicht pausiert.')
return redirect('welcome_emails_page')
eta = scheduled.send_at if timezone.now() < scheduled.send_at else None
async_result = send_scheduled_welcome_email.apply_async(args=[scheduled.id], eta=eta)
scheduled.celery_task_id = async_result.id or scheduled.celery_task_id
scheduled.status = 'scheduled'
scheduled.last_error = ''
scheduled.save(update_fields=['celery_task_id', 'status', 'last_error', 'updated_at'])
_audit(request, 'welcome_email_resumed', target_type='welcome_email', target_id=scheduled.id, target_label=scheduled.recipient_email)
messages.success(request, f'Welcome E-Mail #{schedule_id} wurde fortgesetzt.')
return redirect('welcome_emails_page')
@_require_capability('manage_welcome_emails')
@require_POST
def cancel_welcome_email(request, schedule_id: int):
scheduled = ScheduledWelcomeEmail.objects.filter(id=schedule_id).first()
if not scheduled:
messages.error(request, f'Geplanter Welcome-Eintrag #{schedule_id} nicht gefunden.')
return redirect('welcome_emails_page')
if scheduled.status == 'sent':
messages.error(request, f'Welcome E-Mail #{schedule_id} wurde bereits gesendet.')
return redirect('welcome_emails_page')
_revoke_celery_task(scheduled.celery_task_id)
scheduled.status = 'cancelled'
scheduled.last_error = ''
scheduled.save(update_fields=['status', 'last_error', 'updated_at'])
_audit(request, 'welcome_email_cancelled', target_type='welcome_email', target_id=scheduled.id, target_label=scheduled.recipient_email)
messages.success(request, f'Welcome E-Mail #{schedule_id} wurde abgebrochen.')
return redirect('welcome_emails_page')
@_require_capability('manage_builders')
@require_POST
def form_builder_save_order(request):
try:
payload = json.loads(request.body.decode('utf-8'))
except (json.JSONDecodeError, UnicodeDecodeError):
return JsonResponse({'ok': False, 'error': 'Ungültige JSON-Daten.'}, status=400)
form_type = payload.get('form_type')
if form_type not in DEFAULT_FIELD_ORDER:
return JsonResponse({'ok': False, 'error': 'Ungültiger Formulartyp.'}, status=400)
columns = payload.get('columns')
if not isinstance(columns, dict):
return JsonResponse({'ok': False, 'error': 'Spalten-Daten fehlen.'}, status=400)
configs = list(FormFieldConfig.objects.filter(form_type=form_type).order_by('sort_order', 'field_name'))
allowed_names = {cfg.field_name for cfg in configs}
seen = set()
ordered_names = []
if form_type == 'onboarding':
allowed_columns = ONBOARDING_PAGE_ORDER
else:
allowed_columns = ['all']
name_to_cfg = {cfg.field_name: cfg for cfg in configs}
sort_order = 0
for column_key in allowed_columns:
names = columns.get(column_key, [])
if not isinstance(names, list):
return JsonResponse({'ok': False, 'error': f'Ungültige Spalte: {column_key}'}, status=400)
for name in names:
if not isinstance(name, str):
continue
if name not in allowed_names or name in seen:
continue
seen.add(name)
ordered_names.append(name)
cfg = name_to_cfg[name]
cfg.sort_order = sort_order
sort_order += 1
if form_type == 'onboarding':
cfg.page_key = column_key
else:
cfg.page_key = ''
missing = [cfg.field_name for cfg in configs if cfg.field_name not in seen]
for name in missing:
cfg = name_to_cfg[name]
cfg.sort_order = sort_order
sort_order += 1
if form_type == 'onboarding':
cfg.page_key = cfg.page_key or ONBOARDING_DEFAULT_PAGE.get(name, 'abschluss')
else:
cfg.page_key = ''
FormFieldConfig.objects.bulk_update(configs, ['sort_order', 'page_key'])
_audit(request, 'form_layout_saved', target_type='form_config', target_label=form_type, details={'saved_count': len(configs)})
return JsonResponse({'ok': True, 'saved_count': len(configs)})
@_require_capability('manage_integrations')
@require_POST
def send_test_email(request):
mode = 'TEST_MODE_ON' if is_email_test_mode() else 'TEST_MODE_OFF'
redirect_email = get_email_test_redirect()
try:
send_system_email(
subject=f'SMTP test from onboarding/offboarding v2 ({mode})',
body=(
'This is a test email. If you see this, SMTP is configured correctly.\n'
f'EMAIL_TEST_MODE={is_email_test_mode()}\n'
f'EMAIL_TEST_REDIRECT={redirect_email}\n'
),
to=[settings.TEST_NOTIFICATION_EMAIL],
)
_audit(request, 'smtp_test_sent', target_type='system_email', target_label=settings.TEST_NOTIFICATION_EMAIL, details={'email_test_mode': is_email_test_mode()})
notify_user(
user=request.user,
title=_('SMTP-Test erfolgreich'),
body=_('Die SMTP-Testmail wurde erfolgreich gesendet.'),
level=UserNotification.LEVEL_SUCCESS,
link_url='/admin-tools/integrations/',
event_key=UserProfile.NOTIFICATION_SYSTEM_ALERTS,
)
messages.success(request, f'SMTP-Testmail wurde gesendet ({mode}).')
except Exception as exc:
notify_user(
user=request.user,
title=_('SMTP-Test fehlgeschlagen'),
body=str(exc),
level=UserNotification.LEVEL_ERROR,
link_url='/admin-tools/integrations/',
event_key=UserProfile.NOTIFICATION_SYSTEM_ALERTS,
)
messages.error(request, _('SMTP-Testmail konnte nicht gesendet werden: %(error)s') % {'error': exc})
return _redirect_back(request, 'home')
@_require_capability('manage_integrations')
@require_POST
def nextcloud_test_upload(request):
filename = f"nextcloud_test_{timezone.now().strftime('%Y%m%d_%H%M%S')}.txt"
content = (
"Nextcloud test upload from onboarding/offboarding system.\n"
f"Time: {timezone.now().isoformat()}\n"
f"User: {request.user.username}\n"
)
temp_path = None
try:
with NamedTemporaryFile('w', suffix='.txt', delete=False, encoding='utf-8') as tf:
tf.write(content)
temp_path = Path(tf.name)
ok = upload_to_nextcloud(temp_path, filename)
if ok:
_audit(request, 'nextcloud_test_upload', target_type='nextcloud', target_label=filename, details={'result': 'success'})
notify_user(
user=request.user,
title=_('Nextcloud-Test erfolgreich'),
body=_('Der Testupload nach Nextcloud war erfolgreich.'),
level=UserNotification.LEVEL_SUCCESS,
link_url='/admin-tools/integrations/',
event_key=UserProfile.NOTIFICATION_SYSTEM_ALERTS,
)
messages.success(request, f'Nextcloud-Testupload erfolgreich: {filename}')
else:
_audit(request, 'nextcloud_test_upload', target_type='nextcloud', target_label=filename, details={'result': 'error'})
notify_user(
user=request.user,
title=_('Nextcloud-Test fehlgeschlagen'),
body=_('Der Testupload nach Nextcloud ist fehlgeschlagen.'),
level=UserNotification.LEVEL_ERROR,
link_url='/admin-tools/integrations/',
event_key=UserProfile.NOTIFICATION_SYSTEM_ALERTS,
)
messages.error(request, 'Nextcloud-Testupload fehlgeschlagen. Bitte Konfiguration prüfen.')
except Exception as exc:
notify_user(
user=request.user,
title=_('Nextcloud-Test fehlgeschlagen'),
body=str(exc),
level=UserNotification.LEVEL_ERROR,
link_url='/admin-tools/integrations/',
event_key=UserProfile.NOTIFICATION_SYSTEM_ALERTS,
)
messages.error(request, f'Nextcloud-Testupload fehlgeschlagen: {exc}')
finally:
if temp_path and temp_path.exists():
temp_path.unlink(missing_ok=True)
return _redirect_back(request, 'home')
@_require_capability('manage_integrations')
@require_POST
def toggle_nextcloud_enabled(request):
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
currently_enabled = is_nextcloud_enabled()
config.nextcloud_enabled_override = not currently_enabled
config.save(update_fields=['nextcloud_enabled_override'])
_audit(request, 'nextcloud_mode_toggled', target_type='workflow_config', target_label='nextcloud', details={'enabled': config.nextcloud_enabled_override})
state = 'aktiviert' if config.nextcloud_enabled_override else 'deaktiviert'
messages.success(request, f'Nextcloud Upload wurde {state}.')
return _redirect_back(request, 'home')
@_require_capability('manage_integrations')
@require_POST
def toggle_email_mode(request):
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
currently_test_mode = is_email_test_mode()
config.email_test_mode_override = not currently_test_mode
config.save(update_fields=['email_test_mode_override'])
_audit(request, 'email_mode_toggled', target_type='workflow_config', target_label='email_mode', details={'test_mode': config.email_test_mode_override})
state = 'Testmodus (Umleitung)' if config.email_test_mode_override else 'Produktionsmodus'
messages.success(request, f'E-Mail-Modus wurde auf {state} gesetzt.')
return _redirect_back(request, 'home')
@_require_capability('manage_integrations')
@require_POST
def save_integrations_settings(request):
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
try:
sync_interval = int(request.POST.get('sync_interval_seconds', config.sync_interval_seconds or 60))
smtp_port = int(request.POST.get('smtp_port', config.smtp_port or 465))
except ValueError:
messages.error(request, 'Ungültige Zahl bei Sync-Intervall oder SMTP Port.')
return redirect('home')
config.nextcloud_base_url_override = request.POST.get('nextcloud_base_url_override', '').strip()
config.nextcloud_username_override = request.POST.get('nextcloud_username_override', '').strip()
config.nextcloud_directory_override = request.POST.get('nextcloud_directory_override', '').strip()
config.sync_interval_seconds = max(10, sync_interval)
config.imap_server = request.POST.get('imap_server', '').strip()
config.mailbox = request.POST.get('mailbox', '').strip() or 'INBOX'
config.smtp_server = request.POST.get('smtp_server', '').strip()
config.smtp_port = max(1, smtp_port)
config.email_account = request.POST.get('email_account', '').strip()
config.smtp_use_ssl = request.POST.get('smtp_use_ssl') == 'on'
config.smtp_use_tls = request.POST.get('smtp_use_tls') == 'on'
nextcloud_password = request.POST.get('nextcloud_password_override', '').strip()
if nextcloud_password:
config.nextcloud_password_override = nextcloud_password
email_password = request.POST.get('email_password', '').strip()
if email_password:
config.email_password = email_password
config.save()
_audit(request, 'integrations_saved', target_type='workflow_config', target_label='all_integrations')
messages.success(request, 'Integrations-Einstellungen wurden gespeichert.')
return redirect('home')
@_require_capability('manage_integrations')
@require_POST
def save_nextcloud_settings(request):
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
try:
sync_interval = int(request.POST.get('sync_interval_seconds', config.sync_interval_seconds or 60))
except ValueError:
messages.error(request, 'Ungültige Zahl beim Sync-Intervall.')
return redirect('home')
config.nextcloud_base_url_override = request.POST.get('nextcloud_base_url_override', '').strip()
config.nextcloud_username_override = request.POST.get('nextcloud_username_override', '').strip()
config.nextcloud_directory_override = request.POST.get('nextcloud_directory_override', '').strip()
config.sync_interval_seconds = max(10, sync_interval)
nextcloud_password = request.POST.get('nextcloud_password_override', '').strip()
if nextcloud_password:
config.nextcloud_password_override = nextcloud_password
config.save()
_audit(request, 'nextcloud_settings_saved', target_type='workflow_config', target_label='nextcloud')
messages.success(request, 'Nextcloud-Einstellungen wurden gespeichert.')
return redirect('/admin-tools/integrations/?kind=nextcloud')
@_require_capability('manage_integrations')
@require_POST
def save_workflow_rules(request):
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
try:
handover_lead_days = int(
request.POST.get(
'device_handover_lead_days',
config.device_handover_lead_days or 5,
)
)
except ValueError:
messages.error(request, 'Ungültige Zahl beim Hardware-Vorlauf.')
return redirect('/admin-tools/integrations/?kind=rules')
config.device_handover_lead_days = max(0, handover_lead_days)
config.save(update_fields=['device_handover_lead_days'])
_audit(
request,
'workflow_rules_saved',
target_type='workflow_config',
target_label='workflow_rules',
details={
'device_handover_lead_days': config.device_handover_lead_days,
},
)
messages.success(request, 'Workflow-Regeln wurden gespeichert.')
return redirect('/admin-tools/integrations/?kind=rules')
@_require_capability('manage_integrations')
@require_POST
def save_backup_settings(request):
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
target_type = (request.POST.get('remote_backup_target_type') or config.remote_backup_target_type or 'nextcloud').strip().lower()
if target_type not in {choice for choice, _ in WorkflowConfig.REMOTE_BACKUP_TARGET_CHOICES}:
target_type = 'nextcloud'
remote_backup_enabled = request.POST.get('remote_backup_enabled') == 'on'
remote_backup_nextcloud_directory = request.POST.get('remote_backup_nextcloud_directory', '').strip()
primary_nextcloud_directory = (
(config.nextcloud_directory_override or '').strip()
or settings.NEXTCLOUD_DIRECTORY.strip()
).strip('/')
if remote_backup_enabled and target_type == 'nextcloud':
if not remote_backup_nextcloud_directory:
messages.error(request, 'Bitte ein separates Nextcloud Backup-Verzeichnis angeben.')
return redirect('/admin-tools/integrations/?kind=backup')
if remote_backup_nextcloud_directory.strip('/') == primary_nextcloud_directory:
messages.error(request, 'Das Backup-Verzeichnis muss vom normalen Nextcloud Dokumentenordner getrennt sein.')
return redirect('/admin-tools/integrations/?kind=backup')
config.remote_backup_enabled = remote_backup_enabled
config.remote_backup_target_type = target_type
config.remote_backup_nextcloud_directory = remote_backup_nextcloud_directory
config.remote_backup_s3_bucket = request.POST.get('remote_backup_s3_bucket', '').strip()
config.remote_backup_nfs_path = request.POST.get('remote_backup_nfs_path', '').strip()
config.save(
update_fields=[
'device_handover_lead_days',
'remote_backup_enabled',
'remote_backup_target_type',
'remote_backup_nextcloud_directory',
'remote_backup_s3_bucket',
'remote_backup_nfs_path',
]
)
_audit(
request,
'backup_settings_saved',
target_type='workflow_config',
target_label='backup_settings',
details={
'remote_backup_enabled': config.remote_backup_enabled,
'remote_backup_target_type': config.remote_backup_target_type,
'remote_backup_nextcloud_directory': config.remote_backup_nextcloud_directory,
},
)
messages.success(request, 'Backup-Einstellungen wurden gespeichert.')
return redirect('/admin-tools/integrations/?kind=backup')
@_require_capability('manage_integrations')
@require_POST
def save_mail_settings(request):
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
try:
smtp_port = int(request.POST.get('smtp_port', config.smtp_port or 465))
except ValueError:
messages.error(request, 'Ungültige Zahl beim SMTP Port.')
return redirect('home')
config.imap_server = request.POST.get('imap_server', '').strip()
config.mailbox = request.POST.get('mailbox', '').strip() or 'INBOX'
config.smtp_server = request.POST.get('smtp_server', '').strip()
config.smtp_port = max(1, smtp_port)
config.email_account = request.POST.get('email_account', '').strip()
config.smtp_use_ssl = request.POST.get('smtp_use_ssl') == 'on'
config.smtp_use_tls = request.POST.get('smtp_use_tls') == 'on'
email_password = request.POST.get('email_password', '').strip()
if email_password:
config.email_password = email_password
config.save()
smtp_cfg, _ = SystemEmailConfig.objects.get_or_create(name='Default SMTP')
SystemEmailConfig.objects.exclude(id=smtp_cfg.id).update(is_active=False)
smtp_cfg.is_active = True
smtp_cfg.host = config.smtp_server
smtp_cfg.port = config.smtp_port
smtp_cfg.username = config.email_account
if email_password:
smtp_cfg.password = email_password
smtp_cfg.use_ssl = config.smtp_use_ssl
smtp_cfg.use_tls = config.smtp_use_tls
smtp_cfg.from_email = request.POST.get('from_email', '').strip()
smtp_cfg.save()
_audit(request, 'mail_settings_saved', target_type='workflow_config', target_label='mail')
messages.success(request, 'Mail-Einstellungen wurden gespeichert.')
return redirect('/admin-tools/integrations/?kind=mail')
@_require_capability('manage_integrations')
@require_POST
def save_email_routing_settings(request):
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
config.it_onboarding_email = request.POST.get('it_onboarding_email', '').strip()
config.general_info_email = request.POST.get('general_info_email', '').strip()
config.business_card_email = request.POST.get('business_card_email', '').strip()
config.hr_works_email = request.POST.get('hr_works_email', '').strip()
config.key_notification_email = request.POST.get('key_notification_email', '').strip()
config.save(
update_fields=[
'it_onboarding_email',
'general_info_email',
'business_card_email',
'hr_works_email',
'key_notification_email',
]
)
known_keys = {k for k, _ in NotificationTemplate.TEMPLATE_CHOICES}
for key in known_keys:
subject = request.POST.get(f'subject_{key}')
body = request.POST.get(f'body_{key}')
subject_en = request.POST.get(f'subject_en_{key}')
body_en = request.POST.get(f'body_en_{key}')
if subject is None and body is None and subject_en is None and body_en is None:
continue
subject = (subject or '').strip()
body = (body or '').strip()
subject_en = (subject_en or '').strip()
body_en = (body_en or '').strip()
if not subject and not body and not subject_en and not body_en:
continue
obj, _ = NotificationTemplate.objects.get_or_create(
key=key,
defaults={
'subject_template': subject or f'[{key}]',
'body_template': body or '-',
'subject_template_en': subject_en,
'body_template_en': body_en,
'is_active': True,
},
)
changed = []
if subject and obj.subject_template != subject:
obj.subject_template = subject
changed.append('subject_template')
if body and obj.body_template != body:
obj.body_template = body
changed.append('body_template')
if obj.subject_template_en != subject_en:
obj.subject_template_en = subject_en
changed.append('subject_template_en')
if obj.body_template_en != body_en:
obj.body_template_en = body_en
changed.append('body_template_en')
if not obj.is_active:
obj.is_active = True
changed.append('is_active')
if changed:
obj.save(update_fields=changed)
_audit(request, 'email_routing_saved', target_type='workflow_config', target_label='email_routing')
messages.success(request, 'E-Mail Routing und Vorlagen wurden gespeichert.')
return redirect('/admin-tools/integrations/?kind=emails')
@_require_capability('manage_integrations')
@require_POST
def save_notification_rules(request):
rule_ids = request.POST.getlist('rule_ids')
for position, raw_id in enumerate(rule_ids):
rule = NotificationRule.objects.filter(id=raw_id).first()
if not rule:
continue
if request.POST.get(f'delete_{rule.id}') == 'on':
rule.delete()
continue
rule.name = request.POST.get(f'name_{rule.id}', '').strip() or rule.name
event_type = request.POST.get(f'event_type_{rule.id}', '').strip()
if event_type in {'onboarding', 'offboarding'}:
rule.event_type = event_type
operator = request.POST.get(f'operator_{rule.id}', '').strip()
if operator in {x[0] for x in NotificationRule.OPERATOR_CHOICES}:
rule.operator = operator
rule.field_name = request.POST.get(f'field_name_{rule.id}', '').strip()
rule.expected_value = request.POST.get(f'expected_value_{rule.id}', '').strip()
rule.recipients = request.POST.get(f'recipients_{rule.id}', '').strip()
rule.template_key = request.POST.get(f'template_key_{rule.id}', '').strip()
rule.custom_subject = request.POST.get(f'custom_subject_{rule.id}', '').strip()
rule.custom_body = request.POST.get(f'custom_body_{rule.id}', '').strip()
rule.custom_subject_en = request.POST.get(f'custom_subject_en_{rule.id}', '').strip()
rule.custom_body_en = request.POST.get(f'custom_body_en_{rule.id}', '').strip()
rule.include_pdf_attachment = request.POST.get(f'include_pdf_{rule.id}') == 'on'
rule.is_active = request.POST.get(f'active_{rule.id}') == 'on'
rule.sort_order = position
rule.save()
new_name = request.POST.get('new_name', '').strip()
new_recipients = request.POST.get('new_recipients', '').strip()
if new_name and new_recipients:
new_event = request.POST.get('new_event_type', 'onboarding').strip()
if new_event not in {'onboarding', 'offboarding'}:
new_event = 'onboarding'
new_operator = request.POST.get('new_operator', 'always').strip()
if new_operator not in {x[0] for x in NotificationRule.OPERATOR_CHOICES}:
new_operator = 'always'
NotificationRule.objects.create(
name=new_name,
event_type=new_event,
field_name=request.POST.get('new_field_name', '').strip(),
operator=new_operator,
expected_value=request.POST.get('new_expected_value', '').strip(),
recipients=new_recipients,
template_key=request.POST.get('new_template_key', '').strip(),
custom_subject=request.POST.get('new_custom_subject', '').strip(),
custom_body=request.POST.get('new_custom_body', '').strip(),
custom_subject_en=request.POST.get('new_custom_subject_en', '').strip(),
custom_body_en=request.POST.get('new_custom_body_en', '').strip(),
include_pdf_attachment=request.POST.get('new_include_pdf') == 'on',
is_active=True,
sort_order=NotificationRule.objects.filter(event_type=new_event).count() + 1,
)
_audit(request, 'notification_rules_saved', target_type='notification_rule')
messages.success(request, 'Benachrichtigungsregeln wurden gespeichert.')
return redirect('/admin-tools/integrations/?kind=emails')
@_require_capability('delete_requests')
@require_POST
def delete_request_from_dashboard(request, kind: str, request_id: int):
if kind == 'onboarding':
obj = get_object_or_404(OnboardingRequest, id=request_id)
elif kind == 'offboarding':
obj = get_object_or_404(OffboardingRequest, id=request_id)
else:
messages.error(request, f'Unbekannter Typ: {kind}')
return redirect('requests_dashboard')
target_label = _request_target_label(obj, kind)
obj.delete()
_audit(request, 'request_deleted', target_type=kind, target_id=request_id, target_label=target_label)
messages.success(request, f'{kind.capitalize()}-Anfrage #{request_id} wurde gelöscht.')
return redirect('requests_dashboard')
@_require_capability('retry_requests')
@require_POST
def retry_request_from_dashboard(request, kind: str, request_id: int):
if kind == 'onboarding':
obj = get_object_or_404(OnboardingRequest, id=request_id)
obj.processing_status = 'submitted'
obj.last_error = ''
obj.save(update_fields=['processing_status', 'last_error'])
process_onboarding_request.delay(obj.id)
_audit(request, 'request_retried', target_type='onboarding', target_id=obj.id, target_label=_request_target_label(obj, 'onboarding'))
elif kind == 'offboarding':
obj = get_object_or_404(OffboardingRequest, id=request_id)
obj.processing_status = 'submitted'
obj.last_error = ''
obj.save(update_fields=['processing_status', 'last_error'])
process_offboarding_request.delay(obj.id)
_audit(request, 'request_retried', target_type='offboarding', target_id=obj.id, target_label=_request_target_label(obj, 'offboarding'))
else:
messages.error(request, f'Unbekannter Typ: {kind}')
return redirect('requests_dashboard')
messages.success(request, f'{kind.capitalize()}-Anfrage #{request_id} wurde erneut angestoßen.')
return redirect('requests_dashboard')