from datetime import datetime, time, timedelta
from pathlib import Path

from django.conf import settings
from django.contrib import messages
from django.core.exceptions import ValidationError
from django.db.models import Q
from django.shortcuts import get_object_or_404, redirect, render
from django.utils import timezone
from django.utils.dateparse import parse_date
from django.utils.translation import get_language, gettext as _

from .branding import get_company_email_domain
from .form_builder import (
    LOCKED_SECTION_RULES,
    OFFBOARDING_PAGE_ORDER,
    ensure_form_section_configs,
    get_section_definitions,
)
from .forms import OffboardingRequestForm, OnboardingRequestForm
from .models import (
    AdminAuditLog,
    EmployeeProfile,
    OffboardingRequest,
    OnboardingIntroductionSession,
    OnboardingRequest,
    ScheduledWelcomeEmail,
    WorkflowConfig,
)
from .roles import user_has_capability
from .tasks import (
    _generate_onboarding_intro_pdf,
    _generate_onboarding_intro_session_pdf,
    build_intro_sections_for_request,
    process_offboarding_request,
    process_onboarding_request,
)

# Request kinds accepted in URLs / form tokens, mapped to their model and
# to the background task that (re)processes a request of that kind.
_REQUEST_MODELS = {
    'onboarding': OnboardingRequest,
    'offboarding': OffboardingRequest,
}
_REQUEST_TASKS = {
    'onboarding': process_onboarding_request,
    'offboarding': process_offboarding_request,
}

# Processing states offered as dashboard filter values.
_STATUS_KEYS = ('submitted', 'processing', 'completed', 'failed')

# Number of days (including today) shown in the dashboard chart.
_CHART_DAYS = 14


def _pdf_media_url(pdf_path):
    """Return the ``/media/pdfs/<file name>`` URL for *pdf_path*, or ``None`` if it is empty."""
    if not pdf_path:
        return None
    return f"/media/pdfs/{Path(pdf_path).name}"


def _start_of_day(day):
    """Return an aware datetime for 00:00 on *day* (``make_aware`` with its default time zone)."""
    return timezone.make_aware(datetime.combine(day, time.min))


def _request_language(request):
    """Return the primary language subtag for *request*, falling back to ``'de'``."""
    code = getattr(request, 'LANGUAGE_CODE', '') or get_language() or 'de'
    return code.split('-')[0]


def _parse_date_param(raw_value):
    """Return the ``date`` parsed from a query-string value, or ``None`` if blank or invalid."""
    if not raw_value:
        return None
    try:
        # parse_date returns None for malformed text and raises ValueError
        # for well-formed but impossible dates (e.g. 2024-02-31).
        return parse_date(raw_value)
    except ValueError:
        return None


def _find_employee_profile(profile_id):
    """Return the profile with primary key *profile_id*, or ``None`` if absent or malformed."""
    if not profile_id:
        return None
    try:
        return EmployeeProfile.objects.filter(id=profile_id).first()
    except (TypeError, ValueError, ValidationError):
        # The value comes straight from the query string; a key that cannot
        # be converted to the field type is treated as "no profile selected".
        return None


def _first_intro_sessions(onboarding_ids):
    """Return ``{onboarding request id: intro session}`` for *onboarding_ids* in one query.

    For each request the session kept is the one ``QuerySet.first()`` would
    have returned for ``filter(onboarding_request=<request>)``.
    """
    if not onboarding_ids:
        return {}
    sessions = OnboardingIntroductionSession.objects.filter(onboarding_request__in=onboarding_ids)
    if not sessions.ordered:
        # first() falls back to primary-key order on an unordered queryset.
        sessions = sessions.order_by('pk')
    by_request = {}
    for session in sessions:
        by_request.setdefault(session.onboarding_request_id, session)
    return by_request


def _daily_counts(queryset, start_date, days):
    """Count rows of *queryset* per local creation date for *days* days from *start_date*."""
    counts = {start_date + timedelta(days=offset): 0 for offset in range(days)}
    created_values = queryset.filter(created_at__date__gte=start_date).values_list('created_at', flat=True)
    for created_at in created_values:
        day = timezone.localtime(created_at).date()
        # Ignore timestamps outside the window (e.g. future-dated rows)
        # instead of failing with KeyError.
        if day in counts:
            counts[day] += 1
    return counts


