Files
workdock-platform/backend/workflows/tasks.py

1089 lines
43 KiB
Python

from pathlib import Path
from datetime import timedelta
import base64
import mimetypes
import re
from celery import shared_task
from django.contrib.auth import get_user_model
from django.conf import settings
from django.utils import timezone
from jinja2 import Template
from pypdf import PageObject, PdfReader, PdfWriter
from xhtml2pdf import pisa
from .models import EmployeeProfile, IntroChecklistItem, NotificationRule, NotificationTemplate, OffboardingRequest, OnboardingIntroductionSession, OnboardingRequest, ScheduledWelcomeEmail, WorkflowConfig
from .emailing import send_system_email
from .services import upload_to_nextcloud
from .services import get_email_test_redirect, is_email_test_mode
from .forms import (
ACCESS_CHOICES,
DEVICE_CHOICES,
HARDWARE_EXTRA_CHOICES,
OnboardingRequestForm,
RESOURCE_CHOICES,
SOFTWARE_CHOICES,
SOFTWARE_EXTRA_CHOICES,
WORKSPACE_GROUP_CHOICES,
)
# Built-in Jinja2 subject/body sources, keyed by notification template key.
# `_render_notification_template` falls back to these entries when no active
# `NotificationTemplate` row exists for the requested key. The `{{ ... }}`
# placeholders are filled from the context dict supplied by the caller.
DEFAULT_NOTIFICATION_TEMPLATES = {
    # --- Onboarding notifications ---
    'onboarding_it': {
        'subject': '[Onboarding] {{ FULL_NAME }} | Anfrage von {{ REQUESTED_BY }}',
        'body': (
            'Neue Onboarding-Anfrage für {{ FULL_NAME }}.\n'
            'Abteilung: {{ DEPARTMENT }}\n'
            'Vertragsbeginn: {{ CONTRACT_START }}\n'
            'Angefordert von: {{ REQUESTED_BY }}\n'
            'Bitte IT-Setup vorbereiten.'
        ),
    },
    'onboarding_general_info': {
        'subject': '[Info Onboarding] {{ FULL_NAME }} | Anfrage von {{ REQUESTED_BY }}',
        'body': (
            'Hallo,\n\n'
            '{{ FULL_NAME }} wird onboarded.\n'
            'Abteilung: {{ DEPARTMENT }}\n'
            'Vertragsbeginn: {{ CONTRACT_START }}\n'
            'Angefordert von: {{ REQUESTED_BY }}\n'
        ),
    },
    'onboarding_business_card': {
        'subject': '[Visitenkarte] {{ FULL_NAME }} | Anfrage von {{ REQUESTED_BY }}',
        'body': (
            'Hallo,\n\n'
            'bitte Visitenkarten erstellen:\n'
            'Name: {{ BUSINESS_CARD_NAME }}\n'
            'Titel: {{ BUSINESS_CARD_TITLE }}\n'
            'E-Mail: {{ BUSINESS_CARD_EMAIL }}\n'
            'Telefon: {{ BUSINESS_CARD_PHONE }}\n'
            'Angefordert von: {{ REQUESTED_BY }}\n'
        ),
    },
    'onboarding_hr_works': {
        'subject': '[HR Works] {{ FULL_NAME }} | Anfrage von {{ REQUESTED_BY }}',
        'body': (
            'Hello Stefanie,\n\n'
            'Es ist wieder soweit. Zuwachs!\n\n'
            'Könntest du deshalb bitte ein HR Works Konto mit den folgenden Daten erstellen:\n\n'
            'Name: {{ VORNAME }} {{ NACHNAME }}\n'
            'Abteilung: {{ DEPARTMENT }}\n'
            'Vertragsbeginn: {{ CONTRACT_START }}\n'
            'E-Mail-Adresse: {{ EMAIL }}\n\n'
            # The PDF link paragraph is only rendered when PDF_LINK is truthy.
            '{% if PDF_LINK %}In 2 Minuten findest du alle Infos über den Mitarbeiter als PDF unter diesem Link: {{ PDF_LINK }}\n\n{% endif %}'
            'Falls du noch irgendwelche anderen Informationen benötigen solltest, kannst du dich bei der it@tub.co melden!\n\n'
            'Vielen Dank und schöne Grüße,\n'
            'Die IT.'
        ),
    },
    'onboarding_key': {
        'subject': '[Schlüssel] {{ FULL_NAME }} | Anfrage von {{ REQUESTED_BY }}',
        'body': (
            'Hallo,\n\n'
            'bitte Schlüssel vorbereiten für:\n'
            'Name: {{ FULL_NAME }}\n'
            'Abteilung: {{ DEPARTMENT }}\n'
            'Vertragsbeginn: {{ CONTRACT_START }}\n'
            'Angefordert von: {{ REQUESTED_BY }}\n'
        ),
    },
    'onboarding_reference': {
        'subject': '[Referenz Onboarding] {{ FULL_NAME }} | Ihre Anfrage',
        'body': (
            'Diese E-Mail dient als Referenz für Ihre Onboarding-Anfrage.\n'
            'Name: {{ FULL_NAME }}\n'
            'Abteilung: {{ DEPARTMENT }}\n'
            'Vertragsbeginn: {{ CONTRACT_START }}\n'
            'Angefordert von: {{ REQUESTED_BY }}\n'
        ),
    },
    'onboarding_welcome': {
        'subject': 'Willkommen bei TUB/CO, {{ VORNAME }}',
        'body': (
            'Hallo {{ FULL_NAME }},\n\n'
            'herzlich willkommen bei TUB/CO.\n'
            'Wir freuen uns sehr, dass du ab dem {{ CONTRACT_START }} unser Team in der Abteilung {{ DEPARTMENT }} verstärkst.\n\n'
            'Deine dienstliche E-Mail-Adresse lautet: {{ EMAIL }}.\n'
            'Im Anhang findest du deine Onboarding-Unterlagen als PDF.\n\n'
            'Wenn du Fragen hast, melde dich gerne jederzeit.\n\n'
            'Viele Grüße\n'
            'TUB/CO IT'
        ),
    },
    # --- Offboarding notifications ---
    'offboarding_it': {
        'subject': '[Offboarding] {{ FULL_NAME }} | Anfrage von {{ REQUESTED_BY }}',
        'body': (
            'Neue Offboarding-Anfrage für {{ FULL_NAME }}.\n'
            'Abteilung: {{ DEPARTMENT }}\n'
            'Letzter Arbeitstag: {{ LAST_WORKING_DAY }}\n'
            'Angefordert von: {{ REQUESTED_BY }}\n'
            'Bitte IT-Offboarding durchführen.'
        ),
    },
    'offboarding_general_info': {
        'subject': '[Info Offboarding] {{ FULL_NAME }} | Anfrage von {{ REQUESTED_BY }}',
        'body': (
            'Neue Offboarding-Anfrage für {{ FULL_NAME }}.\n'
            'Abteilung: {{ DEPARTMENT }}\n'
            'Letzter Arbeitstag: {{ LAST_WORKING_DAY }}\n'
            'Angefordert von: {{ REQUESTED_BY }}\n'
        ),
    },
    'offboarding_hr_works_disable': {
        'subject': '[HR Works Deaktivierung] {{ FULL_NAME }} | Anfrage von {{ REQUESTED_BY }}',
        'body': (
            'Bitte HR Works Zugriff deaktivieren für {{ FULL_NAME }} ({{ EMAIL }}) zum {{ LAST_WORKING_DAY }}.\n'
            'Angefordert von: {{ REQUESTED_BY }}\n'
        ),
    },
    'offboarding_reference': {
        'subject': '[Referenz Offboarding] {{ FULL_NAME }} | Ihre Anfrage',
        'body': (
            'Diese E-Mail dient als Referenz für Ihre Offboarding-Anfrage.\n'
            'Name: {{ FULL_NAME }}\n'
            'Abteilung: {{ DEPARTMENT }}\n'
            'Letzter Arbeitstag: {{ LAST_WORKING_DAY }}\n'
            'Angefordert von: {{ REQUESTED_BY }}\n'
        ),
    },
}
def _split_name(full_name: str) -> tuple[str, str]:
parts = full_name.split()
if not parts:
return '', ''
return parts[0], ' '.join(parts[1:])
def _safe_filename_fragment(text: str, fallback: str = 'document') -> str:
value = re.sub(r'[^A-Za-z0-9._-]+', '_', (text or '').strip()).strip('._')
return value[:120] if value else fallback
def _resolve_user_display_name(email: str) -> str:
email = (email or '').strip().lower()
if not email:
return ''
user_model = get_user_model()
user = user_model.objects.filter(email__iexact=email).first()
if not user:
return ''
first_name = (getattr(user, 'first_name', '') or '').strip()
last_name = (getattr(user, 'last_name', '') or '').strip()
full_name = f'{first_name} {last_name}'.strip()
if full_name:
return full_name
return (getattr(user, 'username', '') or '').strip()
def _chunk_list(data_list: list[str], chunk_size: int = 3) -> list[list[str]]:
items = [i.strip() for i in data_list if i and i.strip()]
chunks = []
for i in range(0, len(items), chunk_size):
chunks.append(items[i : i + chunk_size])
return chunks
def _split_multiline(text: str) -> list[str]:
return [line.strip() for line in (text or '').split('\n') if line.strip()]
def _chunk_choice_labels(choices: list[tuple[str, str]], chunk_size: int = 3) -> list[list[str]]:
    """Group the label half of ``(value, label)`` choice pairs into rows.

    Chunking (including blank filtering) is delegated to ``_chunk_list``.
    """
    return _chunk_list([label for _value, label in choices], chunk_size=chunk_size)
