from pathlib import Path from datetime import timedelta import base64 import mimetypes import re from celery import shared_task from django.contrib.auth import get_user_model from django.conf import settings from django.utils import timezone from jinja2 import Template from pypdf import PageObject, PdfReader, PdfWriter from xhtml2pdf import pisa from .models import EmployeeProfile, NotificationRule, NotificationTemplate, OffboardingRequest, OnboardingRequest, ScheduledWelcomeEmail, WorkflowConfig from .emailing import send_system_email from .services import upload_to_nextcloud from .services import get_email_test_redirect, is_email_test_mode from .forms import ( ACCESS_CHOICES, DEVICE_CHOICES, HARDWARE_EXTRA_CHOICES, OnboardingRequestForm, RESOURCE_CHOICES, SOFTWARE_CHOICES, SOFTWARE_EXTRA_CHOICES, WORKSPACE_GROUP_CHOICES, ) DEFAULT_NOTIFICATION_TEMPLATES = { 'onboarding_it': { 'subject': '[Onboarding] {{ FULL_NAME }} | Anfrage von {{ REQUESTED_BY }}', 'body': ( 'Neue Onboarding-Anfrage für {{ FULL_NAME }}.\n' 'Abteilung: {{ DEPARTMENT }}\n' 'Vertragsbeginn: {{ CONTRACT_START }}\n' 'Angefordert von: {{ REQUESTED_BY }}\n' 'Bitte IT-Setup vorbereiten.' ), }, 'onboarding_general_info': { 'subject': '[Info Onboarding] {{ FULL_NAME }} | Anfrage von {{ REQUESTED_BY }}', 'body': ( 'Hallo,\n\n' '{{ FULL_NAME }} wird onboarded.\n' 'Abteilung: {{ DEPARTMENT }}\n' 'Vertragsbeginn: {{ CONTRACT_START }}\n' 'Angefordert von: {{ REQUESTED_BY }}\n' ), }, 'onboarding_business_card': { 'subject': '[Visitenkarte] {{ FULL_NAME }} | Anfrage von {{ REQUESTED_BY }}', 'body': ( 'Hallo,\n\n' 'bitte Visitenkarten erstellen:\n' 'Name: {{ BUSINESS_CARD_NAME }}\n' 'Titel: {{ BUSINESS_CARD_TITLE }}\n' 'E-Mail: {{ BUSINESS_CARD_EMAIL }}\n' 'Telefon: {{ BUSINESS_CARD_PHONE }}\n' 'Angefordert von: {{ REQUESTED_BY }}\n' ), }, 'onboarding_hr_works': { 'subject': '[HR Works] {{ FULL_NAME }} | Anfrage von {{ REQUESTED_BY }}', 'body': ( 'Hello Stefanie,\n\n' 'Es ist wieder soweit. Zuwachs!\n\n' 'Könntest du deshalb bitte ein HR Works Konto mit den folgenden Daten erstellen:\n\n' 'Name: {{ VORNAME }} {{ NACHNAME }}\n' 'Abteilung: {{ DEPARTMENT }}\n' 'Vertragsbeginn: {{ CONTRACT_START }}\n' 'E-Mail-Adresse: {{ EMAIL }}\n\n' '{% if PDF_LINK %}In 2 Minuten findest du alle Infos über den Mitarbeiter als PDF unter diesem Link: {{ PDF_LINK }}\n\n{% endif %}' 'Falls du noch irgendwelche anderen Informationen benötigen solltest, kannst du dich bei der it@tub.co melden!\n\n' 'Vielen Dank und schöne Grüße,\n' 'Die IT.' ), }, 'onboarding_key': { 'subject': '[Schlüssel] {{ FULL_NAME }} | Anfrage von {{ REQUESTED_BY }}', 'body': ( 'Hallo,\n\n' 'bitte Schlüssel vorbereiten für:\n' 'Name: {{ FULL_NAME }}\n' 'Abteilung: {{ DEPARTMENT }}\n' 'Vertragsbeginn: {{ CONTRACT_START }}\n' 'Angefordert von: {{ REQUESTED_BY }}\n' ), }, 'onboarding_reference': { 'subject': '[Referenz Onboarding] {{ FULL_NAME }} | Ihre Anfrage', 'body': ( 'Diese E-Mail dient als Referenz für Ihre Onboarding-Anfrage.\n' 'Name: {{ FULL_NAME }}\n' 'Abteilung: {{ DEPARTMENT }}\n' 'Vertragsbeginn: {{ CONTRACT_START }}\n' 'Angefordert von: {{ REQUESTED_BY }}\n' ), }, 'onboarding_welcome': { 'subject': 'Willkommen bei TUB/CO, {{ VORNAME }}', 'body': ( 'Hallo {{ FULL_NAME }},\n\n' 'herzlich willkommen bei TUB/CO.\n' 'Wir freuen uns sehr, dass du ab dem {{ CONTRACT_START }} unser Team in der Abteilung {{ DEPARTMENT }} verstärkst.\n\n' 'Deine dienstliche E-Mail-Adresse lautet: {{ EMAIL }}.\n' 'Im Anhang findest du deine Onboarding-Unterlagen als PDF.\n\n' 'Wenn du Fragen hast, melde dich gerne jederzeit.\n\n' 'Viele Grüße\n' 'TUB/CO IT' ), }, 'offboarding_it': { 'subject': '[Offboarding] {{ FULL_NAME }} | Anfrage von {{ REQUESTED_BY }}', 'body': ( 'Neue Offboarding-Anfrage für {{ FULL_NAME }}.\n' 'Abteilung: {{ DEPARTMENT }}\n' 'Letzter Arbeitstag: {{ LAST_WORKING_DAY }}\n' 'Angefordert von: {{ REQUESTED_BY }}\n' 'Bitte IT-Offboarding durchführen.' ), }, 'offboarding_general_info': { 'subject': '[Info Offboarding] {{ FULL_NAME }} | Anfrage von {{ REQUESTED_BY }}', 'body': ( 'Neue Offboarding-Anfrage für {{ FULL_NAME }}.\n' 'Abteilung: {{ DEPARTMENT }}\n' 'Letzter Arbeitstag: {{ LAST_WORKING_DAY }}\n' 'Angefordert von: {{ REQUESTED_BY }}\n' ), }, 'offboarding_hr_works_disable': { 'subject': '[HR Works Deaktivierung] {{ FULL_NAME }} | Anfrage von {{ REQUESTED_BY }}', 'body': ( 'Bitte HR Works Zugriff deaktivieren für {{ FULL_NAME }} ({{ EMAIL }}) zum {{ LAST_WORKING_DAY }}.\n' 'Angefordert von: {{ REQUESTED_BY }}\n' ), }, 'offboarding_reference': { 'subject': '[Referenz Offboarding] {{ FULL_NAME }} | Ihre Anfrage', 'body': ( 'Diese E-Mail dient als Referenz für Ihre Offboarding-Anfrage.\n' 'Name: {{ FULL_NAME }}\n' 'Abteilung: {{ DEPARTMENT }}\n' 'Letzter Arbeitstag: {{ LAST_WORKING_DAY }}\n' 'Angefordert von: {{ REQUESTED_BY }}\n' ), }, } def _split_name(full_name: str) -> tuple[str, str]: parts = full_name.split() if not parts: return '', '' return parts[0], ' '.join(parts[1:]) def _safe_filename_fragment(text: str, fallback: str = 'document') -> str: value = re.sub(r'[^A-Za-z0-9._-]+', '_', (text or '').strip()).strip('._') return value[:120] if value else fallback def _resolve_user_display_name(email: str) -> str: email = (email or '').strip().lower() if not email: return '' user_model = get_user_model() user = user_model.objects.filter(email__iexact=email).first() if not user: return '' first_name = (getattr(user, 'first_name', '') or '').strip() last_name = (getattr(user, 'last_name', '') or '').strip() full_name = f'{first_name} {last_name}'.strip() if full_name: return full_name return (getattr(user, 'username', '') or '').strip() def _chunk_list(data_list: list[str], chunk_size: int = 3) -> list[list[str]]: items = [i.strip() for i in data_list if i and i.strip()] chunks = [] for i in range(0, len(items), chunk_size): chunks.append(items[i : i + chunk_size]) return chunks def _split_multiline(text: str) -> list[str]: return [line.strip() for line in (text or '').split('\n') if line.strip()] def _chunk_choice_labels(choices: list[tuple[str, str]], chunk_size: int = 3) -> list[list[str]]: labels = [label for _, label in choices] return _chunk_list(labels, chunk_size=chunk_size) MANUAL_ONBOARDING_FIELD_SECTIONS = [ ( 'Stammdaten', [ 'gender', 'first_name', 'last_name', 'job_title', 'department', 'work_email', 'order_business_cards', 'business_card_name', 'business_card_title', 'business_card_email', 'business_card_phone', ], ), ( 'Vertrag', [ 'contract_start', 'employment_type', 'employment_end_date', 'handover_date', ], ), ( 'IT-Setup', [ 'group_mailboxes_required_choice', 'group_mailboxes', 'additional_hardware_needed_choice', 'additional_hardware_other', 'additional_software_needed_choice', 'additional_software', 'additional_access_needed_choice', 'additional_access_text', 'successor_required_choice', 'successor_name', 'inherit_phone_number_choice', 'phone_number_choice', ], ), ( 'Abschluss', [ 'additional_notes', 'signature_image', 'agreement_confirm', ], ), ] def _manual_onboarding_field_sections() -> list[dict]: fields = OnboardingRequestForm.base_fields sections = [] for title, field_names in MANUAL_ONBOARDING_FIELD_SECTIONS: labels = [str(fields[name].label or name) for name in field_names if name in fields] if not labels: continue sections.append({'title': title, 'rows': _chunk_list(labels, chunk_size=2)}) return sections def _resolve_workflow_emails() -> tuple[str, str, str, str, str]: config = WorkflowConfig.objects.order_by('id').first() it_email = (config.it_onboarding_email if config and config.it_onboarding_email else settings.IT_ONBOARDING_NOTIFICATION_EMAIL) general_info_email = (config.general_info_email if config and config.general_info_email else settings.GENERAL_INFO_NOTIFICATION_EMAIL) business_card_email = (config.business_card_email if config and config.business_card_email else settings.BUSINESS_CARD_NOTIFICATION_EMAIL) hr_works_email = (config.hr_works_email if config and config.hr_works_email else settings.HR_WORKS_NOTIFICATION_EMAIL) key_email = (config.key_notification_email if config and config.key_notification_email else settings.KEY_NOTIFICATION_EMAIL) return it_email, general_info_email, business_card_email, hr_works_email, key_email def _send_workflow_email( subject: str, body: str, to: list[str], attachments: list[Path] | None = None, from_email: str | None = None, ) -> None: recipients = [r for r in to if r] if not recipients: return effective_to = recipients effective_body = body if is_email_test_mode(): effective_to = [get_email_test_redirect()] effective_body = ( "[TEST MODE] Diese E-Mail wurde umgeleitet.\n" f"Originale Empfänger: {', '.join(recipients)}\n\n{body}" ) send_system_email( subject=subject, body=effective_body, to=effective_to, attachments=[str(a) for a in (attachments or [])], from_email=from_email, ) def _render_notification_template(template_key: str, context: dict) -> tuple[str, str]: db_template = NotificationTemplate.objects.filter(key=template_key, is_active=True).first() if db_template: subject_template = db_template.subject_template body_template = db_template.body_template else: fallback = DEFAULT_NOTIFICATION_TEMPLATES[template_key] subject_template = fallback['subject'] body_template = fallback['body'] subject = Template(subject_template).render(context).strip() body = Template(body_template).render(context).strip() return subject, body def _parse_recipients(raw: str) -> list[str]: if not raw: return [] cleaned = raw.replace(';', ',').replace('\n', ',') return [x.strip() for x in cleaned.split(',') if x.strip()] def _as_bool(value) -> bool: if isinstance(value, bool): return value if value is None: return False text = str(value).strip().lower() return text in {'1', 'true', 'ja', 'yes', 'on', 'aktiv'} def _rule_matches(rule: NotificationRule, request_obj) -> bool: if rule.operator == 'always': return True raw_value = getattr(request_obj, rule.field_name, '') actual = '' if raw_value is None else str(raw_value) expected = (rule.expected_value or '').strip() if rule.operator == 'contains': return expected.lower() in actual.lower() if rule.operator == 'equals': return actual.strip().lower() == expected.lower() if rule.operator == 'is_true': return _as_bool(raw_value) if rule.operator == 'is_false': return not _as_bool(raw_value) return False def _apply_notification_rules( event_type: str, request_obj, context: dict, pdf_path: Path | None = None, ) -> None: rules = NotificationRule.objects.filter(event_type=event_type, is_active=True).order_by('sort_order', 'id') for rule in rules: if not _rule_matches(rule, request_obj): continue recipients = _parse_recipients(rule.recipients) if not recipients: continue attachments = [pdf_path] if (pdf_path and rule.include_pdf_attachment) else None template_key = (rule.template_key or '').strip() known_keys = {k for k, _ in NotificationTemplate.TEMPLATE_CHOICES} if template_key and template_key in known_keys: _send_templated_email( template_key=template_key, context=context, to=recipients, attachments=attachments, ) continue subject = (rule.custom_subject or '').strip() body = (rule.custom_body or '').strip() if not subject and not body: continue subject_rendered = Template(subject or f'[{event_type}] Regelmail').render(context).strip() body_rendered = Template(body or '-').render(context).strip() _send_workflow_email( subject=subject_rendered, body=body_rendered, to=recipients, attachments=attachments, ) def _schedule_welcome_email(request_obj: OnboardingRequest) -> None: recipient = (request_obj.work_email or '').strip().lower() if not recipient: return config = WorkflowConfig.objects.order_by('id').first() delay_days = 5 if config: delay_days = max(0, int(config.welcome_email_delay_days or 5)) send_at = timezone.now() + timedelta(days=delay_days) scheduled, _ = ScheduledWelcomeEmail.objects.update_or_create( onboarding_request=request_obj, defaults={ 'recipient_email': recipient, 'send_at': send_at, 'status': 'scheduled', 'last_error': '', 'sent_at': None, }, ) try: async_result = send_scheduled_welcome_email.apply_async(args=[scheduled.id], eta=send_at) scheduled.celery_task_id = async_result.id or '' scheduled.save(update_fields=['celery_task_id', 'updated_at']) except Exception as exc: scheduled.status = 'failed' scheduled.last_error = f'Scheduling failed: {exc}' scheduled.save(update_fields=['status', 'last_error', 'updated_at']) def _send_templated_email( template_key: str, to: list[str], context: dict, attachments: list[Path] | None = None, from_email: str | None = None, ) -> None: subject, body = _render_notification_template(template_key, context) _send_workflow_email(subject=subject, body=body, to=to, attachments=attachments, from_email=from_email) def _render_html(template_path: Path, context: dict) -> str: with template_path.open('r', encoding='utf-8') as handle: template = Template(handle.read()) return template.render(context) def _generate_content_pdf(html_content: str, output_pdf: Path) -> None: page_style = ( '' ) if '' in html_content: html_content = html_content.replace('', f'{page_style}', 1) else: html_content = page_style + html_content output_pdf.parent.mkdir(parents=True, exist_ok=True) with output_pdf.open('wb') as fp: result = pisa.CreatePDF( src=html_content, dest=fp, encoding='utf-8', ) if result.err: raise RuntimeError(f'Failed to render PDF content for {output_pdf.name}') def _overlay_with_letterhead(content_pdf: Path, letterhead_pdf: Path, output_pdf: Path) -> None: letterhead_reader = PdfReader(str(letterhead_pdf)) content_reader = PdfReader(str(content_pdf)) writer = PdfWriter() letterhead_page = letterhead_reader.pages[0] for page in content_reader.pages: merged = PageObject.create_blank_page( width=letterhead_page.mediabox.width, height=letterhead_page.mediabox.height, ) merged.merge_page(letterhead_page) merged.merge_page(page) writer.add_page(merged) with output_pdf.open('wb') as fp: writer.write(fp) def _generate_onboarding_pdf(request_obj: OnboardingRequest) -> Path: first_name, last_name = _split_name(request_obj.full_name) safe_name = _safe_filename_fragment(request_obj.full_name, fallback=f'onboarding_{request_obj.id}') output_pdf = settings.PDF_OUTPUT_DIR / f'onboarding_letter_{safe_name}.pdf' temp_pdf = settings.PDF_OUTPUT_DIR / f'_temp_onboarding_{safe_name}.pdf' template_path = settings.PDF_TEMPLATES_DIR / 'onboarding_template.html' letterhead_path = settings.PDF_TEMPLATES_DIR / 'templates.pdf' devices = _split_multiline(request_obj.needed_devices) software = _split_multiline(request_obj.needed_software) accesses = _split_multiline(request_obj.needed_accesses) groups = _split_multiline(request_obj.needed_workspace_groups) resources = _split_multiline(request_obj.needed_resources) group_mailboxes_list = _split_multiline(request_obj.group_mailboxes or '') additional_hardware_list = _split_multiline(request_obj.additional_hardware or '') additional_software_list = _split_multiline(request_obj.additional_software or '') additional_access_list = _split_multiline(request_obj.additional_access_text or '') signature_src = '' signature_note = '-' if getattr(request_obj, 'signature_image', None): try: signature_path = Path(request_obj.signature_image.path).resolve() with signature_path.open('rb') as sig_fp: encoded = base64.b64encode(sig_fp.read()).decode('ascii') mime_type = mimetypes.guess_type(signature_path.name)[0] or 'image/png' signature_src = f"data:{mime_type};base64,{encoded}" signature_note = 'Digitale Signatur als Bilddatei hinterlegt.' except Exception: signature_src = '' signature_note = request_obj.signature_url or '-' elif request_obj.signature_url: signature_note = request_obj.signature_url requester_email = request_obj.onboarded_by_email or '-' requester_name = request_obj.onboarded_by_name or _resolve_user_display_name(request_obj.onboarded_by_email) or '-' gender = (request_obj.get_gender_display() or '-').strip() or '-' employment_type = (request_obj.employment_type or '-').strip() or '-' employment_end = request_obj.employment_end_date or '-' order_business_cards = bool(request_obj.order_business_cards) group_mailboxes = (request_obj.group_mailboxes or '').strip() additional_hardware_other = (request_obj.additional_hardware_other or '').strip() additional_hardware = (request_obj.additional_hardware or '').strip() additional_software = (request_obj.additional_software or '').strip() additional_access_text = (request_obj.additional_access_text or '').strip() successor_name = (request_obj.successor_name or '').strip() additional_notes = (request_obj.additional_notes or '').strip() phone_number = (request_obj.phone_number or '').strip() display_name = f"{gender} {first_name} {last_name}".strip() if gender and gender != '-' else f"{first_name} {last_name}".strip() context = { 'VORNAME': first_name, 'NACHNAME': last_name, 'DISPLAY_NAME': display_name or request_obj.full_name, 'ANREDE': gender, 'BERUFSBEZEICHNUNG': request_obj.job_title or 'N/A', 'ABTEILUNG': request_obj.department or 'N/A', 'EMAIL': request_obj.work_email or 'N/A', 'VERTRAGSBEGINN': request_obj.contract_start, 'BESCHAEFTIGUNG': employment_type, 'VERTRAGSENDE': employment_end, 'UEBERGABEDATUM': request_obj.handover_date or '-', 'ARBEITSGERAETE_TEXT': ' | '.join(devices) if devices else 'Keine Angabe', 'WORKSPACE_GROUPS_TEXT': ' | '.join(groups) if groups else 'Keine Angabe', 'SOFTWARE_TEXT': ' | '.join(software) if software else 'Keine Angabe', 'ZUGAENGE_TEXT': ' | '.join(accesses) if accesses else 'Keine Angabe', 'RESSOURCEN_TEXT': ' | '.join(resources) if resources else 'Keine Angabe', 'VISITENKARTE_BESTELLT': order_business_cards, 'HAS_VISITENKARTE_DATEN': order_business_cards and any( [ (request_obj.business_card_name or '').strip(), (request_obj.business_card_title or '').strip(), (request_obj.business_card_email or '').strip(), (request_obj.business_card_phone or '').strip(), ] ), 'VISITENKARTE_NAME': request_obj.business_card_name or '-', 'VISITENKARTE_TITEL': request_obj.business_card_title or '-', 'VISITENKARTE_EMAIL': request_obj.business_card_email or '-', 'VISITENKARTE_TELEFON': request_obj.business_card_phone or '-', 'GROUP_MAILBOXES': group_mailboxes or 'Keine Angabe', 'ADDITIONAL_HARDWARE_OTHER': additional_hardware_other or 'Keine Angabe', 'ADDITIONAL_HARDWARE': additional_hardware or 'Keine Angabe', 'ADDITIONAL_SOFTWARE': additional_software or 'Keine Angabe', 'ADDITIONAL_ACCESS_TEXT': additional_access_text or 'Keine Angabe', 'SUCCESSOR_NAME': successor_name or 'Keine Angabe', 'PHONE_NUMBER': phone_number or '-', 'INHERIT_PHONE_NUMBER': 'Ja' if request_obj.inherit_phone_number else 'Nein', 'ADDITIONAL_NOTES': additional_notes or 'Keine Angabe', 'GROUP_MAILBOXES_REQUIRED': bool(request_obj.group_mailboxes_required), 'ADDITIONAL_HARDWARE_NEEDED': bool(request_obj.additional_hardware_needed), 'ADDITIONAL_SOFTWARE_NEEDED': bool(request_obj.additional_software_needed), 'ADDITIONAL_ACCESS_NEEDED': bool(request_obj.additional_access_needed), 'HAS_DEVICES': bool(devices), 'HAS_GROUPS': bool(groups), 'HAS_SOFTWARE': bool(software), 'HAS_ACCESSES': bool(accesses), 'HAS_RESOURCES': bool(resources), 'HAS_GROUP_MAILBOXES': bool(group_mailboxes_list), 'HAS_ADDITIONAL_HARDWARE': bool(additional_hardware_list), 'HAS_ADDITIONAL_SOFTWARE': bool(additional_software_list), 'HAS_ADDITIONAL_ACCESS': bool(additional_access_list), 'HAS_ADDITIONAL_HARDWARE_OTHER': bool(additional_hardware_other), 'HAS_SUCCESSOR_INFO': bool(successor_name) or bool(request_obj.inherit_phone_number) or bool(phone_number), 'HAS_ADDITIONAL_NOTES': bool(additional_notes), 'GROUP_MAILBOXES_LIST': _chunk_list(group_mailboxes_list), 'ADDITIONAL_HARDWARE_LIST': _chunk_list(additional_hardware_list), 'ADDITIONAL_SOFTWARE_LIST': _chunk_list(additional_software_list), 'ADDITIONAL_ACCESS_LIST': _chunk_list(additional_access_list), 'ZUGAENGE_LIST': _chunk_list(groups), 'ARBEITSGERÄTE_LIST': _chunk_list(devices), 'SOFTWARE_LIST': _chunk_list(software), 'ACCOUNT_LIST': _chunk_list(accesses), 'STANDARD_RESSOURCEN': _chunk_list(resources), 'UNTERSCHRIFT': signature_src, 'UNTERSCHRIFT_HINWEIS': signature_note, 'REQUESTED_BY_NAME': requester_name, 'REQUESTED_BY_EMAIL': requester_email, } html = _render_html(template_path, context) _generate_content_pdf(html, temp_pdf) _overlay_with_letterhead(temp_pdf, letterhead_path, output_pdf) if temp_pdf.exists(): temp_pdf.unlink(missing_ok=True) return output_pdf def _generate_offboarding_pdf(request_obj: OffboardingRequest) -> Path: safe_name = _safe_filename_fragment(request_obj.full_name, fallback=f'offboarding_{request_obj.id}') output_pdf = settings.PDF_OUTPUT_DIR / f'offboarding_letter_{safe_name}.pdf' temp_pdf = settings.PDF_OUTPUT_DIR / f'_temp_offboarding_{safe_name}.pdf' template_path = settings.PDF_TEMPLATES_DIR / 'offboarding_template.html' letterhead_path = settings.PDF_TEMPLATES_DIR / 'templates.pdf' latest_onboarding = ( OnboardingRequest.objects.filter(work_email=request_obj.work_email) .order_by('-created_at') .first() ) has_onboarding_data = latest_onboarding is not None onboarding_hardware = _split_multiline(latest_onboarding.needed_devices) if latest_onboarding else [] selected_set = {item.lower() for item in onboarding_hardware} hardware_catalog = [ 'Laptop', 'Docking-Station', 'Tastatur und Maus', 'Kopfhörer', 'Tragetasche', 'Monitor', 'Schlüssel', 'Tischtelefon', ] checklist = [{'label': item, 'selected': item.lower() in selected_set} for item in hardware_catalog] extra_selected = [item for item in onboarding_hardware if item.lower() not in {x.lower() for x in hardware_catalog}] for item in extra_selected: checklist.append({'label': item, 'selected': True}) requester_email = request_obj.requested_by_email or '-' requester_name = request_obj.requested_by_name or _resolve_user_display_name(request_obj.requested_by_email) or '-' context = { 'FULL_NAME': request_obj.full_name, 'EMAIL': request_obj.work_email, 'DEPARTMENT': request_obj.department or '-', 'JOB_TITLE': request_obj.job_title or '-', 'LAST_WORKING_DAY': request_obj.last_working_day, 'NOTES': request_obj.notes or '-', 'REQUESTED_BY': requester_email, 'REQUESTED_BY_NAME': requester_name, 'HAS_ONBOARDING_DATA': has_onboarding_data, 'ONBOARDING_HARDWARE': onboarding_hardware, 'HARDWARE_CHECKLIST': checklist, 'MANUAL_FIELD_SECTIONS': _manual_onboarding_field_sections(), 'MANUAL_DEVICES': _chunk_choice_labels(DEVICE_CHOICES), 'MANUAL_SOFTWARE': _chunk_choice_labels(SOFTWARE_CHOICES), 'MANUAL_ACCESSES': _chunk_choice_labels(ACCESS_CHOICES), 'MANUAL_WORKSPACE_GROUPS': _chunk_choice_labels(WORKSPACE_GROUP_CHOICES), 'MANUAL_RESOURCES': _chunk_choice_labels(RESOURCE_CHOICES), 'MANUAL_EXTRA_HARDWARE': _chunk_choice_labels(HARDWARE_EXTRA_CHOICES), 'MANUAL_EXTRA_SOFTWARE': _chunk_choice_labels(SOFTWARE_EXTRA_CHOICES), } html = _render_html(template_path, context) _generate_content_pdf(html, temp_pdf) _overlay_with_letterhead(temp_pdf, letterhead_path, output_pdf) if temp_pdf.exists(): temp_pdf.unlink(missing_ok=True) return output_pdf @shared_task def process_onboarding_request(onboarding_request_id: int) -> None: request_obj = OnboardingRequest.objects.get(id=onboarding_request_id) it_email, general_info_email, business_card_email, hr_works_email, key_email = _resolve_workflow_emails() salutation = (request_obj.get_gender_display() or '').strip() display_name = f"{salutation} {request_obj.full_name}".strip() first_name, last_name = _split_name(request_obj.full_name) EmployeeProfile.objects.update_or_create( work_email=request_obj.work_email, defaults={ 'full_name': request_obj.full_name, 'first_name': first_name, 'last_name': last_name, 'department': request_obj.department, 'job_title': request_obj.job_title, }, ) pdf_path = _generate_onboarding_pdf(request_obj) request_obj.generated_pdf_path = str(pdf_path) request_obj.save(update_fields=['generated_pdf_path']) email_context = { 'FULL_NAME': display_name, 'VORNAME': first_name, 'NACHNAME': last_name, 'DEPARTMENT': request_obj.department or '-', 'CONTRACT_START': request_obj.contract_start, 'EMAIL': request_obj.work_email, 'REQUESTED_BY': request_obj.onboarded_by_email or '-', 'BUSINESS_CARD_NAME': request_obj.business_card_name or display_name, 'BUSINESS_CARD_TITLE': request_obj.business_card_title or '-', 'BUSINESS_CARD_EMAIL': request_obj.business_card_email or request_obj.work_email, 'BUSINESS_CARD_PHONE': request_obj.business_card_phone or '-', 'PDF_LINK': settings.ONBOARDING_SHARED_PDF_LINK, } _send_templated_email( template_key='onboarding_it', context=email_context, to=[it_email], attachments=[pdf_path], ) _send_templated_email( template_key='onboarding_general_info', context=email_context, to=[general_info_email], ) if request_obj.order_business_cards: _send_templated_email( template_key='onboarding_business_card', context=email_context, to=[business_card_email], ) if 'HR Works' in request_obj.needed_accesses: _send_templated_email( template_key='onboarding_hr_works', context=email_context, to=[hr_works_email], ) if 'Schlüssel' in request_obj.needed_devices: _send_templated_email( template_key='onboarding_key', context=email_context, to=[key_email], ) if request_obj.onboarded_by_email: _send_templated_email( template_key='onboarding_reference', context=email_context, to=[request_obj.onboarded_by_email], attachments=[pdf_path], ) _apply_notification_rules( event_type='onboarding', request_obj=request_obj, context=email_context, pdf_path=pdf_path, ) _schedule_welcome_email(request_obj) upload_to_nextcloud(pdf_path, Path(pdf_path).name) @shared_task def process_offboarding_request(offboarding_request_id: int) -> None: request_obj = OffboardingRequest.objects.get(id=offboarding_request_id) it_email, general_info_email, _, hr_works_email, _ = _resolve_workflow_emails() pdf_path = _generate_offboarding_pdf(request_obj) request_obj.generated_pdf_path = str(pdf_path) request_obj.save(update_fields=['generated_pdf_path']) email_context = { 'FULL_NAME': request_obj.full_name, 'DEPARTMENT': request_obj.department or '-', 'LAST_WORKING_DAY': request_obj.last_working_day, 'REQUESTED_BY': request_obj.requested_by_email, 'EMAIL': request_obj.work_email, } _send_templated_email( template_key='offboarding_it', context=email_context, to=[it_email], attachments=[pdf_path], ) _send_templated_email( template_key='offboarding_general_info', context=email_context, to=[general_info_email], ) had_hr_works = OnboardingRequest.objects.filter( work_email=request_obj.work_email, needed_accesses__icontains='HR Works', ).exists() if had_hr_works: _send_templated_email( template_key='offboarding_hr_works_disable', context=email_context, to=[hr_works_email], ) _send_templated_email( template_key='offboarding_reference', context=email_context, to=[request_obj.requested_by_email], attachments=[pdf_path], ) _apply_notification_rules( event_type='offboarding', request_obj=request_obj, context=email_context, pdf_path=pdf_path, ) upload_to_nextcloud(pdf_path, Path(pdf_path).name) @shared_task def send_scheduled_welcome_email(scheduled_email_id: int, force_now: bool = False) -> None: scheduled = ScheduledWelcomeEmail.objects.select_related('onboarding_request').filter(id=scheduled_email_id).first() if not scheduled: return if scheduled.status in {'sent', 'cancelled'} and not force_now: return if scheduled.status == 'paused' and not force_now: return if not force_now and timezone.now() < scheduled.send_at: async_result = send_scheduled_welcome_email.apply_async(args=[scheduled.id], eta=scheduled.send_at) scheduled.celery_task_id = async_result.id or scheduled.celery_task_id scheduled.save(update_fields=['celery_task_id', 'updated_at']) return request_obj = scheduled.onboarding_request first_name, last_name = _split_name(request_obj.full_name) salutation = (request_obj.get_gender_display() or '').strip() display_name = f"{salutation} {request_obj.full_name}".strip() email_context = { 'FULL_NAME': display_name, 'VORNAME': first_name, 'NACHNAME': last_name, 'DEPARTMENT': request_obj.department or '-', 'CONTRACT_START': request_obj.contract_start, 'EMAIL': request_obj.work_email, 'REQUESTED_BY': request_obj.onboarded_by_email or '-', } config = WorkflowConfig.objects.order_by('id').first() include_pdf = True if not config else bool(config.welcome_include_pdf) from_email = '' if config: from_email = (config.welcome_sender_email or config.email_account or '').strip() attachments = [] if include_pdf and request_obj.generated_pdf_path: pdf_path = Path(request_obj.generated_pdf_path) if pdf_path.exists(): attachments = [pdf_path] try: _send_templated_email( template_key='onboarding_welcome', context=email_context, to=[scheduled.recipient_email], attachments=attachments, from_email=from_email or None, ) scheduled.status = 'sent' scheduled.sent_at = timezone.now() scheduled.last_error = '' except Exception as exc: scheduled.status = 'failed' scheduled.last_error = str(exc) raise finally: scheduled.save(update_fields=['status', 'sent_at', 'last_error', 'updated_at'])