def _delete_selected_requests(request, *, audit_fn, request_target_label_fn):
    """Handle the dashboard POST: delete the selected requests and report the outcome."""
    if not user_has_capability(request.user, 'delete_requests'):
        messages.error(request, _('Sie haben keine Berechtigung für diese Aktion.'))
        return redirect('requests_dashboard')

    selected = request.POST.getlist('selected_requests')
    single_delete = (request.POST.get('single_delete') or '').strip()
    if single_delete:
        # A per-row delete button overrides any checkbox selection.
        selected = [single_delete]
    if not selected:
        messages.warning(request, _('Keine Einträge ausgewählt.'))
        return redirect('requests_dashboard')

    deleted_count = 0
    invalid_count = 0
    deleted_labels = []
    for token in selected:
        # Tokens have the form "<kind>:<id>".
        try:
            kind, raw_id = token.split(':', 1)
            request_id = int(raw_id)
        except (ValueError, TypeError):
            invalid_count += 1
            continue
        model = _REQUEST_MODELS.get(kind)
        if model is None:
            invalid_count += 1
            continue
        obj = model.objects.filter(id=request_id).first()
        if not obj:
            # Already gone: neither deleted nor counted as invalid.
            continue
        deleted_labels.append(request_target_label_fn(obj, kind))
        obj.delete()
        deleted_count += 1

    if deleted_count:
        audit_fn(
            request,
            'requests_deleted',
            target_type='request',
            target_label='Dashboard bulk/single delete',
            details={
                'deleted_count': deleted_count,
                'invalid_count': invalid_count,
                'selected': selected,
                'request_labels': deleted_labels,
            },
        )
        messages.success(request, _('%(count)s Eintrag/Einträge gelöscht.') % {'count': deleted_count})
    if invalid_count:
        messages.warning(
            request,
            _('%(count)s Auswahl(en) konnten nicht verarbeitet werden.') % {'count': invalid_count},
        )
    if not deleted_count and not invalid_count:
        messages.info(request, _('Keine passenden Einträge gefunden.'))
    return redirect('requests_dashboard')


def request_timeline_page_impl(
    request,
    kind: str,
    request_id: int,
    *,
    request_target_label_fn,
    request_custom_field_details_fn,
    audit_action_label_fn,
):
    """Render the chronological timeline page for one on-/offboarding request.

    Args:
        request: The current HTTP request.
        kind: ``'onboarding'`` or ``'offboarding'``; any other value shows an
            error message and redirects to the dashboard.
        request_id: Primary key of the request; a missing row yields a 404.
        request_target_label_fn: Called as ``fn(obj, kind)`` for the request label.
        request_custom_field_details_fn: Called as ``fn(obj, kind, language_code)``;
            its items are read as mappings with ``'label'`` and ``'value'`` keys.
        audit_action_label_fn: Called as ``fn(action)`` for an audit row title.

    Returns:
        The rendered ``workflows/request_timeline.html`` response, or a redirect.
    """
    model = _REQUEST_MODELS.get(kind)
    if model is None:
        messages.error(request, f'Unbekannter Typ: {kind}')
        return redirect('requests_dashboard')
    obj = get_object_or_404(model, id=request_id)

    request_label = request_target_label_fn(obj, kind)
    custom_field_details = request_custom_field_details_fn(obj, kind, getattr(request, 'LANGUAGE_CODE', None))

    # Match audit rows by id, and additionally by name only when there is a
    # name: an empty ``icontains`` pattern would match every audit row.
    audit_match = Q(target_id=request_id)
    full_name = (obj.full_name or '').strip()
    if full_name:
        audit_match |= Q(target_label__icontains=full_name)
    audit_rows = list(
        AdminAuditLog.objects.select_related('actor')
        .filter(target_type__in=[kind, 'request'])
        .filter(audit_match)
        .order_by('-created_at', '-id')[:200]
    )

    timeline_rows = [
        {
            'created_at': obj.created_at,
            'kind': 'system',
            'title': _('Anfrage erstellt'),
            'summary': request_label,
            'meta': _('Status: %(status)s') % {'status': obj.get_processing_status_display()},
            'details': {item['label']: item['value'] for item in custom_field_details},
        }
    ]

    # Not every request model has these date fields, hence getattr.
    contract_start = getattr(obj, 'contract_start', None)
    if contract_start:
        timeline_rows.append(
            {
                'created_at': _start_of_day(contract_start),
                'kind': 'milestone',
                'title': _('Vertragsbeginn'),
                'summary': str(contract_start),
                'meta': _('Geplanter Start'),
            }
        )

    handover_date = getattr(obj, 'handover_date', None)
    if handover_date:
        timeline_rows.append(
            {
                'created_at': _start_of_day(handover_date),
                'kind': 'milestone',
                'title': _('Geräteübergabe / Hardware-Abholung'),
                'summary': str(handover_date),
                'meta': _('Geplanter Hardware-Termin'),
            }
        )

    generated_pdf_path = getattr(obj, 'generated_pdf_path', '')
    if generated_pdf_path:
        timeline_rows.append(
            {
                # No separate PDF timestamp is used; the row carries the
                # request's creation time.
                'created_at': obj.created_at,
                'kind': 'document',
                'title': _('PDF verfügbar'),
                'summary': Path(generated_pdf_path).name,
                'meta': '',
                'url': _pdf_media_url(generated_pdf_path),
            }
        )

    timeline_rows.extend(
        {
            'created_at': row.created_at,
            'kind': 'audit',
            'title': audit_action_label_fn(row.action),
            'summary': row.target_label or row.target_type or '-',
            'meta': row.actor_display or '-',
            'details': row.details,
        }
        for row in audit_rows
    )

    if kind == 'onboarding':
        intro_session = OnboardingIntroductionSession.objects.filter(onboarding_request=obj).first()
        if intro_session:
            timeline_rows.append(
                {
                    'created_at': intro_session.updated_at,
                    'kind': 'session',
                    'title': _('Einweisungssitzung'),
                    'summary': intro_session.get_status_display(),
                    'meta': intro_session.completed_by_name or '-',
                    'url': _pdf_media_url(intro_session.exported_pdf_path) or '',
                }
            )
        welcome_email = ScheduledWelcomeEmail.objects.filter(onboarding_request=obj).first()
        if welcome_email:
            timeline_rows.append(
                {
                    'created_at': welcome_email.updated_at,
                    'kind': 'email',
                    'title': _('Welcome E-Mail'),
                    'summary': welcome_email.get_status_display(),
                    'meta': welcome_email.recipient_email,
                }
            )

    timeline_rows.sort(key=lambda item: item['created_at'])

    return render(
        request,
        'workflows/request_timeline.html',
        {
            'request_kind': kind,
            'request_obj': obj,
            'request_label': request_label,
            'timeline_rows': timeline_rows,
            'custom_field_details': custom_field_details,
            'contract_start': contract_start,
            'handover_date': handover_date,
        },
    )


