"""View implementations for the portal administration pages.

Each ``*_impl`` function takes the request plus its collaborators (audit,
rendering and e-mail callables) as keyword-only arguments and returns the
response produced by ``render``/``redirect`` or by the injected renderer.
"""
from datetime import timedelta

from django.contrib import messages
from django.contrib.auth import get_user_model
from django.shortcuts import get_object_or_404, redirect, render
from django.utils import timezone
from django.utils.translation import gettext as _

from .app_registry import get_portal_app_registry_rows, normalize_portal_app_sort_orders
from .branding import get_portal_trial_config, is_trial_expired
from .forms import PortalBrandingForm, PortalCompanyConfigForm, PortalTrialConfigForm, UserManagementCreateForm
from .models import PortalAppConfig, PortalBranding, PortalCompanyConfig, UserNotification, UserProfile
from .notifications import notify_user
from .roles import ROLE_GROUP_NAMES, ROLE_PLATFORM_OWNER, get_user_role_key

# Checkbox-style fields of PortalAppConfig posted as ``<field>__<key>``.
_APP_CONFIG_FLAG_FIELDS = (
    'is_enabled',
    'visible_to_super_admin',
    'visible_to_admin',
    'visible_to_it_staff',
    'visible_to_staff',
)

# Free-text fields of PortalAppConfig posted as ``<field>__<key>``.
_APP_CONFIG_TEXT_FIELDS = (
    'title_override',
    'title_override_en',
    'description_override',
    'description_override_en',
    'action_label_override',
    'action_label_override_en',
)

_TRIAL_PAGE_URL = '/admin-tools/trial/'


def _posted_text(request, name):
    """Return the stripped POST value for *name*, or '' when missing/empty."""
    return (request.POST.get(name) or '').strip()


def _render_branding_settings(request, form, branding, build_branding_sections_fn, *, editing_section='', status=None):
    """Render the branding settings template for *form* and *branding*."""
    return render(
        request,
        'workflows/branding_settings.html',
        {
            'form': form,
            'branding': branding,
            'branding_sections': build_branding_sections_fn(form, branding),
            'editing_branding_section': editing_section,
        },
        status=status,
    )


def _render_company_config(request, form, company_config, build_company_config_sections_fn, *, editing_section='', status=None):
    """Render the company configuration template for *form* and *company_config*."""
    return render(
        request,
        'workflows/company_config.html',
        {
            'form': form,
            'company_config': company_config,
            'company_config_sections': build_company_config_sections_fn(form, company_config),
            'editing_company_section': editing_section,
        },
        status=status,
    )


def _render_trial_management(request, form, trial_config, *, status=None):
    """Render the trial management template; expiry is evaluated at render time."""
    return render(
        request,
        'workflows/trial_management.html',
        {
            'form': form,
            'trial_config': trial_config,
            'trial_is_expired': is_trial_expired(),
        },
        status=status,
    )


def _notify_trial_state(request, trial_config):
    """Notify the requesting user about an expired, soon-expiring or disabled trial."""
    if trial_config.is_trial_mode and trial_config.trial_expires_at:
        remaining = trial_config.trial_expires_at - timezone.now()
        if remaining.total_seconds() <= 0:
            notify_user(
                user=request.user,
                title=_('Trial ist abgelaufen'),
                body=_('Der Trial-Zeitraum ist überschritten. Nicht-Platform-Owner werden jetzt blockiert.'),
                level=UserNotification.LEVEL_WARNING,
                link_url=_TRIAL_PAGE_URL,
                event_key=UserProfile.NOTIFICATION_TRIAL_ALERTS,
            )
        elif remaining <= timedelta(days=7):
            notify_user(
                user=request.user,
                title=_('Trial läuft bald ab'),
                body=_('Der Trial endet am %(date)s.') % {
                    'date': timezone.localtime(trial_config.trial_expires_at).strftime('%d.%m.%Y %H:%M'),
                },
                level=UserNotification.LEVEL_WARNING,
                link_url=_TRIAL_PAGE_URL,
                event_key=UserProfile.NOTIFICATION_TRIAL_ALERTS,
            )
    elif not trial_config.is_trial_mode:
        notify_user(
            user=request.user,
            title=_('Trial-Modus deaktiviert'),
            body=_('Der Trial-Modus wurde ausgeschaltet.'),
            level=UserNotification.LEVEL_INFO,
            link_url=_TRIAL_PAGE_URL,
            event_key=UserProfile.NOTIFICATION_TRIAL_ALERTS,
        )


