Files
workdock-platform/backend/workflows/views.py
2026-03-24 11:27:49 +01:00

1450 lines
57 KiB
Python

from pathlib import Path
from datetime import timedelta
from tempfile import NamedTemporaryFile
import json
from celery import current_app
from django.conf import settings
from django.db import connection
from django.db import IntegrityError
from django.db.models import Q
from django.shortcuts import get_object_or_404, redirect, render
from django.contrib import messages
from django.contrib.auth.decorators import login_required, user_passes_test
from django.http import JsonResponse
from django.views.decorators.http import require_POST
from django.views.decorators.csrf import ensure_csrf_cookie
from django.utils import timezone
from django.utils.translation import gettext as _, gettext_lazy
from .forms import OffboardingRequestForm, OnboardingRequestForm
from .form_builder import (
DEFAULT_FIELD_ORDER,
LOCKED_FIELD_RULES,
ONBOARDING_DEFAULT_PAGE,
ONBOARDING_PAGE_LABELS,
ONBOARDING_PAGE_ORDER,
ensure_form_field_configs,
)
from .models import EmployeeProfile, FormFieldConfig, FormOption, IntroChecklistItem, NotificationRule, NotificationTemplate, OffboardingRequest, OnboardingIntroductionSession, OnboardingRequest, ScheduledWelcomeEmail, WorkflowConfig
from .emailing import send_system_email
from .services import get_email_test_redirect, is_email_test_mode, is_nextcloud_enabled, upload_to_nextcloud
from .tasks import (
DEFAULT_NOTIFICATION_TEMPLATES,
_generate_onboarding_intro_pdf,
_generate_onboarding_intro_session_pdf,
build_intro_sections_for_request,
process_offboarding_request,
process_onboarding_request,
send_scheduled_welcome_email,
)
# Onboarding form fields that are rendered together as one grouped block.
# Key: group id (used as the block's 'id'); value: member field names.
ONBOARDING_GROUPS = {
    'business-card-box': ['business_card_name', 'business_card_title', 'business_card_email', 'business_card_phone'],
    'employment-end-box': ['employment_end_date'],
    'group-mailboxes-box': ['group_mailboxes'],
    'extra-hardware-box': ['additional_hardware_multi', 'additional_hardware_other'],
    'extra-software-box': ['additional_software_multi', 'additional_software'],
    'extra-access-box': ['additional_access_text'],
    'successor-box': ['successor_name', 'inherit_phone_number_choice'],
    'phone-box': ['phone_number_choice'],
}
# Group ids whose layout block is flagged with 'hidden_default': True.
ONBOARDING_HIDDEN_BY_DEFAULT = {
    'business-card-box',
    'employment-end-box',
    'group-mailboxes-box',
    'extra-hardware-box',
    'extra-software-box',
    'extra-access-box',
    'successor-box',
}
# Field names handed to the onboarding template as 'onboarding_inline_checks'.
ONBOARDING_INLINE_CHECKS = {'order_business_cards', 'agreement_confirm'}
# Field names handed to the onboarding template as 'onboarding_checkbox_lists'.
ONBOARDING_CHECKBOX_LISTS = {
    'needed_devices_multi',
    'additional_hardware_multi',
    'needed_software_multi',
    'additional_software_multi',
    'needed_accesses_multi',
    'needed_workspace_groups_multi',
    'needed_resources_multi',
}
# Section keys in display order; 'abschluss' is also used as the fallback
# section by the section-building helpers in this module.
ONBOARDING_SECTION_ORDER = ['stammdaten', 'vertrag', 'itsetup', 'abschluss']
# Lazily translated title and subtitle for every section key.
ONBOARDING_SECTION_META = {
    'stammdaten': {'title': gettext_lazy('Stammdaten'), 'subtitle': gettext_lazy('Person, Rolle, Abteilung')},
    'vertrag': {'title': gettext_lazy('Vertrag'), 'subtitle': gettext_lazy('Beschäftigung und Termine')},
    'itsetup': {'title': gettext_lazy('IT-Setup'), 'subtitle': gettext_lazy('Geräte, Software und Zugänge')},
    'abschluss': {'title': gettext_lazy('Abschluss'), 'subtitle': gettext_lazy('Notizen und Freigabe')},
}
def healthz(request):
    """Report service health as JSON after probing the database with ``SELECT 1``.

    Responds with HTTP 200 and status ``ok`` when the probe succeeds, and with
    HTTP 503 and status ``degraded`` when the probe raises any exception.
    """
    try:
        with connection.cursor() as cursor:
            cursor.execute('SELECT 1')
            cursor.fetchone()
    except Exception:
        overall, db_state, http_status = 'degraded', 'error', 503
    else:
        overall, db_state, http_status = 'ok', 'ok', 200

    payload = {
        'status': overall,
        'service': 'onoff_v2',
        'db': db_state,
        'time': timezone.now().isoformat(),
    }
    return JsonResponse(payload, status=http_status)
def _is_staff(user) -> bool:
return user.is_authenticated and user.is_staff
def _display_user_name(user) -> str:
first_name = (getattr(user, 'first_name', '') or '').strip()
last_name = (getattr(user, 'last_name', '') or '').strip()
full_name = f'{first_name} {last_name}'.strip()
if full_name:
return full_name
username = (getattr(user, 'username', '') or '').strip()
if username:
return username
return (getattr(user, 'email', '') or '').strip()
def _form_field_labels(form_type: str) -> dict[str, str]:
if form_type == 'onboarding':
return {name: str(field.label or name) for name, field in OnboardingRequestForm.base_fields.items()}
if form_type == 'offboarding':
return {name: str(field.label or name) for name, field in OffboardingRequestForm.base_fields.items()}
return {}
def _build_onboarding_layout(form) -> list[dict]:
    """Turn the form's fields into an ordered list of layout blocks.

    Fields are walked in ``form.fields`` order. A field that belongs to one of
    ``ONBOARDING_GROUPS`` emits a single ``{'kind': 'group', ...}`` block (at
    the position of the first member encountered) containing every member
    that exists on the form; all other fields emit ``{'kind': 'field', ...}``.
    """
    # Reverse lookup: field name -> id of the group that contains it.
    group_of = {
        member: group_id
        for group_id, members in ONBOARDING_GROUPS.items()
        for member in members
    }

    emitted_groups = set()
    handled = set()
    layout = []
    for field_name in list(form.fields.keys()):
        if field_name in handled:
            continue

        group_id = group_of.get(field_name)
        if not group_id:
            layout.append({'kind': 'field', 'field': form[field_name]})
            handled.add(field_name)
            continue

        if group_id in emitted_groups:
            continue
        bound_members = [
            form[member]
            for member in ONBOARDING_GROUPS[group_id]
            if member in form.fields
        ]
        if not bound_members:
            continue

        layout.append(
            {
                'kind': 'group',
                'id': group_id,
                'hidden_default': group_id in ONBOARDING_HIDDEN_BY_DEFAULT,
                'fields': bound_members,
            }
        )
        emitted_groups.add(group_id)
        handled.update(bound.name for bound in bound_members)
    return layout
def _section_for_block(block: dict, field_pages: dict[str, str]) -> str:
if block['kind'] == 'field':
return field_pages.get(block['field'].name, 'abschluss')
fields = block.get('fields') or []
if not fields:
return 'abschluss'
return field_pages.get(fields[0].name, 'abschluss')
def _build_onboarding_sections(blocks: list[dict], field_pages: dict[str, str]) -> list[dict]:
    """Distribute layout blocks over the onboarding sections.

    Returns one dict per key in ``ONBOARDING_SECTION_ORDER`` (in that order)
    with the section's ``key``, ``title``, ``subtitle`` and its ``blocks``.
    Blocks that resolve to an unknown section key land in ``'abschluss'``.
    """
    buckets = {section_key: [] for section_key in ONBOARDING_SECTION_ORDER}
    for block in blocks:
        target = _section_for_block(block, field_pages)
        bucket = buckets[target] if target in buckets else buckets['abschluss']
        bucket.append(block)

    sections = []
    for section_key in ONBOARDING_SECTION_ORDER:
        meta = ONBOARDING_SECTION_META[section_key]
        sections.append(
            {
                'key': section_key,
                'title': meta['title'],
                'subtitle': meta['subtitle'],
                'blocks': buckets[section_key],
            }
        )
    return sections
@login_required
def home(request):
    """Render the landing page with integration flags and the default config.

    The ``WorkflowConfig`` row named ``'Default'`` is created on first use.
    """
    workflow_config, _created = WorkflowConfig.objects.get_or_create(name='Default')
    context = {
        'nextcloud_enabled': is_nextcloud_enabled(),
        'email_test_mode': is_email_test_mode(),
        'workflow_config': workflow_config,
    }
    return render(request, 'workflows/home.html', context)