# (section title, form field names) pairs consumed by
# `_manual_onboarding_field_sections`, which resolves each name to its label on
# `OnboardingRequestForm` and skips names the form does not define.
MANUAL_ONBOARDING_FIELD_SECTIONS = [
    (
        'Stammdaten',
        [
            'gender',
            'first_name',
            'last_name',
            'job_title',
            'department',
            'work_email',
            'order_business_cards',
            'business_card_name',
            'business_card_title',
            'business_card_email',
            'business_card_phone',
        ],
    ),
    (
        'Vertrag',
        [
            'contract_start',
            'employment_type',
            'employment_end_date',
            'handover_date',
        ],
    ),
    (
        'IT-Setup',
        [
            'group_mailboxes_required_choice',
            'group_mailboxes',
            'additional_hardware_needed_choice',
            'additional_hardware_other',
            'additional_software_needed_choice',
            'additional_software',
            'additional_access_needed_choice',
            'additional_access_text',
            'successor_required_choice',
            'successor_name',
            'inherit_phone_number_choice',
            'phone_number_choice',
        ],
    ),
    (
        'Abschluss',
        [
            'additional_notes',
            'signature_image',
            'agreement_confirm',
        ],
    ),
]
def _manual_onboarding_field_sections() -> list[dict]:
    """Build the label rows for each section of the manual onboarding form.

    For every entry in ``MANUAL_ONBOARDING_FIELD_SECTIONS`` the field names
    are resolved to their form labels (falling back to the field name when a
    label is empty). Unknown field names are skipped, as are sections that end
    up without any label. Labels are arranged two per row.
    """
    form_fields = OnboardingRequestForm.base_fields
    result = []
    for section_title, names in MANUAL_ONBOARDING_FIELD_SECTIONS:
        section_labels = []
        for field_name in names:
            if field_name not in form_fields:
                continue
            section_labels.append(str(form_fields[field_name].label or field_name))
        if section_labels:
            result.append({
                'title': section_title,
                'rows': _chunk_list(section_labels, chunk_size=2),
            })
    return result
def _resolve_workflow_emails() -> tuple[str, str, str, str, str]:
    """Return the notification addresses used by the workflows.

    Order of the result: IT, general info, business card, HR Works, key.
    Each address comes from the first ``WorkflowConfig`` row (ordered by id)
    when that row has a non-empty value, otherwise from Django settings.
    """
    config = WorkflowConfig.objects.order_by('id').first()

    def pick(config_attr: str, settings_attr: str) -> str:
        # The settings value is only read when the config gives no address.
        if config:
            configured = getattr(config, config_attr)
            if configured:
                return configured
        return getattr(settings, settings_attr)

    return (
        pick('it_onboarding_email', 'IT_ONBOARDING_NOTIFICATION_EMAIL'),
        pick('general_info_email', 'GENERAL_INFO_NOTIFICATION_EMAIL'),
        pick('business_card_email', 'BUSINESS_CARD_NOTIFICATION_EMAIL'),
        pick('hr_works_email', 'HR_WORKS_NOTIFICATION_EMAIL'),
        pick('key_notification_email', 'KEY_NOTIFICATION_EMAIL'),
    )
def _matches_intro_condition(request_obj: OnboardingRequest, item: IntroChecklistItem) -> bool:
operator = (item.condition_operator or 'always').strip()
field_name = (item.condition_field or '').strip()
expected = (item.condition_value or '').strip()
if operator == 'always' or not field_name:
return True
raw_value = getattr(request_obj, field_name, '')
if raw_value is None:
raw_value = ''
if operator == 'is_true':
return bool(raw_value)
if operator == 'is_false':
return not bool(raw_value)
text_value = str(raw_value).strip()
if operator == 'equals':
return text_value.lower() == expected.lower()
if operator == 'contains':
return expected.lower() in text_value.lower()
return True
def _build_intro_sections_from_admin(request_obj: OnboardingRequest) -> dict[str, list[str]]:
    """Collect the active checklist item labels that apply to ``request_obj``.

    Returns a mapping of section key to labels, limited to the section keys
    declared in ``IntroChecklistItem.SECTION_CHOICES``. Items are read ordered
    by section, sort order and label. Sections without a matching item are
    left out, and an empty dict is returned when there are no active items.
    """
    active_items = list(
        IntroChecklistItem.objects.filter(is_active=True).order_by('section', 'sort_order', 'label')
    )
    if not active_items:
        return {}

    labels_by_section = {}
    for section_key, _label in IntroChecklistItem.SECTION_CHOICES:
        labels_by_section[section_key] = []

    for entry in active_items:
        bucket = labels_by_section.get(entry.section)
        if bucket is None:
            # Item refers to a section that is not declared in SECTION_CHOICES.
            continue
        if _matches_intro_condition(request_obj, entry):
            bucket.append(entry.label)

    return {
        section_key: labels
        for section_key, labels in labels_by_section.items()
        if labels
    }