def portal_app_registry_page_impl(request, *, translate_choice_list):
    """Render the app registry page with all registry rows and section choices."""
    return render(
        request,
        'workflows/app_registry.html',
        {
            'rows': get_portal_app_registry_rows(),
            'section_choices': translate_choice_list(PortalAppConfig.SECTION_CHOICES),
        },
    )


def save_portal_app_registry_impl(request, *, audit_fn):
    """Apply the posted registry form to every app config and redirect back.

    Each config reads its values from POST keys of the form
    ``<field>__<config.key>``. An unknown section or a non-numeric sort order
    falls back to the row's default. After all rows are saved the sort orders
    are normalized and one audit entry is written.
    """
    rows = get_portal_app_registry_rows()
    valid_sections = dict(PortalAppConfig.SECTION_CHOICES)

    for row in rows:
        config = row['config']
        key = config.key

        config.section = (request.POST.get(f'section__{key}') or config.section).strip()
        if config.section not in valid_sections:
            config.section = row['default_section']

        # An unchecked checkbox is absent from POST, which yields False here.
        for flag in _APP_CONFIG_FLAG_FIELDS:
            setattr(config, flag, request.POST.get(f'{flag}__{key}') == 'on')

        try:
            config.sort_order = int(_posted_text(request, f'sort_order__{key}') or row['default_sort_order'])
        except ValueError:
            config.sort_order = row['default_sort_order']

        for field_name in _APP_CONFIG_TEXT_FIELDS:
            setattr(config, field_name, _posted_text(request, f'{field_name}__{key}'))

        config.save()

    normalize_portal_app_sort_orders()
    audit_fn(
        request,
        'portal_app_registry_saved',
        target_type='portal_app_registry',
        target_label='Portal App Registry',
        details={'updated_apps': len(rows)},
    )
    messages.success(request, _('App-Registry gespeichert.'))
    return redirect('portal_app_registry_page')


def user_management_page_impl(request, *, render_user_management_fn):
    """Delegate rendering of the user management page to the injected renderer."""
    return render_user_management_fn(request)


def portal_branding_page_impl(request, *, build_branding_sections_fn):
    """Render the branding settings page for the 'Default' branding record."""
    branding, _created = PortalBranding.objects.get_or_create(name='Default')
    form = PortalBrandingForm(instance=branding)
    return _render_branding_settings(request, form, branding, build_branding_sections_fn)


def save_portal_branding_impl(request, *, audit_fn, build_branding_sections_fn):
    """Validate and save the posted branding form for the 'Default' record.

    Form fields missing from the POST data are pre-filled from the stored
    record before validation (many-to-many fields are skipped). On
    validation errors the page is re-rendered with status 400 and the posted
    ``section_key`` as the section being edited.
    """
    branding, _created = PortalBranding.objects.get_or_create(name='Default')
    section_key = _posted_text(request, 'section_key')

    data = request.POST.copy()
    for field_name in PortalBrandingForm.Meta.fields:
        if field_name in data:
            continue
        field = PortalBranding._meta.get_field(field_name)
        if getattr(field, 'many_to_many', False):
            continue
        if getattr(field, 'null', False) and getattr(branding, field_name, None) is None:
            data[field_name] = ''
        else:
            data[field_name] = getattr(branding, field_name, '') or ''

    form = PortalBrandingForm(data, request.FILES, instance=branding)
    if not form.is_valid():
        messages.error(request, _('Branding konnte nicht gespeichert werden. Bitte prüfen Sie die Eingaben.'))
        return _render_branding_settings(
            request,
            form,
            branding,
            build_branding_sections_fn,
            editing_section=section_key,
            status=400,
        )

    branding = form.save()
    audit_fn(
        request,
        'portal_branding_saved',
        target_type='portal_branding',
        target_id=branding.id,
        target_label=branding.portal_title,
        details={
            'company_name': branding.company_name,
            'support_email': branding.support_email,
            'default_language': branding.default_language,
            'has_custom_logo': bool(branding.logo_image),
            'has_custom_letterhead': bool(branding.pdf_letterhead),
        },
    )
    messages.success(request, _('Portal-Branding wurde gespeichert.'))
    # Show a fresh, unbound form built from the saved record.
    return _render_branding_settings(
        request,
        PortalBrandingForm(instance=branding),
        branding,
        build_branding_sections_fn,
    )