@login_required
@user_passes_test(_is_staff)
def project_wiki_page(request):
    """Render the static project wiki template (staff only)."""
    template_name = 'workflows/project_wiki.html'
    return render(request, template_name)
def _dashboard_pdf_url(stored_path):
    """Return the ``/media/pdfs/`` URL for a stored PDF path, or ``None`` if unset."""
    if not stored_path:
        return None
    return f"/media/pdfs/{Path(stored_path).name}"


def _dashboard_delete_selected(request) -> None:
    """Delete the requests named in the POST data and queue a result message.

    Tokens come from ``selected_requests`` (or a single ``single_delete``
    value) and have the form ``<kind>:<id>`` with kind ``onboarding`` or
    ``offboarding``. Malformed tokens are counted as invalid; ids that match
    no row are silently ignored.
    """
    selected = request.POST.getlist('selected_requests')
    single_delete = (request.POST.get('single_delete') or '').strip()
    if single_delete:
        selected = [single_delete]
    if not selected:
        messages.warning(request, _('Keine Einträge ausgewählt.'))
        return

    models_by_kind = {'onboarding': OnboardingRequest, 'offboarding': OffboardingRequest}
    deleted_count = 0
    invalid_count = 0
    for token in selected:
        try:
            kind, raw_id = token.split(':', 1)
            request_id = int(raw_id)
        except (ValueError, TypeError):
            invalid_count += 1
            continue
        model = models_by_kind.get(kind)
        if model is None:
            invalid_count += 1
            continue
        obj = model.objects.filter(id=request_id).first()
        if not obj:
            continue
        obj.delete()
        deleted_count += 1

    if deleted_count:
        messages.success(request, _('%(count)s Eintrag/Einträge gelöscht.') % {'count': deleted_count})
    if invalid_count:
        messages.warning(request, _('%(count)s Auswahl(en) konnten nicht verarbeitet werden.') % {'count': invalid_count})
    if not deleted_count and not invalid_count:
        messages.info(request, _('Keine passenden Einträge gefunden.'))


def _dashboard_daily_counts(queryset, days: list) -> dict:
    """Count rows of ``queryset`` per local calendar day for the given ``days``.

    ``days`` must be a non-empty, ascending list of dates. Rows whose local
    date is not one of ``days`` (for example a timestamp in the future) are
    ignored instead of raising ``KeyError``.
    """
    counts = dict.fromkeys(days, 0)
    created_values = queryset.filter(created_at__date__gte=days[0]).values_list('created_at', flat=True)
    for created_at in created_values:
        day = timezone.localtime(created_at).date()
        if day in counts:
            counts[day] += 1
    return counts


@login_required
def requests_dashboard(request):
    """List recent on-/offboarding requests with a 14-day activity chart.

    POST (staff only) deletes the selected requests and redirects back to the
    dashboard. GET renders up to 60 rows (the newest 50 of each kind, merged
    and sorted by creation time), optionally filtered by the ``q`` search
    term on name or work e-mail, plus totals and per-day chart data.
    """
    if request.method == 'POST':
        if not request.user.is_staff:
            messages.error(request, _('Sie haben keine Berechtigung für diese Aktion.'))
        else:
            _dashboard_delete_selected(request)
        return redirect('requests_dashboard')

    search_query = request.GET.get('q', '').strip()
    onboarding_qs = OnboardingRequest.objects.order_by('-created_at')
    offboarding_qs = OffboardingRequest.objects.order_by('-created_at')
    if search_query:
        text_match = Q(full_name__icontains=search_query) | Q(work_email__icontains=search_query)
        onboarding_qs = onboarding_qs.filter(text_match)
        offboarding_qs = offboarding_qs.filter(text_match)

    onboarding_items = list(onboarding_qs[:50])
    offboarding_items = offboarding_qs[:50]

    # Load all intro sessions for the visible rows in ONE query instead of one
    # query per onboarding request.
    session_qs = OnboardingIntroductionSession.objects.filter(
        onboarding_request__in=[obj.id for obj in onboarding_items]
    )
    if not session_qs.ordered:
        # Same tie-break that ``.first()`` applies to an unordered queryset.
        session_qs = session_qs.order_by('pk')
    sessions_by_request = {}
    for session in session_qs:
        sessions_by_request.setdefault(session.onboarding_request_id, session)

    rows = []
    for obj in onboarding_items:
        intro_session = sessions_by_request.get(obj.id)
        if intro_session and intro_session.exported_pdf_path:
            # Transient attribute for the template; it is not saved.
            intro_session.exported_pdf_url = _dashboard_pdf_url(intro_session.exported_pdf_path)
        rows.append(
            {
                'id': obj.id,
                'kind': 'Onboarding',
                'kind_slug': 'onboarding',
                'name': obj.full_name,
                'work_email': obj.work_email,
                'created_at': obj.created_at,
                'pdf_url': _dashboard_pdf_url(obj.generated_pdf_path),
                'intro_pdf_url': _dashboard_pdf_url(obj.intro_pdf_path),
                'intro_session': intro_session,
                'status': 'PDF erstellt' if obj.generated_pdf_path else 'In Bearbeitung',
            }
        )
    for obj in offboarding_items:
        rows.append(
            {
                'id': obj.id,
                'kind': 'Offboarding',
                'kind_slug': 'offboarding',
                'name': obj.full_name,
                'work_email': obj.work_email,
                'created_at': obj.created_at,
                'pdf_url': _dashboard_pdf_url(obj.generated_pdf_path),
                'intro_pdf_url': None,
                'intro_session': None,
                'status': 'PDF erstellt' if obj.generated_pdf_path else 'In Bearbeitung',
            }
        )
    rows.sort(key=lambda row: row['created_at'], reverse=True)

    # Chart window: today and the 13 days before it.
    today = timezone.localdate()
    start_date = today - timedelta(days=13)
    days = [start_date + timedelta(days=offset) for offset in range(14)]
    onboarding_daily = _dashboard_daily_counts(onboarding_qs, days)
    offboarding_daily = _dashboard_daily_counts(offboarding_qs, days)

    chart_points = []
    max_total = 1  # never below 1, so the height division is always safe
    for day in days:
        on_count = onboarding_daily[day]
        off_count = offboarding_daily[day]
        total = on_count + off_count
        max_total = max(max_total, total)
        chart_points.append(
            {
                'label': day.strftime('%d.%m'),
                'onboarding': on_count,
                'offboarding': off_count,
                'total': total,
            }
        )
    for point in chart_points:
        # Bar height scaled to at most 84, with a minimum of 8.
        point['height'] = max(8, int((point['total'] / max_total) * 84))

    onboarding_total = onboarding_qs.count()
    offboarding_total = offboarding_qs.count()
    return render(
        request,
        'workflows/requests_dashboard.html',
        {
            'rows': rows[:60],
            'search_query': search_query,
            'onboarding_total': onboarding_total,
            'offboarding_total': offboarding_total,
            'combined_total': onboarding_total + offboarding_total,
            'chart_points': chart_points,
        },
    )
@login_required
@ensure_csrf_cookie
def onboarding_create(request):
    """Show the onboarding form and create a request from a valid POST.

    After a successful save the requester's display name is stored on the new
    row, the ``process_onboarding_request`` task is queued and the browser is
    redirected back to the form with ``saved=1`` and the new id in the query
    string.
    """
    config = WorkflowConfig.objects.order_by('id').first()
    if config and config.legal_text:
        legal_text = config.legal_text
    else:
        legal_text = 'Eine Ausrüstungsvereinbarung erlaubt es einem Mitarbeitenden, die Ausrüstung des Unternehmens im Außendienst oder zu Hause zu nutzen und mitzunehmen.'

    if request.method != 'POST':
        form = OnboardingRequestForm(requester_email=request.user.email)
    else:
        form = OnboardingRequestForm(request.POST, request.FILES, requester_email=request.user.email)
        if form.is_valid():
            created = form.save()
            created.onboarded_by_name = _display_user_name(request.user)
            created.save(update_fields=['onboarded_by_name'])
            process_onboarding_request.delay(created.id)
            return redirect(f"/onboarding/new/?saved=1&id={created.id}")

    onboarding_blocks = _build_onboarding_layout(form)
    # The form may expose a per-field page mapping; default to none.
    field_pages = getattr(form, '_field_page_keys', {})
    onboarding_sections = _build_onboarding_sections(onboarding_blocks, field_pages)
    context = {
        'form': form,
        'onboarding_blocks': onboarding_blocks,
        'onboarding_sections': onboarding_sections,
        'onboarding_inline_checks': ONBOARDING_INLINE_CHECKS,
        'onboarding_checkbox_lists': ONBOARDING_CHECKBOX_LISTS,
        'legal_text': legal_text,
        'saved': request.GET.get('saved') == '1',
        'saved_request_id': request.GET.get('id', ''),
    }
    return render(request, 'workflows/onboarding_form.html', context)
