"""Display helpers for on-/offboarding requests and the operations summary."""

from datetime import timedelta

from django.db.models import Count
from django.utils import timezone
from django.utils.translation import get_language, gettext as _, override

from .backup_ops import latest_backup_health_snapshot
from .form_builder import get_custom_field_configs
from .forms import OffboardingRequestForm, OnboardingRequestForm
from .models import AsyncTaskLog, OffboardingRequest, OnboardingRequest
from .roles import user_has_capability


def form_field_labels(form_type: str) -> dict[str, str]:
    """Return a ``{field_name: label}`` mapping for the given form type.

    ``form_type`` is ``'onboarding'`` or ``'offboarding'``; any other value
    yields an empty dict. A field without a label falls back to its name.
    """
    if form_type == 'onboarding':
        form_class = OnboardingRequestForm
    elif form_type == 'offboarding':
        form_class = OffboardingRequestForm
    else:
        return {}
    return {
        name: str(field.label or name)
        for name, field in form_class.base_fields.items()
    }


def request_target_label(obj, kind: str | None = None) -> str:
    """Build a one-line label such as ``Onboarding | Name | <mail> | 2024-01-31``.

    When ``kind`` is empty or whitespace, it is derived from the object type:
    ``'onboarding'`` for an ``OnboardingRequest``, otherwise ``'offboarding'``.
    The name falls back to ``#<id>`` (or ``#?`` when there is no id); the
    e-mail and date parts are omitted when the object has no value for them.
    """
    request_kind = (kind or '').strip()
    if not request_kind:
        request_kind = 'onboarding' if isinstance(obj, OnboardingRequest) else 'offboarding'

    name = (getattr(obj, 'full_name', '') or '').strip() or f'#{getattr(obj, "id", "?")}'
    email = (getattr(obj, 'work_email', '') or '').strip()
    created_at = getattr(obj, 'created_at', None)
    date_label = created_at.strftime('%Y-%m-%d') if created_at else ''

    parts = [request_kind.capitalize(), name]
    if email:
        parts.append(f'<{email}>')
    if date_label:
        parts.append(date_label)
    return ' | '.join(parts)


def request_status_label(status_key: str, language_code: str | None = None) -> str:
    """Return the translated label for a request status.

    The language is the primary subtag of ``language_code`` (``'de-AT'`` ->
    ``'de'``), defaulting to ``'de'``. An unknown ``status_key`` is returned
    unchanged.
    """
    lang = ((language_code or 'de').split('-')[0] or 'de').lower()
    # gettext resolves eagerly, so the lookups must happen inside override().
    with override(lang):
        labels = {
            'submitted': _('Eingereicht'),
            'processing': _('In Bearbeitung'),
            'completed': _('Abgeschlossen'),
            'failed': _('Fehlgeschlagen'),
        }
    return labels.get(status_key, status_key)


def _is_blank_custom_value(value) -> bool:
    """Tell whether a stored custom-field value counts as "not filled in".

    Blank means ``None``, ``False``, an empty string or an empty list.
    ``False`` is matched by identity on purpose: ``0 == False`` in Python, so
    an equality-based check would also discard a legitimate numeric zero.
    """
    if value is None or value is False:
        return True
    return isinstance(value, (str, list)) and not value


def request_custom_field_details(obj, kind: str, language_code: str | None = None) -> list[dict[str, str]]:
    """List the filled-in custom fields of a request as display rows.

    Args:
        obj: Request object; its ``custom_field_values`` mapping and, as a
            language fallback, its ``preferred_language`` are read.
        kind: ``'onboarding'`` selects the onboarding field configs; any
            other value selects the offboarding ones.
        language_code: Language for labels. Falls back to the object's
            preferred language, then the active language, then ``'de'``.
            Only the primary subtag is used.

    Returns:
        One dict per non-blank value with the keys ``label``, ``value``,
        ``section`` and ``sort_order``, ordered by section, sort order and
        label.
    """
    form_type = 'onboarding' if kind == 'onboarding' else 'offboarding'
    language_code = (
        (language_code or getattr(obj, 'preferred_language', '') or get_language() or 'de')
        .split('-')[0]
    ).lower()
    values = getattr(obj, 'custom_field_values', {}) or {}
    yes_label = 'Ja' if language_code == 'de' else 'Yes'

    rows = []
    # Inactive configs are included as well -- presumably so values captured
    # before a field was deactivated still show up; confirm with form_builder.
    for cfg in get_custom_field_configs(form_type, include_inactive=True):
        raw_value = values.get(cfg.field_key)
        if _is_blank_custom_value(raw_value):
            continue

        if isinstance(raw_value, bool):
            # False was skipped above, so a bool here is always True.
            display_value = yes_label
        elif isinstance(raw_value, list):
            display_value = ', '.join(str(item).strip() for item in raw_value if str(item).strip())
        else:
            display_value = str(raw_value).strip()

        # Whitespace-only strings and lists of blank items end up empty here.
        if not display_value:
            continue

        rows.append(
            {
                'label': cfg.translated_label(language_code),
                'value': display_value,
                'section': cfg.section_key,
                'sort_order': cfg.sort_order,
            }
        )

    rows.sort(key=lambda item: (item['section'], item['sort_order'], item['label']))
    return rows


def ops_summary_for_user(user) -> dict[str, object]:
    """Collect the job-monitor and backup figures the user is allowed to see.

    The result always contains the capability flags (``show``,
    ``can_view_jobs``, ``can_manage_backups``) and ``backup_health``, which is
    only populated with the ``manage_backups`` capability. The task counters
    and ``recent_failed_logs`` keep their zero/empty defaults unless the user
    has the ``view_job_monitor`` capability.
    """
    can_view_jobs = user_has_capability(user, 'view_job_monitor')
    can_manage_backups = user_has_capability(user, 'manage_backups')
    summary: dict[str, object] = {
        'show': can_view_jobs or can_manage_backups,
        'can_view_jobs': can_view_jobs,
        'can_manage_backups': can_manage_backups,
        'failed_count_24h': 0,
        'started_count_24h': 0,
        'success_count_24h': 0,
        'recent_failed_logs': [],
        'backup_health': latest_backup_health_snapshot() if can_manage_backups else None,
    }
    if not can_view_jobs:
        return summary

    since = timezone.now() - timedelta(hours=24)
    logs = AsyncTaskLog.objects.filter(started_at__gte=since)
    # One grouped query yields the per-status counts for the last 24 hours.
    counts = {row['status']: row['count'] for row in logs.values('status').annotate(count=Count('id'))}
    summary['failed_count_24h'] = counts.get('failed', 0)
    summary['started_count_24h'] = counts.get('started', 0)
    summary['success_count_24h'] = counts.get('succeeded', 0)
    # NOTE(review): unlike the counters, this list is not limited to the 24h
    # window -- it is the five latest failures overall. Confirm that is intended.
    summary['recent_failed_logs'] = list(
        AsyncTaskLog.objects.filter(status='failed').order_by('-started_at', '-id')[:5]
    )
    return summary