def portal_company_config_page_impl(request, *, build_company_config_sections_fn):
    """Render the company configuration page for the 'Default' record."""
    company_config, _created = PortalCompanyConfig.objects.get_or_create(name='Default')
    form = PortalCompanyConfigForm(instance=company_config)
    return _render_company_config(request, form, company_config, build_company_config_sections_fn)


def save_portal_company_config_impl(request, *, audit_fn, build_company_config_sections_fn):
    """Validate and save the posted company configuration for the 'Default' record.

    Form fields missing from the POST data are pre-filled from the stored
    record before validation. On validation errors the page is re-rendered
    with status 400 and the posted ``section_key`` as the section being edited.
    """
    company_config, _created = PortalCompanyConfig.objects.get_or_create(name='Default')
    section_key = _posted_text(request, 'section_key')

    data = request.POST.copy()
    for field_name in PortalCompanyConfigForm.Meta.fields:
        if field_name not in data:
            data[field_name] = getattr(company_config, field_name, '') or ''

    form = PortalCompanyConfigForm(data, instance=company_config)
    if not form.is_valid():
        messages.error(request, _('Firmenkonfiguration konnte nicht gespeichert werden. Bitte prüfen Sie die Eingaben.'))
        return _render_company_config(
            request,
            form,
            company_config,
            build_company_config_sections_fn,
            editing_section=section_key,
            status=400,
        )

    company_config = form.save()
    audit_fn(
        request,
        'portal_company_config_saved',
        target_type='portal_company_config',
        target_id=company_config.id,
        target_label=company_config.legal_company_name or 'Default',
        details={
            'website_url': company_config.website_url,
            'imprint_url': company_config.imprint_url,
            'privacy_url': company_config.privacy_url,
            'hr_contact_email': company_config.hr_contact_email,
            'it_contact_email': company_config.it_contact_email,
            'operations_contact_email': company_config.operations_contact_email,
        },
    )
    messages.success(request, _('Firmenkonfiguration wurde gespeichert.'))
    # Show a fresh, unbound form built from the saved record.
    return _render_company_config(
        request,
        PortalCompanyConfigForm(instance=company_config),
        company_config,
        build_company_config_sections_fn,
    )


def portal_trial_config_page_impl(request):
    """Render the trial management page for the current trial configuration."""
    trial_config = get_portal_trial_config()
    form = PortalTrialConfigForm(instance=trial_config)
    return _render_trial_management(request, form, trial_config)


def save_portal_trial_config_impl(request, *, audit_fn):
    """Validate and save the posted trial configuration.

    On validation errors the page is re-rendered with status 400. On success
    an audit entry is written and the requesting user is notified when the
    trial has expired, expires within seven days, or trial mode is off.
    """
    trial_config = get_portal_trial_config()
    form = PortalTrialConfigForm(request.POST, instance=trial_config)
    if not form.is_valid():
        messages.error(request, _('Trial-Konfiguration konnte nicht gespeichert werden. Bitte prüfen Sie die Eingaben.'))
        return _render_trial_management(request, form, trial_config, status=400)

    trial_config = form.save()
    started_at = trial_config.trial_started_at
    expires_at = trial_config.trial_expires_at
    audit_fn(
        request,
        'portal_trial_config_saved',
        target_type='portal_trial_config',
        target_id=trial_config.id,
        target_label='Default',
        details={
            'is_trial_mode': trial_config.is_trial_mode,
            'trial_started_at': started_at.isoformat() if started_at else '',
            'trial_expires_at': expires_at.isoformat() if expires_at else '',
            'restrict_production_integrations': trial_config.restrict_production_integrations,
            'auto_cleanup_enabled': trial_config.auto_cleanup_enabled,
        },
    )
    _notify_trial_state(request, trial_config)

    messages.success(request, _('Trial-Konfiguration wurde gespeichert.'))
    return _render_trial_management(request, PortalTrialConfigForm(instance=trial_config), trial_config)