def requests_dashboard_impl(request, *, audit_fn, request_target_label_fn, request_status_label_fn):
    """Render the requests dashboard, or delete the selected requests on POST.

    Args:
        request: The current HTTP request. GET parameters ``q``, ``type``,
            ``status``, ``department``, ``date_from`` and ``date_to`` filter the
            listing; unparseable dates are ignored.
        audit_fn: Audit logger, called once after a successful deletion.
        request_target_label_fn: Called as ``fn(obj, kind)`` for a request label.
        request_status_label_fn: Called as ``fn(status_key, language_code)``.

    Returns:
        The rendered ``workflows/requests_dashboard.html`` response, or a
        redirect (missing capability, or after handling a POST).
    """
    if not user_has_capability(request.user, 'access_requests_dashboard'):
        messages.error(request, _('Sie haben keine Berechtigung für diese Aktion.'))
        return redirect('home')

    if request.method == 'POST':
        return _delete_selected_requests(
            request,
            audit_fn=audit_fn,
            request_target_label_fn=request_target_label_fn,
        )

    search_query = request.GET.get('q', '').strip()
    type_filter = (request.GET.get('type') or '').strip().lower()
    status_filter = (request.GET.get('status') or '').strip().lower()
    department_filter = (request.GET.get('department') or '').strip()
    date_from = (request.GET.get('date_from') or '').strip()
    date_to = (request.GET.get('date_to') or '').strip()

    # Passing an unparseable string to a ``__date`` lookup raises
    # ValidationError, so validate first and drop values that do not parse.
    date_from_value = _parse_date_param(date_from)
    if date_from_value is None:
        date_from = ''
    date_to_value = _parse_date_param(date_to)
    if date_to_value is None:
        date_to = ''

    onboarding_qs = OnboardingRequest.objects.order_by('-created_at')
    offboarding_qs = OffboardingRequest.objects.order_by('-created_at')
    # Unfiltered querysets: the department choices must not shrink with the filters.
    all_onboarding = OnboardingRequest.objects.all()
    all_offboarding = OffboardingRequest.objects.all()

    if search_query:
        text_match = Q(full_name__icontains=search_query) | Q(work_email__icontains=search_query)
        onboarding_qs = onboarding_qs.filter(text_match)
        offboarding_qs = offboarding_qs.filter(text_match)
    if status_filter in _STATUS_KEYS:
        onboarding_qs = onboarding_qs.filter(processing_status=status_filter)
        offboarding_qs = offboarding_qs.filter(processing_status=status_filter)
    if department_filter:
        onboarding_qs = onboarding_qs.filter(department=department_filter)
        offboarding_qs = offboarding_qs.filter(department=department_filter)
    if date_from_value is not None:
        onboarding_qs = onboarding_qs.filter(created_at__date__gte=date_from_value)
        offboarding_qs = offboarding_qs.filter(created_at__date__gte=date_from_value)
    if date_to_value is not None:
        onboarding_qs = onboarding_qs.filter(created_at__date__lte=date_to_value)
        offboarding_qs = offboarding_qs.filter(created_at__date__lte=date_to_value)
    if type_filter == 'onboarding':
        offboarding_qs = offboarding_qs.none()
    elif type_filter == 'offboarding':
        onboarding_qs = onboarding_qs.none()

    onboarding_items = list(onboarding_qs[:50])
    offboarding_items = list(offboarding_qs[:50])

    language_code = (
        request.COOKIES.get(settings.LANGUAGE_COOKIE_NAME)
        or getattr(request, 'LANGUAGE_CODE', '')
        or get_language()
        or 'de'
    ).split('-')[0].lower()

    # One query for all listed onboarding rows instead of one query per row.
    intro_sessions = _first_intro_sessions([item.id for item in onboarding_items])

    rows = []
    for obj in onboarding_items:
        intro_session = intro_sessions.get(obj.id)
        if intro_session and intro_session.exported_pdf_path:
            # Attached to the instance for use by the template.
            intro_session.exported_pdf_url = _pdf_media_url(intro_session.exported_pdf_path)
        rows.append(
            {
                'id': obj.id,
                'kind': 'Onboarding',
                'kind_slug': 'onboarding',
                'name': obj.full_name,
                'work_email': obj.work_email,
                'created_at': obj.created_at,
                'pdf_url': _pdf_media_url(obj.generated_pdf_path),
                'intro_pdf_url': _pdf_media_url(obj.intro_pdf_path),
                'intro_session': intro_session,
                'status': request_status_label_fn(obj.processing_status, language_code),
                'status_key': obj.processing_status,
                'last_error': obj.last_error,
            }
        )
    for obj in offboarding_items:
        rows.append(
            {
                'id': obj.id,
                'kind': 'Offboarding',
                'kind_slug': 'offboarding',
                'name': obj.full_name,
                'work_email': obj.work_email,
                'created_at': obj.created_at,
                'pdf_url': _pdf_media_url(obj.generated_pdf_path),
                'intro_pdf_url': None,
                'intro_session': None,
                'status': request_status_label_fn(obj.processing_status, language_code),
                'status_key': obj.processing_status,
                'last_error': obj.last_error,
            }
        )
    rows.sort(key=lambda row: row['created_at'], reverse=True)

    # Chart: requests created per day over the last _CHART_DAYS days.
    today = timezone.localdate()
    start_date = today - timedelta(days=_CHART_DAYS - 1)
    onboarding_daily = _daily_counts(onboarding_qs, start_date, _CHART_DAYS)
    offboarding_daily = _daily_counts(offboarding_qs, start_date, _CHART_DAYS)

    chart_points = []
    max_total = 1  # starts at 1 so the height division below never divides by zero
    for offset in range(_CHART_DAYS):
        day = start_date + timedelta(days=offset)
        on_count = onboarding_daily[day]
        off_count = offboarding_daily[day]
        total = on_count + off_count
        max_total = max(max_total, total)
        chart_points.append(
            {
                'label': day.strftime('%d.%m'),
                'onboarding': on_count,
                'offboarding': off_count,
                'total': total,
            }
        )
    for point in chart_points:
        # Scale to at most 84 with a floor of 8 so empty days stay visible.
        point['height'] = max(8, int((point['total'] / max_total) * 84))

    onboarding_total = onboarding_qs.count()
    offboarding_total = offboarding_qs.count()

    department_values = list(all_onboarding.exclude(department='').values_list('department', flat=True))
    department_values += list(all_offboarding.exclude(department='').values_list('department', flat=True))
    departments = sorted(
        {value.strip() for value in department_values if value and value.strip()},
        key=str.lower,
    )

    status_choices = [
        {'value': status_key, 'label': request_status_label_fn(status_key, language_code)}
        for status_key in _STATUS_KEYS
    ]
    has_filters = any((search_query, type_filter, status_filter, department_filter, date_from, date_to))

    # Base columns plus one per optional, capability-gated column.
    column_count = 4
    if user_has_capability(request.user, 'delete_requests'):
        column_count += 1
    if user_has_capability(request.user, 'run_intro_session') or user_has_capability(
        request.user, 'generate_intro_pdfs'
    ):
        column_count += 1
    if user_has_capability(request.user, 'access_requests_dashboard'):
        column_count += 1

    return render(
        request,
        'workflows/requests_dashboard.html',
        {
            'rows': rows[:60],
            'search_query': search_query,
            'selected_type': type_filter,
            'selected_status': status_filter,
            'selected_department': department_filter,
            'date_from': date_from,
            'date_to': date_to,
            'departments': departments,
            'status_choices': status_choices,
            'has_filters': has_filters,
            'column_count': column_count,
            'onboarding_total': onboarding_total,
            'offboarding_total': offboarding_total,
            'combined_total': onboarding_total + offboarding_total,
            'chart_points': chart_points,
        },
    )


