2355 lines
98 KiB
Python
2355 lines
98 KiB
Python
from pathlib import Path
|
|
from datetime import timedelta
|
|
from tempfile import NamedTemporaryFile
|
|
import json
|
|
from functools import wraps
|
|
|
|
from celery import current_app
|
|
from django.conf import settings
|
|
from django.db import connection
|
|
from django.db import IntegrityError
|
|
from django.db.models import Q
|
|
from django.shortcuts import get_object_or_404, redirect, render
|
|
from django.contrib import messages
|
|
from django.contrib.auth import get_user_model
|
|
from django.contrib.auth.decorators import login_required
|
|
from django.contrib.auth.tokens import default_token_generator
|
|
from django.http import JsonResponse
|
|
from django.views.decorators.http import require_POST
|
|
from django.views.decorators.csrf import ensure_csrf_cookie
|
|
from django.utils import timezone
|
|
from django.utils.encoding import force_bytes
|
|
from django.utils.http import urlsafe_base64_encode
|
|
from django.utils.translation import gettext as _, gettext_lazy
|
|
from django.utils.translation import get_language, override
|
|
from django.urls import reverse
|
|
|
|
from .backup_ops import create_backup_bundle, delete_backup_bundle, list_backup_bundles, verify_backup_bundle
|
|
from .forms import OffboardingRequestForm, OnboardingRequestForm, UserManagementCreateForm
|
|
from .form_builder import (
|
|
DEFAULT_FIELD_ORDER,
|
|
LOCKED_FIELD_RULES,
|
|
ONBOARDING_DEFAULT_PAGE,
|
|
ONBOARDING_PAGE_LABELS,
|
|
ONBOARDING_PAGE_ORDER,
|
|
ensure_form_field_configs,
|
|
)
|
|
from .models import AdminAuditLog, EmployeeProfile, FormFieldConfig, FormOption, IntroChecklistItem, NotificationRule, NotificationTemplate, OffboardingRequest, OnboardingIntroductionSession, OnboardingRequest, ScheduledWelcomeEmail, SystemEmailConfig, WorkflowConfig
|
|
from .emailing import send_system_email
|
|
from .roles import ROLE_GROUP_NAMES, ROLE_LABELS, ROLE_SUPER_ADMIN, assign_user_role, get_user_role_key, get_user_role_label, user_has_capability
|
|
from .services import get_email_test_redirect, is_email_test_mode, is_nextcloud_enabled, upload_to_nextcloud
|
|
from .tasks import (
|
|
DEFAULT_NOTIFICATION_TEMPLATES,
|
|
_generate_onboarding_intro_pdf,
|
|
_generate_onboarding_intro_session_pdf,
|
|
build_intro_sections_for_request,
|
|
process_offboarding_request,
|
|
process_onboarding_request,
|
|
send_scheduled_welcome_email,
|
|
)
|
|
|
|
|
|
def _redirect_back(request, fallback: str):
    """Redirect to a caller-supplied return URL, or to ``fallback``.

    Candidates are tried in this order:

    1. the ``next`` parameter (POST takes precedence over GET), which must be
       a local path starting with ``/``;
    2. the ``Referer`` header, which must point at the host serving this
       request;
    3. ``fallback`` (anything ``redirect()`` accepts, e.g. a URL name).

    Both user-controlled candidates are validated so this helper cannot be
    used as an open redirect.
    """
    # Imported locally so the helper is self-contained.
    from django.utils.http import url_has_allowed_host_and_scheme

    target = (request.POST.get('next') or request.GET.get('next') or '').strip()
    # A plain startswith('/') check also accepts protocol-relative URLs such as
    # '//evil.example'; with no allowed hosts only host-less URLs pass.
    if target.startswith('/') and url_has_allowed_host_and_scheme(target, allowed_hosts=None):
        return redirect(target)

    referer = (request.META.get('HTTP_REFERER') or '').strip()
    # Compare the parsed host instead of a string prefix, which would accept
    # e.g. 'http://localhost.evil.example'.
    if referer and url_has_allowed_host_and_scheme(referer, allowed_hosts={request.get_host()}):
        return redirect(referer)

    return redirect(fallback)
|
|
|
|
# Maps the id of a grouped onboarding box to the form field names rendered
# inside it, in display order.
ONBOARDING_GROUPS = {
    'business-card-box': [
        'business_card_name',
        'business_card_title',
        'business_card_email',
        'business_card_phone',
    ],
    'employment-end-box': [
        'employment_end_date',
    ],
    'group-mailboxes-box': [
        'group_mailboxes',
    ],
    'extra-hardware-box': [
        'additional_hardware_multi',
        'additional_hardware_other',
    ],
    'extra-software-box': [
        'additional_software_multi',
        'additional_software',
    ],
    'extra-access-box': [
        'additional_access_text',
    ],
    'successor-box': [
        'successor_name',
        'inherit_phone_number_choice',
    ],
    'phone-box': [
        'phone_number_choice',
    ],
}
|
|
|
|
# Ids of grouped boxes that are flagged ``hidden_default`` when the onboarding
# layout is built.
ONBOARDING_HIDDEN_BY_DEFAULT = {
    'business-card-box', 'employment-end-box', 'group-mailboxes-box',
    'extra-hardware-box', 'extra-software-box', 'extra-access-box',
    'successor-box',
}
|
|
|
|
# Field names of the single onboarding checkboxes listed as "inline checks".
ONBOARDING_INLINE_CHECKS = {
    'order_business_cards',
    'agreement_confirm',
}

# Field names of the onboarding multi-select fields listed as checkbox lists.
ONBOARDING_CHECKBOX_LISTS = {
    'needed_devices_multi', 'additional_hardware_multi',
    'needed_software_multi', 'additional_software_multi',
    'needed_accesses_multi', 'needed_workspace_groups_multi',
    'needed_resources_multi',
}

# Section keys of the onboarding form in display order.
ONBOARDING_SECTION_ORDER = [
    'stammdaten',
    'vertrag',
    'itsetup',
    'abschluss',
]
|
|
# Lazily translated title and subtitle for each onboarding section key.
ONBOARDING_SECTION_META = {
    'stammdaten': {
        'title': gettext_lazy('Stammdaten'),
        'subtitle': gettext_lazy('Person, Rolle, Abteilung'),
    },
    'vertrag': {
        'title': gettext_lazy('Vertrag'),
        'subtitle': gettext_lazy('Beschäftigung und Termine'),
    },
    'itsetup': {
        'title': gettext_lazy('IT-Setup'),
        'subtitle': gettext_lazy('Geräte, Software und Zugänge'),
    },
    'abschluss': {
        'title': gettext_lazy('Abschluss'),
        'subtitle': gettext_lazy('Notizen und Freigabe'),
    },
}
|
|
|
|
|
|
def healthz(request):
    """Return a JSON health report based on a trivial database round trip.

    Responds with HTTP 200 when ``SELECT 1`` succeeds and HTTP 503 when it
    raises any exception.
    """
    try:
        with connection.cursor() as cursor:
            cursor.execute('SELECT 1')
            cursor.fetchone()
    except Exception:
        # Any failure of the probe query marks the service as degraded.
        db_ok = False
    else:
        db_ok = True

    if db_ok:
        status_code, overall, database = 200, 'ok', 'ok'
    else:
        status_code, overall, database = 503, 'degraded', 'error'

    payload = {
        'status': overall,
        'service': 'onoff_v2',
        'db': database,
        'time': timezone.now().isoformat(),
    }
    return JsonResponse(payload, status=status_code)
|
|
|
|
|
|
def _require_capability(capability: str):
    """Build a view decorator that requires login and the given capability.

    The decorated view runs only when ``user_has_capability`` accepts
    ``request.user``; otherwise an error message is queued and the user is
    redirected to ``home``.
    """
    def decorator(view_func):
        def guarded(request, *args, **kwargs):
            if user_has_capability(request.user, capability):
                return view_func(request, *args, **kwargs)
            messages.error(request, _('Sie haben keine Berechtigung für diese Aktion.'))
            return redirect('home')

        # Same stacking as writing @wraps(view_func) above @login_required.
        return wraps(view_func)(login_required(guarded))

    return decorator
|
|
|
|
|
|
def _display_user_name(user) -> str:
    """Return the best available display name for ``user``.

    Preference order: "first last" name, then username, then e-mail address.
    Missing or empty attributes are treated as empty strings, so the result
    may itself be empty.
    """
    def cleaned(attribute: str) -> str:
        return (getattr(user, attribute, '') or '').strip()

    full_name = f"{cleaned('first_name')} {cleaned('last_name')}".strip()
    return full_name or cleaned('username') or cleaned('email')
|
|
|
|
|
|
def _audit(
|
|
request,
|
|
action: str,
|
|
*,
|
|
target_type: str = '',
|
|
target_id: int | None = None,
|
|
target_label: str = '',
|
|
details: dict | None = None,
|
|
) -> None:
|
|
if not getattr(request, 'user', None) or not request.user.is_authenticated:
|
|
return
|
|
AdminAuditLog.objects.create(
|
|
actor=request.user,
|
|
actor_display=_display_user_name(request.user),
|
|
action=action,
|
|
target_type=target_type,
|
|
target_id=target_id,
|
|
target_label=target_label,
|
|
details=details or {},
|
|
)
|
|
|
|
|
|
def _form_field_labels(form_type: str) -> dict[str, str]:
    """Map field names to their label text for the given form type.

    ``form_type`` is ``'onboarding'`` or ``'offboarding'``; any other value
    yields an empty dict. A field without a label falls back to its name.
    """
    if form_type == 'onboarding':
        form_class = OnboardingRequestForm
    elif form_type == 'offboarding':
        form_class = OffboardingRequestForm
    else:
        return {}

    labels = {}
    for name, field in form_class.base_fields.items():
        labels[name] = str(field.label or name)
    return labels
|
|
|
|
|
|
def _request_target_label(obj, kind: str | None = None) -> str:
|
|
request_kind = (kind or '').strip()
|
|
if not request_kind:
|
|
request_kind = 'onboarding' if isinstance(obj, OnboardingRequest) else 'offboarding'
|
|
name = (getattr(obj, 'full_name', '') or '').strip() or f'#{getattr(obj, "id", "?")}'
|
|
email = (getattr(obj, 'work_email', '') or '').strip()
|
|
created_at = getattr(obj, 'created_at', None)
|
|
date_label = created_at.strftime('%Y-%m-%d') if created_at else ''
|
|
parts = [request_kind.capitalize(), name]
|
|
if email:
|
|
parts.append(f'<{email}>')
|
|
if date_label:
|
|
parts.append(date_label)
|
|
return ' | '.join(parts)
|
|
|
|
|
|
def _request_status_label(status_key: str, language_code: str | None = None) -> str:
    """Translate a processing status key into the requested language.

    Only the primary subtag of ``language_code`` is used (``'de-at'`` becomes
    ``'de'``); a missing or empty code falls back to ``'de'``. Unknown status
    keys are returned unchanged.
    """
    primary_subtag = (language_code or 'de').split('-')[0]
    lang = (primary_subtag or 'de').lower()

    # gettext resolves eagerly, so the table must be built while the language
    # override is active.
    with override(lang):
        translated = {
            'submitted': _('Eingereicht'),
            'processing': _('In Bearbeitung'),
            'completed': _('Abgeschlossen'),
            'failed': _('Fehlgeschlagen'),
        }

    if status_key in translated:
        return translated[status_key]
    return status_key
|
|
|
|
|
|
def _audit_action_label(action: str) -> str:
    """Return the translated, human-readable label for an audit action key.

    Unknown keys fall back to the key itself with underscores replaced by
    spaces and the first letter capitalized.
    """
    fallback = action.replace('_', ' ').strip().capitalize()
    labels = {
        # Requests
        'requests_deleted': _('Vorgänge gelöscht'),
        'request_deleted': _('Vorgang gelöscht'),
        'request_retried': _('Vorgang erneut angestoßen'),
        # Introduction sessions
        'intro_pdf_generated': _('Einweisungs-PDF erzeugt'),
        'intro_live_pdf_generated': _('Live-Protokoll erzeugt'),
        'intro_session_reset': _('Einweisung zurückgesetzt'),
        'intro_session_saved': _('Einweisung als Entwurf gespeichert'),
        'intro_session_completed': _('Einweisung abgeschlossen'),
        # Form builder
        'form_option_deleted': _('Formularoption gelöscht'),
        'form_options_saved': _('Formularoptionen gespeichert'),
        'form_field_texts_saved': _('Feldtexte gespeichert'),
        'form_layout_saved': _('Formularlayout gespeichert'),
        # Introduction checklist
        'intro_checklist_item_deleted': _('Einweisungs-Checkpunkt gelöscht'),
        'intro_checklist_item_added': _('Einweisungs-Checkpunkt hinzugefügt'),
        'intro_checklist_saved': _('Einweisungs-Checkliste gespeichert'),
        # Welcome e-mails
        'welcome_email_triggered_now': _('Welcome E-Mail sofort ausgelöst'),
        'welcome_email_settings_saved': _('Welcome E-Mail Einstellungen gespeichert'),
        'welcome_email_bulk_action': _('Welcome E-Mail Sammelaktion ausgeführt'),
        'welcome_email_paused': _('Welcome E-Mail pausiert'),
        'welcome_email_resumed': _('Welcome E-Mail fortgesetzt'),
        'welcome_email_cancelled': _('Welcome E-Mail abgebrochen'),
        # Integrations and mail settings
        'smtp_test_sent': _('SMTP-Test gesendet'),
        'nextcloud_test_upload': _('Nextcloud-Testupload ausgeführt'),
        'nextcloud_mode_toggled': _('Nextcloud-Modus umgeschaltet'),
        'email_mode_toggled': _('E-Mail-Modus umgeschaltet'),
        'integrations_saved': _('Integrationen gespeichert'),
        'nextcloud_settings_saved': _('Nextcloud-Einstellungen gespeichert'),
        'mail_settings_saved': _('Mail-Einstellungen gespeichert'),
        'email_routing_saved': _('E-Mail-Routing gespeichert'),
        'notification_rules_saved': _('Benachrichtigungsregeln gespeichert'),
        # User management
        'user_created': _('Benutzer erstellt'),
        'user_updated': _('Benutzer aktualisiert'),
        'user_password_reset_sent': _('Passwort-Reset-Link versendet'),
        'user_deleted': _('Benutzer gelöscht'),
        # Backups
        'backup_created': _('Backup erstellt'),
        'backup_verified': _('Backup verifiziert'),
        'backup_deleted': _('Backup gelöscht'),
        'backup_settings_saved': _('Backup-Einstellungen gespeichert'),
    }
    return labels.get(action, fallback)
|
|
|
|
|
|
def _translate_choice_list(choices):
    """Return ``choices`` as a list of ``(value, label)`` pairs with ``str`` labels.

    Converting each label with ``str()`` forces lazy labels to resolve now.
    """
    resolved = []
    for value, label in choices:
        resolved.append((value, str(label)))
    return resolved
