diff --git a/backend/workflows/email_workflows.py b/backend/workflows/email_workflows.py new file mode 100644 index 0000000..61c65df --- /dev/null +++ b/backend/workflows/email_workflows.py @@ -0,0 +1,188 @@ +from datetime import timedelta +from pathlib import Path + +from django.conf import settings +from django.utils import timezone +from jinja2 import Template + +from .branding import get_default_notification_templates +from .emailing import send_system_email +from .forms import OnboardingRequestForm +from .models import NotificationRule, NotificationTemplate, OnboardingRequest, ScheduledWelcomeEmail, WorkflowConfig +from .services import get_email_test_redirect, is_email_test_mode + + +def resolve_workflow_emails() -> tuple[str, str, str, str, str]: + config = WorkflowConfig.objects.order_by('id').first() + it_email = config.it_onboarding_email if config and config.it_onboarding_email else settings.IT_ONBOARDING_NOTIFICATION_EMAIL + general_info_email = config.general_info_email if config and config.general_info_email else settings.GENERAL_INFO_NOTIFICATION_EMAIL + business_card_email = config.business_card_email if config and config.business_card_email else settings.BUSINESS_CARD_NOTIFICATION_EMAIL + hr_works_email = config.hr_works_email if config and config.hr_works_email else settings.HR_WORKS_NOTIFICATION_EMAIL + key_email = config.key_notification_email if config and config.key_notification_email else settings.KEY_NOTIFICATION_EMAIL + return it_email, general_info_email, business_card_email, hr_works_email, key_email + + +def send_workflow_email( + subject: str, + body: str, + to: list[str], + attachments: list[Path] | None = None, + from_email: str | None = None, +) -> None: + recipients = [r for r in to if r] + if not recipients: + return + + effective_to = recipients + effective_body = body + if is_email_test_mode(): + effective_to = [get_email_test_redirect()] + effective_body = ( + '[TEST MODE] Diese E-Mail wurde umgeleitet.\n' + f"Originale Empfänger: {', '.join(recipients)}\n\n{body}" + ) + + send_system_email( + subject=subject, + body=effective_body, + to=effective_to, + attachments=[str(a) for a in (attachments or [])], + from_email=from_email, + ) + + +def render_notification_template(template_key: str, context: dict, language_code: str | None = None) -> tuple[str, str]: + lang = (language_code or 'de').split('-')[0] + db_template = NotificationTemplate.objects.filter(key=template_key, is_active=True).first() + if db_template: + subject_template = db_template.translated_subject_template(lang) + body_template = db_template.translated_body_template(lang) + else: + fallback = get_default_notification_templates()[template_key] + subject_template = fallback.get(f'subject_{lang}', '') or fallback['subject'] + body_template = fallback.get(f'body_{lang}', '') or fallback['body'] + + subject = Template(subject_template).render(context).strip() + body = Template(body_template).render(context).strip() + return subject, body + + +def parse_recipients(raw: str) -> list[str]: + if not raw: + return [] + cleaned = raw.replace(';', ',').replace('\n', ',') + return [x.strip() for x in cleaned.split(',') if x.strip()] + + +def as_bool(value) -> bool: + if isinstance(value, bool): + return value + if value is None: + return False + text = str(value).strip().lower() + return text in {'1', 'true', 'ja', 'yes', 'on', 'aktiv'} + + +def rule_matches(rule: NotificationRule, request_obj) -> bool: + if rule.operator == 'always': + return True + + raw_value = getattr(request_obj, rule.field_name, '') + actual = '' if raw_value is None else str(raw_value) + expected = (rule.expected_value or '').strip() + + if rule.operator == 'contains': + return expected.lower() in actual.lower() + if rule.operator == 'equals': + return actual.strip().lower() == expected.lower() + if rule.operator == 'is_true': + return as_bool(raw_value) + if rule.operator == 'is_false': + return not as_bool(raw_value) + return False + + +def send_templated_email( + template_key: str, + to: list[str], + context: dict, + attachments: list[Path] | None = None, + from_email: str | None = None, + language_code: str | None = None, +) -> None: + subject, body = render_notification_template(template_key, context, language_code=language_code) + send_workflow_email(subject=subject, body=body, to=to, attachments=attachments, from_email=from_email) + + +def apply_notification_rules( + event_type: str, + request_obj, + context: dict, + pdf_path: Path | None = None, +) -> None: + language_code = (getattr(request_obj, 'preferred_language', '') or 'de').split('-')[0] + rules = NotificationRule.objects.filter(event_type=event_type, is_active=True).order_by('sort_order', 'id') + for rule in rules: + if not rule_matches(rule, request_obj): + continue + + recipients = parse_recipients(rule.recipients) + if not recipients: + continue + + attachments = [pdf_path] if (pdf_path and rule.include_pdf_attachment) else None + template_key = (rule.template_key or '').strip() + known_keys = {k for k, _ in NotificationTemplate.TEMPLATE_CHOICES} + + if template_key and template_key in known_keys: + send_templated_email( + template_key=template_key, + context=context, + to=recipients, + attachments=attachments, + language_code=language_code, + ) + continue + + subject = rule.translated_custom_subject(language_code) + body = rule.translated_custom_body(language_code) + if not subject and not body: + continue + + subject_rendered = Template(subject or f'[{event_type}] Regelmail').render(context).strip() + body_rendered = Template(body or '-').render(context).strip() + send_workflow_email( + subject=subject_rendered, + body=body_rendered, + to=recipients, + attachments=attachments, + ) + + +def schedule_welcome_email(request_obj: OnboardingRequest, *, send_scheduled_welcome_email_task) -> None: + recipient = (request_obj.work_email or '').strip().lower() + if not recipient: + return + config = WorkflowConfig.objects.order_by('id').first() + delay_days = 5 + if config: + delay_days = max(0, int(config.welcome_email_delay_days or 5)) + send_at = timezone.now() + timedelta(days=delay_days) + scheduled, _ = ScheduledWelcomeEmail.objects.update_or_create( + onboarding_request=request_obj, + defaults={ + 'recipient_email': recipient, + 'send_at': send_at, + 'status': 'scheduled', + 'last_error': '', + 'sent_at': None, + }, + ) + try: + async_result = send_scheduled_welcome_email_task.apply_async(args=[scheduled.id], eta=send_at) + scheduled.celery_task_id = async_result.id or '' + scheduled.save(update_fields=['celery_task_id', 'updated_at']) + except Exception as exc: + scheduled.status = 'failed' + scheduled.last_error = f'Scheduling failed: {exc}' + scheduled.save(update_fields=['status', 'last_error', 'updated_at']) diff --git a/backend/workflows/notification_dispatch.py b/backend/workflows/notification_dispatch.py new file mode 100644 index 0000000..37a8837 --- /dev/null +++ b/backend/workflows/notification_dispatch.py @@ -0,0 +1,29 @@ +from django.utils.translation import gettext as _ + +from .notifications import notify_user_by_email + + +def notify_request_result(*, recipient_email: str, title: str, body: str, level: str, event_key: str) -> None: + notify_user_by_email( + email=recipient_email, + title=title, + body=body, + level=level, + link_url='/requests/', + event_key=event_key, + ) + + +def notify_welcome_email_result(*, recipient_email: str, full_name: str, body: str, level: str, event_key: str) -> None: + notify_user_by_email( + email=recipient_email, + title=( + _('Welcome E-Mail gesendet: %(name)s') % {'name': full_name} + if event_key == 'welcome_email_success' + else _('Welcome E-Mail fehlgeschlagen: %(name)s') % {'name': full_name} + ), + body=body, + level=level, + link_url='/admin-tools/welcome-emails/', + event_key=event_key, + ) diff --git a/backend/workflows/observability_views.py b/backend/workflows/observability_views.py index 93c073e..91f4866 100644 --- a/backend/workflows/observability_views.py +++ b/backend/workflows/observability_views.py @@ -8,7 +8,6 @@ from django.utils.translation import gettext as _ from .backup_ops import create_backup_bundle, latest_backup_health_snapshot, list_backup_bundles, verify_backup_bundle from .models import AdminAuditLog, AsyncTaskLog, UserNotification, UserProfile -from .notifications import notify_user from .roles import user_has_capability @@ -96,9 +95,9 @@ def backup_recovery_page_impl(request): ) -def create_backup_from_admin_impl(request, *, audit_fn): +def create_backup_from_admin_impl(request, *, audit_fn, notify_user_fn, create_backup_bundle_fn): try: - result = create_backup_bundle() + result = create_backup_bundle_fn() audit_fn( request, 'backup_created', @@ -106,7 +105,7 @@ def create_backup_from_admin_impl(request, *, audit_fn): target_label=result['name'], details={'path': result['path']}, ) - notify_user( + notify_user_fn( user=request.user, title=_('Backup erstellt: %(name)s') % {'name': result['name']}, body=_('Das Backup-Bundle wurde erfolgreich erstellt.'), @@ -116,7 +115,7 @@ def create_backup_from_admin_impl(request, *, audit_fn): ) messages.success(request, _('Backup wurde erstellt: %(name)s') % {'name': result['name']}) except Exception as exc: - notify_user( + notify_user_fn( user=request.user, title=_('Backup fehlgeschlagen'), body=str(exc), @@ -128,9 +127,9 @@ def create_backup_from_admin_impl(request, *, audit_fn): return redirect('backup_recovery_page') -def verify_backup_from_admin_impl(request, backup_name: str, *, audit_fn): +def verify_backup_from_admin_impl(request, backup_name: str, *, audit_fn, notify_user_fn, verify_backup_bundle_fn): try: - result = verify_backup_bundle(backup_name) + result = verify_backup_bundle_fn(backup_name) audit_fn( request, 'backup_verified', @@ -138,7 +137,7 @@ def verify_backup_from_admin_impl(request, backup_name: str, *, audit_fn): target_label=backup_name, details={'summary': result['summary']}, ) - notify_user( + notify_user_fn( user=request.user, title=_('Backup verifiziert: %(name)s') % {'name': result['name']}, body=result.get('summary') or _('Das Backup wurde erfolgreich verifiziert.'), @@ -148,7 +147,7 @@ def verify_backup_from_admin_impl(request, backup_name: str, *, audit_fn): ) messages.success(request, _('Backup wurde verifiziert: %(name)s') % {'name': result['name']}) except Exception as exc: - notify_user( + notify_user_fn( user=request.user, title=_('Backup-Verifikation fehlgeschlagen'), body=str(exc), diff --git a/backend/workflows/tasks.py b/backend/workflows/tasks.py index 36cde91..008e197 100644 --- a/backend/workflows/tasks.py +++ b/backend/workflows/tasks.py @@ -13,9 +13,9 @@ from jinja2 import Template from pypdf import PageObject, PdfReader, PdfWriter from xhtml2pdf import pisa -from .branding import get_branding_email_copy, get_company_contact_copy, get_default_notification_templates, get_portal_letterhead_path +from .branding import get_branding_email_copy, get_company_contact_copy, get_portal_letterhead_path +from . import email_workflows, notification_dispatch, pdf_rendering from .models import AsyncTaskLog, EmployeeProfile, IntroChecklistItem, NotificationRule, NotificationTemplate, OffboardingRequest, OnboardingIntroductionSession, OnboardingRequest, ScheduledWelcomeEmail, WorkflowConfig -from .emailing import send_system_email from .services import upload_to_nextcloud from .services import get_email_test_redirect, is_email_test_mode from .forms import ( @@ -28,7 +28,6 @@ from .forms import ( SOFTWARE_EXTRA_CHOICES, WORKSPACE_GROUP_CHOICES, ) -from .notifications import notify_user_by_email from .pdf_sections import build_pdf_sections # These templates are the product-level defaults for fresh deployments. @@ -254,27 +253,21 @@ DEFAULT_NOTIFICATION_TEMPLATES = { def _notify_request_result(*, recipient_email: str, title: str, body: str, level: str, event_key: str) -> None: - notify_user_by_email( - email=recipient_email, + return notification_dispatch.notify_request_result( + recipient_email=recipient_email, title=title, body=body, level=level, - link_url='/requests/', event_key=event_key, ) def _notify_welcome_email_result(*, recipient_email: str, full_name: str, body: str, level: str, event_key: str) -> None: - notify_user_by_email( - email=recipient_email, - title=( - _('Welcome E-Mail gesendet: %(name)s') % {'name': full_name} - if event_key == 'welcome_email_success' - else _('Welcome E-Mail fehlgeschlagen: %(name)s') % {'name': full_name} - ), + return notification_dispatch.notify_welcome_email_result( + recipient_email=recipient_email, + full_name=full_name, body=body, level=level, - link_url='/admin-tools/welcome-emails/', event_key=event_key, ) @@ -300,211 +293,35 @@ def _finish_task_log(task_log: AsyncTaskLog | None, *, status: str, error_messag task_log.save(update_fields=['status', 'error_message', 'finished_at']) def _split_name(full_name: str) -> tuple[str, str]: - parts = full_name.split() - if not parts: - return '', '' - return parts[0], ' '.join(parts[1:]) + return pdf_rendering._split_name(full_name) def _safe_filename_fragment(text: str, fallback: str = 'document') -> str: - value = re.sub(r'[^A-Za-z0-9._-]+', '_', (text or '').strip()).strip('._') - return value[:120] if value else fallback + return pdf_rendering._safe_filename_fragment(text, fallback=fallback) def _resolve_user_display_name(email: str) -> str: - email = (email or '').strip().lower() - if not email: - return '' - user_model = get_user_model() - user = user_model.objects.filter(email__iexact=email).first() - if not user: - return '' - first_name = (getattr(user, 'first_name', '') or '').strip() - last_name = (getattr(user, 'last_name', '') or '').strip() - full_name = f'{first_name} {last_name}'.strip() - if full_name: - return full_name - return (getattr(user, 'username', '') or '').strip() + return pdf_rendering._resolve_user_display_name(email) def _chunk_list(data_list: list[str], chunk_size: int = 3) -> list[list[str]]: - items = [i.strip() for i in data_list if i and i.strip()] - chunks = [] - for i in range(0, len(items), chunk_size): - chunks.append(items[i : i + chunk_size]) - return chunks + return pdf_rendering._chunk_list(data_list, chunk_size=chunk_size) def _split_multiline(text: str) -> list[str]: - return [line.strip() for line in (text or '').split('\n') if line.strip()] + return pdf_rendering._split_multiline(text) def _chunk_choice_labels(choices: list[tuple[str, str]], chunk_size: int = 3) -> list[list[str]]: - labels = [label for _, label in choices] - return _chunk_list(labels, chunk_size=chunk_size) + return pdf_rendering._chunk_choice_labels(choices, chunk_size=chunk_size) def _normalized_lang(language_code: str | None) -> str: - return (language_code or 'de').split('-')[0].lower() or 'de' + return pdf_rendering._normalized_lang(language_code) def _pdf_texts(language_code: str | None = None) -> dict[str, str]: - lang = _normalized_lang(language_code) - texts = { - 'de': { - 'lang': 'de', - 'not_available': 'Keine Angabe', - 'not_available_short': '-', - 'yes': 'Ja', - 'no': 'Nein', - 'onboarding_title': 'Onboarding-Unterlagen', - 'onboarding_staff_data': 'Personaldaten', - 'name': 'Name', - 'department': 'Abteilung', - 'job_title': 'Berufsbezeichnung', - 'work_email': 'Dienstliche E-Mail', - 'employment_type': 'Beschäftigungsverhältnis', - 'contract_start': 'Vertragsbeginn', - 'contract_end': 'Vertragsende', - 'handover_date': 'Übergabedatum', - 'equipment_access': 'Ausstattung und Zugänge', - 'devices': 'Benötigte Geräte und Gegenstände', - 'workspace_groups': 'Benötigte Gruppen im Workspace', - 'software': 'Benötigte Software', - 'accesses': 'Benötigte Zugänge', - 'resources': 'Benötigte Ressourcen', - 'group_mailboxes_required': 'Gruppenpostfächer erforderlich', - 'additional_hardware_needed': 'Darüber hinaus wird weitere Hardware benötigt', - 'additional_software_needed': 'Wird zusätzliche Software benötigt', - 'additional_access_needed': 'Darüber hinaus werden weitere Zugänge benötigt', - 'additional_details': 'Zusätzliche Angaben', - 'business_cards': 'Visitenkarten', - 'email': 'E-Mail', - 'phone': 'Telefon', - 'additional_hardware_other': 'Weitere Hardware (Freitext)', - 'successor_phone': 'Nachfolge und Telefon', - 'successor_of': 'Nachfolge von', - 'inherit_phone_number': 'Telefon von Vorgänger übernehmen', - 'direct_extension': 'Direktwahl', - 'notes': 'Notizen', - 'confirmation': 'Bestätigung', - 'requested_by_name': 'Angefordert von (Name)', - 'requested_by_email': 'Angefordert von (E-Mail)', - 'signature': 'Unterschrift', - 'signature_alt': 'Unterschrift', - 'onboarding_note': 'Hinweis: Dieses Formular dient als interne Prozessgrundlage für das Onboarding.', - 'offboarding_title': 'Offboarding-Unterlagen', - 'employee_info': 'Mitarbeitenden-Informationen', - 'last_working_day': 'Letzter Arbeitstag', - 'offboarding_requester': 'Offboarding-Anfordernde Person', - 'it_hardware_status': 'IT-Hardware-Status (aus Onboarding)', - 'hardware_check': 'Hardware-Check', - 'no_onboarding_hardware': 'Keine Onboarding-Hardwaredaten gefunden.', - 'manual_return_overview': 'Manuelle Rückgabeübersicht', - 'manual_return_note': 'Es wurden keine gespeicherten Onboarding-Daten zu dieser Person gefunden. Die folgenden Listen dienen als manuelle Rückgabe- und Prüfübersicht.', - 'returned_devices': 'Zurückgegebene Geräte und Artikel', - 'returned_software': 'Zurückgegebene bzw. deaktivierte Software', - 'removed_workspace_groups': 'Entfernte Gruppen im Workspace', - 'removed_accesses': 'Entfernte Zugänge', - 'returned_extra_it': 'Zurückgegebene zusätzliche Hardware / Software', - 'it_signatures': 'IT-Section: Signaturen', - 'it_checked_by': 'IT geprüft am durch:', - 'it_signature': 'IT-Unterschrift:', - 'return_complete': 'Rückgabe vollständig:', - 'offboarding_note': 'Hinweis: Dieses Formular dient als interne Prozessgrundlage für das Offboarding.', - 'intro_title': 'Einweisungs- und Übergabeprotokoll', - 'intro_sub': 'Gesprächsleitfaden für die persönliche Einführung neuer Mitarbeitender.', - 'base_data': 'Basisdaten', - 'start_date': 'Startdatum', - 'introduced_by': 'Einweisung durch', - 'intro_note': 'Dieses Dokument dient als Gesprächsleitfaden für die persönliche Einweisung. Die Felder können während des Termins manuell abgehakt und anschließend unterschrieben werden.', - 'employee_signature': 'Unterschrift Mitarbeitende Person:', - 'trainer_signature': 'Unterschrift Einweisende Person:', - 'intro_completed_at': 'Einweisung durchgeführt am:', - 'open_questions': 'Rückfragen offen / Nacharbeit erforderlich:', - 'live_intro_title': 'Einweisungsprotokoll', - 'live_intro_sub': 'Export des aktuellen Live-Status aus der webbasierten Einweisung.', - 'employment_start': 'Vertragsbeginn', - 'employee_signature_block': 'Unterschrift Mitarbeitende Person', - }, - 'en': { - 'lang': 'en', - 'not_available': 'Not provided', - 'not_available_short': '-', - 'yes': 'Yes', - 'no': 'No', - 'onboarding_title': 'Onboarding Documents', - 'onboarding_staff_data': 'Employee Details', - 'name': 'Name', - 'department': 'Department', - 'job_title': 'Job title', - 'work_email': 'Work email', - 'employment_type': 'Employment type', - 'contract_start': 'Contract start', - 'contract_end': 'Contract end', - 'handover_date': 'Handover date', - 'equipment_access': 'Equipment and access', - 'devices': 'Required devices and items', - 'workspace_groups': 'Required workspace groups', - 'software': 'Required software', - 'accesses': 'Required accesses', - 'resources': 'Required resources', - 'group_mailboxes_required': 'Group mailboxes required', - 'additional_hardware_needed': 'Additional hardware required', - 'additional_software_needed': 'Additional software required', - 'additional_access_needed': 'Additional accesses required', - 'additional_details': 'Additional details', - 'business_cards': 'Business cards', - 'email': 'Email', - 'phone': 'Phone', - 'additional_hardware_other': 'Additional hardware (free text)', - 'successor_phone': 'Successor and phone', - 'successor_of': 'Successor to', - 'inherit_phone_number': 'Take over predecessor phone number', - 'direct_extension': 'Direct extension', - 'notes': 'Notes', - 'confirmation': 'Confirmation', - 'requested_by_name': 'Requested by (name)', - 'requested_by_email': 'Requested by (email)', - 'signature': 'Signature', - 'signature_alt': 'Signature', - 'onboarding_note': 'Note: This form serves as the internal process basis for onboarding.', - 'offboarding_title': 'Offboarding Documents', - 'employee_info': 'Employee information', - 'last_working_day': 'Last working day', - 'offboarding_requester': 'Offboarding requester', - 'it_hardware_status': 'IT hardware status (from onboarding)', - 'hardware_check': 'Hardware check', - 'no_onboarding_hardware': 'No onboarding hardware data found.', - 'manual_return_overview': 'Manual return overview', - 'manual_return_note': 'No stored onboarding data was found for this person. The following lists serve as a manual return and review overview.', - 'returned_devices': 'Returned devices and items', - 'returned_software': 'Returned or disabled software', - 'removed_workspace_groups': 'Removed workspace groups', - 'removed_accesses': 'Removed accesses', - 'returned_extra_it': 'Returned additional hardware / software', - 'it_signatures': 'IT section: signatures', - 'it_checked_by': 'Checked by IT on:', - 'it_signature': 'IT signature:', - 'return_complete': 'Return complete:', - 'offboarding_note': 'Note: This form serves as the internal process basis for offboarding.', - 'intro_title': 'Introduction and Handover Protocol', - 'intro_sub': 'Conversation guide for the personal introduction of new employees.', - 'base_data': 'Basic data', - 'start_date': 'Start date', - 'introduced_by': 'Introduction by', - 'intro_note': 'This document serves as a conversation guide for the personal introduction. The fields can be checked manually during the meeting and signed afterwards.', - 'employee_signature': 'Employee signature:', - 'trainer_signature': 'Trainer signature:', - 'intro_completed_at': 'Introduction completed on:', - 'open_questions': 'Open questions / follow-up required:', - 'live_intro_title': 'Introduction Protocol', - 'live_intro_sub': 'Export of the current live status from the web-based introduction.', - 'employment_start': 'Contract start', - 'employee_signature_block': 'Employee signature', - }, - } - return texts.get(lang, texts['de']) + return pdf_rendering._pdf_texts(language_code) MANUAL_ONBOARDING_FIELD_SECTIONS = [ @@ -562,137 +379,23 @@ MANUAL_ONBOARDING_FIELD_SECTIONS = [ def _manual_onboarding_field_sections() -> list[dict]: - fields = OnboardingRequestForm.base_fields - sections = [] - for title, field_names in MANUAL_ONBOARDING_FIELD_SECTIONS: - labels = [str(fields[name].label or name) for name in field_names if name in fields] - if not labels: - continue - sections.append({'title': title, 'rows': _chunk_list(labels, chunk_size=2)}) - return sections + return pdf_rendering._manual_onboarding_field_sections() def _resolve_workflow_emails() -> tuple[str, str, str, str, str]: - config = WorkflowConfig.objects.order_by('id').first() - it_email = (config.it_onboarding_email if config and config.it_onboarding_email else settings.IT_ONBOARDING_NOTIFICATION_EMAIL) - general_info_email = (config.general_info_email if config and config.general_info_email else settings.GENERAL_INFO_NOTIFICATION_EMAIL) - business_card_email = (config.business_card_email if config and config.business_card_email else settings.BUSINESS_CARD_NOTIFICATION_EMAIL) - hr_works_email = (config.hr_works_email if config and config.hr_works_email else settings.HR_WORKS_NOTIFICATION_EMAIL) - key_email = (config.key_notification_email if config and config.key_notification_email else settings.KEY_NOTIFICATION_EMAIL) - return it_email, general_info_email, business_card_email, hr_works_email, key_email + return email_workflows.resolve_workflow_emails() def _matches_intro_condition(request_obj: OnboardingRequest, item: IntroChecklistItem) -> bool: - operator = (item.condition_operator or 'always').strip() - field_name = (item.condition_field or '').strip() - expected = (item.condition_value or '').strip() - - if operator == 'always' or not field_name: - return True - - raw_value = getattr(request_obj, field_name, '') - if raw_value is None: - raw_value = '' - - if operator == 'is_true': - return bool(raw_value) - if operator == 'is_false': - return not bool(raw_value) - - text_value = str(raw_value).strip() - if operator == 'equals': - return text_value.lower() == expected.lower() - if operator == 'contains': - return expected.lower() in text_value.lower() - return True + return pdf_rendering._matches_intro_condition(request_obj, item) def _build_intro_sections_from_admin(request_obj: OnboardingRequest, language_code: str | None = None) -> dict[str, list[str]]: - items = list(IntroChecklistItem.objects.filter(is_active=True).order_by('section', 'sort_order', 'label')) - if not items: - return {} - - section_map = {key: [] for key, _label in IntroChecklistItem.SECTION_CHOICES} - for item in items: - if item.section not in section_map: - continue - if _matches_intro_condition(request_obj, item): - section_map[item.section].append(item.translated_label(language_code)) - return {key: values for key, values in section_map.items() if values} + return pdf_rendering._build_intro_sections_from_admin(request_obj, language_code=language_code) def build_intro_sections_for_request(request_obj: OnboardingRequest, language_code: str | None = None) -> list[dict]: - lang = _normalized_lang(language_code or get_language()) - with override(lang): - section_titles = { - 'workplace': _('Geräte und Arbeitsplatz'), - 'accounts': _('Konten und Berechtigungen'), - 'software': _('Software und Tools'), - 'process': _('Prozesse und Hinweise'), - } - devices = _split_multiline(request_obj.needed_devices) - software = _split_multiline(request_obj.needed_software) - accesses = _split_multiline(request_obj.needed_accesses) - groups = _split_multiline(request_obj.needed_workspace_groups) - resources = _split_multiline(request_obj.needed_resources) - extra_hardware = _split_multiline(request_obj.additional_hardware) - extra_software = _split_multiline(request_obj.additional_software) - group_mailboxes = _split_multiline(request_obj.group_mailboxes) - - workplace_items = [] - for item in devices: - workplace_items.append(_('%(item)s übergeben und Grundfunktionen erklärt') % {'item': item}) - for item in resources: - workplace_items.append(_('%(item)s gezeigt bzw. Nutzung erklärt') % {'item': item}) - if request_obj.phone_number: - workplace_items.append(_('Telefonnummer / Direktwahl erklärt: %(value)s') % {'value': request_obj.phone_number}) - if not workplace_items: - workplace_items.append(_('Arbeitsplatz, Geräte und allgemeine Nutzung besprochen')) - - account_items = [_('%(item)s Zugang erklärt') % {'item': item} for item in accesses] - account_items.extend([_('%(item)s Gruppe / Berechtigung erläutert') % {'item': item} for item in groups]) - if request_obj.work_email: - account_items.insert(0, _('Dienstliche E-Mail-Adresse erläutert: %(value)s') % {'value': request_obj.work_email}) - if group_mailboxes: - account_items.extend([_('Gruppenpostfach erklärt: %(item)s') % {'item': item} for item in group_mailboxes]) - if not account_items: - account_items.append(_('Zugänge, Konten und Anmeldelogik besprochen')) - - software_items = [_('%(item)s Einführung durchgeführt') % {'item': item} for item in software] - software_items.extend([_('%(item)s zusätzlich besprochen') % {'item': item} for item in extra_software]) - if not software_items: - software_items.append(_('Benötigte Standardsoftware und tägliche Nutzung erklärt')) - - process_items = [ - _('Passwortregeln und sicherer Umgang besprochen'), - _('Dateiablage, Nextcloud und Freigaben erklärt'), - _('Kommunikationswege und Support-Prozess erklärt'), - ] - if extra_hardware: - process_items.extend([_('%(item)s als zusätzliche Ausstattung besprochen') % {'item': item} for item in extra_hardware]) - if request_obj.additional_access_text: - process_items.extend([_('Zusätzlicher Zugang besprochen: %(item)s') % {'item': item} for item in _split_multiline(request_obj.additional_access_text)]) - if request_obj.successor_name: - process_items.append(_('Übergabe-/Nachfolgekontext besprochen: %(value)s') % {'value': request_obj.successor_name}) - - custom_intro_items = _build_intro_sections_from_admin(request_obj, lang) - intro_sections_raw = [ - ('workplace', section_titles['workplace'], workplace_items), - ('accounts', section_titles['accounts'], account_items), - ('software', section_titles['software'], software_items), - ('process', section_titles['process'], process_items), - ] - - sections = [] - for key, title, default_items in intro_sections_raw: - merged_items = list(default_items) - merged_items.extend(custom_intro_items.get(key, [])) - section_items = [] - for idx, label in enumerate(merged_items, start=1): - section_items.append({'id': f'{key}_{idx}', 'label': label}) - if section_items: - sections.append({'key': key, 'title': title, 'items': section_items}) - return sections + return pdf_rendering.build_intro_sections_for_request(request_obj, language_code=language_code) def _send_workflow_email( @@ -702,77 +405,33 @@ def _send_workflow_email( attachments: list[Path] | None = None, from_email: str | None = None, ) -> None: - recipients = [r for r in to if r] - if not recipients: - return - - effective_to = recipients - effective_body = body - if is_email_test_mode(): - effective_to = [get_email_test_redirect()] - effective_body = ( - "[TEST MODE] Diese E-Mail wurde umgeleitet.\n" - f"Originale Empfänger: {', '.join(recipients)}\n\n{body}" - ) - - send_system_email( + return email_workflows.send_workflow_email( subject=subject, - body=effective_body, - to=effective_to, - attachments=[str(a) for a in (attachments or [])], + body=body, + to=to, + attachments=attachments, from_email=from_email, ) def _render_notification_template(template_key: str, context: dict, language_code: str | None = None) -> tuple[str, str]: - lang = (language_code or 'de').split('-')[0] - db_template = NotificationTemplate.objects.filter(key=template_key, is_active=True).first() - if db_template: - subject_template = db_template.translated_subject_template(lang) - body_template = db_template.translated_body_template(lang) - else: - fallback = get_default_notification_templates()[template_key] - subject_template = fallback.get(f'subject_{lang}', '') or fallback['subject'] - body_template = fallback.get(f'body_{lang}', '') or fallback['body'] - - subject = Template(subject_template).render(context).strip() - body = Template(body_template).render(context).strip() - return subject, body + return email_workflows.render_notification_template( + template_key, + context, + language_code=language_code, + ) def _parse_recipients(raw: str) -> list[str]: - if not raw: - return [] - cleaned = raw.replace(';', ',').replace('\n', ',') - return [x.strip() for x in cleaned.split(',') if x.strip()] + return email_workflows.parse_recipients(raw) def _as_bool(value) -> bool: - if isinstance(value, bool): - return value - if value is None: - return False - text = str(value).strip().lower() - return text in {'1', 'true', 'ja', 'yes', 'on', 'aktiv'} + return email_workflows.as_bool(value) def _rule_matches(rule: NotificationRule, request_obj) -> bool: - if rule.operator == 'always': - return True - - raw_value = getattr(request_obj, rule.field_name, '') - actual = '' if raw_value is None else str(raw_value) - expected = (rule.expected_value or '').strip() - - if rule.operator == 'contains': - return expected.lower() in actual.lower() - if rule.operator == 'equals': - return actual.strip().lower() == expected.lower() - if rule.operator == 'is_true': - return _as_bool(raw_value) - if rule.operator == 'is_false': - return not _as_bool(raw_value) - return False + return email_workflows.rule_matches(rule, request_obj) def _apply_notification_rules( @@ -821,32 +480,10 @@ def _apply_notification_rules( def _schedule_welcome_email(request_obj: OnboardingRequest) -> None: - recipient = (request_obj.work_email or '').strip().lower() - if not recipient: - return - config = WorkflowConfig.objects.order_by('id').first() - delay_days = 5 - if config: - delay_days = max(0, int(config.welcome_email_delay_days or 5)) - send_at = timezone.now() + timedelta(days=delay_days) - scheduled, _ = ScheduledWelcomeEmail.objects.update_or_create( - onboarding_request=request_obj, - defaults={ - 'recipient_email': recipient, - 'send_at': send_at, - 'status': 'scheduled', - 'last_error': '', - 'sent_at': None, - }, + return email_workflows.schedule_welcome_email( + request_obj, + send_scheduled_welcome_email_task=send_scheduled_welcome_email, ) - try: - async_result = send_scheduled_welcome_email.apply_async(args=[scheduled.id], eta=send_at) - scheduled.celery_task_id = async_result.id or '' - scheduled.save(update_fields=['celery_task_id', 'updated_at']) - except Exception as exc: - scheduled.status = 'failed' - scheduled.last_error = f'Scheduling failed: {exc}' - scheduled.save(update_fields=['status', 'last_error', 'updated_at']) def _send_templated_email( @@ -862,281 +499,23 @@ def _send_templated_email( def _render_html(template_path: Path, context: dict) -> str: - with template_path.open('r', encoding='utf-8') as handle: - template = Template(handle.read()) - return template.render(context) + return pdf_rendering._render_html(template_path, context) def _generate_content_pdf(html_content: str, output_pdf: Path) -> None: - page_style = ( - '' - ) - if '' in html_content: - html_content = html_content.replace('', f'{page_style}', 1) - else: - html_content = page_style + html_content - - output_pdf.parent.mkdir(parents=True, exist_ok=True) - with output_pdf.open('wb') as fp: - result = pisa.CreatePDF( - src=html_content, - dest=fp, - encoding='utf-8', - ) - if result.err: - raise RuntimeError(f'Failed to render PDF content for {output_pdf.name}') + return pdf_rendering._generate_content_pdf(html_content, output_pdf) def _overlay_with_letterhead(content_pdf: Path, letterhead_pdf: Path, output_pdf: Path) -> None: - letterhead_reader = PdfReader(str(letterhead_pdf)) - content_reader = PdfReader(str(content_pdf)) - writer = PdfWriter() - - letterhead_page = letterhead_reader.pages[0] - for page in content_reader.pages: - merged = PageObject.create_blank_page( - width=letterhead_page.mediabox.width, - height=letterhead_page.mediabox.height, - ) - merged.merge_page(letterhead_page) - merged.merge_page(page) - writer.add_page(merged) - - with output_pdf.open('wb') as fp: - writer.write(fp) + return pdf_rendering._overlay_with_letterhead(content_pdf, letterhead_pdf, output_pdf) def _generate_onboarding_pdf(request_obj: OnboardingRequest) -> Path: - lang = _normalized_lang(request_obj.preferred_language) - t = _pdf_texts(lang) - first_name, last_name = _split_name(request_obj.full_name) - safe_name = _safe_filename_fragment(request_obj.full_name, fallback=f'onboarding_{request_obj.id}') - output_pdf = settings.PDF_OUTPUT_DIR / f'onboarding_letter_{safe_name}.pdf' - temp_pdf = settings.PDF_OUTPUT_DIR / f'_temp_onboarding_{safe_name}.pdf' - - template_path = settings.PDF_TEMPLATES_DIR / 'onboarding_template.html' - letterhead_path = get_portal_letterhead_path() - company_contact = get_company_contact_copy() - - devices = _split_multiline(request_obj.needed_devices) - software = _split_multiline(request_obj.needed_software) - accesses = _split_multiline(request_obj.needed_accesses) - groups = _split_multiline(request_obj.needed_workspace_groups) - resources = _split_multiline(request_obj.needed_resources) - group_mailboxes_list = _split_multiline(request_obj.group_mailboxes or '') - additional_hardware_list = _split_multiline(request_obj.additional_hardware or '') - additional_software_list = _split_multiline(request_obj.additional_software or '') - additional_access_list = _split_multiline(request_obj.additional_access_text or '') - - signature_src = '' - signature_note = t['not_available_short'] - if getattr(request_obj, 'signature_image', None): - try: - signature_path = Path(request_obj.signature_image.path).resolve() - with signature_path.open('rb') as sig_fp: - encoded = base64.b64encode(sig_fp.read()).decode('ascii') - mime_type = mimetypes.guess_type(signature_path.name)[0] or 'image/png' - signature_src = f"data:{mime_type};base64,{encoded}" - signature_note = 'Digital signature stored as image file.' if lang == 'en' else 'Digitale Signatur als Bilddatei hinterlegt.' - except Exception: - signature_src = '' - signature_note = request_obj.signature_url or t['not_available_short'] - elif request_obj.signature_url: - signature_note = request_obj.signature_url - - requester_email = request_obj.onboarded_by_email or t['not_available_short'] - requester_name = request_obj.onboarded_by_name or _resolve_user_display_name(request_obj.onboarded_by_email) or t['not_available_short'] - gender = (request_obj.get_gender_display() or t['not_available_short']).strip() or t['not_available_short'] - employment_type = (request_obj.employment_type or t['not_available_short']).strip() or t['not_available_short'] - employment_end = request_obj.employment_end_date or t['not_available_short'] - order_business_cards = bool(request_obj.order_business_cards) - group_mailboxes = (request_obj.group_mailboxes or '').strip() - additional_hardware_other = (request_obj.additional_hardware_other or '').strip() - additional_hardware = (request_obj.additional_hardware or '').strip() - additional_software = (request_obj.additional_software or '').strip() - additional_access_text = (request_obj.additional_access_text or '').strip() - successor_name = (request_obj.successor_name or '').strip() - additional_notes = (request_obj.additional_notes or '').strip() - phone_number = (request_obj.phone_number or '').strip() - display_name = f"{gender} {first_name} {last_name}".strip() if gender and gender != '-' else f"{first_name} {last_name}".strip() - - pdf_sections = build_pdf_sections('onboarding', request_obj, lang) - pdf_section_map = {section['key']: section for section in pdf_sections if section.get('has_content')} - pdf_field_map = {} - for section in pdf_sections: - for field in section.get('render_fields', []): - pdf_field_map[field['name']] = field - - def _field_text(name: str, fallback): - field = pdf_field_map.get(name) - if not field: - return fallback - value = field.get('display_value') - if isinstance(value, list): - return ' | '.join(value) if value else fallback - return value or fallback - - def _field_list(name: str) -> list[str]: - field = pdf_field_map.get(name) - if not field: - return [] - value = field.get('display_value') - if isinstance(value, list): - return value - text = str(value or '').strip() - return [text] if text else [] - - devices_visible = _field_list('needed_devices_multi') - groups_visible = _field_list('needed_workspace_groups_multi') - software_visible = _field_list('needed_software_multi') - accesses_visible = _field_list('needed_accesses_multi') - resources_visible = _field_list('needed_resources_multi') - group_mailboxes_visible = _field_list('group_mailboxes') - additional_hardware_visible = _field_list('additional_hardware_multi') - additional_software_visible = _field_list('additional_software_multi') - additional_access_visible = _field_list('additional_access_text') - job_title_visible = 'job_title' in pdf_field_map - itsetup_visible = 'itsetup' in pdf_section_map - - context = { - 'T': t, - 'PDF_LANG': lang, - 'PDF_SECTIONS': pdf_sections, - 'VORNAME': first_name, - 'NACHNAME': last_name, - 'DISPLAY_NAME': display_name or request_obj.full_name, - 'ANREDE': _field_text('gender', gender), - 'JOB_TITLE_VISIBLE': job_title_visible, - 'BERUFSBEZEICHNUNG': _field_text('job_title', t['not_available']), - 'ABTEILUNG': _field_text('department', t['not_available']), - 'EMAIL': _field_text('work_email', t['not_available']), - 'VERTRAGSBEGINN': _field_text('contract_start', request_obj.contract_start), - 'BESCHAEFTIGUNG': _field_text('employment_type', employment_type), - 'VERTRAGSENDE': _field_text('employment_end_date', employment_end), - 'UEBERGABEDATUM': _field_text('handover_date', request_obj.handover_date or t['not_available_short']), - 'ARBEITSGERAETE_TEXT': ' | '.join(devices_visible) if devices_visible else t['not_available'], - 'WORKSPACE_GROUPS_TEXT': ' | '.join(groups_visible) if groups_visible else t['not_available'], - 'SOFTWARE_TEXT': ' | '.join(software_visible) if software_visible else t['not_available'], - 'ZUGAENGE_TEXT': ' | '.join(accesses_visible) if accesses_visible else t['not_available'], - 'RESSOURCEN_TEXT': ' | '.join(resources_visible) if resources_visible else t['not_available'], - 'VISITENKARTE_BESTELLT': order_business_cards, - 'HAS_VISITENKARTE_DATEN': order_business_cards and ('business_card_name' in pdf_field_map or 'business_card_title' in pdf_field_map or 'business_card_email' in pdf_field_map or 'business_card_phone' in pdf_field_map) and any( - [ - _field_text('business_card_name', '').strip(), - _field_text('business_card_title', '').strip(), - _field_text('business_card_email', '').strip(), - _field_text('business_card_phone', '').strip(), - ] - ), - 'VISITENKARTE_NAME': _field_text('business_card_name', t['not_available_short']), - 'VISITENKARTE_TITEL': _field_text('business_card_title', t['not_available_short']), - 'VISITENKARTE_EMAIL': _field_text('business_card_email', t['not_available_short']), - 'VISITENKARTE_TELEFON': _field_text('business_card_phone', t['not_available_short']), - 'GROUP_MAILBOXES': _field_text('group_mailboxes', group_mailboxes or t['not_available']), - 'ADDITIONAL_HARDWARE_OTHER': _field_text('additional_hardware_other', additional_hardware_other or t['not_available']), - 'ADDITIONAL_HARDWARE': _field_text('additional_hardware_other', additional_hardware or t['not_available']), - 'ADDITIONAL_SOFTWARE': _field_text('additional_software', additional_software or t['not_available']), - 'ADDITIONAL_ACCESS_TEXT': _field_text('additional_access_text', additional_access_text or t['not_available']), - 'SUCCESSOR_NAME': _field_text('successor_name', successor_name or t['not_available']), - 'PHONE_NUMBER': _field_text('phone_number_choice', phone_number or t['not_available_short']), - 'INHERIT_PHONE_NUMBER': _field_text('inherit_phone_number_choice', t['yes'] if request_obj.inherit_phone_number else t['no']), - 'ADDITIONAL_NOTES': _field_text('additional_notes', additional_notes or t['not_available']), - 'GROUP_MAILBOXES_REQUIRED': 'group_mailboxes_required_choice' in pdf_field_map and bool(group_mailboxes_visible), - 'ADDITIONAL_HARDWARE_NEEDED': 'additional_hardware_needed_choice' in pdf_field_map and bool(additional_hardware_visible), - 'ADDITIONAL_SOFTWARE_NEEDED': 'additional_software_needed_choice' in pdf_field_map and bool(additional_software_visible), - 'ADDITIONAL_ACCESS_NEEDED': 'additional_access_needed_choice' in pdf_field_map and bool(additional_access_visible), - 'HAS_DEVICES': itsetup_visible and bool(devices_visible), - 'HAS_GROUPS': itsetup_visible and bool(groups_visible), - 'HAS_SOFTWARE': itsetup_visible and bool(software_visible), - 'HAS_ACCESSES': itsetup_visible and bool(accesses_visible), - 'HAS_RESOURCES': itsetup_visible and bool(resources_visible), - 'HAS_GROUP_MAILBOXES': bool(group_mailboxes_visible), - 'HAS_ADDITIONAL_HARDWARE': bool(additional_hardware_visible), - 'HAS_ADDITIONAL_SOFTWARE': bool(additional_software_visible), - 'HAS_ADDITIONAL_ACCESS': bool(additional_access_visible), - 'HAS_ADDITIONAL_HARDWARE_OTHER': bool(_field_text('additional_hardware_other', '').strip()), - 'HAS_SUCCESSOR_INFO': bool(_field_text('successor_name', '').strip()) or 'inherit_phone_number_choice' in pdf_field_map or bool(_field_text('phone_number_choice', '').strip()), - 'HAS_ADDITIONAL_NOTES': bool(_field_text('additional_notes', '').strip()), - 'GROUP_MAILBOXES_LIST': _chunk_list(group_mailboxes_visible), - 'ADDITIONAL_HARDWARE_LIST': _chunk_list(additional_hardware_visible), - 'ADDITIONAL_SOFTWARE_LIST': _chunk_list(additional_software_visible), - 'ADDITIONAL_ACCESS_LIST': _chunk_list(additional_access_visible), - 'ZUGAENGE_LIST': _chunk_list(groups_visible), - 'ARBEITSGERÄTE_LIST': _chunk_list(devices_visible), - 'SOFTWARE_LIST': _chunk_list(software_visible), - 'ACCOUNT_LIST': _chunk_list(accesses_visible), - 'STANDARD_RESSOURCEN': _chunk_list(resources_visible), - 'UNTERSCHRIFT': signature_src, - 'UNTERSCHRIFT_HINWEIS': signature_note, - 'REQUESTED_BY_NAME': requester_name, - 'REQUESTED_BY_EMAIL': requester_email, - 'COMPANY_LEGAL_NAME': company_contact['legal_company_name'] or company_contact['company_name'], - 'COMPANY_ADDRESS': company_contact['address'] or t['not_available_short'], - 'COMPANY_IT_CONTACT': company_contact['it_contact_email'] or t['not_available_short'], - 'COMPANY_HR_CONTACT': company_contact['hr_contact_email'] or t['not_available_short'], - 'COMPANY_PHONE': company_contact['phone_number'] or t['not_available_short'], - } - - html = _render_html(template_path, context) - _generate_content_pdf(html, temp_pdf) - _overlay_with_letterhead(temp_pdf, letterhead_path, output_pdf) - - if temp_pdf.exists(): - temp_pdf.unlink(missing_ok=True) - return output_pdf + return pdf_rendering._generate_onboarding_pdf(request_obj) def _generate_onboarding_intro_pdf(request_obj: OnboardingRequest, language_code: str | None = None) -> Path: - lang = _normalized_lang(language_code or request_obj.preferred_language) - t = _pdf_texts(lang) - safe_name = _safe_filename_fragment(request_obj.full_name, fallback=f'onboarding_intro_{request_obj.id}') - output_pdf = settings.PDF_OUTPUT_DIR / f'onboarding_intro_{safe_name}.pdf' - temp_pdf = settings.PDF_OUTPUT_DIR / f'_temp_onboarding_intro_{safe_name}.pdf' - - template_path = settings.PDF_TEMPLATES_DIR / 'onboarding_intro_template.html' - letterhead_path = get_portal_letterhead_path() - company_contact = get_company_contact_copy() - - salutation = (request_obj.get_gender_display() or '').strip() - display_name = f"{salutation} {request_obj.full_name}".strip() if salutation else request_obj.full_name - intro_sections = [ - { - 'title': section['title'], - 'rows': _chunk_list([item['label'] for item in section['items']], chunk_size=2), - } - for section in build_intro_sections_for_request(request_obj, language_code=language_code) - ] - - requester_email = request_obj.onboarded_by_email or '-' - requester_name = request_obj.onboarded_by_name or _resolve_user_display_name(request_obj.onboarded_by_email) or '-' - - context = { - 'T': t, - 'DISPLAY_NAME': display_name, - 'ABTEILUNG': request_obj.department or t['not_available_short'], - 'BERUFSBEZEICHNUNG': request_obj.job_title or t['not_available_short'], - 'VERTRAGSBEGINN': request_obj.contract_start, - 'EMAIL': request_obj.work_email or t['not_available_short'], - 'REQUESTED_BY_NAME': requester_name, - 'REQUESTED_BY_EMAIL': requester_email, - 'INTRO_SECTIONS': intro_sections, - 'COMPANY_LEGAL_NAME': company_contact['legal_company_name'] or company_contact['company_name'], - 'COMPANY_ADDRESS': company_contact['address'] or t['not_available_short'], - 'COMPANY_IT_CONTACT': company_contact['it_contact_email'] or t['not_available_short'], - 'COMPANY_HR_CONTACT': company_contact['hr_contact_email'] or t['not_available_short'], - 'COMPANY_PHONE': company_contact['phone_number'] or t['not_available_short'], - } - - html = _render_html(template_path, context) - _generate_content_pdf(html, temp_pdf) - _overlay_with_letterhead(temp_pdf, letterhead_path, output_pdf) - - if temp_pdf.exists(): - temp_pdf.unlink(missing_ok=True) - return output_pdf + return pdf_rendering._generate_onboarding_intro_pdf(request_obj, language_code=language_code) def _generate_onboarding_intro_session_pdf( @@ -1144,147 +523,15 @@ def _generate_onboarding_intro_session_pdf( admin_signature_name: str = '-', language_code: str | None = None, ) -> Path: - request_obj = session.onboarding_request - lang = _normalized_lang(language_code or request_obj.preferred_language) - t = _pdf_texts(lang) - safe_name = _safe_filename_fragment(request_obj.full_name, fallback=f'onboarding_intro_session_{request_obj.id}') - version = timezone.now().strftime('%Y%m%d%H%M%S') - output_pdf = settings.PDF_OUTPUT_DIR / f'onboarding_intro_session_{safe_name}_{version}.pdf' - temp_pdf = settings.PDF_OUTPUT_DIR / f'_temp_onboarding_intro_session_{safe_name}_{version}.pdf' - - template_path = settings.PDF_TEMPLATES_DIR / 'onboarding_intro_session_pdf.html' - letterhead_path = get_portal_letterhead_path() - company_contact = get_company_contact_copy() - - salutation = (request_obj.get_gender_display() or '').strip() - display_name = f"{salutation} {request_obj.full_name}".strip() if salutation else request_obj.full_name - - raw_sections = build_intro_sections_for_request(request_obj, language_code=language_code) - checked_map = session.checklist_state or {} - exported_sections = [] - checked_count = 0 - total_count = 0 - for section in raw_sections: - checked_items = [] - for item in section['items']: - checked = bool(checked_map.get(item['id'])) - total_count += 1 - if checked: - checked_count += 1 - checked_items.append({'label': item['label']}) - if checked_items: - exported_sections.append({ - 'title': section['title'], - 'rows': [checked_items[i:i + 2] for i in range(0, len(checked_items), 2)], - }) - - requester_email = request_obj.onboarded_by_email or '-' - requester_name = request_obj.onboarded_by_name or _resolve_user_display_name(request_obj.onboarded_by_email) or '-' - - context = { - 'T': t, - 'DISPLAY_NAME': display_name, - 'ABTEILUNG': request_obj.department or t['not_available_short'], - 'BERUFSBEZEICHNUNG': request_obj.job_title or t['not_available_short'], - 'VERTRAGSBEGINN': request_obj.contract_start, - 'EMAIL': request_obj.work_email or t['not_available_short'], - 'REQUESTED_BY_NAME': requester_name, - 'REQUESTED_BY_EMAIL': requester_email, - 'SESSION_STATUS': session.get_status_display(), - 'SESSION_COMPLETED_BY': session.completed_by_name or '-', - 'SESSION_COMPLETED_AT': session.completed_at or '-', - 'SESSION_UPDATED_AT': session.updated_at, - 'SESSION_NOTES': session.notes or t['not_available_short'], - 'INTRO_SECTIONS': exported_sections, - 'COMPANY_LEGAL_NAME': company_contact['legal_company_name'] or company_contact['company_name'], - 'COMPANY_ADDRESS': company_contact['address'] or t['not_available_short'], - 'COMPANY_IT_CONTACT': company_contact['it_contact_email'] or t['not_available_short'], - 'COMPANY_HR_CONTACT': company_contact['hr_contact_email'] or t['not_available_short'], - 'COMPANY_PHONE': company_contact['phone_number'] or t['not_available_short'], - } - - html = _render_html(template_path, context) - _generate_content_pdf(html, temp_pdf) - _overlay_with_letterhead(temp_pdf, letterhead_path, output_pdf) - - if temp_pdf.exists(): - temp_pdf.unlink(missing_ok=True) - return output_pdf + return pdf_rendering._generate_onboarding_intro_session_pdf( + session, + admin_signature_name=admin_signature_name, + language_code=language_code, + ) def _generate_offboarding_pdf(request_obj: OffboardingRequest) -> Path: - lang = _normalized_lang(request_obj.preferred_language) - t = _pdf_texts(lang) - safe_name = _safe_filename_fragment(request_obj.full_name, fallback=f'offboarding_{request_obj.id}') - output_pdf = settings.PDF_OUTPUT_DIR / f'offboarding_letter_{safe_name}.pdf' - temp_pdf = settings.PDF_OUTPUT_DIR / f'_temp_offboarding_{safe_name}.pdf' - - template_path = settings.PDF_TEMPLATES_DIR / 'offboarding_template.html' - letterhead_path = get_portal_letterhead_path() - company_contact = get_company_contact_copy() - latest_onboarding = ( - OnboardingRequest.objects.filter(work_email=request_obj.work_email) - .order_by('-created_at') - .first() - ) - has_onboarding_data = latest_onboarding is not None - onboarding_hardware = _split_multiline(latest_onboarding.needed_devices) if latest_onboarding else [] - selected_set = {item.lower() for item in onboarding_hardware} - hardware_catalog = [ - 'Laptop', - 'Docking-Station', - 'Tastatur und Maus', - 'Kopfhörer', - 'Tragetasche', - 'Monitor', - 'Schlüssel', - 'Tischtelefon', - ] - checklist = [{'label': item, 'selected': item.lower() in selected_set} for item in hardware_catalog] - extra_selected = [item for item in onboarding_hardware if item.lower() not in {x.lower() for x in hardware_catalog}] - for item in extra_selected: - checklist.append({'label': item, 'selected': True}) - - requester_email = request_obj.requested_by_email or t['not_available_short'] - requester_name = request_obj.requested_by_name or _resolve_user_display_name(request_obj.requested_by_email) or t['not_available_short'] - - context = { - 'T': t, - 'PDF_LANG': lang, - 'PDF_SECTIONS': build_pdf_sections('offboarding', request_obj, lang), - 'FULL_NAME': request_obj.full_name, - 'EMAIL': request_obj.work_email, - 'DEPARTMENT': request_obj.department or t['not_available_short'], - 'JOB_TITLE': request_obj.job_title or t['not_available_short'], - 'LAST_WORKING_DAY': request_obj.last_working_day, - 'NOTES': request_obj.notes or t['not_available_short'], - 'REQUESTED_BY': requester_email, - 'REQUESTED_BY_NAME': requester_name, - 'HAS_ONBOARDING_DATA': has_onboarding_data, - 'ONBOARDING_HARDWARE': onboarding_hardware, - 'HARDWARE_CHECKLIST': checklist, - 'MANUAL_FIELD_SECTIONS': _manual_onboarding_field_sections(), - 'MANUAL_DEVICES': _chunk_choice_labels(DEVICE_CHOICES), - 'MANUAL_SOFTWARE': _chunk_choice_labels(SOFTWARE_CHOICES), - 'MANUAL_ACCESSES': _chunk_choice_labels(ACCESS_CHOICES), - 'MANUAL_WORKSPACE_GROUPS': _chunk_choice_labels(WORKSPACE_GROUP_CHOICES), - 'MANUAL_RESOURCES': _chunk_choice_labels(RESOURCE_CHOICES), - 'MANUAL_EXTRA_HARDWARE': _chunk_choice_labels(HARDWARE_EXTRA_CHOICES), - 'MANUAL_EXTRA_SOFTWARE': _chunk_choice_labels(SOFTWARE_EXTRA_CHOICES), - 'COMPANY_LEGAL_NAME': company_contact['legal_company_name'] or company_contact['company_name'], - 'COMPANY_ADDRESS': company_contact['address'] or t['not_available_short'], - 'COMPANY_IT_CONTACT': company_contact['it_contact_email'] or t['not_available_short'], - 'COMPANY_HR_CONTACT': company_contact['hr_contact_email'] or t['not_available_short'], - 'COMPANY_PHONE': company_contact['phone_number'] or t['not_available_short'], - } - - html = _render_html(template_path, context) - _generate_content_pdf(html, temp_pdf) - _overlay_with_letterhead(temp_pdf, letterhead_path, output_pdf) - - if temp_pdf.exists(): - temp_pdf.unlink(missing_ok=True) - return output_pdf + return pdf_rendering._generate_offboarding_pdf(request_obj) @shared_task diff --git a/backend/workflows/view_audit.py b/backend/workflows/view_audit.py new file mode 100644 index 0000000..3480907 --- /dev/null +++ b/backend/workflows/view_audit.py @@ -0,0 +1,82 @@ +from django.utils.translation import gettext as _ + +from .models import AdminAuditLog + + +def display_user_name(user) -> str: + first_name = (getattr(user, 'first_name', '') or '').strip() + last_name = (getattr(user, 'last_name', '') or '').strip() + full_name = f'{first_name} {last_name}'.strip() + if full_name: + return full_name + username = (getattr(user, 'username', '') or '').strip() + if username: + return username + return (getattr(user, 'email', '') or '').strip() + + +def audit( + request, + action: str, + *, + target_type: str = '', + target_id: int | None = None, + target_label: str = '', + details: dict | None = None, +) -> None: + if not getattr(request, 'user', None) or not request.user.is_authenticated: + return + AdminAuditLog.objects.create( + actor=request.user, + actor_display=display_user_name(request.user), + action=action, + target_type=target_type, + target_id=target_id, + target_label=target_label, + details=details or {}, + ) + + +def audit_action_label(action: str) -> str: + labels = { + 'requests_deleted': _('Vorgänge gelöscht'), + 'request_deleted': _('Vorgang gelöscht'), + 'request_retried': _('Vorgang erneut angestoßen'), + 'intro_pdf_generated': _('Einweisungs-PDF erzeugt'), + 'intro_live_pdf_generated': _('Live-Protokoll erzeugt'), + 'intro_session_reset': _('Einweisung zurückgesetzt'), + 'intro_session_saved': _('Einweisung als Entwurf gespeichert'), + 'intro_session_completed': _('Einweisung abgeschlossen'), + 'form_option_deleted': _('Formularoption gelöscht'), + 'form_options_saved': _('Formularoptionen gespeichert'), + 'form_field_texts_saved': _('Feldtexte gespeichert'), + 'form_layout_saved': _('Formularlayout gespeichert'), + 'intro_checklist_item_deleted': _('Einweisungs-Checkpunkt gelöscht'), + 'intro_checklist_item_added': _('Einweisungs-Checkpunkt hinzugefügt'), + 'intro_checklist_saved': _('Einweisungs-Checkliste gespeichert'), + 'welcome_email_triggered_now': _('Welcome E-Mail sofort ausgelöst'), + 'welcome_email_settings_saved': _('Welcome E-Mail Einstellungen gespeichert'), + 'welcome_email_bulk_action': _('Welcome E-Mail Sammelaktion ausgeführt'), + 'welcome_email_paused': _('Welcome E-Mail pausiert'), + 'welcome_email_resumed': _('Welcome E-Mail fortgesetzt'), + 'welcome_email_cancelled': _('Welcome E-Mail abgebrochen'), + 'smtp_test_sent': _('SMTP-Test gesendet'), + 'nextcloud_test_upload': _('Nextcloud-Testupload ausgeführt'), + 'nextcloud_mode_toggled': _('Nextcloud-Modus umgeschaltet'), + 'email_mode_toggled': _('E-Mail-Modus umgeschaltet'), + 'integrations_saved': _('Integrationen gespeichert'), + 'nextcloud_settings_saved': _('Nextcloud-Einstellungen gespeichert'), + 'mail_settings_saved': _('Mail-Einstellungen gespeichert'), + 'email_routing_saved': _('E-Mail-Routing gespeichert'), + 'notification_rules_saved': _('Benachrichtigungsregeln gespeichert'), + 'user_created': _('Benutzer erstellt'), + 'user_updated': _('Benutzer aktualisiert'), + 'user_password_reset_sent': _('Passwort-Reset-Link versendet'), + 'user_deleted': _('Benutzer gelöscht'), + 'backup_created': _('Backup erstellt'), + 'backup_verified': _('Backup verifiziert'), + 'backup_deleted': _('Backup gelöscht'), + 'backup_settings_saved': _('Backup-Einstellungen gespeichert'), + 'portal_app_registry_saved': _('App-Registry gespeichert'), + } + return labels.get(action, action.replace('_', ' ').strip().capitalize()) diff --git a/backend/workflows/view_context.py b/backend/workflows/view_context.py new file mode 100644 index 0000000..732e005 --- /dev/null +++ b/backend/workflows/view_context.py @@ -0,0 +1,105 @@ +from datetime import timedelta + +from django.db.models import Count +from django.utils import timezone +from django.utils.translation import get_language, gettext as _, override + +from .backup_ops import latest_backup_health_snapshot +from .form_builder import get_custom_field_configs +from .forms import OffboardingRequestForm, OnboardingRequestForm +from .models import AsyncTaskLog, OffboardingRequest, OnboardingRequest +from .roles import user_has_capability + + +def form_field_labels(form_type: str) -> dict[str, str]: + if form_type == 'onboarding': + return {name: str(field.label or name) for name, field in OnboardingRequestForm.base_fields.items()} + if form_type == 'offboarding': + return {name: str(field.label or name) for name, field in OffboardingRequestForm.base_fields.items()} + return {} + + +def request_target_label(obj, kind: str | None = None) -> str: + request_kind = (kind or '').strip() + if not request_kind: + request_kind = 'onboarding' if isinstance(obj, OnboardingRequest) else 'offboarding' + name = (getattr(obj, 'full_name', '') or '').strip() or f'#{getattr(obj, "id", "?")}' + email = (getattr(obj, 'work_email', '') or '').strip() + created_at = getattr(obj, 'created_at', None) + date_label = created_at.strftime('%Y-%m-%d') if created_at else '' + parts = [request_kind.capitalize(), name] + if email: + parts.append(f'<{email}>') + if date_label: + parts.append(date_label) + return ' | '.join(parts) + + +def request_status_label(status_key: str, language_code: str | None = None) -> str: + lang = ((language_code or 'de').split('-')[0] or 'de').lower() + with override(lang): + labels = { + 'submitted': _('Eingereicht'), + 'processing': _('In Bearbeitung'), + 'completed': _('Abgeschlossen'), + 'failed': _('Fehlgeschlagen'), + } + return labels.get(status_key, status_key) + + +def request_custom_field_details(obj, kind: str, language_code: str | None = None) -> list[dict[str, str]]: + form_type = 'onboarding' if kind == 'onboarding' else 'offboarding' + language_code = ((language_code or getattr(obj, 'preferred_language', '') or get_language() or 'de').split('-')[0]).lower() + values = getattr(obj, 'custom_field_values', {}) or {} + rows = [] + yes_label = 'Ja' if language_code == 'de' else 'Yes' + for cfg in get_custom_field_configs(form_type, include_inactive=True): + raw_value = values.get(cfg.field_key) + if raw_value in (None, '', False, []): + continue + if isinstance(raw_value, bool): + display_value = str(yes_label) if raw_value else '' + elif isinstance(raw_value, list): + display_value = ', '.join(str(item).strip() for item in raw_value if str(item).strip()) + else: + display_value = str(raw_value).strip() + if not display_value: + continue + rows.append( + { + 'label': cfg.translated_label(language_code), + 'value': display_value, + 'section': cfg.section_key, + 'sort_order': cfg.sort_order, + } + ) + rows.sort(key=lambda item: (item['section'], item['sort_order'], item['label'])) + return rows + + +def ops_summary_for_user(user) -> dict[str, object]: + can_view_jobs = user_has_capability(user, 'view_job_monitor') + can_manage_backups = user_has_capability(user, 'manage_backups') + summary: dict[str, object] = { + 'show': can_view_jobs or can_manage_backups, + 'can_view_jobs': can_view_jobs, + 'can_manage_backups': can_manage_backups, + 'failed_count_24h': 0, + 'started_count_24h': 0, + 'success_count_24h': 0, + 'recent_failed_logs': [], + 'backup_health': latest_backup_health_snapshot() if can_manage_backups else None, + } + if not can_view_jobs: + return summary + + since = timezone.now() - timedelta(hours=24) + logs = AsyncTaskLog.objects.filter(started_at__gte=since) + counts = {row['status']: row['count'] for row in logs.values('status').annotate(count=Count('id'))} + summary['failed_count_24h'] = counts.get('failed', 0) + summary['started_count_24h'] = counts.get('started', 0) + summary['success_count_24h'] = counts.get('succeeded', 0) + summary['recent_failed_logs'] = list( + AsyncTaskLog.objects.filter(status='failed').order_by('-started_at', '-id')[:5] + ) + return summary diff --git a/backend/workflows/view_form_runtime.py b/backend/workflows/view_form_runtime.py new file mode 100644 index 0000000..f3182bb --- /dev/null +++ b/backend/workflows/view_form_runtime.py @@ -0,0 +1,231 @@ +from django.utils.translation import gettext as _, gettext_lazy + +from .form_builder import ( + LOCKED_SECTION_RULES, + OFFBOARDING_PAGE_ORDER, + ensure_form_conditional_rule_configs, + get_section_definitions, +) + +ONBOARDING_GROUPS = { + 'business-card-box': ['business_card_name', 'business_card_title', 'business_card_email', 'business_card_phone'], + 'employment-end-box': ['employment_end_date'], + 'group-mailboxes-box': ['group_mailboxes'], + 'extra-hardware-box': ['additional_hardware_multi', 'additional_hardware_other'], + 'extra-software-box': ['additional_software_multi', 'additional_software'], + 'extra-access-box': ['additional_access_text'], + 'successor-box': ['successor_name', 'inherit_phone_number_choice'], +} + +ONBOARDING_INLINE_CHECKS = {'order_business_cards', 'agreement_confirm'} +ONBOARDING_CHECKBOX_LISTS = { + 'needed_devices_multi', + 'additional_hardware_multi', + 'needed_software_multi', + 'additional_software_multi', + 'needed_accesses_multi', + 'needed_workspace_groups_multi', + 'needed_resources_multi', +} + +CONDITIONAL_RULE_OPERATOR_CHOICES = [ + ('checked', _('ist aktiviert')), + ('equals', _('ist gleich')), + ('not_equals', _('ist nicht gleich')), +] + +ONBOARDING_SECTION_META = { + 'stammdaten': {'title': gettext_lazy('Stammdaten'), 'subtitle': gettext_lazy('Person, Rolle, Abteilung')}, + 'vertrag': {'title': gettext_lazy('Vertrag'), 'subtitle': gettext_lazy('Beschäftigung und Termine')}, + 'itsetup': {'title': gettext_lazy('IT-Setup'), 'subtitle': gettext_lazy('Geräte, Software und Zugänge')}, + 'abschluss': {'title': gettext_lazy('Abschluss'), 'subtitle': gettext_lazy('Notizen und Freigabe')}, +} + +OFFBOARDING_SECTION_META = { + 'mitarbeitende': {'title': gettext_lazy('Mitarbeitende'), 'subtitle': gettext_lazy('Person, Rolle und Bereich')}, + 'austritt': {'title': gettext_lazy('Austritt'), 'subtitle': gettext_lazy('Letzter Arbeitstag')}, + 'abschluss': {'title': gettext_lazy('Abschluss'), 'subtitle': gettext_lazy('Hinweise und Abschlussnotizen')}, +} + + +def field_rule_summary(*, is_visible: bool, is_required, locked: bool) -> str: + if locked: + return str(_('Fixes Kernfeld, immer sichtbar.')) + if not is_visible: + return str(_('Ausgeblendet, erscheint nicht im Formular.')) + if is_required is True: + return str(_('Sichtbar und als Pflichtfeld markiert.')) + if is_required is False: + return str(_('Sichtbar und optional.')) + return str(_('Sichtbar mit Standardverhalten.')) + + +def conditional_clause_sentence(clause: dict, field_label_map: dict[str, str]) -> str: + field_name = (clause.get('field') or '').strip() + operator = (clause.get('operator') or '').strip() + value = clause.get('value') + if not field_name or not operator: + return '' + field_label = field_label_map.get(field_name, field_name) + if operator == 'checked': + return _('%(field)s ist aktiviert') % {'field': field_label} + if operator == 'equals': + if value not in (None, ''): + return _('%(field)s ist gleich %(value)s') % {'field': field_label, 'value': value} + return _('%(field)s ist gleich') % {'field': field_label} + if operator == 'not_equals': + if value not in (None, ''): + return _('%(field)s ist nicht gleich %(value)s') % {'field': field_label, 'value': value} + return _('%(field)s ist nicht gleich') % {'field': field_label} + return _('%(field)s erfüllt die Bedingung') % {'field': field_label} + + +def conditional_rule_summary(clauses: list[dict], field_label_map: dict[str, str]) -> str: + active_clauses = [clause for clause in clauses if clause.get('field') and clause.get('operator')] + if not active_clauses: + return str(_('Immer sichtbar.')) + parts = [str(conditional_clause_sentence(clause, field_label_map)) for clause in active_clauses] + return str(_('Sichtbar, wenn %(conditions)s.') % {'conditions': ' und '.join(parts)}) + + +def normalized_conditional_rule_payload(form_type: str) -> dict[str, dict]: + configs = ensure_form_conditional_rule_configs(form_type) + payload = {} + for target_key, cfg in configs.items(): + if not cfg.is_active: + continue + clauses = [clause for clause in (cfg.clauses or []) if clause.get('field') and clause.get('operator')] + if clauses: + payload[target_key] = {'all': clauses} + return payload + + +def active_conditional_target_keys(form_type: str) -> set[str]: + return set(normalized_conditional_rule_payload(form_type).keys()) + + +def translate_choice_list(choices): + return [(value, str(label)) for value, label in choices] + + +def build_onboarding_layout(form) -> list[dict]: + ordered_names = list(form.fields.keys()) + group_by_field = {} + for group_id, group_fields in ONBOARDING_GROUPS.items(): + for name in group_fields: + group_by_field[name] = group_id + conditional_target_keys = active_conditional_target_keys('onboarding') + + rendered_groups = set() + consumed = set() + blocks = [] + + for field_name in ordered_names: + if field_name in consumed: + continue + + group_id = group_by_field.get(field_name) + if group_id: + if group_id in rendered_groups: + continue + group_fields = [form[name] for name in ONBOARDING_GROUPS[group_id] if name in form.fields] + if not group_fields: + continue + blocks.append( + { + 'kind': 'group', + 'id': group_id, + 'hidden_default': group_id in conditional_target_keys, + 'fields': group_fields, + } + ) + rendered_groups.add(group_id) + consumed.update([f.name for f in group_fields]) + continue + + if field_name.startswith('custom__') and field_name in conditional_target_keys: + blocks.append( + { + 'kind': 'group', + 'id': field_name, + 'hidden_default': True, + 'fields': [form[field_name]], + } + ) + consumed.add(field_name) + continue + + blocks.append({'kind': 'field', 'field': form[field_name]}) + consumed.add(field_name) + + return blocks + + +def section_for_block(block: dict, field_pages: dict[str, str]) -> str: + if block['kind'] == 'field': + return field_pages.get(block['field'].name, 'abschluss') + fields = block.get('fields') or [] + if not fields: + return 'abschluss' + return field_pages.get(fields[0].name, 'abschluss') + + +def build_onboarding_sections(blocks: list[dict], field_pages: dict[str, str], visible_section_keys: set[str] | None = None) -> list[dict]: + section_defs = get_section_definitions('onboarding') + section_order = [item['key'] for item in section_defs] + section_titles = {item['key']: item['title'] for item in section_defs} + grouped = {key: [] for key in section_order} + for block in blocks: + section_key = section_for_block(block, field_pages) + if section_key not in grouped: + section_key = 'abschluss' + grouped[section_key].append(block) + visible_keys = visible_section_keys or set(section_order) + sections = [] + custom_section_keys = {item['key'] for item in section_defs if item.get('is_custom')} + for key in section_order: + if key not in visible_keys: + continue + blocks_for_section = grouped[key] + has_custom_checkbox_fields = False + for block in blocks_for_section: + candidate_fields = [block['field']] if block['kind'] == 'field' else (block.get('fields') or []) + for bound_field in candidate_fields: + widget_type = getattr(getattr(bound_field.field, 'widget', None), 'input_type', '') + if bound_field.name.startswith('custom__') and widget_type == 'checkbox': + has_custom_checkbox_fields = True + break + if has_custom_checkbox_fields: + break + sections.append( + { + 'key': key, + 'title': section_titles.get(key, ONBOARDING_SECTION_META.get(key, {}).get('title', key)), + 'subtitle': ONBOARDING_SECTION_META.get(key, {}).get('subtitle', ''), + 'blocks': blocks_for_section, + 'is_custom': key in custom_section_keys, + 'has_custom_checkbox_fields': has_custom_checkbox_fields, + } + ) + return sections + + +def build_offboarding_sections(form, visible_section_keys: set[str] | None = None) -> list[dict]: + field_pages = getattr(form, '_field_page_keys', {}) + grouped = {key: [] for key in OFFBOARDING_PAGE_ORDER} + for field_name in form.fields.keys(): + section_key = field_pages.get(field_name, 'abschluss') + if section_key not in grouped: + section_key = 'abschluss' + grouped[section_key].append(form[field_name]) + visible_keys = visible_section_keys or set(OFFBOARDING_PAGE_ORDER) + return [ + { + 'key': key, + 'title': OFFBOARDING_SECTION_META[key]['title'], + 'subtitle': OFFBOARDING_SECTION_META[key]['subtitle'], + 'fields': grouped[key], + } + for key in OFFBOARDING_PAGE_ORDER + if key in visible_keys and grouped[key] + ] diff --git a/backend/workflows/view_permissions.py b/backend/workflows/view_permissions.py new file mode 100644 index 0000000..81fffbe --- /dev/null +++ b/backend/workflows/view_permissions.py @@ -0,0 +1,23 @@ +from functools import wraps + +from django.contrib import messages +from django.contrib.auth.decorators import login_required +from django.shortcuts import redirect +from django.utils.translation import gettext as _ + +from .roles import user_has_capability + + +def require_capability(capability: str): + def decorator(view_func): + @wraps(view_func) + @login_required + def wrapped(request, *args, **kwargs): + if not user_has_capability(request.user, capability): + messages.error(request, _('Sie haben keine Berechtigung für diese Aktion.')) + return redirect('home') + return view_func(request, *args, **kwargs) + + return wrapped + + return decorator diff --git a/backend/workflows/views.py b/backend/workflows/views.py index a2573cf..d6ff995 100644 --- a/backend/workflows/views.py +++ b/backend/workflows/views.py @@ -4,8 +4,6 @@ from datetime import timedelta from tempfile import NamedTemporaryFile import json from io import BytesIO -from functools import wraps - from celery import current_app from django.conf import settings from django.db import connection @@ -23,8 +21,7 @@ from django.views.decorators.csrf import ensure_csrf_cookie from django.utils import timezone from django.utils.encoding import force_bytes from django.utils.http import urlsafe_base64_encode -from django.utils.translation import gettext as _, gettext_lazy -from django.utils.translation import get_language, override +from django.utils.translation import gettext as _ from django.urls import reverse from .app_registry import build_portal_app_sections, get_portal_app_registry_rows, normalize_portal_app_sort_orders @@ -69,6 +66,28 @@ from .observability_views import ( job_monitor_page_impl, verify_backup_from_admin_impl, ) +from .view_audit import audit as _audit, audit_action_label as _audit_action_label, display_user_name as _display_user_name +from .view_context import ( + form_field_labels as _form_field_labels, + request_custom_field_details as _request_custom_field_details, + request_status_label as _request_status_label, + request_target_label as _request_target_label, +) +from .view_form_runtime import ( + CONDITIONAL_RULE_OPERATOR_CHOICES, + ONBOARDING_CHECKBOX_LISTS, + ONBOARDING_GROUPS, + ONBOARDING_INLINE_CHECKS, + active_conditional_target_keys as _active_conditional_target_keys, + build_offboarding_sections as _build_offboarding_sections, + build_onboarding_layout as _build_onboarding_layout, + build_onboarding_sections as _build_onboarding_sections, + conditional_rule_summary as _conditional_rule_summary, + field_rule_summary as _field_rule_summary, + normalized_conditional_rule_payload as _normalized_conditional_rule_payload, + translate_choice_list as _translate_choice_list, +) +from .view_permissions import require_capability as _require_capability from .models import AdminAuditLog, AsyncTaskLog, EmployeeProfile, FormConditionalRuleConfig, FormCustomFieldConfig, FormCustomSectionConfig, FormFieldConfig, FormOption, FormSectionConfig, NotificationRule, NotificationTemplate, OffboardingRequest, OnboardingIntroductionSession, OnboardingRequest, PortalAppConfig, PortalBranding, PortalCompanyConfig, PortalTrialConfig, ScheduledWelcomeEmail, SystemEmailConfig, UserNotification, UserProfile, WorkflowConfig from .emailing import send_system_email from .notifications import notify_user @@ -109,96 +128,6 @@ def mark_all_notifications_read(request): UserNotification.objects.filter(user=request.user, read_at__isnull=True).update(read_at=timezone.now()) return _redirect_back(request, 'home') -ONBOARDING_GROUPS = { - 'business-card-box': ['business_card_name', 'business_card_title', 'business_card_email', 'business_card_phone'], - 'employment-end-box': ['employment_end_date'], - 'group-mailboxes-box': ['group_mailboxes'], - 'extra-hardware-box': ['additional_hardware_multi', 'additional_hardware_other'], - 'extra-software-box': ['additional_software_multi', 'additional_software'], - 'extra-access-box': ['additional_access_text'], - 'successor-box': ['successor_name', 'inherit_phone_number_choice'], -} - -ONBOARDING_INLINE_CHECKS = {'order_business_cards', 'agreement_confirm'} -ONBOARDING_CHECKBOX_LISTS = { - 'needed_devices_multi', - 'additional_hardware_multi', - 'needed_software_multi', - 'additional_software_multi', - 'needed_accesses_multi', - 'needed_workspace_groups_multi', - 'needed_resources_multi', -} -ONBOARDING_SECTION_META = { - 'stammdaten': {'title': gettext_lazy('Stammdaten'), 'subtitle': gettext_lazy('Person, Rolle, Abteilung')}, - 'vertrag': {'title': gettext_lazy('Vertrag'), 'subtitle': gettext_lazy('Beschäftigung und Termine')}, - 'itsetup': {'title': gettext_lazy('IT-Setup'), 'subtitle': gettext_lazy('Geräte, Software und Zugänge')}, - 'abschluss': {'title': gettext_lazy('Abschluss'), 'subtitle': gettext_lazy('Notizen und Freigabe')}, -} - -CONDITIONAL_RULE_OPERATOR_CHOICES = [ - ('checked', _('ist aktiviert')), - ('equals', _('ist gleich')), - ('not_equals', _('ist nicht gleich')), -] - - -def _field_rule_summary(*, is_visible: bool, is_required, locked: bool) -> str: - if locked: - return str(_('Fixes Kernfeld, immer sichtbar.')) - if not is_visible: - return str(_('Ausgeblendet, erscheint nicht im Formular.')) - if is_required is True: - return str(_('Sichtbar und als Pflichtfeld markiert.')) - if is_required is False: - return str(_('Sichtbar und optional.')) - return str(_('Sichtbar mit Standardverhalten.')) - - -def _conditional_clause_sentence(clause: dict, field_label_map: dict[str, str]) -> str: - field_name = (clause.get('field') or '').strip() - operator = (clause.get('operator') or '').strip() - value = clause.get('value') - if not field_name or not operator: - return '' - field_label = field_label_map.get(field_name, field_name) - if operator == 'checked': - return _('%(field)s ist aktiviert') % {'field': field_label} - if operator == 'equals': - if value not in (None, ''): - return _('%(field)s ist gleich %(value)s') % {'field': field_label, 'value': value} - return _('%(field)s ist gleich') % {'field': field_label} - if operator == 'not_equals': - if value not in (None, ''): - return _('%(field)s ist nicht gleich %(value)s') % {'field': field_label, 'value': value} - return _('%(field)s ist nicht gleich') % {'field': field_label} - return _('%(field)s erfüllt die Bedingung') % {'field': field_label} - - -def _conditional_rule_summary(clauses: list[dict], field_label_map: dict[str, str]) -> str: - active_clauses = [clause for clause in clauses if clause.get('field') and clause.get('operator')] - if not active_clauses: - return str(_('Immer sichtbar.')) - parts = [str(_conditional_clause_sentence(clause, field_label_map)) for clause in active_clauses] - return str(_('Sichtbar, wenn %(conditions)s.') % {'conditions': ' und '.join(parts)}) - - -def _normalized_conditional_rule_payload(form_type: str) -> dict[str, dict]: - configs = ensure_form_conditional_rule_configs(form_type) - payload = {} - for target_key, cfg in configs.items(): - if not cfg.is_active: - continue - clauses = [clause for clause in (cfg.clauses or []) if clause.get('field') and clause.get('operator')] - if clauses: - payload[target_key] = {'all': clauses} - return payload - - -def _active_conditional_target_keys(form_type: str) -> set[str]: - return set(_normalized_conditional_rule_payload(form_type).keys()) - - def healthz(request): db_ok = True try: @@ -233,335 +162,6 @@ def account_profile_page(request): return account_views.account_profile_page_impl(request) -def _require_capability(capability: str): - def decorator(view_func): - @wraps(view_func) - @login_required - def wrapped(request, *args, **kwargs): - if not user_has_capability(request.user, capability): - messages.error(request, _('Sie haben keine Berechtigung für diese Aktion.')) - return redirect('home') - return view_func(request, *args, **kwargs) - - return wrapped - - return decorator - - -def _display_user_name(user) -> str: - first_name = (getattr(user, 'first_name', '') or '').strip() - last_name = (getattr(user, 'last_name', '') or '').strip() - full_name = f'{first_name} {last_name}'.strip() - if full_name: - return full_name - username = (getattr(user, 'username', '') or '').strip() - if username: - return username - return (getattr(user, 'email', '') or '').strip() - - -def _audit( - request, - action: str, - *, - target_type: str = '', - target_id: int | None = None, - target_label: str = '', - details: dict | None = None, -) -> None: - if not getattr(request, 'user', None) or not request.user.is_authenticated: - return - AdminAuditLog.objects.create( - actor=request.user, - actor_display=_display_user_name(request.user), - action=action, - target_type=target_type, - target_id=target_id, - target_label=target_label, - details=details or {}, - ) - - -def _form_field_labels(form_type: str) -> dict[str, str]: - if form_type == 'onboarding': - return {name: str(field.label or name) for name, field in OnboardingRequestForm.base_fields.items()} - if form_type == 'offboarding': - return {name: str(field.label or name) for name, field in OffboardingRequestForm.base_fields.items()} - return {} - - -def _request_target_label(obj, kind: str | None = None) -> str: - request_kind = (kind or '').strip() - if not request_kind: - request_kind = 'onboarding' if isinstance(obj, OnboardingRequest) else 'offboarding' - name = (getattr(obj, 'full_name', '') or '').strip() or f'#{getattr(obj, "id", "?")}' - email = (getattr(obj, 'work_email', '') or '').strip() - created_at = getattr(obj, 'created_at', None) - date_label = created_at.strftime('%Y-%m-%d') if created_at else '' - parts = [request_kind.capitalize(), name] - if email: - parts.append(f'<{email}>') - if date_label: - parts.append(date_label) - return ' | '.join(parts) - - -def _request_status_label(status_key: str, language_code: str | None = None) -> str: - lang = ((language_code or 'de').split('-')[0] or 'de').lower() - with override(lang): - labels = { - 'submitted': _('Eingereicht'), - 'processing': _('In Bearbeitung'), - 'completed': _('Abgeschlossen'), - 'failed': _('Fehlgeschlagen'), - } - return labels.get(status_key, status_key) - - -def _request_custom_field_details(obj, kind: str, language_code: str | None = None) -> list[dict[str, str]]: - form_type = 'onboarding' if kind == 'onboarding' else 'offboarding' - language_code = ((language_code or getattr(obj, 'preferred_language', '') or get_language() or 'de').split('-')[0]).lower() - values = getattr(obj, 'custom_field_values', {}) or {} - rows = [] - yes_label = 'Ja' if language_code == 'de' else 'Yes' - for cfg in get_custom_field_configs(form_type, include_inactive=True): - raw_value = values.get(cfg.field_key) - if raw_value in (None, '', False, []): - continue - if isinstance(raw_value, bool): - display_value = str(yes_label) if raw_value else '' - elif isinstance(raw_value, list): - display_value = ', '.join(str(item).strip() for item in raw_value if str(item).strip()) - else: - display_value = str(raw_value).strip() - if not display_value: - continue - rows.append( - { - 'label': cfg.translated_label(language_code), - 'value': display_value, - 'section': cfg.section_key, - 'sort_order': cfg.sort_order, - } - ) - rows.sort(key=lambda item: (item['section'], item['sort_order'], item['label'])) - return rows - - -def _audit_action_label(action: str) -> str: - labels = { - 'requests_deleted': _('Vorgänge gelöscht'), - 'request_deleted': _('Vorgang gelöscht'), - 'request_retried': _('Vorgang erneut angestoßen'), - 'intro_pdf_generated': _('Einweisungs-PDF erzeugt'), - 'intro_live_pdf_generated': _('Live-Protokoll erzeugt'), - 'intro_session_reset': _('Einweisung zurückgesetzt'), - 'intro_session_saved': _('Einweisung als Entwurf gespeichert'), - 'intro_session_completed': _('Einweisung abgeschlossen'), - 'form_option_deleted': _('Formularoption gelöscht'), - 'form_options_saved': _('Formularoptionen gespeichert'), - 'form_field_texts_saved': _('Feldtexte gespeichert'), - 'form_layout_saved': _('Formularlayout gespeichert'), - 'intro_checklist_item_deleted': _('Einweisungs-Checkpunkt gelöscht'), - 'intro_checklist_item_added': _('Einweisungs-Checkpunkt hinzugefügt'), - 'intro_checklist_saved': _('Einweisungs-Checkliste gespeichert'), - 'welcome_email_triggered_now': _('Welcome E-Mail sofort ausgelöst'), - 'welcome_email_settings_saved': _('Welcome E-Mail Einstellungen gespeichert'), - 'welcome_email_bulk_action': _('Welcome E-Mail Sammelaktion ausgeführt'), - 'welcome_email_paused': _('Welcome E-Mail pausiert'), - 'welcome_email_resumed': _('Welcome E-Mail fortgesetzt'), - 'welcome_email_cancelled': _('Welcome E-Mail abgebrochen'), - 'smtp_test_sent': _('SMTP-Test gesendet'), - 'nextcloud_test_upload': _('Nextcloud-Testupload ausgeführt'), - 'nextcloud_mode_toggled': _('Nextcloud-Modus umgeschaltet'), - 'email_mode_toggled': _('E-Mail-Modus umgeschaltet'), - 'integrations_saved': _('Integrationen gespeichert'), - 'nextcloud_settings_saved': _('Nextcloud-Einstellungen gespeichert'), - 'mail_settings_saved': _('Mail-Einstellungen gespeichert'), - 'email_routing_saved': _('E-Mail-Routing gespeichert'), - 'notification_rules_saved': _('Benachrichtigungsregeln gespeichert'), - 'user_created': _('Benutzer erstellt'), - 'user_updated': _('Benutzer aktualisiert'), - 'user_password_reset_sent': _('Passwort-Reset-Link versendet'), - 'user_deleted': _('Benutzer gelöscht'), - 'backup_created': _('Backup erstellt'), - 'backup_verified': _('Backup verifiziert'), - 'backup_deleted': _('Backup gelöscht'), - 'backup_settings_saved': _('Backup-Einstellungen gespeichert'), - 'portal_app_registry_saved': _('App-Registry gespeichert'), - } - return labels.get(action, action.replace('_', ' ').strip().capitalize()) - - -def _translate_choice_list(choices): - return [(value, str(label)) for value, label in choices] - - -def _build_onboarding_layout(form) -> list[dict]: - ordered_names = list(form.fields.keys()) - group_by_field = {} - for group_id, group_fields in ONBOARDING_GROUPS.items(): - for name in group_fields: - group_by_field[name] = group_id - conditional_target_keys = _active_conditional_target_keys('onboarding') - - rendered_groups = set() - consumed = set() - blocks = [] - - for field_name in ordered_names: - if field_name in consumed: - continue - - group_id = group_by_field.get(field_name) - if group_id: - if group_id in rendered_groups: - continue - group_fields = [ - form[name] - for name in ONBOARDING_GROUPS[group_id] - if name in form.fields - ] - if not group_fields: - continue - blocks.append( - { - 'kind': 'group', - 'id': group_id, - 'hidden_default': group_id in conditional_target_keys, - 'fields': group_fields, - } - ) - rendered_groups.add(group_id) - consumed.update([f.name for f in group_fields]) - continue - - if field_name.startswith('custom__') and field_name in conditional_target_keys: - blocks.append( - { - 'kind': 'group', - 'id': field_name, - 'hidden_default': True, - 'fields': [form[field_name]], - } - ) - consumed.add(field_name) - continue - - blocks.append({'kind': 'field', 'field': form[field_name]}) - consumed.add(field_name) - - return blocks - - -def _section_for_block(block: dict, field_pages: dict[str, str]) -> str: - if block['kind'] == 'field': - return field_pages.get(block['field'].name, 'abschluss') - fields = block.get('fields') or [] - if not fields: - return 'abschluss' - return field_pages.get(fields[0].name, 'abschluss') - - -def _build_onboarding_sections(blocks: list[dict], field_pages: dict[str, str], visible_section_keys: set[str] | None = None) -> list[dict]: - section_defs = get_section_definitions('onboarding') - section_order = [item['key'] for item in section_defs] - section_titles = {item['key']: item['title'] for item in section_defs} - grouped = {key: [] for key in section_order} - for block in blocks: - section_key = _section_for_block(block, field_pages) - if section_key not in grouped: - section_key = 'abschluss' - grouped[section_key].append(block) - visible_keys = visible_section_keys or set(section_order) - sections = [] - custom_section_keys = {item['key'] for item in section_defs if item.get('is_custom')} - for key in section_order: - if key not in visible_keys: - continue - blocks_for_section = grouped[key] - has_custom_checkbox_fields = False - for block in blocks_for_section: - candidate_fields = [block['field']] if block['kind'] == 'field' else (block.get('fields') or []) - for bound_field in candidate_fields: - widget_type = getattr(getattr(bound_field.field, 'widget', None), 'input_type', '') - if bound_field.name.startswith('custom__') and widget_type == 'checkbox': - has_custom_checkbox_fields = True - break - if has_custom_checkbox_fields: - break - sections.append( - { - 'key': key, - 'title': section_titles.get(key, ONBOARDING_SECTION_META.get(key, {}).get('title', key)), - 'subtitle': ONBOARDING_SECTION_META.get(key, {}).get('subtitle', ''), - 'blocks': blocks_for_section, - 'is_custom': key in custom_section_keys, - 'has_custom_checkbox_fields': has_custom_checkbox_fields, - } - ) - return sections - - -OFFBOARDING_SECTION_META = { - 'mitarbeitende': {'title': gettext_lazy('Mitarbeitende'), 'subtitle': gettext_lazy('Person, Rolle und Bereich')}, - 'austritt': {'title': gettext_lazy('Austritt'), 'subtitle': gettext_lazy('Letzter Arbeitstag')}, - 'abschluss': {'title': gettext_lazy('Abschluss'), 'subtitle': gettext_lazy('Hinweise und Abschlussnotizen')}, -} - - -def _build_offboarding_sections(form, visible_section_keys: set[str] | None = None) -> list[dict]: - field_pages = getattr(form, '_field_page_keys', {}) - grouped = {key: [] for key in OFFBOARDING_PAGE_ORDER} - for field_name in form.fields.keys(): - section_key = field_pages.get(field_name, 'abschluss') - if section_key not in grouped: - section_key = 'abschluss' - grouped[section_key].append(form[field_name]) - visible_keys = visible_section_keys or set(OFFBOARDING_PAGE_ORDER) - return [ - { - 'key': key, - 'title': OFFBOARDING_SECTION_META[key]['title'], - 'subtitle': OFFBOARDING_SECTION_META[key]['subtitle'], - 'fields': grouped[key], - } - for key in OFFBOARDING_PAGE_ORDER - if key in visible_keys and grouped[key] - ] - - -def _ops_summary_for_user(user) -> dict[str, object]: - can_view_jobs = user_has_capability(user, 'view_job_monitor') - can_manage_backups = user_has_capability(user, 'manage_backups') - summary: dict[str, object] = { - 'show': can_view_jobs or can_manage_backups, - 'can_view_jobs': can_view_jobs, - 'can_manage_backups': can_manage_backups, - 'failed_count_24h': 0, - 'started_count_24h': 0, - 'success_count_24h': 0, - 'recent_failed_logs': [], - 'backup_health': latest_backup_health_snapshot() if can_manage_backups else None, - } - if not can_view_jobs: - return summary - - since = timezone.now() - timedelta(hours=24) - logs = AsyncTaskLog.objects.filter(started_at__gte=since) - counts = { - row['status']: row['count'] - for row in logs.values('status').annotate(count=Count('id')) - } - summary['failed_count_24h'] = counts.get('failed', 0) - summary['started_count_24h'] = counts.get('started', 0) - summary['success_count_24h'] = counts.get('succeeded', 0) - summary['recent_failed_logs'] = list( - AsyncTaskLog.objects.filter(status='failed').order_by('-started_at', '-id')[:5] - ) - return summary - - @login_required def home(request): config, _ = WorkflowConfig.objects.get_or_create(name='Default') @@ -957,13 +557,24 @@ def backup_recovery_page(request): @_require_capability('manage_backups') @require_POST def create_backup_from_admin(request): - return create_backup_from_admin_impl(request, audit_fn=_audit) + return create_backup_from_admin_impl( + request, + audit_fn=_audit, + notify_user_fn=notify_user, + create_backup_bundle_fn=create_backup_bundle, + ) @_require_capability('manage_backups') @require_POST def verify_backup_from_admin(request, backup_name: str): - return verify_backup_from_admin_impl(request, backup_name, audit_fn=_audit) + return verify_backup_from_admin_impl( + request, + backup_name, + audit_fn=_audit, + notify_user_fn=notify_user, + verify_backup_bundle_fn=verify_backup_bundle, + ) @_require_capability('manage_backups')