def onboarding_create_impl(
    request,
    *,
    build_onboarding_layout_fn,
    build_onboarding_sections_fn,
    normalized_conditional_rule_payload_fn,
    display_user_name_fn,
    onboarding_inline_checks,
    onboarding_checkbox_lists,
):
    """Show the onboarding form and, on a valid POST, store and enqueue the request.

    Args:
        request: The current HTTP request.
        build_onboarding_layout_fn: Called as ``fn(form)`` to build the form blocks.
        build_onboarding_sections_fn: Called as
            ``fn(blocks, field_pages, visible_section_keys=...)``.
        normalized_conditional_rule_payload_fn: Called as ``fn('onboarding')``.
        display_user_name_fn: Called as ``fn(user)`` for the submitting user's name.
        onboarding_inline_checks: Passed through to the template unchanged.
        onboarding_checkbox_lists: Passed through to the template unchanged.

    Returns:
        A redirect to the form with ``saved=1`` after a successful POST,
        otherwise the rendered ``workflows/onboarding_form.html`` response.
    """
    config = WorkflowConfig.objects.order_by('id').first()
    if config and config.legal_text:
        legal_text = config.legal_text
    else:
        legal_text = 'Eine Ausrüstungsvereinbarung erlaubt es einem Mitarbeitenden, die Ausrüstung des Unternehmens im Außendienst oder zu Hause zu nutzen und mitzunehmen.'

    if request.method == 'POST':
        form = OnboardingRequestForm(request.POST, request.FILES, requester_email=request.user.email)
        if form.is_valid():
            obj = form.save()
            obj.onboarded_by_name = display_user_name_fn(request.user)
            obj.preferred_language = _request_language(request)
            obj.save(update_fields=['onboarded_by_name', 'preferred_language'])
            process_onboarding_request.delay(obj.id)
            return redirect(f"/onboarding/new/?saved=1&id={obj.id}")
        # An invalid form falls through and is re-rendered with its errors.
    else:
        form = OnboardingRequestForm(requester_email=request.user.email)

    onboarding_blocks = build_onboarding_layout_fn(form)
    field_pages = getattr(form, '_field_page_keys', {})
    section_configs = ensure_form_section_configs('onboarding')
    locked_keys = LOCKED_SECTION_RULES.get('onboarding', set())

    visible_section_keys = set()
    for section in get_section_definitions('onboarding'):
        key = section['key']
        if section.get('is_custom'):
            # Custom sections are governed by their own "is_active" flag.
            if section.get('is_active', True):
                visible_section_keys.add(key)
        elif key in locked_keys or section_configs.get(key, None) is None or section_configs[key].is_visible:
            # Built-in sections show when locked, unconfigured, or configured visible.
            visible_section_keys.add(key)

    onboarding_sections = build_onboarding_sections_fn(
        onboarding_blocks,
        field_pages,
        visible_section_keys=visible_section_keys,
    )
    onboarding_conditional_rules = normalized_conditional_rule_payload_fn('onboarding')

    return render(
        request,
        'workflows/onboarding_form.html',
        {
            'form': form,
            'onboarding_blocks': onboarding_blocks,
            'onboarding_sections': onboarding_sections,
            'onboarding_inline_checks': onboarding_inline_checks,
            'onboarding_checkbox_lists': onboarding_checkbox_lists,
            'onboarding_conditional_rules': onboarding_conditional_rules,
            'legal_text': legal_text,
            'saved': request.GET.get('saved') == '1',
            'saved_request_id': request.GET.get('id', ''),
            'portal_email_domain': get_company_email_domain(),
        },
    )