def build_intro_sections_for_request(request_obj: OnboardingRequest) -> list[dict]:
    """Build the introduction checklist for an onboarding request.

    Four sections are produced (workplace, accounts, software, process). Each
    starts with items derived from the request's multi-line fields and is then
    extended with matching admin-defined items from
    ``_build_intro_sections_from_admin``.

    Returns a list of ``{'key', 'title', 'items'}`` dicts. Every item is
    ``{'id': '<section key>_<1-based position>', 'label': ...}``. Sections
    without items are omitted.
    """
    # Devices and workplace.
    workplace_items = [
        f'{item} übergeben und Grundfunktionen erklärt'
        for item in _split_multiline(request_obj.needed_devices)
    ]
    workplace_items += [
        f'{item} gezeigt bzw. Nutzung erklärt'
        for item in _split_multiline(request_obj.needed_resources)
    ]
    if request_obj.phone_number:
        workplace_items.append(f'Telefonnummer / Direktwahl erklärt: {request_obj.phone_number}')
    if not workplace_items:
        workplace_items.append('Arbeitsplatz, Geräte und allgemeine Nutzung besprochen')

    # Accounts and permissions: e-mail first, then accesses, groups, mailboxes.
    account_items = []
    if request_obj.work_email:
        account_items.append(f'Dienstliche E-Mail-Adresse erläutert: {request_obj.work_email}')
    account_items += [
        f'{item} Zugang erklärt'
        for item in _split_multiline(request_obj.needed_accesses)
    ]
    account_items += [
        f'{item} Gruppe / Berechtigung erläutert'
        for item in _split_multiline(request_obj.needed_workspace_groups)
    ]
    account_items += [
        f'Gruppenpostfach erklärt: {item}'
        for item in _split_multiline(request_obj.group_mailboxes)
    ]
    if not account_items:
        account_items.append('Zugänge, Konten und Anmeldelogik besprochen')

    # Software and tools.
    software_items = [
        f'{item} Einführung durchgeführt'
        for item in _split_multiline(request_obj.needed_software)
    ]
    software_items += [
        f'{item} zusätzlich besprochen'
        for item in _split_multiline(request_obj.additional_software)
    ]
    if not software_items:
        software_items.append('Benötigte Standardsoftware und tägliche Nutzung erklärt')

    # Processes and notes: three fixed entries plus request-specific extras.
    process_items = [
        'Passwortregeln und sicherer Umgang besprochen',
        'Dateiablage, Nextcloud und Freigaben erklärt',
        'Kommunikationswege und Support-Prozess erklärt',
    ]
    process_items += [
        f'{item} als zusätzliche Ausstattung besprochen'
        for item in _split_multiline(request_obj.additional_hardware)
    ]
    process_items += [
        f'Zusätzlicher Zugang besprochen: {item}'
        for item in _split_multiline(request_obj.additional_access_text)
    ]
    if request_obj.successor_name:
        process_items.append(f'Übergabe-/Nachfolgekontext besprochen: {request_obj.successor_name}')

    admin_items = _build_intro_sections_from_admin(request_obj)
    layout = (
        ('workplace', 'Geräte und Arbeitsplatz', workplace_items),
        ('accounts', 'Konten und Berechtigungen', account_items),
        ('software', 'Software und Tools', software_items),
        ('process', 'Prozesse und Hinweise', process_items),
    )

    sections = []
    for key, title, defaults in layout:
        labels = [*defaults, *admin_items.get(key, [])]
        if not labels:
            continue
        sections.append({
            'key': key,
            'title': title,
            'items': [
                {'id': f'{key}_{idx}', 'label': label}
                for idx, label in enumerate(labels, start=1)
            ],
        })
    return sections
def _send_workflow_email(
subject: str,
body: str,
to: list[str],
attachments: list[Path] | None = None,
from_email: str | None = None,
) -> None:
recipients = [r for r in to if r]
if not recipients:
return
effective_to = recipients
effective_body = body
if is_email_test_mode():
effective_to = [get_email_test_redirect()]
effective_body = (
"[TEST MODE] Diese E-Mail wurde umgeleitet.\n"
f"Originale Empfänger: {', '.join(recipients)}\n\n{body}"
)
send_system_email(
subject=subject,
body=effective_body,
to=effective_to,
attachments=[str(a) for a in (attachments or [])],
from_email=from_email,
)
def _render_notification_template(template_key: str, context: dict) -> tuple[str, str]:
    """Render subject and body for ``template_key`` with ``context``.

    An active ``NotificationTemplate`` row for the key takes precedence;
    otherwise the entry from ``DEFAULT_NOTIFICATION_TEMPLATES`` is used.
    Both parts are rendered with Jinja2 and stripped.

    Raises:
        KeyError: no active database template exists and the key has no
            built-in default.
    """
    override = NotificationTemplate.objects.filter(key=template_key, is_active=True).first()
    if override:
        sources = (override.subject_template, override.body_template)
    else:
        defaults = DEFAULT_NOTIFICATION_TEMPLATES[template_key]
        sources = (defaults['subject'], defaults['body'])

    rendered_subject, rendered_body = (
        Template(source).render(context).strip() for source in sources
    )
    return rendered_subject, rendered_body