|
|
|
|
|
|
def _build_onboarding_layout(form) -> list[dict]:
    """Turn the form's fields into an ordered list of layout blocks.

    Fields are visited in form order. A field listed in ``ONBOARDING_GROUPS``
    produces one ``'group'`` block, placed where the group's first field
    appears and holding every group member present on the form. Every other
    field becomes a standalone ``'field'`` block.
    """
    field_to_group = {
        member: group_id
        for group_id, members in ONBOARDING_GROUPS.items()
        for member in members
    }

    emitted_groups = set()
    handled = set()
    layout = []

    for field_name in list(form.fields.keys()):
        if field_name in handled:
            continue

        group_id = field_to_group.get(field_name)
        if not group_id:
            layout.append({'kind': 'field', 'field': form[field_name]})
            handled.add(field_name)
            continue

        if group_id in emitted_groups:
            continue

        bound_members = [
            form[member]
            for member in ONBOARDING_GROUPS[group_id]
            if member in form.fields
        ]
        if not bound_members:
            continue

        layout.append(
            {
                'kind': 'group',
                'id': group_id,
                'hidden_default': group_id in ONBOARDING_HIDDEN_BY_DEFAULT,
                'fields': bound_members,
            }
        )
        emitted_groups.add(group_id)
        handled.update(bound.name for bound in bound_members)

    return layout
|
|
|
|
|
|
def _section_for_block(block: dict, field_pages: dict[str, str]) -> str:
    """Return the section key a layout block belongs to.

    A ``'field'`` block is looked up by its own field name, any other block
    by the name of its first field. Blocks without fields and names missing
    from ``field_pages`` map to ``'abschluss'``.
    """
    default_section = 'abschluss'

    if block['kind'] == 'field':
        anchor = block['field']
    else:
        members = block.get('fields') or []
        if not members:
            return default_section
        anchor = members[0]

    return field_pages.get(anchor.name, default_section)
|
|
|
|
|
|
def _build_onboarding_sections(blocks: list[dict], field_pages: dict[str, str]) -> list[dict]:
    """Distribute layout blocks over the onboarding sections.

    Returns one dict per key in ``ONBOARDING_SECTION_ORDER`` (in that order)
    with the section's key, title, subtitle and its blocks. Blocks that
    resolve to an unknown section are placed in ``'abschluss'``.
    """
    buckets = {key: [] for key in ONBOARDING_SECTION_ORDER}
    for block in blocks:
        section_key = _section_for_block(block, field_pages)
        if section_key in buckets:
            buckets[section_key].append(block)
        else:
            buckets['abschluss'].append(block)

    sections = []
    for key in ONBOARDING_SECTION_ORDER:
        meta = ONBOARDING_SECTION_META[key]
        sections.append(
            {
                'key': key,
                'title': meta['title'],
                'subtitle': meta['subtitle'],
                'blocks': buckets[key],
            }
        )
    return sections
|
|
|
|
|
|
@login_required
def home(request):
    """Render the landing page.

    The template context carries the Nextcloud and e-mail test-mode flags,
    the ``Default`` workflow configuration (created on first access) and the
    role label of the current user.
    """
    # Do not unpack the "created" flag into ``_``: that would make ``_`` a
    # local variable and shadow the module-level gettext alias for the whole
    # function body.
    config, _created = WorkflowConfig.objects.get_or_create(name='Default')
    return render(
        request,
        'workflows/home.html',
        {
            'nextcloud_enabled': is_nextcloud_enabled(),
            'email_test_mode': is_email_test_mode(),
            'workflow_config': config,
            'role_label': get_user_role_label(request.user),
        },
    )
|
|
|
|
|
|
def _user_management_rows():
    """Build the table rows shown on the user management page.

    Each row holds the user, the role key and label, a numeric role rank and
    a display name. Rows are sorted with active users first, then by role
    rank, then case-insensitively by username.
    """
    role_rank = {ROLE_SUPER_ADMIN: 0, 'admin': 1, 'it_staff': 2, 'staff': 3}

    def build_row(user):
        role_key = get_user_role_key(user)
        return {
            'user': user,
            'role_key': role_key,
            'role_label': str(ROLE_LABELS[role_key]),
            # Roles missing from the rank table sort last.
            'role_sort': role_rank.get(role_key, 99),
            'display_name': _display_user_name(user),
        }

    def sort_key(row):
        account = row['user']
        return (not account.is_active, row['role_sort'], account.username.lower())

    users = get_user_model().objects.all().order_by('-is_active', 'username')
    return sorted((build_row(user) for user in users), key=sort_key)
|
|
|
|
|
|
def _render_user_management(request, create_form=None, status_code: int = 200):
    """Render the user management page.

    ``create_form`` lets a caller re-display a bound form; when omitted a
    fresh ``UserManagementCreateForm`` is used. ``status_code`` sets the HTTP
    status of the response.
    """
    role_choices = [(key, str(ROLE_LABELS[key])) for key in ROLE_GROUP_NAMES]
    context = {
        'create_form': create_form or UserManagementCreateForm(),
        'rows': _user_management_rows(),
        'role_choices': role_choices,
    }
    return render(request, 'workflows/user_management.html', context, status=status_code)
|
|
|
|
|
|
def _super_admin_user_count() -> int:
    """Return the number of active users whose role is super admin."""
    # Let the database drop inactive accounts instead of loading every user
    # and discarding them in Python; the role still has to be resolved per
    # user via get_user_role_key().
    active_users = get_user_model().objects.filter(is_active=True)
    return sum(1 for user in active_users if get_user_role_key(user) == ROLE_SUPER_ADMIN)
|
|
|
|
|
|
def _would_remove_last_super_admin(user, new_role_key: str | None = None, new_is_active: bool | None = None, deleting: bool = False) -> bool:
    """Tell whether a pending change would leave no active super admin.

    Returns ``True`` only when ``user`` is an active super admin, no other
    active super admin exists, and the change deletes, demotes or
    deactivates the account. ``None`` for ``new_role_key`` or
    ``new_is_active`` means "unchanged".
    """
    is_active_super_admin = get_user_role_key(user) == ROLE_SUPER_ADMIN and user.is_active
    if not is_active_super_admin:
        return False
    if _super_admin_user_count() > 1:
        return False

    demoted = new_role_key is not None and new_role_key != ROLE_SUPER_ADMIN
    deactivated = new_is_active is not None and not new_is_active
    return bool(deleting or demoted or deactivated)
|
|
|
|
|
|
def _send_user_access_email(request, target_user, *, invitation: bool) -> None:
    """E-mail ``target_user`` a link for setting or resetting the password.

    The link targets the ``password_reset_confirm`` URL with a freshly
    generated token. ``invitation`` selects the invitation wording, otherwise
    the password-reset wording is used.

    Raises:
        ValueError: if the user has no e-mail address.
    """
    recipient = (target_user.email or '').strip()
    if not recipient:
        raise ValueError(_('Für diesen Benutzer ist keine E-Mail-Adresse hinterlegt.'))

    reset_path = reverse(
        'password_reset_confirm',
        kwargs={
            'uidb64': urlsafe_base64_encode(force_bytes(target_user.pk)),
            'token': default_token_generator.make_token(target_user),
        },
    )
    reset_url = request.build_absolute_uri(reset_path)

    subject_params = {'username': target_user.username}
    body_params = {'name': _display_user_name(target_user), 'url': reset_url}

    if invitation:
        subject = _('Zugangseinladung für %(username)s') % subject_params
        body = _(
            'Hallo %(name)s,\n\n'
            'für Sie wurde ein Benutzerkonto im TUBCO Onboarding- und Offboarding-Portal angelegt.\n'
            'Bitte öffnen Sie den folgenden Link, um Ihr Passwort zu setzen:\n'
            '%(url)s\n\n'
            'Wenn Sie diese Einladung nicht erwartet haben, melden Sie sich bitte bei Ihrem Administrator.'
        ) % body_params
    else:
        subject = _('Passwort zurücksetzen für %(username)s') % subject_params
        body = _(
            'Hallo %(name)s,\n\n'
            'für Ihr Konto wurde ein Link zum Zurücksetzen des Passworts erstellt.\n'
            'Bitte öffnen Sie den folgenden Link:\n'
            '%(url)s\n\n'
            'Wenn Sie diese Anfrage nicht erwartet haben, können Sie diese E-Mail ignorieren.'
        ) % body_params

    send_system_email(subject=subject, body=body, to=[recipient])
|
|
|
|
|
|
@_require_capability('manage_users')
def user_management_page(request):
    """Show the user management page with an empty create form."""
    response = _render_user_management(request)
    return response
|
|
|
|
|
|
@_require_capability('manage_users')
@require_POST
def create_user_from_admin(request):
    """Create a user from the admin form and send the invitation e-mail.

    An invalid form re-renders the management page with HTTP 400. If the
    invitation cannot be sent because the new user has no e-mail address, the
    account is kept, the problem is shown as a warning, and the audit entry
    records ``invitation_sent`` as ``False``.
    """
    form = UserManagementCreateForm(request.POST)
    if not form.is_valid():
        messages.error(request, _('Benutzer konnte nicht erstellt werden. Bitte prüfen Sie die Eingaben.'))
        return _render_user_management(request, create_form=form, status_code=400)

    user = form.save()

    # The user is already saved at this point. Handle the "no e-mail address"
    # error the same way send_password_reset_from_admin does instead of
    # letting it surface as a server error.
    invitation_sent = True
    try:
        _send_user_access_email(request, user, invitation=True)
    except ValueError as exc:
        invitation_sent = False
        messages.warning(request, str(exc))

    _audit(
        request,
        'user_created',
        target_type='user',
        target_id=user.id,
        target_label=_display_user_name(user),
        details={'username': user.username, 'role': get_user_role_key(user), 'invitation_sent': invitation_sent},
    )
    if invitation_sent:
        messages.success(request, _('Benutzer wurde erstellt und eingeladen: %(username)s') % {'username': user.username})
    else:
        messages.success(request, _('Benutzer wurde erstellt: %(username)s') % {'username': user.username})
    return redirect('user_management_page')
|
|
|
|
|
|
@_require_capability('manage_users')
@require_POST
def update_user_from_admin(request, user_id: int):
    """Update role, active flag and optionally the password of a user.

    The update is rejected (with an error message and a redirect back to the
    management page) when the role is unknown, when the current user would
    demote or deactivate their own account, when the last active super admin
    would be removed, or when a supplied new password fails the configured
    password validators.
    """
    # Local imports keep this view self-contained.
    from django.contrib.auth.password_validation import validate_password
    from django.core.exceptions import ValidationError

    user_model = get_user_model()
    target_user = get_object_or_404(user_model, id=user_id)
    role_key = (request.POST.get('role_key') or '').strip()
    # An unchecked checkbox is absent from the POST data.
    is_active = request.POST.get('is_active') == 'on'
    new_password = (request.POST.get('new_password') or '').strip()

    if role_key not in ROLE_GROUP_NAMES:
        messages.error(request, _('Ungültige Rolle.'))
        return redirect('user_management_page')

    if target_user == request.user and (role_key != ROLE_SUPER_ADMIN or not is_active):
        messages.error(request, _('Der aktuell angemeldete Super Admin kann sich hier nicht selbst sperren oder herabstufen.'))
        return redirect('user_management_page')
    if _would_remove_last_super_admin(target_user, new_role_key=role_key, new_is_active=is_active):
        messages.error(request, _('Der letzte aktive Super Admin kann nicht deaktiviert oder herabgestuft werden.'))
        return redirect('user_management_page')

    if new_password:
        # set_password() does not run the project's password validators, so
        # check the password explicitly, and before anything is written, so a
        # rejected password leaves role and active flag untouched.
        try:
            validate_password(new_password, user=target_user)
        except ValidationError as exc:
            messages.error(request, ' '.join(exc.messages))
            return redirect('user_management_page')

    assign_user_role(target_user, role_key)
    target_user.is_active = is_active
    if new_password:
        target_user.set_password(new_password)
    target_user.save()

    _audit(
        request,
        'user_updated',
        target_type='user',
        target_id=target_user.id,
        target_label=_display_user_name(target_user),
        details={'username': target_user.username, 'role': role_key, 'is_active': is_active, 'password_changed': bool(new_password)},
    )
    messages.success(request, _('Benutzer wurde aktualisiert: %(username)s') % {'username': target_user.username})
    return redirect('user_management_page')
|
|
|
|
|
|
@_require_capability('manage_users')
@require_POST
def send_password_reset_from_admin(request, user_id: int):
    """Send a password-reset link to the given user and audit the action.

    A ``ValueError`` from the mail helper (no e-mail address on file) is
    shown as an error message; no audit entry is written in that case.
    """
    target_user = get_object_or_404(get_user_model(), id=user_id)

    try:
        _send_user_access_email(request, target_user, invitation=False)
    except ValueError as error:
        messages.error(request, str(error))
    else:
        audit_details = {'username': target_user.username, 'email': target_user.email}
        _audit(
            request,
            'user_password_reset_sent',
            target_type='user',
            target_id=target_user.id,
            target_label=_display_user_name(target_user),
            details=audit_details,
        )
        messages.success(request, _('Passwort-Reset-Link wurde versendet: %(username)s') % {'username': target_user.username})

    return redirect('user_management_page')
|
|
|
|
|
|
@_require_capability('manage_users')
@require_POST
def delete_user_from_admin(request, user_id: int):
    """Delete a user account and record the deletion in the audit log.

    Deleting the currently logged-in account or the last active super admin
    is refused with an error message.
    """
    user_model = get_user_model()
    target_user = get_object_or_404(user_model, id=user_id)

    if target_user == request.user:
        messages.error(request, _('Der aktuell angemeldete Super Admin kann sich hier nicht selbst löschen.'))
        return redirect('user_management_page')
    if _would_remove_last_super_admin(target_user, deleting=True):
        messages.error(request, _('Der letzte aktive Super Admin kann nicht gelöscht werden.'))
        return redirect('user_management_page')

    # Capture the identifying data before delete(); afterwards the instance
    # no longer carries its primary key, and the audit entry would lose the
    # reference to the deleted account.
    target_id = target_user.id
    target_label = _display_user_name(target_user)
    username = target_user.username
    target_user.delete()

    _audit(
        request,
        'user_deleted',
        target_type='user',
        target_id=target_id,
        target_label=target_label,
        details={'username': username},
    )
    messages.success(request, _('Benutzer wurde gelöscht: %(username)s') % {'username': username})
    return redirect('user_management_page')
|
|
|
|
|
|
@_require_capability('view_docs')
def handbook_page(request):
    """Render the handbook template."""
    template_name = 'workflows/handbook.html'
    return render(request, template_name)
|
|
|
|
|
|
@_require_capability('view_docs')
def project_wiki_page(request):
    """Render the project wiki template."""
    template_name = 'workflows/project_wiki.html'
    return render(request, template_name)
|
|
|
|
|
|
@_require_capability('view_docs')
def developer_handbook_page(request):
    """Render the developer handbook template."""
    template_name = 'workflows/developer_handbook.html'
    return render(request, template_name)
|
|
|
|
|
|
@_require_capability('view_docs')
def release_checklist_page(request):
    """Render the release checklist template."""
    template_name = 'workflows/release_checklist.html'
    return render(request, template_name)
|
|
|
|
|
|
def _parse_audit_filter_date(raw: str):
    """Parse an ISO ``YYYY-MM-DD`` filter value; return ``None`` if it is invalid."""
    from datetime import date

    try:
        return date.fromisoformat(raw)
    except ValueError:
        return None