def onboarding_success_impl(request, request_id: int):
    """Render the success page for an onboarding request (404 if it does not exist)."""
    obj = get_object_or_404(OnboardingRequest, id=request_id)
    return render(
        request,
        'workflows/onboarding_success.html',
        {'obj': obj, 'pdf_url': _pdf_media_url(obj.generated_pdf_path)},
    )


def generate_onboarding_intro_pdf_impl(request, request_id: int, *, audit_fn):
    """Generate the intro/handover PDF for an onboarding request and store its path.

    Args:
        request: The current HTTP request.
        request_id: Primary key of the onboarding request; a missing row yields a 404.
        audit_fn: Audit logger, called with action ``'intro_pdf_generated'``.

    Returns:
        A redirect to the requests dashboard.
    """
    obj = get_object_or_404(OnboardingRequest, id=request_id)
    pdf_path = _generate_onboarding_intro_pdf(obj, language_code=get_language())
    obj.intro_pdf_path = str(pdf_path)
    obj.save(update_fields=['intro_pdf_path'])
    audit_fn(
        request,
        'intro_pdf_generated',
        target_type='onboarding',
        target_id=obj.id,
        target_label=obj.full_name,
    )
    messages.success(request, _('Einweisungs- und Übergabeprotokoll wurde erzeugt.'))
    return redirect('requests_dashboard')