def create_user_from_admin_impl(request, *, render_user_management_fn, send_user_access_email_fn, audit_fn, display_user_name_fn):
    """Create a user from the posted form, send the invitation and redirect.

    The platform-owner role is only offered by the form when the requesting
    user is a platform owner. Invalid input re-renders the user management
    page through ``render_user_management_fn`` with ``status_code=400``.
    """
    form = UserManagementCreateForm(
        request.POST,
        include_product_owner=(get_user_role_key(request.user) == ROLE_PLATFORM_OWNER),
    )
    if not form.is_valid():
        messages.error(request, _('Benutzer konnte nicht erstellt werden. Bitte prüfen Sie die Eingaben.'))
        return render_user_management_fn(request, create_form=form, status_code=400)

    user = form.save()
    send_user_access_email_fn(request, user, invitation=True)
    audit_fn(
        request,
        'user_created',
        target_type='user',
        target_id=user.id,
        target_label=display_user_name_fn(user),
        details={'username': user.username, 'role': get_user_role_key(user), 'invitation_sent': True},
    )
    messages.success(request, _('Benutzer wurde erstellt und eingeladen: %(username)s') % {'username': user.username})
    return redirect('user_management_page')


def update_user_from_admin_impl(request, user_id: int, *, would_remove_last_platform_owner_fn, would_remove_last_super_admin_fn, audit_fn, display_user_name_fn):
    """Update role, active flag and optionally the password of a user.

    The update is rejected (error message plus redirect) when the role is
    unknown, when a non-platform-owner tries to grant the platform-owner
    role, when the requesting platform owner or super admin would lock out
    or demote themselves, or when the change would remove the last active
    platform owner / super admin. Responds with 404 for an unknown
    ``user_id``.
    """
    # NOTE(review): both names are used below but were never imported in this
    # module, which raised NameError. They are assumed to live in ``.roles``
    # next to ROLE_PLATFORM_OWNER — confirm and move to the module imports.
    from .roles import ROLE_SUPER_ADMIN, assign_user_role

    target_user = get_object_or_404(get_user_model(), id=user_id)
    role_key = _posted_text(request, 'role_key')
    is_active = request.POST.get('is_active') == 'on'
    new_password = _posted_text(request, 'new_password')

    if role_key not in ROLE_GROUP_NAMES:
        messages.error(request, _('Ungültige Rolle.'))
        return redirect('user_management_page')

    current_role = get_user_role_key(request.user)
    if role_key == ROLE_PLATFORM_OWNER and current_role != ROLE_PLATFORM_OWNER:
        messages.error(request, _('Nur Platform Owner dürfen diese Rolle vergeben.'))
        return redirect('user_management_page')

    editing_self = target_user == request.user
    if editing_self and current_role == ROLE_PLATFORM_OWNER and (role_key != ROLE_PLATFORM_OWNER or not is_active):
        messages.error(request, _('Der aktuell angemeldete Platform Owner kann sich hier nicht selbst sperren oder herabstufen.'))
        return redirect('user_management_page')
    if editing_self and current_role == ROLE_SUPER_ADMIN and (role_key != ROLE_SUPER_ADMIN or not is_active):
        messages.error(request, _('Der aktuell angemeldete Super Admin kann sich hier nicht selbst sperren oder herabstufen.'))
        return redirect('user_management_page')

    if would_remove_last_platform_owner_fn(target_user, new_role_key=role_key, new_is_active=is_active):
        messages.error(request, _('Der letzte aktive Platform Owner kann nicht deaktiviert oder herabgestuft werden.'))
        return redirect('user_management_page')
    if would_remove_last_super_admin_fn(target_user, new_role_key=role_key, new_is_active=is_active):
        messages.error(request, _('Der letzte aktive Super Admin kann nicht deaktiviert oder herabgestuft werden.'))
        return redirect('user_management_page')

    assign_user_role(target_user, role_key)
    target_user.is_active = is_active
    if new_password:
        target_user.set_password(new_password)
    target_user.save()

    audit_fn(
        request,
        'user_updated',
        target_type='user',
        target_id=target_user.id,
        target_label=display_user_name_fn(target_user),
        details={
            'username': target_user.username,
            'role': role_key,
            'is_active': is_active,
            'password_changed': bool(new_password),
        },
    )
    messages.success(request, _('Benutzer wurde aktualisiert: %(username)s') % {'username': target_user.username})
    return redirect('user_management_page')