@_require_capability('view_audit_log')
def audit_log_page(request):
    """List audit log entries, optionally filtered via GET parameters.

    Supported parameters: ``action`` (exact match), ``user`` (substring of
    actor display name, username or e-mail), ``date_from`` and ``date_to``
    (inclusive ISO dates). At most 300 rows are shown. A date that cannot be
    parsed is ignored with a warning instead of being handed to the ORM,
    where it would raise a validation error.
    """
    action = (request.GET.get('action') or '').strip()
    user_query = (request.GET.get('user') or '').strip()
    date_from = (request.GET.get('date_from') or '').strip()
    date_to = (request.GET.get('date_to') or '').strip()

    rows_qs = AdminAuditLog.objects.select_related('actor').all()

    if action:
        rows_qs = rows_qs.filter(action=action)
    if user_query:
        rows_qs = rows_qs.filter(
            Q(actor_display__icontains=user_query)
            | Q(actor__username__icontains=user_query)
            | Q(actor__email__icontains=user_query)
        )

    invalid_date = False
    if date_from:
        parsed_from = _parse_audit_filter_date(date_from)
        if parsed_from is None:
            invalid_date = True
            date_from = ''  # do not echo a filter that was not applied
        else:
            rows_qs = rows_qs.filter(created_at__date__gte=parsed_from)
    if date_to:
        parsed_to = _parse_audit_filter_date(date_to)
        if parsed_to is None:
            invalid_date = True
            date_to = ''
        else:
            rows_qs = rows_qs.filter(created_at__date__lte=parsed_to)
    if invalid_date:
        messages.warning(request, _('Ungültiges Datum im Filter wurde ignoriert.'))

    rows = list(rows_qs[:300])
    # The choices come from the unfiltered table so every recorded action
    # stays selectable.
    action_choices = (
        AdminAuditLog.objects.order_by('action').values_list('action', flat=True).distinct()
    )
    return render(
        request,
        'workflows/audit_log.html',
        {
            'rows': rows,
            'action_choices': action_choices,
            'selected_action': action,
            'user_query': user_query,
            'date_from': date_from,
            'date_to': date_to,
        },
    )
|
|
|
|
|
|
@_require_capability('manage_backups')
def backup_recovery_page(request):
    """Render the backup and recovery page with the available bundles."""
    context = {'rows': list_backup_bundles()}
    return render(request, 'workflows/backup_recovery.html', context)
|
|
|
|
|
|
@_require_capability('manage_backups')
@require_POST
def create_backup_from_admin(request):
    """Create a backup bundle, audit it and report the outcome as a message.

    Any exception raised while creating or auditing the bundle is shown to
    the user as an error message; the view always redirects back to the
    backup page.
    """
    try:
        bundle = create_backup_bundle()
        bundle_name = bundle['name']
        _audit(
            request,
            'backup_created',
            target_type='backup_bundle',
            target_label=bundle_name,
            details={'path': bundle['path']},
        )
        messages.success(request, _('Backup wurde erstellt: %(name)s') % {'name': bundle['name']})
    except Exception as error:
        messages.error(request, _('Backup konnte nicht erstellt werden: %(error)s') % {'error': error})
    return redirect('backup_recovery_page')
|
|
|
|
|
|
@_require_capability('manage_backups')
@require_POST
def verify_backup_from_admin(request, backup_name: str):
    """Verify a backup bundle, audit it and report the outcome as a message.

    Any exception raised while verifying or auditing is shown to the user as
    an error message; the view always redirects back to the backup page.
    """
    try:
        report = verify_backup_bundle(backup_name)
        audit_details = {'summary': report['summary']}
        _audit(
            request,
            'backup_verified',
            target_type='backup_bundle',
            target_label=backup_name,
            details=audit_details,
        )
        messages.success(request, _('Backup wurde verifiziert: %(name)s') % {'name': report['name']})
    except Exception as error:
        messages.error(request, _('Backup-Verifikation fehlgeschlagen: %(error)s') % {'error': error})
    return redirect('backup_recovery_page')
|
|
|
|
|
|
@_require_capability('manage_backups')
@require_POST
def delete_backup_from_admin(request, backup_name: str):
    """Delete a backup bundle, audit it and report the outcome as a message.

    Any exception raised while deleting or auditing is shown to the user as
    an error message; the view always redirects back to the backup page.
    """
    try:
        removed = delete_backup_bundle(backup_name)
        _audit(
            request,
            'backup_deleted',
            target_type='backup_bundle',
            target_label=backup_name,
            details={},
        )
        success_text = _('Backup wurde gelöscht: %(name)s') % {'name': removed['name']}
        messages.success(request, success_text)
    except Exception as error:
        messages.error(request, _('Backup konnte nicht gelöscht werden: %(error)s') % {'error': error})
    return redirect('backup_recovery_page')
|
|
|
|
|
|
def _timeline_midnight(day):
    """Return midnight of ``day`` as a datetime that sorts with model timestamps."""
    from datetime import datetime, time

    moment = datetime.combine(day, time.min)
    # Only attach a timezone when the project works with aware datetimes;
    # mixing naive and aware values would make the timeline sort raise.
    return timezone.make_aware(moment) if settings.USE_TZ else moment


@_require_capability('access_requests_dashboard')
def request_timeline_page(request, kind: str, request_id: int):
    """Show a chronological timeline for one onboarding/offboarding request.

    The timeline combines the creation of the request, planned milestones
    (contract start, hardware handover), the generated PDF, related audit log
    entries and, for onboarding requests, the introduction session and the
    scheduled welcome e-mail. An unknown ``kind`` redirects to the dashboard
    with an error message.
    """
    if kind == 'onboarding':
        obj = get_object_or_404(OnboardingRequest, id=request_id)
    elif kind == 'offboarding':
        obj = get_object_or_404(OffboardingRequest, id=request_id)
    else:
        messages.error(request, f'Unbekannter Typ: {kind}')
        return redirect('requests_dashboard')

    request_label = _request_target_label(obj, kind)

    # Match audit rows by id, and by name only when a name exists: an empty
    # string with ``icontains`` matches every row and would pull unrelated
    # audit entries into this timeline.
    audit_match = Q(target_id=request_id)
    full_name = (obj.full_name or '').strip()
    if full_name:
        audit_match |= Q(target_label__icontains=full_name)
    audit_rows = list(
        AdminAuditLog.objects.select_related('actor')
        .filter(target_type__in=[kind, 'request'])
        .filter(audit_match)
        .order_by('-created_at', '-id')[:200]
    )

    timeline_rows = [
        {
            'created_at': obj.created_at,
            'kind': 'system',
            'title': _('Anfrage erstellt'),
            'summary': request_label,
            'meta': _('Status: %(status)s') % {'status': obj.get_processing_status_display()},
        }
    ]

    # Milestone dates are optional (read via getattr with a None default).
    contract_start = getattr(obj, 'contract_start', None)
    if contract_start:
        timeline_rows.append(
            {
                'created_at': _timeline_midnight(contract_start),
                'kind': 'milestone',
                'title': _('Vertragsbeginn'),
                'summary': str(contract_start),
                'meta': _('Geplanter Start'),
            }
        )

    handover_date = getattr(obj, 'handover_date', None)
    if handover_date:
        timeline_rows.append(
            {
                'created_at': _timeline_midnight(handover_date),
                'kind': 'milestone',
                'title': _('Geräteübergabe / Hardware-Abholung'),
                'summary': str(handover_date),
                'meta': _('Geplanter Hardware-Termin'),
            }
        )

    if getattr(obj, 'generated_pdf_path', ''):
        pdf_name = Path(obj.generated_pdf_path).name
        timeline_rows.append(
            {
                # No separate timestamp is read for the PDF; it is listed at
                # the request's creation time.
                'created_at': obj.created_at,
                'kind': 'document',
                'title': _('PDF verfügbar'),
                'summary': pdf_name,
                'meta': '',
                'url': f"/media/pdfs/{pdf_name}",
            }
        )

    for row in audit_rows:
        timeline_rows.append(
            {
                'created_at': row.created_at,
                'kind': 'audit',
                'title': _audit_action_label(row.action),
                'summary': row.target_label or row.target_type or '-',
                'meta': row.actor_display or '-',
                'details': row.details,
            }
        )

    if kind == 'onboarding':
        intro_session = OnboardingIntroductionSession.objects.filter(onboarding_request=obj).first()
        if intro_session:
            timeline_rows.append(
                {
                    'created_at': intro_session.updated_at,
                    'kind': 'session',
                    'title': _('Einweisungssitzung'),
                    'summary': intro_session.get_status_display(),
                    'meta': intro_session.completed_by_name or '-',
                    'url': (f"/media/pdfs/{Path(intro_session.exported_pdf_path).name}" if intro_session.exported_pdf_path else ''),
                }
            )
        welcome_email = ScheduledWelcomeEmail.objects.filter(onboarding_request=obj).first()
        if welcome_email:
            timeline_rows.append(
                {
                    'created_at': welcome_email.updated_at,
                    'kind': 'email',
                    'title': _('Welcome E-Mail'),
                    'summary': welcome_email.get_status_display(),
                    'meta': welcome_email.recipient_email,
                }
            )

    timeline_rows.sort(key=lambda item: item['created_at'])

    return render(
        request,
        'workflows/request_timeline.html',
        {
            'request_kind': kind,
            'request_obj': obj,
            'request_label': request_label,
            'timeline_rows': timeline_rows,
            'contract_start': contract_start,
            'handover_date': handover_date,
        },
    )
|
|
|
|
|
|
def _dashboard_filter_date(raw_value):
    """Parse a dashboard date filter taken from the query string.

    Returns a ``datetime.date`` or ``None`` when ``raw_value`` is empty or is
    not a valid date. Parsing here keeps malformed input away from the ORM,
    where it would raise and turn into a server error.
    """
    # Local import: the module header does not import the date parser.
    from django.utils.dateparse import parse_date

    if not raw_value:
        return None
    try:
        return parse_date(raw_value)
    except ValueError:
        # Well-formed but impossible date, e.g. 2024-02-31.
        return None


def _dashboard_pdf_url(stored_path):
    """Return the ``/media/pdfs/`` URL for a stored PDF path, or ``None`` if unset."""
    if not stored_path:
        return None
    return f"/media/pdfs/{Path(stored_path).name}"


def _dashboard_row(obj, kind_label, kind_slug, language_code, intro_session=None, intro_pdf_url=None):
    """Build the template row dict for one onboarding/offboarding request."""
    return {
        'id': obj.id,
        'kind': kind_label,
        'kind_slug': kind_slug,
        'name': obj.full_name,
        'work_email': obj.work_email,
        'created_at': obj.created_at,
        'pdf_url': _dashboard_pdf_url(obj.generated_pdf_path),
        'intro_pdf_url': intro_pdf_url,
        'intro_session': intro_session,
        'status': _request_status_label(obj.processing_status, language_code),
        'status_key': obj.processing_status,
        'last_error': obj.last_error,
    }


def _dashboard_daily_counts(queryset, days):
    """Count the rows of ``queryset`` per local creation day for each day in ``days``.

    ``days`` must be ordered ascending; its first entry is the lower bound of
    the database filter.
    """
    counts = dict.fromkeys(days, 0)
    recent = queryset.filter(created_at__date__gte=days[0]).values_list('created_at', flat=True)
    for created_at in recent:
        day = timezone.localtime(created_at).date()
        # Skip timestamps that fall outside the window (e.g. a record dated in
        # the future) instead of failing with a KeyError.
        if day in counts:
            counts[day] += 1
    return counts


def _delete_dashboard_selection(request):
    """Handle the dashboard POST: delete the selected requests, then redirect back.

    Selection tokens have the form ``<kind>:<id>`` with kind ``onboarding`` or
    ``offboarding``. Unparseable tokens are counted as invalid; ids that no
    longer exist are silently skipped.
    """
    if not user_has_capability(request.user, 'delete_requests'):
        messages.error(request, _('Sie haben keine Berechtigung für diese Aktion.'))
        return redirect('requests_dashboard')

    selected = request.POST.getlist('selected_requests')
    single_delete = (request.POST.get('single_delete') or '').strip()
    if single_delete:
        # A per-row delete button overrides any checkbox selection.
        selected = [single_delete]

    if not selected:
        messages.warning(request, _('Keine Einträge ausgewählt.'))
        return redirect('requests_dashboard')

    models_by_kind = {'onboarding': OnboardingRequest, 'offboarding': OffboardingRequest}
    deleted_count = 0
    invalid_count = 0
    deleted_labels = []
    for token in selected:
        try:
            kind, raw_id = token.split(':', 1)
            request_id = int(raw_id)
        except (ValueError, TypeError):
            invalid_count += 1
            continue

        model = models_by_kind.get(kind)
        if model is None:
            invalid_count += 1
            continue

        obj = model.objects.filter(id=request_id).first()
        if not obj:
            continue
        # Capture the label before the row disappears.
        deleted_labels.append(_request_target_label(obj, kind))
        obj.delete()
        deleted_count += 1

    if deleted_count:
        _audit(
            request,
            'requests_deleted',
            target_type='request',
            target_label='Dashboard bulk/single delete',
            details={
                'deleted_count': deleted_count,
                'invalid_count': invalid_count,
                'selected': selected,
                'request_labels': deleted_labels,
            },
        )
        messages.success(request, _('%(count)s Eintrag/Einträge gelöscht.') % {'count': deleted_count})
    if invalid_count:
        messages.warning(request, _('%(count)s Auswahl(en) konnten nicht verarbeitet werden.') % {'count': invalid_count})
    if not deleted_count and not invalid_count:
        messages.info(request, _('Keine passenden Einträge gefunden.'))
    return redirect('requests_dashboard')


