"""Workflow e-mail helpers: recipient resolution, templated sending, rule
evaluation and scheduling of the delayed welcome e-mail."""

from datetime import timedelta
from pathlib import Path
import logging

from django.conf import settings
from django.utils import timezone
from jinja2 import Template

from .branding import get_default_notification_templates
from .emailing import send_system_email
from .forms import OnboardingRequestForm
from .models import (
    NotificationRule,
    NotificationTemplate,
    OnboardingRequest,
    ScheduledWelcomeEmail,
    WorkflowConfig,
)
from .services import get_email_test_redirect, is_email_test_mode

logger = logging.getLogger(__name__)

# Delay used when no WorkflowConfig row exists or it has no delay configured.
_DEFAULT_WELCOME_EMAIL_DELAY_DAYS = 5


def resolve_workflow_emails() -> tuple[str, str, str, str, str]:
    """Return the notification addresses used by the workflow.

    Each address is taken from the first ``WorkflowConfig`` row (ordered by
    id) when that row exists and the field is non-empty; otherwise the
    matching Django setting is used.

    Returns:
        ``(it_email, general_info_email, business_card_email,
        hr_works_email, key_email)``.
    """
    config = WorkflowConfig.objects.order_by('id').first()

    def _pick(config_attr: str, settings_attr: str) -> str:
        # The setting is only read when the config value is missing/empty.
        configured = getattr(config, config_attr) if config else None
        return configured or getattr(settings, settings_attr)

    it_email = _pick('it_onboarding_email', 'IT_ONBOARDING_NOTIFICATION_EMAIL')
    general_info_email = _pick('general_info_email', 'GENERAL_INFO_NOTIFICATION_EMAIL')
    business_card_email = _pick('business_card_email', 'BUSINESS_CARD_NOTIFICATION_EMAIL')
    hr_works_email = _pick('hr_works_email', 'HR_WORKS_NOTIFICATION_EMAIL')
    key_email = _pick('key_notification_email', 'KEY_NOTIFICATION_EMAIL')
    return it_email, general_info_email, business_card_email, hr_works_email, key_email


def send_workflow_email(
    subject: str,
    body: str,
    to: list[str],
    attachments: list[Path] | None = None,
    from_email: str | None = None,
) -> None:
    """Send an e-mail through ``send_system_email``, honouring test mode.

    Empty entries in ``to`` are dropped; nothing is sent when no recipient
    remains. In test mode the message is redirected to the address returned
    by ``get_email_test_redirect()`` and the body is prefixed with a notice
    listing the original recipients.

    Args:
        subject: Subject line.
        body: Plain message body.
        to: Recipient addresses; falsy entries are ignored.
        attachments: Optional file paths, passed on as strings.
        from_email: Optional sender override, passed through unchanged.

    Raises:
        OSError: If sending fails outside test mode. In test mode the error
            is logged as a warning and swallowed.
    """
    recipients = [r for r in to if r]
    if not recipients:
        return

    # Evaluate once so the redirect and the error handling agree on the mode.
    test_mode = is_email_test_mode()

    effective_to = recipients
    effective_body = body
    if test_mode:
        effective_to = [get_email_test_redirect()]
        effective_body = (
            '[TEST MODE] Diese E-Mail wurde umgeleitet.\n'
            f"Originale Empfänger: {', '.join(recipients)}\n\n{body}"
        )

    try:
        send_system_email(
            subject=subject,
            body=effective_body,
            to=effective_to,
            attachments=[str(a) for a in (attachments or [])],
            from_email=from_email,
        )
    except OSError as exc:
        if test_mode:
            logger.warning('Email send skipped in test mode because SMTP is unavailable: %s', exc)
            return
        raise


