snapshot: modularize workflow helper and task orchestration layers

This commit is contained in:
Md Bayazid Bostame
2026-03-28 09:10:07 +01:00
parent ee323106e9
commit e80a68d6f8
9 changed files with 748 additions and 1233 deletions

View File

@@ -0,0 +1,188 @@
from datetime import timedelta
from pathlib import Path
from django.conf import settings
from django.utils import timezone
from jinja2 import Template
from .branding import get_default_notification_templates
from .emailing import send_system_email
from .forms import OnboardingRequestForm
from .models import NotificationRule, NotificationTemplate, OnboardingRequest, ScheduledWelcomeEmail, WorkflowConfig
from .services import get_email_test_redirect, is_email_test_mode
def resolve_workflow_emails() -> tuple[str, str, str, str, str]:
config = WorkflowConfig.objects.order_by('id').first()
it_email = config.it_onboarding_email if config and config.it_onboarding_email else settings.IT_ONBOARDING_NOTIFICATION_EMAIL
general_info_email = config.general_info_email if config and config.general_info_email else settings.GENERAL_INFO_NOTIFICATION_EMAIL
business_card_email = config.business_card_email if config and config.business_card_email else settings.BUSINESS_CARD_NOTIFICATION_EMAIL
hr_works_email = config.hr_works_email if config and config.hr_works_email else settings.HR_WORKS_NOTIFICATION_EMAIL
key_email = config.key_notification_email if config and config.key_notification_email else settings.KEY_NOTIFICATION_EMAIL
return it_email, general_info_email, business_card_email, hr_works_email, key_email
def send_workflow_email(
subject: str,
body: str,
to: list[str],
attachments: list[Path] | None = None,
from_email: str | None = None,
) -> None:
recipients = [r for r in to if r]
if not recipients:
return
effective_to = recipients
effective_body = body
if is_email_test_mode():
effective_to = [get_email_test_redirect()]
effective_body = (
'[TEST MODE] Diese E-Mail wurde umgeleitet.\n'
f"Originale Empfänger: {', '.join(recipients)}\n\n{body}"
)
send_system_email(
subject=subject,
body=effective_body,
to=effective_to,
attachments=[str(a) for a in (attachments or [])],
from_email=from_email,
)
def render_notification_template(template_key: str, context: dict, language_code: str | None = None) -> tuple[str, str]:
lang = (language_code or 'de').split('-')[0]
db_template = NotificationTemplate.objects.filter(key=template_key, is_active=True).first()
if db_template:
subject_template = db_template.translated_subject_template(lang)
body_template = db_template.translated_body_template(lang)
else:
fallback = get_default_notification_templates()[template_key]
subject_template = fallback.get(f'subject_{lang}', '') or fallback['subject']
body_template = fallback.get(f'body_{lang}', '') or fallback['body']
subject = Template(subject_template).render(context).strip()
body = Template(body_template).render(context).strip()
return subject, body
def parse_recipients(raw: str) -> list[str]:
if not raw:
return []
cleaned = raw.replace(';', ',').replace('\n', ',')
return [x.strip() for x in cleaned.split(',') if x.strip()]
def as_bool(value) -> bool:
if isinstance(value, bool):
return value
if value is None:
return False
text = str(value).strip().lower()
return text in {'1', 'true', 'ja', 'yes', 'on', 'aktiv'}
def rule_matches(rule: NotificationRule, request_obj) -> bool:
if rule.operator == 'always':
return True
raw_value = getattr(request_obj, rule.field_name, '')
actual = '' if raw_value is None else str(raw_value)
expected = (rule.expected_value or '').strip()
if rule.operator == 'contains':
return expected.lower() in actual.lower()
if rule.operator == 'equals':
return actual.strip().lower() == expected.lower()
if rule.operator == 'is_true':
return as_bool(raw_value)
if rule.operator == 'is_false':
return not as_bool(raw_value)
return False
def send_templated_email(
template_key: str,
to: list[str],
context: dict,
attachments: list[Path] | None = None,
from_email: str | None = None,
language_code: str | None = None,
) -> None:
subject, body = render_notification_template(template_key, context, language_code=language_code)
send_workflow_email(subject=subject, body=body, to=to, attachments=attachments, from_email=from_email)
def apply_notification_rules(
event_type: str,
request_obj,
context: dict,
pdf_path: Path | None = None,
) -> None:
language_code = (getattr(request_obj, 'preferred_language', '') or 'de').split('-')[0]
rules = NotificationRule.objects.filter(event_type=event_type, is_active=True).order_by('sort_order', 'id')
for rule in rules:
if not rule_matches(rule, request_obj):
continue
recipients = parse_recipients(rule.recipients)
if not recipients:
continue
attachments = [pdf_path] if (pdf_path and rule.include_pdf_attachment) else None
template_key = (rule.template_key or '').strip()
known_keys = {k for k, _ in NotificationTemplate.TEMPLATE_CHOICES}
if template_key and template_key in known_keys:
send_templated_email(
template_key=template_key,
context=context,
to=recipients,
attachments=attachments,
language_code=language_code,
)
continue
subject = rule.translated_custom_subject(language_code)
body = rule.translated_custom_body(language_code)
if not subject and not body:
continue
subject_rendered = Template(subject or f'[{event_type}] Regelmail').render(context).strip()
body_rendered = Template(body or '-').render(context).strip()
send_workflow_email(
subject=subject_rendered,
body=body_rendered,
to=recipients,
attachments=attachments,
)
def schedule_welcome_email(request_obj: OnboardingRequest, *, send_scheduled_welcome_email_task) -> None:
recipient = (request_obj.work_email or '').strip().lower()
if not recipient:
return
config = WorkflowConfig.objects.order_by('id').first()
delay_days = 5
if config:
delay_days = max(0, int(config.welcome_email_delay_days or 5))
send_at = timezone.now() + timedelta(days=delay_days)
scheduled, _ = ScheduledWelcomeEmail.objects.update_or_create(
onboarding_request=request_obj,
defaults={
'recipient_email': recipient,
'send_at': send_at,
'status': 'scheduled',
'last_error': '',
'sent_at': None,
},
)
try:
async_result = send_scheduled_welcome_email_task.apply_async(args=[scheduled.id], eta=send_at)
scheduled.celery_task_id = async_result.id or ''
scheduled.save(update_fields=['celery_task_id', 'updated_at'])
except Exception as exc:
scheduled.status = 'failed'
scheduled.last_error = f'Scheduling failed: {exc}'
scheduled.save(update_fields=['status', 'last_error', 'updated_at'])

View File

@@ -0,0 +1,29 @@
from django.utils.translation import gettext as _
from .notifications import notify_user_by_email
def notify_request_result(*, recipient_email: str, title: str, body: str, level: str, event_key: str) -> None:
notify_user_by_email(
email=recipient_email,
title=title,
body=body,
level=level,
link_url='/requests/',
event_key=event_key,
)
def notify_welcome_email_result(*, recipient_email: str, full_name: str, body: str, level: str, event_key: str) -> None:
notify_user_by_email(
email=recipient_email,
title=(
_('Welcome E-Mail gesendet: %(name)s') % {'name': full_name}
if event_key == 'welcome_email_success'
else _('Welcome E-Mail fehlgeschlagen: %(name)s') % {'name': full_name}
),
body=body,
level=level,
link_url='/admin-tools/welcome-emails/',
event_key=event_key,
)

View File