@login_required
def requests_dashboard(request):
    """Render the combined onboarding/offboarding dashboard.

    GET: lists the newest requests (filterable by search text, type, status,
    department and creation date range), plus a 14-day creation chart and
    totals. POST: deletes the selected requests (requires ``delete_requests``).

    Users without ``access_requests_dashboard`` are redirected to ``home``.
    Invalid ``date_from``/``date_to`` values are ignored and blanked in the
    rendered filter form.
    """
    if not user_has_capability(request.user, 'access_requests_dashboard'):
        messages.error(request, _('Sie haben keine Berechtigung für diese Aktion.'))
        return redirect('home')

    if request.method == 'POST':
        return _delete_dashboard_selection(request)

    # The merged table shows this many rows; each source must contribute up to
    # the same number, otherwise the merged "newest N" can miss entries when
    # one request type dominates.
    row_limit = 60
    status_keys = ('submitted', 'processing', 'completed', 'failed')

    search_query = request.GET.get('q', '').strip()
    type_filter = (request.GET.get('type') or '').strip().lower()
    status_filter = (request.GET.get('status') or '').strip().lower()
    department_filter = (request.GET.get('department') or '').strip()
    date_from = (request.GET.get('date_from') or '').strip()
    date_to = (request.GET.get('date_to') or '').strip()

    date_from_value = _dashboard_filter_date(date_from)
    date_to_value = _dashboard_filter_date(date_to)
    if date_from_value is None:
        date_from = ''
    if date_to_value is None:
        date_to = ''

    # Both models share the filtered field names, so one list of conditions is
    # applied to both querysets.
    conditions = []
    if search_query:
        conditions.append(Q(full_name__icontains=search_query) | Q(work_email__icontains=search_query))
    if status_filter in status_keys:
        conditions.append(Q(processing_status=status_filter))
    if department_filter:
        conditions.append(Q(department=department_filter))
    if date_from_value is not None:
        conditions.append(Q(created_at__date__gte=date_from_value))
    if date_to_value is not None:
        conditions.append(Q(created_at__date__lte=date_to_value))

    onboarding_qs = OnboardingRequest.objects.order_by('-created_at')
    offboarding_qs = OffboardingRequest.objects.order_by('-created_at')
    for condition in conditions:
        onboarding_qs = onboarding_qs.filter(condition)
        offboarding_qs = offboarding_qs.filter(condition)

    if type_filter == 'onboarding':
        offboarding_qs = offboarding_qs.none()
    elif type_filter == 'offboarding':
        onboarding_qs = onboarding_qs.none()

    language_code = (
        request.COOKIES.get(settings.LANGUAGE_COOKIE_NAME)
        or getattr(request, 'LANGUAGE_CODE', '')
        or get_language()
        or 'de'
    ).split('-')[0].lower()

    rows = []
    for obj in onboarding_qs[:row_limit]:
        intro_session = OnboardingIntroductionSession.objects.filter(onboarding_request=obj).first()
        if intro_session and intro_session.exported_pdf_path:
            # Transient attribute consumed by the template; not persisted.
            intro_session.exported_pdf_url = _dashboard_pdf_url(intro_session.exported_pdf_path)
        rows.append(
            _dashboard_row(
                obj,
                'Onboarding',
                'onboarding',
                language_code,
                intro_session=intro_session,
                intro_pdf_url=_dashboard_pdf_url(obj.intro_pdf_path),
            )
        )
    for obj in offboarding_qs[:row_limit]:
        rows.append(_dashboard_row(obj, 'Offboarding', 'offboarding', language_code))
    rows.sort(key=lambda row: row['created_at'], reverse=True)

    # 14-day window ending today, oldest day first.
    today = timezone.localdate()
    days = [today - timedelta(days=13 - offset) for offset in range(14)]
    onboarding_daily = _dashboard_daily_counts(onboarding_qs, days)
    offboarding_daily = _dashboard_daily_counts(offboarding_qs, days)

    chart_points = [
        {
            'label': day.strftime('%d.%m'),
            'onboarding': onboarding_daily[day],
            'offboarding': offboarding_daily[day],
            'total': onboarding_daily[day] + offboarding_daily[day],
        }
        for day in days
    ]
    # Floor of 1 avoids division by zero when the window is empty.
    max_total = max(1, *(point['total'] for point in chart_points))
    for point in chart_points:
        # Bars scale to 84 units with a minimum visible height of 8.
        point['height'] = max(8, int((point['total'] / max_total) * 84))

    onboarding_total = onboarding_qs.count()
    offboarding_total = offboarding_qs.count()

    # The department dropdown is built from all requests, not the filtered ones.
    department_values = list(OnboardingRequest.objects.exclude(department='').values_list('department', flat=True))
    department_values += list(OffboardingRequest.objects.exclude(department='').values_list('department', flat=True))
    departments = sorted(
        {value.strip() for value in department_values if value and value.strip()},
        key=str.lower,
    )

    status_choices = [
        {'value': key, 'label': _request_status_label(key, language_code)}
        for key in status_keys
    ]
    has_filters = any([search_query, type_filter, status_filter, department_filter, date_from, date_to])

    # Table width (used by the template for full-width cells): four fixed
    # columns plus one per capability-dependent column.
    column_count = 4
    if user_has_capability(request.user, 'delete_requests'):
        column_count += 1
    if user_has_capability(request.user, 'run_intro_session') or user_has_capability(request.user, 'generate_intro_pdfs'):
        column_count += 1
    if user_has_capability(request.user, 'access_requests_dashboard'):
        column_count += 1

    return render(
        request,
        'workflows/requests_dashboard.html',
        {
            'rows': rows[:row_limit],
            'search_query': search_query,
            'selected_type': type_filter,
            'selected_status': status_filter,
            'selected_department': department_filter,
            'date_from': date_from,
            'date_to': date_to,
            'departments': departments,
            'status_choices': status_choices,
            'has_filters': has_filters,
            'column_count': column_count,
            'onboarding_total': onboarding_total,
            'offboarding_total': offboarding_total,
            'combined_total': onboarding_total + offboarding_total,
            'chart_points': chart_points,
        },
    )
|
|
|
|
|
|
@login_required
@ensure_csrf_cookie
def onboarding_create(request):
    """Show the onboarding request form and, on a valid POST, store and queue the request.

    A valid submission is saved, stamped with the submitting user's display
    name and the active language, handed to ``process_onboarding_request`` and
    answered with a redirect back to the form carrying ``saved=1`` and the new
    id. Otherwise the (possibly bound, invalid) form is rendered.
    """
    workflow_config = WorkflowConfig.objects.order_by('id').first()
    if workflow_config and workflow_config.legal_text:
        legal_text = workflow_config.legal_text
    else:
        # Built-in fallback when no legal text has been configured.
        legal_text = 'Eine Ausrüstungsvereinbarung erlaubt es einem Mitarbeitenden, die Ausrüstung des Unternehmens im Außendienst oder zu Hause zu nutzen und mitzunehmen.'

    if request.method != 'POST':
        form = OnboardingRequestForm(requester_email=request.user.email)
    else:
        form = OnboardingRequestForm(request.POST, request.FILES, requester_email=request.user.email)
        if form.is_valid():
            created = form.save()
            created.onboarded_by_name = _display_user_name(request.user)
            active_language = getattr(request, 'LANGUAGE_CODE', '') or get_language() or 'de'
            # Store only the primary language subtag ("de-at" -> "de").
            created.preferred_language = active_language.split('-')[0]
            created.save(update_fields=['onboarded_by_name', 'preferred_language'])
            process_onboarding_request.delay(created.id)
            return redirect(f"/onboarding/new/?saved=1&id={created.id}")

    onboarding_blocks = _build_onboarding_layout(form)
    onboarding_sections = _build_onboarding_sections(
        onboarding_blocks,
        getattr(form, '_field_page_keys', {}),
    )

    context = {
        'form': form,
        'onboarding_blocks': onboarding_blocks,
        'onboarding_sections': onboarding_sections,
        'onboarding_inline_checks': ONBOARDING_INLINE_CHECKS,
        'onboarding_checkbox_lists': ONBOARDING_CHECKBOX_LISTS,
        'legal_text': legal_text,
        'saved': request.GET.get('saved') == '1',
        'saved_request_id': request.GET.get('id', ''),
    }
    return render(request, 'workflows/onboarding_form.html', context)
|
|
|
|
|
|
@login_required
def onboarding_success(request, request_id: int):
    """Render the success page for one onboarding request (404 if it does not exist)."""
    obj = get_object_or_404(OnboardingRequest, id=request_id)
    context = {
        'obj': obj,
        # Link is only offered once a PDF path has been recorded.
        'pdf_url': f"/media/pdfs/{Path(obj.generated_pdf_path).name}" if obj.generated_pdf_path else None,
    }
    return render(request, 'workflows/onboarding_success.html', context)
|
|
|
|
|
|
@_require_capability('generate_intro_pdfs')
@require_POST
def generate_onboarding_intro_pdf(request, request_id: int):
    """Generate the introduction/handover PDF for an onboarding request.

    Stores the generated path on the request, writes an audit entry and
    redirects to the requests dashboard. Responds 404 for an unknown id.
    """
    onboarding = get_object_or_404(OnboardingRequest, id=request_id)
    generated = _generate_onboarding_intro_pdf(onboarding, language_code=get_language())
    onboarding.intro_pdf_path = str(generated)
    onboarding.save(update_fields=['intro_pdf_path'])
    _audit(
        request,
        'intro_pdf_generated',
        target_type='onboarding',
        target_id=onboarding.id,
        target_label=onboarding.full_name,
    )
    messages.success(request, _('Einweisungs- und Übergabeprotokoll wurde erzeugt.'))
    return redirect('requests_dashboard')
|
|
|
|
|
|
@_require_capability('generate_intro_pdfs')
@require_POST
def generate_onboarding_intro_session_pdf(request, request_id: int):
    """Export the live introduction session of an onboarding request as a PDF.

    The session is created on demand, the exported path is stored on it, an
    audit entry is written and the user is sent back to the session page.
    """
    onboarding = get_object_or_404(OnboardingRequest, id=request_id)
    session, _created = OnboardingIntroductionSession.objects.get_or_create(onboarding_request=onboarding)
    exported = _generate_onboarding_intro_session_pdf(
        session,
        admin_signature_name=_display_user_name(request.user),
        language_code=get_language(),
    )
    session.exported_pdf_path = str(exported)
    session.save(update_fields=['exported_pdf_path'])
    _audit(
        request,
        'intro_live_pdf_generated',
        target_type='onboarding',
        target_id=onboarding.id,
        target_label=onboarding.full_name,
    )
    messages.success(request, _('Einweisungsprotokoll aus Live-Status wurde erzeugt.'))
    return redirect('onboarding_intro_session_page', request_id=request_id)
|
|
|
|
|
|
@_require_capability('run_intro_session')
def onboarding_intro_session_page(request, request_id: int):
    """Display and update the live introduction checklist of an onboarding request.

    POST ``session_action`` values:
      * ``reset``    - clear checklist, notes, completion data and exported PDF;
      * ``complete`` - store the checklist and mark the session completed;
      * anything else (default ``save``) - store the checklist as a draft.
    Every POST ends with a redirect back to this page. GET renders the
    checklist with per-item checked flags and a progress percentage.
    """
    onboarding = get_object_or_404(OnboardingRequest, id=request_id)
    session, _created = OnboardingIntroductionSession.objects.get_or_create(onboarding_request=onboarding)
    sections = build_intro_sections_for_request(onboarding, language_code=get_language())

    if request.method == 'POST':
        submitted_ids = set(request.POST.getlist('checked_items'))
        # Record every currently applicable item, ticked or not.
        checklist_state = {
            entry['id']: entry['id'] in submitted_ids
            for group in sections
            for entry in group['items']
        }
        action = (request.POST.get('session_action') or 'save').strip()

        if action == 'reset':
            session.checklist_state = {}
            session.notes = ''
            session.status = 'draft'
            session.completed_at = None
            session.completed_by_name = ''
            session.exported_pdf_path = ''
            session.save(update_fields=['checklist_state', 'notes', 'status', 'completed_at', 'completed_by_name', 'exported_pdf_path'])
            _audit(request, 'intro_session_reset', target_type='onboarding', target_id=onboarding.id, target_label=onboarding.full_name)
            messages.success(request, _('Einweisung wurde zurückgesetzt.'))
            return redirect('onboarding_intro_session_page', request_id=request_id)

        session.checklist_state = checklist_state
        session.notes = (request.POST.get('notes') or '').strip()
        ticked = sum(1 for flag in checklist_state.values() if flag)

        if action == 'complete':
            session.status = 'completed'
            session.completed_at = timezone.now()
            session.completed_by_name = _display_user_name(request.user)
            audit_action = 'intro_session_completed'
            notice = _('Einweisung wurde als abgeschlossen gespeichert.')
        else:
            # Saving as draft also clears any earlier completion stamp.
            session.status = 'draft'
            session.completed_at = None
            session.completed_by_name = ''
            audit_action = 'intro_session_saved'
            notice = _('Einweisung wurde als Entwurf gespeichert.')

        _audit(
            request,
            audit_action,
            target_type='onboarding',
            target_id=onboarding.id,
            target_label=onboarding.full_name,
            details={'checked_count': ticked},
        )
        messages.success(request, notice)
        session.save()
        return redirect('onboarding_intro_session_page', request_id=request_id)

    stored_state = session.checklist_state or {}
    all_items = [entry for group in sections for entry in group['items']]
    for entry in all_items:
        # The template reads the flag straight from the item dict.
        entry['checked'] = bool(stored_state.get(entry['id']))
    total_count = len(all_items)
    checked_count = sum(1 for entry in all_items if entry['checked'])

    salutation = (onboarding.get_gender_display() or '').strip()
    if salutation:
        display_name = f"{salutation} {onboarding.full_name}".strip()
    else:
        display_name = onboarding.full_name

    if total_count:
        progress_percent = int((checked_count / total_count) * 100)
    else:
        progress_percent = 0

    if session.exported_pdf_path:
        session_pdf_url = f"/media/pdfs/{Path(session.exported_pdf_path).name}"
    else:
        session_pdf_url = None

    return render(
        request,
        'workflows/onboarding_intro_session.html',
        {
            'onboarding': onboarding,
            'session': session,
            'display_name': display_name,
            'sections': sections,
            'checked_count': checked_count,
            'total_count': total_count,
            'progress_percent': progress_percent,
            'session_pdf_url': session_pdf_url,
        },
    )
|
|
|
|
|
|
@login_required
@ensure_csrf_cookie
def offboarding_create(request):
    """Show the offboarding form with employee search and queue valid submissions.

    Query parameters: ``profile`` preselects an ``EmployeeProfile`` used to
    prefill the form, ``q`` searches profiles by name or work email (at most
    ten unique hits, name matches first). A malformed ``profile`` value is
    treated like a missing one instead of causing a server error.

    A valid POST stores the request, links the selected profile, records who
    requested it and in which language, hands it to
    ``process_offboarding_request`` and redirects back with ``saved=1``.
    """
    # Local import: the module header does not import this exception.
    from django.core.exceptions import ValidationError

    profile_id = request.GET.get('profile')
    search_query = request.GET.get('q', '').strip()

    selected_profile = None
    if profile_id:
        try:
            selected_profile = EmployeeProfile.objects.filter(id=profile_id).first()
        except (TypeError, ValueError, ValidationError):
            # The id comes straight from the URL; the ORM raises while
            # converting values that do not fit the primary key type.
            selected_profile = None

    search_results = []
    if search_query:
        candidates = list(
            EmployeeProfile.objects.filter(full_name__icontains=search_query)[:10]
        ) + list(
            EmployeeProfile.objects.filter(work_email__icontains=search_query)[:10]
        )
        # Dicts keep insertion order, so this de-duplicates by id while
        # preserving the first occurrence of each profile.
        unique_by_id = {}
        for profile in candidates:
            unique_by_id.setdefault(profile.id, profile)
        search_results = list(unique_by_id.values())[:10]

    if request.method == 'POST':
        form = OffboardingRequestForm(request.POST, prefill_profile=selected_profile)
        if form.is_valid():
            obj = form.save(commit=False)
            if selected_profile:
                obj.employee_profile = selected_profile
            requester_email = (request.user.email or '').strip().lower()
            # Only company addresses are recorded as requester; everyone else
            # falls back to the system sender address.
            if requester_email and requester_email.endswith('@tub.co'):
                obj.requested_by_email = requester_email
            else:
                obj.requested_by_email = settings.DEFAULT_FROM_EMAIL
            obj.requested_by_name = _display_user_name(request.user)
            # Store only the primary language subtag ("de-at" -> "de").
            obj.preferred_language = ((getattr(request, 'LANGUAGE_CODE', '') or get_language() or 'de').split('-')[0])
            obj.save()
            process_offboarding_request.delay(obj.id)
            return redirect(f"/offboarding/new/?saved=1&id={obj.id}")
    else:
        form = OffboardingRequestForm(prefill_profile=selected_profile, initial={'search_query': search_query})

    return render(
        request,
        'workflows/offboarding_form.html',
        {
            'form': form,
            'search_results': search_results,
            'selected_profile': selected_profile,
            'search_query': search_query,
            'saved': request.GET.get('saved') == '1',
            'saved_request_id': request.GET.get('id', ''),
        },
    )