def _parse_recipients(raw: str) -> list[str]:
if not raw:
return []
cleaned = raw.replace(';', ',').replace('\n', ',')
return [x.strip() for x in cleaned.split(',') if x.strip()]
def _as_bool(value) -> bool:
if isinstance(value, bool):
return value
if value is None:
return False
text = str(value).strip().lower()
return text in {'1', 'true', 'ja', 'yes', 'on', 'aktiv'}
def _rule_matches(rule: NotificationRule, request_obj) -> bool:
if rule.operator == 'always':
return True
raw_value = getattr(request_obj, rule.field_name, '')
actual = '' if raw_value is None else str(raw_value)
expected = (rule.expected_value or '').strip()
if rule.operator == 'contains':
return expected.lower() in actual.lower()
if rule.operator == 'equals':
return actual.strip().lower() == expected.lower()
if rule.operator == 'is_true':
return _as_bool(raw_value)
if rule.operator == 'is_false':
return not _as_bool(raw_value)
return False
def _apply_notification_rules(
    event_type: str,
    request_obj,
    context: dict,
    pdf_path: Path | None = None,
) -> None:
    """Send the e-mails defined by the active notification rules of an event.

    Rules are processed ordered by ``sort_order`` then ``id``. A rule is
    skipped when it does not match ``request_obj`` or has no recipients. A
    rule whose ``template_key`` is one of ``NotificationTemplate.TEMPLATE_CHOICES``
    is sent via ``_send_templated_email``. Otherwise its custom subject/body
    are rendered with Jinja2, and the rule is skipped when both are empty.

    Args:
        event_type: Event identifier used to select the rules.
        request_obj: Object whose attributes the rule conditions inspect.
        context: Template context for subject and body rendering.
        pdf_path: Optional PDF, attached only for rules with
            ``include_pdf_attachment`` set.
    """
    rules = NotificationRule.objects.filter(event_type=event_type, is_active=True).order_by('sort_order', 'id')
    # Loop-invariant: build the set of known template keys once, not per rule.
    known_keys = {key for key, _label in NotificationTemplate.TEMPLATE_CHOICES}

    for rule in rules:
        if not _rule_matches(rule, request_obj):
            continue
        recipients = _parse_recipients(rule.recipients)
        if not recipients:
            continue

        attachments = [pdf_path] if (pdf_path and rule.include_pdf_attachment) else None

        template_key = (rule.template_key or '').strip()
        if template_key and template_key in known_keys:
            _send_templated_email(
                template_key=template_key,
                context=context,
                to=recipients,
                attachments=attachments,
            )
            continue

        # No known template referenced: fall back to the rule's own texts.
        subject = (rule.custom_subject or '').strip()
        body = (rule.custom_body or '').strip()
        if not subject and not body:
            continue
        subject_rendered = Template(subject or f'[{event_type}] Regelmail').render(context).strip()
        body_rendered = Template(body or '-').render(context).strip()
        _send_workflow_email(
            subject=subject_rendered,
            body=body_rendered,
            to=recipients,
            attachments=attachments,
        )
def _schedule_welcome_email(request_obj: OnboardingRequest) -> None:
    """Create or refresh the scheduled welcome e-mail for a request.

    Nothing happens when the request has no work e-mail. The send time is
    "now" plus the delay from the first ``WorkflowConfig`` row (default five
    days, negative values clamped to 0). The ``ScheduledWelcomeEmail`` record
    is upserted and a Celery task is queued with that ETA. If queuing fails,
    the record is marked ``failed`` with the error text instead of raising.
    """
    recipient = (request_obj.work_email or '').strip().lower()
    if not recipient:
        return

    config = WorkflowConfig.objects.order_by('id').first()
    delay_days = 5
    configured_delay = config.welcome_email_delay_days if config else None
    # Only fall back to the default when no delay is stored. The previous
    # ``value or 5`` silently turned an explicit 0 ("send immediately") into 5.
    if configured_delay is not None and configured_delay != '':
        delay_days = max(0, int(configured_delay))

    send_at = timezone.now() + timedelta(days=delay_days)
    scheduled, _ = ScheduledWelcomeEmail.objects.update_or_create(
        onboarding_request=request_obj,
        defaults={
            'recipient_email': recipient,
            'send_at': send_at,
            'status': 'scheduled',
            'last_error': '',
            'sent_at': None,
        },
    )
    try:
        async_result = send_scheduled_welcome_email.apply_async(args=[scheduled.id], eta=send_at)
        scheduled.celery_task_id = async_result.id or ''
        scheduled.save(update_fields=['celery_task_id', 'updated_at'])
    except Exception as exc:
        # Deliberate best effort: a queuing problem must not abort the caller,
        # so the failure is recorded on the schedule row instead.
        scheduled.status = 'failed'
        scheduled.last_error = f'Scheduling failed: {exc}'
        scheduled.save(update_fields=['status', 'last_error', 'updated_at'])
def _send_templated_email(
    template_key: str,
    to: list[str],
    context: dict,
    attachments: list[Path] | None = None,
    from_email: str | None = None,
) -> None:
    """Render the template for ``template_key`` and send it to ``to``.

    Rendering is done by ``_render_notification_template`` and delivery by
    ``_send_workflow_email``.
    """
    rendered = _render_notification_template(template_key, context)
    _send_workflow_email(
        subject=rendered[0],
        body=rendered[1],
        to=to,
        attachments=attachments,
        from_email=from_email,
    )
def _render_html(template_path: Path, context: dict) -> str:
    """Read the UTF-8 template at ``template_path`` and render it with Jinja2."""
    # NOTE(review): the Template is built without ``autoescape``, so context
    # values are presumably inserted into the HTML unescaped. Confirm the
    # templates escape user-supplied fields.
    source = template_path.read_text(encoding='utf-8')
    return Template(source).render(context)
def _generate_content_pdf(html_content: str, output_pdf: Path) -> None:
    """Render ``html_content`` to ``output_pdf`` with xhtml2pdf.

    A page style (A4 with a 58mm top margin and 16mm on the other sides) is
    injected directly after the first ``<head>`` tag, or prepended when the
    markup has none. The output directory is created if necessary.

    Raises:
        RuntimeError: xhtml2pdf reported an error while rendering.
    """
    page_style = (
        '<style>'
        '@page { size: A4; margin: 58mm 16mm 16mm 16mm; }'
        'body { margin: 0; }'
        '</style>'
    )
    if '<head>' not in html_content:
        styled_html = page_style + html_content
    else:
        styled_html = html_content.replace('<head>', f'<head>{page_style}', 1)

    output_pdf.parent.mkdir(parents=True, exist_ok=True)
    with output_pdf.open('wb') as pdf_file:
        outcome = pisa.CreatePDF(
            src=styled_html,
            dest=pdf_file,
            encoding='utf-8',
        )
    if outcome.err:
        raise RuntimeError(f'Failed to render PDF content for {output_pdf.name}')
def _overlay_with_letterhead(content_pdf: Path, letterhead_pdf: Path, output_pdf: Path) -> None:
    """Stamp every page of ``content_pdf`` onto the letterhead.

    For each content page a blank page with the dimensions of the first
    letterhead page is created, then the letterhead page and the content page
    are merged onto it in that order. The result is written to ``output_pdf``.
    """
    stationery_reader = PdfReader(str(letterhead_pdf))
    body_reader = PdfReader(str(content_pdf))
    writer = PdfWriter()
    stationery = stationery_reader.pages[0]

    for body_page in body_reader.pages:
        sheet = PageObject.create_blank_page(
            width=stationery.mediabox.width,
            height=stationery.mediabox.height,
        )
        # Letterhead first so the content ends up on top of it.
        sheet.merge_page(stationery)
        sheet.merge_page(body_page)
        writer.add_page(sheet)

    with output_pdf.open('wb') as out_file:
        writer.write(out_file)