def send_password_reset_from_admin_impl(request, user_id: int, *, send_user_access_email_fn, audit_fn, display_user_name_fn):
    """Send a password reset e-mail to the user and redirect back.

    A ``ValueError`` raised by ``send_user_access_email_fn`` is shown to the
    admin as an error message instead of propagating. Responds with 404 for
    an unknown ``user_id``.
    """
    target_user = get_object_or_404(get_user_model(), id=user_id)
    try:
        send_user_access_email_fn(request, target_user, invitation=False)
    except ValueError as exc:
        messages.error(request, str(exc))
        return redirect('user_management_page')

    audit_fn(
        request,
        'user_password_reset_sent',
        target_type='user',
        target_id=target_user.id,
        target_label=display_user_name_fn(target_user),
        details={'username': target_user.username, 'email': target_user.email},
    )
    messages.success(request, _('Passwort-Reset-Link wurde versendet: %(username)s') % {'username': target_user.username})
    return redirect('user_management_page')


def delete_user_from_admin_impl(request, user_id: int, *, would_remove_last_platform_owner_fn, would_remove_last_super_admin_fn, audit_fn, display_user_name_fn):
    """Delete a user unless it is the requester or the last owner/super admin.

    Responds with 404 for an unknown ``user_id``; every rejected deletion
    produces an error message and a redirect to the user management page.
    """
    target_user = get_object_or_404(get_user_model(), id=user_id)
    current_role = get_user_role_key(request.user)

    if target_user == request.user:
        if current_role == ROLE_PLATFORM_OWNER:
            messages.error(request, _('Der aktuell angemeldete Platform Owner kann sich hier nicht selbst löschen.'))
        else:
            messages.error(request, _('Der aktuell angemeldete Super Admin kann sich hier nicht selbst löschen.'))
        return redirect('user_management_page')

    if would_remove_last_platform_owner_fn(target_user, deleting=True):
        messages.error(request, _('Der letzte aktive Platform Owner kann nicht gelöscht werden.'))
        return redirect('user_management_page')
    if would_remove_last_super_admin_fn(target_user, deleting=True):
        messages.error(request, _('Der letzte aktive Super Admin kann nicht gelöscht werden.'))
        return redirect('user_management_page')

    # Capture display values first; they are needed after the row is gone.
    target_label = display_user_name_fn(target_user)
    username = target_user.username
    target_user.delete()

    audit_fn(
        request,
        'user_deleted',
        target_type='user',
        target_label=target_label,
        details={'username': username},
    )
    messages.success(request, _('Benutzer wurde gelöscht: %(username)s') % {'username': username})
    return redirect('user_management_page')


def handbook_page_impl(request):
    """Render the handbook page."""
    return render(request, 'workflows/handbook.html')


def project_wiki_page_impl(request):
    """Render the project wiki page."""
    return render(request, 'workflows/project_wiki.html')


def developer_handbook_page_impl(request):
    """Render the developer handbook page."""
    return render(request, 'workflows/developer_handbook.html')


def deployment_hosts_page_impl(request):
    """Render the deployment hosts page."""
    return render(request, 'workflows/deployment_hosts.html')


def release_checklist_page_impl(request):
    """Render the release checklist page."""
    return render(request, 'workflows/release_checklist.html')