@@ -8,7 +8,6 @@ from django.utils.translation import gettext as _
from .backup_ops import create_backup_bundle, latest_backup_health_snapshot, list_backup_bundles, verify_backup_bundle
from .models import AdminAuditLog, AsyncTaskLog, UserNotification, UserProfile
from .notifications import notify_user
from .roles import user_has_capability
@@ -96,9 +95,9 @@ def backup_recovery_page_impl(request):
)
def create_backup_from_admin_impl(request, *, audit_fn):
def create_backup_from_admin_impl(request, *, audit_fn, notify_user_fn, create_backup_bundle_fn):
try:
result = create_backup_bundle()
result = create_backup_bundle_fn()
audit_fn(
request,
'backup_created',
@@ -106,7 +105,7 @@ def create_backup_from_admin_impl(request, *, audit_fn):
target_label=result['name'],
details={'path': result['path']},
)
notify_user(
notify_user_fn(
user=request.user,
title=_('Backup erstellt: %(name)s') % {'name': result['name']},
body=_('Das Backup-Bundle wurde erfolgreich erstellt.'),
@@ -116,7 +115,7 @@ def create_backup_from_admin_impl(request, *, audit_fn):
)
messages.success(request, _('Backup wurde erstellt: %(name)s') % {'name': result['name']})
except Exception as exc:
notify_user(
notify_user_fn(
user=request.user,
title=_('Backup fehlgeschlagen'),
body=str(exc),
@@ -128,9 +127,9 @@ def create_backup_from_admin_impl(request, *, audit_fn):
return redirect('backup_recovery_page')
def verify_backup_from_admin_impl(request, backup_name: str, *, audit_fn):
def verify_backup_from_admin_impl(request, backup_name: str, *, audit_fn, notify_user_fn, verify_backup_bundle_fn):
try:
result = verify_backup_bundle(backup_name)
result = verify_backup_bundle_fn(backup_name)
audit_fn(
request,
'backup_verified',
@@ -138,7 +137,7 @@ def verify_backup_from_admin_impl(request, backup_name: str, *, audit_fn):
target_label=backup_name,
details={'summary': result['summary']},
)
notify_user(
notify_user_fn(
user=request.user,
title=_('Backup verifiziert: %(name)s') % {'name': result['name']},
body=result.get('summary') or _('Das Backup wurde erfolgreich verifiziert.'),
@@ -148,7 +147,7 @@ def verify_backup_from_admin_impl(request, backup_name: str, *, audit_fn):
)
messages.success(request, _('Backup wurde verifiziert: %(name)s') % {'name': result['name']})
except Exception as exc:
notify_user(
notify_user_fn(
user=request.user,
title=_('Backup-Verifikation fehlgeschlagen'),
body=str(exc),

View File

@@ -13,9 +13,9 @@ from jinja2 import Template
from pypdf import PageObject, PdfReader, PdfWriter
from xhtml2pdf import pisa
from .branding import get_branding_email_copy, get_company_contact_copy, get_default_notification_templates, get_portal_letterhead_path
from .branding import get_branding_email_copy, get_company_contact_copy, get_portal_letterhead_path
from . import email_workflows, notification_dispatch, pdf_rendering
from .models import AsyncTaskLog, EmployeeProfile, IntroChecklistItem, NotificationRule, NotificationTemplate, OffboardingRequest, OnboardingIntroductionSession, OnboardingRequest, ScheduledWelcomeEmail, WorkflowConfig
from .emailing import send_system_email
from .services import upload_to_nextcloud
from .services import get_email_test_redirect, is_email_test_mode
from .forms import (
@@ -28,7 +28,6 @@ from .forms import (
SOFTWARE_EXTRA_CHOICES,
WORKSPACE_GROUP_CHOICES,
)
from .notifications import notify_user_by_email
from .pdf_sections import build_pdf_sections
# These templates are the product-level defaults for fresh deployments.
@@ -254,27 +253,21 @@ DEFAULT_NOTIFICATION_TEMPLATES = {
def _notify_request_result(*, recipient_email: str, title: str, body: str, level: str, event_key: str) -> None:
notify_user_by_email(
email=recipient_email,
return notification_dispatch.notify_request_result(
recipient_email=recipient_email,
title=title,
body=body,
level=level,
link_url='/requests/',
event_key=event_key,
)
def _notify_welcome_email_result(*, recipient_email: str, full_name: str, body: str, level: str, event_key: str) -> None:
notify_user_by_email(
email=recipient_email,
title=(
_('Welcome E-Mail gesendet: %(name)s') % {'name': full_name}
if event_key == 'welcome_email_success'
else _('Welcome E-Mail fehlgeschlagen: %(name)s') % {'name': full_name}
),
return notification_dispatch.notify_welcome_email_result(
recipient_email=recipient_email,
full_name=full_name,
body=body,
level=level,
link_url='/admin-tools/welcome-emails/',
event_key=event_key,
)
@@ -300,211 +293,35 @@ def _finish_task_log(task_log: AsyncTaskLog | None, *, status: str, error_messag
task_log.save(update_fields=['status', 'error_message', 'finished_at'])
def _split_name(full_name: str) -> tuple[str, str]:
parts = full_name.split()
if not parts:
return '', ''
return parts[0], ' '.join(parts[1:])
return pdf_rendering._split_name(full_name)
def _safe_filename_fragment(text: str, fallback: str = 'document') -> str:
value = re.sub(r'[^A-Za-z0-9._-]+', '_', (text or '').strip()).strip('._')
return value[:120] if value else fallback
return pdf_rendering._safe_filename_fragment(text, fallback=fallback)
def _resolve_user_display_name(email: str) -> str:
email = (email or '').strip().lower()
if not email:
return ''
user_model = get_user_model()
user = user_model.objects.filter(email__iexact=email).first()
if not user:
return ''
first_name = (getattr(user, 'first_name', '') or '').strip()
last_name = (getattr(user, 'last_name', '') or '').strip()
full_name = f'{first_name} {last_name}'.strip()
if full_name:
return full_name
return (getattr(user, 'username', '') or '').strip()
return pdf_rendering._resolve_user_display_name(email)
def _chunk_list(data_list: list[str], chunk_size: int = 3) -> list[list[str]]:
items = [i.strip() for i in data_list if i and i.strip()]
chunks = []
for i in range(0, len(items), chunk_size):
chunks.append(items[i : i + chunk_size])
return chunks
return pdf_rendering._chunk_list(data_list, chunk_size=chunk_size)
def _split_multiline(text: str) -> list[str]:
return [line.strip() for line in (text or '').split('\n') if line.strip()]
return pdf_rendering._split_multiline(text)
def _chunk_choice_labels(choices: list[tuple[str, str]], chunk_size: int = 3) -> list[list[str]]:
labels = [label for _, label in choices]
return _chunk_list(labels, chunk_size=chunk_size)
return pdf_rendering._chunk_choice_labels(choices, chunk_size=chunk_size)
def _normalized_lang(language_code: str | None) -> str:
return (language_code or 'de').split('-')[0].lower() or 'de'
return pdf_rendering._normalized_lang(language_code)
def _pdf_texts(language_code: str | None = None) -> dict[str, str]:
lang = _normalized_lang(language_code)
texts = {
'de': {
'lang': 'de',
'not_available': 'Keine Angabe',
'not_available_short': '-',
'yes': 'Ja',
'no': 'Nein',
'onboarding_title': 'Onboarding-Unterlagen',
'onboarding_staff_data': 'Personaldaten',
'name': 'Name',
'department': 'Abteilung',
'job_title': 'Berufsbezeichnung',
'work_email': 'Dienstliche E-Mail',
'employment_type': 'Beschäftigungsverhältnis',
'contract_start': 'Vertragsbeginn',
'contract_end': 'Vertragsende',
'handover_date': 'Übergabedatum',
'equipment_access': 'Ausstattung und Zugänge',
'devices': 'Benötigte Geräte und Gegenstände',
'workspace_groups': 'Benötigte Gruppen im Workspace',
'software': 'Benötigte Software',
'accesses': 'Benötigte Zugänge',
'resources': 'Benötigte Ressourcen',
'group_mailboxes_required': 'Gruppenpostfächer erforderlich',
'additional_hardware_needed': 'Darüber hinaus wird weitere Hardware benötigt',
'additional_software_needed': 'Wird zusätzliche Software benötigt',
'additional_access_needed': 'Darüber hinaus werden weitere Zugänge benötigt',
'additional_details': 'Zusätzliche Angaben',
'business_cards': 'Visitenkarten',
'email': 'E-Mail',
'phone': 'Telefon',
'additional_hardware_other': 'Weitere Hardware (Freitext)',
'successor_phone': 'Nachfolge und Telefon',
'successor_of': 'Nachfolge von',
'inherit_phone_number': 'Telefon von Vorgänger übernehmen',
'direct_extension': 'Direktwahl',
'notes': 'Notizen',
'confirmation': 'Bestätigung',
'requested_by_name': 'Angefordert von (Name)',
'requested_by_email': 'Angefordert von (E-Mail)',
'signature': 'Unterschrift',
'signature_alt': 'Unterschrift',
'onboarding_note': 'Hinweis: Dieses Formular dient als interne Prozessgrundlage für das Onboarding.',
'offboarding_title': 'Offboarding-Unterlagen',
'employee_info': 'Mitarbeitenden-Informationen',
'last_working_day': 'Letzter Arbeitstag',
'offboarding_requester': 'Offboarding-Anfordernde Person',
'it_hardware_status': 'IT-Hardware-Status (aus Onboarding)',
'hardware_check': 'Hardware-Check',
'no_onboarding_hardware': 'Keine Onboarding-Hardwaredaten gefunden.',
'manual_return_overview': 'Manuelle Rückgabeübersicht',
'manual_return_note': 'Es wurden keine gespeicherten Onboarding-Daten zu dieser Person gefunden. Die folgenden Listen dienen als manuelle Rückgabe- und Prüfübersicht.',
'returned_devices': 'Zurückgegebene Geräte und Artikel',
'returned_software': 'Zurückgegebene bzw. deaktivierte Software',
'removed_workspace_groups': 'Entfernte Gruppen im Workspace',
'removed_accesses': 'Entfernte Zugänge',
'returned_extra_it': 'Zurückgegebene zusätzliche Hardware / Software',
'it_signatures': 'IT-Section: Signaturen',
'it_checked_by': 'IT geprüft am durch:',
'it_signature': 'IT-Unterschrift:',
'return_complete': 'Rückgabe vollständig:',
'offboarding_note': 'Hinweis: Dieses Formular dient als interne Prozessgrundlage für das Offboarding.',
'intro_title': 'Einweisungs- und Übergabeprotokoll',
'intro_sub': 'Gesprächsleitfaden für die persönliche Einführung neuer Mitarbeitender.',
'base_data': 'Basisdaten',
'start_date': 'Startdatum',
'introduced_by': 'Einweisung durch',
'intro_note': 'Dieses Dokument dient als Gesprächsleitfaden für die persönliche Einweisung. Die Felder können während des Termins manuell abgehakt und anschließend unterschrieben werden.',
'employee_signature': 'Unterschrift Mitarbeitende Person:',
'trainer_signature': 'Unterschrift Einweisende Person:',
'intro_completed_at': 'Einweisung durchgeführt am:',
'open_questions': 'Rückfragen offen / Nacharbeit erforderlich:',
'live_intro_title': 'Einweisungsprotokoll',
'live_intro_sub': 'Export des aktuellen Live-Status aus der webbasierten Einweisung.',
'employment_start': 'Vertragsbeginn',
'employee_signature_block': 'Unterschrift Mitarbeitende Person',
},
'en': {
'lang': 'en',
'not_available': 'Not provided',
'not_available_short': '-',
'yes': 'Yes',
'no': 'No',
'onboarding_title': 'Onboarding Documents',
'onboarding_staff_data': 'Employee Details',
'name': 'Name',
'department': 'Department',
'job_title': 'Job title',
'work_email': 'Work email',
'employment_type': 'Employment type',
'contract_start': 'Contract start',
'contract_end': 'Contract end',
'handover_date': 'Handover date',
'equipment_access': 'Equipment and access',
'devices': 'Required devices and items',
'workspace_groups': 'Required workspace groups',
'software': 'Required software',
'accesses': 'Required accesses',
'resources': 'Required resources',
'group_mailboxes_required': 'Group mailboxes required',
'additional_hardware_needed': 'Additional hardware required',
'additional_software_needed': 'Additional software required',
'additional_access_needed': 'Additional accesses required',
'additional_details': 'Additional details',
'business_cards': 'Business cards',
'email': 'Email',
'phone': 'Phone',
'additional_hardware_other': 'Additional hardware (free text)',
'successor_phone': 'Successor and phone',
'successor_of': 'Successor to',
'inherit_phone_number': 'Take over predecessor phone number',
'direct_extension': 'Direct extension',
'notes': 'Notes',
'confirmation': 'Confirmation',
'requested_by_name': 'Requested by (name)',
'requested_by_email': 'Requested by (email)',
'signature': 'Signature',
'signature_alt': 'Signature',
'onboarding_note': 'Note: This form serves as the internal process basis for onboarding.',
'offboarding_title': 'Offboarding Documents',
'employee_info': 'Employee information',
'last_working_day': 'Last working day',
'offboarding_requester': 'Offboarding requester',
'it_hardware_status': 'IT hardware status (from onboarding)',
'hardware_check': 'Hardware check',
'no_onboarding_hardware': 'No onboarding hardware data found.',
'manual_return_overview': 'Manual return overview',
'manual_return_note': 'No stored onboarding data was found for this person. The following lists serve as a manual return and review overview.',
'returned_devices': 'Returned devices and items',
'returned_software': 'Returned or disabled software',
'removed_workspace_groups': 'Removed workspace groups',
'removed_accesses': 'Removed accesses',
'returned_extra_it': 'Returned additional hardware / software',
'it_signatures': 'IT section: signatures',
'it_checked_by': 'Checked by IT on:',
'it_signature': 'IT signature:',
'return_complete': 'Return complete:',
'offboarding_note': 'Note: This form serves as the internal process basis for offboarding.',
'intro_title': 'Introduction and Handover Protocol',
'intro_sub': 'Conversation guide for the personal introduction of new employees.',
'base_data': 'Basic data',
'start_date': 'Start date',
'introduced_by': 'Introduction by',
'intro_note': 'This document serves as a conversation guide for the personal introduction. The fields can be checked manually during the meeting and signed afterwards.',
'employee_signature': 'Employee signature:',
'trainer_signature': 'Trainer signature:',
'intro_completed_at': 'Introduction completed on:',
'open_questions': 'Open questions / follow-up required:',
'live_intro_title': 'Introduction Protocol',
'live_intro_sub': 'Export of the current live status from the web-based introduction.',
'employment_start': 'Contract start',
'employee_signature_block': 'Employee signature',
},
}
return texts.get(lang, texts['de'])
return pdf_rendering._pdf_texts(language_code)
MANUAL_ONBOARDING_FIELD_SECTIONS = [
@@ -562,137 +379,23 @@ MANUAL_ONBOARDING_FIELD_SECTIONS = [
def _manual_onboarding_field_sections() -> list[dict]:
fields = OnboardingRequestForm.base_fields
sections = []
for title, field_names in MANUAL_ONBOARDING_FIELD_SECTIONS:
labels = [str(fields[name].label or name) for name in field_names if name in fields]
if not labels:
continue
sections.append({'title': title, 'rows': _chunk_list(labels, chunk_size=2)})
return sections
return pdf_rendering._manual_onboarding_field_sections()
def _resolve_workflow_emails() -> tuple[str, str, str, str, str]:
config = WorkflowConfig.objects.order_by('id').first()
it_email = (config.it_onboarding_email if config and config.it_onboarding_email else settings.IT_ONBOARDING_NOTIFICATION_EMAIL)
general_info_email = (config.general_info_email if config and config.general_info_email else settings.GENERAL_INFO_NOTIFICATION_EMAIL)
business_card_email = (config.business_card_email if config and config.business_card_email else settings.BUSINESS_CARD_NOTIFICATION_EMAIL)
hr_works_email = (config.hr_works_email if config and config.hr_works_email else settings.HR_WORKS_NOTIFICATION_EMAIL)
key_email = (config.key_notification_email if config and config.key_notification_email else settings.KEY_NOTIFICATION_EMAIL)
return it_email, general_info_email, business_card_email, hr_works_email, key_email
return email_workflows.resolve_workflow_emails()
def _matches_intro_condition(request_obj: OnboardingRequest, item: IntroChecklistItem) -> bool:
operator = (item.condition_operator or 'always').strip()
field_name = (item.condition_field or '').strip()
expected = (item.condition_value or '').strip()
if operator == 'always' or not field_name:
return True
raw_value = getattr(request_obj, field_name, '')
if raw_value is None:
raw_value = ''
if operator == 'is_true':
return bool(raw_value)
if operator == 'is_false':
return not bool(raw_value)
text_value = str(raw_value).strip()
if operator == 'equals':
return text_value.lower() == expected.lower()
if operator == 'contains':
return expected.lower() in text_value.lower()
return True
return pdf_rendering._matches_intro_condition(request_obj, item)
def _build_intro_sections_from_admin(request_obj: OnboardingRequest, language_code: str | None = None) -> dict[str, list[str]]:
items = list(IntroChecklistItem.objects.filter(is_active=True).order_by('section', 'sort_order', 'label'))
if not items:
return {}
section_map = {key: [] for key, _label in IntroChecklistItem.SECTION_CHOICES}
for item in items:
if item.section not in section_map:
continue
if _matches_intro_condition(request_obj, item):
section_map[item.section].append(item.translated_label(language_code))
return {key: values for key, values in section_map.items() if values}
return pdf_rendering._build_intro_sections_from_admin(request_obj, language_code=language_code)
def build_intro_sections_for_request(request_obj: OnboardingRequest, language_code: str | None = None) -> list[dict]:
lang = _normalized_lang(language_code or get_language())
with override(lang):
section_titles = {
'workplace': _('Geräte und Arbeitsplatz'),
'accounts': _('Konten und Berechtigungen'),
'software': _('Software und Tools'),
'process': _('Prozesse und Hinweise'),
}
devices = _split_multiline(request_obj.needed_devices)
software = _split_multiline(request_obj.needed_software)
accesses = _split_multiline(request_obj.needed_accesses)
groups = _split_multiline(request_obj.needed_workspace_groups)
resources = _split_multiline(request_obj.needed_resources)
extra_hardware = _split_multiline(request_obj.additional_hardware)
extra_software = _split_multiline(request_obj.additional_software)
group_mailboxes = _split_multiline(request_obj.group_mailboxes)
workplace_items = []
for item in devices:
workplace_items.append(_('%(item)s übergeben und Grundfunktionen erklärt') % {'item': item})
for item in resources:
workplace_items.append(_('%(item)s gezeigt bzw. Nutzung erklärt') % {'item': item})
if request_obj.phone_number:
workplace_items.append(_('Telefonnummer / Direktwahl erklärt: %(value)s') % {'value': request_obj.phone_number})
if not workplace_items:
workplace_items.append(_('Arbeitsplatz, Geräte und allgemeine Nutzung besprochen'))
account_items = [_('%(item)s Zugang erklärt') % {'item': item} for item in accesses]
account_items.extend([_('%(item)s Gruppe / Berechtigung erläutert') % {'item': item} for item in groups])
if request_obj.work_email:
account_items.insert(0, _('Dienstliche E-Mail-Adresse erläutert: %(value)s') % {'value': request_obj.work_email})
if group_mailboxes:
account_items.extend([_('Gruppenpostfach erklärt: %(item)s') % {'item': item} for item in group_mailboxes])
if not account_items:
account_items.append(_('Zugänge, Konten und Anmeldelogik besprochen'))
software_items = [_('%(item)s Einführung durchgeführt') % {'item': item} for item in software]
software_items.extend([_('%(item)s zusätzlich besprochen') % {'item': item} for item in extra_software])
if not software_items:
software_items.append(_('Benötigte Standardsoftware und tägliche Nutzung erklärt'))
process_items = [
_('Passwortregeln und sicherer Umgang besprochen'),
_('Dateiablage, Nextcloud und Freigaben erklärt'),
_('Kommunikationswege und Support-Prozess erklärt'),
]
if extra_hardware:
process_items.extend([_('%(item)s als zusätzliche Ausstattung besprochen') % {'item': item} for item in extra_hardware])
if request_obj.additional_access_text:
process_items.extend([_('Zusätzlicher Zugang besprochen: %(item)s') % {'item': item} for item in _split_multiline(request_obj.additional_access_text)])
if request_obj.successor_name:
process_items.append(_('Übergabe-/Nachfolgekontext besprochen: %(value)s') % {'value': request_obj.successor_name})
custom_intro_items = _build_intro_sections_from_admin(request_obj, lang)
intro_sections_raw = [
('workplace', section_titles['workplace'], workplace_items),
('accounts', section_titles['accounts'], account_items),
('software', section_titles['software'], software_items),
('process', section_titles['process'], process_items),
]
sections = []
for key, title, default_items in intro_sections_raw:
merged_items = list(default_items)
merged_items.extend(custom_intro_items.get(key, []))
section_items = []
for idx, label in enumerate(merged_items, start=1):
section_items.append({'id': f'{key}_{idx}', 'label': label})
if section_items:
sections.append({'key': key, 'title': title, 'items': section_items})
return sections
return pdf_rendering.build_intro_sections_for_request(request_obj, language_code=language_code)
def _send_workflow_email(
@@ -702,77 +405,33 @@ def _send_workflow_email(
attachments: list[Path] | None = None,
from_email: str | None = None,
) -> None:
recipients = [r for r in to if r]
if not recipients:
return
effective_to = recipients
effective_body = body
if is_email_test_mode():
effective_to = [get_email_test_redirect()]
effective_body = (
"[TEST MODE] Diese E-Mail wurde umgeleitet.\n"
f"Originale Empfänger: {', '.join(recipients)}\n\n{body}"
)
send_system_email(
return email_workflows.send_workflow_email(
subject=subject,
body=effective_body,
to=effective_to,
attachments=[str(a) for a in (attachments or [])],
body=body,
to=to,
attachments=attachments,
from_email=from_email,
)
def _render_notification_template(template_key: str, context: dict, language_code: str | None = None) -> tuple[str, str]:
lang = (language_code or 'de').split('-')[0]
db_template = NotificationTemplate.objects.filter(key=template_key, is_active=True).first()
if db_template:
subject_template = db_template.translated_subject_template(lang)
body_template = db_template.translated_body_template(lang)
else:
fallback = get_default_notification_templates()[template_key]
subject_template = fallback.get(f'subject_{lang}', '') or fallback['subject']
body_template = fallback.get(f'body_{lang}', '') or fallback['body']
subject = Template(subject_template).render(context).strip()
body = Template(body_template).render(context).strip()
return subject, body
return email_workflows.render_notification_template(
template_key,
context,
language_code=language_code,
)
def _parse_recipients(raw: str) -> list[str]:
if not raw:
return []
cleaned = raw.replace(';', ',').replace('\n', ',')
return [x.strip() for x in cleaned.split(',') if x.strip()]
return email_workflows.parse_recipients(raw)
def _as_bool(value) -> bool:
if isinstance(value, bool):
return value
if value is None:
return False
text = str(value).strip().lower()
return text in {'1', 'true', 'ja', 'yes', 'on', 'aktiv'}
return email_workflows.as_bool(value)
def _rule_matches(rule: NotificationRule, request_obj) -> bool:
if rule.operator == 'always':
return True
raw_value = getattr(request_obj, rule.field_name, '')
actual = '' if raw_value is None else str(raw_value)
expected = (rule.expected_value or '').strip()
if rule.operator == 'contains':
return expected.lower() in actual.lower()
if rule.operator == 'equals':
return actual.strip().lower() == expected.lower()
if rule.operator == 'is_true':
return _as_bool(raw_value)
if rule.operator == 'is_false':
return not _as_bool(raw_value)
return False
return email_workflows.rule_matches(rule, request_obj)
def _apply_notification_rules(
@@ -821,32 +480,10 @@ def _apply_notification_rules(
def _schedule_welcome_email(request_obj: OnboardingRequest) -> None:
recipient = (request_obj.work_email or '').strip().lower()
if not recipient:
return
config = WorkflowConfig.objects.order_by('id').first()
delay_days = 5
if config:
delay_days = max(0, int(config.welcome_email_delay_days or 5))
send_at = timezone.now() + timedelta(days=delay_days)
scheduled, _ = ScheduledWelcomeEmail.objects.update_or_create(
onboarding_request=request_obj,
defaults={
'recipient_email': recipient,
'send_at': send_at,
'status': 'scheduled',
'last_error': '',
'sent_at': None,
},
return email_workflows.schedule_welcome_email(
request_obj,
send_scheduled_welcome_email_task=send_scheduled_welcome_email,
)
try:
async_result = send_scheduled_welcome_email.apply_async(args=[scheduled.id], eta=send_at)
scheduled.celery_task_id = async_result.id or ''
scheduled.save(update_fields=['celery_task_id', 'updated_at'])
except Exception as exc:
scheduled.status = 'failed'
scheduled.last_error = f'Scheduling failed: {exc}'
scheduled.save(update_fields=['status', 'last_error', 'updated_at'])
def _send_templated_email(
@@ -862,281 +499,23 @@ def _send_templated_email(
def _render_html(template_path: Path, context: dict) -> str:
with template_path.open('r', encoding='utf-8') as handle:
template = Template(handle.read())
return template.render(context)
return pdf_rendering._render_html(template_path, context)
def _generate_content_pdf(html_content: str, output_pdf: Path) -> None:
page_style = (
'<style>'
'@page { size: A4; margin: 58mm 16mm 16mm 16mm; }'
'body { margin: 0; }'
'</style>'
)
if '<head>' in html_content:
html_content = html_content.replace('<head>', f'<head>{page_style}', 1)
else:
html_content = page_style + html_content
output_pdf.parent.mkdir(parents=True, exist_ok=True)
with output_pdf.open('wb') as fp:
result = pisa.CreatePDF(
src=html_content,
dest=fp,
encoding='utf-8',
)
if result.err:
raise RuntimeError(f'Failed to render PDF content for {output_pdf.name}')
return pdf_rendering._generate_content_pdf(html_content, output_pdf)
def _overlay_with_letterhead(content_pdf: Path, letterhead_pdf: Path, output_pdf: Path) -> None:
letterhead_reader = PdfReader(str(letterhead_pdf))
content_reader = PdfReader(str(content_pdf))
writer = PdfWriter()
letterhead_page = letterhead_reader.pages[0]
for page in content_reader.pages:
merged = PageObject.create_blank_page(
width=letterhead_page.mediabox.width,
height=letterhead_page.mediabox.height,
)
merged.merge_page(letterhead_page)
merged.merge_page(page)
writer.add_page(merged)
with output_pdf.open('wb') as fp:
writer.write(fp)
return pdf_rendering._overlay_with_letterhead(content_pdf, letterhead_pdf, output_pdf)
def _generate_onboarding_pdf(request_obj: OnboardingRequest) -> Path:
lang = _normalized_lang(request_obj.preferred_language)
t = _pdf_texts(lang)
first_name, last_name = _split_name(request_obj.full_name)
safe_name = _safe_filename_fragment(request_obj.full_name, fallback=f'onboarding_{request_obj.id}')
output_pdf = settings.PDF_OUTPUT_DIR / f'onboarding_letter_{safe_name}.pdf'
temp_pdf = settings.PDF_OUTPUT_DIR / f'_temp_onboarding_{safe_name}.pdf'
template_path = settings.PDF_TEMPLATES_DIR / 'onboarding_template.html'
letterhead_path = get_portal_letterhead_path()
company_contact = get_company_contact_copy()
devices = _split_multiline(request_obj.needed_devices)
software = _split_multiline(request_obj.needed_software)
accesses = _split_multiline(request_obj.needed_accesses)
groups = _split_multiline(request_obj.needed_workspace_groups)
resources = _split_multiline(request_obj.needed_resources)
group_mailboxes_list = _split_multiline(request_obj.group_mailboxes or '')
additional_hardware_list = _split_multiline(request_obj.additional_hardware or '')
additional_software_list = _split_multiline(request_obj.additional_software or '')
additional_access_list = _split_multiline(request_obj.additional_access_text or '')
signature_src = ''
signature_note = t['not_available_short']
if getattr(request_obj, 'signature_image', None):
try:
signature_path = Path(request_obj.signature_image.path).resolve()
with signature_path.open('rb') as sig_fp:
encoded = base64.b64encode(sig_fp.read()).decode('ascii')
mime_type = mimetypes.guess_type(signature_path.name)[0] or 'image/png'
signature_src = f"data:{mime_type};base64,{encoded}"
signature_note = 'Digital signature stored as image file.' if lang == 'en' else 'Digitale Signatur als Bilddatei hinterlegt.'
except Exception:
signature_src = ''
signature_note = request_obj.signature_url or t['not_available_short']
elif request_obj.signature_url:
signature_note = request_obj.signature_url
requester_email = request_obj.onboarded_by_email or t['not_available_short']
requester_name = request_obj.onboarded_by_name or _resolve_user_display_name(request_obj.onboarded_by_email) or t['not_available_short']
gender = (request_obj.get_gender_display() or t['not_available_short']).strip() or t['not_available_short']
employment_type = (request_obj.employment_type or t['not_available_short']).strip() or t['not_available_short']
employment_end = request_obj.employment_end_date or t['not_available_short']
order_business_cards = bool(request_obj.order_business_cards)
group_mailboxes = (request_obj.group_mailboxes or '').strip()
additional_hardware_other = (request_obj.additional_hardware_other or '').strip()
additional_hardware = (request_obj.additional_hardware or '').strip()
additional_software = (request_obj.additional_software or '').strip()
additional_access_text = (request_obj.additional_access_text or '').strip()
successor_name = (request_obj.successor_name or '').strip()
additional_notes = (request_obj.additional_notes or '').strip()
phone_number = (request_obj.phone_number or '').strip()
display_name = f"{gender} {first_name} {last_name}".strip() if gender and gender != '-' else f"{first_name} {last_name}".strip()
pdf_sections = build_pdf_sections('onboarding', request_obj, lang)
pdf_section_map = {section['key']: section for section in pdf_sections if section.get('has_content')}
pdf_field_map = {}
for section in pdf_sections:
for field in section.get('render_fields', []):
pdf_field_map[field['name']] = field
def _field_text(name: str, fallback):
field = pdf_field_map.get(name)
if not field:
return fallback
value = field.get('display_value')
if isinstance(value, list):
return ' | '.join(value) if value else fallback
return value or fallback
def _field_list(name: str) -> list[str]:
field = pdf_field_map.get(name)
if not field:
return []
value = field.get('display_value')
if isinstance(value, list):
return value
text = str(value or '').strip()
return [text] if text else []
devices_visible = _field_list('needed_devices_multi')
groups_visible = _field_list('needed_workspace_groups_multi')
software_visible = _field_list('needed_software_multi')
accesses_visible = _field_list('needed_accesses_multi')
resources_visible = _field_list('needed_resources_multi')
group_mailboxes_visible = _field_list('group_mailboxes')
additional_hardware_visible = _field_list('additional_hardware_multi')
additional_software_visible = _field_list('additional_software_multi')
additional_access_visible = _field_list('additional_access_text')
job_title_visible = 'job_title' in pdf_field_map
itsetup_visible = 'itsetup' in pdf_section_map
context = {
'T': t,
'PDF_LANG': lang,
'PDF_SECTIONS': pdf_sections,
'VORNAME': first_name,
'NACHNAME': last_name,
'DISPLAY_NAME': display_name or request_obj.full_name,
'ANREDE': _field_text('gender', gender),
'JOB_TITLE_VISIBLE': job_title_visible,
'BERUFSBEZEICHNUNG': _field_text('job_title', t['not_available']),
'ABTEILUNG': _field_text('department', t['not_available']),
'EMAIL': _field_text('work_email', t['not_available']),
'VERTRAGSBEGINN': _field_text('contract_start', request_obj.contract_start),
'BESCHAEFTIGUNG': _field_text('employment_type', employment_type),
'VERTRAGSENDE': _field_text('employment_end_date', employment_end),
'UEBERGABEDATUM': _field_text('handover_date', request_obj.handover_date or t['not_available_short']),
'ARBEITSGERAETE_TEXT': ' | '.join(devices_visible) if devices_visible else t['not_available'],
'WORKSPACE_GROUPS_TEXT': ' | '.join(groups_visible) if groups_visible else t['not_available'],
'SOFTWARE_TEXT': ' | '.join(software_visible) if software_visible else t['not_available'],
'ZUGAENGE_TEXT': ' | '.join(accesses_visible) if accesses_visible else t['not_available'],
'RESSOURCEN_TEXT': ' | '.join(resources_visible) if resources_visible else t['not_available'],
'VISITENKARTE_BESTELLT': order_business_cards,
'HAS_VISITENKARTE_DATEN': order_business_cards and ('business_card_name' in pdf_field_map or 'business_card_title' in pdf_field_map or 'business_card_email' in pdf_field_map or 'business_card_phone' in pdf_field_map) and any(
[
_field_text('business_card_name', '').strip(),
_field_text('business_card_title', '').strip(),
_field_text('business_card_email', '').strip(),
_field_text('business_card_phone', '').strip(),
]
),
'VISITENKARTE_NAME': _field_text('business_card_name', t['not_available_short']),
'VISITENKARTE_TITEL': _field_text('business_card_title', t['not_available_short']),
'VISITENKARTE_EMAIL': _field_text('business_card_email', t['not_available_short']),
'VISITENKARTE_TELEFON': _field_text('business_card_phone', t['not_available_short']),
'GROUP_MAILBOXES': _field_text('group_mailboxes', group_mailboxes or t['not_available']),
'ADDITIONAL_HARDWARE_OTHER': _field_text('additional_hardware_other', additional_hardware_other or t['not_available']),
'ADDITIONAL_HARDWARE': _field_text('additional_hardware_other', additional_hardware or t['not_available']),
'ADDITIONAL_SOFTWARE': _field_text('additional_software', additional_software or t['not_available']),
'ADDITIONAL_ACCESS_TEXT': _field_text('additional_access_text', additional_access_text or t['not_available']),
'SUCCESSOR_NAME': _field_text('successor_name', successor_name or t['not_available']),
'PHONE_NUMBER': _field_text('phone_number_choice', phone_number or t['not_available_short']),
'INHERIT_PHONE_NUMBER': _field_text('inherit_phone_number_choice', t['yes'] if request_obj.inherit_phone_number else t['no']),
'ADDITIONAL_NOTES': _field_text('additional_notes', additional_notes or t['not_available']),
'GROUP_MAILBOXES_REQUIRED': 'group_mailboxes_required_choice' in pdf_field_map and bool(group_mailboxes_visible),
'ADDITIONAL_HARDWARE_NEEDED': 'additional_hardware_needed_choice' in pdf_field_map and bool(additional_hardware_visible),
'ADDITIONAL_SOFTWARE_NEEDED': 'additional_software_needed_choice' in pdf_field_map and bool(additional_software_visible),
'ADDITIONAL_ACCESS_NEEDED': 'additional_access_needed_choice' in pdf_field_map and bool(additional_access_visible),
'HAS_DEVICES': itsetup_visible and bool(devices_visible),
'HAS_GROUPS': itsetup_visible and bool(groups_visible),
'HAS_SOFTWARE': itsetup_visible and bool(software_visible),
'HAS_ACCESSES': itsetup_visible and bool(accesses_visible),
'HAS_RESOURCES': itsetup_visible and bool(resources_visible),
'HAS_GROUP_MAILBOXES': bool(group_mailboxes_visible),
'HAS_ADDITIONAL_HARDWARE': bool(additional_hardware_visible),
'HAS_ADDITIONAL_SOFTWARE': bool(additional_software_visible),
'HAS_ADDITIONAL_ACCESS': bool(additional_access_visible),
'HAS_ADDITIONAL_HARDWARE_OTHER': bool(_field_text('additional_hardware_other', '').strip()),
'HAS_SUCCESSOR_INFO': bool(_field_text('successor_name', '').strip()) or 'inherit_phone_number_choice' in pdf_field_map or bool(_field_text('phone_number_choice', '').strip()),
'HAS_ADDITIONAL_NOTES': bool(_field_text('additional_notes', '').strip()),
'GROUP_MAILBOXES_LIST': _chunk_list(group_mailboxes_visible),
'ADDITIONAL_HARDWARE_LIST': _chunk_list(additional_hardware_visible),
'ADDITIONAL_SOFTWARE_LIST': _chunk_list(additional_software_visible),
'ADDITIONAL_ACCESS_LIST': _chunk_list(additional_access_visible),
'ZUGAENGE_LIST': _chunk_list(groups_visible),
'ARBEITSGERÄTE_LIST': _chunk_list(devices_visible),
'SOFTWARE_LIST': _chunk_list(software_visible),
'ACCOUNT_LIST': _chunk_list(accesses_visible),
'STANDARD_RESSOURCEN': _chunk_list(resources_visible),
'UNTERSCHRIFT': signature_src,
'UNTERSCHRIFT_HINWEIS': signature_note,
'REQUESTED_BY_NAME': requester_name,
'REQUESTED_BY_EMAIL': requester_email,
'COMPANY_LEGAL_NAME': company_contact['legal_company_name'] or company_contact['company_name'],
'COMPANY_ADDRESS': company_contact['address'] or t['not_available_short'],
'COMPANY_IT_CONTACT': company_contact['it_contact_email'] or t['not_available_short'],
'COMPANY_HR_CONTACT': company_contact['hr_contact_email'] or t['not_available_short'],
'COMPANY_PHONE': company_contact['phone_number'] or t['not_available_short'],
}
html = _render_html(template_path, context)
_generate_content_pdf(html, temp_pdf)
_overlay_with_letterhead(temp_pdf, letterhead_path, output_pdf)
if temp_pdf.exists():
temp_pdf.unlink(missing_ok=True)
return output_pdf
return pdf_rendering._generate_onboarding_pdf(request_obj)
def _generate_onboarding_intro_pdf(request_obj: OnboardingRequest, language_code: str | None = None) -> Path:
lang = _normalized_lang(language_code or request_obj.preferred_language)
t = _pdf_texts(lang)
safe_name = _safe_filename_fragment(request_obj.full_name, fallback=f'onboarding_intro_{request_obj.id}')
output_pdf = settings.PDF_OUTPUT_DIR / f'onboarding_intro_{safe_name}.pdf'
temp_pdf = settings.PDF_OUTPUT_DIR / f'_temp_onboarding_intro_{safe_name}.pdf'
template_path = settings.PDF_TEMPLATES_DIR / 'onboarding_intro_template.html'
letterhead_path = get_portal_letterhead_path()
company_contact = get_company_contact_copy()
salutation = (request_obj.get_gender_display() or '').strip()
display_name = f"{salutation} {request_obj.full_name}".strip() if salutation else request_obj.full_name
intro_sections = [
{
'title': section['title'],
'rows': _chunk_list([item['label'] for item in section['items']], chunk_size=2),
}
for section in build_intro_sections_for_request(request_obj, language_code=language_code)
]
requester_email = request_obj.onboarded_by_email or '-'
requester_name = request_obj.onboarded_by_name or _resolve_user_display_name(request_obj.onboarded_by_email) or '-'
context = {
'T': t,
'DISPLAY_NAME': display_name,
'ABTEILUNG': request_obj.department or t['not_available_short'],
'BERUFSBEZEICHNUNG': request_obj.job_title or t['not_available_short'],
'VERTRAGSBEGINN': request_obj.contract_start,
'EMAIL': request_obj.work_email or t['not_available_short'],
'REQUESTED_BY_NAME': requester_name,
'REQUESTED_BY_EMAIL': requester_email,
'INTRO_SECTIONS': intro_sections,
'COMPANY_LEGAL_NAME': company_contact['legal_company_name'] or company_contact['company_name'],
'COMPANY_ADDRESS': company_contact['address'] or t['not_available_short'],
'COMPANY_IT_CONTACT': company_contact['it_contact_email'] or t['not_available_short'],
'COMPANY_HR_CONTACT': company_contact['hr_contact_email'] or t['not_available_short'],
'COMPANY_PHONE': company_contact['phone_number'] or t['not_available_short'],
}
html = _render_html(template_path, context)
_generate_content_pdf(html, temp_pdf)
_overlay_with_letterhead(temp_pdf, letterhead_path, output_pdf)
if temp_pdf.exists():
temp_pdf.unlink(missing_ok=True)
return output_pdf
return pdf_rendering._generate_onboarding_intro_pdf(request_obj, language_code=language_code)
def _generate_onboarding_intro_session_pdf(
@@ -1144,147 +523,15 @@ def _generate_onboarding_intro_session_pdf(
admin_signature_name: str = '-',
language_code: str | None = None,
) -> Path:
request_obj = session.onboarding_request
lang = _normalized_lang(language_code or request_obj.preferred_language)
t = _pdf_texts(lang)
safe_name = _safe_filename_fragment(request_obj.full_name, fallback=f'onboarding_intro_session_{request_obj.id}')
version = timezone.now().strftime('%Y%m%d%H%M%S')
output_pdf = settings.PDF_OUTPUT_DIR / f'onboarding_intro_session_{safe_name}_{version}.pdf'
temp_pdf = settings.PDF_OUTPUT_DIR / f'_temp_onboarding_intro_session_{safe_name}_{version}.pdf'
template_path = settings.PDF_TEMPLATES_DIR / 'onboarding_intro_session_pdf.html'
letterhead_path = get_portal_letterhead_path()
company_contact = get_company_contact_copy()
salutation = (request_obj.get_gender_display() or '').strip()
display_name = f"{salutation} {request_obj.full_name}".strip() if salutation else request_obj.full_name
raw_sections = build_intro_sections_for_request(request_obj, language_code=language_code)
checked_map = session.checklist_state or {}
exported_sections = []
checked_count = 0
total_count = 0
for section in raw_sections:
checked_items = []
for item in section['items']:
checked = bool(checked_map.get(item['id']))
total_count += 1
if checked:
checked_count += 1
checked_items.append({'label': item['label']})
if checked_items:
exported_sections.append({
'title': section['title'],
'rows': [checked_items[i:i + 2] for i in range(0, len(checked_items), 2)],
})
requester_email = request_obj.onboarded_by_email or '-'
requester_name = request_obj.onboarded_by_name or _resolve_user_display_name(request_obj.onboarded_by_email) or '-'
context = {
'T': t,
'DISPLAY_NAME': display_name,
'ABTEILUNG': request_obj.department or t['not_available_short'],
'BERUFSBEZEICHNUNG': request_obj.job_title or t['not_available_short'],
'VERTRAGSBEGINN': request_obj.contract_start,
'EMAIL': request_obj.work_email or t['not_available_short'],
'REQUESTED_BY_NAME': requester_name,
'REQUESTED_BY_EMAIL': requester_email,
'SESSION_STATUS': session.get_status_display(),
'SESSION_COMPLETED_BY': session.completed_by_name or '-',
'SESSION_COMPLETED_AT': session.completed_at or '-',
'SESSION_UPDATED_AT': session.updated_at,
'SESSION_NOTES': session.notes or t['not_available_short'],
'INTRO_SECTIONS': exported_sections,
'COMPANY_LEGAL_NAME': company_contact['legal_company_name'] or company_contact['company_name'],
'COMPANY_ADDRESS': company_contact['address'] or t['not_available_short'],
'COMPANY_IT_CONTACT': company_contact['it_contact_email'] or t['not_available_short'],
'COMPANY_HR_CONTACT': company_contact['hr_contact_email'] or t['not_available_short'],
'COMPANY_PHONE': company_contact['phone_number'] or t['not_available_short'],
}
html = _render_html(template_path, context)
_generate_content_pdf(html, temp_pdf)
_overlay_with_letterhead(temp_pdf, letterhead_path, output_pdf)
if temp_pdf.exists():
temp_pdf.unlink(missing_ok=True)
return output_pdf
return pdf_rendering._generate_onboarding_intro_session_pdf(
session,
admin_signature_name=admin_signature_name,
language_code=language_code,
)
def _generate_offboarding_pdf(request_obj: OffboardingRequest) -> Path:
lang = _normalized_lang(request_obj.preferred_language)
t = _pdf_texts(lang)
safe_name = _safe_filename_fragment(request_obj.full_name, fallback=f'offboarding_{request_obj.id}')
output_pdf = settings.PDF_OUTPUT_DIR / f'offboarding_letter_{safe_name}.pdf'
temp_pdf = settings.PDF_OUTPUT_DIR / f'_temp_offboarding_{safe_name}.pdf'
template_path = settings.PDF_TEMPLATES_DIR / 'offboarding_template.html'
letterhead_path = get_portal_letterhead_path()
company_contact = get_company_contact_copy()
latest_onboarding = (
OnboardingRequest.objects.filter(work_email=request_obj.work_email)
.order_by('-created_at')
.first()
)
has_onboarding_data = latest_onboarding is not None
onboarding_hardware = _split_multiline(latest_onboarding.needed_devices) if latest_onboarding else []
selected_set = {item.lower() for item in onboarding_hardware}
hardware_catalog = [
'Laptop',
'Docking-Station',
'Tastatur und Maus',
'Kopfhörer',
'Tragetasche',
'Monitor',
'Schlüssel',
'Tischtelefon',
]
checklist = [{'label': item, 'selected': item.lower() in selected_set} for item in hardware_catalog]
extra_selected = [item for item in onboarding_hardware if item.lower() not in {x.lower() for x in hardware_catalog}]
for item in extra_selected:
checklist.append({'label': item, 'selected': True})
requester_email = request_obj.requested_by_email or t['not_available_short']
requester_name = request_obj.requested_by_name or _resolve_user_display_name(request_obj.requested_by_email) or t['not_available_short']
context = {
'T': t,
'PDF_LANG': lang,
'PDF_SECTIONS': build_pdf_sections('offboarding', request_obj, lang),
'FULL_NAME': request_obj.full_name,
'EMAIL': request_obj.work_email,
'DEPARTMENT': request_obj.department or t['not_available_short'],
'JOB_TITLE': request_obj.job_title or t['not_available_short'],
'LAST_WORKING_DAY': request_obj.last_working_day,
'NOTES': request_obj.notes or t['not_available_short'],
'REQUESTED_BY': requester_email,
'REQUESTED_BY_NAME': requester_name,
'HAS_ONBOARDING_DATA': has_onboarding_data,
'ONBOARDING_HARDWARE': onboarding_hardware,
'HARDWARE_CHECKLIST': checklist,
'MANUAL_FIELD_SECTIONS': _manual_onboarding_field_sections(),
'MANUAL_DEVICES': _chunk_choice_labels(DEVICE_CHOICES),
'MANUAL_SOFTWARE': _chunk_choice_labels(SOFTWARE_CHOICES),
'MANUAL_ACCESSES': _chunk_choice_labels(ACCESS_CHOICES),
'MANUAL_WORKSPACE_GROUPS': _chunk_choice_labels(WORKSPACE_GROUP_CHOICES),
'MANUAL_RESOURCES': _chunk_choice_labels(RESOURCE_CHOICES),
'MANUAL_EXTRA_HARDWARE': _chunk_choice_labels(HARDWARE_EXTRA_CHOICES),
'MANUAL_EXTRA_SOFTWARE': _chunk_choice_labels(SOFTWARE_EXTRA_CHOICES),
'COMPANY_LEGAL_NAME': company_contact['legal_company_name'] or company_contact['company_name'],
'COMPANY_ADDRESS': company_contact['address'] or t['not_available_short'],
'COMPANY_IT_CONTACT': company_contact['it_contact_email'] or t['not_available_short'],
'COMPANY_HR_CONTACT': company_contact['hr_contact_email'] or t['not_available_short'],
'COMPANY_PHONE': company_contact['phone_number'] or t['not_available_short'],
}
html = _render_html(template_path, context)
_generate_content_pdf(html, temp_pdf)
_overlay_with_letterhead(temp_pdf, letterhead_path, output_pdf)
if temp_pdf.exists():
temp_pdf.unlink(missing_ok=True)
return output_pdf
return pdf_rendering._generate_offboarding_pdf(request_obj)
@shared_task

View File

@@ -0,0 +1,82 @@
from django.utils.translation import gettext as _
from .models import AdminAuditLog
def display_user_name(user) -> str:
first_name = (getattr(user, 'first_name', '') or '').strip()
last_name = (getattr(user, 'last_name', '') or '').strip()
full_name = f'{first_name} {last_name}'.strip()
if full_name:
return full_name
username = (getattr(user, 'username', '') or '').strip()
if username:
return username
return (getattr(user, 'email', '') or '').strip()
def audit(
request,
action: str,
*,
target_type: str = '',
target_id: int | None = None,
target_label: str = '',
details: dict | None = None,
) -> None:
if not getattr(request, 'user', None) or not request.user.is_authenticated:
return
AdminAuditLog.objects.create(
actor=request.user,
actor_display=display_user_name(request.user),
action=action,
target_type=target_type,
target_id=target_id,
target_label=target_label,
details=details or {},
)
def audit_action_label(action: str) -> str:
labels = {
'requests_deleted': _('Vorgänge gelöscht'),
'request_deleted': _('Vorgang gelöscht'),
'request_retried': _('Vorgang erneut angestoßen'),
'intro_pdf_generated': _('Einweisungs-PDF erzeugt'),
'intro_live_pdf_generated': _('Live-Protokoll erzeugt'),
'intro_session_reset': _('Einweisung zurückgesetzt'),
'intro_session_saved': _('Einweisung als Entwurf gespeichert'),
'intro_session_completed': _('Einweisung abgeschlossen'),
'form_option_deleted': _('Formularoption gelöscht'),
'form_options_saved': _('Formularoptionen gespeichert'),
'form_field_texts_saved': _('Feldtexte gespeichert'),
'form_layout_saved': _('Formularlayout gespeichert'),
'intro_checklist_item_deleted': _('Einweisungs-Checkpunkt gelöscht'),
'intro_checklist_item_added': _('Einweisungs-Checkpunkt hinzugefügt'),
'intro_checklist_saved': _('Einweisungs-Checkliste gespeichert'),
'welcome_email_triggered_now': _('Welcome E-Mail sofort ausgelöst'),
'welcome_email_settings_saved': _('Welcome E-Mail Einstellungen gespeichert'),
'welcome_email_bulk_action': _('Welcome E-Mail Sammelaktion ausgeführt'),
'welcome_email_paused': _('Welcome E-Mail pausiert'),
'welcome_email_resumed': _('Welcome E-Mail fortgesetzt'),
'welcome_email_cancelled': _('Welcome E-Mail abgebrochen'),
'smtp_test_sent': _('SMTP-Test gesendet'),
'nextcloud_test_upload': _('Nextcloud-Testupload ausgeführt'),
'nextcloud_mode_toggled': _('Nextcloud-Modus umgeschaltet'),
'email_mode_toggled': _('E-Mail-Modus umgeschaltet'),
'integrations_saved': _('Integrationen gespeichert'),
'nextcloud_settings_saved': _('Nextcloud-Einstellungen gespeichert'),
'mail_settings_saved': _('Mail-Einstellungen gespeichert'),
'email_routing_saved': _('E-Mail-Routing gespeichert'),
'notification_rules_saved': _('Benachrichtigungsregeln gespeichert'),
'user_created': _('Benutzer erstellt'),
'user_updated': _('Benutzer aktualisiert'),
'user_password_reset_sent': _('Passwort-Reset-Link versendet'),
'user_deleted': _('Benutzer gelöscht'),
'backup_created': _('Backup erstellt'),
'backup_verified': _('Backup verifiziert'),
'backup_deleted': _('Backup gelöscht'),
'backup_settings_saved': _('Backup-Einstellungen gespeichert'),
'portal_app_registry_saved': _('App-Registry gespeichert'),
}
return labels.get(action, action.replace('_', ' ').strip().capitalize())

View File

@@ -0,0 +1,105 @@
from datetime import timedelta
from django.db.models import Count
from django.utils import timezone
from django.utils.translation import get_language, gettext as _, override
from .backup_ops import latest_backup_health_snapshot
from .form_builder import get_custom_field_configs
from .forms import OffboardingRequestForm, OnboardingRequestForm
from .models import AsyncTaskLog, OffboardingRequest, OnboardingRequest
from .roles import user_has_capability
def form_field_labels(form_type: str) -> dict[str, str]:
if form_type == 'onboarding':
return {name: str(field.label or name) for name, field in OnboardingRequestForm.base_fields.items()}
if form_type == 'offboarding':
return {name: str(field.label or name) for name, field in OffboardingRequestForm.base_fields.items()}
return {}
def request_target_label(obj, kind: str | None = None) -> str:
request_kind = (kind or '').strip()
if not request_kind:
request_kind = 'onboarding' if isinstance(obj, OnboardingRequest) else 'offboarding'
name = (getattr(obj, 'full_name', '') or '').strip() or f'#{getattr(obj, "id", "?")}'
email = (getattr(obj, 'work_email', '') or '').strip()
created_at = getattr(obj, 'created_at', None)
date_label = created_at.strftime('%Y-%m-%d') if created_at else ''
parts = [request_kind.capitalize(), name]
if email:
parts.append(f'<{email}>')
if date_label:
parts.append(date_label)
return ' | '.join(parts)
def request_status_label(status_key: str, language_code: str | None = None) -> str:
lang = ((language_code or 'de').split('-')[0] or 'de').lower()
with override(lang):
labels = {
'submitted': _('Eingereicht'),
'processing': _('In Bearbeitung'),
'completed': _('Abgeschlossen'),
'failed': _('Fehlgeschlagen'),
}
return labels.get(status_key, status_key)
def request_custom_field_details(obj, kind: str, language_code: str | None = None) -> list[dict[str, str]]:
form_type = 'onboarding' if kind == 'onboarding' else 'offboarding'
language_code = ((language_code or getattr(obj, 'preferred_language', '') or get_language() or 'de').split('-')[0]).lower()
values = getattr(obj, 'custom_field_values', {}) or {}
rows = []
yes_label = 'Ja' if language_code == 'de' else 'Yes'
for cfg in get_custom_field_configs(form_type, include_inactive=True):
raw_value = values.get(cfg.field_key)
if raw_value in (None, '', False, []):
continue
if isinstance(raw_value, bool):
display_value = str(yes_label) if raw_value else ''
elif isinstance(raw_value, list):
display_value = ', '.join(str(item).strip() for item in raw_value if str(item).strip())
else:
display_value = str(raw_value).strip()
if not display_value:
continue
rows.append(
{
'label': cfg.translated_label(language_code),
'value': display_value,
'section': cfg.section_key,
'sort_order': cfg.sort_order,
}
)
rows.sort(key=lambda item: (item['section'], item['sort_order'], item['label']))
return rows
def ops_summary_for_user(user) -> dict[str, object]:
can_view_jobs = user_has_capability(user, 'view_job_monitor')
can_manage_backups = user_has_capability(user, 'manage_backups')
summary: dict[str, object] = {
'show': can_view_jobs or can_manage_backups,
'can_view_jobs': can_view_jobs,
'can_manage_backups': can_manage_backups,
'failed_count_24h': 0,
'started_count_24h': 0,
'success_count_24h': 0,
'recent_failed_logs': [],
'backup_health': latest_backup_health_snapshot() if can_manage_backups else None,
}
if not can_view_jobs:
return summary
since = timezone.now() - timedelta(hours=24)
logs = AsyncTaskLog.objects.filter(started_at__gte=since)
counts = {row['status']: row['count'] for row in logs.values('status').annotate(count=Count('id'))}
summary['failed_count_24h'] = counts.get('failed', 0)
summary['started_count_24h'] = counts.get('started', 0)
summary['success_count_24h'] = counts.get('succeeded', 0)
summary['recent_failed_logs'] = list(
AsyncTaskLog.objects.filter(status='failed').order_by('-started_at', '-id')[:5]
)
return summary

View File

@@ -0,0 +1,231 @@
from django.utils.translation import gettext as _, gettext_lazy
from .form_builder import (
LOCKED_SECTION_RULES,
OFFBOARDING_PAGE_ORDER,
ensure_form_conditional_rule_configs,
get_section_definitions,
)
ONBOARDING_GROUPS = {
'business-card-box': ['business_card_name', 'business_card_title', 'business_card_email', 'business_card_phone'],
'employment-end-box': ['employment_end_date'],
'group-mailboxes-box': ['group_mailboxes'],
'extra-hardware-box': ['additional_hardware_multi', 'additional_hardware_other'],
'extra-software-box': ['additional_software_multi', 'additional_software'],
'extra-access-box': ['additional_access_text'],
'successor-box': ['successor_name', 'inherit_phone_number_choice'],
}
ONBOARDING_INLINE_CHECKS = {'order_business_cards', 'agreement_confirm'}
ONBOARDING_CHECKBOX_LISTS = {
'needed_devices_multi',
'additional_hardware_multi',
'needed_software_multi',
'additional_software_multi',
'needed_accesses_multi',
'needed_workspace_groups_multi',
'needed_resources_multi',
}
CONDITIONAL_RULE_OPERATOR_CHOICES = [
('checked', _('ist aktiviert')),
('equals', _('ist gleich')),
('not_equals', _('ist nicht gleich')),
]
ONBOARDING_SECTION_META = {
'stammdaten': {'title': gettext_lazy('Stammdaten'), 'subtitle': gettext_lazy('Person, Rolle, Abteilung')},
'vertrag': {'title': gettext_lazy('Vertrag'), 'subtitle': gettext_lazy('Beschäftigung und Termine')},
'itsetup': {'title': gettext_lazy('IT-Setup'), 'subtitle': gettext_lazy('Geräte, Software und Zugänge')},
'abschluss': {'title': gettext_lazy('Abschluss'), 'subtitle': gettext_lazy('Notizen und Freigabe')},
}
OFFBOARDING_SECTION_META = {
'mitarbeitende': {'title': gettext_lazy('Mitarbeitende'), 'subtitle': gettext_lazy('Person, Rolle und Bereich')},
'austritt': {'title': gettext_lazy('Austritt'), 'subtitle': gettext_lazy('Letzter Arbeitstag')},
'abschluss': {'title': gettext_lazy('Abschluss'), 'subtitle': gettext_lazy('Hinweise und Abschlussnotizen')},
}
def field_rule_summary(*, is_visible: bool, is_required, locked: bool) -> str:
if locked:
return str(_('Fixes Kernfeld, immer sichtbar.'))
if not is_visible:
return str(_('Ausgeblendet, erscheint nicht im Formular.'))
if is_required is True:
return str(_('Sichtbar und als Pflichtfeld markiert.'))
if is_required is False:
return str(_('Sichtbar und optional.'))
return str(_('Sichtbar mit Standardverhalten.'))
def conditional_clause_sentence(clause: dict, field_label_map: dict[str, str]) -> str:
field_name = (clause.get('field') or '').strip()
operator = (clause.get('operator') or '').strip()
value = clause.get('value')
if not field_name or not operator:
return ''
field_label = field_label_map.get(field_name, field_name)
if operator == 'checked':
return _('%(field)s ist aktiviert') % {'field': field_label}
if operator == 'equals':
if value not in (None, ''):
return _('%(field)s ist gleich %(value)s') % {'field': field_label, 'value': value}
return _('%(field)s ist gleich') % {'field': field_label}
if operator == 'not_equals':
if value not in (None, ''):
return _('%(field)s ist nicht gleich %(value)s') % {'field': field_label, 'value': value}
return _('%(field)s ist nicht gleich') % {'field': field_label}
return _('%(field)s erfüllt die Bedingung') % {'field': field_label}
def conditional_rule_summary(clauses: list[dict], field_label_map: dict[str, str]) -> str:
active_clauses = [clause for clause in clauses if clause.get('field') and clause.get('operator')]
if not active_clauses:
return str(_('Immer sichtbar.'))
parts = [str(conditional_clause_sentence(clause, field_label_map)) for clause in active_clauses]
return str(_('Sichtbar, wenn %(conditions)s.') % {'conditions': ' und '.join(parts)})
def normalized_conditional_rule_payload(form_type: str) -> dict[str, dict]:
configs = ensure_form_conditional_rule_configs(form_type)
payload = {}
for target_key, cfg in configs.items():
if not cfg.is_active:
continue
clauses = [clause for clause in (cfg.clauses or []) if clause.get('field') and clause.get('operator')]
if clauses:
payload[target_key] = {'all': clauses}
return payload
def active_conditional_target_keys(form_type: str) -> set[str]:
return set(normalized_conditional_rule_payload(form_type).keys())
def translate_choice_list(choices):
return [(value, str(label)) for value, label in choices]
def build_onboarding_layout(form) -> list[dict]:
ordered_names = list(form.fields.keys())
group_by_field = {}
for group_id, group_fields in ONBOARDING_GROUPS.items():
for name in group_fields:
group_by_field[name] = group_id
conditional_target_keys = active_conditional_target_keys('onboarding')
rendered_groups = set()
consumed = set()
blocks = []
for field_name in ordered_names:
if field_name in consumed:
continue
group_id = group_by_field.get(field_name)
if group_id:
if group_id in rendered_groups:
continue
group_fields = [form[name] for name in ONBOARDING_GROUPS[group_id] if name in form.fields]
if not group_fields:
continue
blocks.append(
{
'kind': 'group',
'id': group_id,
'hidden_default': group_id in conditional_target_keys,
'fields': group_fields,
}
)
rendered_groups.add(group_id)
consumed.update([f.name for f in group_fields])
continue
if field_name.startswith('custom__') and field_name in conditional_target_keys:
blocks.append(
{
'kind': 'group',
'id': field_name,
'hidden_default': True,
'fields': [form[field_name]],
}
)
consumed.add(field_name)
continue
blocks.append({'kind': 'field', 'field': form[field_name]})
consumed.add(field_name)
return blocks
def section_for_block(block: dict, field_pages: dict[str, str]) -> str:
if block['kind'] == 'field':
return field_pages.get(block['field'].name, 'abschluss')
fields = block.get('fields') or []
if not fields:
return 'abschluss'
return field_pages.get(fields[0].name, 'abschluss')
def build_onboarding_sections(blocks: list[dict], field_pages: dict[str, str], visible_section_keys: set[str] | None = None) -> list[dict]:
section_defs = get_section_definitions('onboarding')
section_order = [item['key'] for item in section_defs]
section_titles = {item['key']: item['title'] for item in section_defs}
grouped = {key: [] for key in section_order}
for block in blocks:
section_key = section_for_block(block, field_pages)
if section_key not in grouped:
section_key = 'abschluss'
grouped[section_key].append(block)
visible_keys = visible_section_keys or set(section_order)
sections = []
custom_section_keys = {item['key'] for item in section_defs if item.get('is_custom')}
for key in section_order:
if key not in visible_keys:
continue
blocks_for_section = grouped[key]
has_custom_checkbox_fields = False
for block in blocks_for_section:
candidate_fields = [block['field']] if block['kind'] == 'field' else (block.get('fields') or [])
for bound_field in candidate_fields:
widget_type = getattr(getattr(bound_field.field, 'widget', None), 'input_type', '')
if bound_field.name.startswith('custom__') and widget_type == 'checkbox':
has_custom_checkbox_fields = True
break
if has_custom_checkbox_fields:
break
sections.append(
{
'key': key,
'title': section_titles.get(key, ONBOARDING_SECTION_META.get(key, {}).get('title', key)),
'subtitle': ONBOARDING_SECTION_META.get(key, {}).get('subtitle', ''),
'blocks': blocks_for_section,
'is_custom': key in custom_section_keys,
'has_custom_checkbox_fields': has_custom_checkbox_fields,
}
)
return sections
def build_offboarding_sections(form, visible_section_keys: set[str] | None = None) -> list[dict]:
field_pages = getattr(form, '_field_page_keys', {})
grouped = {key: [] for key in OFFBOARDING_PAGE_ORDER}
for field_name in form.fields.keys():
section_key = field_pages.get(field_name, 'abschluss')
if section_key not in grouped:
section_key = 'abschluss'
grouped[section_key].append(form[field_name])
visible_keys = visible_section_keys or set(OFFBOARDING_PAGE_ORDER)
return [
{
'key': key,
'title': OFFBOARDING_SECTION_META[key]['title'],
'subtitle': OFFBOARDING_SECTION_META[key]['subtitle'],
'fields': grouped[key],
}
for key in OFFBOARDING_PAGE_ORDER
if key in visible_keys and grouped[key]
]

View File

@@ -0,0 +1,23 @@
from functools import wraps
from django.contrib import messages
from django.contrib.auth.decorators import login_required
from django.shortcuts import redirect
from django.utils.translation import gettext as _
from .roles import user_has_capability
def require_capability(capability: str):
def decorator(view_func):
@wraps(view_func)
@login_required
def wrapped(request, *args, **kwargs):
if not user_has_capability(request.user, capability):
messages.error(request, _('Sie haben keine Berechtigung für diese Aktion.'))
return redirect('home')
return view_func(request, *args, **kwargs)
return wrapped
return decorator

View File

@@ -4,8 +4,6 @@ from datetime import timedelta
from tempfile import NamedTemporaryFile
import json
from io import BytesIO
from functools import wraps
from celery import current_app
from django.conf import settings
from django.db import connection
@@ -23,8 +21,7 @@ from django.views.decorators.csrf import ensure_csrf_cookie
from django.utils import timezone
from django.utils.encoding import force_bytes
from django.utils.http import urlsafe_base64_encode
from django.utils.translation import gettext as _, gettext_lazy
from django.utils.translation import get_language, override
from django.utils.translation import gettext as _
from django.urls import reverse
from .app_registry import build_portal_app_sections, get_portal_app_registry_rows, normalize_portal_app_sort_orders
@@ -69,6 +66,28 @@ from .observability_views import (
job_monitor_page_impl,
verify_backup_from_admin_impl,
)
from .view_audit import audit as _audit, audit_action_label as _audit_action_label, display_user_name as _display_user_name
from .view_context import (
form_field_labels as _form_field_labels,
request_custom_field_details as _request_custom_field_details,
request_status_label as _request_status_label,
request_target_label as _request_target_label,
)
from .view_form_runtime import (
CONDITIONAL_RULE_OPERATOR_CHOICES,
ONBOARDING_CHECKBOX_LISTS,
ONBOARDING_GROUPS,
ONBOARDING_INLINE_CHECKS,
active_conditional_target_keys as _active_conditional_target_keys,
build_offboarding_sections as _build_offboarding_sections,
build_onboarding_layout as _build_onboarding_layout,
build_onboarding_sections as _build_onboarding_sections,
conditional_rule_summary as _conditional_rule_summary,
field_rule_summary as _field_rule_summary,
normalized_conditional_rule_payload as _normalized_conditional_rule_payload,
translate_choice_list as _translate_choice_list,
)
from .view_permissions import require_capability as _require_capability
from .models import AdminAuditLog, AsyncTaskLog, EmployeeProfile, FormConditionalRuleConfig, FormCustomFieldConfig, FormCustomSectionConfig, FormFieldConfig, FormOption, FormSectionConfig, NotificationRule, NotificationTemplate, OffboardingRequest, OnboardingIntroductionSession, OnboardingRequest, PortalAppConfig, PortalBranding, PortalCompanyConfig, PortalTrialConfig, ScheduledWelcomeEmail, SystemEmailConfig, UserNotification, UserProfile, WorkflowConfig
from .emailing import send_system_email
from .notifications import notify_user
@@ -109,96 +128,6 @@ def mark_all_notifications_read(request):
UserNotification.objects.filter(user=request.user, read_at__isnull=True).update(read_at=timezone.now())
return _redirect_back(request, 'home')
ONBOARDING_GROUPS = {
'business-card-box': ['business_card_name', 'business_card_title', 'business_card_email', 'business_card_phone'],
'employment-end-box': ['employment_end_date'],
'group-mailboxes-box': ['group_mailboxes'],
'extra-hardware-box': ['additional_hardware_multi', 'additional_hardware_other'],
'extra-software-box': ['additional_software_multi', 'additional_software'],
'extra-access-box': ['additional_access_text'],
'successor-box': ['successor_name', 'inherit_phone_number_choice'],
}
ONBOARDING_INLINE_CHECKS = {'order_business_cards', 'agreement_confirm'}
ONBOARDING_CHECKBOX_LISTS = {
'needed_devices_multi',
'additional_hardware_multi',
'needed_software_multi',
'additional_software_multi',
'needed_accesses_multi',
'needed_workspace_groups_multi',
'needed_resources_multi',
}
ONBOARDING_SECTION_META = {
'stammdaten': {'title': gettext_lazy('Stammdaten'), 'subtitle': gettext_lazy('Person, Rolle, Abteilung')},
'vertrag': {'title': gettext_lazy('Vertrag'), 'subtitle': gettext_lazy('Beschäftigung und Termine')},
'itsetup': {'title': gettext_lazy('IT-Setup'), 'subtitle': gettext_lazy('Geräte, Software und Zugänge')},
'abschluss': {'title': gettext_lazy('Abschluss'), 'subtitle': gettext_lazy('Notizen und Freigabe')},
}
CONDITIONAL_RULE_OPERATOR_CHOICES = [
('checked', _('ist aktiviert')),
('equals', _('ist gleich')),
('not_equals', _('ist nicht gleich')),
]
def _field_rule_summary(*, is_visible: bool, is_required, locked: bool) -> str:
if locked:
return str(_('Fixes Kernfeld, immer sichtbar.'))
if not is_visible:
return str(_('Ausgeblendet, erscheint nicht im Formular.'))
if is_required is True:
return str(_('Sichtbar und als Pflichtfeld markiert.'))
if is_required is False:
return str(_('Sichtbar und optional.'))
return str(_('Sichtbar mit Standardverhalten.'))
def _conditional_clause_sentence(clause: dict, field_label_map: dict[str, str]) -> str:
field_name = (clause.get('field') or '').strip()
operator = (clause.get('operator') or '').strip()
value = clause.get('value')
if not field_name or not operator:
return ''
field_label = field_label_map.get(field_name, field_name)
if operator == 'checked':
return _('%(field)s ist aktiviert') % {'field': field_label}
if operator == 'equals':
if value not in (None, ''):
return _('%(field)s ist gleich %(value)s') % {'field': field_label, 'value': value}
return _('%(field)s ist gleich') % {'field': field_label}
if operator == 'not_equals':
if value not in (None, ''):
return _('%(field)s ist nicht gleich %(value)s') % {'field': field_label, 'value': value}
return _('%(field)s ist nicht gleich') % {'field': field_label}
return _('%(field)s erfüllt die Bedingung') % {'field': field_label}
def _conditional_rule_summary(clauses: list[dict], field_label_map: dict[str, str]) -> str:
active_clauses = [clause for clause in clauses if clause.get('field') and clause.get('operator')]
if not active_clauses:
return str(_('Immer sichtbar.'))
parts = [str(_conditional_clause_sentence(clause, field_label_map)) for clause in active_clauses]
return str(_('Sichtbar, wenn %(conditions)s.') % {'conditions': ' und '.join(parts)})
def _normalized_conditional_rule_payload(form_type: str) -> dict[str, dict]:
configs = ensure_form_conditional_rule_configs(form_type)
payload = {}
for target_key, cfg in configs.items():
if not cfg.is_active:
continue
clauses = [clause for clause in (cfg.clauses or []) if clause.get('field') and clause.get('operator')]
if clauses:
payload[target_key] = {'all': clauses}
return payload
def _active_conditional_target_keys(form_type: str) -> set[str]:
return set(_normalized_conditional_rule_payload(form_type).keys())
def healthz(request):
db_ok = True
try:
@@ -233,335 +162,6 @@ def account_profile_page(request):
return account_views.account_profile_page_impl(request)
def _require_capability(capability: str):
def decorator(view_func):
@wraps(view_func)
@login_required
def wrapped(request, *args, **kwargs):
if not user_has_capability(request.user, capability):
messages.error(request, _('Sie haben keine Berechtigung für diese Aktion.'))
return redirect('home')
return view_func(request, *args, **kwargs)
return wrapped
return decorator
def _display_user_name(user) -> str:
first_name = (getattr(user, 'first_name', '') or '').strip()
last_name = (getattr(user, 'last_name', '') or '').strip()
full_name = f'{first_name} {last_name}'.strip()
if full_name:
return full_name
username = (getattr(user, 'username', '') or '').strip()
if username:
return username
return (getattr(user, 'email', '') or '').strip()
def _audit(
request,
action: str,
*,
target_type: str = '',
target_id: int | None = None,
target_label: str = '',
details: dict | None = None,
) -> None:
if not getattr(request, 'user', None) or not request.user.is_authenticated:
return
AdminAuditLog.objects.create(
actor=request.user,
actor_display=_display_user_name(request.user),
action=action,
target_type=target_type,
target_id=target_id,
target_label=target_label,
details=details or {},
)
def _form_field_labels(form_type: str) -> dict[str, str]:
if form_type == 'onboarding':
return {name: str(field.label or name) for name, field in OnboardingRequestForm.base_fields.items()}
if form_type == 'offboarding':
return {name: str(field.label or name) for name, field in OffboardingRequestForm.base_fields.items()}
return {}
def _request_target_label(obj, kind: str | None = None) -> str:
request_kind = (kind or '').strip()
if not request_kind:
request_kind = 'onboarding' if isinstance(obj, OnboardingRequest) else 'offboarding'
name = (getattr(obj, 'full_name', '') or '').strip() or f'#{getattr(obj, "id", "?")}'
email = (getattr(obj, 'work_email', '') or '').strip()
created_at = getattr(obj, 'created_at', None)
date_label = created_at.strftime('%Y-%m-%d') if created_at else ''
parts = [request_kind.capitalize(), name]
if email:
parts.append(f'<{email}>')
if date_label:
parts.append(date_label)
return ' | '.join(parts)
def _request_status_label(status_key: str, language_code: str | None = None) -> str:
lang = ((language_code or 'de').split('-')[0] or 'de').lower()
with override(lang):
labels = {
'submitted': _('Eingereicht'),
'processing': _('In Bearbeitung'),
'completed': _('Abgeschlossen'),
'failed': _('Fehlgeschlagen'),
}
return labels.get(status_key, status_key)
def _request_custom_field_details(obj, kind: str, language_code: str | None = None) -> list[dict[str, str]]:
form_type = 'onboarding' if kind == 'onboarding' else 'offboarding'
language_code = ((language_code or getattr(obj, 'preferred_language', '') or get_language() or 'de').split('-')[0]).lower()
values = getattr(obj, 'custom_field_values', {}) or {}
rows = []
yes_label = 'Ja' if language_code == 'de' else 'Yes'
for cfg in get_custom_field_configs(form_type, include_inactive=True):
raw_value = values.get(cfg.field_key)
if raw_value in (None, '', False, []):
continue
if isinstance(raw_value, bool):
display_value = str(yes_label) if raw_value else ''
elif isinstance(raw_value, list):
display_value = ', '.join(str(item).strip() for item in raw_value if str(item).strip())
else:
display_value = str(raw_value).strip()
if not display_value:
continue
rows.append(
{
'label': cfg.translated_label(language_code),
'value': display_value,
'section': cfg.section_key,
'sort_order': cfg.sort_order,
}
)
rows.sort(key=lambda item: (item['section'], item['sort_order'], item['label']))
return rows
def _audit_action_label(action: str) -> str:
labels = {
'requests_deleted': _('Vorgänge gelöscht'),
'request_deleted': _('Vorgang gelöscht'),
'request_retried': _('Vorgang erneut angestoßen'),
'intro_pdf_generated': _('Einweisungs-PDF erzeugt'),
'intro_live_pdf_generated': _('Live-Protokoll erzeugt'),
'intro_session_reset': _('Einweisung zurückgesetzt'),
'intro_session_saved': _('Einweisung als Entwurf gespeichert'),
'intro_session_completed': _('Einweisung abgeschlossen'),
'form_option_deleted': _('Formularoption gelöscht'),
'form_options_saved': _('Formularoptionen gespeichert'),
'form_field_texts_saved': _('Feldtexte gespeichert'),
'form_layout_saved': _('Formularlayout gespeichert'),
'intro_checklist_item_deleted': _('Einweisungs-Checkpunkt gelöscht'),
'intro_checklist_item_added': _('Einweisungs-Checkpunkt hinzugefügt'),
'intro_checklist_saved': _('Einweisungs-Checkliste gespeichert'),
'welcome_email_triggered_now': _('Welcome E-Mail sofort ausgelöst'),
'welcome_email_settings_saved': _('Welcome E-Mail Einstellungen gespeichert'),
'welcome_email_bulk_action': _('Welcome E-Mail Sammelaktion ausgeführt'),
'welcome_email_paused': _('Welcome E-Mail pausiert'),
'welcome_email_resumed': _('Welcome E-Mail fortgesetzt'),
'welcome_email_cancelled': _('Welcome E-Mail abgebrochen'),
'smtp_test_sent': _('SMTP-Test gesendet'),
'nextcloud_test_upload': _('Nextcloud-Testupload ausgeführt'),
'nextcloud_mode_toggled': _('Nextcloud-Modus umgeschaltet'),
'email_mode_toggled': _('E-Mail-Modus umgeschaltet'),
'integrations_saved': _('Integrationen gespeichert'),
'nextcloud_settings_saved': _('Nextcloud-Einstellungen gespeichert'),
'mail_settings_saved': _('Mail-Einstellungen gespeichert'),
'email_routing_saved': _('E-Mail-Routing gespeichert'),
'notification_rules_saved': _('Benachrichtigungsregeln gespeichert'),
'user_created': _('Benutzer erstellt'),
'user_updated': _('Benutzer aktualisiert'),
'user_password_reset_sent': _('Passwort-Reset-Link versendet'),
'user_deleted': _('Benutzer gelöscht'),
'backup_created': _('Backup erstellt'),
'backup_verified': _('Backup verifiziert'),
'backup_deleted': _('Backup gelöscht'),
'backup_settings_saved': _('Backup-Einstellungen gespeichert'),
'portal_app_registry_saved': _('App-Registry gespeichert'),
}
return labels.get(action, action.replace('_', ' ').strip().capitalize())
def _translate_choice_list(choices):
return [(value, str(label)) for value, label in choices]
def _build_onboarding_layout(form) -> list[dict]:
ordered_names = list(form.fields.keys())
group_by_field = {}
for group_id, group_fields in ONBOARDING_GROUPS.items():
for name in group_fields:
group_by_field[name] = group_id
conditional_target_keys = _active_conditional_target_keys('onboarding')
rendered_groups = set()
consumed = set()
blocks = []
for field_name in ordered_names:
if field_name in consumed:
continue
group_id = group_by_field.get(field_name)
if group_id:
if group_id in rendered_groups:
continue
group_fields = [
form[name]
for name in ONBOARDING_GROUPS[group_id]
if name in form.fields
]
if not group_fields:
continue
blocks.append(
{
'kind': 'group',
'id': group_id,
'hidden_default': group_id in conditional_target_keys,
'fields': group_fields,
}
)
rendered_groups.add(group_id)
consumed.update([f.name for f in group_fields])
continue
if field_name.startswith('custom__') and field_name in conditional_target_keys:
blocks.append(
{
'kind': 'group',
'id': field_name,
'hidden_default': True,
'fields': [form[field_name]],
}
)
consumed.add(field_name)
continue
blocks.append({'kind': 'field', 'field': form[field_name]})
consumed.add(field_name)
return blocks
def _section_for_block(block: dict, field_pages: dict[str, str]) -> str:
if block['kind'] == 'field':
return field_pages.get(block['field'].name, 'abschluss')
fields = block.get('fields') or []
if not fields:
return 'abschluss'
return field_pages.get(fields[0].name, 'abschluss')
def _build_onboarding_sections(blocks: list[dict], field_pages: dict[str, str], visible_section_keys: set[str] | None = None) -> list[dict]:
section_defs = get_section_definitions('onboarding')
section_order = [item['key'] for item in section_defs]
section_titles = {item['key']: item['title'] for item in section_defs}
grouped = {key: [] for key in section_order}
for block in blocks:
section_key = _section_for_block(block, field_pages)
if section_key not in grouped:
section_key = 'abschluss'
grouped[section_key].append(block)
visible_keys = visible_section_keys or set(section_order)
sections = []
custom_section_keys = {item['key'] for item in section_defs if item.get('is_custom')}
for key in section_order:
if key not in visible_keys:
continue
blocks_for_section = grouped[key]
has_custom_checkbox_fields = False
for block in blocks_for_section:
candidate_fields = [block['field']] if block['kind'] == 'field' else (block.get('fields') or [])
for bound_field in candidate_fields:
widget_type = getattr(getattr(bound_field.field, 'widget', None), 'input_type', '')
if bound_field.name.startswith('custom__') and widget_type == 'checkbox':
has_custom_checkbox_fields = True
break
if has_custom_checkbox_fields:
break
sections.append(
{
'key': key,
'title': section_titles.get(key, ONBOARDING_SECTION_META.get(key, {}).get('title', key)),
'subtitle': ONBOARDING_SECTION_META.get(key, {}).get('subtitle', ''),
'blocks': blocks_for_section,
'is_custom': key in custom_section_keys,
'has_custom_checkbox_fields': has_custom_checkbox_fields,
}
)
return sections
OFFBOARDING_SECTION_META = {
'mitarbeitende': {'title': gettext_lazy('Mitarbeitende'), 'subtitle': gettext_lazy('Person, Rolle und Bereich')},
'austritt': {'title': gettext_lazy('Austritt'), 'subtitle': gettext_lazy('Letzter Arbeitstag')},
'abschluss': {'title': gettext_lazy('Abschluss'), 'subtitle': gettext_lazy('Hinweise und Abschlussnotizen')},
}
def _build_offboarding_sections(form, visible_section_keys: set[str] | None = None) -> list[dict]:
field_pages = getattr(form, '_field_page_keys', {})
grouped = {key: [] for key in OFFBOARDING_PAGE_ORDER}
for field_name in form.fields.keys():
section_key = field_pages.get(field_name, 'abschluss')
if section_key not in grouped:
section_key = 'abschluss'
grouped[section_key].append(form[field_name])
visible_keys = visible_section_keys or set(OFFBOARDING_PAGE_ORDER)
return [
{
'key': key,
'title': OFFBOARDING_SECTION_META[key]['title'],
'subtitle': OFFBOARDING_SECTION_META[key]['subtitle'],
'fields': grouped[key],
}
for key in OFFBOARDING_PAGE_ORDER
if key in visible_keys and grouped[key]
]
def _ops_summary_for_user(user) -> dict[str, object]:
can_view_jobs = user_has_capability(user, 'view_job_monitor')
can_manage_backups = user_has_capability(user, 'manage_backups')
summary: dict[str, object] = {
'show': can_view_jobs or can_manage_backups,
'can_view_jobs': can_view_jobs,
'can_manage_backups': can_manage_backups,
'failed_count_24h': 0,
'started_count_24h': 0,
'success_count_24h': 0,
'recent_failed_logs': [],
'backup_health': latest_backup_health_snapshot() if can_manage_backups else None,
}
if not can_view_jobs:
return summary
since = timezone.now() - timedelta(hours=24)
logs = AsyncTaskLog.objects.filter(started_at__gte=since)
counts = {
row['status']: row['count']
for row in logs.values('status').annotate(count=Count('id'))
}
summary['failed_count_24h'] = counts.get('failed', 0)
summary['started_count_24h'] = counts.get('started', 0)
summary['success_count_24h'] = counts.get('succeeded', 0)
summary['recent_failed_logs'] = list(
AsyncTaskLog.objects.filter(status='failed').order_by('-started_at', '-id')[:5]
)
return summary
@login_required
def home(request):
config, _ = WorkflowConfig.objects.get_or_create(name='Default')
@@ -957,13 +557,24 @@ def backup_recovery_page(request):
@_require_capability('manage_backups')
@require_POST
def create_backup_from_admin(request):
return create_backup_from_admin_impl(request, audit_fn=_audit)
return create_backup_from_admin_impl(
request,
audit_fn=_audit,
notify_user_fn=notify_user,
create_backup_bundle_fn=create_backup_bundle,
)
@_require_capability('manage_backups')
@require_POST
def verify_backup_from_admin(request, backup_name: str):
return verify_backup_from_admin_impl(request, backup_name, audit_fn=_audit)
return verify_backup_from_admin_impl(
request,
backup_name,
audit_fn=_audit,
notify_user_fn=notify_user,
verify_backup_bundle_fn=verify_backup_bundle,
)
@_require_capability('manage_backups')