def _generate_onboarding_pdf(request_obj: OnboardingRequest) -> Path:
    """Render the onboarding letter for ``request_obj`` and return its path.

    ``onboarding_template.html`` is rendered with the request data, converted
    to a temporary PDF and stamped onto the first page of ``templates.pdf``.
    The file is written to ``settings.PDF_OUTPUT_DIR`` as
    ``onboarding_letter_<safe name>.pdf``. The temporary PDF is removed even
    when rendering or the overlay fails.
    """
    first_name, last_name = _split_name(request_obj.full_name)
    safe_name = _safe_filename_fragment(request_obj.full_name, fallback=f'onboarding_{request_obj.id}')
    output_pdf = settings.PDF_OUTPUT_DIR / f'onboarding_letter_{safe_name}.pdf'
    temp_pdf = settings.PDF_OUTPUT_DIR / f'_temp_onboarding_{safe_name}.pdf'
    template_path = settings.PDF_TEMPLATES_DIR / 'onboarding_template.html'
    letterhead_path = settings.PDF_TEMPLATES_DIR / 'templates.pdf'

    devices = _split_multiline(request_obj.needed_devices)
    software = _split_multiline(request_obj.needed_software)
    accesses = _split_multiline(request_obj.needed_accesses)
    groups = _split_multiline(request_obj.needed_workspace_groups)
    resources = _split_multiline(request_obj.needed_resources)
    group_mailboxes_list = _split_multiline(request_obj.group_mailboxes or '')
    additional_hardware_list = _split_multiline(request_obj.additional_hardware or '')
    additional_software_list = _split_multiline(request_obj.additional_software or '')
    additional_access_list = _split_multiline(request_obj.additional_access_text or '')

    # Signature: embed an uploaded image as a data URI, otherwise show the URL.
    signature_src = ''
    signature_note = '-'
    if getattr(request_obj, 'signature_image', None):
        try:
            signature_path = Path(request_obj.signature_image.path).resolve()
            with signature_path.open('rb') as sig_fp:
                encoded = base64.b64encode(sig_fp.read()).decode('ascii')
            mime_type = mimetypes.guess_type(signature_path.name)[0] or 'image/png'
            signature_src = f"data:{mime_type};base64,{encoded}"
            signature_note = 'Digitale Signatur als Bilddatei hinterlegt.'
        except Exception:
            # Best effort: an unreadable image must not block the letter, so
            # fall back to the signature URL (or a dash).
            signature_src = ''
            signature_note = request_obj.signature_url or '-'
    elif request_obj.signature_url:
        signature_note = request_obj.signature_url

    requester_email = request_obj.onboarded_by_email or '-'
    requester_name = request_obj.onboarded_by_name or _resolve_user_display_name(request_obj.onboarded_by_email) or '-'
    gender = (request_obj.get_gender_display() or '-').strip() or '-'
    employment_type = (request_obj.employment_type or '-').strip() or '-'
    employment_end = request_obj.employment_end_date or '-'
    order_business_cards = bool(request_obj.order_business_cards)
    group_mailboxes = (request_obj.group_mailboxes or '').strip()
    additional_hardware_other = (request_obj.additional_hardware_other or '').strip()
    additional_hardware = (request_obj.additional_hardware or '').strip()
    additional_software = (request_obj.additional_software or '').strip()
    additional_access_text = (request_obj.additional_access_text or '').strip()
    successor_name = (request_obj.successor_name or '').strip()
    additional_notes = (request_obj.additional_notes or '').strip()
    phone_number = (request_obj.phone_number or '').strip()
    display_name = f"{gender} {first_name} {last_name}".strip() if gender and gender != '-' else f"{first_name} {last_name}".strip()

    context = {
        'VORNAME': first_name,
        'NACHNAME': last_name,
        'DISPLAY_NAME': display_name or request_obj.full_name,
        'ANREDE': gender,
        'BERUFSBEZEICHNUNG': request_obj.job_title or 'N/A',
        'ABTEILUNG': request_obj.department or 'N/A',
        'EMAIL': request_obj.work_email or 'N/A',
        'VERTRAGSBEGINN': request_obj.contract_start,
        'BESCHAEFTIGUNG': employment_type,
        'VERTRAGSENDE': employment_end,
        'UEBERGABEDATUM': request_obj.handover_date or '-',
        'ARBEITSGERAETE_TEXT': ' | '.join(devices) if devices else 'Keine Angabe',
        'WORKSPACE_GROUPS_TEXT': ' | '.join(groups) if groups else 'Keine Angabe',
        'SOFTWARE_TEXT': ' | '.join(software) if software else 'Keine Angabe',
        'ZUGAENGE_TEXT': ' | '.join(accesses) if accesses else 'Keine Angabe',
        'RESSOURCEN_TEXT': ' | '.join(resources) if resources else 'Keine Angabe',
        'VISITENKARTE_BESTELLT': order_business_cards,
        'HAS_VISITENKARTE_DATEN': order_business_cards and any(
            [
                (request_obj.business_card_name or '').strip(),
                (request_obj.business_card_title or '').strip(),
                (request_obj.business_card_email or '').strip(),
                (request_obj.business_card_phone or '').strip(),
            ]
        ),
        'VISITENKARTE_NAME': request_obj.business_card_name or '-',
        'VISITENKARTE_TITEL': request_obj.business_card_title or '-',
        'VISITENKARTE_EMAIL': request_obj.business_card_email or '-',
        'VISITENKARTE_TELEFON': request_obj.business_card_phone or '-',
        'GROUP_MAILBOXES': group_mailboxes or 'Keine Angabe',
        'ADDITIONAL_HARDWARE_OTHER': additional_hardware_other or 'Keine Angabe',
        'ADDITIONAL_HARDWARE': additional_hardware or 'Keine Angabe',
        'ADDITIONAL_SOFTWARE': additional_software or 'Keine Angabe',
        'ADDITIONAL_ACCESS_TEXT': additional_access_text or 'Keine Angabe',
        'SUCCESSOR_NAME': successor_name or 'Keine Angabe',
        'PHONE_NUMBER': phone_number or '-',
        'INHERIT_PHONE_NUMBER': 'Ja' if request_obj.inherit_phone_number else 'Nein',
        'ADDITIONAL_NOTES': additional_notes or 'Keine Angabe',
        'GROUP_MAILBOXES_REQUIRED': bool(request_obj.group_mailboxes_required),
        'ADDITIONAL_HARDWARE_NEEDED': bool(request_obj.additional_hardware_needed),
        'ADDITIONAL_SOFTWARE_NEEDED': bool(request_obj.additional_software_needed),
        'ADDITIONAL_ACCESS_NEEDED': bool(request_obj.additional_access_needed),
        'HAS_DEVICES': bool(devices),
        'HAS_GROUPS': bool(groups),
        'HAS_SOFTWARE': bool(software),
        'HAS_ACCESSES': bool(accesses),
        'HAS_RESOURCES': bool(resources),
        'HAS_GROUP_MAILBOXES': bool(group_mailboxes_list),
        'HAS_ADDITIONAL_HARDWARE': bool(additional_hardware_list),
        'HAS_ADDITIONAL_SOFTWARE': bool(additional_software_list),
        'HAS_ADDITIONAL_ACCESS': bool(additional_access_list),
        'HAS_ADDITIONAL_HARDWARE_OTHER': bool(additional_hardware_other),
        'HAS_SUCCESSOR_INFO': bool(successor_name) or bool(request_obj.inherit_phone_number) or bool(phone_number),
        'HAS_ADDITIONAL_NOTES': bool(additional_notes),
        'GROUP_MAILBOXES_LIST': _chunk_list(group_mailboxes_list),
        'ADDITIONAL_HARDWARE_LIST': _chunk_list(additional_hardware_list),
        'ADDITIONAL_SOFTWARE_LIST': _chunk_list(additional_software_list),
        'ADDITIONAL_ACCESS_LIST': _chunk_list(additional_access_list),
        # NOTE(review): 'ZUGAENGE_LIST' is fed from the workspace groups while
        # 'ACCOUNT_LIST' carries the accesses. Confirm this matches the
        # placeholder usage in onboarding_template.html.
        'ZUGAENGE_LIST': _chunk_list(groups),
        'ARBEITSGERÄTE_LIST': _chunk_list(devices),
        'SOFTWARE_LIST': _chunk_list(software),
        'ACCOUNT_LIST': _chunk_list(accesses),
        'STANDARD_RESSOURCEN': _chunk_list(resources),
        'UNTERSCHRIFT': signature_src,
        'UNTERSCHRIFT_HINWEIS': signature_note,
        'REQUESTED_BY_NAME': requester_name,
        'REQUESTED_BY_EMAIL': requester_email,
    }

    html = _render_html(template_path, context)
    try:
        _generate_content_pdf(html, temp_pdf)
        _overlay_with_letterhead(temp_pdf, letterhead_path, output_pdf)
    finally:
        # Remove the intermediate file even if rendering or overlay raised.
        temp_pdf.unlink(missing_ok=True)
    return output_pdf