def render_notification_template(
    template_key: str,
    context: dict,
    language_code: str | None = None,
) -> tuple[str, str]:
    """Render subject and body for ``template_key`` with Jinja2.

    An active ``NotificationTemplate`` row with the given key takes
    precedence; otherwise the built-in defaults from
    ``get_default_notification_templates()`` are used, preferring the
    ``subject_<lang>`` / ``body_<lang>`` entries and falling back to the
    untranslated ``subject`` / ``body``.

    Args:
        template_key: Key of the template to render.
        context: Variables made available to the Jinja2 templates.
        language_code: Language tag such as ``'de'`` or ``'en-us'``; only the
            part before the first ``-`` is used. Defaults to ``'de'``.

    Returns:
        ``(subject, body)``, both stripped of surrounding whitespace.

    Raises:
        KeyError: If no active DB template exists and the key is not among
            the default templates.
    """
    lang = (language_code or 'de').split('-')[0]
    db_template = NotificationTemplate.objects.filter(key=template_key, is_active=True).first()
    if db_template:
        subject_template = db_template.translated_subject_template(lang)
        body_template = db_template.translated_body_template(lang)
    else:
        fallback = get_default_notification_templates()[template_key]
        subject_template = fallback.get(f'subject_{lang}', '') or fallback['subject']
        body_template = fallback.get(f'body_{lang}', '') or fallback['body']

    subject = Template(subject_template).render(context).strip()
    body = Template(body_template).render(context).strip()
    return subject, body


def parse_recipients(raw: str) -> list[str]:
    """Split a recipient string on commas, semicolons and newlines.

    Returns the non-empty, whitespace-stripped entries in their original
    order; an empty or falsy input yields an empty list.
    """
    if not raw:
        return []
    cleaned = raw.replace(';', ',').replace('\n', ',')
    return [x.strip() for x in cleaned.split(',') if x.strip()]


def as_bool(value) -> bool:
    """Interpret ``value`` as a boolean.

    Booleans are returned unchanged and ``None`` is ``False``. Anything else
    is converted to a string and is ``True`` only if, after stripping and
    lower-casing, it is one of ``1``, ``true``, ``ja``, ``yes``, ``on`` or
    ``aktiv``.
    """
    if isinstance(value, bool):
        return value
    if value is None:
        return False
    text = str(value).strip().lower()
    return text in {'1', 'true', 'ja', 'yes', 'on', 'aktiv'}


def rule_matches(rule: NotificationRule, request_obj) -> bool:
    """Return whether ``rule`` applies to ``request_obj``.

    Supported operators are ``always``, ``contains``, ``equals``,
    ``is_true`` and ``is_false``; any other operator never matches. The
    compared value is the attribute named by ``rule.field_name`` (a missing
    attribute is treated as an empty string). String comparisons are
    case-insensitive.
    """
    if rule.operator == 'always':
        return True

    raw_value = getattr(request_obj, rule.field_name, '')
    actual = '' if raw_value is None else str(raw_value)
    expected = (rule.expected_value or '').strip()

    if rule.operator == 'contains':
        # NOTE: an empty expected value is a substring of every string, so
        # such a rule matches unconditionally.
        return expected.lower() in actual.lower()
    if rule.operator == 'equals':
        return actual.strip().lower() == expected.lower()
    if rule.operator == 'is_true':
        return as_bool(raw_value)
    if rule.operator == 'is_false':
        return not as_bool(raw_value)
    return False


def send_templated_email(
    template_key: str,
    to: list[str],
    context: dict,
    attachments: list[Path] | None = None,
    from_email: str | None = None,
    language_code: str | None = None,
) -> None:
    """Render the template ``template_key`` and send it via ``send_workflow_email``.

    Args:
        template_key: Key passed to ``render_notification_template``.
        to: Recipient addresses.
        context: Variables for template rendering.
        attachments: Optional file paths to attach.
        from_email: Optional sender override.
        language_code: Optional language tag used to pick the translation.
    """
    subject, body = render_notification_template(template_key, context, language_code=language_code)
    send_workflow_email(subject=subject, body=body, to=to, attachments=attachments, from_email=from_email)