|
|
|
|
|
|
@login_required
def offboarding_success(request, request_id: int):
    """Render the success page for one offboarding request (404 if it does not exist)."""
    obj = get_object_or_404(OffboardingRequest, id=request_id)
    context = {
        'obj': obj,
        # Link is only offered once a PDF path has been recorded.
        'pdf_url': f"/media/pdfs/{Path(obj.generated_pdf_path).name}" if obj.generated_pdf_path else None,
    }
    return render(request, 'workflows/offboarding_success.html', context)
|
|
|
|
|
|
def _form_builder_find(queryset, raw_id):
    """Return the first row of ``queryset`` with id ``raw_id``, or ``None``.

    Ids arrive as raw POST strings; values the ORM cannot convert to the
    primary key type are treated as "not found" instead of raising.
    """
    # Local import: the module header does not import this exception.
    from django.core.exceptions import ValidationError

    try:
        return queryset.filter(id=raw_id).first()
    except (TypeError, ValueError, ValidationError):
        return None


@_require_capability('manage_builders')
def form_builder_page(request):
    """Admin page for form option catalogs and per-field label/help overrides.

    Query parameters ``form_type`` and ``option_category`` select what is
    shown; unknown values fall back to ``onboarding`` and the first category.

    POST actions (all end in a redirect back to this page):
      * ``delete_option_id``                - delete one option;
      * ``builder_action=add_option``       - append a new option to a category;
      * ``builder_action=save_options``     - save labels/values/order of options;
      * ``builder_action=save_field_texts`` - save label/help text overrides.
    Duplicate options are reported as a form error for both adding and saving.
    """
    # Local import: the module header imports only ``connection`` and
    # ``IntegrityError`` from django.db.
    from django.db import transaction

    language_code = get_language()
    form_type = request.GET.get('form_type', 'onboarding')
    if form_type not in DEFAULT_FIELD_ORDER:
        form_type = 'onboarding'
    option_category = request.GET.get('option_category', 'department')
    option_categories = [choice[0] for choice in FormOption.CATEGORY_CHOICES]
    if option_category not in option_categories:
        option_category = option_categories[0]

    def back_to_builder(category):
        # ``form_type`` is whitelisted above; ``category`` is always either a
        # whitelisted value or one read from a stored option.
        return redirect(f"/admin-tools/form-builder/?form_type={form_type}&option_category={category}")

    if request.method == 'POST':
        delete_option_id = request.POST.get('delete_option_id', '').strip()
        if delete_option_id:
            option = _form_builder_find(FormOption.objects, delete_option_id)
            if not option:
                messages.error(request, 'Option nicht gefunden.')
            else:
                # Remember identifying data before the row is gone.
                option_category = option.category
                deleted_label = option.label
                deleted_id = option.id
                option.delete()
                _audit(request, 'form_option_deleted', target_type='form_option', target_id=deleted_id, target_label=deleted_label)
                messages.success(request, 'Option wurde gelöscht.')
            return back_to_builder(option_category)

        action = request.POST.get('builder_action', '')
        if action == 'add_option':
            category = request.POST.get('category', '').strip()
            label = request.POST.get('label', '').strip()
            label_en = request.POST.get('label_en', '').strip()
            value = request.POST.get('value', '').strip()
            if category not in option_categories:
                messages.error(request, 'Ungültige Kategorie.')
            else:
                # Return to the category that was posted, whatever happens below.
                option_category = category
                if not label:
                    messages.error(request, 'Bitte einen Namen für die Option angeben.')
                else:
                    next_sort = (
                        FormOption.objects.filter(category=category).order_by('-sort_order').values_list('sort_order', flat=True).first()
                    )
                    try:
                        # Savepoint so a constraint violation does not poison
                        # an enclosing transaction.
                        with transaction.atomic():
                            FormOption.objects.create(
                                # Global form option catalog entry
                                category=category,
                                label=label,
                                label_en=label_en,
                                value=value or label,
                                sort_order=(next_sort + 1) if next_sort is not None else 0,
                                is_active=True,
                            )
                    except IntegrityError:
                        # Same handling as for duplicates in ``save_options``.
                        messages.error(request, f'Doppelte Bezeichnung in Kategorie: {label}')
                    else:
                        _audit(
                            request,
                            'form_option_added',
                            target_type='form_option',
                            target_label=label,
                            details={'category': category, 'label_en': label_en, 'value': value or label},
                        )
                        messages.success(request, 'Option wurde hinzugefügt.')

        elif action == 'save_options':
            option_ids = request.POST.getlist('option_ids')
            # The submitted order of ``option_ids`` becomes the sort order.
            for pos, raw_id in enumerate(option_ids):
                option = _form_builder_find(FormOption.objects, raw_id)
                if not option:
                    continue
                next_label = request.POST.get(f'label_{option.id}', '').strip() or option.label
                option.label = next_label
                option.label_en = request.POST.get(f'label_en_{option.id}', '').strip()
                option.value = request.POST.get(f'value_{option.id}', '').strip() or next_label
                option.is_active = request.POST.get(f'active_{option.id}') == 'on'
                option.sort_order = pos
                try:
                    with transaction.atomic():
                        option.save(update_fields=['label', 'label_en', 'value', 'is_active', 'sort_order'])
                except IntegrityError:
                    # Options saved earlier in this loop stay saved.
                    messages.error(request, f'Doppelte Bezeichnung in Kategorie: {next_label}')
                    return back_to_builder(option.category)
                option_category = option.category
            _audit(request, 'form_options_saved', target_type='form_option', target_label=option_category, details={'count': len(option_ids)})
            messages.success(request, 'Optionen wurden gespeichert.')

        elif action == 'save_field_texts':
            field_ids = request.POST.getlist('field_ids')
            for raw_id in field_ids:
                cfg = _form_builder_find(FormFieldConfig.objects.filter(form_type=form_type), raw_id)
                if not cfg:
                    continue
                cfg.label_override = (request.POST.get(f'label_override_{cfg.id}') or '').strip()
                cfg.label_override_en = (request.POST.get(f'label_override_en_{cfg.id}') or '').strip()
                cfg.help_text_override = (request.POST.get(f'help_text_override_{cfg.id}') or '').strip()
                cfg.help_text_override_en = (request.POST.get(f'help_text_override_en_{cfg.id}') or '').strip()
                cfg.save(update_fields=['label_override', 'label_override_en', 'help_text_override', 'help_text_override_en'])
            _audit(request, 'form_field_texts_saved', target_type='form_config', target_label=form_type, details={'count': len(field_ids)})
            messages.success(request, 'Feldtexte wurden gespeichert.')

        return back_to_builder(option_category)

    # Configured default order first, then any form fields not listed there.
    default_names = list(DEFAULT_FIELD_ORDER.get(form_type, []))
    form_class = OnboardingRequestForm if form_type == 'onboarding' else OffboardingRequestForm
    for name in form_class.base_fields:
        if name not in default_names:
            default_names.append(name)

    ensure_form_field_configs(form_type, default_names)

    configs = list(
        FormFieldConfig.objects.filter(form_type=form_type).order_by('sort_order', 'field_name')
    )
    labels = _form_field_labels(form_type)
    locked = LOCKED_FIELD_RULES.get(form_type, set())

    def describe(cfg):
        """Build the template dict for one field configuration."""
        base_label = labels.get(cfg.field_name, cfg.field_name)
        return {
            'field_name': cfg.field_name,
            'label': cfg.translated_label_override(language_code) or base_label,
            'label_de': cfg.label_override or base_label,
            'label_en': cfg.label_override_en,
            'is_visible': cfg.is_visible,
            'is_required': cfg.is_required,
            'locked': cfg.field_name in locked,
        }

    if form_type == 'onboarding':
        # One column per onboarding page, in page order.
        columns = [
            {
                'key': key,
                'title': ONBOARDING_PAGE_LABELS.get(key, key),
                'items': [],
            }
            for key in ONBOARDING_PAGE_ORDER
        ]
        column_by_key = {column['key']: column for column in columns}
        fallback = 'abschluss'
        for cfg in configs:
            page_key = cfg.page_key or ONBOARDING_DEFAULT_PAGE.get(cfg.field_name, fallback)
            if page_key not in column_by_key:
                page_key = fallback
            column_by_key[page_key]['items'].append(describe(cfg))
    else:
        columns = [
            {
                'key': 'all',
                'title': 'Offboarding Felder',
                'items': [describe(cfg) for cfg in configs],
            }
        ]

    return render(
        request,
        'workflows/form_builder.html',
        {
            'form_type': form_type,
            'columns': columns,
            'form_types': [('onboarding', 'Onboarding'), ('offboarding', 'Offboarding')],
            'option_categories': _translate_choice_list(FormOption.CATEGORY_CHOICES),
            'selected_option_category': option_category,
            'option_items': FormOption.objects.filter(category=option_category).order_by('sort_order', 'label'),
            'field_text_items': configs,
        },
    )
|
|
|
|
|
|
def _intro_builder_find(raw_id):
    """Return the ``IntroChecklistItem`` with id ``raw_id``, or ``None``.

    Ids arrive as raw POST strings; values the ORM cannot convert to the
    primary key type are treated as "not found" instead of raising.
    """
    # Local import: the module header does not import this exception.
    from django.core.exceptions import ValidationError

    try:
        return IntroChecklistItem.objects.filter(id=raw_id).first()
    except (TypeError, ValueError, ValidationError):
        return None


@_require_capability('manage_builders')
def intro_builder_page(request):
    """Admin page for maintaining the introduction checklist items.

    POST actions (each redirects back to this page):
      * ``delete_item_id``             - delete one checklist item;
      * ``builder_action=add_item``    - append an item to a section;
      * ``builder_action=save_items``  - save section, labels, activity,
        display condition and order of the submitted items.
    A POST without a recognised action falls through and renders the page.
    """
    if request.method == 'POST':
        valid_sections = {key for key, _label in IntroChecklistItem.SECTION_CHOICES}

        delete_id = (request.POST.get('delete_item_id') or '').strip()
        if delete_id:
            item = _intro_builder_find(delete_id)
            if item:
                # Remember identifying data before the row is gone.
                deleted_label = item.label
                deleted_id_int = item.id
                item.delete()
                _audit(request, 'intro_checklist_item_deleted', target_type='intro_checklist_item', target_id=deleted_id_int, target_label=deleted_label)
                messages.success(request, 'Checklistenpunkt wurde gelöscht.')
            else:
                messages.error(request, 'Checklistenpunkt nicht gefunden.')
            return redirect('intro_builder_page')

        action = (request.POST.get('builder_action') or '').strip()
        if action == 'add_item':
            section = (request.POST.get('section') or '').strip()
            label = (request.POST.get('label') or '').strip()
            label_en = (request.POST.get('label_en') or '').strip()
            if section not in valid_sections:
                messages.error(request, 'Ungültiger Abschnitt.')
                return redirect('intro_builder_page')
            if not label:
                messages.error(request, 'Bitte eine Bezeichnung für den Checklistenpunkt angeben.')
                return redirect('intro_builder_page')
            # New items go to the end of their section.
            next_sort = (
                IntroChecklistItem.objects.filter(section=section).order_by('-sort_order').values_list('sort_order', flat=True).first()
            )
            IntroChecklistItem.objects.create(
                section=section,
                label=label,
                label_en=label_en,
                sort_order=(next_sort + 1) if next_sort is not None else 0,
                is_active=True,
                condition_operator='always',
            )
            _audit(request, 'intro_checklist_item_added', target_type='intro_checklist_item', target_label=label, details={'section': section, 'label_en': label_en})
            messages.success(request, 'Checklistenpunkt wurde hinzugefügt.')
            return redirect('intro_builder_page')

        if action == 'save_items':
            item_ids = request.POST.getlist('item_ids')
            valid_ops = {key for key, _label in IntroChecklistItem.OPERATOR_CHOICES}
            # The submitted order of ``item_ids`` becomes the sort order.
            for pos, raw_id in enumerate(item_ids):
                item = _intro_builder_find(raw_id)
                if not item:
                    continue
                section = (request.POST.get(f'section_{item.id}') or item.section).strip()
                if section not in valid_sections:
                    # Unknown section: keep the stored one.
                    section = item.section
                operator = (request.POST.get(f'operator_{item.id}') or item.condition_operator).strip()
                if operator not in valid_ops:
                    # Unknown operator: fall back to unconditional display.
                    operator = 'always'
                item.section = section
                # An empty or whitespace-only label keeps the stored label.
                item.label = (request.POST.get(f'label_{item.id}') or item.label).strip() or item.label
                item.label_en = (request.POST.get(f'label_en_{item.id}') or '').strip()
                item.is_active = request.POST.get(f'active_{item.id}') == 'on'
                item.condition_field = (request.POST.get(f'field_{item.id}') or '').strip()
                item.condition_operator = operator
                item.condition_value = (request.POST.get(f'value_{item.id}') or '').strip()
                item.sort_order = pos
                item.save(
                    update_fields=[
                        'section',
                        'label',
                        'label_en',
                        'is_active',
                        'condition_field',
                        'condition_operator',
                        'condition_value',
                        'sort_order',
                    ]
                )
            _audit(request, 'intro_checklist_saved', target_type='intro_checklist_item', details={'count': len(item_ids)})
            messages.success(request, 'Einweisungs-Checkliste wurde gespeichert.')
            return redirect('intro_builder_page')

    # Onboarding request fields an item's display condition may refer to.
    condition_field_choices = [
        ('', 'Keine Bedingung'),
        ('needed_devices', 'Benötigte Geräte und Gegenstände'),
        ('needed_software', 'Benötigte Software'),
        ('needed_accesses', 'Benötigte Zugänge'),
        ('needed_workspace_groups', 'Benötigte Gruppen im Workspace'),
        ('needed_resources', 'Benötigte Ressourcen'),
        ('additional_hardware', 'Zusätzliche Hardware'),
        ('additional_software', 'Zusätzliche Software'),
        ('additional_access_text', 'Weitere Zugänge (Freitext)'),
        ('group_mailboxes_required', 'Gruppenpostfächer erforderlich'),
        ('order_business_cards', 'Visitenkarten bestellt'),
        ('phone_number', 'Direktwahl vorhanden'),
        ('successor_name', 'Nachfolge vorhanden'),
        ('department', 'Abteilung'),
    ]

    items = list(IntroChecklistItem.objects.all().order_by('section', 'sort_order', 'label'))
    return render(
        request,
        'workflows/intro_builder.html',
        {
            'items': items,
            'section_choices': _translate_choice_list(IntroChecklistItem.SECTION_CHOICES),
            'operator_choices': _translate_choice_list(IntroChecklistItem.OPERATOR_CHOICES),
            'condition_field_choices': condition_field_choices,
        },
    )