def generate_onboarding_intro_session_pdf_impl(request, request_id: int, *, audit_fn, display_user_name_fn):
    """Export the intro session of an onboarding request as a PDF and store its path.

    The session row is created if it does not exist yet.

    Args:
        request: The current HTTP request.
        request_id: Primary key of the onboarding request; a missing row yields a 404.
        audit_fn: Audit logger, called with action ``'intro_live_pdf_generated'``.
        display_user_name_fn: Called as ``fn(user)``; the result is passed to
            the PDF generator as ``admin_signature_name``.

    Returns:
        A redirect to the intro session page of the same request.
    """
    onboarding = get_object_or_404(OnboardingRequest, id=request_id)
    session, _created = OnboardingIntroductionSession.objects.get_or_create(onboarding_request=onboarding)
    pdf_path = _generate_onboarding_intro_session_pdf(
        session,
        admin_signature_name=display_user_name_fn(request.user),
        language_code=get_language(),
    )
    session.exported_pdf_path = str(pdf_path)
    session.save(update_fields=['exported_pdf_path'])
    audit_fn(
        request,
        'intro_live_pdf_generated',
        target_type='onboarding',
        target_id=onboarding.id,
        target_label=onboarding.full_name,
    )
    messages.success(request, _('Einweisungsprotokoll aus Live-Status wurde erzeugt.'))
    return redirect('onboarding_intro_session_page', request_id=request_id)


