106 lines
4.3 KiB
Python
106 lines
4.3 KiB
Python
from datetime import timedelta
|
|
|
|
from django.db.models import Count
|
|
from django.utils import timezone
|
|
from django.utils.translation import get_language, gettext as _, override
|
|
|
|
from .backup_ops import latest_backup_health_snapshot
|
|
from .form_builder import get_custom_field_configs
|
|
from .forms import OffboardingRequestForm, OnboardingRequestForm
|
|
from .models import AsyncTaskLog, OffboardingRequest, OnboardingRequest
|
|
from .roles import user_has_capability
|
|
|
|
|
|
def form_field_labels(form_type: str) -> dict[str, str]:
    """Map each field name of the requested form to its display label.

    ``form_type`` selects ``OnboardingRequestForm`` (``'onboarding'``) or
    ``OffboardingRequestForm`` (``'offboarding'``); any other value yields an
    empty dict. A field whose label is empty falls back to its own name.
    """
    if form_type == 'onboarding':
        form_class = OnboardingRequestForm
    elif form_type == 'offboarding':
        form_class = OffboardingRequestForm
    else:
        return {}

    labels: dict[str, str] = {}
    for field_name, form_field in form_class.base_fields.items():
        # str() is applied to whichever of label / field name is used.
        labels[field_name] = str(form_field.label or field_name)
    return labels
|
|
|
|
|
|
def request_target_label(obj, kind: str | None = None) -> str:
|
|
request_kind = (kind or '').strip()
|
|
if not request_kind:
|
|
request_kind = 'onboarding' if isinstance(obj, OnboardingRequest) else 'offboarding'
|
|
name = (getattr(obj, 'full_name', '') or '').strip() or f'#{getattr(obj, "id", "?")}'
|
|
email = (getattr(obj, 'work_email', '') or '').strip()
|
|
created_at = getattr(obj, 'created_at', None)
|
|
date_label = created_at.strftime('%Y-%m-%d') if created_at else ''
|
|
parts = [request_kind.capitalize(), name]
|
|
if email:
|
|
parts.append(f'<{email}>')
|
|
if date_label:
|
|
parts.append(date_label)
|
|
return ' | '.join(parts)
|
|
|
|
|
|
def request_status_label(status_key: str, language_code: str | None = None) -> str:
    """Return the display label for a request status in the given language.

    Args:
        status_key: One of ``'submitted'``, ``'processing'``, ``'completed'``
            or ``'failed'``. Any other key is returned unchanged.
        language_code: Language code; only the part before the first ``-`` is
            used, lower-cased. Falls back to ``'de'`` when missing or empty.

    Returns:
        The label translated under the selected language, or ``status_key``
        itself when it is not a known status.
    """
    primary_tag = (language_code or 'de').split('-')[0]
    active_language = (primary_tag or 'de').lower()

    # The translations are resolved while the override is active, so the
    # resulting strings are already in the requested language afterwards.
    with override(active_language):
        translated = {
            'submitted': _('Eingereicht'),
            'processing': _('In Bearbeitung'),
            'completed': _('Abgeschlossen'),
            'failed': _('Fehlgeschlagen'),
        }

    return translated.get(status_key, status_key)
|
|
|
|
|
|
def request_custom_field_details(obj, kind: str, language_code: str | None = None) -> list[dict[str, str]]:
|
|
form_type = 'onboarding' if kind == 'onboarding' else 'offboarding'
|
|
language_code = ((language_code or getattr(obj, 'preferred_language', '') or get_language() or 'de').split('-')[0]).lower()
|
|
values = getattr(obj, 'custom_field_values', {}) or {}
|
|
rows = []
|
|
yes_label = 'Ja' if language_code == 'de' else 'Yes'
|
|
for cfg in get_custom_field_configs(form_type, include_inactive=True):
|
|
raw_value = values.get(cfg.field_key)
|
|
if raw_value in (None, '', False, []):
|
|
continue
|
|
if isinstance(raw_value, bool):
|
|
display_value = str(yes_label) if raw_value else ''
|
|
elif isinstance(raw_value, list):
|
|
display_value = ', '.join(str(item).strip() for item in raw_value if str(item).strip())
|
|
else:
|
|
display_value = str(raw_value).strip()
|
|
if not display_value:
|
|
continue
|
|
rows.append(
|
|
{
|
|
'label': cfg.translated_label(language_code),
|
|
'value': display_value,
|
|
'section': cfg.section_key,
|
|
'sort_order': cfg.sort_order,
|
|
}
|
|
)
|
|
rows.sort(key=lambda item: (item['section'], item['sort_order'], item['label']))
|
|
return rows
|
|
|
|
|
|
def ops_summary_for_user(user) -> dict[str, object]:
    """Assemble the operations summary that ``user`` is permitted to see.

    The ``view_job_monitor`` capability gates the async task figures and the
    ``manage_backups`` capability gates the backup health snapshot. Without
    the former, the counters stay at 0 and the failed-log list stays empty;
    without the latter, ``backup_health`` is ``None``.

    Returns:
        A dict with the keys ``show``, ``can_view_jobs``,
        ``can_manage_backups``, ``failed_count_24h``, ``started_count_24h``,
        ``success_count_24h``, ``recent_failed_logs`` and ``backup_health``.
    """
    jobs_allowed = user_has_capability(user, 'view_job_monitor')
    backups_allowed = user_has_capability(user, 'manage_backups')

    backup_health = None
    if backups_allowed:
        backup_health = latest_backup_health_snapshot()

    summary: dict[str, object] = dict(
        show=jobs_allowed or backups_allowed,
        can_view_jobs=jobs_allowed,
        can_manage_backups=backups_allowed,
        failed_count_24h=0,
        started_count_24h=0,
        success_count_24h=0,
        recent_failed_logs=[],
        backup_health=backup_health,
    )

    if jobs_allowed:
        # Count the task logs started within the last 24 hours, per status.
        window_start = timezone.now() - timedelta(hours=24)
        status_rows = (
            AsyncTaskLog.objects.filter(started_at__gte=window_start)
            .values('status')
            .annotate(count=Count('id'))
        )
        per_status = {}
        for row in status_rows:
            per_status[row['status']] = row['count']

        status_by_summary_key = (
            ('failed_count_24h', 'failed'),
            ('started_count_24h', 'started'),
            ('success_count_24h', 'succeeded'),
        )
        for summary_key, status in status_by_summary_key:
            summary[summary_key] = per_status.get(status, 0)

        # The five newest failed logs; this query is not limited to the
        # 24-hour window used for the counters above.
        newest_failures = AsyncTaskLog.objects.filter(status='failed').order_by('-started_at', '-id')
        summary['recent_failed_logs'] = list(newest_failures[:5])

    return summary
|