def apply_notification_rules(
    event_type: str,
    request_obj,
    context: dict,
    pdf_path: Path | None = None,
) -> None:
    """Send one e-mail per active notification rule that matches the request.

    Active rules for ``event_type`` are processed in ``sort_order``/``id``
    order. Rules that do not match or have no recipients are skipped. A rule
    whose ``template_key`` is one of ``NotificationTemplate.TEMPLATE_CHOICES``
    is sent through that template; otherwise the rule's own custom
    subject/body are rendered with Jinja2, and the rule is skipped when both
    are empty.

    Args:
        event_type: Event name used to select rules.
        request_obj: Object the rules are evaluated against; its
            ``preferred_language`` attribute (default ``'de'``) selects the
            language.
        context: Variables for template rendering.
        pdf_path: Optional PDF, attached only for rules with
            ``include_pdf_attachment`` set.
    """
    language_code = (getattr(request_obj, 'preferred_language', '') or 'de').split('-')[0]
    # Loop-invariant, so it is built once instead of once per rule.
    known_keys = {k for k, _ in NotificationTemplate.TEMPLATE_CHOICES}
    rules = NotificationRule.objects.filter(
        event_type=event_type,
        is_active=True,
    ).order_by('sort_order', 'id')

    for rule in rules:
        if not rule_matches(rule, request_obj):
            continue
        recipients = parse_recipients(rule.recipients)
        if not recipients:
            continue

        attachments = [pdf_path] if (pdf_path and rule.include_pdf_attachment) else None
        template_key = (rule.template_key or '').strip()
        if template_key and template_key in known_keys:
            send_templated_email(
                template_key=template_key,
                context=context,
                to=recipients,
                attachments=attachments,
                language_code=language_code,
            )
            continue

        # No known template: fall back to the rule's own subject/body.
        subject = rule.translated_custom_subject(language_code)
        body = rule.translated_custom_body(language_code)
        if not subject and not body:
            continue

        subject_rendered = Template(subject or f'[{event_type}] Regelmail').render(context).strip()
        body_rendered = Template(body or '-').render(context).strip()
        send_workflow_email(
            subject=subject_rendered,
            body=body_rendered,
            to=recipients,
            attachments=attachments,
        )


def schedule_welcome_email(request_obj: OnboardingRequest, *, send_scheduled_welcome_email_task) -> None:
    """Create or reset the scheduled welcome e-mail for ``request_obj``.

    Does nothing when the request has no work e-mail address. Otherwise the
    ``ScheduledWelcomeEmail`` row for the request is created or reset to
    ``scheduled`` and the given task is enqueued with ``eta`` set to the send
    time. If enqueueing fails, the row is marked ``failed`` and the error
    text is stored in ``last_error``; the exception is not re-raised.

    Args:
        request_obj: Onboarding request whose ``work_email`` receives the mail.
        send_scheduled_welcome_email_task: Task object exposing
            ``apply_async(args=..., eta=...)``.
    """
    recipient = (request_obj.work_email or '').strip().lower()
    if not recipient:
        return

    config = WorkflowConfig.objects.order_by('id').first()
    delay_days = _DEFAULT_WELCOME_EMAIL_DELAY_DAYS
    if config:
        raw_delay = config.welcome_email_delay_days
        # Only fall back to the default when nothing is configured; an
        # explicit 0 ("send immediately") must not be replaced by the default.
        if raw_delay is not None and raw_delay != '':
            delay_days = max(0, int(raw_delay))
    send_at = timezone.now() + timedelta(days=delay_days)

    scheduled, _ = ScheduledWelcomeEmail.objects.update_or_create(
        onboarding_request=request_obj,
        defaults={
            'recipient_email': recipient,
            'send_at': send_at,
            'status': 'scheduled',
            'last_error': '',
            'sent_at': None,
        },
    )

    try:
        async_result = send_scheduled_welcome_email_task.apply_async(args=[scheduled.id], eta=send_at)
        scheduled.celery_task_id = async_result.id or ''
        scheduled.save(update_fields=['celery_task_id', 'updated_at'])
    except Exception as exc:
        # Best effort: record the failure on the row instead of breaking the
        # caller's workflow, but leave a trace in the logs as well.
        logger.exception('Scheduling welcome email failed for scheduled entry %s', scheduled.id)
        scheduled.status = 'failed'
        scheduled.last_error = f'Scheduling failed: {exc}'
        scheduled.save(update_fields=['status', 'last_error', 'updated_at'])