198 lines
7.3 KiB
Python
198 lines
7.3 KiB
Python
from datetime import timedelta
|
|
from pathlib import Path
|
|
import logging
|
|
|
|
from django.conf import settings
|
|
from django.utils import timezone
|
|
from jinja2 import Template
|
|
|
|
from .branding import get_default_notification_templates
|
|
from .emailing import send_system_email
|
|
from .forms import OnboardingRequestForm
|
|
from .models import NotificationRule, NotificationTemplate, OnboardingRequest, ScheduledWelcomeEmail, WorkflowConfig
|
|
from .services import get_email_test_redirect, is_email_test_mode
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def resolve_workflow_emails() -> tuple[str, str, str, str, str]:
|
|
config = WorkflowConfig.objects.order_by('id').first()
|
|
it_email = config.it_onboarding_email if config and config.it_onboarding_email else settings.IT_ONBOARDING_NOTIFICATION_EMAIL
|
|
general_info_email = config.general_info_email if config and config.general_info_email else settings.GENERAL_INFO_NOTIFICATION_EMAIL
|
|
business_card_email = config.business_card_email if config and config.business_card_email else settings.BUSINESS_CARD_NOTIFICATION_EMAIL
|
|
hr_works_email = config.hr_works_email if config and config.hr_works_email else settings.HR_WORKS_NOTIFICATION_EMAIL
|
|
key_email = config.key_notification_email if config and config.key_notification_email else settings.KEY_NOTIFICATION_EMAIL
|
|
return it_email, general_info_email, business_card_email, hr_works_email, key_email
|
|
|
|
|
|
def send_workflow_email(
|
|
subject: str,
|
|
body: str,
|
|
to: list[str],
|
|
attachments: list[Path] | None = None,
|
|
from_email: str | None = None,
|
|
) -> None:
|
|
recipients = [r for r in to if r]
|
|
if not recipients:
|
|
return
|
|
|
|
effective_to = recipients
|
|
effective_body = body
|
|
if is_email_test_mode():
|
|
effective_to = [get_email_test_redirect()]
|
|
effective_body = (
|
|
'[TEST MODE] Diese E-Mail wurde umgeleitet.\n'
|
|
f"Originale Empfänger: {', '.join(recipients)}\n\n{body}"
|
|
)
|
|
|
|
try:
|
|
send_system_email(
|
|
subject=subject,
|
|
body=effective_body,
|
|
to=effective_to,
|
|
attachments=[str(a) for a in (attachments or [])],
|
|
from_email=from_email,
|
|
)
|
|
except OSError as exc:
|
|
if is_email_test_mode():
|
|
logger.warning('Email send skipped in test mode because SMTP is unavailable: %s', exc)
|
|
return
|
|
raise
|
|
|
|
|
|
def render_notification_template(template_key: str, context: dict, language_code: str | None = None) -> tuple[str, str]:
|
|
lang = (language_code or 'de').split('-')[0]
|
|
db_template = NotificationTemplate.objects.filter(key=template_key, is_active=True).first()
|
|
if db_template:
|
|
subject_template = db_template.translated_subject_template(lang)
|
|
body_template = db_template.translated_body_template(lang)
|
|
else:
|
|
fallback = get_default_notification_templates()[template_key]
|
|
subject_template = fallback.get(f'subject_{lang}', '') or fallback['subject']
|
|
body_template = fallback.get(f'body_{lang}', '') or fallback['body']
|
|
|
|
subject = Template(subject_template).render(context).strip()
|
|
body = Template(body_template).render(context).strip()
|
|
return subject, body
|
|
|
|
|
|
def parse_recipients(raw: str) -> list[str]:
|
|
if not raw:
|
|
return []
|
|
cleaned = raw.replace(';', ',').replace('\n', ',')
|
|
return [x.strip() for x in cleaned.split(',') if x.strip()]
|
|
|
|
|
|
def as_bool(value) -> bool:
|
|
if isinstance(value, bool):
|
|
return value
|
|
if value is None:
|
|
return False
|
|
text = str(value).strip().lower()
|
|
return text in {'1', 'true', 'ja', 'yes', 'on', 'aktiv'}
|
|
|
|
|
|
def rule_matches(rule: NotificationRule, request_obj) -> bool:
|
|
if rule.operator == 'always':
|
|
return True
|
|
|
|
raw_value = getattr(request_obj, rule.field_name, '')
|
|
actual = '' if raw_value is None else str(raw_value)
|
|
expected = (rule.expected_value or '').strip()
|
|
|
|
if rule.operator == 'contains':
|
|
return expected.lower() in actual.lower()
|
|
if rule.operator == 'equals':
|
|
return actual.strip().lower() == expected.lower()
|
|
if rule.operator == 'is_true':
|
|
return as_bool(raw_value)
|
|
if rule.operator == 'is_false':
|
|
return not as_bool(raw_value)
|
|
return False
|
|
|
|
|
|
def send_templated_email(
|
|
template_key: str,
|
|
to: list[str],
|
|
context: dict,
|
|
attachments: list[Path] | None = None,
|
|
from_email: str | None = None,
|
|
language_code: str | None = None,
|
|
) -> None:
|
|
subject, body = render_notification_template(template_key, context, language_code=language_code)
|
|
send_workflow_email(subject=subject, body=body, to=to, attachments=attachments, from_email=from_email)
|
|
|
|
|
|
def apply_notification_rules(
|
|
event_type: str,
|
|
request_obj,
|
|
context: dict,
|
|
pdf_path: Path | None = None,
|
|
) -> None:
|
|
language_code = (getattr(request_obj, 'preferred_language', '') or 'de').split('-')[0]
|
|
rules = NotificationRule.objects.filter(event_type=event_type, is_active=True).order_by('sort_order', 'id')
|
|
for rule in rules:
|
|
if not rule_matches(rule, request_obj):
|
|
continue
|
|
|
|
recipients = parse_recipients(rule.recipients)
|
|
if not recipients:
|
|
continue
|
|
|
|
attachments = [pdf_path] if (pdf_path and rule.include_pdf_attachment) else None
|
|
template_key = (rule.template_key or '').strip()
|
|
known_keys = {k for k, _ in NotificationTemplate.TEMPLATE_CHOICES}
|
|
|
|
if template_key and template_key in known_keys:
|
|
send_templated_email(
|
|
template_key=template_key,
|
|
context=context,
|
|
to=recipients,
|
|
attachments=attachments,
|
|
language_code=language_code,
|
|
)
|
|
continue
|
|
|
|
subject = rule.translated_custom_subject(language_code)
|
|
body = rule.translated_custom_body(language_code)
|
|
if not subject and not body:
|
|
continue
|
|
|
|
subject_rendered = Template(subject or f'[{event_type}] Regelmail').render(context).strip()
|
|
body_rendered = Template(body or '-').render(context).strip()
|
|
send_workflow_email(
|
|
subject=subject_rendered,
|
|
body=body_rendered,
|
|
to=recipients,
|
|
attachments=attachments,
|
|
)
|
|
|
|
|
|
def schedule_welcome_email(request_obj: OnboardingRequest, *, send_scheduled_welcome_email_task) -> None:
|
|
recipient = (request_obj.work_email or '').strip().lower()
|
|
if not recipient:
|
|
return
|
|
config = WorkflowConfig.objects.order_by('id').first()
|
|
delay_days = 5
|
|
if config:
|
|
delay_days = max(0, int(config.welcome_email_delay_days or 5))
|
|
send_at = timezone.now() + timedelta(days=delay_days)
|
|
scheduled, _ = ScheduledWelcomeEmail.objects.update_or_create(
|
|
onboarding_request=request_obj,
|
|
defaults={
|
|
'recipient_email': recipient,
|
|
'send_at': send_at,
|
|
'status': 'scheduled',
|
|
'last_error': '',
|
|
'sent_at': None,
|
|
},
|
|
)
|
|
try:
|
|
async_result = send_scheduled_welcome_email_task.apply_async(args=[scheduled.id], eta=send_at)
|
|
scheduled.celery_task_id = async_result.id or ''
|
|
scheduled.save(update_fields=['celery_task_id', 'updated_at'])
|
|
except Exception as exc:
|
|
scheduled.status = 'failed'
|
|
scheduled.last_error = f'Scheduling failed: {exc}'
|
|
scheduled.save(update_fields=['status', 'last_error', 'updated_at'])
|