Files
workdock-platform/backend/workflows/emailing.py

77 lines
2.5 KiB
Python

from django.conf import settings
from django.core.mail import EmailMessage, get_connection
from .branding import get_branded_from_email
from .models import SystemEmailConfig, WorkflowConfig
def _active_smtp_config() -> SystemEmailConfig | None:
    """Return the most recently updated active SMTP configuration.

    Looks at ``SystemEmailConfig`` rows with ``is_active=True``, newest
    ``updated_at`` first, and returns the first one, or ``None`` when no
    such row exists.
    """
    active_rows = SystemEmailConfig.objects.filter(is_active=True)
    newest_first = active_rows.order_by('-updated_at')
    return newest_first.first()
def _effective_smtp_settings() -> dict:
    """Resolve the SMTP settings to use for outgoing system mail.

    Sources are tried in priority order:

    1. The active ``SystemEmailConfig`` row, if it has a host.
    2. The first ``WorkflowConfig`` row (lowest ``id``), if it has an
       ``smtp_server``; missing port/credentials fall back to Django settings.
    3. The Django ``EMAIL_*`` / ``DEFAULT_FROM_EMAIL`` settings.

    Returns:
        A dict with the keys ``host``, ``port``, ``username``, ``password``,
        ``use_tls``, ``use_ssl`` and ``from_email``.
    """
    # 1) Dedicated system e-mail configuration stored in the database.
    active = _active_smtp_config()
    if active and active.host:
        return dict(
            host=active.host,
            port=active.port,
            username=active.username,
            password=active.password,
            use_tls=active.use_tls,
            use_ssl=active.use_ssl,
            from_email=active.from_email or settings.DEFAULT_FROM_EMAIL,
        )

    # 2) SMTP fields kept on the workflow configuration.
    workflow = WorkflowConfig.objects.order_by('id').first()
    if workflow and workflow.smtp_server:
        account = workflow.email_account
        return dict(
            host=workflow.smtp_server,
            port=workflow.smtp_port or settings.EMAIL_PORT,
            username=account or settings.EMAIL_HOST_USER,
            password=workflow.email_password or settings.EMAIL_HOST_PASSWORD,
            use_tls=workflow.smtp_use_tls,
            use_ssl=workflow.smtp_use_ssl,
            # The account address doubles as the sender when it is set.
            from_email=account or settings.DEFAULT_FROM_EMAIL,
        )

    # 3) Nothing usable in the database: use the project settings as-is.
    return dict(
        host=settings.EMAIL_HOST,
        port=settings.EMAIL_PORT,
        username=settings.EMAIL_HOST_USER,
        password=settings.EMAIL_HOST_PASSWORD,
        use_tls=settings.EMAIL_USE_TLS,
        use_ssl=settings.EMAIL_USE_SSL,
        from_email=settings.DEFAULT_FROM_EMAIL,
    )
def send_system_email(
    subject: str,
    body: str,
    to: list[str],
    attachments: list[str] | None = None,
    from_email: str | None = None,
) -> None:
    """Send one e-mail through the currently effective SMTP configuration.

    Args:
        subject: Subject line of the message.
        body: Message body.
        to: Recipient addresses.
        attachments: Optional file paths, each attached via ``attach_file``.
        from_email: Optional sender override; when falsy, the sender from the
            effective SMTP settings is used.

    Raises:
        Whatever the SMTP backend raises on failure; the message is sent with
        ``fail_silently=False``.
    """
    smtp = _effective_smtp_settings()

    # Forward the transport-related entries to the SMTP backend unchanged.
    transport_keys = ('host', 'port', 'username', 'password', 'use_tls', 'use_ssl')
    transport_kwargs = {key: smtp[key] for key in transport_keys}
    connection = get_connection(
        backend='django.core.mail.backends.smtp.EmailBackend',
        timeout=settings.SMTP_TIMEOUT_SECONDS,
        **transport_kwargs,
    )

    # Explicit argument wins over the configured sender; keep the plain
    # address if the branding helper returns nothing truthy.
    sender = from_email or smtp['from_email']
    branded_sender = get_branded_from_email(sender) or sender

    msg = EmailMessage(
        subject=subject,
        body=body,
        from_email=branded_sender,
        to=to,
        connection=connection,
    )

    if attachments:
        for attachment_path in attachments:
            msg.attach_file(attachment_path)

    msg.send(fail_silently=False)