|
|
|
|
|
|
@_require_capability('manage_integrations')
def integrations_setup_page(request):
    """Render the integrations setup page for the tab selected via ``?kind=``.

    Unknown or missing ``kind`` values fall back to ``nextcloud``. The
    ``Default`` workflow configuration is created on first access.
    """
    # Named ``_created`` so the module-level gettext alias ``_`` is not shadowed.
    workflow_config, _created = WorkflowConfig.objects.get_or_create(name='Default')

    allowed_kinds = {'nextcloud', 'mail', 'emails', 'rules', 'backup'}
    requested_kind = (request.GET.get('kind') or 'nextcloud').strip().lower()
    kind = requested_kind if requested_kind in allowed_kinds else 'nextcloud'

    templates = list(NotificationTemplate.objects.all().order_by('key'))

    # Prefer the most recently updated active SMTP configuration; otherwise
    # fall back to the one named "Default SMTP".
    system_email_config = SystemEmailConfig.objects.filter(is_active=True).order_by('-updated_at').first()
    if not system_email_config:
        system_email_config = SystemEmailConfig.objects.filter(name='Default SMTP').first()

    context = {
        'workflow_config': workflow_config,
        'system_email_config': system_email_config,
        'nextcloud_enabled': is_nextcloud_enabled(),
        'email_test_mode': is_email_test_mode(),
        'kind': kind,
        'templates': templates,
        'notification_rules': NotificationRule.objects.all().order_by('event_type', 'sort_order', 'id'),
        'rule_event_choices': NotificationRule.EVENT_CHOICES,
        'rule_operator_choices': NotificationRule.OPERATOR_CHOICES,
        'template_choices': NotificationTemplate.TEMPLATE_CHOICES,
        'remote_backup_target_choices': WorkflowConfig.REMOTE_BACKUP_TARGET_CHOICES,
    }
    return render(request, 'workflows/integrations_setup.html', context)
|
|
|
|
|
|
@_require_capability('manage_welcome_emails')
def welcome_emails_page(request):
    """Render the welcome e-mail overview with the editable welcome template texts.

    Lists up to 200 scheduled welcome e-mails (latest send time first). Each
    of the four template texts (subject/body, German/English) is taken from
    the stored ``onboarding_welcome`` template when non-blank, otherwise from
    the built-in default template, otherwise from a hard-coded text.
    """
    scheduled_rows = ScheduledWelcomeEmail.objects.select_related('onboarding_request').order_by('-send_at', '-id')[:200]
    # Named ``_created`` so the module-level gettext alias ``_`` is not shadowed.
    workflow_config, _created = WorkflowConfig.objects.get_or_create(name='Default')
    welcome_template = NotificationTemplate.objects.filter(key='onboarding_welcome').first()
    builtin_defaults = DEFAULT_NOTIFICATION_TEMPLATES.get('onboarding_welcome', {})

    def resolve(template_attr, default_key, last_resort):
        """Pick stored text, then built-in default, then ``last_resort``."""
        fallback = (builtin_defaults.get(default_key) or last_resort).strip()
        stored = getattr(welcome_template, template_attr) if welcome_template else ''
        return stored.strip() or fallback

    subject_value = resolve('subject_template', 'subject', 'Willkommen bei TUB/CO, {{ FULL_NAME }}')
    body_value = resolve('body_template', 'body', 'Hallo {{ FULL_NAME }}, willkommen bei TUB/CO.')
    subject_value_en = resolve('subject_template_en', 'subject_en', 'Welcome to TUB/CO, {{ FULL_NAME }}')
    body_value_en = resolve('body_template_en', 'body_en', 'Hello {{ FULL_NAME }}, welcome to TUB/CO.')

    context = {
        'rows': scheduled_rows,
        'workflow_config': workflow_config,
        'welcome_template': welcome_template,
        'welcome_subject_value': subject_value,
        'welcome_body_value': body_value,
        'welcome_subject_value_en': subject_value_en,
        'welcome_body_value_en': body_value_en,
        # Placeholders offered to the editor for use in the template texts.
        'welcome_keywords': ['{{ FULL_NAME }}', '{{ VORNAME }}', '{{ NACHNAME }}', '{{ DEPARTMENT }}', '{{ CONTRACT_START }}', '{{ EMAIL }}', '{{ REQUESTED_BY }}'],
    }
    return render(request, 'workflows/welcome_emails.html', context)
|
|
|
|
|
|
@_require_capability('manage_welcome_emails')
@require_POST
def trigger_welcome_email_now(request, schedule_id: int):
    """Queue an immediate send for one scheduled welcome e-mail.

    Unknown or cancelled entries are rejected with an error message. Otherwise
    ``send_scheduled_welcome_email`` is dispatched with ``True`` as its second
    argument, the row is marked ``scheduled`` with the new task id, and the
    action is audited. Always redirects to the welcome e-mail page.
    """
    scheduled = ScheduledWelcomeEmail.objects.filter(id=schedule_id).first()

    problem = None
    if not scheduled:
        problem = f'Geplanter Welcome-Eintrag #{schedule_id} nicht gefunden.'
    elif scheduled.status == 'cancelled':
        problem = f'Welcome E-Mail #{schedule_id} ist abgebrochen und kann nicht gesendet werden.'
    if problem is not None:
        messages.error(request, problem)
        return redirect('welcome_emails_page')

    # NOTE(review): a previously queued task for this row is not revoked here;
    # presumably the task itself guards against double sends - confirm.
    queued = send_scheduled_welcome_email.delay(scheduled.id, True)
    if queued.id:
        # Keep the old task id when the dispatch did not report a new one.
        scheduled.celery_task_id = queued.id
    scheduled.status = 'scheduled'
    scheduled.last_error = ''
    scheduled.save(update_fields=['celery_task_id', 'status', 'last_error', 'updated_at'])

    _audit(
        request,
        'welcome_email_triggered_now',
        target_type='welcome_email',
        target_id=scheduled.id,
        target_label=scheduled.recipient_email,
    )
    messages.success(request, f'Welcome E-Mail #{schedule_id} wurde sofort angestoßen.')
    return redirect('welcome_emails_page')
|
|
|
|
|
|
@_require_capability('manage_welcome_emails')
@require_POST
def save_welcome_email_settings(request):
    """Save welcome e-mail delay, sender, PDF flag and template text.

    A non-numeric delay aborts with an error message before anything is
    written. The template is only touched when at least one of the four text
    fields is present in the POST data; blank or absent texts are replaced by
    the defaults from ``DEFAULT_NOTIFICATION_TEMPLATES`` (or hard-coded
    strings). The template is also re-activated if it was inactive.
    """
    workflow_config, _created = WorkflowConfig.objects.get_or_create(name='Default')
    post = request.POST

    raw_delay = post.get('welcome_email_delay_days', workflow_config.welcome_email_delay_days or 5)
    try:
        delay_days = int(raw_delay)
    except ValueError:
        messages.error(request, 'Ungültige Zahl bei der Welcome-Verzögerung.')
        return redirect('welcome_emails_page')

    workflow_config.welcome_email_delay_days = max(0, delay_days)
    workflow_config.welcome_sender_email = post.get('welcome_sender_email', '').strip()
    workflow_config.welcome_include_pdf = post.get('welcome_include_pdf') == 'on'
    workflow_config.save(update_fields=['welcome_email_delay_days', 'welcome_sender_email', 'welcome_include_pdf'])

    # Raw POST values keyed by the template attribute they map to.
    posted = {
        'subject_template': post.get('welcome_subject'),
        'body_template': post.get('welcome_body'),
        'subject_template_en': post.get('welcome_subject_en'),
        'body_template_en': post.get('welcome_body_en'),
    }
    if any(value is not None for value in posted.values()):
        shipped = DEFAULT_NOTIFICATION_TEMPLATES.get('onboarding_welcome', {})
        fallbacks = {
            'subject_template': (shipped.get('subject') or 'Willkommen bei TUB/CO, {{ FULL_NAME }}').strip(),
            'body_template': (shipped.get('body') or 'Hallo {{ FULL_NAME }}, willkommen bei TUB/CO.').strip(),
            'subject_template_en': (shipped.get('subject_en') or 'Welcome to TUB/CO, {{ FULL_NAME }}').strip(),
            'body_template_en': (shipped.get('body_en') or 'Hello {{ FULL_NAME }}, welcome to TUB/CO.').strip(),
        }
        # NOTE(review): a field that is absent from the POST is treated like a
        # blank one and therefore reset to its default - confirm this is wanted.
        cleaned = {
            attr: (raw or '').strip() or fallbacks[attr]
            for attr, raw in posted.items()
        }

        template, _new = NotificationTemplate.objects.get_or_create(
            key='onboarding_welcome',
            defaults={**cleaned, 'is_active': True},
        )

        # Write back only the attributes that actually differ.
        dirty = []
        for attr, value in cleaned.items():
            if getattr(template, attr) != value:
                setattr(template, attr, value)
                dirty.append(attr)
        if not template.is_active:
            template.is_active = True
            dirty.append('is_active')
        if dirty:
            template.save(update_fields=dirty)

    _audit(
        request,
        'welcome_email_settings_saved',
        target_type='welcome_email_settings',
        target_label='onboarding_welcome',
        details={
            'delay_days': workflow_config.welcome_email_delay_days,
            'sender_email': workflow_config.welcome_sender_email,
            'include_pdf': workflow_config.welcome_include_pdf,
        },
    )
    messages.success(request, 'Welcome-E-Mail Einstellungen wurden gespeichert.')
    return redirect('welcome_emails_page')
|
|
|
|
|
|
def _revoke_celery_task(task_id: str) -> None:
|
|
if not task_id:
|
|
return
|
|
try:
|
|
current_app.control.revoke(task_id, terminate=False)
|
|
except Exception:
|
|
return
|
|
|
|
|
|
def _parse_selected_schedule_ids(raw: str) -> list[int]:
|
|
if not raw:
|
|
return []
|
|
parsed: list[int] = []
|
|
seen: set[int] = set()
|
|
for token in raw.split(','):
|
|
token = token.strip()
|
|
if not token:
|
|
continue
|
|
try:
|
|
schedule_id = int(token)
|
|
except ValueError:
|
|
continue
|
|
if schedule_id in seen:
|
|
continue
|
|
seen.add(schedule_id)
|
|
parsed.append(schedule_id)
|
|
return parsed
|
|
|
|
|
|
@_require_capability('manage_welcome_emails')
@require_POST
def bulk_welcome_email_action(request):
    """Apply ``pause``, ``send_now`` or ``delete`` to selected welcome e-mails.

    Reads ``bulk_action`` and the comma-separated ``selected_ids`` from the
    POST data. Rows whose status does not allow the action are counted as
    skipped. An audit entry and a success message are written only when at
    least one row was processed. Always redirects to the welcome e-mail page.
    """
    action_labels = {
        'pause': 'pausiert',
        'send_now': 'sofort angestoßen',
        'delete': 'gelöscht',
    }
    action = (request.POST.get('bulk_action') or '').strip().lower()
    selected_ids = _parse_selected_schedule_ids(request.POST.get('selected_ids', ''))

    if action not in action_labels:
        messages.error(request, 'Ungültige Bulk-Aktion.')
        return redirect('welcome_emails_page')
    if not selected_ids:
        messages.warning(request, 'Keine Welcome-Einträge ausgewählt.')
        return redirect('welcome_emails_page')

    rows = list(ScheduledWelcomeEmail.objects.filter(id__in=selected_ids).order_by('id'))
    if not rows:
        messages.warning(request, 'Keine passenden Welcome-Einträge gefunden.')
        return redirect('welcome_emails_page')

    success_count = 0
    skipped_count = 0
    for entry in rows:
        if action == 'pause':
            # Sent or cancelled entries cannot be paused.
            if entry.status in {'sent', 'cancelled'}:
                skipped_count += 1
                continue
            _revoke_celery_task(entry.celery_task_id)
            entry.status = 'paused'
            entry.save(update_fields=['status', 'updated_at'])
        elif action == 'send_now':
            # Cancelled entries are never re-sent.
            if entry.status == 'cancelled':
                skipped_count += 1
                continue
            queued = send_scheduled_welcome_email.delay(entry.id, True)
            if queued.id:
                entry.celery_task_id = queued.id
            entry.status = 'scheduled'
            entry.last_error = ''
            entry.save(update_fields=['celery_task_id', 'status', 'last_error', 'updated_at'])
        else:
            # 'delete': only entries still marked scheduled get their task revoked.
            if entry.status == 'scheduled':
                _revoke_celery_task(entry.celery_task_id)
            entry.delete()
        success_count += 1

    if success_count:
        _audit(
            request,
            'welcome_email_bulk_action',
            target_type='welcome_email',
            target_label=action,
            details={'selected_ids': selected_ids, 'success_count': success_count, 'skipped_count': skipped_count},
        )
        messages.success(request, f'{success_count} Welcome-Eintrag/Einträge {action_labels[action]}.')
    if skipped_count:
        messages.warning(request, f'{skipped_count} Eintrag/Einträge wurden übersprungen (Status nicht geeignet).')
    return redirect('welcome_emails_page')
|
|
|
|
|
|
@_require_capability('manage_welcome_emails')
@require_POST
def pause_welcome_email(request, schedule_id: int):
    """Pause one scheduled welcome e-mail.

    Unknown entries and entries already ``sent`` or ``cancelled`` are rejected
    with an error message. Otherwise the Celery task is revoked (best effort),
    the row is marked ``paused`` and the action is audited.
    """
    scheduled = ScheduledWelcomeEmail.objects.filter(id=schedule_id).first()

    problem = None
    if not scheduled:
        problem = f'Geplanter Welcome-Eintrag #{schedule_id} nicht gefunden.'
    elif scheduled.status in {'sent', 'cancelled'}:
        problem = f'Welcome E-Mail #{schedule_id} kann nicht pausiert werden.'
    if problem is not None:
        messages.error(request, problem)
        return redirect('welcome_emails_page')

    _revoke_celery_task(scheduled.celery_task_id)
    scheduled.status = 'paused'
    scheduled.save(update_fields=['status', 'updated_at'])

    _audit(
        request,
        'welcome_email_paused',
        target_type='welcome_email',
        target_id=scheduled.id,
        target_label=scheduled.recipient_email,
    )
    messages.success(request, f'Welcome E-Mail #{schedule_id} wurde pausiert.')
    return redirect('welcome_emails_page')
|
|
|
|
|
|
@_require_capability('manage_welcome_emails')
@require_POST
def resume_welcome_email(request, schedule_id: int):
    """Resume a paused welcome e-mail by queueing its send task again.

    Only entries with status ``paused`` can be resumed. The task is queued
    with ``eta=send_at`` while that time is still in the future, otherwise
    without an ETA. The row is marked ``scheduled`` and the action audited.
    """
    scheduled = ScheduledWelcomeEmail.objects.filter(id=schedule_id).first()

    problem = None
    if not scheduled:
        problem = f'Geplanter Welcome-Eintrag #{schedule_id} nicht gefunden.'
    elif scheduled.status != 'paused':
        problem = f'Welcome E-Mail #{schedule_id} ist nicht pausiert.'
    if problem is not None:
        messages.error(request, problem)
        return redirect('welcome_emails_page')

    # Keep the original send time only if it has not passed yet.
    eta = None
    if timezone.now() < scheduled.send_at:
        eta = scheduled.send_at

    queued = send_scheduled_welcome_email.apply_async(args=[scheduled.id], eta=eta)
    if queued.id:
        scheduled.celery_task_id = queued.id
    scheduled.status = 'scheduled'
    scheduled.last_error = ''
    scheduled.save(update_fields=['celery_task_id', 'status', 'last_error', 'updated_at'])

    _audit(
        request,
        'welcome_email_resumed',
        target_type='welcome_email',
        target_id=scheduled.id,
        target_label=scheduled.recipient_email,
    )
    messages.success(request, f'Welcome E-Mail #{schedule_id} wurde fortgesetzt.')
    return redirect('welcome_emails_page')