def _generate_onboarding_intro_pdf(request_obj: OnboardingRequest) -> Path:
    """Render the introduction checklist PDF for ``request_obj``.

    All checklist sections from ``build_intro_sections_for_request`` are laid
    out two labels per row, rendered through ``onboarding_intro_template.html``
    and stamped onto the letterhead. Returns the path of
    ``onboarding_intro_<safe name>.pdf`` inside ``settings.PDF_OUTPUT_DIR``.
    The temporary PDF is removed even when rendering or the overlay fails.
    """
    safe_name = _safe_filename_fragment(request_obj.full_name, fallback=f'onboarding_intro_{request_obj.id}')
    output_pdf = settings.PDF_OUTPUT_DIR / f'onboarding_intro_{safe_name}.pdf'
    temp_pdf = settings.PDF_OUTPUT_DIR / f'_temp_onboarding_intro_{safe_name}.pdf'
    template_path = settings.PDF_TEMPLATES_DIR / 'onboarding_intro_template.html'
    letterhead_path = settings.PDF_TEMPLATES_DIR / 'templates.pdf'

    salutation = (request_obj.get_gender_display() or '').strip()
    display_name = f"{salutation} {request_obj.full_name}".strip() if salutation else request_obj.full_name

    intro_sections = [
        {
            'title': section['title'],
            'rows': _chunk_list([item['label'] for item in section['items']], chunk_size=2),
        }
        for section in build_intro_sections_for_request(request_obj)
    ]

    requester_email = request_obj.onboarded_by_email or '-'
    requester_name = request_obj.onboarded_by_name or _resolve_user_display_name(request_obj.onboarded_by_email) or '-'

    context = {
        'DISPLAY_NAME': display_name,
        'ABTEILUNG': request_obj.department or '-',
        'BERUFSBEZEICHNUNG': request_obj.job_title or '-',
        'VERTRAGSBEGINN': request_obj.contract_start,
        'EMAIL': request_obj.work_email or '-',
        'REQUESTED_BY_NAME': requester_name,
        'REQUESTED_BY_EMAIL': requester_email,
        'INTRO_SECTIONS': intro_sections,
    }

    html = _render_html(template_path, context)
    try:
        _generate_content_pdf(html, temp_pdf)
        _overlay_with_letterhead(temp_pdf, letterhead_path, output_pdf)
    finally:
        # Remove the intermediate file even if rendering or overlay raised.
        temp_pdf.unlink(missing_ok=True)
    return output_pdf
def _generate_onboarding_intro_session_pdf(session: OnboardingIntroductionSession, admin_signature_name: str = '-') -> Path:
    """Render the protocol PDF of an introduction session.

    Only checklist items marked as checked in ``session.checklist_state`` are
    exported, two per row; sections without a checked item are omitted. The
    file name carries a timestamp, so each call produces a new file in
    ``settings.PDF_OUTPUT_DIR``. The temporary PDF is removed even when
    rendering or the overlay fails.

    Args:
        session: Session whose onboarding request and checklist state are used.
        admin_signature_name: Accepted for interface compatibility; the value
            is currently not used by this function.
    """
    request_obj = session.onboarding_request
    safe_name = _safe_filename_fragment(request_obj.full_name, fallback=f'onboarding_intro_session_{request_obj.id}')
    version = timezone.now().strftime('%Y%m%d%H%M%S')
    output_pdf = settings.PDF_OUTPUT_DIR / f'onboarding_intro_session_{safe_name}_{version}.pdf'
    temp_pdf = settings.PDF_OUTPUT_DIR / f'_temp_onboarding_intro_session_{safe_name}_{version}.pdf'
    template_path = settings.PDF_TEMPLATES_DIR / 'onboarding_intro_session_pdf.html'
    letterhead_path = settings.PDF_TEMPLATES_DIR / 'templates.pdf'

    salutation = (request_obj.get_gender_display() or '').strip()
    display_name = f"{salutation} {request_obj.full_name}".strip() if salutation else request_obj.full_name

    checked_map = session.checklist_state or {}
    exported_sections = []
    for section in build_intro_sections_for_request(request_obj):
        # Item ids have the form '<section key>_<position>'.
        checked_items = [
            {'label': item['label']}
            for item in section['items']
            if checked_map.get(item['id'])
        ]
        if checked_items:
            exported_sections.append({
                'title': section['title'],
                'rows': [checked_items[i:i + 2] for i in range(0, len(checked_items), 2)],
            })

    requester_email = request_obj.onboarded_by_email or '-'
    requester_name = request_obj.onboarded_by_name or _resolve_user_display_name(request_obj.onboarded_by_email) or '-'

    context = {
        'DISPLAY_NAME': display_name,
        'ABTEILUNG': request_obj.department or '-',
        'BERUFSBEZEICHNUNG': request_obj.job_title or '-',
        'VERTRAGSBEGINN': request_obj.contract_start,
        'EMAIL': request_obj.work_email or '-',
        'REQUESTED_BY_NAME': requester_name,
        'REQUESTED_BY_EMAIL': requester_email,
        'SESSION_STATUS': session.get_status_display(),
        'SESSION_COMPLETED_BY': session.completed_by_name or '-',
        'SESSION_COMPLETED_AT': session.completed_at or '-',
        'SESSION_UPDATED_AT': session.updated_at,
        'SESSION_NOTES': session.notes or '-',
        'INTRO_SECTIONS': exported_sections,
    }

    html = _render_html(template_path, context)
    try:
        _generate_content_pdf(html, temp_pdf)
        _overlay_with_letterhead(temp_pdf, letterhead_path, output_pdf)
    finally:
        # Remove the intermediate file even if rendering or overlay raised.
        temp_pdf.unlink(missing_ok=True)
    return output_pdf
