snapshot: preserve request status retry and i18n labels

This commit is contained in:
Md Bayazid Bostame
2026-03-25 20:42:01 +01:00
parent a8f7eadbc6
commit 197bd3c226
10 changed files with 851 additions and 583 deletions

View File

@@ -1195,159 +1195,183 @@ def _generate_offboarding_pdf(request_obj: OffboardingRequest) -> Path:
@shared_task
def process_onboarding_request(onboarding_request_id: int) -> None:
request_obj = OnboardingRequest.objects.get(id=onboarding_request_id)
it_email, general_info_email, business_card_email, hr_works_email, key_email = _resolve_workflow_emails()
salutation = (request_obj.get_gender_display() or '').strip()
display_name = f"{salutation} {request_obj.full_name}".strip()
request_obj.processing_status = 'processing'
request_obj.last_error = ''
request_obj.save(update_fields=['processing_status', 'last_error'])
try:
it_email, general_info_email, business_card_email, hr_works_email, key_email = _resolve_workflow_emails()
salutation = (request_obj.get_gender_display() or '').strip()
display_name = f"{salutation} {request_obj.full_name}".strip()
first_name, last_name = _split_name(request_obj.full_name)
EmployeeProfile.objects.update_or_create(
work_email=request_obj.work_email,
defaults={
'full_name': request_obj.full_name,
'first_name': first_name,
'last_name': last_name,
'department': request_obj.department,
'job_title': request_obj.job_title,
},
)
pdf_path = _generate_onboarding_pdf(request_obj)
request_obj.generated_pdf_path = str(pdf_path)
request_obj.save(update_fields=['generated_pdf_path'])
email_context = {
'FULL_NAME': display_name,
'VORNAME': first_name,
'NACHNAME': last_name,
'DEPARTMENT': request_obj.department or '-',
'CONTRACT_START': request_obj.contract_start,
'EMAIL': request_obj.work_email,
'REQUESTED_BY': request_obj.onboarded_by_email or '-',
'BUSINESS_CARD_NAME': request_obj.business_card_name or display_name,
'BUSINESS_CARD_TITLE': request_obj.business_card_title or '-',
'BUSINESS_CARD_EMAIL': request_obj.business_card_email or request_obj.work_email,
'BUSINESS_CARD_PHONE': request_obj.business_card_phone or '-',
'PDF_LINK': settings.ONBOARDING_SHARED_PDF_LINK,
}
_send_templated_email(
template_key='onboarding_it',
context=email_context,
to=[it_email],
attachments=[pdf_path],
language_code=request_obj.preferred_language,
)
_send_templated_email(
template_key='onboarding_general_info',
context=email_context,
to=[general_info_email],
language_code=request_obj.preferred_language,
)
if request_obj.order_business_cards:
_send_templated_email(
template_key='onboarding_business_card',
context=email_context,
to=[business_card_email],
language_code=request_obj.preferred_language,
first_name, last_name = _split_name(request_obj.full_name)
EmployeeProfile.objects.update_or_create(
work_email=request_obj.work_email,
defaults={
'full_name': request_obj.full_name,
'first_name': first_name,
'last_name': last_name,
'department': request_obj.department,
'job_title': request_obj.job_title,
},
)
if 'HR Works' in request_obj.needed_accesses:
_send_templated_email(
template_key='onboarding_hr_works',
context=email_context,
to=[hr_works_email],
language_code=request_obj.preferred_language,
)
pdf_path = _generate_onboarding_pdf(request_obj)
request_obj.generated_pdf_path = str(pdf_path)
request_obj.save(update_fields=['generated_pdf_path'])
if 'Schlüssel' in request_obj.needed_devices:
_send_templated_email(
template_key='onboarding_key',
context=email_context,
to=[key_email],
language_code=request_obj.preferred_language,
)
email_context = {
'FULL_NAME': display_name,
'VORNAME': first_name,
'NACHNAME': last_name,
'DEPARTMENT': request_obj.department or '-',
'CONTRACT_START': request_obj.contract_start,
'EMAIL': request_obj.work_email,
'REQUESTED_BY': request_obj.onboarded_by_email or '-',
'BUSINESS_CARD_NAME': request_obj.business_card_name or display_name,
'BUSINESS_CARD_TITLE': request_obj.business_card_title or '-',
'BUSINESS_CARD_EMAIL': request_obj.business_card_email or request_obj.work_email,
'BUSINESS_CARD_PHONE': request_obj.business_card_phone or '-',
'PDF_LINK': settings.ONBOARDING_SHARED_PDF_LINK,
}
if request_obj.onboarded_by_email:
_send_templated_email(
template_key='onboarding_reference',
template_key='onboarding_it',
context=email_context,
to=[request_obj.onboarded_by_email],
to=[it_email],
attachments=[pdf_path],
language_code=request_obj.preferred_language,
)
_send_templated_email(
template_key='onboarding_general_info',
context=email_context,
to=[general_info_email],
language_code=request_obj.preferred_language,
)
_apply_notification_rules(
event_type='onboarding',
request_obj=request_obj,
context=email_context,
pdf_path=pdf_path,
)
if request_obj.order_business_cards:
_send_templated_email(
template_key='onboarding_business_card',
context=email_context,
to=[business_card_email],
language_code=request_obj.preferred_language,
)
_schedule_welcome_email(request_obj)
if 'HR Works' in request_obj.needed_accesses:
_send_templated_email(
template_key='onboarding_hr_works',
context=email_context,
to=[hr_works_email],
language_code=request_obj.preferred_language,
)
upload_to_nextcloud(pdf_path, Path(pdf_path).name)
if 'Schlüssel' in request_obj.needed_devices:
_send_templated_email(
template_key='onboarding_key',
context=email_context,
to=[key_email],
language_code=request_obj.preferred_language,
)
if request_obj.onboarded_by_email:
_send_templated_email(
template_key='onboarding_reference',
context=email_context,
to=[request_obj.onboarded_by_email],
attachments=[pdf_path],
language_code=request_obj.preferred_language,
)
_apply_notification_rules(
event_type='onboarding',
request_obj=request_obj,
context=email_context,
pdf_path=pdf_path,
)
_schedule_welcome_email(request_obj)
upload_to_nextcloud(pdf_path, Path(pdf_path).name)
request_obj.processing_status = 'completed'
request_obj.last_error = ''
request_obj.save(update_fields=['processing_status', 'last_error'])
except Exception as exc:
request_obj.processing_status = 'failed'
request_obj.last_error = str(exc)
request_obj.save(update_fields=['processing_status', 'last_error'])
raise
@shared_task
def process_offboarding_request(offboarding_request_id: int) -> None:
request_obj = OffboardingRequest.objects.get(id=offboarding_request_id)
it_email, general_info_email, _, hr_works_email, _ = _resolve_workflow_emails()
request_obj.processing_status = 'processing'
request_obj.last_error = ''
request_obj.save(update_fields=['processing_status', 'last_error'])
try:
it_email, general_info_email, _, hr_works_email, _ = _resolve_workflow_emails()
pdf_path = _generate_offboarding_pdf(request_obj)
request_obj.generated_pdf_path = str(pdf_path)
request_obj.save(update_fields=['generated_pdf_path'])
pdf_path = _generate_offboarding_pdf(request_obj)
request_obj.generated_pdf_path = str(pdf_path)
request_obj.save(update_fields=['generated_pdf_path'])
email_context = {
'FULL_NAME': request_obj.full_name,
'DEPARTMENT': request_obj.department or '-',
'LAST_WORKING_DAY': request_obj.last_working_day,
'REQUESTED_BY': request_obj.requested_by_email,
'EMAIL': request_obj.work_email,
}
email_context = {
'FULL_NAME': request_obj.full_name,
'DEPARTMENT': request_obj.department or '-',
'LAST_WORKING_DAY': request_obj.last_working_day,
'REQUESTED_BY': request_obj.requested_by_email,
'EMAIL': request_obj.work_email,
}
_send_templated_email(
template_key='offboarding_it',
context=email_context,
to=[it_email],
attachments=[pdf_path],
language_code=request_obj.preferred_language,
)
_send_templated_email(
template_key='offboarding_general_info',
context=email_context,
to=[general_info_email],
language_code=request_obj.preferred_language,
)
had_hr_works = OnboardingRequest.objects.filter(
work_email=request_obj.work_email,
needed_accesses__icontains='HR Works',
).exists()
if had_hr_works:
_send_templated_email(
template_key='offboarding_hr_works_disable',
template_key='offboarding_it',
context=email_context,
to=[hr_works_email],
to=[it_email],
attachments=[pdf_path],
language_code=request_obj.preferred_language,
)
_send_templated_email(
template_key='offboarding_general_info',
context=email_context,
to=[general_info_email],
language_code=request_obj.preferred_language,
)
_send_templated_email(
template_key='offboarding_reference',
context=email_context,
to=[request_obj.requested_by_email],
attachments=[pdf_path],
language_code=request_obj.preferred_language,
)
had_hr_works = OnboardingRequest.objects.filter(
work_email=request_obj.work_email,
needed_accesses__icontains='HR Works',
).exists()
if had_hr_works:
_send_templated_email(
template_key='offboarding_hr_works_disable',
context=email_context,
to=[hr_works_email],
language_code=request_obj.preferred_language,
)
_apply_notification_rules(
event_type='offboarding',
request_obj=request_obj,
context=email_context,
pdf_path=pdf_path,
)
_send_templated_email(
template_key='offboarding_reference',
context=email_context,
to=[request_obj.requested_by_email],
attachments=[pdf_path],
language_code=request_obj.preferred_language,
)
upload_to_nextcloud(pdf_path, Path(pdf_path).name)
_apply_notification_rules(
event_type='offboarding',
request_obj=request_obj,
context=email_context,
pdf_path=pdf_path,
)
upload_to_nextcloud(pdf_path, Path(pdf_path).name)
request_obj.processing_status = 'completed'
request_obj.last_error = ''
request_obj.save(update_fields=['processing_status', 'last_error'])
except Exception as exc:
request_obj.processing_status = 'failed'
request_obj.last_error = str(exc)
request_obj.save(update_fields=['processing_status', 'last_error'])
raise
@shared_task