def onboarding_intro_session_page_impl(request, request_id: int, *, audit_fn, display_user_name_fn):
    """Show the intro session checklist and handle its save / complete / reset actions.

    On POST the ``session_action`` field selects the action (default ``'save'``):
    ``'reset'`` clears the session, ``'complete'`` marks it completed, and any
    other value stores it as a draft.

    Args:
        request: The current HTTP request.
        request_id: Primary key of the onboarding request; a missing row yields a 404.
        audit_fn: Audit logger, called once per handled POST action.
        display_user_name_fn: Called as ``fn(user)`` for the completing user's name.

    Returns:
        A redirect back to this page after a POST, otherwise the rendered
        ``workflows/onboarding_intro_session.html`` response.
    """
    onboarding = get_object_or_404(OnboardingRequest, id=request_id)
    session, _created = OnboardingIntroductionSession.objects.get_or_create(onboarding_request=onboarding)
    sections = build_intro_sections_for_request(onboarding, language_code=get_language())

    if request.method == 'POST':
        checked_ids = set(request.POST.getlist('checked_items'))
        # Record every known checklist item explicitly as checked or unchecked.
        checklist_state = {
            item['id']: item['id'] in checked_ids
            for section in sections
            for item in section['items']
        }
        action = (request.POST.get('session_action') or 'save').strip()
        session.checklist_state = checklist_state
        session.notes = (request.POST.get('notes') or '').strip()

        if action == 'reset':
            session.checklist_state = {}
            session.notes = ''
            session.status = 'draft'
            session.completed_at = None
            session.completed_by_name = ''
            session.exported_pdf_path = ''
            session.save(
                update_fields=[
                    'checklist_state',
                    'notes',
                    'status',
                    'completed_at',
                    'completed_by_name',
                    'exported_pdf_path',
                ]
            )
            audit_fn(
                request,
                'intro_session_reset',
                target_type='onboarding',
                target_id=onboarding.id,
                target_label=onboarding.full_name,
            )
            messages.success(request, _('Einweisung wurde zurückgesetzt.'))
            return redirect('onboarding_intro_session_page', request_id=request_id)

        ticked = sum(1 for is_checked in checklist_state.values() if is_checked)
        if action == 'complete':
            session.status = 'completed'
            session.completed_at = timezone.now()
            session.completed_by_name = display_user_name_fn(request.user)
            audit_fn(
                request,
                'intro_session_completed',
                target_type='onboarding',
                target_id=onboarding.id,
                target_label=onboarding.full_name,
                details={'checked_count': ticked},
            )
            messages.success(request, _('Einweisung wurde als abgeschlossen gespeichert.'))
        else:
            session.status = 'draft'
            session.completed_at = None
            session.completed_by_name = ''
            audit_fn(
                request,
                'intro_session_saved',
                target_type='onboarding',
                target_id=onboarding.id,
                target_label=onboarding.full_name,
                details={'checked_count': ticked},
            )
            messages.success(request, _('Einweisung wurde als Entwurf gespeichert.'))
        session.save()
        return redirect('onboarding_intro_session_page', request_id=request_id)

    checked_map = session.checklist_state or {}
    checked_count = 0
    total_count = 0
    for section in sections:
        for item in section['items']:
            # Annotate each item in place for the template.
            item['checked'] = bool(checked_map.get(item['id']))
            total_count += 1
            if item['checked']:
                checked_count += 1

    salutation = (onboarding.get_gender_display() or '').strip()
    if salutation:
        display_name = f"{salutation} {onboarding.full_name}".strip()
    else:
        display_name = onboarding.full_name
    progress_percent = int((checked_count / total_count) * 100) if total_count else 0

    return render(
        request,
        'workflows/onboarding_intro_session.html',
        {
            'onboarding': onboarding,
            'session': session,
            'display_name': display_name,
            'sections': sections,
            'checked_count': checked_count,
            'total_count': total_count,
            'progress_percent': progress_percent,
            'session_pdf_url': _pdf_media_url(session.exported_pdf_path),
        },
    )


def offboarding_create_impl(request, *, build_offboarding_sections_fn, display_user_name_fn):
    """Show the offboarding form with employee search and, on a valid POST, store and enqueue the request.

    Args:
        request: The current HTTP request. GET ``profile`` selects an employee
            profile to prefill (a malformed value is ignored); GET ``q``
            searches profiles by name and e-mail.
        build_offboarding_sections_fn: Called as
            ``fn(form, visible_section_keys=...)``.
        display_user_name_fn: Called as ``fn(user)`` for the requesting user's name.

    Returns:
        A redirect to the form with ``saved=1`` after a successful POST,
        otherwise the rendered ``workflows/offboarding_form.html`` response.
    """
    profile_id = request.GET.get('profile')
    search_query = request.GET.get('q', '').strip()
    selected_profile = _find_employee_profile(profile_id)

    search_results = []
    if search_query:
        name_matches = EmployeeProfile.objects.filter(full_name__icontains=search_query)[:10]
        email_matches = EmployeeProfile.objects.filter(work_email__icontains=search_query)[:10]
        # Preserve order (name matches first) while removing duplicates.
        unique_by_id = {}
        for profile in [*name_matches, *email_matches]:
            unique_by_id.setdefault(profile.id, profile)
        search_results = list(unique_by_id.values())[:10]

    if request.method == 'POST':
        form = OffboardingRequestForm(request.POST, prefill_profile=selected_profile)
        if form.is_valid():
            obj = form.save(commit=False)
            if selected_profile:
                obj.employee_profile = selected_profile
            requester_email = (request.user.email or '').strip().lower()
            # Lower-case both sides so a mixed-case configured domain still matches.
            company_suffix = f"@{get_company_email_domain()}".lower()
            if requester_email and requester_email.endswith(company_suffix):
                obj.requested_by_email = requester_email
            else:
                # Addresses outside the company domain are replaced by the default sender.
                obj.requested_by_email = settings.DEFAULT_FROM_EMAIL
            obj.requested_by_name = display_user_name_fn(request.user)
            obj.preferred_language = _request_language(request)
            obj.save()
            process_offboarding_request.delay(obj.id)
            return redirect(f"/offboarding/new/?saved=1&id={obj.id}")
        # An invalid form falls through and is re-rendered with its errors.
    else:
        form = OffboardingRequestForm(prefill_profile=selected_profile, initial={'search_query': search_query})

    field_pages = getattr(form, '_field_page_keys', {})
    section_configs = ensure_form_section_configs('offboarding')
    locked_keys = LOCKED_SECTION_RULES.get('offboarding', set())
    # A section shows when locked, unconfigured, or configured visible.
    visible_section_keys = {
        key
        for key in OFFBOARDING_PAGE_ORDER
        if key in locked_keys or section_configs.get(key, None) is None or section_configs[key].is_visible
    }
    offboarding_sections = build_offboarding_sections_fn(form, visible_section_keys=visible_section_keys)

    return render(
        request,
        'workflows/offboarding_form.html',
        {
            'form': form,
            'search_results': search_results,
            'selected_profile': selected_profile,
            'search_query': search_query,
            'saved': request.GET.get('saved') == '1',
            'saved_request_id': request.GET.get('id', ''),
            'portal_email_domain': get_company_email_domain(),
            'offboarding_sections': offboarding_sections,
        },
    )