|
|
|
|
|
|
@_require_capability('manage_welcome_emails')
@require_POST
def cancel_welcome_email(request, schedule_id: int):
    """Cancel one scheduled welcome e-mail.

    Unknown entries and entries already ``sent`` are rejected with an error
    message. Otherwise the Celery task is revoked (best effort), the row is
    marked ``cancelled`` with ``last_error`` cleared, and the action audited.
    """
    scheduled = ScheduledWelcomeEmail.objects.filter(id=schedule_id).first()

    problem = None
    if not scheduled:
        problem = f'Geplanter Welcome-Eintrag #{schedule_id} nicht gefunden.'
    elif scheduled.status == 'sent':
        problem = f'Welcome E-Mail #{schedule_id} wurde bereits gesendet.'
    if problem is not None:
        messages.error(request, problem)
        return redirect('welcome_emails_page')

    _revoke_celery_task(scheduled.celery_task_id)
    scheduled.status = 'cancelled'
    scheduled.last_error = ''
    scheduled.save(update_fields=['status', 'last_error', 'updated_at'])

    _audit(
        request,
        'welcome_email_cancelled',
        target_type='welcome_email',
        target_id=scheduled.id,
        target_label=scheduled.recipient_email,
    )
    messages.success(request, f'Welcome E-Mail #{schedule_id} wurde abgebrochen.')
    return redirect('welcome_emails_page')
|
|
|
|
|
|
@_require_capability('manage_builders')
@require_POST
def form_builder_save_order(request):
    """Persist the field order (and onboarding page assignment) of a form.

    Expects a JSON object body with ``form_type`` (a key of
    ``DEFAULT_FIELD_ORDER``) and ``columns``, a mapping of column key to a
    list of field names. For ``onboarding`` the columns are the keys in
    ``ONBOARDING_PAGE_ORDER``; every other form type uses the single column
    ``all``. Unknown, duplicate or non-string names are ignored. Fields not
    mentioned in the payload are appended after the listed ones.

    Returns a ``JsonResponse``: ``{'ok': True, 'saved_count': n}`` on success,
    or ``{'ok': False, 'error': ...}`` with status 400 for malformed input.
    """
    try:
        payload = json.loads(request.body.decode('utf-8'))
    except (json.JSONDecodeError, UnicodeDecodeError):
        return JsonResponse({'ok': False, 'error': 'Ungültige JSON-Daten.'}, status=400)
    # Valid JSON may still be a list, number or string; only an object has the
    # keys read below, so anything else is rejected instead of crashing.
    if not isinstance(payload, dict):
        return JsonResponse({'ok': False, 'error': 'Ungültige JSON-Daten.'}, status=400)

    form_type = payload.get('form_type')
    # The isinstance guard keeps unhashable values (list/dict) from raising
    # TypeError in the membership test.
    if not isinstance(form_type, str) or form_type not in DEFAULT_FIELD_ORDER:
        return JsonResponse({'ok': False, 'error': 'Ungültiger Formulartyp.'}, status=400)

    columns = payload.get('columns')
    if not isinstance(columns, dict):
        return JsonResponse({'ok': False, 'error': 'Spalten-Daten fehlen.'}, status=400)

    configs = list(FormFieldConfig.objects.filter(form_type=form_type).order_by('sort_order', 'field_name'))
    name_to_cfg = {cfg.field_name: cfg for cfg in configs}
    is_onboarding = form_type == 'onboarding'
    allowed_columns = ONBOARDING_PAGE_ORDER if is_onboarding else ['all']

    seen = set()
    sort_order = 0

    for column_key in allowed_columns:
        names = columns.get(column_key, [])
        if not isinstance(names, list):
            return JsonResponse({'ok': False, 'error': f'Ungültige Spalte: {column_key}'}, status=400)

        for name in names:
            if not isinstance(name, str):
                continue
            if name not in name_to_cfg or name in seen:
                continue
            seen.add(name)
            cfg = name_to_cfg[name]
            cfg.sort_order = sort_order
            sort_order += 1
            cfg.page_key = column_key if is_onboarding else ''

    # Fields the client did not mention keep their relative order and are
    # appended after the explicitly ordered ones.
    missing = [cfg.field_name for cfg in configs if cfg.field_name not in seen]
    for name in missing:
        cfg = name_to_cfg[name]
        cfg.sort_order = sort_order
        sort_order += 1
        if is_onboarding:
            cfg.page_key = cfg.page_key or ONBOARDING_DEFAULT_PAGE.get(name, 'abschluss')
        else:
            cfg.page_key = ''

    FormFieldConfig.objects.bulk_update(configs, ['sort_order', 'page_key'])
    _audit(request, 'form_layout_saved', target_type='form_config', target_label=form_type, details={'saved_count': len(configs)})
    return JsonResponse({'ok': True, 'saved_count': len(configs)})
|
|
|
|
|
|
@_require_capability('manage_integrations')
@require_POST
def send_test_email(request):
    """Send an SMTP test mail to ``settings.TEST_NOTIFICATION_EMAIL``.

    The subject and body state whether e-mail test mode is on and which
    redirect address is configured. If sending raises, the error is shown to
    the user and no audit entry is written. On success the send is audited.
    Either way the user is redirected back (fallback ``home``).
    """
    # Read the mode once so subject, body and audit entry agree.
    test_mode = is_email_test_mode()
    mode = 'TEST_MODE_ON' if test_mode else 'TEST_MODE_OFF'
    redirect_email = get_email_test_redirect()
    recipient = settings.TEST_NOTIFICATION_EMAIL

    try:
        send_system_email(
            subject=f'SMTP test from onboarding/offboarding v2 ({mode})',
            body=(
                'This is a test email. If you see this, SMTP is configured correctly.\n'
                f'EMAIL_TEST_MODE={test_mode}\n'
                f'EMAIL_TEST_REDIRECT={redirect_email}\n'
            ),
            to=[recipient],
        )
    except Exception as exc:
        # This view exists to test SMTP, so a failure is a result to report
        # to the admin rather than a server error.
        messages.error(request, f'SMTP-Testmail konnte nicht gesendet werden: {exc}')
        return _redirect_back(request, 'home')

    _audit(request, 'smtp_test_sent', target_type='system_email', target_label=recipient, details={'email_test_mode': test_mode})
    messages.success(request, f'SMTP-Testmail wurde gesendet ({mode}).')
    return _redirect_back(request, 'home')
|
|
|
|
|
|
@_require_capability('manage_integrations')
@require_POST
def nextcloud_test_upload(request):
    """Upload a small text file to Nextcloud to check the configuration.

    A temporary file with the current time and the user's name is passed to
    ``upload_to_nextcloud`` and removed afterwards. Every outcome (success,
    falsy result, raised exception) is audited and reported to the user, who
    is then redirected back (fallback ``home``).
    """
    # One timestamp for both the file name and the file content.
    now = timezone.now()
    filename = f"nextcloud_test_{now.strftime('%Y%m%d_%H%M%S')}.txt"
    content = (
        "Nextcloud test upload from onboarding/offboarding system.\n"
        f"Time: {now.isoformat()}\n"
        f"User: {request.user.username}\n"
    )

    temp_path = None
    ok = False
    upload_error = None
    try:
        with NamedTemporaryFile('w', suffix='.txt', delete=False, encoding='utf-8') as tf:
            # Remember the path before writing so the file is cleaned up even
            # if the write itself fails.
            temp_path = Path(tf.name)
            tf.write(content)
        ok = upload_to_nextcloud(temp_path, filename)
    except Exception as exc:
        # Kept broad on purpose: any failure here is a test result to report.
        upload_error = exc
    finally:
        if temp_path is not None:
            temp_path.unlink(missing_ok=True)

    if ok:
        _audit(request, 'nextcloud_test_upload', target_type='nextcloud', target_label=filename, details={'result': 'success'})
        messages.success(request, f'Nextcloud-Testupload erfolgreich: {filename}')
    elif upload_error is not None:
        # The exception path is audited as well, so the log shows every attempt.
        _audit(
            request,
            'nextcloud_test_upload',
            target_type='nextcloud',
            target_label=filename,
            details={'result': 'error', 'error': str(upload_error)},
        )
        messages.error(request, f'Nextcloud-Testupload fehlgeschlagen: {upload_error}')
    else:
        _audit(request, 'nextcloud_test_upload', target_type='nextcloud', target_label=filename, details={'result': 'error'})
        messages.error(request, 'Nextcloud-Testupload fehlgeschlagen. Bitte Konfiguration prüfen.')

    return _redirect_back(request, 'home')
|
|
|
|
|
|
@_require_capability('manage_integrations')
@require_POST
def toggle_nextcloud_enabled(request):
    """Flip the Nextcloud upload switch stored on the default workflow config.

    The new override is the negation of ``is_nextcloud_enabled()``. The change
    is audited and the user is redirected back (fallback ``home``).
    """
    workflow_config, _created = WorkflowConfig.objects.get_or_create(name='Default')

    workflow_config.nextcloud_enabled_override = not is_nextcloud_enabled()
    workflow_config.save(update_fields=['nextcloud_enabled_override'])

    _audit(
        request,
        'nextcloud_mode_toggled',
        target_type='workflow_config',
        target_label='nextcloud',
        details={'enabled': workflow_config.nextcloud_enabled_override},
    )

    if workflow_config.nextcloud_enabled_override:
        state = 'aktiviert'
    else:
        state = 'deaktiviert'
    messages.success(request, f'Nextcloud Upload wurde {state}.')
    return _redirect_back(request, 'home')
|
|
|
|
|
|
@_require_capability('manage_integrations')
@require_POST
def toggle_email_mode(request):
    """Flip the e-mail test-mode switch stored on the default workflow config.

    The new override is the negation of ``is_email_test_mode()``. The change
    is audited and the user is redirected back (fallback ``home``).
    """
    workflow_config, _created = WorkflowConfig.objects.get_or_create(name='Default')

    workflow_config.email_test_mode_override = not is_email_test_mode()
    workflow_config.save(update_fields=['email_test_mode_override'])

    _audit(
        request,
        'email_mode_toggled',
        target_type='workflow_config',
        target_label='email_mode',
        details={'test_mode': workflow_config.email_test_mode_override},
    )

    if workflow_config.email_test_mode_override:
        state = 'Testmodus (Umleitung)'
    else:
        state = 'Produktionsmodus'
    messages.success(request, f'E-Mail-Modus wurde auf {state} gesetzt.')
    return _redirect_back(request, 'home')
|
|
|
|
|
|
@_require_capability('manage_integrations')
@require_POST
def save_integrations_settings(request):
    """Save Nextcloud and mail settings together on the default workflow config.

    A non-numeric sync interval or SMTP port aborts with an error message.
    The sync interval is clamped to at least 10 and the port to at least 1.
    Passwords are only overwritten when a non-blank value is submitted.
    Redirects to ``home`` in every case.
    """
    workflow_config, _created = WorkflowConfig.objects.get_or_create(name='Default')
    post = request.POST

    def text(name):
        """Return the trimmed POST value for *name* ('' when absent)."""
        return post.get(name, '').strip()

    try:
        sync_interval = int(post.get('sync_interval_seconds', workflow_config.sync_interval_seconds or 60))
        smtp_port = int(post.get('smtp_port', workflow_config.smtp_port or 465))
    except ValueError:
        messages.error(request, 'Ungültige Zahl bei Sync-Intervall oder SMTP Port.')
        return redirect('home')

    # Nextcloud connection overrides.
    for field in ('nextcloud_base_url_override', 'nextcloud_username_override', 'nextcloud_directory_override'):
        setattr(workflow_config, field, text(field))
    workflow_config.sync_interval_seconds = max(10, sync_interval)

    # Mail server settings.
    workflow_config.imap_server = text('imap_server')
    workflow_config.mailbox = text('mailbox') or 'INBOX'
    workflow_config.smtp_server = text('smtp_server')
    workflow_config.smtp_port = max(1, smtp_port)
    workflow_config.email_account = text('email_account')
    workflow_config.smtp_use_ssl = post.get('smtp_use_ssl') == 'on'
    workflow_config.smtp_use_tls = post.get('smtp_use_tls') == 'on'

    # A blank password field means "keep the stored secret".
    for field in ('nextcloud_password_override', 'email_password'):
        secret = text(field)
        if secret:
            setattr(workflow_config, field, secret)

    workflow_config.save()
    _audit(request, 'integrations_saved', target_type='workflow_config', target_label='all_integrations')
    messages.success(request, 'Integrations-Einstellungen wurden gespeichert.')
    return redirect('home')
|
|
|
|
|
|
@_require_capability('manage_integrations')
@require_POST
def save_nextcloud_settings(request):
    """Save the Nextcloud connection settings on the default workflow config.

    A non-numeric sync interval aborts with an error message. The interval is
    clamped to at least 10. The password is only overwritten when a non-blank
    value is submitted. Both the error and the success path return to the
    Nextcloud tab of the integrations page.
    """
    nextcloud_tab = '/admin-tools/integrations/?kind=nextcloud'
    config, _created = WorkflowConfig.objects.get_or_create(name='Default')
    try:
        sync_interval = int(request.POST.get('sync_interval_seconds', config.sync_interval_seconds or 60))
    except ValueError:
        messages.error(request, 'Ungültige Zahl beim Sync-Intervall.')
        # Return to the form that was submitted (was: 'home'), matching the
        # success path and save_workflow_rules.
        return redirect(nextcloud_tab)

    config.nextcloud_base_url_override = request.POST.get('nextcloud_base_url_override', '').strip()
    config.nextcloud_username_override = request.POST.get('nextcloud_username_override', '').strip()
    config.nextcloud_directory_override = request.POST.get('nextcloud_directory_override', '').strip()
    config.sync_interval_seconds = max(10, sync_interval)

    # A blank password field means "keep the stored secret".
    nextcloud_password = request.POST.get('nextcloud_password_override', '').strip()
    if nextcloud_password:
        config.nextcloud_password_override = nextcloud_password

    config.save()
    _audit(request, 'nextcloud_settings_saved', target_type='workflow_config', target_label='nextcloud')
    messages.success(request, 'Nextcloud-Einstellungen wurden gespeichert.')
    return redirect(nextcloud_tab)
