import uuid
from urllib.parse import urlencode

from django.conf import settings
from django.contrib import messages
from django.contrib.auth import logout
from django.contrib.messages.api import MessageFailure
from django.core.cache import cache
from django.http import HttpResponse
from django.shortcuts import redirect, render
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext as _

from .branding import is_trial_expired, is_trial_mode_enabled
from .logging_utils import clear_request_id, set_request_id
from .roles import ROLE_PLATFORM_OWNER, get_user_role_key


class RequestIDMiddleware:
    """Attach a request ID to the request, the logging context and the response.

    The ID is taken from the ``X-Request-ID`` header, then from
    ``X-Correlation-ID``, and otherwise generated as a random UUID hex string.
    It is stored on ``request.request_id``, handed to ``set_request_id`` for
    the duration of the request, and echoed back in the ``X-Request-ID``
    response header.
    """

    HEADER_NAME = 'X-Request-ID'

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        # NOTE(review): the incoming header value is client-controlled and is
        # used as-is (no length or character validation).
        request_id = (
            request.META.get('HTTP_X_REQUEST_ID')
            or request.META.get('HTTP_X_CORRELATION_ID')
            or uuid.uuid4().hex
        )
        request.request_id = request_id
        set_request_id(request_id)
        try:
            response = self.get_response(request)
        finally:
            # Always reset the logging context, even if the view raised.
            clear_request_id()
        response[self.HEADER_NAME] = request_id
        return response


class RateLimitMiddleware:
    """Cache-backed rate limiter for POST requests to sensitive paths.

    Only POST requests whose path starts with one of the configured prefixes
    are counted. Each (scope, client) pair gets one counter in the cache that
    lives for the rule's window; once the counter exceeds the rule's limit the
    request is answered with HTTP 429 and a ``Retry-After`` header.
    """

    LOGIN_PATHS = ('/accounts/login/', '/accounts/login/totp/')
    PASSWORD_RESET_PATHS = ('/accounts/password_reset/',)
    # Keep this list path-prefix based so new platform actions get protected
    # without having to wire every single view into a second permission layer.
    ADMIN_SENSITIVE_PREFIXES = (
        '/admin-tools/nextcloud/toggle/',
        '/admin-tools/email-mode/toggle/',
        '/admin-tools/integrations/save',
        '/admin-tools/welcome-emails/',
        '/admin-tools/branding/save/',
        '/admin-tools/company/save/',
        '/admin-tools/trial/save/',
        '/admin-tools/apps/save/',
        '/admin-tools/users/',
        '/admin-tools/backups/',
        '/requests/delete/',
        '/requests/retry/',
    )

    def __init__(self, get_response):
        self.get_response = get_response

    def _client_identifier(self, request) -> str:
        """Return ``user:<pk>`` for authenticated users, else ``ip:<address>``.

        The address is the first entry of ``X-Forwarded-For`` when present,
        otherwise ``REMOTE_ADDR``, otherwise the literal ``unknown``.
        """
        user = getattr(request, 'user', None)
        if getattr(user, 'is_authenticated', False):
            return f'user:{user.pk}'
        # NOTE(review): X-Forwarded-For is client-supplied unless a trusted
        # proxy overwrites it; confirm the deployment strips/sets this header.
        forwarded = (request.META.get('HTTP_X_FORWARDED_FOR') or '').split(',')[0].strip()
        remote = forwarded or request.META.get('REMOTE_ADDR') or 'unknown'
        return f'ip:{remote}'

    def _match_rule(self, path: str):
        """Return ``(scope, limit, window)`` for ``path`` or ``None`` if unlimited."""
        # str.startswith accepts a tuple of prefixes, so no explicit loop is needed.
        if path.startswith(self.LOGIN_PATHS):
            return (
                'login',
                settings.RATE_LIMIT_LOGIN_LIMIT,
                settings.RATE_LIMIT_LOGIN_WINDOW,
            )
        if path.startswith(self.PASSWORD_RESET_PATHS):
            return (
                'password_reset',
                settings.RATE_LIMIT_PASSWORD_RESET_LIMIT,
                settings.RATE_LIMIT_PASSWORD_RESET_WINDOW,
            )
        if path.startswith(self.ADMIN_SENSITIVE_PREFIXES):
            return (
                'admin_post',
                settings.RATE_LIMIT_ADMIN_ACTION_LIMIT,
                settings.RATE_LIMIT_ADMIN_ACTION_WINDOW,
            )
        return None

    def _register_hit(self, cache_key: str, window) -> int:
        """Count one request against ``cache_key`` and return the new total.

        ``cache.add`` only creates the counter when it does not exist yet;
        otherwise the existing counter is incremented. ``cache.incr`` raises
        ``ValueError`` for a missing key, which happens when the counter
        expires between the two calls, so in that case the add is retried
        once instead of letting the request fail.
        """
        for _attempt in range(2):
            if cache.add(cache_key, 1, timeout=window):
                return 1
            try:
                return cache.incr(cache_key)
            except ValueError:
                # Counter expired between add() and incr(); start a new window.
                continue
        # Extremely unlikely double race: treat as the first hit of a new window.
        return 1

    def _retry_after_seconds(self, cache_key: str, window) -> int:
        """Return the number of seconds to advertise in ``Retry-After`` (>= 1).

        Uses the backend's ``ttl`` method when it has one, otherwise the full
        window. A ``None`` TTL (reported by some backends for keys without an
        expiry) also falls back to the window.
        """
        remaining = cache.ttl(cache_key) if hasattr(cache, 'ttl') else window
        if remaining is None:
            remaining = window
        return max(1, int(remaining))

    def __call__(self, request):
        if not settings.RATE_LIMIT_ENABLED or request.method != 'POST':
            return self.get_response(request)

        rule = self._match_rule(request.path or '/')
        if not rule:
            return self.get_response(request)

        scope, limit, window = rule
        identifier = self._client_identifier(request)
        cache_key = f'ratelimit:{scope}:{identifier}'

        current = self._register_hit(cache_key, window)
        if current > limit:
            response = HttpResponse(
                _('Zu viele Anfragen. Bitte versuchen Sie es in wenigen Minuten erneut.'),
                status=429,
                content_type='text/plain; charset=utf-8',
            )
            response['Retry-After'] = str(self._retry_after_seconds(cache_key, window))
            return response

        return self.get_response(request)