@login_required
def onboarding_success(request, request_id: int):
    """Render the success page for one onboarding request (404 if unknown).

    ``pdf_url`` in the context is ``None`` until a generated PDF path is stored.
    """
    onboarding = get_object_or_404(OnboardingRequest, id=request_id)
    stored_path = onboarding.generated_pdf_path
    pdf_url = f"/media/pdfs/{Path(stored_path).name}" if stored_path else None
    context = {'obj': onboarding, 'pdf_url': pdf_url}
    return render(request, 'workflows/onboarding_success.html', context)
@login_required
@user_passes_test(_is_staff)
@require_POST
def generate_onboarding_intro_pdf(request, request_id: int):
    """Generate the intro/handover PDF for an onboarding request (staff, POST).

    Stores the generated path on the request and redirects to the dashboard.
    """
    onboarding = get_object_or_404(OnboardingRequest, id=request_id)
    onboarding.intro_pdf_path = str(_generate_onboarding_intro_pdf(onboarding))
    onboarding.save(update_fields=['intro_pdf_path'])
    messages.success(request, _('Einweisungs- und Übergabeprotokoll wurde erzeugt.'))
    return redirect('requests_dashboard')
@login_required
@user_passes_test(_is_staff)
@require_POST
def generate_onboarding_intro_session_pdf(request, request_id: int):
    """Export the live introduction session of a request as PDF (staff, POST).

    The session row is created if it does not exist yet. The current user's
    display name is passed to the generator as the admin signature name.
    """
    onboarding = get_object_or_404(OnboardingRequest, id=request_id)
    session, _created = OnboardingIntroductionSession.objects.get_or_create(onboarding_request=onboarding)
    signer = _display_user_name(request.user)
    generated = _generate_onboarding_intro_session_pdf(session, admin_signature_name=signer)
    session.exported_pdf_path = str(generated)
    session.save(update_fields=['exported_pdf_path'])
    messages.success(request, _('Einweisungsprotokoll aus Live-Status wurde erzeugt.'))
    return redirect('onboarding_intro_session_page', request_id=request_id)
@login_required
@user_passes_test(_is_staff)
def onboarding_intro_session_page(request, request_id: int):
    """Show and update the live introduction checklist of an onboarding request.

    POST handles three ``session_action`` values: ``reset`` clears the session,
    ``complete`` stores it as completed (with timestamp and user name), and
    anything else stores it as a draft. Every POST ends in a redirect back to
    this page. GET renders the checklist with progress figures.
    """
    onboarding = get_object_or_404(OnboardingRequest, id=request_id)
    session, _created = OnboardingIntroductionSession.objects.get_or_create(onboarding_request=onboarding)
    sections = build_intro_sections_for_request(onboarding)

    if request.method == 'POST':
        ticked = set(request.POST.getlist('checked_items'))
        # Only ids of items that are part of the current sections are stored.
        submitted_state = {
            item['id']: item['id'] in ticked
            for section in sections
            for item in section['items']
        }
        action = (request.POST.get('session_action') or 'save').strip()
        session.checklist_state = submitted_state
        session.notes = (request.POST.get('notes') or '').strip()

        if action == 'reset':
            session.checklist_state = {}
            session.notes = ''
            session.status = 'draft'
            session.completed_at = None
            session.completed_by_name = ''
            session.exported_pdf_path = ''
            session.save(
                update_fields=[
                    'checklist_state',
                    'notes',
                    'status',
                    'completed_at',
                    'completed_by_name',
                    'exported_pdf_path',
                ]
            )
            messages.success(request, _('Einweisung wurde zurückgesetzt.'))
            return redirect('onboarding_intro_session_page', request_id=request_id)

        if action != 'complete':
            session.status = 'draft'
            session.completed_at = None
            session.completed_by_name = ''
            messages.success(request, _('Einweisung wurde als Entwurf gespeichert.'))
        else:
            session.status = 'completed'
            session.completed_at = timezone.now()
            session.completed_by_name = _display_user_name(request.user)
            messages.success(request, _('Einweisung wurde als abgeschlossen gespeichert.'))
        session.save()
        return redirect('onboarding_intro_session_page', request_id=request_id)

    stored_state = session.checklist_state or {}
    all_items = [item for section in sections for item in section['items']]
    for item in all_items:
        # Annotate each item in place so the template can render its state.
        item['checked'] = bool(stored_state.get(item['id']))
    total_count = len(all_items)
    checked_count = sum(1 for item in all_items if item['checked'])
    progress_percent = int((checked_count / total_count) * 100) if total_count else 0

    salutation = (onboarding.get_gender_display() or '').strip()
    if salutation:
        display_name = f"{salutation} {onboarding.full_name}".strip()
    else:
        display_name = onboarding.full_name

    if session.exported_pdf_path:
        session_pdf_url = f"/media/pdfs/{Path(session.exported_pdf_path).name}"
    else:
        session_pdf_url = None

    context = {
        'onboarding': onboarding,
        'session': session,
        'display_name': display_name,
        'sections': sections,
        'checked_count': checked_count,
        'total_count': total_count,
        'progress_percent': progress_percent,
        'session_pdf_url': session_pdf_url,
    }
    return render(request, 'workflows/onboarding_intro_session.html', context)
@login_required
@ensure_csrf_cookie
def offboarding_create(request):
    """Show the offboarding form (with employee search) and create requests.

    Query parameters: ``profile`` selects an ``EmployeeProfile`` to prefill the
    form, ``q`` searches profiles by name or work e-mail (at most 10 results,
    name matches first). A valid POST stores the request, queues
    ``process_offboarding_request`` and redirects back with ``saved=1``.
    """
    profile_id = request.GET.get('profile')
    search_query = request.GET.get('q', '').strip()

    selected_profile = None
    if profile_id:
        try:
            selected_profile = EmployeeProfile.objects.filter(id=profile_id).first()
        except (ValueError, TypeError):
            # A malformed id in the URL (e.g. ?profile=abc) makes the lookup
            # raise; treat it like an unknown profile instead of failing.
            selected_profile = None

    search_results = []
    if search_query:
        candidates = list(
            EmployeeProfile.objects.filter(full_name__icontains=search_query)[:10]
        ) + list(
            EmployeeProfile.objects.filter(work_email__icontains=search_query)[:10]
        )
        # Preserve order while removing duplicates.
        unique_by_id = {}
        for profile in candidates:
            unique_by_id.setdefault(profile.id, profile)
        search_results = list(unique_by_id.values())[:10]

    if request.method == 'POST':
        form = OffboardingRequestForm(request.POST, prefill_profile=selected_profile)
        if form.is_valid():
            obj = form.save(commit=False)
            if selected_profile:
                obj.employee_profile = selected_profile
            requester_email = (request.user.email or '').strip().lower()
            # Only addresses of the '@tub.co' domain are recorded as requester;
            # everything else falls back to the default sender address.
            if requester_email and requester_email.endswith('@tub.co'):
                obj.requested_by_email = requester_email
            else:
                obj.requested_by_email = settings.DEFAULT_FROM_EMAIL
            obj.requested_by_name = _display_user_name(request.user)
            obj.save()
            process_offboarding_request.delay(obj.id)
            return redirect(f"/offboarding/new/?saved=1&id={obj.id}")
    else:
        form = OffboardingRequestForm(prefill_profile=selected_profile, initial={'search_query': search_query})

    return render(
        request,
        'workflows/offboarding_form.html',
        {
            'form': form,
            'search_results': search_results,
            'selected_profile': selected_profile,
            'search_query': search_query,
            'saved': request.GET.get('saved') == '1',
            'saved_request_id': request.GET.get('id', ''),
        },
    )
@login_required
def offboarding_success(request, request_id: int):
    """Render the success page for one offboarding request (404 if unknown).

    ``pdf_url`` in the context is ``None`` until a generated PDF path is stored.
    """
    offboarding = get_object_or_404(OffboardingRequest, id=request_id)
    stored_path = offboarding.generated_pdf_path
    pdf_url = f"/media/pdfs/{Path(stored_path).name}" if stored_path else None
    context = {'obj': offboarding, 'pdf_url': pdf_url}
    return render(request, 'workflows/offboarding_success.html', context)