|
|
|
|
|
|
@_require_capability('manage_integrations')
@require_POST
def save_workflow_rules(request):
    """Save the device hand-over lead time on the default workflow config.

    A non-numeric value aborts with an error message; negative values are
    clamped to 0. Always redirects to the rules tab of the integrations page.
    """
    workflow_config, _created = WorkflowConfig.objects.get_or_create(name='Default')

    raw_lead_days = request.POST.get(
        'device_handover_lead_days',
        workflow_config.device_handover_lead_days or 5,
    )
    try:
        handover_lead_days = int(raw_lead_days)
    except ValueError:
        messages.error(request, 'Ungültige Zahl beim Hardware-Vorlauf.')
        return redirect('/admin-tools/integrations/?kind=rules')

    workflow_config.device_handover_lead_days = max(0, handover_lead_days)
    workflow_config.save(update_fields=['device_handover_lead_days'])

    audit_details = {
        'device_handover_lead_days': workflow_config.device_handover_lead_days,
    }
    _audit(
        request,
        'workflow_rules_saved',
        target_type='workflow_config',
        target_label='workflow_rules',
        details=audit_details,
    )
    messages.success(request, 'Workflow-Regeln wurden gespeichert.')
    return redirect('/admin-tools/integrations/?kind=rules')
|
|
|
|
|
|
@_require_capability('manage_integrations')
@require_POST
def save_backup_settings(request):
    """Save the remote backup settings on the default workflow config.

    The target type falls back to ``nextcloud`` when it is not one of
    ``WorkflowConfig.REMOTE_BACKUP_TARGET_CHOICES``. When remote backup is
    enabled for Nextcloud, a backup directory is required and must differ from
    the primary Nextcloud document directory (compared without leading or
    trailing slashes). Always redirects to the backup tab.
    """
    backup_tab = '/admin-tools/integrations/?kind=backup'
    config, _created = WorkflowConfig.objects.get_or_create(name='Default')

    valid_targets = {value for value, _label in WorkflowConfig.REMOTE_BACKUP_TARGET_CHOICES}
    target_type = (request.POST.get('remote_backup_target_type') or config.remote_backup_target_type or 'nextcloud').strip().lower()
    if target_type not in valid_targets:
        target_type = 'nextcloud'

    remote_backup_enabled = request.POST.get('remote_backup_enabled') == 'on'
    remote_backup_nextcloud_directory = request.POST.get('remote_backup_nextcloud_directory', '').strip()
    # The override wins over the settings value; a missing or None setting is
    # treated as empty instead of raising on .strip().
    primary_nextcloud_directory = (
        (config.nextcloud_directory_override or '').strip()
        or (getattr(settings, 'NEXTCLOUD_DIRECTORY', '') or '').strip()
    ).strip('/')

    if remote_backup_enabled and target_type == 'nextcloud':
        if not remote_backup_nextcloud_directory:
            messages.error(request, 'Bitte ein separates Nextcloud Backup-Verzeichnis angeben.')
            return redirect(backup_tab)
        if remote_backup_nextcloud_directory.strip('/') == primary_nextcloud_directory:
            messages.error(request, 'Das Backup-Verzeichnis muss vom normalen Nextcloud Dokumentenordner getrennt sein.')
            return redirect(backup_tab)

    config.remote_backup_enabled = remote_backup_enabled
    config.remote_backup_target_type = target_type
    config.remote_backup_nextcloud_directory = remote_backup_nextcloud_directory
    config.remote_backup_s3_bucket = request.POST.get('remote_backup_s3_bucket', '').strip()
    config.remote_backup_nfs_path = request.POST.get('remote_backup_nfs_path', '').strip()
    # Only the fields changed above are written. 'device_handover_lead_days'
    # was listed here before although this view never modifies it, which wrote
    # a stale value back over concurrent changes.
    config.save(
        update_fields=[
            'remote_backup_enabled',
            'remote_backup_target_type',
            'remote_backup_nextcloud_directory',
            'remote_backup_s3_bucket',
            'remote_backup_nfs_path',
        ]
    )
    _audit(
        request,
        'backup_settings_saved',
        target_type='workflow_config',
        target_label='backup_settings',
        details={
            'remote_backup_enabled': config.remote_backup_enabled,
            'remote_backup_target_type': config.remote_backup_target_type,
            'remote_backup_nextcloud_directory': config.remote_backup_nextcloud_directory,
        },
    )
    messages.success(request, 'Backup-Einstellungen wurden gespeichert.')
    return redirect(backup_tab)
|
|
|
|
|
|
@_require_capability('manage_integrations')
@require_POST
def save_mail_settings(request):
    """Save the mail server settings and mirror them to the SMTP config row.

    A non-numeric SMTP port aborts with an error message; the port is clamped
    to at least 1. The values are stored on the default ``WorkflowConfig`` and
    copied to the ``SystemEmailConfig`` named ``Default SMTP``, which becomes
    the only active SMTP config. The password is only overwritten when a
    non-blank value is submitted. Both the error and the success path return
    to the mail tab of the integrations page.
    """
    mail_tab = '/admin-tools/integrations/?kind=mail'
    config, _created = WorkflowConfig.objects.get_or_create(name='Default')
    try:
        smtp_port = int(request.POST.get('smtp_port', config.smtp_port or 465))
    except ValueError:
        messages.error(request, 'Ungültige Zahl beim SMTP Port.')
        # Return to the form that was submitted (was: 'home'), matching the
        # success path and save_workflow_rules.
        return redirect(mail_tab)

    config.imap_server = request.POST.get('imap_server', '').strip()
    config.mailbox = request.POST.get('mailbox', '').strip() or 'INBOX'
    config.smtp_server = request.POST.get('smtp_server', '').strip()
    config.smtp_port = max(1, smtp_port)
    config.email_account = request.POST.get('email_account', '').strip()
    config.smtp_use_ssl = request.POST.get('smtp_use_ssl') == 'on'
    config.smtp_use_tls = request.POST.get('smtp_use_tls') == 'on'

    # A blank password field means "keep the stored secret".
    email_password = request.POST.get('email_password', '').strip()
    if email_password:
        config.email_password = email_password

    config.save()

    # Mirror the settings into the SMTP row and make it the only active one.
    smtp_cfg, _smtp_created = SystemEmailConfig.objects.get_or_create(name='Default SMTP')
    SystemEmailConfig.objects.exclude(id=smtp_cfg.id).update(is_active=False)
    smtp_cfg.is_active = True
    smtp_cfg.host = config.smtp_server
    smtp_cfg.port = config.smtp_port
    smtp_cfg.username = config.email_account
    if email_password:
        smtp_cfg.password = email_password
    smtp_cfg.use_ssl = config.smtp_use_ssl
    smtp_cfg.use_tls = config.smtp_use_tls
    smtp_cfg.from_email = request.POST.get('from_email', '').strip()
    smtp_cfg.save()

    _audit(request, 'mail_settings_saved', target_type='workflow_config', target_label='mail')
    messages.success(request, 'Mail-Einstellungen wurden gespeichert.')
    return redirect(mail_tab)
|
|
|
|
|
|
@_require_capability('manage_integrations')
@require_POST
def save_email_routing_settings(request):
    """Save the routing addresses and any submitted notification templates.

    The five routing addresses are always written (trimmed). For every key in
    ``NotificationTemplate.TEMPLATE_CHOICES`` a template is created or updated
    only if at least one of its four text fields was submitted non-blank.
    German subject/body are only replaced by non-blank values; the English
    fields are replaced by whatever was submitted, including blank. A touched
    template is re-activated. Redirects to the e-mails tab.
    """
    workflow_config, _created = WorkflowConfig.objects.get_or_create(name='Default')

    # POST keys and model attribute names are identical for these fields.
    routing_fields = [
        'it_onboarding_email',
        'general_info_email',
        'business_card_email',
        'hr_works_email',
        'key_notification_email',
    ]
    for field in routing_fields:
        setattr(workflow_config, field, request.POST.get(field, '').strip())
    workflow_config.save(update_fields=routing_fields)

    known_keys = {choice_key for choice_key, _label in NotificationTemplate.TEMPLATE_CHOICES}
    for key in known_keys:
        raw_values = [
            request.POST.get(f'subject_{key}'),
            request.POST.get(f'body_{key}'),
            request.POST.get(f'subject_en_{key}'),
            request.POST.get(f'body_en_{key}'),
        ]
        # Nothing submitted for this template at all.
        if all(value is None for value in raw_values):
            continue
        subject, body, subject_en, body_en = [(value or '').strip() for value in raw_values]
        # Submitted, but every field is blank.
        if not (subject or body or subject_en or body_en):
            continue

        obj, _new = NotificationTemplate.objects.get_or_create(
            key=key,
            defaults={
                'subject_template': subject or f'[{key}]',
                'body_template': body or '-',
                'subject_template_en': subject_en,
                'body_template_en': body_en,
                'is_active': True,
            },
        )

        changed = []
        if subject and obj.subject_template != subject:
            obj.subject_template = subject
            changed.append('subject_template')
        if body and obj.body_template != body:
            obj.body_template = body
            changed.append('body_template')
        if obj.subject_template_en != subject_en:
            obj.subject_template_en = subject_en
            changed.append('subject_template_en')
        if obj.body_template_en != body_en:
            obj.body_template_en = body_en
            changed.append('body_template_en')
        if not obj.is_active:
            obj.is_active = True
            changed.append('is_active')
        if changed:
            obj.save(update_fields=changed)

    _audit(request, 'email_routing_saved', target_type='workflow_config', target_label='email_routing')
    messages.success(request, 'E-Mail Routing und Vorlagen wurden gespeichert.')
    return redirect('/admin-tools/integrations/?kind=emails')
|
|
|
|
|
|
@_require_capability('manage_integrations')
@require_POST
def save_notification_rules(request):
    """Update, delete and optionally create notification rules from one form.

    Each value in ``rule_ids`` names an existing rule whose fields are read
    from POST keys suffixed with the rule id. Its position in the list becomes
    the rule's ``sort_order``. A checked ``delete_<id>`` removes the rule.
    Ids that are not integers or match no rule are skipped. A new rule is
    created when both ``new_name`` and ``new_recipients`` are non-blank.
    Redirects to the e-mails tab of the integrations page.
    """
    post = request.POST
    allowed_events = {'onboarding', 'offboarding'}
    # Built once instead of on every loop iteration.
    allowed_operators = {choice[0] for choice in NotificationRule.OPERATOR_CHOICES}

    for position, raw_id in enumerate(post.getlist('rule_ids')):
        # The ids come from the client; a non-numeric value would make the
        # ORM lookup raise ValueError, so it is skipped instead.
        try:
            rule_id = int(raw_id)
        except (TypeError, ValueError):
            continue
        rule = NotificationRule.objects.filter(id=rule_id).first()
        if not rule:
            continue
        if post.get(f'delete_{rule.id}') == 'on':
            rule.delete()
            continue

        rule.name = post.get(f'name_{rule.id}', '').strip() or rule.name
        event_type = post.get(f'event_type_{rule.id}', '').strip()
        if event_type in allowed_events:
            rule.event_type = event_type
        operator = post.get(f'operator_{rule.id}', '').strip()
        if operator in allowed_operators:
            rule.operator = operator
        rule.field_name = post.get(f'field_name_{rule.id}', '').strip()
        rule.expected_value = post.get(f'expected_value_{rule.id}', '').strip()
        rule.recipients = post.get(f'recipients_{rule.id}', '').strip()
        rule.template_key = post.get(f'template_key_{rule.id}', '').strip()
        rule.custom_subject = post.get(f'custom_subject_{rule.id}', '').strip()
        rule.custom_body = post.get(f'custom_body_{rule.id}', '').strip()
        rule.custom_subject_en = post.get(f'custom_subject_en_{rule.id}', '').strip()
        rule.custom_body_en = post.get(f'custom_body_en_{rule.id}', '').strip()
        rule.include_pdf_attachment = post.get(f'include_pdf_{rule.id}') == 'on'
        rule.is_active = post.get(f'active_{rule.id}') == 'on'
        rule.sort_order = position
        rule.save()

    new_name = post.get('new_name', '').strip()
    new_recipients = post.get('new_recipients', '').strip()
    if new_name and new_recipients:
        new_event = post.get('new_event_type', 'onboarding').strip()
        if new_event not in allowed_events:
            new_event = 'onboarding'
        new_operator = post.get('new_operator', 'always').strip()
        if new_operator not in allowed_operators:
            new_operator = 'always'
        NotificationRule.objects.create(
            name=new_name,
            event_type=new_event,
            field_name=post.get('new_field_name', '').strip(),
            operator=new_operator,
            expected_value=post.get('new_expected_value', '').strip(),
            recipients=new_recipients,
            template_key=post.get('new_template_key', '').strip(),
            custom_subject=post.get('new_custom_subject', '').strip(),
            custom_body=post.get('new_custom_body', '').strip(),
            custom_subject_en=post.get('new_custom_subject_en', '').strip(),
            custom_body_en=post.get('new_custom_body_en', '').strip(),
            include_pdf_attachment=post.get('new_include_pdf') == 'on',
            is_active=True,
            # Placed after the existing rules of the same event type.
            sort_order=NotificationRule.objects.filter(event_type=new_event).count() + 1,
        )

    _audit(request, 'notification_rules_saved', target_type='notification_rule')
    messages.success(request, 'Benachrichtigungsregeln wurden gespeichert.')
    return redirect('/admin-tools/integrations/?kind=emails')
|
|
|
|
|
|
@_require_capability('delete_requests')
@require_POST
def delete_request_from_dashboard(request, kind: str, request_id: int):
    """Delete one onboarding or offboarding request and audit the deletion.

    *kind* must be ``onboarding`` or ``offboarding``; any other value only
    produces an error message. A missing request results in a 404. Redirects
    to the requests dashboard.
    """
    model_by_kind = {
        'onboarding': OnboardingRequest,
        'offboarding': OffboardingRequest,
    }
    model = model_by_kind.get(kind)
    if model is None:
        messages.error(request, f'Unbekannter Typ: {kind}')
        return redirect('requests_dashboard')

    obj = get_object_or_404(model, id=request_id)
    # Build the label before deleting the object it is derived from.
    target_label = _request_target_label(obj, kind)
    obj.delete()

    _audit(request, 'request_deleted', target_type=kind, target_id=request_id, target_label=target_label)
    messages.success(request, f'{kind.capitalize()}-Anfrage #{request_id} wurde gelöscht.')
    return redirect('requests_dashboard')
|
|
|
|
|
|
@_require_capability('retry_requests')
@require_POST
def retry_request_from_dashboard(request, kind: str, request_id: int):
    """Reset one request to ``submitted`` and queue its processing task again.

    *kind* must be ``onboarding`` or ``offboarding``; any other value only
    produces an error message. A missing request results in a 404. The stored
    error is cleared, the matching Celery task is dispatched and the retry is
    audited. Redirects to the requests dashboard.
    """
    # Model and processing task per request kind.
    retry_targets = {
        'onboarding': (OnboardingRequest, process_onboarding_request),
        'offboarding': (OffboardingRequest, process_offboarding_request),
    }
    if kind not in retry_targets:
        messages.error(request, f'Unbekannter Typ: {kind}')
        return redirect('requests_dashboard')

    model, task = retry_targets[kind]
    obj = get_object_or_404(model, id=request_id)
    obj.processing_status = 'submitted'
    obj.last_error = ''
    obj.save(update_fields=['processing_status', 'last_error'])
    task.delay(obj.id)
    _audit(
        request,
        'request_retried',
        target_type=kind,
        target_id=obj.id,
        target_label=_request_target_label(obj, kind),
    )

    messages.success(request, f'{kind.capitalize()}-Anfrage #{request_id} wurde erneut angestoßen.')
    return redirect('requests_dashboard')
|