class AuthSessionHardeningMiddleware:
    """Enforce an idle timeout and fresh authentication for sensitive POSTs.

    For authenticated users on non-exempt paths:

    * if more than ``SESSION_IDLE_TIMEOUT_SECONDS`` (minimum 60) passed since
      ``session['last_activity_ts']``, the user is logged out and redirected
      to the login page;
    * if the request is a sensitive POST and ``session['auth_fresh_ts']`` is
      older than ``SENSITIVE_ACTION_REAUTH_SECONDS`` (minimum 60), the user is
      logged out and redirected to the login page as well;
    * otherwise the request proceeds and the activity timestamp is updated.
    """

    EXEMPT_PREFIXES = (
        '/healthz/',
        '/i18n/',
        '/accounts/login/',
        '/accounts/logout/',
        '/accounts/password_reset/',
        '/accounts/reset/',
        '/static/',
        '/media/',
    )
    SENSITIVE_POST_PREFIXES = (
        '/admin-tools/users/',
        '/admin-tools/backups/',
        '/admin-tools/trial/save/',
        '/admin-tools/apps/save/',
        '/admin-tools/branding/save/',
        '/admin-tools/company/save/',
        '/admin-tools/integrations/save',
        '/requests/delete/',
    )

    def __init__(self, get_response):
        self.get_response = get_response

    def _is_exempt(self, path: str) -> bool:
        """Return True when ``path`` starts with one of ``EXEMPT_PREFIXES``."""
        return path.startswith(self.EXEMPT_PREFIXES)

    def _touch_session(self, request, now_ts: int) -> None:
        """Record the current activity time; set ``auth_fresh_ts`` only once."""
        request.session['last_activity_ts'] = now_ts
        request.session.setdefault('auth_fresh_ts', now_ts)

    def _warn(self, request, message: str) -> None:
        """Queue a warning message, ignoring a missing messages framework."""
        try:
            messages.warning(request, message)
        except MessageFailure:
            return

    def _is_sensitive_post(self, request, path: str) -> bool:
        """Return True when the request is a POST that requires fresh auth.

        That is either a POST to one of ``SENSITIVE_POST_PREFIXES`` or a POST
        to ``/account/`` whose ``account_form`` field selects a TOTP
        disable / recovery-code regeneration action.
        """
        if request.method != 'POST':
            return False
        if path.startswith(self.SENSITIVE_POST_PREFIXES):
            return True
        if path == '/account/':
            account_form = (request.POST.get('account_form') or '').strip()
            return account_form in {'totp_disable', 'totp_regenerate_codes'}
        return False

    def _force_relogin(self, request, message: str):
        """Log the user out, queue ``message`` and redirect to the login page.

        The current full path is passed as the ``next`` query parameter. It is
        URL-encoded so that a path carrying its own query string (``?a=1&b=2``)
        survives the round trip intact instead of being cut at the first ``&``.
        """
        logout(request)
        self._warn(request, message)
        login_url = reverse('login')
        query = urlencode({'next': request.get_full_path()}, safe='/')
        return redirect(f'{login_url}?{query}')

    def __call__(self, request):
        path = request.path or '/'
        user = getattr(request, 'user', None)
        if not getattr(user, 'is_authenticated', False) or self._is_exempt(path):
            return self.get_response(request)

        now_ts = int(timezone.now().timestamp())
        idle_timeout = max(60, settings.SESSION_IDLE_TIMEOUT_SECONDS)
        # A session without a recorded timestamp is treated as active right now.
        last_activity_ts = int(request.session.get('last_activity_ts') or now_ts)
        if now_ts - last_activity_ts > idle_timeout:
            return self._force_relogin(
                request,
                _('Ihre Sitzung ist wegen Inaktivität abgelaufen. Bitte melden Sie sich erneut an.'),
            )

        if self._is_sensitive_post(request, path):
            fresh_window = max(60, settings.SENSITIVE_ACTION_REAUTH_SECONDS)
            auth_fresh_ts = int(request.session.get('auth_fresh_ts') or last_activity_ts)
            if now_ts - auth_fresh_ts > fresh_window:
                return self._force_relogin(
                    request,
                    _('Bitte bestätigen Sie Ihre Identität erneut, bevor Sie diese sensible Aktion ausführen.'),
                )

        response = self.get_response(request)
        self._touch_session(request, now_ts)
        return response


class TrialModeMiddleware:
    """Block the application once the trial has expired.

    While trial mode is enabled and expired, every request outside
    ``EXEMPT_PREFIXES`` is answered with the ``workflows/trial_expired.html``
    template and status 403, except for authenticated users whose role key is
    ``ROLE_PLATFORM_OWNER``.
    """

    EXEMPT_PREFIXES = (
        '/healthz/',
        '/i18n/',
        '/accounts/login/',
        '/accounts/logout/',
        '/accounts/password_reset/',
        '/accounts/reset/',
        '/static/',
        '/media/',
    )

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        if not is_trial_mode_enabled() or not is_trial_expired():
            return self.get_response(request)

        path = request.path or '/'
        if path.startswith(self.EXEMPT_PREFIXES):
            return self.get_response(request)

        user = getattr(request, 'user', None)
        if getattr(user, 'is_authenticated', False) and get_user_role_key(user) == ROLE_PLATFORM_OWNER:
            return self.get_response(request)

        return render(request, 'workflows/trial_expired.html', status=403)