@login_required
@user_passes_test(_is_staff)
def form_builder_page(request):
    """Staff page for arranging form fields and maintaining select options.

    Query parameters ``form_type`` and ``option_category`` choose what is
    shown; unknown values fall back to ``'onboarding'`` and the first option
    category. POST supports deleting an option (``delete_option_id``), adding
    one (``builder_action=add_option``) and saving the edited list
    (``builder_action=save_options``); every POST ends in a redirect back to
    this page.
    """
    # Local import: the module header only imports IntegrityError/connection.
    from django.db import transaction

    form_type = request.GET.get('form_type', 'onboarding')
    if form_type not in DEFAULT_FIELD_ORDER:
        form_type = 'onboarding'
    option_category = request.GET.get('option_category', 'department')
    option_categories = [c[0] for c in FormOption.CATEGORY_CHOICES]
    if option_category not in option_categories:
        option_category = option_categories[0]

    def _find_option(raw_id):
        # A malformed id from the request makes the lookup raise; report it
        # as "not found" instead of failing the whole request.
        try:
            return FormOption.objects.filter(id=raw_id).first()
        except (ValueError, TypeError):
            return None

    def _back_to_builder(category):
        return redirect(f"/admin-tools/form-builder/?form_type={form_type}&option_category={category}")

    if request.method == 'POST':
        delete_option_id = request.POST.get('delete_option_id', '').strip()
        if delete_option_id:
            option = _find_option(delete_option_id)
            if not option:
                messages.error(request, 'Option nicht gefunden.')
            else:
                option_category = option.category
                option.delete()
                messages.success(request, 'Option wurde gelöscht.')
            return _back_to_builder(option_category)

        action = request.POST.get('builder_action', '')
        if action == 'add_option':
            category = request.POST.get('category', '').strip()
            label = request.POST.get('label', '').strip()
            value = request.POST.get('value', '').strip()
            if category not in option_categories:
                messages.error(request, 'Ungültige Kategorie.')
            elif not label:
                messages.error(request, 'Bitte einen Namen für die Option angeben.')
            else:
                next_sort = (
                    FormOption.objects.filter(category=category).order_by('-sort_order').values_list('sort_order', flat=True).first()
                )
                try:
                    # Savepoint keeps the surrounding transaction usable when
                    # the insert is rejected. NOTE(review): presumably a
                    # uniqueness constraint on FormOption, as save_options
                    # already expects IntegrityError -- confirm on the model.
                    with transaction.atomic():
                        FormOption.objects.create(
                            category=category,
                            label=label,
                            value=value or label,
                            sort_order=(next_sort + 1) if next_sort is not None else 0,
                            is_active=True,
                        )
                except IntegrityError:
                    messages.error(request, f'Doppelte Bezeichnung in Kategorie: {label}')
                else:
                    messages.success(request, 'Option wurde hinzugefügt.')
                option_category = category
        elif action == 'save_options':
            current_label = ''
            try:
                # All-or-nothing: a rejected row rolls back the rows saved
                # before it, so labels and sort order never end up half-applied.
                with transaction.atomic():
                    for pos, raw_id in enumerate(request.POST.getlist('option_ids')):
                        option = _find_option(raw_id)
                        if not option:
                            continue
                        current_label = request.POST.get(f'label_{option.id}', '').strip() or option.label
                        option.label = current_label
                        option.value = request.POST.get(f'value_{option.id}', '').strip() or current_label
                        option.is_active = request.POST.get(f'active_{option.id}') == 'on'
                        option.sort_order = pos
                        # Set before saving so an error redirect also targets
                        # the category of the row that failed.
                        option_category = option.category
                        option.save(update_fields=['label', 'value', 'is_active', 'sort_order'])
            except IntegrityError:
                messages.error(request, f'Doppelte Bezeichnung in Kategorie: {current_label}')
                return _back_to_builder(option_category)
            messages.success(request, 'Optionen wurden gespeichert.')
        return _back_to_builder(option_category)

    # Known default order first, then any form fields not listed there.
    default_names = list(DEFAULT_FIELD_ORDER.get(form_type, []))
    existing_names = list(
        OnboardingRequestForm.base_fields.keys()
        if form_type == 'onboarding'
        else OffboardingRequestForm.base_fields.keys()
    )
    for name in existing_names:
        if name not in default_names:
            default_names.append(name)
    ensure_form_field_configs(form_type, default_names)
    configs = list(
        FormFieldConfig.objects.filter(form_type=form_type).order_by('sort_order', 'field_name')
    )
    labels = _form_field_labels(form_type)
    locked = LOCKED_FIELD_RULES.get(form_type, set())

    def _as_item(cfg):
        return {
            'field_name': cfg.field_name,
            'label': labels.get(cfg.field_name, cfg.field_name),
            'is_visible': cfg.is_visible,
            'is_required': cfg.is_required,
            'locked': cfg.field_name in locked,
        }

    if form_type == 'onboarding':
        # One column per onboarding page; unknown pages go to 'abschluss'.
        columns = [
            {
                'key': key,
                'title': ONBOARDING_PAGE_LABELS.get(key, key),
                'items': [],
            }
            for key in ONBOARDING_PAGE_ORDER
        ]
        column_by_key = {c['key']: c for c in columns}
        fallback = 'abschluss'
        for cfg in configs:
            page_key = cfg.page_key or ONBOARDING_DEFAULT_PAGE.get(cfg.field_name, fallback)
            if page_key not in column_by_key:
                page_key = fallback
            column_by_key[page_key]['items'].append(_as_item(cfg))
    else:
        columns = [
            {
                'key': 'all',
                'title': 'Offboarding Felder',
                'items': [_as_item(cfg) for cfg in configs],
            }
        ]

    return render(
        request,
        'workflows/form_builder.html',
        {
            'form_type': form_type,
            'columns': columns,
            'form_types': [('onboarding', 'Onboarding'), ('offboarding', 'Offboarding')],
            'option_categories': FormOption.CATEGORY_CHOICES,
            'selected_option_category': option_category,
            'option_items': FormOption.objects.filter(category=option_category).order_by('sort_order', 'label'),
        },
    )
@login_required
@user_passes_test(_is_staff)
def intro_builder_page(request):
    """Staff page for maintaining the introduction checklist items.

    POST supports deleting an item (``delete_item_id``), adding one
    (``builder_action=add_item``) and saving the edited list
    (``builder_action=save_items``); each of these redirects back to this
    page. Any other request renders the builder.
    """

    def _find_item(raw_id):
        # A malformed id from the request makes the lookup raise; report it
        # as "not found" instead of failing the whole request.
        try:
            return IntroChecklistItem.objects.filter(id=raw_id).first()
        except (ValueError, TypeError):
            return None

    if request.method == 'POST':
        valid_sections = {key for key, _label in IntroChecklistItem.SECTION_CHOICES}

        delete_id = (request.POST.get('delete_item_id') or '').strip()
        if delete_id:
            item = _find_item(delete_id)
            if item:
                item.delete()
                messages.success(request, 'Checklistenpunkt wurde gelöscht.')
            else:
                messages.error(request, 'Checklistenpunkt nicht gefunden.')
            return redirect('intro_builder_page')

        action = (request.POST.get('builder_action') or '').strip()
        if action == 'add_item':
            section = (request.POST.get('section') or '').strip()
            label = (request.POST.get('label') or '').strip()
            if section not in valid_sections:
                messages.error(request, 'Ungültiger Abschnitt.')
                return redirect('intro_builder_page')
            if not label:
                messages.error(request, 'Bitte eine Bezeichnung für den Checklistenpunkt angeben.')
                return redirect('intro_builder_page')
            next_sort = (
                IntroChecklistItem.objects.filter(section=section).order_by('-sort_order').values_list('sort_order', flat=True).first()
            )
            IntroChecklistItem.objects.create(
                section=section,
                label=label,
                sort_order=(next_sort + 1) if next_sort is not None else 0,
                is_active=True,
                condition_operator='always',
            )
            messages.success(request, 'Checklistenpunkt wurde hinzugefügt.')
            return redirect('intro_builder_page')

        if action == 'save_items':
            valid_ops = {key for key, _label in IntroChecklistItem.OPERATOR_CHOICES}
            for pos, raw_id in enumerate(request.POST.getlist('item_ids')):
                item = _find_item(raw_id)
                if not item:
                    continue
                # Invalid section keeps the stored one; invalid operator
                # degrades to 'always'.
                section = (request.POST.get(f'section_{item.id}') or item.section).strip()
                if section not in valid_sections:
                    section = item.section
                operator = (request.POST.get(f'operator_{item.id}') or item.condition_operator).strip()
                if operator not in valid_ops:
                    operator = 'always'
                item.section = section
                item.label = (request.POST.get(f'label_{item.id}') or item.label).strip() or item.label
                item.is_active = request.POST.get(f'active_{item.id}') == 'on'
                item.condition_field = (request.POST.get(f'field_{item.id}') or '').strip()
                item.condition_operator = operator
                item.condition_value = (request.POST.get(f'value_{item.id}') or '').strip()
                item.sort_order = pos
                item.save(
                    update_fields=[
                        'section',
                        'label',
                        'is_active',
                        'condition_field',
                        'condition_operator',
                        'condition_value',
                        'sort_order',
                    ]
                )
            messages.success(request, 'Einweisungs-Checkliste wurde gespeichert.')
            return redirect('intro_builder_page')

    # (value, label) pairs offered as condition field in the builder UI.
    condition_field_choices = [
        ('', 'Keine Bedingung'),
        ('needed_devices', 'Benötigte Geräte und Gegenstände'),
        ('needed_software', 'Benötigte Software'),
        ('needed_accesses', 'Benötigte Zugänge'),
        ('needed_workspace_groups', 'Benötigte Gruppen im Workspace'),
        ('needed_resources', 'Benötigte Ressourcen'),
        ('additional_hardware', 'Zusätzliche Hardware'),
        ('additional_software', 'Zusätzliche Software'),
        ('additional_access_text', 'Weitere Zugänge (Freitext)'),
        ('group_mailboxes_required', 'Gruppenpostfächer erforderlich'),
        ('order_business_cards', 'Visitenkarten bestellt'),
        ('phone_number', 'Direktwahl vorhanden'),
        ('successor_name', 'Nachfolge vorhanden'),
        ('department', 'Abteilung'),
    ]
    items = list(IntroChecklistItem.objects.all().order_by('section', 'sort_order', 'label'))
    return render(
        request,
        'workflows/intro_builder.html',
        {
            'items': items,
            'section_choices': IntroChecklistItem.SECTION_CHOICES,
            'operator_choices': IntroChecklistItem.OPERATOR_CHOICES,
            'condition_field_choices': condition_field_choices,
        },
    )