def _generate_offboarding_pdf(request_obj: OffboardingRequest) -> Path:
    """Render the offboarding letter for ``request_obj`` and return its path.

    The hardware checklist is built from a fixed catalog. Entries are marked
    as selected when the most recent onboarding request with the same work
    e-mail listed them (case-insensitive); devices from that onboarding that
    are not in the catalog are appended as selected extras. The temporary PDF
    is removed even when rendering or the overlay fails.
    """
    safe_name = _safe_filename_fragment(request_obj.full_name, fallback=f'offboarding_{request_obj.id}')
    output_pdf = settings.PDF_OUTPUT_DIR / f'offboarding_letter_{safe_name}.pdf'
    temp_pdf = settings.PDF_OUTPUT_DIR / f'_temp_offboarding_{safe_name}.pdf'
    template_path = settings.PDF_TEMPLATES_DIR / 'offboarding_template.html'
    letterhead_path = settings.PDF_TEMPLATES_DIR / 'templates.pdf'

    latest_onboarding = (
        OnboardingRequest.objects.filter(work_email=request_obj.work_email)
        .order_by('-created_at')
        .first()
    )
    has_onboarding_data = latest_onboarding is not None
    onboarding_hardware = _split_multiline(latest_onboarding.needed_devices) if latest_onboarding else []
    selected_set = {item.lower() for item in onboarding_hardware}

    hardware_catalog = [
        'Laptop',
        'Docking-Station',
        'Tastatur und Maus',
        'Kopfhörer',
        'Tragetasche',
        'Monitor',
        'Schlüssel',
        'Tischtelefon',
    ]
    # Built once; previously this set was rebuilt for every onboarding device.
    catalog_lookup = {entry.lower() for entry in hardware_catalog}

    checklist = [{'label': item, 'selected': item.lower() in selected_set} for item in hardware_catalog]
    checklist.extend(
        {'label': item, 'selected': True}
        for item in onboarding_hardware
        if item.lower() not in catalog_lookup
    )

    requester_email = request_obj.requested_by_email or '-'
    requester_name = request_obj.requested_by_name or _resolve_user_display_name(request_obj.requested_by_email) or '-'

    context = {
        'FULL_NAME': request_obj.full_name,
        'EMAIL': request_obj.work_email,
        'DEPARTMENT': request_obj.department or '-',
        'JOB_TITLE': request_obj.job_title or '-',
        'LAST_WORKING_DAY': request_obj.last_working_day,
        'NOTES': request_obj.notes or '-',
        'REQUESTED_BY': requester_email,
        'REQUESTED_BY_NAME': requester_name,
        'HAS_ONBOARDING_DATA': has_onboarding_data,
        'ONBOARDING_HARDWARE': onboarding_hardware,
        'HARDWARE_CHECKLIST': checklist,
        'MANUAL_FIELD_SECTIONS': _manual_onboarding_field_sections(),
        'MANUAL_DEVICES': _chunk_choice_labels(DEVICE_CHOICES),
        'MANUAL_SOFTWARE': _chunk_choice_labels(SOFTWARE_CHOICES),
        'MANUAL_ACCESSES': _chunk_choice_labels(ACCESS_CHOICES),
        'MANUAL_WORKSPACE_GROUPS': _chunk_choice_labels(WORKSPACE_GROUP_CHOICES),
        'MANUAL_RESOURCES': _chunk_choice_labels(RESOURCE_CHOICES),
        'MANUAL_EXTRA_HARDWARE': _chunk_choice_labels(HARDWARE_EXTRA_CHOICES),
        'MANUAL_EXTRA_SOFTWARE': _chunk_choice_labels(SOFTWARE_EXTRA_CHOICES),
    }

    html = _render_html(template_path, context)
    try:
        _generate_content_pdf(html, temp_pdf)
        _overlay_with_letterhead(temp_pdf, letterhead_path, output_pdf)
    finally:
        # Remove the intermediate file even if rendering or overlay raised.
        temp_pdf.unlink(missing_ok=True)
    return output_pdf
@shared_task
def process_onboarding_request(onboarding_request_id: int) -> None:
    """Run the follow-up pipeline for one submitted onboarding request.

    Steps, in order:

    1. Load the request and resolve the workflow recipient addresses.
    2. Create or update the ``EmployeeProfile`` keyed by the work email.
    3. Generate the onboarding PDF and persist its path on the request.
    4. Send the IT and general-info mails, then the optional business-card,
       HR Works and key mails, then the reference copy to the requester.
    5. Apply the notification rules, schedule the welcome email and pass
       the PDF to ``upload_to_nextcloud``.

    Args:
        onboarding_request_id: Primary key of the ``OnboardingRequest``.

    Raises:
        OnboardingRequest.DoesNotExist: If no request with that id exists.
    """
    onboarding = OnboardingRequest.objects.get(id=onboarding_request_id)
    (
        it_email,
        general_info_email,
        business_card_email,
        hr_works_email,
        key_email,
    ) = _resolve_workflow_emails()

    # Display name is the optional salutation followed by the full name.
    salutation = (onboarding.get_gender_display() or '').strip()
    display_name = f"{salutation} {onboarding.full_name}".strip()
    first_name, last_name = _split_name(onboarding.full_name)

    profile_defaults = {
        'full_name': onboarding.full_name,
        'first_name': first_name,
        'last_name': last_name,
        'department': onboarding.department,
        'job_title': onboarding.job_title,
    }
    EmployeeProfile.objects.update_or_create(
        work_email=onboarding.work_email,
        defaults=profile_defaults,
    )

    generated_pdf = _generate_onboarding_pdf(onboarding)
    onboarding.generated_pdf_path = str(generated_pdf)
    onboarding.save(update_fields=['generated_pdf_path'])

    mail_context = {
        'FULL_NAME': display_name,
        'VORNAME': first_name,
        'NACHNAME': last_name,
        'DEPARTMENT': onboarding.department or '-',
        'CONTRACT_START': onboarding.contract_start,
        'EMAIL': onboarding.work_email,
        'REQUESTED_BY': onboarding.onboarded_by_email or '-',
    }
    # Business-card values fall back to the employee's own data.
    mail_context.update({
        'BUSINESS_CARD_NAME': onboarding.business_card_name or display_name,
        'BUSINESS_CARD_TITLE': onboarding.business_card_title or '-',
        'BUSINESS_CARD_EMAIL': onboarding.business_card_email or onboarding.work_email,
        'BUSINESS_CARD_PHONE': onboarding.business_card_phone or '-',
        'PDF_LINK': settings.ONBOARDING_SHARED_PDF_LINK,
    })

    def _notify(template_key, recipient, with_pdf=False):
        # Only pass ``attachments`` when a PDF is wanted, so the callee's
        # own default applies for the plain mails.
        extra = {'attachments': [generated_pdf]} if with_pdf else {}
        _send_templated_email(
            template_key=template_key,
            context=mail_context,
            to=[recipient],
            **extra,
        )

    # Mails that are always sent.
    _notify('onboarding_it', it_email, with_pdf=True)
    _notify('onboarding_general_info', general_info_email)

    # Mails that depend on what was requested.
    if onboarding.order_business_cards:
        _notify('onboarding_business_card', business_card_email)
    if 'HR Works' in onboarding.needed_accesses:
        _notify('onboarding_hr_works', hr_works_email)
    if 'Schlüssel' in onboarding.needed_devices:
        _notify('onboarding_key', key_email)

    # Reference copy for the requester, only when an address is known.
    if onboarding.onboarded_by_email:
        _notify('onboarding_reference', onboarding.onboarded_by_email, with_pdf=True)

    _apply_notification_rules(
        event_type='onboarding',
        request_obj=onboarding,
        context=mail_context,
        pdf_path=generated_pdf,
    )
    _schedule_welcome_email(onboarding)
    upload_to_nextcloud(generated_pdf, Path(generated_pdf).name)
