Files
workdock-platform/backend/workflows/views.py
2026-03-26 10:49:59 +01:00

2365 lines
99 KiB
Python

from pathlib import Path
from datetime import timedelta
from tempfile import NamedTemporaryFile
import json
from functools import wraps
from celery import current_app
from django.conf import settings
from django.db import connection
from django.db import IntegrityError
from django.db.models import Q
from django.shortcuts import get_object_or_404, redirect, render
from django.contrib import messages
from django.contrib.auth import get_user_model
from django.contrib.auth.decorators import login_required
from django.contrib.auth.tokens import default_token_generator
from django.http import JsonResponse
from django.views.decorators.http import require_POST
from django.views.decorators.csrf import ensure_csrf_cookie
from django.utils import timezone
from django.utils.encoding import force_bytes
from django.utils.http import urlsafe_base64_encode
from django.utils.translation import gettext as _, gettext_lazy
from django.utils.translation import get_language, override
from django.urls import reverse
from .backup_ops import create_backup_bundle, delete_backup_bundle, list_backup_bundles, verify_backup_bundle
from .forms import OffboardingRequestForm, OnboardingRequestForm, UserManagementCreateForm
from .form_builder import (
DEFAULT_FIELD_ORDER,
LOCKED_FIELD_RULES,
ONBOARDING_DEFAULT_PAGE,
ONBOARDING_PAGE_LABELS,
ONBOARDING_PAGE_ORDER,
ensure_form_field_configs,
)
from .models import AdminAuditLog, EmployeeProfile, FormFieldConfig, FormOption, IntroChecklistItem, NotificationRule, NotificationTemplate, OffboardingRequest, OnboardingIntroductionSession, OnboardingRequest, ScheduledWelcomeEmail, SystemEmailConfig, WorkflowConfig
from .emailing import send_system_email
from .roles import ROLE_GROUP_NAMES, ROLE_LABELS, ROLE_SUPER_ADMIN, assign_user_role, get_user_role_key, get_user_role_label, user_has_capability
from .services import get_email_test_redirect, is_email_test_mode, is_nextcloud_enabled, upload_to_nextcloud
from .tasks import (
DEFAULT_NOTIFICATION_TEMPLATES,
_generate_onboarding_intro_pdf,
_generate_onboarding_intro_session_pdf,
build_intro_sections_for_request,
process_offboarding_request,
process_onboarding_request,
send_scheduled_welcome_email,
)
def _redirect_back(request, fallback: str):
target = (request.POST.get('next') or request.GET.get('next') or '').strip()
if target.startswith('/'):
return redirect(target)
referer = (request.META.get('HTTP_REFERER') or '').strip()
if referer.startswith('http://127.0.0.1') or referer.startswith('http://localhost') or referer.startswith('/'):
return redirect(referer)
return redirect(fallback)
ONBOARDING_GROUPS = {
'business-card-box': ['business_card_name', 'business_card_title', 'business_card_email', 'business_card_phone'],
'employment-end-box': ['employment_end_date'],
'group-mailboxes-box': ['group_mailboxes'],
'extra-hardware-box': ['additional_hardware_multi', 'additional_hardware_other'],
'extra-software-box': ['additional_software_multi', 'additional_software'],
'extra-access-box': ['additional_access_text'],
'successor-box': ['successor_name', 'inherit_phone_number_choice'],
'phone-box': ['phone_number_choice'],
}
ONBOARDING_HIDDEN_BY_DEFAULT = {
'business-card-box',
'employment-end-box',
'group-mailboxes-box',
'extra-hardware-box',
'extra-software-box',
'extra-access-box',
'successor-box',
}
ONBOARDING_INLINE_CHECKS = {'order_business_cards', 'agreement_confirm'}
ONBOARDING_CHECKBOX_LISTS = {
'needed_devices_multi',
'additional_hardware_multi',
'needed_software_multi',
'additional_software_multi',
'needed_accesses_multi',
'needed_workspace_groups_multi',
'needed_resources_multi',
}
ONBOARDING_SECTION_ORDER = ['stammdaten', 'vertrag', 'itsetup', 'abschluss']
ONBOARDING_SECTION_META = {
'stammdaten': {'title': gettext_lazy('Stammdaten'), 'subtitle': gettext_lazy('Person, Rolle, Abteilung')},
'vertrag': {'title': gettext_lazy('Vertrag'), 'subtitle': gettext_lazy('Beschäftigung und Termine')},
'itsetup': {'title': gettext_lazy('IT-Setup'), 'subtitle': gettext_lazy('Geräte, Software und Zugänge')},
'abschluss': {'title': gettext_lazy('Abschluss'), 'subtitle': gettext_lazy('Notizen und Freigabe')},
}
def healthz(request):
db_ok = True
try:
with connection.cursor() as cursor:
cursor.execute('SELECT 1')
cursor.fetchone()
except Exception:
db_ok = False
status_code = 200 if db_ok else 503
return JsonResponse(
{
'status': 'ok' if db_ok else 'degraded',
'service': 'onoff_v2',
'db': 'ok' if db_ok else 'error',
'time': timezone.now().isoformat(),
},
status=status_code,
)
def _require_capability(capability: str):
def decorator(view_func):
@wraps(view_func)
@login_required
def wrapped(request, *args, **kwargs):
if not user_has_capability(request.user, capability):
messages.error(request, _('Sie haben keine Berechtigung für diese Aktion.'))
return redirect('home')
return view_func(request, *args, **kwargs)
return wrapped
return decorator
def _display_user_name(user) -> str:
first_name = (getattr(user, 'first_name', '') or '').strip()
last_name = (getattr(user, 'last_name', '') or '').strip()
full_name = f'{first_name} {last_name}'.strip()
if full_name:
return full_name
username = (getattr(user, 'username', '') or '').strip()
if username:
return username
return (getattr(user, 'email', '') or '').strip()
def _audit(
request,
action: str,
*,
target_type: str = '',
target_id: int | None = None,
target_label: str = '',
details: dict | None = None,
) -> None:
if not getattr(request, 'user', None) or not request.user.is_authenticated:
return
AdminAuditLog.objects.create(
actor=request.user,
actor_display=_display_user_name(request.user),
action=action,
target_type=target_type,
target_id=target_id,
target_label=target_label,
details=details or {},
)
def _form_field_labels(form_type: str) -> dict[str, str]:
if form_type == 'onboarding':
return {name: str(field.label or name) for name, field in OnboardingRequestForm.base_fields.items()}
if form_type == 'offboarding':
return {name: str(field.label or name) for name, field in OffboardingRequestForm.base_fields.items()}
return {}
def _request_target_label(obj, kind: str | None = None) -> str:
request_kind = (kind or '').strip()
if not request_kind:
request_kind = 'onboarding' if isinstance(obj, OnboardingRequest) else 'offboarding'
name = (getattr(obj, 'full_name', '') or '').strip() or f'#{getattr(obj, "id", "?")}'
email = (getattr(obj, 'work_email', '') or '').strip()
created_at = getattr(obj, 'created_at', None)
date_label = created_at.strftime('%Y-%m-%d') if created_at else ''
parts = [request_kind.capitalize(), name]
if email:
parts.append(f'<{email}>')
if date_label:
parts.append(date_label)
return ' | '.join(parts)
def _request_status_label(status_key: str, language_code: str | None = None) -> str:
lang = ((language_code or 'de').split('-')[0] or 'de').lower()
with override(lang):
labels = {
'submitted': _('Eingereicht'),
'processing': _('In Bearbeitung'),
'completed': _('Abgeschlossen'),
'failed': _('Fehlgeschlagen'),
}
return labels.get(status_key, status_key)
def _audit_action_label(action: str) -> str:
labels = {
'requests_deleted': _('Vorgänge gelöscht'),
'request_deleted': _('Vorgang gelöscht'),
'request_retried': _('Vorgang erneut angestoßen'),
'intro_pdf_generated': _('Einweisungs-PDF erzeugt'),
'intro_live_pdf_generated': _('Live-Protokoll erzeugt'),
'intro_session_reset': _('Einweisung zurückgesetzt'),
'intro_session_saved': _('Einweisung als Entwurf gespeichert'),
'intro_session_completed': _('Einweisung abgeschlossen'),
'form_option_deleted': _('Formularoption gelöscht'),
'form_options_saved': _('Formularoptionen gespeichert'),
'form_field_texts_saved': _('Feldtexte gespeichert'),
'form_layout_saved': _('Formularlayout gespeichert'),
'intro_checklist_item_deleted': _('Einweisungs-Checkpunkt gelöscht'),
'intro_checklist_item_added': _('Einweisungs-Checkpunkt hinzugefügt'),
'intro_checklist_saved': _('Einweisungs-Checkliste gespeichert'),
'welcome_email_triggered_now': _('Welcome E-Mail sofort ausgelöst'),
'welcome_email_settings_saved': _('Welcome E-Mail Einstellungen gespeichert'),
'welcome_email_bulk_action': _('Welcome E-Mail Sammelaktion ausgeführt'),
'welcome_email_paused': _('Welcome E-Mail pausiert'),
'welcome_email_resumed': _('Welcome E-Mail fortgesetzt'),
'welcome_email_cancelled': _('Welcome E-Mail abgebrochen'),
'smtp_test_sent': _('SMTP-Test gesendet'),
'nextcloud_test_upload': _('Nextcloud-Testupload ausgeführt'),
'nextcloud_mode_toggled': _('Nextcloud-Modus umgeschaltet'),
'email_mode_toggled': _('E-Mail-Modus umgeschaltet'),
'integrations_saved': _('Integrationen gespeichert'),
'nextcloud_settings_saved': _('Nextcloud-Einstellungen gespeichert'),
'mail_settings_saved': _('Mail-Einstellungen gespeichert'),
'email_routing_saved': _('E-Mail-Routing gespeichert'),
'notification_rules_saved': _('Benachrichtigungsregeln gespeichert'),
'user_created': _('Benutzer erstellt'),
'user_updated': _('Benutzer aktualisiert'),
'user_password_reset_sent': _('Passwort-Reset-Link versendet'),
'user_deleted': _('Benutzer gelöscht'),
'backup_created': _('Backup erstellt'),
'backup_verified': _('Backup verifiziert'),
'backup_deleted': _('Backup gelöscht'),
'backup_settings_saved': _('Backup-Einstellungen gespeichert'),
}
return labels.get(action, action.replace('_', ' ').strip().capitalize())
def _translate_choice_list(choices):
return [(value, str(label)) for value, label in choices]
def _build_onboarding_layout(form) -> list[dict]:
ordered_names = list(form.fields.keys())
group_by_field = {}
for group_id, group_fields in ONBOARDING_GROUPS.items():
for name in group_fields:
group_by_field[name] = group_id
rendered_groups = set()
consumed = set()
blocks = []
for field_name in ordered_names:
if field_name in consumed:
continue
group_id = group_by_field.get(field_name)
if group_id:
if group_id in rendered_groups:
continue
group_fields = [
form[name]
for name in ONBOARDING_GROUPS[group_id]
if name in form.fields
]
if not group_fields:
continue
blocks.append(
{
'kind': 'group',
'id': group_id,
'hidden_default': group_id in ONBOARDING_HIDDEN_BY_DEFAULT,
'fields': group_fields,
}
)
rendered_groups.add(group_id)
consumed.update([f.name for f in group_fields])
continue
blocks.append({'kind': 'field', 'field': form[field_name]})
consumed.add(field_name)
return blocks
def _section_for_block(block: dict, field_pages: dict[str, str]) -> str:
if block['kind'] == 'field':
return field_pages.get(block['field'].name, 'abschluss')
fields = block.get('fields') or []
if not fields:
return 'abschluss'
return field_pages.get(fields[0].name, 'abschluss')
def _build_onboarding_sections(blocks: list[dict], field_pages: dict[str, str]) -> list[dict]:
grouped = {key: [] for key in ONBOARDING_SECTION_ORDER}
for block in blocks:
section_key = _section_for_block(block, field_pages)
if section_key not in grouped:
section_key = 'abschluss'
grouped[section_key].append(block)
return [
{
'key': key,
'title': ONBOARDING_SECTION_META[key]['title'],
'subtitle': ONBOARDING_SECTION_META[key]['subtitle'],
'blocks': grouped[key],
}
for key in ONBOARDING_SECTION_ORDER
]
@login_required
def home(request):
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
return render(
request,
'workflows/home.html',
{
'nextcloud_enabled': is_nextcloud_enabled(),
'email_test_mode': is_email_test_mode(),
'workflow_config': config,
'role_label': get_user_role_label(request.user),
},
)
def _user_management_rows():
user_model = get_user_model()
role_order = {
ROLE_SUPER_ADMIN: 0,
'admin': 1,
'it_staff': 2,
'staff': 3,
}
rows = []
for user in user_model.objects.all().order_by('-is_active', 'username'):
role_key = get_user_role_key(user)
rows.append(
{
'user': user,
'role_key': role_key,
'role_label': str(ROLE_LABELS[role_key]),
'role_sort': role_order.get(role_key, 99),
'display_name': _display_user_name(user),
}
)
rows.sort(key=lambda item: (not item['user'].is_active, item['role_sort'], item['user'].username.lower()))
return rows
def _render_user_management(request, create_form=None, status_code: int = 200):
recent_user_events = list(
AdminAuditLog.objects.select_related('actor')
.filter(action__in=['user_created', 'user_updated', 'user_password_reset_sent', 'user_deleted'])
.order_by('-created_at', '-id')[:12]
)
for row in recent_user_events:
row.action_label = _audit_action_label(row.action)
role_key = (row.details or {}).get('role')
row.role_label = str(ROLE_LABELS[role_key]) if role_key in ROLE_LABELS else role_key
return render(
request,
'workflows/user_management.html',
{
'create_form': create_form or UserManagementCreateForm(),
'rows': _user_management_rows(),
'role_choices': [(key, str(ROLE_LABELS[key])) for key in ROLE_GROUP_NAMES],
'recent_user_events': recent_user_events,
},
status=status_code,
)
def _super_admin_user_count() -> int:
user_model = get_user_model()
return sum(1 for user in user_model.objects.all() if get_user_role_key(user) == ROLE_SUPER_ADMIN and user.is_active)
def _would_remove_last_super_admin(user, new_role_key: str | None = None, new_is_active: bool | None = None, deleting: bool = False) -> bool:
if get_user_role_key(user) != ROLE_SUPER_ADMIN or not user.is_active:
return False
if _super_admin_user_count() > 1:
return False
if deleting:
return True
if new_role_key is not None and new_role_key != ROLE_SUPER_ADMIN:
return True
if new_is_active is not None and not new_is_active:
return True
return False
def _send_user_access_email(request, target_user, *, invitation: bool) -> None:
email = (target_user.email or '').strip()
if not email:
raise ValueError(_('Für diesen Benutzer ist keine E-Mail-Adresse hinterlegt.'))
uid = urlsafe_base64_encode(force_bytes(target_user.pk))
token = default_token_generator.make_token(target_user)
reset_path = reverse('password_reset_confirm', kwargs={'uidb64': uid, 'token': token})
reset_url = request.build_absolute_uri(reset_path)
if invitation:
subject = _('Zugangseinladung für %(username)s') % {'username': target_user.username}
body = _(
'Hallo %(name)s,\n\n'
'für Sie wurde ein Benutzerkonto im TUBCO Onboarding- und Offboarding-Portal angelegt.\n'
'Bitte öffnen Sie den folgenden Link, um Ihr Passwort zu setzen:\n'
'%(url)s\n\n'
'Wenn Sie diese Einladung nicht erwartet haben, melden Sie sich bitte bei Ihrem Administrator.'
) % {
'name': _display_user_name(target_user),
'url': reset_url,
}
else:
subject = _('Passwort zurücksetzen für %(username)s') % {'username': target_user.username}
body = _(
'Hallo %(name)s,\n\n'
'für Ihr Konto wurde ein Link zum Zurücksetzen des Passworts erstellt.\n'
'Bitte öffnen Sie den folgenden Link:\n'
'%(url)s\n\n'
'Wenn Sie diese Anfrage nicht erwartet haben, können Sie diese E-Mail ignorieren.'
) % {
'name': _display_user_name(target_user),
'url': reset_url,
}
send_system_email(subject=subject, body=body, to=[email])
@_require_capability('manage_users')
def user_management_page(request):
return _render_user_management(request)
@_require_capability('manage_users')
@require_POST
def create_user_from_admin(request):
form = UserManagementCreateForm(request.POST)
if not form.is_valid():
messages.error(request, _('Benutzer konnte nicht erstellt werden. Bitte prüfen Sie die Eingaben.'))
return _render_user_management(request, create_form=form, status_code=400)
user = form.save()
_send_user_access_email(request, user, invitation=True)
_audit(
request,
'user_created',
target_type='user',
target_id=user.id,
target_label=_display_user_name(user),
details={'username': user.username, 'role': get_user_role_key(user), 'invitation_sent': True},
)
messages.success(request, _('Benutzer wurde erstellt und eingeladen: %(username)s') % {'username': user.username})
return redirect('user_management_page')
@_require_capability('manage_users')
@require_POST
def update_user_from_admin(request, user_id: int):
user_model = get_user_model()
target_user = get_object_or_404(user_model, id=user_id)
role_key = (request.POST.get('role_key') or '').strip()
is_active = request.POST.get('is_active') == 'on'
new_password = (request.POST.get('new_password') or '').strip()
if role_key not in ROLE_GROUP_NAMES:
messages.error(request, _('Ungültige Rolle.'))
return redirect('user_management_page')
if target_user == request.user and (role_key != ROLE_SUPER_ADMIN or not is_active):
messages.error(request, _('Der aktuell angemeldete Super Admin kann sich hier nicht selbst sperren oder herabstufen.'))
return redirect('user_management_page')
if _would_remove_last_super_admin(target_user, new_role_key=role_key, new_is_active=is_active):
messages.error(request, _('Der letzte aktive Super Admin kann nicht deaktiviert oder herabgestuft werden.'))
return redirect('user_management_page')
assign_user_role(target_user, role_key)
target_user.is_active = is_active
if new_password:
target_user.set_password(new_password)
target_user.save()
_audit(
request,
'user_updated',
target_type='user',
target_id=target_user.id,
target_label=_display_user_name(target_user),
details={'username': target_user.username, 'role': role_key, 'is_active': is_active, 'password_changed': bool(new_password)},
)
messages.success(request, _('Benutzer wurde aktualisiert: %(username)s') % {'username': target_user.username})
return redirect('user_management_page')
@_require_capability('manage_users')
@require_POST
def send_password_reset_from_admin(request, user_id: int):
user_model = get_user_model()
target_user = get_object_or_404(user_model, id=user_id)
try:
_send_user_access_email(request, target_user, invitation=False)
except ValueError as exc:
messages.error(request, str(exc))
return redirect('user_management_page')
_audit(
request,
'user_password_reset_sent',
target_type='user',
target_id=target_user.id,
target_label=_display_user_name(target_user),
details={'username': target_user.username, 'email': target_user.email},
)
messages.success(request, _('Passwort-Reset-Link wurde versendet: %(username)s') % {'username': target_user.username})
return redirect('user_management_page')
@_require_capability('manage_users')
@require_POST
def delete_user_from_admin(request, user_id: int):
user_model = get_user_model()
target_user = get_object_or_404(user_model, id=user_id)
if target_user == request.user:
messages.error(request, _('Der aktuell angemeldete Super Admin kann sich hier nicht selbst löschen.'))
return redirect('user_management_page')
if _would_remove_last_super_admin(target_user, deleting=True):
messages.error(request, _('Der letzte aktive Super Admin kann nicht gelöscht werden.'))
return redirect('user_management_page')
target_label = _display_user_name(target_user)
username = target_user.username
target_user.delete()
_audit(
request,
'user_deleted',
target_type='user',
target_label=target_label,
details={'username': username},
)
messages.success(request, _('Benutzer wurde gelöscht: %(username)s') % {'username': username})
return redirect('user_management_page')
@_require_capability('view_docs')
def handbook_page(request):
return render(request, 'workflows/handbook.html')
@_require_capability('view_docs')
def project_wiki_page(request):
return render(request, 'workflows/project_wiki.html')
@_require_capability('view_docs')
def developer_handbook_page(request):
return render(request, 'workflows/developer_handbook.html')
@_require_capability('view_docs')
def release_checklist_page(request):
return render(request, 'workflows/release_checklist.html')
@_require_capability('view_audit_log')
def audit_log_page(request):
action = (request.GET.get('action') or '').strip()
user_query = (request.GET.get('user') or '').strip()
date_from = (request.GET.get('date_from') or '').strip()
date_to = (request.GET.get('date_to') or '').strip()
rows_qs = AdminAuditLog.objects.select_related('actor').all()
if action:
rows_qs = rows_qs.filter(action=action)
if user_query:
rows_qs = rows_qs.filter(
Q(actor_display__icontains=user_query)
| Q(actor__username__icontains=user_query)
| Q(actor__email__icontains=user_query)
)
if date_from:
rows_qs = rows_qs.filter(created_at__date__gte=date_from)
if date_to:
rows_qs = rows_qs.filter(created_at__date__lte=date_to)
rows = list(rows_qs[:300])
action_choices = (
AdminAuditLog.objects.order_by('action').values_list('action', flat=True).distinct()
)
return render(
request,
'workflows/audit_log.html',
{
'rows': rows,
'action_choices': action_choices,
'selected_action': action,
'user_query': user_query,
'date_from': date_from,
'date_to': date_to,
},
)
@_require_capability('manage_backups')
def backup_recovery_page(request):
return render(
request,
'workflows/backup_recovery.html',
{
'rows': list_backup_bundles(),
},
)
@_require_capability('manage_backups')
@require_POST
def create_backup_from_admin(request):
try:
result = create_backup_bundle()
_audit(
request,
'backup_created',
target_type='backup_bundle',
target_label=result['name'],
details={'path': result['path']},
)
messages.success(request, _('Backup wurde erstellt: %(name)s') % {'name': result['name']})
except Exception as exc:
messages.error(request, _('Backup konnte nicht erstellt werden: %(error)s') % {'error': exc})
return redirect('backup_recovery_page')
@_require_capability('manage_backups')
@require_POST
def verify_backup_from_admin(request, backup_name: str):
try:
result = verify_backup_bundle(backup_name)
_audit(
request,
'backup_verified',
target_type='backup_bundle',
target_label=backup_name,
details={'summary': result['summary']},
)
messages.success(request, _('Backup wurde verifiziert: %(name)s') % {'name': result['name']})
except Exception as exc:
messages.error(request, _('Backup-Verifikation fehlgeschlagen: %(error)s') % {'error': exc})
return redirect('backup_recovery_page')
@_require_capability('manage_backups')
@require_POST
def delete_backup_from_admin(request, backup_name: str):
try:
result = delete_backup_bundle(backup_name)
_audit(
request,
'backup_deleted',
target_type='backup_bundle',
target_label=backup_name,
details={},
)
messages.success(request, _('Backup wurde gelöscht: %(name)s') % {'name': result['name']})
except Exception as exc:
messages.error(request, _('Backup konnte nicht gelöscht werden: %(error)s') % {'error': exc})
return redirect('backup_recovery_page')
@_require_capability('access_requests_dashboard')
def request_timeline_page(request, kind: str, request_id: int):
if kind == 'onboarding':
obj = get_object_or_404(OnboardingRequest, id=request_id)
elif kind == 'offboarding':
obj = get_object_or_404(OffboardingRequest, id=request_id)
else:
messages.error(request, f'Unbekannter Typ: {kind}')
return redirect('requests_dashboard')
request_label = _request_target_label(obj, kind)
audit_rows = list(
AdminAuditLog.objects.select_related('actor')
.filter(target_type__in=[kind, 'request'])
.filter(Q(target_id=request_id) | Q(target_label__icontains=(obj.full_name or '').strip()))
.order_by('-created_at', '-id')[:200]
)
timeline_rows = [
{
'created_at': obj.created_at,
'kind': 'system',
'title': _('Anfrage erstellt'),
'summary': request_label,
'meta': _('Status: %(status)s') % {'status': obj.get_processing_status_display()},
}
]
contract_start = getattr(obj, 'contract_start', None)
if contract_start:
timeline_rows.append(
{
'created_at': timezone.make_aware(timezone.datetime.combine(contract_start, timezone.datetime.min.time())),
'kind': 'milestone',
'title': _('Vertragsbeginn'),
'summary': str(contract_start),
'meta': _('Geplanter Start'),
}
)
handover_date = getattr(obj, 'handover_date', None)
if handover_date:
timeline_rows.append(
{
'created_at': timezone.make_aware(timezone.datetime.combine(handover_date, timezone.datetime.min.time())),
'kind': 'milestone',
'title': _('Geräteübergabe / Hardware-Abholung'),
'summary': str(handover_date),
'meta': _('Geplanter Hardware-Termin'),
}
)
if getattr(obj, 'generated_pdf_path', ''):
timeline_rows.append(
{
'created_at': obj.created_at,
'kind': 'document',
'title': _('PDF verfügbar'),
'summary': Path(obj.generated_pdf_path).name,
'meta': '',
'url': f"/media/pdfs/{Path(obj.generated_pdf_path).name}",
}
)
for row in audit_rows:
timeline_rows.append(
{
'created_at': row.created_at,
'kind': 'audit',
'title': _audit_action_label(row.action),
'summary': row.target_label or row.target_type or '-',
'meta': row.actor_display or '-',
'details': row.details,
}
)
if kind == 'onboarding':
intro_session = OnboardingIntroductionSession.objects.filter(onboarding_request=obj).first()
if intro_session:
timeline_rows.append(
{
'created_at': intro_session.updated_at,
'kind': 'session',
'title': _('Einweisungssitzung'),
'summary': intro_session.get_status_display(),
'meta': intro_session.completed_by_name or '-',
'url': (f"/media/pdfs/{Path(intro_session.exported_pdf_path).name}" if intro_session.exported_pdf_path else ''),
}
)
welcome_email = ScheduledWelcomeEmail.objects.filter(onboarding_request=obj).first()
if welcome_email:
timeline_rows.append(
{
'created_at': welcome_email.updated_at,
'kind': 'email',
'title': _('Welcome E-Mail'),
'summary': welcome_email.get_status_display(),
'meta': welcome_email.recipient_email,
}
)
timeline_rows.sort(key=lambda item: item['created_at'])
return render(
request,
'workflows/request_timeline.html',
{
'request_kind': kind,
'request_obj': obj,
'request_label': request_label,
'timeline_rows': timeline_rows,
'contract_start': getattr(obj, 'contract_start', None),
'handover_date': getattr(obj, 'handover_date', None),
},
)
@login_required
def requests_dashboard(request):
if not user_has_capability(request.user, 'access_requests_dashboard'):
messages.error(request, _('Sie haben keine Berechtigung für diese Aktion.'))
return redirect('home')
if request.method == 'POST':
if not user_has_capability(request.user, 'delete_requests'):
messages.error(request, _('Sie haben keine Berechtigung für diese Aktion.'))
return redirect('requests_dashboard')
selected = request.POST.getlist('selected_requests')
single_delete = (request.POST.get('single_delete') or '').strip()
if single_delete:
selected = [single_delete]
if not selected:
messages.warning(request, _('Keine Einträge ausgewählt.'))
return redirect('requests_dashboard')
deleted_count = 0
invalid_count = 0
deleted_labels = []
for token in selected:
try:
kind, raw_id = token.split(':', 1)
request_id = int(raw_id)
except (ValueError, TypeError):
invalid_count += 1
continue
model = None
if kind == 'onboarding':
model = OnboardingRequest
elif kind == 'offboarding':
model = OffboardingRequest
else:
invalid_count += 1
continue
obj = model.objects.filter(id=request_id).first()
if not obj:
continue
deleted_labels.append(_request_target_label(obj, kind))
obj.delete()
deleted_count += 1
if deleted_count:
_audit(
request,
'requests_deleted',
target_type='request',
target_label='Dashboard bulk/single delete',
details={
'deleted_count': deleted_count,
'invalid_count': invalid_count,
'selected': selected,
'request_labels': deleted_labels,
},
)
messages.success(request, _('%(count)s Eintrag/Einträge gelöscht.') % {'count': deleted_count})
if invalid_count:
messages.warning(request, _('%(count)s Auswahl(en) konnten nicht verarbeitet werden.') % {'count': invalid_count})
if not deleted_count and not invalid_count:
messages.info(request, _('Keine passenden Einträge gefunden.'))
return redirect('requests_dashboard')
search_query = request.GET.get('q', '').strip()
type_filter = (request.GET.get('type') or '').strip().lower()
status_filter = (request.GET.get('status') or '').strip().lower()
department_filter = (request.GET.get('department') or '').strip()
date_from = (request.GET.get('date_from') or '').strip()
date_to = (request.GET.get('date_to') or '').strip()
onboarding_qs = OnboardingRequest.objects.order_by('-created_at')
offboarding_qs = OffboardingRequest.objects.order_by('-created_at')
all_onboarding = OnboardingRequest.objects.all()
all_offboarding = OffboardingRequest.objects.all()
if search_query:
onboarding_qs = onboarding_qs.filter(Q(full_name__icontains=search_query) | Q(work_email__icontains=search_query))
offboarding_qs = offboarding_qs.filter(Q(full_name__icontains=search_query) | Q(work_email__icontains=search_query))
if status_filter in {'submitted', 'processing', 'completed', 'failed'}:
onboarding_qs = onboarding_qs.filter(processing_status=status_filter)
offboarding_qs = offboarding_qs.filter(processing_status=status_filter)
if department_filter:
onboarding_qs = onboarding_qs.filter(department=department_filter)
offboarding_qs = offboarding_qs.filter(department=department_filter)
if date_from:
onboarding_qs = onboarding_qs.filter(created_at__date__gte=date_from)
offboarding_qs = offboarding_qs.filter(created_at__date__gte=date_from)
if date_to:
onboarding_qs = onboarding_qs.filter(created_at__date__lte=date_to)
offboarding_qs = offboarding_qs.filter(created_at__date__lte=date_to)
if type_filter == 'onboarding':
offboarding_qs = offboarding_qs.none()
elif type_filter == 'offboarding':
onboarding_qs = onboarding_qs.none()
onboarding_items = onboarding_qs[:50]
offboarding_items = offboarding_qs[:50]
language_code = (
request.COOKIES.get(settings.LANGUAGE_COOKIE_NAME)
or getattr(request, 'LANGUAGE_CODE', '')
or get_language()
or 'de'
).split('-')[0].lower()
rows = []
for obj in onboarding_items:
intro_session = OnboardingIntroductionSession.objects.filter(onboarding_request=obj).first()
if intro_session and intro_session.exported_pdf_path:
intro_session.exported_pdf_url = f"/media/pdfs/{Path(intro_session.exported_pdf_path).name}"
rows.append(
{
'id': obj.id,
'kind': 'Onboarding',
'kind_slug': 'onboarding',
'name': obj.full_name,
'work_email': obj.work_email,
'created_at': obj.created_at,
'pdf_url': f"/media/pdfs/{Path(obj.generated_pdf_path).name}" if obj.generated_pdf_path else None,
'intro_pdf_url': f"/media/pdfs/{Path(obj.intro_pdf_path).name}" if obj.intro_pdf_path else None,
'intro_session': intro_session,
'status': _request_status_label(obj.processing_status, language_code),
'status_key': obj.processing_status,
'last_error': obj.last_error,
}
)
for obj in offboarding_items:
rows.append(
{
'id': obj.id,
'kind': 'Offboarding',
'kind_slug': 'offboarding',
'name': obj.full_name,
'work_email': obj.work_email,
'created_at': obj.created_at,
'pdf_url': f"/media/pdfs/{Path(obj.generated_pdf_path).name}" if obj.generated_pdf_path else None,
'intro_pdf_url': None,
'intro_session': None,
'status': _request_status_label(obj.processing_status, language_code),
'status_key': obj.processing_status,
'last_error': obj.last_error,
}
)
rows.sort(key=lambda x: x['created_at'], reverse=True)
today = timezone.localdate()
start_date = today - timedelta(days=13)
onboarding_daily = {}
offboarding_daily = {}
for i in range(14):
day = start_date + timedelta(days=i)
onboarding_daily[day] = 0
offboarding_daily[day] = 0
for dt in onboarding_qs.filter(created_at__date__gte=start_date).values_list('created_at', flat=True):
onboarding_daily[timezone.localtime(dt).date()] += 1
for dt in offboarding_qs.filter(created_at__date__gte=start_date).values_list('created_at', flat=True):
offboarding_daily[timezone.localtime(dt).date()] += 1
chart_points = []
max_total = 1
for i in range(14):
day = start_date + timedelta(days=i)
on_count = onboarding_daily[day]
off_count = offboarding_daily[day]
total = on_count + off_count
max_total = max(max_total, total)
chart_points.append(
{
'label': day.strftime('%d.%m'),
'onboarding': on_count,
'offboarding': off_count,
'total': total,
}
)
for point in chart_points:
point['height'] = max(8, int((point['total'] / max_total) * 84))
onboarding_total = onboarding_qs.count()
offboarding_total = offboarding_qs.count()
departments = sorted(
{
value.strip()
for value in list(all_onboarding.exclude(department='').values_list('department', flat=True))
+ list(all_offboarding.exclude(department='').values_list('department', flat=True))
if value and value.strip()
},
key=str.lower,
)
status_choices = [
{'value': 'submitted', 'label': _request_status_label('submitted', language_code)},
{'value': 'processing', 'label': _request_status_label('processing', language_code)},
{'value': 'completed', 'label': _request_status_label('completed', language_code)},
{'value': 'failed', 'label': _request_status_label('failed', language_code)},
]
has_filters = any([search_query, type_filter, status_filter, department_filter, date_from, date_to])
column_count = 4
if user_has_capability(request.user, 'delete_requests'):
column_count += 1
if user_has_capability(request.user, 'run_intro_session') or user_has_capability(request.user, 'generate_intro_pdfs'):
column_count += 1
if user_has_capability(request.user, 'access_requests_dashboard'):
column_count += 1
return render(
request,
'workflows/requests_dashboard.html',
{
'rows': rows[:60],
'search_query': search_query,
'selected_type': type_filter,
'selected_status': status_filter,
'selected_department': department_filter,
'date_from': date_from,
'date_to': date_to,
'departments': departments,
'status_choices': status_choices,
'has_filters': has_filters,
'column_count': column_count,
'onboarding_total': onboarding_total,
'offboarding_total': offboarding_total,
'combined_total': onboarding_total + offboarding_total,
'chart_points': chart_points,
},
)
@login_required
@ensure_csrf_cookie
def onboarding_create(request):
config = WorkflowConfig.objects.order_by('id').first()
legal_text = (
config.legal_text
if config and config.legal_text
else 'Eine Ausrüstungsvereinbarung erlaubt es einem Mitarbeitenden, die Ausrüstung des Unternehmens im Außendienst oder zu Hause zu nutzen und mitzunehmen.'
)
if request.method == 'POST':
form = OnboardingRequestForm(request.POST, request.FILES, requester_email=request.user.email)
if form.is_valid():
obj = form.save()
obj.onboarded_by_name = _display_user_name(request.user)
obj.preferred_language = ((getattr(request, 'LANGUAGE_CODE', '') or get_language() or 'de').split('-')[0])
obj.save(update_fields=['onboarded_by_name', 'preferred_language'])
process_onboarding_request.delay(obj.id)
return redirect(f"/onboarding/new/?saved=1&id={obj.id}")
else:
form = OnboardingRequestForm(requester_email=request.user.email)
onboarding_blocks = _build_onboarding_layout(form)
field_pages = getattr(form, '_field_page_keys', {})
onboarding_sections = _build_onboarding_sections(onboarding_blocks, field_pages)
return render(
request,
'workflows/onboarding_form.html',
{
'form': form,
'onboarding_blocks': onboarding_blocks,
'onboarding_sections': onboarding_sections,
'onboarding_inline_checks': ONBOARDING_INLINE_CHECKS,
'onboarding_checkbox_lists': ONBOARDING_CHECKBOX_LISTS,
'legal_text': legal_text,
'saved': request.GET.get('saved') == '1',
'saved_request_id': request.GET.get('id', ''),
},
)
@login_required
def onboarding_success(request, request_id: int):
obj = get_object_or_404(OnboardingRequest, id=request_id)
pdf_url = None
if obj.generated_pdf_path:
pdf_url = f"/media/pdfs/{Path(obj.generated_pdf_path).name}"
return render(request, 'workflows/onboarding_success.html', {'obj': obj, 'pdf_url': pdf_url})
@_require_capability('generate_intro_pdfs')
@require_POST
def generate_onboarding_intro_pdf(request, request_id: int):
obj = get_object_or_404(OnboardingRequest, id=request_id)
pdf_path = _generate_onboarding_intro_pdf(obj, language_code=get_language())
obj.intro_pdf_path = str(pdf_path)
obj.save(update_fields=['intro_pdf_path'])
_audit(request, 'intro_pdf_generated', target_type='onboarding', target_id=obj.id, target_label=obj.full_name)
messages.success(request, _('Einweisungs- und Übergabeprotokoll wurde erzeugt.'))
return redirect('requests_dashboard')
@_require_capability('generate_intro_pdfs')
@require_POST
def generate_onboarding_intro_session_pdf(request, request_id: int):
onboarding = get_object_or_404(OnboardingRequest, id=request_id)
session, _created = OnboardingIntroductionSession.objects.get_or_create(onboarding_request=onboarding)
pdf_path = _generate_onboarding_intro_session_pdf(
session,
admin_signature_name=_display_user_name(request.user),
language_code=get_language(),
)
session.exported_pdf_path = str(pdf_path)
session.save(update_fields=['exported_pdf_path'])
_audit(request, 'intro_live_pdf_generated', target_type='onboarding', target_id=onboarding.id, target_label=onboarding.full_name)
messages.success(request, _('Einweisungsprotokoll aus Live-Status wurde erzeugt.'))
return redirect('onboarding_intro_session_page', request_id=request_id)
@_require_capability('run_intro_session')
def onboarding_intro_session_page(request, request_id: int):
onboarding = get_object_or_404(OnboardingRequest, id=request_id)
session, _created = OnboardingIntroductionSession.objects.get_or_create(onboarding_request=onboarding)
sections = build_intro_sections_for_request(onboarding, language_code=get_language())
if request.method == 'POST':
checked_ids = set(request.POST.getlist('checked_items'))
checklist_state = {}
for section in sections:
for item in section['items']:
checklist_state[item['id']] = item['id'] in checked_ids
action = (request.POST.get('session_action') or 'save').strip()
session.checklist_state = checklist_state
session.notes = (request.POST.get('notes') or '').strip()
if action == 'reset':
session.checklist_state = {}
session.notes = ''
session.status = 'draft'
session.completed_at = None
session.completed_by_name = ''
session.exported_pdf_path = ''
session.save(update_fields=['checklist_state', 'notes', 'status', 'completed_at', 'completed_by_name', 'exported_pdf_path'])
_audit(request, 'intro_session_reset', target_type='onboarding', target_id=onboarding.id, target_label=onboarding.full_name)
messages.success(request, _('Einweisung wurde zurückgesetzt.'))
return redirect('onboarding_intro_session_page', request_id=request_id)
if action == 'complete':
session.status = 'completed'
session.completed_at = timezone.now()
session.completed_by_name = _display_user_name(request.user)
_audit(
request,
'intro_session_completed',
target_type='onboarding',
target_id=onboarding.id,
target_label=onboarding.full_name,
details={'checked_count': len([value for value in checklist_state.values() if value])},
)
messages.success(request, _('Einweisung wurde als abgeschlossen gespeichert.'))
else:
session.status = 'draft'
session.completed_at = None
session.completed_by_name = ''
_audit(
request,
'intro_session_saved',
target_type='onboarding',
target_id=onboarding.id,
target_label=onboarding.full_name,
details={'checked_count': len([value for value in checklist_state.values() if value])},
)
messages.success(request, _('Einweisung wurde als Entwurf gespeichert.'))
session.save()
return redirect('onboarding_intro_session_page', request_id=request_id)
checked_map = session.checklist_state or {}
checked_count = 0
total_count = 0
for section in sections:
for item in section['items']:
item['checked'] = bool(checked_map.get(item['id']))
total_count += 1
if item['checked']:
checked_count += 1
salutation = (onboarding.get_gender_display() or '').strip()
display_name = f"{salutation} {onboarding.full_name}".strip() if salutation else onboarding.full_name
progress_percent = int((checked_count / total_count) * 100) if total_count else 0
return render(
request,
'workflows/onboarding_intro_session.html',
{
'onboarding': onboarding,
'session': session,
'display_name': display_name,
'sections': sections,
'checked_count': checked_count,
'total_count': total_count,
'progress_percent': progress_percent,
'session_pdf_url': f"/media/pdfs/{Path(session.exported_pdf_path).name}" if session.exported_pdf_path else None,
},
)
@login_required
@ensure_csrf_cookie
def offboarding_create(request):
profile_id = request.GET.get('profile')
search_query = request.GET.get('q', '').strip()
selected_profile = None
if profile_id:
selected_profile = EmployeeProfile.objects.filter(id=profile_id).first()
search_results = []
if search_query:
search_results = list(
EmployeeProfile.objects.filter(full_name__icontains=search_query)[:10]
) + list(
EmployeeProfile.objects.filter(work_email__icontains=search_query)[:10]
)
# preserve order while removing duplicates
seen = set()
unique = []
for r in search_results:
if r.id not in seen:
unique.append(r)
seen.add(r.id)
search_results = unique[:10]
if request.method == 'POST':
form = OffboardingRequestForm(request.POST, prefill_profile=selected_profile)
if form.is_valid():
obj = form.save(commit=False)
if selected_profile:
obj.employee_profile = selected_profile
requester_email = (request.user.email or '').strip().lower()
if requester_email and requester_email.endswith('@tub.co'):
obj.requested_by_email = requester_email
else:
obj.requested_by_email = settings.DEFAULT_FROM_EMAIL
obj.requested_by_name = _display_user_name(request.user)
obj.preferred_language = ((getattr(request, 'LANGUAGE_CODE', '') or get_language() or 'de').split('-')[0])
obj.save()
process_offboarding_request.delay(obj.id)
return redirect(f"/offboarding/new/?saved=1&id={obj.id}")
else:
form = OffboardingRequestForm(prefill_profile=selected_profile, initial={'search_query': search_query})
return render(
request,
'workflows/offboarding_form.html',
{
'form': form,
'search_results': search_results,
'selected_profile': selected_profile,
'search_query': search_query,
'saved': request.GET.get('saved') == '1',
'saved_request_id': request.GET.get('id', ''),
},
)
@login_required
def offboarding_success(request, request_id: int):
obj = get_object_or_404(OffboardingRequest, id=request_id)
pdf_url = None
if obj.generated_pdf_path:
pdf_url = f"/media/pdfs/{Path(obj.generated_pdf_path).name}"
return render(request, 'workflows/offboarding_success.html', {'obj': obj, 'pdf_url': pdf_url})
@_require_capability('manage_builders')
def form_builder_page(request):
language_code = get_language()
form_type = request.GET.get('form_type', 'onboarding')
if form_type not in DEFAULT_FIELD_ORDER:
form_type = 'onboarding'
option_category = request.GET.get('option_category', 'department')
option_categories = [c[0] for c in FormOption.CATEGORY_CHOICES]
if option_category not in option_categories:
option_category = option_categories[0]
if request.method == 'POST':
delete_option_id = request.POST.get('delete_option_id', '').strip()
if delete_option_id:
option = FormOption.objects.filter(id=delete_option_id).first()
if not option:
messages.error(request, 'Option nicht gefunden.')
else:
option_category = option.category
deleted_label = option.label
deleted_id = option.id
option.delete()
_audit(request, 'form_option_deleted', target_type='form_option', target_id=deleted_id, target_label=deleted_label)
messages.success(request, 'Option wurde gelöscht.')
return redirect(f"/admin-tools/form-builder/?form_type={form_type}&option_category={option_category}")
action = request.POST.get('builder_action', '')
if action == 'add_option':
category = request.POST.get('category', '').strip()
label = request.POST.get('label', '').strip()
label_en = request.POST.get('label_en', '').strip()
value = request.POST.get('value', '').strip()
if category not in option_categories:
messages.error(request, 'Ungültige Kategorie.')
elif not label:
messages.error(request, 'Bitte einen Namen für die Option angeben.')
else:
next_sort = (
FormOption.objects.filter(category=category).order_by('-sort_order').values_list('sort_order', flat=True).first()
)
FormOption.objects.create(
# Global form option catalog entry
category=category,
label=label,
label_en=label_en,
value=value or label,
sort_order=(next_sort + 1) if next_sort is not None else 0,
is_active=True,
)
_audit(
request,
'form_option_added',
target_type='form_option',
target_label=label,
details={'category': category, 'label_en': label_en, 'value': value or label},
)
messages.success(request, 'Option wurde hinzugefügt.')
option_category = category
elif action == 'save_options':
option_ids = request.POST.getlist('option_ids')
for pos, raw_id in enumerate(option_ids):
option = FormOption.objects.filter(id=raw_id).first()
if not option:
continue
next_label = request.POST.get(f'label_{option.id}', '').strip() or option.label
option.label = next_label
option.label_en = request.POST.get(f'label_en_{option.id}', '').strip()
option.value = request.POST.get(f'value_{option.id}', '').strip() or next_label
option.is_active = request.POST.get(f'active_{option.id}') == 'on'
option.sort_order = pos
try:
option.save(update_fields=['label', 'label_en', 'value', 'is_active', 'sort_order'])
except IntegrityError:
messages.error(request, f'Doppelte Bezeichnung in Kategorie: {next_label}')
return redirect(f"/admin-tools/form-builder/?form_type={form_type}&option_category={option.category}")
option_category = option.category
_audit(request, 'form_options_saved', target_type='form_option', target_label=option_category, details={'count': len(option_ids)})
messages.success(request, 'Optionen wurden gespeichert.')
elif action == 'save_field_texts':
field_ids = request.POST.getlist('field_ids')
for raw_id in field_ids:
cfg = FormFieldConfig.objects.filter(id=raw_id, form_type=form_type).first()
if not cfg:
continue
cfg.label_override = (request.POST.get(f'label_override_{cfg.id}') or '').strip()
cfg.label_override_en = (request.POST.get(f'label_override_en_{cfg.id}') or '').strip()
cfg.help_text_override = (request.POST.get(f'help_text_override_{cfg.id}') or '').strip()
cfg.help_text_override_en = (request.POST.get(f'help_text_override_en_{cfg.id}') or '').strip()
cfg.save(update_fields=['label_override', 'label_override_en', 'help_text_override', 'help_text_override_en'])
_audit(request, 'form_field_texts_saved', target_type='form_config', target_label=form_type, details={'count': len(field_ids)})
messages.success(request, 'Feldtexte wurden gespeichert.')
return redirect(f"/admin-tools/form-builder/?form_type={form_type}&option_category={option_category}")
default_names = list(DEFAULT_FIELD_ORDER.get(form_type, []))
existing_names = list(
OnboardingRequestForm.base_fields.keys()
if form_type == 'onboarding'
else OffboardingRequestForm.base_fields.keys()
)
for name in existing_names:
if name not in default_names:
default_names.append(name)
ensure_form_field_configs(form_type, default_names)
configs = list(
FormFieldConfig.objects.filter(form_type=form_type).order_by('sort_order', 'field_name')
)
labels = _form_field_labels(form_type)
locked = LOCKED_FIELD_RULES.get(form_type, set())
if form_type == 'onboarding':
columns = [
{
'key': key,
'title': ONBOARDING_PAGE_LABELS.get(key, key),
'items': [],
}
for key in ONBOARDING_PAGE_ORDER
]
column_by_key = {c['key']: c for c in columns}
fallback = 'abschluss'
for cfg in configs:
page_key = cfg.page_key or ONBOARDING_DEFAULT_PAGE.get(cfg.field_name, fallback)
if page_key not in column_by_key:
page_key = fallback
column_by_key[page_key]['items'].append(
{
'field_name': cfg.field_name,
'label': cfg.translated_label_override(language_code) or labels.get(cfg.field_name, cfg.field_name),
'label_de': cfg.label_override or labels.get(cfg.field_name, cfg.field_name),
'label_en': cfg.label_override_en,
'is_visible': cfg.is_visible,
'is_required': cfg.is_required,
'locked': cfg.field_name in locked,
}
)
else:
columns = [
{
'key': 'all',
'title': 'Offboarding Felder',
'items': [
{
'field_name': cfg.field_name,
'label': cfg.translated_label_override(language_code) or labels.get(cfg.field_name, cfg.field_name),
'label_de': cfg.label_override or labels.get(cfg.field_name, cfg.field_name),
'label_en': cfg.label_override_en,
'is_visible': cfg.is_visible,
'is_required': cfg.is_required,
'locked': cfg.field_name in locked,
}
for cfg in configs
],
}
]
return render(
request,
'workflows/form_builder.html',
{
'form_type': form_type,
'columns': columns,
'form_types': [('onboarding', 'Onboarding'), ('offboarding', 'Offboarding')],
'option_categories': _translate_choice_list(FormOption.CATEGORY_CHOICES),
'selected_option_category': option_category,
'option_items': FormOption.objects.filter(category=option_category).order_by('sort_order', 'label'),
'field_text_items': configs,
},
)
@_require_capability('manage_builders')
def intro_builder_page(request):
if request.method == 'POST':
delete_id = (request.POST.get('delete_item_id') or '').strip()
if delete_id:
item = IntroChecklistItem.objects.filter(id=delete_id).first()
if item:
deleted_label = item.label
deleted_id_int = item.id
item.delete()
_audit(request, 'intro_checklist_item_deleted', target_type='intro_checklist_item', target_id=deleted_id_int, target_label=deleted_label)
messages.success(request, 'Checklistenpunkt wurde gelöscht.')
else:
messages.error(request, 'Checklistenpunkt nicht gefunden.')
return redirect('intro_builder_page')
action = (request.POST.get('builder_action') or '').strip()
if action == 'add_item':
section = (request.POST.get('section') or '').strip()
label = (request.POST.get('label') or '').strip()
label_en = (request.POST.get('label_en') or '').strip()
if section not in {k for k, _ in IntroChecklistItem.SECTION_CHOICES}:
messages.error(request, 'Ungültiger Abschnitt.')
return redirect('intro_builder_page')
if not label:
messages.error(request, 'Bitte eine Bezeichnung für den Checklistenpunkt angeben.')
return redirect('intro_builder_page')
next_sort = (
IntroChecklistItem.objects.filter(section=section).order_by('-sort_order').values_list('sort_order', flat=True).first()
)
IntroChecklistItem.objects.create(
section=section,
label=label,
label_en=label_en,
sort_order=(next_sort + 1) if next_sort is not None else 0,
is_active=True,
condition_operator='always',
)
_audit(request, 'intro_checklist_item_added', target_type='intro_checklist_item', target_label=label, details={'section': section, 'label_en': label_en})
messages.success(request, 'Checklistenpunkt wurde hinzugefügt.')
return redirect('intro_builder_page')
if action == 'save_items':
item_ids = request.POST.getlist('item_ids')
valid_sections = {k for k, _ in IntroChecklistItem.SECTION_CHOICES}
valid_ops = {k for k, _ in IntroChecklistItem.OPERATOR_CHOICES}
for pos, raw_id in enumerate(item_ids):
item = IntroChecklistItem.objects.filter(id=raw_id).first()
if not item:
continue
section = (request.POST.get(f'section_{item.id}') or item.section).strip()
if section not in valid_sections:
section = item.section
operator = (request.POST.get(f'operator_{item.id}') or item.condition_operator).strip()
if operator not in valid_ops:
operator = 'always'
item.section = section
item.label = (request.POST.get(f'label_{item.id}') or item.label).strip() or item.label
item.label_en = (request.POST.get(f'label_en_{item.id}') or '').strip()
item.is_active = request.POST.get(f'active_{item.id}') == 'on'
item.condition_field = (request.POST.get(f'field_{item.id}') or '').strip()
item.condition_operator = operator
item.condition_value = (request.POST.get(f'value_{item.id}') or '').strip()
item.sort_order = pos
item.save(
update_fields=[
'section',
'label',
'label_en',
'is_active',
'condition_field',
'condition_operator',
'condition_value',
'sort_order',
]
)
_audit(request, 'intro_checklist_saved', target_type='intro_checklist_item', details={'count': len(item_ids)})
messages.success(request, 'Einweisungs-Checkliste wurde gespeichert.')
return redirect('intro_builder_page')
condition_field_choices = [
('', 'Keine Bedingung'),
('needed_devices', 'Benötigte Geräte und Gegenstände'),
('needed_software', 'Benötigte Software'),
('needed_accesses', 'Benötigte Zugänge'),
('needed_workspace_groups', 'Benötigte Gruppen im Workspace'),
('needed_resources', 'Benötigte Ressourcen'),
('additional_hardware', 'Zusätzliche Hardware'),
('additional_software', 'Zusätzliche Software'),
('additional_access_text', 'Weitere Zugänge (Freitext)'),
('group_mailboxes_required', 'Gruppenpostfächer erforderlich'),
('order_business_cards', 'Visitenkarten bestellt'),
('phone_number', 'Direktwahl vorhanden'),
('successor_name', 'Nachfolge vorhanden'),
('department', 'Abteilung'),
]
items = list(IntroChecklistItem.objects.all().order_by('section', 'sort_order', 'label'))
return render(
request,
'workflows/intro_builder.html',
{
'items': items,
'section_choices': _translate_choice_list(IntroChecklistItem.SECTION_CHOICES),
'operator_choices': _translate_choice_list(IntroChecklistItem.OPERATOR_CHOICES),
'condition_field_choices': condition_field_choices,
},
)
@_require_capability('manage_integrations')
def integrations_setup_page(request):
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
kind = (request.GET.get('kind') or 'nextcloud').strip().lower()
if kind not in {'nextcloud', 'mail', 'emails', 'rules', 'backup'}:
kind = 'nextcloud'
templates = list(NotificationTemplate.objects.all().order_by('key'))
system_email_config = (
SystemEmailConfig.objects.filter(is_active=True).order_by('-updated_at').first()
or SystemEmailConfig.objects.filter(name='Default SMTP').first()
)
return render(
request,
'workflows/integrations_setup.html',
{
'workflow_config': config,
'system_email_config': system_email_config,
'nextcloud_enabled': is_nextcloud_enabled(),
'email_test_mode': is_email_test_mode(),
'kind': kind,
'templates': templates,
'notification_rules': NotificationRule.objects.all().order_by('event_type', 'sort_order', 'id'),
'rule_event_choices': NotificationRule.EVENT_CHOICES,
'rule_operator_choices': NotificationRule.OPERATOR_CHOICES,
'template_choices': NotificationTemplate.TEMPLATE_CHOICES,
'remote_backup_target_choices': WorkflowConfig.REMOTE_BACKUP_TARGET_CHOICES,
},
)
@_require_capability('manage_welcome_emails')
def welcome_emails_page(request):
rows = ScheduledWelcomeEmail.objects.select_related('onboarding_request').order_by('-send_at', '-id')[:200]
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
welcome_template = NotificationTemplate.objects.filter(key='onboarding_welcome').first()
default_welcome = DEFAULT_NOTIFICATION_TEMPLATES.get('onboarding_welcome', {})
default_subject = (default_welcome.get('subject') or 'Willkommen bei TUB/CO, {{ FULL_NAME }}').strip()
default_body = (default_welcome.get('body') or 'Hallo {{ FULL_NAME }}, willkommen bei TUB/CO.').strip()
default_subject_en = (default_welcome.get('subject_en') or 'Welcome to TUB/CO, {{ FULL_NAME }}').strip()
default_body_en = (default_welcome.get('body_en') or 'Hello {{ FULL_NAME }}, welcome to TUB/CO.').strip()
subject_value = (welcome_template.subject_template if welcome_template else '').strip() or default_subject
body_value = (welcome_template.body_template if welcome_template else '').strip() or default_body
subject_value_en = (welcome_template.subject_template_en if welcome_template else '').strip() or default_subject_en
body_value_en = (welcome_template.body_template_en if welcome_template else '').strip() or default_body_en
return render(
request,
'workflows/welcome_emails.html',
{
'rows': rows,
'workflow_config': config,
'welcome_template': welcome_template,
'welcome_subject_value': subject_value,
'welcome_body_value': body_value,
'welcome_subject_value_en': subject_value_en,
'welcome_body_value_en': body_value_en,
'welcome_keywords': ['{{ FULL_NAME }}', '{{ VORNAME }}', '{{ NACHNAME }}', '{{ DEPARTMENT }}', '{{ CONTRACT_START }}', '{{ EMAIL }}', '{{ REQUESTED_BY }}'],
},
)
@_require_capability('manage_welcome_emails')
@require_POST
def trigger_welcome_email_now(request, schedule_id: int):
scheduled = ScheduledWelcomeEmail.objects.filter(id=schedule_id).first()
if not scheduled:
messages.error(request, f'Geplanter Welcome-Eintrag #{schedule_id} nicht gefunden.')
return redirect('welcome_emails_page')
if scheduled.status == 'cancelled':
messages.error(request, f'Welcome E-Mail #{schedule_id} ist abgebrochen und kann nicht gesendet werden.')
return redirect('welcome_emails_page')
async_result = send_scheduled_welcome_email.delay(scheduled.id, True)
scheduled.celery_task_id = async_result.id or scheduled.celery_task_id
scheduled.status = 'scheduled'
scheduled.last_error = ''
scheduled.save(update_fields=['celery_task_id', 'status', 'last_error', 'updated_at'])
_audit(request, 'welcome_email_triggered_now', target_type='welcome_email', target_id=scheduled.id, target_label=scheduled.recipient_email)
messages.success(request, f'Welcome E-Mail #{schedule_id} wurde sofort angestoßen.')
return redirect('welcome_emails_page')
@_require_capability('manage_welcome_emails')
@require_POST
def save_welcome_email_settings(request):
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
try:
delay_days = int(request.POST.get('welcome_email_delay_days', config.welcome_email_delay_days or 5))
except ValueError:
messages.error(request, 'Ungültige Zahl bei der Welcome-Verzögerung.')
return redirect('welcome_emails_page')
config.welcome_email_delay_days = max(0, delay_days)
config.welcome_sender_email = request.POST.get('welcome_sender_email', '').strip()
config.welcome_include_pdf = request.POST.get('welcome_include_pdf') == 'on'
config.save(update_fields=['welcome_email_delay_days', 'welcome_sender_email', 'welcome_include_pdf'])
subject = request.POST.get('welcome_subject')
body = request.POST.get('welcome_body')
subject_en = request.POST.get('welcome_subject_en')
body_en = request.POST.get('welcome_body_en')
if subject is not None or body is not None or subject_en is not None or body_en is not None:
default_welcome = DEFAULT_NOTIFICATION_TEMPLATES.get('onboarding_welcome', {})
default_subject = (default_welcome.get('subject') or 'Willkommen bei TUB/CO, {{ FULL_NAME }}').strip()
default_body = (default_welcome.get('body') or 'Hallo {{ FULL_NAME }}, willkommen bei TUB/CO.').strip()
default_subject_en = (default_welcome.get('subject_en') or 'Welcome to TUB/CO, {{ FULL_NAME }}').strip()
default_body_en = (default_welcome.get('body_en') or 'Hello {{ FULL_NAME }}, welcome to TUB/CO.').strip()
subject_clean = (subject or '').strip() or default_subject
body_clean = (body or '').strip() or default_body
subject_clean_en = (subject_en or '').strip() or default_subject_en
body_clean_en = (body_en or '').strip() or default_body_en
template, _ = NotificationTemplate.objects.get_or_create(
key='onboarding_welcome',
defaults={
'subject_template': subject_clean,
'body_template': body_clean,
'subject_template_en': subject_clean_en,
'body_template_en': body_clean_en,
'is_active': True,
},
)
changes = []
if template.subject_template != subject_clean:
template.subject_template = subject_clean
changes.append('subject_template')
if template.body_template != body_clean:
template.body_template = body_clean
changes.append('body_template')
if template.subject_template_en != subject_clean_en:
template.subject_template_en = subject_clean_en
changes.append('subject_template_en')
if template.body_template_en != body_clean_en:
template.body_template_en = body_clean_en
changes.append('body_template_en')
if not template.is_active:
template.is_active = True
changes.append('is_active')
if changes:
template.save(update_fields=changes)
_audit(
request,
'welcome_email_settings_saved',
target_type='welcome_email_settings',
target_label='onboarding_welcome',
details={
'delay_days': config.welcome_email_delay_days,
'sender_email': config.welcome_sender_email,
'include_pdf': config.welcome_include_pdf,
},
)
messages.success(request, 'Welcome-E-Mail Einstellungen wurden gespeichert.')
return redirect('welcome_emails_page')
def _revoke_celery_task(task_id: str) -> None:
if not task_id:
return
try:
current_app.control.revoke(task_id, terminate=False)
except Exception:
return
def _parse_selected_schedule_ids(raw: str) -> list[int]:
if not raw:
return []
parsed: list[int] = []
seen: set[int] = set()
for token in raw.split(','):
token = token.strip()
if not token:
continue
try:
schedule_id = int(token)
except ValueError:
continue
if schedule_id in seen:
continue
seen.add(schedule_id)
parsed.append(schedule_id)
return parsed
@_require_capability('manage_welcome_emails')
@require_POST
def bulk_welcome_email_action(request):
action = (request.POST.get('bulk_action') or '').strip().lower()
selected_ids = _parse_selected_schedule_ids(request.POST.get('selected_ids', ''))
if action not in {'pause', 'send_now', 'delete'}:
messages.error(request, 'Ungültige Bulk-Aktion.')
return redirect('welcome_emails_page')
if not selected_ids:
messages.warning(request, 'Keine Welcome-Einträge ausgewählt.')
return redirect('welcome_emails_page')
rows = list(ScheduledWelcomeEmail.objects.filter(id__in=selected_ids).order_by('id'))
if not rows:
messages.warning(request, 'Keine passenden Welcome-Einträge gefunden.')
return redirect('welcome_emails_page')
success_count = 0
skipped_count = 0
for scheduled in rows:
if action == 'pause':
if scheduled.status in {'sent', 'cancelled'}:
skipped_count += 1
continue
_revoke_celery_task(scheduled.celery_task_id)
scheduled.status = 'paused'
scheduled.save(update_fields=['status', 'updated_at'])
success_count += 1
continue
if action == 'send_now':
if scheduled.status == 'cancelled':
skipped_count += 1
continue
async_result = send_scheduled_welcome_email.delay(scheduled.id, True)
scheduled.celery_task_id = async_result.id or scheduled.celery_task_id
scheduled.status = 'scheduled'
scheduled.last_error = ''
scheduled.save(update_fields=['celery_task_id', 'status', 'last_error', 'updated_at'])
success_count += 1
continue
if action == 'delete':
if scheduled.status == 'scheduled':
_revoke_celery_task(scheduled.celery_task_id)
scheduled.delete()
success_count += 1
action_label = {
'pause': 'pausiert',
'send_now': 'sofort angestoßen',
'delete': 'gelöscht',
}[action]
if success_count:
_audit(
request,
'welcome_email_bulk_action',
target_type='welcome_email',
target_label=action,
details={'selected_ids': selected_ids, 'success_count': success_count, 'skipped_count': skipped_count},
)
messages.success(request, f'{success_count} Welcome-Eintrag/Einträge {action_label}.')
if skipped_count:
messages.warning(request, f'{skipped_count} Eintrag/Einträge wurden übersprungen (Status nicht geeignet).')
return redirect('welcome_emails_page')
@_require_capability('manage_welcome_emails')
@require_POST
def pause_welcome_email(request, schedule_id: int):
scheduled = ScheduledWelcomeEmail.objects.filter(id=schedule_id).first()
if not scheduled:
messages.error(request, f'Geplanter Welcome-Eintrag #{schedule_id} nicht gefunden.')
return redirect('welcome_emails_page')
if scheduled.status in {'sent', 'cancelled'}:
messages.error(request, f'Welcome E-Mail #{schedule_id} kann nicht pausiert werden.')
return redirect('welcome_emails_page')
_revoke_celery_task(scheduled.celery_task_id)
scheduled.status = 'paused'
scheduled.save(update_fields=['status', 'updated_at'])
_audit(request, 'welcome_email_paused', target_type='welcome_email', target_id=scheduled.id, target_label=scheduled.recipient_email)
messages.success(request, f'Welcome E-Mail #{schedule_id} wurde pausiert.')
return redirect('welcome_emails_page')
@_require_capability('manage_welcome_emails')
@require_POST
def resume_welcome_email(request, schedule_id: int):
scheduled = ScheduledWelcomeEmail.objects.filter(id=schedule_id).first()
if not scheduled:
messages.error(request, f'Geplanter Welcome-Eintrag #{schedule_id} nicht gefunden.')
return redirect('welcome_emails_page')
if scheduled.status != 'paused':
messages.error(request, f'Welcome E-Mail #{schedule_id} ist nicht pausiert.')
return redirect('welcome_emails_page')
eta = scheduled.send_at if timezone.now() < scheduled.send_at else None
async_result = send_scheduled_welcome_email.apply_async(args=[scheduled.id], eta=eta)
scheduled.celery_task_id = async_result.id or scheduled.celery_task_id
scheduled.status = 'scheduled'
scheduled.last_error = ''
scheduled.save(update_fields=['celery_task_id', 'status', 'last_error', 'updated_at'])
_audit(request, 'welcome_email_resumed', target_type='welcome_email', target_id=scheduled.id, target_label=scheduled.recipient_email)
messages.success(request, f'Welcome E-Mail #{schedule_id} wurde fortgesetzt.')
return redirect('welcome_emails_page')
@_require_capability('manage_welcome_emails')
@require_POST
def cancel_welcome_email(request, schedule_id: int):
scheduled = ScheduledWelcomeEmail.objects.filter(id=schedule_id).first()
if not scheduled:
messages.error(request, f'Geplanter Welcome-Eintrag #{schedule_id} nicht gefunden.')
return redirect('welcome_emails_page')
if scheduled.status == 'sent':
messages.error(request, f'Welcome E-Mail #{schedule_id} wurde bereits gesendet.')
return redirect('welcome_emails_page')
_revoke_celery_task(scheduled.celery_task_id)
scheduled.status = 'cancelled'
scheduled.last_error = ''
scheduled.save(update_fields=['status', 'last_error', 'updated_at'])
_audit(request, 'welcome_email_cancelled', target_type='welcome_email', target_id=scheduled.id, target_label=scheduled.recipient_email)
messages.success(request, f'Welcome E-Mail #{schedule_id} wurde abgebrochen.')
return redirect('welcome_emails_page')
@_require_capability('manage_builders')
@require_POST
def form_builder_save_order(request):
try:
payload = json.loads(request.body.decode('utf-8'))
except (json.JSONDecodeError, UnicodeDecodeError):
return JsonResponse({'ok': False, 'error': 'Ungültige JSON-Daten.'}, status=400)
form_type = payload.get('form_type')
if form_type not in DEFAULT_FIELD_ORDER:
return JsonResponse({'ok': False, 'error': 'Ungültiger Formulartyp.'}, status=400)
columns = payload.get('columns')
if not isinstance(columns, dict):
return JsonResponse({'ok': False, 'error': 'Spalten-Daten fehlen.'}, status=400)
configs = list(FormFieldConfig.objects.filter(form_type=form_type).order_by('sort_order', 'field_name'))
allowed_names = {cfg.field_name for cfg in configs}
seen = set()
ordered_names = []
if form_type == 'onboarding':
allowed_columns = ONBOARDING_PAGE_ORDER
else:
allowed_columns = ['all']
name_to_cfg = {cfg.field_name: cfg for cfg in configs}
sort_order = 0
for column_key in allowed_columns:
names = columns.get(column_key, [])
if not isinstance(names, list):
return JsonResponse({'ok': False, 'error': f'Ungültige Spalte: {column_key}'}, status=400)
for name in names:
if not isinstance(name, str):
continue
if name not in allowed_names or name in seen:
continue
seen.add(name)
ordered_names.append(name)
cfg = name_to_cfg[name]
cfg.sort_order = sort_order
sort_order += 1
if form_type == 'onboarding':
cfg.page_key = column_key
else:
cfg.page_key = ''
missing = [cfg.field_name for cfg in configs if cfg.field_name not in seen]
for name in missing:
cfg = name_to_cfg[name]
cfg.sort_order = sort_order
sort_order += 1
if form_type == 'onboarding':
cfg.page_key = cfg.page_key or ONBOARDING_DEFAULT_PAGE.get(name, 'abschluss')
else:
cfg.page_key = ''
FormFieldConfig.objects.bulk_update(configs, ['sort_order', 'page_key'])
_audit(request, 'form_layout_saved', target_type='form_config', target_label=form_type, details={'saved_count': len(configs)})
return JsonResponse({'ok': True, 'saved_count': len(configs)})
@_require_capability('manage_integrations')
@require_POST
def send_test_email(request):
mode = 'TEST_MODE_ON' if is_email_test_mode() else 'TEST_MODE_OFF'
redirect_email = get_email_test_redirect()
send_system_email(
subject=f'SMTP test from onboarding/offboarding v2 ({mode})',
body=(
'This is a test email. If you see this, SMTP is configured correctly.\n'
f'EMAIL_TEST_MODE={is_email_test_mode()}\n'
f'EMAIL_TEST_REDIRECT={redirect_email}\n'
),
to=[settings.TEST_NOTIFICATION_EMAIL],
)
_audit(request, 'smtp_test_sent', target_type='system_email', target_label=settings.TEST_NOTIFICATION_EMAIL, details={'email_test_mode': is_email_test_mode()})
messages.success(request, f'SMTP-Testmail wurde gesendet ({mode}).')
return _redirect_back(request, 'home')
@_require_capability('manage_integrations')
@require_POST
def nextcloud_test_upload(request):
filename = f"nextcloud_test_{timezone.now().strftime('%Y%m%d_%H%M%S')}.txt"
content = (
"Nextcloud test upload from onboarding/offboarding system.\n"
f"Time: {timezone.now().isoformat()}\n"
f"User: {request.user.username}\n"
)
temp_path = None
try:
with NamedTemporaryFile('w', suffix='.txt', delete=False, encoding='utf-8') as tf:
tf.write(content)
temp_path = Path(tf.name)
ok = upload_to_nextcloud(temp_path, filename)
if ok:
_audit(request, 'nextcloud_test_upload', target_type='nextcloud', target_label=filename, details={'result': 'success'})
messages.success(request, f'Nextcloud-Testupload erfolgreich: {filename}')
else:
_audit(request, 'nextcloud_test_upload', target_type='nextcloud', target_label=filename, details={'result': 'error'})
messages.error(request, 'Nextcloud-Testupload fehlgeschlagen. Bitte Konfiguration prüfen.')
except Exception as exc:
messages.error(request, f'Nextcloud-Testupload fehlgeschlagen: {exc}')
finally:
if temp_path and temp_path.exists():
temp_path.unlink(missing_ok=True)
return _redirect_back(request, 'home')
@_require_capability('manage_integrations')
@require_POST
def toggle_nextcloud_enabled(request):
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
currently_enabled = is_nextcloud_enabled()
config.nextcloud_enabled_override = not currently_enabled
config.save(update_fields=['nextcloud_enabled_override'])
_audit(request, 'nextcloud_mode_toggled', target_type='workflow_config', target_label='nextcloud', details={'enabled': config.nextcloud_enabled_override})
state = 'aktiviert' if config.nextcloud_enabled_override else 'deaktiviert'
messages.success(request, f'Nextcloud Upload wurde {state}.')
return _redirect_back(request, 'home')
@_require_capability('manage_integrations')
@require_POST
def toggle_email_mode(request):
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
currently_test_mode = is_email_test_mode()
config.email_test_mode_override = not currently_test_mode
config.save(update_fields=['email_test_mode_override'])
_audit(request, 'email_mode_toggled', target_type='workflow_config', target_label='email_mode', details={'test_mode': config.email_test_mode_override})
state = 'Testmodus (Umleitung)' if config.email_test_mode_override else 'Produktionsmodus'
messages.success(request, f'E-Mail-Modus wurde auf {state} gesetzt.')
return _redirect_back(request, 'home')
@_require_capability('manage_integrations')
@require_POST
def save_integrations_settings(request):
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
try:
sync_interval = int(request.POST.get('sync_interval_seconds', config.sync_interval_seconds or 60))
smtp_port = int(request.POST.get('smtp_port', config.smtp_port or 465))
except ValueError:
messages.error(request, 'Ungültige Zahl bei Sync-Intervall oder SMTP Port.')
return redirect('home')
config.nextcloud_base_url_override = request.POST.get('nextcloud_base_url_override', '').strip()
config.nextcloud_username_override = request.POST.get('nextcloud_username_override', '').strip()
config.nextcloud_directory_override = request.POST.get('nextcloud_directory_override', '').strip()
config.sync_interval_seconds = max(10, sync_interval)
config.imap_server = request.POST.get('imap_server', '').strip()
config.mailbox = request.POST.get('mailbox', '').strip() or 'INBOX'
config.smtp_server = request.POST.get('smtp_server', '').strip()
config.smtp_port = max(1, smtp_port)
config.email_account = request.POST.get('email_account', '').strip()
config.smtp_use_ssl = request.POST.get('smtp_use_ssl') == 'on'
config.smtp_use_tls = request.POST.get('smtp_use_tls') == 'on'
nextcloud_password = request.POST.get('nextcloud_password_override', '').strip()
if nextcloud_password:
config.nextcloud_password_override = nextcloud_password
email_password = request.POST.get('email_password', '').strip()
if email_password:
config.email_password = email_password
config.save()
_audit(request, 'integrations_saved', target_type='workflow_config', target_label='all_integrations')
messages.success(request, 'Integrations-Einstellungen wurden gespeichert.')
return redirect('home')
@_require_capability('manage_integrations')
@require_POST
def save_nextcloud_settings(request):
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
try:
sync_interval = int(request.POST.get('sync_interval_seconds', config.sync_interval_seconds or 60))
except ValueError:
messages.error(request, 'Ungültige Zahl beim Sync-Intervall.')
return redirect('home')
config.nextcloud_base_url_override = request.POST.get('nextcloud_base_url_override', '').strip()
config.nextcloud_username_override = request.POST.get('nextcloud_username_override', '').strip()
config.nextcloud_directory_override = request.POST.get('nextcloud_directory_override', '').strip()
config.sync_interval_seconds = max(10, sync_interval)
nextcloud_password = request.POST.get('nextcloud_password_override', '').strip()
if nextcloud_password:
config.nextcloud_password_override = nextcloud_password
config.save()
_audit(request, 'nextcloud_settings_saved', target_type='workflow_config', target_label='nextcloud')
messages.success(request, 'Nextcloud-Einstellungen wurden gespeichert.')
return redirect('/admin-tools/integrations/?kind=nextcloud')
@_require_capability('manage_integrations')
@require_POST
def save_workflow_rules(request):
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
try:
handover_lead_days = int(
request.POST.get(
'device_handover_lead_days',
config.device_handover_lead_days or 5,
)
)
except ValueError:
messages.error(request, 'Ungültige Zahl beim Hardware-Vorlauf.')
return redirect('/admin-tools/integrations/?kind=rules')
config.device_handover_lead_days = max(0, handover_lead_days)
config.save(update_fields=['device_handover_lead_days'])
_audit(
request,
'workflow_rules_saved',
target_type='workflow_config',
target_label='workflow_rules',
details={
'device_handover_lead_days': config.device_handover_lead_days,
},
)
messages.success(request, 'Workflow-Regeln wurden gespeichert.')
return redirect('/admin-tools/integrations/?kind=rules')
@_require_capability('manage_integrations')
@require_POST
def save_backup_settings(request):
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
target_type = (request.POST.get('remote_backup_target_type') or config.remote_backup_target_type or 'nextcloud').strip().lower()
if target_type not in {choice for choice, _ in WorkflowConfig.REMOTE_BACKUP_TARGET_CHOICES}:
target_type = 'nextcloud'
remote_backup_enabled = request.POST.get('remote_backup_enabled') == 'on'
remote_backup_nextcloud_directory = request.POST.get('remote_backup_nextcloud_directory', '').strip()
primary_nextcloud_directory = (
(config.nextcloud_directory_override or '').strip()
or settings.NEXTCLOUD_DIRECTORY.strip()
).strip('/')
if remote_backup_enabled and target_type == 'nextcloud':
if not remote_backup_nextcloud_directory:
messages.error(request, 'Bitte ein separates Nextcloud Backup-Verzeichnis angeben.')
return redirect('/admin-tools/integrations/?kind=backup')
if remote_backup_nextcloud_directory.strip('/') == primary_nextcloud_directory:
messages.error(request, 'Das Backup-Verzeichnis muss vom normalen Nextcloud Dokumentenordner getrennt sein.')
return redirect('/admin-tools/integrations/?kind=backup')
config.remote_backup_enabled = remote_backup_enabled
config.remote_backup_target_type = target_type
config.remote_backup_nextcloud_directory = remote_backup_nextcloud_directory
config.remote_backup_s3_bucket = request.POST.get('remote_backup_s3_bucket', '').strip()
config.remote_backup_nfs_path = request.POST.get('remote_backup_nfs_path', '').strip()
config.save(
update_fields=[
'device_handover_lead_days',
'remote_backup_enabled',
'remote_backup_target_type',
'remote_backup_nextcloud_directory',
'remote_backup_s3_bucket',
'remote_backup_nfs_path',
]
)
_audit(
request,
'backup_settings_saved',
target_type='workflow_config',
target_label='backup_settings',
details={
'remote_backup_enabled': config.remote_backup_enabled,
'remote_backup_target_type': config.remote_backup_target_type,
'remote_backup_nextcloud_directory': config.remote_backup_nextcloud_directory,
},
)
messages.success(request, 'Backup-Einstellungen wurden gespeichert.')
return redirect('/admin-tools/integrations/?kind=backup')
@_require_capability('manage_integrations')
@require_POST
def save_mail_settings(request):
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
try:
smtp_port = int(request.POST.get('smtp_port', config.smtp_port or 465))
except ValueError:
messages.error(request, 'Ungültige Zahl beim SMTP Port.')
return redirect('home')
config.imap_server = request.POST.get('imap_server', '').strip()
config.mailbox = request.POST.get('mailbox', '').strip() or 'INBOX'
config.smtp_server = request.POST.get('smtp_server', '').strip()
config.smtp_port = max(1, smtp_port)
config.email_account = request.POST.get('email_account', '').strip()
config.smtp_use_ssl = request.POST.get('smtp_use_ssl') == 'on'
config.smtp_use_tls = request.POST.get('smtp_use_tls') == 'on'
email_password = request.POST.get('email_password', '').strip()
if email_password:
config.email_password = email_password
config.save()
smtp_cfg, _ = SystemEmailConfig.objects.get_or_create(name='Default SMTP')
SystemEmailConfig.objects.exclude(id=smtp_cfg.id).update(is_active=False)
smtp_cfg.is_active = True
smtp_cfg.host = config.smtp_server
smtp_cfg.port = config.smtp_port
smtp_cfg.username = config.email_account
if email_password:
smtp_cfg.password = email_password
smtp_cfg.use_ssl = config.smtp_use_ssl
smtp_cfg.use_tls = config.smtp_use_tls
smtp_cfg.from_email = request.POST.get('from_email', '').strip()
smtp_cfg.save()
_audit(request, 'mail_settings_saved', target_type='workflow_config', target_label='mail')
messages.success(request, 'Mail-Einstellungen wurden gespeichert.')
return redirect('/admin-tools/integrations/?kind=mail')
@_require_capability('manage_integrations')
@require_POST
def save_email_routing_settings(request):
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
config.it_onboarding_email = request.POST.get('it_onboarding_email', '').strip()
config.general_info_email = request.POST.get('general_info_email', '').strip()
config.business_card_email = request.POST.get('business_card_email', '').strip()
config.hr_works_email = request.POST.get('hr_works_email', '').strip()
config.key_notification_email = request.POST.get('key_notification_email', '').strip()
config.save(
update_fields=[
'it_onboarding_email',
'general_info_email',
'business_card_email',
'hr_works_email',
'key_notification_email',
]
)
known_keys = {k for k, _ in NotificationTemplate.TEMPLATE_CHOICES}
for key in known_keys:
subject = request.POST.get(f'subject_{key}')
body = request.POST.get(f'body_{key}')
subject_en = request.POST.get(f'subject_en_{key}')
body_en = request.POST.get(f'body_en_{key}')
if subject is None and body is None and subject_en is None and body_en is None:
continue
subject = (subject or '').strip()
body = (body or '').strip()
subject_en = (subject_en or '').strip()
body_en = (body_en or '').strip()
if not subject and not body and not subject_en and not body_en:
continue
obj, _ = NotificationTemplate.objects.get_or_create(
key=key,
defaults={
'subject_template': subject or f'[{key}]',
'body_template': body or '-',
'subject_template_en': subject_en,
'body_template_en': body_en,
'is_active': True,
},
)
changed = []
if subject and obj.subject_template != subject:
obj.subject_template = subject
changed.append('subject_template')
if body and obj.body_template != body:
obj.body_template = body
changed.append('body_template')
if obj.subject_template_en != subject_en:
obj.subject_template_en = subject_en
changed.append('subject_template_en')
if obj.body_template_en != body_en:
obj.body_template_en = body_en
changed.append('body_template_en')
if not obj.is_active:
obj.is_active = True
changed.append('is_active')
if changed:
obj.save(update_fields=changed)
_audit(request, 'email_routing_saved', target_type='workflow_config', target_label='email_routing')
messages.success(request, 'E-Mail Routing und Vorlagen wurden gespeichert.')
return redirect('/admin-tools/integrations/?kind=emails')
@_require_capability('manage_integrations')
@require_POST
def save_notification_rules(request):
rule_ids = request.POST.getlist('rule_ids')
for position, raw_id in enumerate(rule_ids):
rule = NotificationRule.objects.filter(id=raw_id).first()
if not rule:
continue
if request.POST.get(f'delete_{rule.id}') == 'on':
rule.delete()
continue
rule.name = request.POST.get(f'name_{rule.id}', '').strip() or rule.name
event_type = request.POST.get(f'event_type_{rule.id}', '').strip()
if event_type in {'onboarding', 'offboarding'}:
rule.event_type = event_type
operator = request.POST.get(f'operator_{rule.id}', '').strip()
if operator in {x[0] for x in NotificationRule.OPERATOR_CHOICES}:
rule.operator = operator
rule.field_name = request.POST.get(f'field_name_{rule.id}', '').strip()
rule.expected_value = request.POST.get(f'expected_value_{rule.id}', '').strip()
rule.recipients = request.POST.get(f'recipients_{rule.id}', '').strip()
rule.template_key = request.POST.get(f'template_key_{rule.id}', '').strip()
rule.custom_subject = request.POST.get(f'custom_subject_{rule.id}', '').strip()
rule.custom_body = request.POST.get(f'custom_body_{rule.id}', '').strip()
rule.custom_subject_en = request.POST.get(f'custom_subject_en_{rule.id}', '').strip()
rule.custom_body_en = request.POST.get(f'custom_body_en_{rule.id}', '').strip()
rule.include_pdf_attachment = request.POST.get(f'include_pdf_{rule.id}') == 'on'
rule.is_active = request.POST.get(f'active_{rule.id}') == 'on'
rule.sort_order = position
rule.save()
new_name = request.POST.get('new_name', '').strip()
new_recipients = request.POST.get('new_recipients', '').strip()
if new_name and new_recipients:
new_event = request.POST.get('new_event_type', 'onboarding').strip()
if new_event not in {'onboarding', 'offboarding'}:
new_event = 'onboarding'
new_operator = request.POST.get('new_operator', 'always').strip()
if new_operator not in {x[0] for x in NotificationRule.OPERATOR_CHOICES}:
new_operator = 'always'
NotificationRule.objects.create(
name=new_name,
event_type=new_event,
field_name=request.POST.get('new_field_name', '').strip(),
operator=new_operator,
expected_value=request.POST.get('new_expected_value', '').strip(),
recipients=new_recipients,
template_key=request.POST.get('new_template_key', '').strip(),
custom_subject=request.POST.get('new_custom_subject', '').strip(),
custom_body=request.POST.get('new_custom_body', '').strip(),
custom_subject_en=request.POST.get('new_custom_subject_en', '').strip(),
custom_body_en=request.POST.get('new_custom_body_en', '').strip(),
include_pdf_attachment=request.POST.get('new_include_pdf') == 'on',
is_active=True,
sort_order=NotificationRule.objects.filter(event_type=new_event).count() + 1,
)
_audit(request, 'notification_rules_saved', target_type='notification_rule')
messages.success(request, 'Benachrichtigungsregeln wurden gespeichert.')
return redirect('/admin-tools/integrations/?kind=emails')
@_require_capability('delete_requests')
@require_POST
def delete_request_from_dashboard(request, kind: str, request_id: int):
if kind == 'onboarding':
obj = get_object_or_404(OnboardingRequest, id=request_id)
elif kind == 'offboarding':
obj = get_object_or_404(OffboardingRequest, id=request_id)
else:
messages.error(request, f'Unbekannter Typ: {kind}')
return redirect('requests_dashboard')
target_label = _request_target_label(obj, kind)
obj.delete()
_audit(request, 'request_deleted', target_type=kind, target_id=request_id, target_label=target_label)
messages.success(request, f'{kind.capitalize()}-Anfrage #{request_id} wurde gelöscht.')
return redirect('requests_dashboard')
@_require_capability('retry_requests')
@require_POST
def retry_request_from_dashboard(request, kind: str, request_id: int):
if kind == 'onboarding':
obj = get_object_or_404(OnboardingRequest, id=request_id)
obj.processing_status = 'submitted'
obj.last_error = ''
obj.save(update_fields=['processing_status', 'last_error'])
process_onboarding_request.delay(obj.id)
_audit(request, 'request_retried', target_type='onboarding', target_id=obj.id, target_label=_request_target_label(obj, 'onboarding'))
elif kind == 'offboarding':
obj = get_object_or_404(OffboardingRequest, id=request_id)
obj.processing_status = 'submitted'
obj.last_error = ''
obj.save(update_fields=['processing_status', 'last_error'])
process_offboarding_request.delay(obj.id)
_audit(request, 'request_retried', target_type='offboarding', target_id=obj.id, target_label=_request_target_label(obj, 'offboarding'))
else:
messages.error(request, f'Unbekannter Typ: {kind}')
return redirect('requests_dashboard')
messages.success(request, f'{kind.capitalize()}-Anfrage #{request_id} wurde erneut angestoßen.')
return redirect('requests_dashboard')