@login_required
@user_passes_test(_is_staff)
def integrations_setup_page(request):
    """Render the integrations setup page (staff only).

    The ``kind`` query parameter selects the tab: ``nextcloud``, ``mail`` or
    ``emails``; anything else falls back to ``nextcloud``.
    """
    workflow_config, _created = WorkflowConfig.objects.get_or_create(name='Default')

    requested_kind = (request.GET.get('kind') or 'nextcloud').strip().lower()
    allowed_kinds = {'nextcloud', 'mail', 'emails'}
    kind = requested_kind if requested_kind in allowed_kinds else 'nextcloud'

    context = {
        'workflow_config': workflow_config,
        'kind': kind,
        'templates': list(NotificationTemplate.objects.all().order_by('key')),
        'notification_rules': NotificationRule.objects.all().order_by('event_type', 'sort_order', 'id'),
        'rule_event_choices': NotificationRule.EVENT_CHOICES,
        'rule_operator_choices': NotificationRule.OPERATOR_CHOICES,
        'template_choices': NotificationTemplate.TEMPLATE_CHOICES,
    }
    return render(request, 'workflows/integrations_setup.html', context)
@login_required
@user_passes_test(_is_staff)
def welcome_emails_page(request):
    """Render the overview of scheduled welcome e-mails (staff only).

    Shows the 200 most recent schedule rows together with the editable
    subject/body, which fall back to the built-in defaults when the stored
    ``onboarding_welcome`` template is missing or blank.
    """
    schedule_rows = ScheduledWelcomeEmail.objects.select_related('onboarding_request').order_by('-send_at', '-id')[:200]
    workflow_config, _created = WorkflowConfig.objects.get_or_create(name='Default')
    welcome_template = NotificationTemplate.objects.filter(key='onboarding_welcome').first()

    builtin = DEFAULT_NOTIFICATION_TEMPLATES.get('onboarding_welcome', {})
    default_subject = (builtin.get('subject') or 'Willkommen bei TUB/CO, {{ FULL_NAME }}').strip()
    default_body = (builtin.get('body') or 'Hallo {{ FULL_NAME }}, willkommen bei TUB/CO.').strip()

    stored_subject = welcome_template.subject_template if welcome_template else ''
    stored_body = welcome_template.body_template if welcome_template else ''
    subject_value = stored_subject.strip() or default_subject
    body_value = stored_body.strip() or default_body

    context = {
        'rows': schedule_rows,
        'workflow_config': workflow_config,
        'welcome_template': welcome_template,
        'welcome_subject_value': subject_value,
        'welcome_body_value': body_value,
        'welcome_keywords': ['{{ FULL_NAME }}', '{{ VORNAME }}', '{{ NACHNAME }}', '{{ DEPARTMENT }}', '{{ CONTRACT_START }}', '{{ EMAIL }}', '{{ REQUESTED_BY }}'],
    }
    return render(request, 'workflows/welcome_emails.html', context)
@login_required
@user_passes_test(_is_staff)
@require_POST
def trigger_welcome_email_now(request, schedule_id: int):
    """Queue one scheduled welcome e-mail for immediate sending (staff, POST).

    Unknown ids and cancelled entries produce an error message. Otherwise the
    send task is queued, its id is stored on the row and the row is marked
    ``scheduled`` with a cleared error. Always redirects to the overview.
    """
    target_view = 'welcome_emails_page'
    scheduled = ScheduledWelcomeEmail.objects.filter(id=schedule_id).first()

    if scheduled is None:
        messages.error(request, f'Geplanter Welcome-Eintrag #{schedule_id} nicht gefunden.')
    elif scheduled.status == 'cancelled':
        messages.error(request, f'Welcome E-Mail #{schedule_id} ist abgebrochen und kann nicht gesendet werden.')
    else:
        # NOTE(review): the task is queued before the row is saved; if a worker
        # finishes first, this save may overwrite the status it wrote -- confirm
        # against the task implementation.
        queued = send_scheduled_welcome_email.delay(scheduled.id, True)
        scheduled.celery_task_id = queued.id or scheduled.celery_task_id
        scheduled.status = 'scheduled'
        scheduled.last_error = ''
        scheduled.save(update_fields=['celery_task_id', 'status', 'last_error', 'updated_at'])
        messages.success(request, f'Welcome E-Mail #{schedule_id} wurde sofort angestoßen.')
    return redirect(target_view)
@login_required
@user_passes_test(_is_staff)
@require_POST
def save_welcome_email_settings(request):
    """Persist the welcome e-mail settings and template (staff, POST).

    Stores delay (never negative), sender address and the "include PDF" flag
    on the default ``WorkflowConfig``. When ``welcome_subject`` or
    ``welcome_body`` is submitted, the ``onboarding_welcome`` template is
    created or updated; blank values fall back to the built-in defaults.
    Always redirects to the welcome e-mail overview.
    """
    config, _created = WorkflowConfig.objects.get_or_create(name='Default')

    # Keep a stored delay of 0 when the field is not submitted; only an unset
    # value falls back to 5 (``0 or 5`` would silently turn 0 into 5).
    stored_delay = config.welcome_email_delay_days
    fallback_delay = 5 if stored_delay is None else stored_delay
    try:
        delay_days = int(request.POST.get('welcome_email_delay_days', fallback_delay))
    except (TypeError, ValueError):
        messages.error(request, 'Ungültige Zahl bei der Welcome-Verzögerung.')
        return redirect('welcome_emails_page')

    config.welcome_email_delay_days = max(0, delay_days)
    config.welcome_sender_email = request.POST.get('welcome_sender_email', '').strip()
    config.welcome_include_pdf = request.POST.get('welcome_include_pdf') == 'on'
    config.save(update_fields=['welcome_email_delay_days', 'welcome_sender_email', 'welcome_include_pdf'])

    subject = request.POST.get('welcome_subject')
    body = request.POST.get('welcome_body')
    if subject is not None or body is not None:
        default_welcome = DEFAULT_NOTIFICATION_TEMPLATES.get('onboarding_welcome', {})
        default_subject = (default_welcome.get('subject') or 'Willkommen bei TUB/CO, {{ FULL_NAME }}').strip()
        default_body = (default_welcome.get('body') or 'Hallo {{ FULL_NAME }}, willkommen bei TUB/CO.').strip()
        subject_clean = (subject or '').strip() or default_subject
        body_clean = (body or '').strip() or default_body
        template, _template_created = NotificationTemplate.objects.get_or_create(
            key='onboarding_welcome',
            defaults={
                'subject_template': subject_clean,
                'body_template': body_clean,
                'is_active': True,
            },
        )
        # Write only the columns that actually differ from the stored row.
        changes = []
        if template.subject_template != subject_clean:
            template.subject_template = subject_clean
            changes.append('subject_template')
        if template.body_template != body_clean:
            template.body_template = body_clean
            changes.append('body_template')
        if not template.is_active:
            template.is_active = True
            changes.append('is_active')
        if changes:
            template.save(update_fields=changes)

    messages.success(request, 'Welcome-E-Mail Einstellungen wurden gespeichert.')
    return redirect('welcome_emails_page')