@shared_task
def process_offboarding_request(offboarding_request_id: int) -> None:
    """Run the follow-up pipeline for one submitted offboarding request.

    Generates the offboarding PDF and stores its path on the request, sends
    the IT and general-info mails, sends the HR Works deactivation mail when
    any onboarding request for the same work email asked for HR Works
    access, sends a reference copy to the requester, applies the
    notification rules and finally passes the PDF to ``upload_to_nextcloud``.

    Args:
        offboarding_request_id: Primary key of the ``OffboardingRequest``.

    Raises:
        OffboardingRequest.DoesNotExist: If no request with that id exists.
    """
    request_obj = OffboardingRequest.objects.get(id=offboarding_request_id)
    # Business-card and key recipients are not used for offboarding.
    it_email, general_info_email, _, hr_works_email, _ = _resolve_workflow_emails()

    pdf_path = _generate_offboarding_pdf(request_obj)
    request_obj.generated_pdf_path = str(pdf_path)
    request_obj.save(update_fields=['generated_pdf_path'])

    requester_email = request_obj.requested_by_email
    email_context = {
        'FULL_NAME': request_obj.full_name,
        'DEPARTMENT': request_obj.department or '-',
        'LAST_WORKING_DAY': request_obj.last_working_day,
        # Same '-' placeholder the onboarding task uses when the requester
        # address is missing, so templates never render an empty value.
        'REQUESTED_BY': requester_email or '-',
        'EMAIL': request_obj.work_email,
    }

    _send_templated_email(
        template_key='offboarding_it',
        context=email_context,
        to=[it_email],
        attachments=[pdf_path],
    )
    _send_templated_email(
        template_key='offboarding_general_info',
        context=email_context,
        to=[general_info_email],
    )

    # HR Works only needs to be informed if the account was ever requested.
    had_hr_works = OnboardingRequest.objects.filter(
        work_email=request_obj.work_email,
        needed_accesses__icontains='HR Works',
    ).exists()
    if had_hr_works:
        _send_templated_email(
            template_key='offboarding_hr_works_disable',
            context=email_context,
            to=[hr_works_email],
        )

    # Skip the reference copy when there is no requester address instead of
    # sending to an empty recipient (mirrors the onboarding task's guard).
    if requester_email:
        _send_templated_email(
            template_key='offboarding_reference',
            context=email_context,
            to=[requester_email],
            attachments=[pdf_path],
        )

    _apply_notification_rules(
        event_type='offboarding',
        request_obj=request_obj,
        context=email_context,
        pdf_path=pdf_path,
    )
    upload_to_nextcloud(pdf_path, Path(pdf_path).name)
@shared_task
def send_scheduled_welcome_email(scheduled_email_id: int, force_now: bool = False) -> None:
    """Send the welcome email belonging to a ``ScheduledWelcomeEmail`` row.

    Without ``force_now`` the task is a no-op for rows whose status is
    ``sent``, ``cancelled`` or ``paused``, and if it runs before
    ``send_at`` it re-enqueues itself with that ETA and records the new
    Celery task id. With ``force_now`` those checks are bypassed.

    The outcome is written back to the row: ``sent`` with a timestamp on
    success, ``failed`` with the error text on failure. A failure is
    re-raised after the row has been saved.

    Args:
        scheduled_email_id: Primary key of the ``ScheduledWelcomeEmail``.
        force_now: Send immediately, ignoring status and ``send_at``.
    """
    entry = (
        ScheduledWelcomeEmail.objects
        .select_related('onboarding_request')
        .filter(id=scheduled_email_id)
        .first()
    )
    if entry is None:
        return

    if not force_now:
        # Finished, cancelled or paused entries are left untouched.
        if entry.status in {'sent', 'cancelled', 'paused'}:
            return
        # Too early: hand the job back to Celery for the planned time.
        if timezone.now() < entry.send_at:
            rescheduled = send_scheduled_welcome_email.apply_async(
                args=[entry.id],
                eta=entry.send_at,
            )
            entry.celery_task_id = rescheduled.id or entry.celery_task_id
            entry.save(update_fields=['celery_task_id', 'updated_at'])
            return

    onboarding = entry.onboarding_request
    first_name, last_name = _split_name(onboarding.full_name)
    salutation = (onboarding.get_gender_display() or '').strip()
    display_name = f"{salutation} {onboarding.full_name}".strip()
    mail_context = {
        'FULL_NAME': display_name,
        'VORNAME': first_name,
        'NACHNAME': last_name,
        'DEPARTMENT': onboarding.department or '-',
        'CONTRACT_START': onboarding.contract_start,
        'EMAIL': onboarding.work_email,
        'REQUESTED_BY': onboarding.onboarded_by_email or '-',
    }

    # Defaults apply when no WorkflowConfig row exists: attach the PDF and
    # let the mail helper pick the sender.
    include_pdf = True
    sender = ''
    config = WorkflowConfig.objects.order_by('id').first()
    if config:
        include_pdf = bool(config.welcome_include_pdf)
        sender = (config.welcome_sender_email or config.email_account or '').strip()

    # Attach the generated PDF only if it is wanted and still on disk.
    attachments = []
    if include_pdf and onboarding.generated_pdf_path:
        stored_pdf = Path(onboarding.generated_pdf_path)
        if stored_pdf.exists():
            attachments = [stored_pdf]

    try:
        _send_templated_email(
            template_key='onboarding_welcome',
            context=mail_context,
            to=[entry.recipient_email],
            attachments=attachments,
            from_email=sender or None,
        )
        entry.status = 'sent'
        entry.sent_at = timezone.now()
        entry.last_error = ''
    except Exception as exc:
        # Record the failure on the row, then let Celery see the error.
        entry.status = 'failed'
        entry.last_error = str(exc)
        raise
    finally:
        # Persist the outcome on both the success and the failure path.
        entry.save(update_fields=['status', 'sent_at', 'last_error', 'updated_at'])