def offboarding_success_impl(request, request_id: int):
    """Render the success page for an offboarding request (404 if it does not exist)."""
    obj = get_object_or_404(OffboardingRequest, id=request_id)
    return render(
        request,
        'workflows/offboarding_success.html',
        {'obj': obj, 'pdf_url': _pdf_media_url(obj.generated_pdf_path)},
    )


def delete_request_from_dashboard_impl(request, kind: str, request_id: int, *, audit_fn, request_target_label_fn):
    """Delete a single request and record the deletion in the audit log.

    Args:
        request: The current HTTP request.
        kind: ``'onboarding'`` or ``'offboarding'``; any other value shows an
            error message and redirects to the dashboard.
        request_id: Primary key of the request; a missing row yields a 404.
        audit_fn: Audit logger, called with action ``'request_deleted'``.
        request_target_label_fn: Called as ``fn(obj, kind)`` before deletion.

    Returns:
        A redirect to the requests dashboard.
    """
    model = _REQUEST_MODELS.get(kind)
    if model is None:
        messages.error(request, f'Unbekannter Typ: {kind}')
        return redirect('requests_dashboard')
    obj = get_object_or_404(model, id=request_id)

    # Build the label before deleting, while the object is still intact.
    target_label = request_target_label_fn(obj, kind)
    obj.delete()
    audit_fn(request, 'request_deleted', target_type=kind, target_id=request_id, target_label=target_label)
    messages.success(request, f'{kind.capitalize()}-Anfrage #{request_id} wurde gelöscht.')
    return redirect('requests_dashboard')


def retry_request_from_dashboard_impl(request, kind: str, request_id: int, *, audit_fn, request_target_label_fn):
    """Reset a request to ``'submitted'`` and enqueue its processing task again.

    Args:
        request: The current HTTP request.
        kind: ``'onboarding'`` or ``'offboarding'``; any other value shows an
            error message and redirects to the dashboard.
        request_id: Primary key of the request; a missing row yields a 404.
        audit_fn: Audit logger, called with action ``'request_retried'``.
        request_target_label_fn: Called as ``fn(obj, kind)`` for the audit label.

    Returns:
        A redirect to the requests dashboard.
    """
    model = _REQUEST_MODELS.get(kind)
    if model is None:
        messages.error(request, f'Unbekannter Typ: {kind}')
        return redirect('requests_dashboard')

    obj = get_object_or_404(model, id=request_id)
    obj.processing_status = 'submitted'
    obj.last_error = ''
    obj.save(update_fields=['processing_status', 'last_error'])
    _REQUEST_TASKS[kind].delay(obj.id)
    audit_fn(
        request,
        'request_retried',
        target_type=kind,
        target_id=obj.id,
        target_label=request_target_label_fn(obj, kind),
    )
    messages.success(request, f'{kind.capitalize()}-Anfrage #{request_id} wurde erneut angestoßen.')
    return redirect('requests_dashboard')