def _revoke_celery_task(task_id: str) -> None:
if not task_id:
return
try:
current_app.control.revoke(task_id, terminate=False)
except Exception:
return
def _parse_selected_schedule_ids(raw: str) -> list[int]:
if not raw:
return []
parsed: list[int] = []
seen: set[int] = set()
for token in raw.split(','):
token = token.strip()
if not token:
continue
try:
schedule_id = int(token)
except ValueError:
continue
if schedule_id in seen:
continue
seen.add(schedule_id)
parsed.append(schedule_id)
return parsed
@login_required
@user_passes_test(_is_staff)
@require_POST
def bulk_welcome_email_action(request):
    """Apply ``pause``, ``send_now`` or ``delete`` to selected schedule rows.

    ``selected_ids`` is a comma-separated id list. Rows whose status does not
    allow the action are counted as skipped. Result messages are queued and
    the browser is redirected to the welcome e-mail overview in every case.
    """
    overview = 'welcome_emails_page'
    action = (request.POST.get('bulk_action') or '').strip().lower()
    selected_ids = _parse_selected_schedule_ids(request.POST.get('selected_ids', ''))

    if action not in {'pause', 'send_now', 'delete'}:
        messages.error(request, 'Ungültige Bulk-Aktion.')
        return redirect(overview)
    if not selected_ids:
        messages.warning(request, 'Keine Welcome-Einträge ausgewählt.')
        return redirect(overview)

    matching = list(ScheduledWelcomeEmail.objects.filter(id__in=selected_ids).order_by('id'))
    if not matching:
        messages.warning(request, 'Keine passenden Welcome-Einträge gefunden.')
        return redirect(overview)

    processed = 0
    skipped = 0
    for entry in matching:
        if action == 'pause':
            # Entries that were already sent or cancelled cannot be paused.
            if entry.status in {'sent', 'cancelled'}:
                skipped += 1
            else:
                _revoke_celery_task(entry.celery_task_id)
                entry.status = 'paused'
                entry.save(update_fields=['status', 'updated_at'])
                processed += 1
        elif action == 'send_now':
            if entry.status == 'cancelled':
                skipped += 1
            else:
                queued = send_scheduled_welcome_email.delay(entry.id, True)
                entry.celery_task_id = queued.id or entry.celery_task_id
                entry.status = 'scheduled'
                entry.last_error = ''
                entry.save(update_fields=['celery_task_id', 'status', 'last_error', 'updated_at'])
                processed += 1
        elif action == 'delete':
            # Only entries still marked as scheduled get their task revoked.
            if entry.status == 'scheduled':
                _revoke_celery_task(entry.celery_task_id)
            entry.delete()
            processed += 1

    labels_by_action = {
        'pause': 'pausiert',
        'send_now': 'sofort angestoßen',
        'delete': 'gelöscht',
    }
    action_label = labels_by_action[action]
    if processed:
        messages.success(request, f'{processed} Welcome-Eintrag/Einträge {action_label}.')
    if skipped:
        messages.warning(request, f'{skipped} Eintrag/Einträge wurden übersprungen (Status nicht geeignet).')
    return redirect(overview)
@login_required
@user_passes_test(_is_staff)
@require_POST
def pause_welcome_email(request, schedule_id: int):
    """Pause a single scheduled welcome e-mail.

    Entries that are missing, already sent or cancelled produce an error
    message. Otherwise the Celery task is revoked and the entry is marked
    ``paused``. Always redirects to ``welcome_emails_page``.
    """
    entry = ScheduledWelcomeEmail.objects.filter(id=schedule_id).first()
    if not entry:
        messages.error(request, f'Geplanter Welcome-Eintrag #{schedule_id} nicht gefunden.')
    elif entry.status in ('sent', 'cancelled'):
        messages.error(request, f'Welcome E-Mail #{schedule_id} kann nicht pausiert werden.')
    else:
        _revoke_celery_task(entry.celery_task_id)
        entry.status = 'paused'
        entry.save(update_fields=['status', 'updated_at'])
        messages.success(request, f'Welcome E-Mail #{schedule_id} wurde pausiert.')
    return redirect('welcome_emails_page')
@login_required
@user_passes_test(_is_staff)
@require_POST
def resume_welcome_email(request, schedule_id: int):
    """Resume a paused welcome e-mail by queueing its send task again.

    The task is queued with ``send_at`` as ETA while that moment is still
    in the future, and without an ETA otherwise. Entries that are missing
    or not paused only produce an error message. Always redirects to
    ``welcome_emails_page``.
    """
    entry = ScheduledWelcomeEmail.objects.filter(id=schedule_id).first()
    if not entry:
        messages.error(request, f'Geplanter Welcome-Eintrag #{schedule_id} nicht gefunden.')
        return redirect('welcome_emails_page')
    if entry.status != 'paused':
        messages.error(request, f'Welcome E-Mail #{schedule_id} ist nicht pausiert.')
        return redirect('welcome_emails_page')

    # Keep the original send time only if it has not passed yet.
    eta = None
    if timezone.now() < entry.send_at:
        eta = entry.send_at
    queued = send_scheduled_welcome_email.apply_async(args=[entry.id], eta=eta)

    entry.celery_task_id = queued.id or entry.celery_task_id
    entry.status = 'scheduled'
    entry.last_error = ''
    entry.save(update_fields=['celery_task_id', 'status', 'last_error', 'updated_at'])
    messages.success(request, f'Welcome E-Mail #{schedule_id} wurde fortgesetzt.')
    return redirect('welcome_emails_page')
@login_required
@user_passes_test(_is_staff)
@require_POST
def cancel_welcome_email(request, schedule_id: int):
    """Cancel a scheduled welcome e-mail that has not been sent yet.

    Entries that are missing or already sent produce an error message.
    Otherwise the Celery task is revoked, the entry is marked ``cancelled``
    and its ``last_error`` is cleared. Always redirects to
    ``welcome_emails_page``.
    """
    entry = ScheduledWelcomeEmail.objects.filter(id=schedule_id).first()
    if not entry:
        messages.error(request, f'Geplanter Welcome-Eintrag #{schedule_id} nicht gefunden.')
    elif entry.status == 'sent':
        messages.error(request, f'Welcome E-Mail #{schedule_id} wurde bereits gesendet.')
    else:
        _revoke_celery_task(entry.celery_task_id)
        entry.status = 'cancelled'
        entry.last_error = ''
        entry.save(update_fields=['status', 'last_error', 'updated_at'])
        messages.success(request, f'Welcome E-Mail #{schedule_id} wurde abgebrochen.')
    return redirect('welcome_emails_page')
@login_required
@user_passes_test(_is_staff)
@require_POST
def form_builder_save_order(request):
    """Persist the field order sent by the form builder as JSON.

    The request body must be a JSON object with:
        form_type: A key of ``DEFAULT_FIELD_ORDER``.
        columns: Object mapping a column key to a list of field names. For
            ``onboarding`` the keys of ``ONBOARDING_PAGE_ORDER`` are read,
            for every other form type only the ``all`` column.

    Fields are numbered in the order they appear. Unknown, duplicate and
    non-string names are ignored. Known fields missing from the payload are
    appended afterwards; for onboarding they keep their current page or
    fall back to ``ONBOARDING_DEFAULT_PAGE`` (default ``abschluss``).

    Returns:
        ``JsonResponse`` with ``ok`` and ``saved_count`` on success, or
        ``ok=False`` plus ``error`` with HTTP 400 on invalid input.
    """
    try:
        payload = json.loads(request.body.decode('utf-8'))
    except (json.JSONDecodeError, UnicodeDecodeError):
        return JsonResponse({'ok': False, 'error': 'Ungültige JSON-Daten.'}, status=400)
    # Valid JSON that is not an object (list, string, number, null) has no
    # ``.get`` and would otherwise surface as a server error.
    if not isinstance(payload, dict):
        return JsonResponse({'ok': False, 'error': 'Ungültige JSON-Daten.'}, status=400)

    form_type = payload.get('form_type')
    # The isinstance check also protects the membership test against
    # unhashable values such as lists or objects.
    if not isinstance(form_type, str) or form_type not in DEFAULT_FIELD_ORDER:
        return JsonResponse({'ok': False, 'error': 'Ungültiger Formulartyp.'}, status=400)

    columns = payload.get('columns')
    if not isinstance(columns, dict):
        return JsonResponse({'ok': False, 'error': 'Spalten-Daten fehlen.'}, status=400)

    configs = list(FormFieldConfig.objects.filter(form_type=form_type).order_by('sort_order', 'field_name'))
    name_to_cfg = {cfg.field_name: cfg for cfg in configs}
    is_onboarding = form_type == 'onboarding'
    allowed_columns = ONBOARDING_PAGE_ORDER if is_onboarding else ['all']

    seen = set()
    sort_order = 0
    for column_key in allowed_columns:
        names = columns.get(column_key, [])
        if not isinstance(names, list):
            # Nothing has been written yet, so rejecting here is safe.
            return JsonResponse({'ok': False, 'error': f'Ungültige Spalte: {column_key}'}, status=400)
        for name in names:
            if not isinstance(name, str) or name not in name_to_cfg or name in seen:
                continue
            seen.add(name)
            cfg = name_to_cfg[name]
            cfg.sort_order = sort_order
            sort_order += 1
            cfg.page_key = column_key if is_onboarding else ''

    # Fields the client did not mention keep their relative order and are
    # appended after the explicitly ordered ones.
    for cfg in configs:
        if cfg.field_name in seen:
            continue
        cfg.sort_order = sort_order
        sort_order += 1
        if is_onboarding:
            cfg.page_key = cfg.page_key or ONBOARDING_DEFAULT_PAGE.get(cfg.field_name, 'abschluss')
        else:
            cfg.page_key = ''

    FormFieldConfig.objects.bulk_update(configs, ['sort_order', 'page_key'])
    return JsonResponse({'ok': True, 'saved_count': len(configs)})
@login_required
@user_passes_test(_is_staff)
@require_POST
def send_test_email(request):
    """Send an SMTP test message to ``settings.TEST_NOTIFICATION_EMAIL``.

    The subject and body report the current e-mail test mode and the
    configured test redirect address. A failure while sending is shown to
    the user as an error message instead of propagating out of the view,
    matching how the Nextcloud test upload reports failures. Always
    redirects to ``home``.
    """
    # Evaluate once so the subject and the body always report the same mode.
    test_mode = is_email_test_mode()
    mode = 'TEST_MODE_ON' if test_mode else 'TEST_MODE_OFF'
    redirect_email = get_email_test_redirect()
    try:
        send_system_email(
            subject=f'SMTP test from onboarding/offboarding v2 ({mode})',
            body=(
                'This is a test email. If you see this, SMTP is configured correctly.\n'
                f'EMAIL_TEST_MODE={test_mode}\n'
                f'EMAIL_TEST_REDIRECT={redirect_email}\n'
            ),
            to=[settings.TEST_NOTIFICATION_EMAIL],
        )
    except Exception as exc:
        # Broad on purpose: this view exists to surface configuration
        # problems, whatever exception the mail backend raises.
        messages.error(request, f'SMTP-Testmail fehlgeschlagen: {exc}')
    else:
        messages.success(request, f'SMTP-Testmail wurde gesendet ({mode}).')
    return redirect('home')
@login_required
@user_passes_test(_is_staff)
@require_POST
def nextcloud_test_upload(request):
    """Upload a small generated text file to Nextcloud as a connectivity test.

    The file is written to a temporary location, handed to
    ``upload_to_nextcloud`` and removed again afterwards. The result (or any
    exception raised on the way) is reported through the messages framework.
    Always redirects to ``home``.
    """
    filename = f"nextcloud_test_{timezone.now().strftime('%Y%m%d_%H%M%S')}.txt"
    content = ''.join([
        "Nextcloud test upload from onboarding/offboarding system.\n",
        f"Time: {timezone.now().isoformat()}\n",
        f"User: {request.user.username}\n",
    ])
    temp_path = None
    try:
        # delete=False: the file has to outlive the ``with`` block so it can
        # be uploaded; it is removed in ``finally`` instead.
        with NamedTemporaryFile('w', suffix='.txt', delete=False, encoding='utf-8') as handle:
            handle.write(content)
            temp_path = Path(handle.name)
        if upload_to_nextcloud(temp_path, filename):
            messages.success(request, f'Nextcloud-Testupload erfolgreich: {filename}')
        else:
            messages.error(request, 'Nextcloud-Testupload fehlgeschlagen. Bitte Konfiguration prüfen.')
    except Exception as exc:
        messages.error(request, f'Nextcloud-Testupload fehlgeschlagen: {exc}')
    finally:
        if temp_path is not None and temp_path.exists():
            temp_path.unlink(missing_ok=True)
    return redirect('home')
@login_required
@user_passes_test(_is_staff)
@require_POST
def toggle_nextcloud_enabled(request):
    """Invert the effective Nextcloud upload state.

    Stores the negation of ``is_nextcloud_enabled()`` in
    ``nextcloud_enabled_override`` on the ``Default`` workflow config
    (created on demand) and redirects to ``home``.
    """
    config, _created = WorkflowConfig.objects.get_or_create(name='Default')
    enable = not is_nextcloud_enabled()
    config.nextcloud_enabled_override = enable
    config.save(update_fields=['nextcloud_enabled_override'])
    if enable:
        state = 'aktiviert'
    else:
        state = 'deaktiviert'
    messages.success(request, f'Nextcloud Upload wurde {state}.')
    return redirect('home')
@login_required
@user_passes_test(_is_staff)
@require_POST
def toggle_email_mode(request):
    """Switch between e-mail test mode and production mode.

    Stores the negation of ``is_email_test_mode()`` in
    ``email_test_mode_override`` on the ``Default`` workflow config
    (created on demand) and redirects to ``home``.
    """
    config, _created = WorkflowConfig.objects.get_or_create(name='Default')
    use_test_mode = not is_email_test_mode()
    config.email_test_mode_override = use_test_mode
    config.save(update_fields=['email_test_mode_override'])
    if use_test_mode:
        state = 'Testmodus (Umleitung)'
    else:
        state = 'Produktionsmodus'
    messages.success(request, f'E-Mail-Modus wurde auf {state} gesetzt.')
    return redirect('home')
@login_required
@user_passes_test(_is_staff)
@require_POST
def save_integrations_settings(request):
    """Store the combined Nextcloud and mail settings on the default config.

    The sync interval is raised to at least 10 and the SMTP port to at
    least 1. An empty mailbox falls back to ``INBOX``. Password fields are
    only overwritten when a non-empty value is submitted. Non-numeric
    interval/port input produces an error message and nothing is saved.
    Always redirects to ``home``.
    """
    post = request.POST
    config, _created = WorkflowConfig.objects.get_or_create(name='Default')
    try:
        sync_interval = int(post.get('sync_interval_seconds', config.sync_interval_seconds or 60))
        smtp_port = int(post.get('smtp_port', config.smtp_port or 465))
    except ValueError:
        messages.error(request, 'Ungültige Zahl bei Sync-Intervall oder SMTP Port.')
        return redirect('home')

    # Plain text settings: the POST key equals the attribute name.
    for attr in (
        'nextcloud_base_url_override',
        'nextcloud_username_override',
        'nextcloud_directory_override',
        'imap_server',
        'smtp_server',
        'email_account',
    ):
        setattr(config, attr, post.get(attr, '').strip())

    config.mailbox = post.get('mailbox', '').strip() or 'INBOX'
    config.sync_interval_seconds = sync_interval if sync_interval > 10 else 10
    config.smtp_port = smtp_port if smtp_port > 1 else 1
    config.smtp_use_ssl = post.get('smtp_use_ssl') == 'on'
    config.smtp_use_tls = post.get('smtp_use_tls') == 'on'

    # An empty password field means "keep the stored secret".
    for attr in ('nextcloud_password_override', 'email_password'):
        secret = post.get(attr, '').strip()
        if secret:
            setattr(config, attr, secret)

    config.save()
    messages.success(request, 'Integrations-Einstellungen wurden gespeichert.')
    return redirect('home')
@login_required
@user_passes_test(_is_staff)
@require_POST
def save_nextcloud_settings(request):
    """Store the Nextcloud connection settings on the default config.

    The sync interval is raised to at least 10. The password is only
    overwritten when a non-empty value is submitted. Non-numeric interval
    input produces an error message, nothing is saved and the view
    redirects to ``home``; on success it redirects to the Nextcloud tab of
    the integrations page.
    """
    post = request.POST
    config, _created = WorkflowConfig.objects.get_or_create(name='Default')
    try:
        sync_interval = int(post.get('sync_interval_seconds', config.sync_interval_seconds or 60))
    except ValueError:
        messages.error(request, 'Ungültige Zahl beim Sync-Intervall.')
        return redirect('home')

    # Plain text settings: the POST key equals the attribute name.
    for attr in (
        'nextcloud_base_url_override',
        'nextcloud_username_override',
        'nextcloud_directory_override',
    ):
        setattr(config, attr, post.get(attr, '').strip())
    config.sync_interval_seconds = sync_interval if sync_interval > 10 else 10

    # An empty password field means "keep the stored secret".
    secret = post.get('nextcloud_password_override', '').strip()
    if secret:
        config.nextcloud_password_override = secret

    config.save()
    messages.success(request, 'Nextcloud-Einstellungen wurden gespeichert.')
    return redirect('/admin-tools/integrations/?kind=nextcloud')
@login_required
@user_passes_test(_is_staff)
@require_POST
def save_mail_settings(request):
    """Store the IMAP/SMTP settings on the default config.

    The SMTP port is raised to at least 1 and an empty mailbox falls back
    to ``INBOX``. The password is only overwritten when a non-empty value
    is submitted. Non-numeric port input produces an error message, nothing
    is saved and the view redirects to ``home``; on success it redirects to
    the mail tab of the integrations page.
    """
    post = request.POST
    config, _created = WorkflowConfig.objects.get_or_create(name='Default')
    try:
        smtp_port = int(post.get('smtp_port', config.smtp_port or 465))
    except ValueError:
        messages.error(request, 'Ungültige Zahl beim SMTP Port.')
        return redirect('home')

    # Plain text settings: the POST key equals the attribute name.
    for attr in ('imap_server', 'smtp_server', 'email_account'):
        setattr(config, attr, post.get(attr, '').strip())
    config.mailbox = post.get('mailbox', '').strip() or 'INBOX'
    config.smtp_port = smtp_port if smtp_port > 1 else 1
    config.smtp_use_ssl = post.get('smtp_use_ssl') == 'on'
    config.smtp_use_tls = post.get('smtp_use_tls') == 'on'

    # An empty password field means "keep the stored secret".
    secret = post.get('email_password', '').strip()
    if secret:
        config.email_password = secret

    config.save()
    messages.success(request, 'Mail-Einstellungen wurden gespeichert.')
    return redirect('/admin-tools/integrations/?kind=mail')
@login_required
@user_passes_test(_is_staff)
@require_POST
def save_email_routing_settings(request):
    """Store the routing addresses and any submitted notification templates.

    The five routing addresses are always written to the default config.
    For every key in ``NotificationTemplate.TEMPLATE_CHOICES`` the fields
    ``subject_<key>`` / ``body_<key>`` are read; a template is created or
    updated only when at least one of them is non-empty, and an empty value
    never overwrites stored text. Redirects to the e-mails tab of the
    integrations page.
    """
    post = request.POST
    config, _created = WorkflowConfig.objects.get_or_create(name='Default')
    routing_fields = [
        'it_onboarding_email',
        'general_info_email',
        'business_card_email',
        'hr_works_email',
        'key_notification_email',
    ]
    for field in routing_fields:
        setattr(config, field, post.get(field, '').strip())
    config.save(update_fields=routing_fields)

    template_keys = {choice_key for choice_key, _label in NotificationTemplate.TEMPLATE_CHOICES}
    for key in template_keys:
        raw_subject = post.get(f'subject_{key}')
        raw_body = post.get(f'body_{key}')
        if raw_subject is None and raw_body is None:
            continue
        subject = (raw_subject or '').strip()
        body = (raw_body or '').strip()
        if not (subject or body):
            continue
        template, _made = NotificationTemplate.objects.get_or_create(
            key=key,
            defaults={
                'subject_template': subject or f'[{key}]',
                'body_template': body or '-',
                'is_active': True,
            },
        )
        # Only non-empty, actually different values are written back.
        dirty = []
        for attr, value in (('subject_template', subject), ('body_template', body)):
            if value and getattr(template, attr) != value:
                setattr(template, attr, value)
                dirty.append(attr)
        if dirty:
            template.save(update_fields=dirty)

    messages.success(request, 'E-Mail Routing und Vorlagen wurden gespeichert.')
    return redirect('/admin-tools/integrations/?kind=emails')
@login_required
@user_passes_test(_is_staff)
@require_POST
def save_notification_rules(request):
    """Update, delete and optionally create notification rules from POST data.

    Each id in ``rule_ids`` addresses an existing rule whose fields are
    read from ``<field>_<id>`` parameters; ``delete_<id>=on`` removes it.
    The position of an id in ``rule_ids`` becomes the rule's ``sort_order``.
    Ids that are not integers or match no rule are ignored. A new rule is
    created when both ``new_name`` and ``new_recipients`` are non-empty.
    Redirects to the e-mails tab of the integrations page.
    """
    post = request.POST
    # Built once instead of on every loop iteration.
    valid_operators = {choice[0] for choice in NotificationRule.OPERATOR_CHOICES}
    valid_events = {'onboarding', 'offboarding'}

    for position, raw_id in enumerate(post.getlist('rule_ids')):
        # A non-numeric id would make the ORM lookup raise ValueError and
        # turn a malformed form field into a server error.
        try:
            rule_pk = int(raw_id)
        except (TypeError, ValueError):
            continue
        rule = NotificationRule.objects.filter(id=rule_pk).first()
        if not rule:
            continue
        if post.get(f'delete_{rule.id}') == 'on':
            rule.delete()
            continue
        rule.name = post.get(f'name_{rule.id}', '').strip() or rule.name
        event_type = post.get(f'event_type_{rule.id}', '').strip()
        if event_type in valid_events:
            rule.event_type = event_type
        operator = post.get(f'operator_{rule.id}', '').strip()
        if operator in valid_operators:
            rule.operator = operator
        rule.field_name = post.get(f'field_name_{rule.id}', '').strip()
        rule.expected_value = post.get(f'expected_value_{rule.id}', '').strip()
        rule.recipients = post.get(f'recipients_{rule.id}', '').strip()
        rule.template_key = post.get(f'template_key_{rule.id}', '').strip()
        rule.custom_subject = post.get(f'custom_subject_{rule.id}', '').strip()
        rule.custom_body = post.get(f'custom_body_{rule.id}', '').strip()
        rule.include_pdf_attachment = post.get(f'include_pdf_{rule.id}') == 'on'
        rule.is_active = post.get(f'active_{rule.id}') == 'on'
        rule.sort_order = position
        rule.save()

    new_name = post.get('new_name', '').strip()
    new_recipients = post.get('new_recipients', '').strip()
    if new_name and new_recipients:
        # Unknown event types / operators fall back to safe defaults.
        new_event = post.get('new_event_type', 'onboarding').strip()
        if new_event not in valid_events:
            new_event = 'onboarding'
        new_operator = post.get('new_operator', 'always').strip()
        if new_operator not in valid_operators:
            new_operator = 'always'
        NotificationRule.objects.create(
            name=new_name,
            event_type=new_event,
            field_name=post.get('new_field_name', '').strip(),
            operator=new_operator,
            expected_value=post.get('new_expected_value', '').strip(),
            recipients=new_recipients,
            template_key=post.get('new_template_key', '').strip(),
            custom_subject=post.get('new_custom_subject', '').strip(),
            custom_body=post.get('new_custom_body', '').strip(),
            include_pdf_attachment=post.get('new_include_pdf') == 'on',
            is_active=True,
            sort_order=NotificationRule.objects.filter(event_type=new_event).count() + 1,
        )

    messages.success(request, 'Benachrichtigungsregeln wurden gespeichert.')
    return redirect('/admin-tools/integrations/?kind=emails')
@login_required
@user_passes_test(_is_staff)
@require_POST
def delete_request_from_dashboard(request, kind: str, request_id: int):
    """Delete one onboarding or offboarding request from the dashboard.

    ``kind`` selects the model (``onboarding`` or ``offboarding``); any
    other value produces an error message. A missing object results in a
    404. Redirects to ``requests_dashboard`` in the other cases.
    """
    model_by_kind = {
        'onboarding': OnboardingRequest,
        'offboarding': OffboardingRequest,
    }
    model = model_by_kind.get(kind)
    if model is None:
        messages.error(request, f'Unbekannter Typ: {kind}')
        return redirect('requests_dashboard')
    target = get_object_or_404(model, id=request_id)
    target.delete()
    messages.success(request, f'{kind.capitalize()}-Anfrage #{request_id} wurde gelöscht.')
    return redirect('requests_dashboard')