Files
workdock-platform/backend/workflows/middleware.py
2026-03-27 01:11:29 +01:00

202 lines
7.4 KiB
Python

import uuid
from urllib.parse import urlencode

from django.conf import settings
from django.contrib import messages
from django.contrib.auth import logout
from django.contrib.messages.api import MessageFailure
from django.core.cache import cache
from django.http import HttpResponse
from django.shortcuts import redirect, render
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext as _

from .branding import is_trial_expired, is_trial_mode_enabled
from .logging_utils import clear_request_id, set_request_id
from .roles import ROLE_PLATFORM_OWNER, get_user_role_key
class RequestIDMiddleware:
    """Attach a request ID to the request, the logging context and the response.

    The ID is taken from the incoming ``X-Request-ID`` header, then from
    ``X-Correlation-ID``. When neither carries a usable value, a random hex
    UUID is generated. The chosen ID is stored on ``request.request_id``,
    passed to ``set_request_id`` while the request is processed, and echoed
    back in the ``X-Request-ID`` response header.
    """

    HEADER_NAME = 'X-Request-ID'
    # WSGI ``META`` keys consulted for a caller-supplied ID, in priority order.
    INCOMING_META_KEYS = ('HTTP_X_REQUEST_ID', 'HTTP_X_CORRELATION_ID')
    # Caller-supplied IDs are truncated to this many characters so a client
    # cannot push arbitrarily large values into logs and response headers.
    MAX_REQUEST_ID_LENGTH = 128

    def __init__(self, get_response):
        self.get_response = get_response

    @classmethod
    def _clean_request_id(cls, raw):
        """Return a sanitised copy of a client-supplied ID, or ``''`` if unusable.

        The value is untrusted input: surrounding whitespace is stripped,
        anything containing characters outside printable ASCII is rejected,
        and the result is capped at ``MAX_REQUEST_ID_LENGTH`` characters.
        """
        if not isinstance(raw, str):
            return ''
        candidate = raw.strip()
        if not candidate:
            return ''
        if any(not 32 <= ord(char) < 127 for char in candidate):
            return ''
        return candidate[:cls.MAX_REQUEST_ID_LENGTH]

    def __call__(self, request):
        request_id = ''
        for meta_key in self.INCOMING_META_KEYS:
            request_id = self._clean_request_id(request.META.get(meta_key))
            if request_id:
                break
        if not request_id:
            request_id = uuid.uuid4().hex

        request.request_id = request_id
        set_request_id(request_id)
        try:
            response = self.get_response(request)
        finally:
            # Always reset the logging context, even when the view raised.
            clear_request_id()
        response[self.HEADER_NAME] = request_id
        return response
class RateLimitMiddleware:
    """Throttle POST requests to login, password-reset and sensitive admin paths.

    For a matching POST request a counter is kept in the Django cache under
    ``ratelimit:<scope>:<client>``, created with ``timeout=window``. Once the
    counter exceeds the configured limit, the request is answered with a
    plain-text HTTP 429 response carrying a ``Retry-After`` header instead of
    being passed on. Limits and windows are read from ``settings.RATE_LIMIT_*``;
    nothing is throttled when ``settings.RATE_LIMIT_ENABLED`` is falsy.
    """

    LOGIN_PATHS = ('/accounts/login/',)
    PASSWORD_RESET_PATHS = ('/accounts/password_reset/',)
    # Keep this list path-prefix based so new platform actions get protected
    # without having to wire every single view into a second permission layer.
    ADMIN_SENSITIVE_PREFIXES = (
        '/admin-tools/nextcloud/toggle/',
        '/admin-tools/email-mode/toggle/',
        '/admin-tools/integrations/save',
        '/admin-tools/welcome-emails/',
        '/admin-tools/branding/save/',
        '/admin-tools/company/save/',
        '/admin-tools/trial/save/',
        '/admin-tools/apps/save/',
        '/admin-tools/users/',
        '/admin-tools/backups/',
        '/requests/delete/',
        '/requests/retry/',
    )

    def __init__(self, get_response):
        self.get_response = get_response

    def _client_identifier(self, request) -> str:
        """Return the key identifying the caller: ``user:<pk>`` or ``ip:<address>``.

        Authenticated users are keyed by primary key. Everyone else is keyed
        by the first ``X-Forwarded-For`` entry, then ``REMOTE_ADDR``, then the
        literal ``unknown``.
        """
        user = getattr(request, 'user', None)
        if getattr(user, 'is_authenticated', False):
            return f'user:{user.pk}'
        # NOTE(review): X-Forwarded-For is client-controlled unless a trusted
        # reverse proxy overwrites it -- confirm the deployment guarantees
        # that, otherwise anonymous callers can pick their own bucket.
        forwarded = (request.META.get('HTTP_X_FORWARDED_FOR') or '').split(',')[0].strip()
        remote = forwarded or request.META.get('REMOTE_ADDR') or 'unknown'
        return f'ip:{remote}'

    def _match_rule(self, path: str):
        """Return ``(scope, limit, window)`` for *path*, or ``None`` if unthrottled.

        Settings are only read for the rule that matches.
        """
        if path.startswith(tuple(self.LOGIN_PATHS)):
            return ('login', settings.RATE_LIMIT_LOGIN_LIMIT, settings.RATE_LIMIT_LOGIN_WINDOW)
        if path.startswith(tuple(self.PASSWORD_RESET_PATHS)):
            return (
                'password_reset',
                settings.RATE_LIMIT_PASSWORD_RESET_LIMIT,
                settings.RATE_LIMIT_PASSWORD_RESET_WINDOW,
            )
        if path.startswith(tuple(self.ADMIN_SENSITIVE_PREFIXES)):
            return ('admin_post', settings.RATE_LIMIT_ADMIN_ACTION_LIMIT, settings.RATE_LIMIT_ADMIN_ACTION_WINDOW)
        return None

    @staticmethod
    def _retry_after_seconds(ttl, window) -> int:
        """Return a ``Retry-After`` value of at least one second.

        *ttl* is whatever the cache backend reported for the counter key. It
        may be ``None`` when the backend has no TTL support or reports no
        expiry for the key, in which case *window* is used. If neither value
        is numeric, the result is ``1``.
        """
        for candidate in (ttl, window):
            try:
                return max(1, int(candidate))
            except (TypeError, ValueError):
                continue
        return 1

    def _register_hit(self, cache_key: str, window) -> int:
        """Count one request against *cache_key* and return the new total."""
        if cache.add(cache_key, 1, timeout=window):
            return 1
        try:
            return cache.incr(cache_key)
        except ValueError:
            # The key expired between ``add`` and ``incr``; ``incr`` refuses
            # to create missing keys, so start a new counter instead of
            # letting the request fail.
            cache.set(cache_key, 1, timeout=window)
            return 1

    def __call__(self, request):
        if not settings.RATE_LIMIT_ENABLED or request.method != 'POST':
            return self.get_response(request)

        rule = self._match_rule(request.path or '/')
        if not rule:
            return self.get_response(request)

        scope, limit, window = rule
        cache_key = f'ratelimit:{scope}:{self._client_identifier(request)}'
        if self._register_hit(cache_key, window) <= limit:
            return self.get_response(request)

        response = HttpResponse(
            _('Zu viele Anfragen. Bitte versuchen Sie es in wenigen Minuten erneut.'),
            status=429,
            content_type='text/plain; charset=utf-8',
        )
        # ``ttl`` is not part of Django's core cache API; only some backends
        # provide it, so fall back to the full window when it is absent.
        ttl = cache.ttl(cache_key) if hasattr(cache, 'ttl') else None
        response['Retry-After'] = str(self._retry_after_seconds(ttl, window))
        return response
class AuthSessionHardeningMiddleware:
    """Enforce an idle timeout and fresh authentication for sensitive POSTs.

    Only authenticated requests to non-exempt paths are checked:

    * If more than ``settings.SESSION_IDLE_TIMEOUT_SECONDS`` (minimum 60)
      have passed since the session's ``last_activity_ts``, the user is
      logged out and redirected to the login page.
    * A POST to one of ``SENSITIVE_POST_PREFIXES`` additionally requires the
      session's ``auth_fresh_ts`` to be no older than
      ``settings.SENSITIVE_ACTION_REAUTH_SECONDS`` (minimum 60).

    After a request passes both checks, ``last_activity_ts`` is refreshed and
    ``auth_fresh_ts`` is set if it is not present yet.
    """

    EXEMPT_PREFIXES = (
        '/healthz/',
        '/i18n/',
        '/accounts/login/',
        '/accounts/logout/',
        '/accounts/password_reset/',
        '/accounts/reset/',
        '/static/',
        '/media/',
    )
    SENSITIVE_POST_PREFIXES = (
        '/admin-tools/users/',
        '/admin-tools/backups/',
        '/admin-tools/trial/save/',
        '/admin-tools/apps/save/',
        '/admin-tools/branding/save/',
        '/admin-tools/company/save/',
        '/admin-tools/integrations/save',
        '/requests/delete/',
    )

    def __init__(self, get_response):
        self.get_response = get_response

    def _is_exempt(self, path: str) -> bool:
        """Return True when *path* starts with one of ``EXEMPT_PREFIXES``."""
        return path.startswith(tuple(self.EXEMPT_PREFIXES))

    def _touch_session(self, request, now_ts: int) -> None:
        """Record activity; set ``auth_fresh_ts`` only if it is not set yet."""
        request.session['last_activity_ts'] = now_ts
        request.session.setdefault('auth_fresh_ts', now_ts)

    def _warn(self, request, message: str) -> None:
        """Queue a warning message, ignoring a failing messages framework."""
        try:
            messages.warning(request, message)
        except MessageFailure:
            return

    @staticmethod
    def _login_url_with_next(login_url: str, next_path: str) -> str:
        """Return *login_url* with *next_path* as an encoded ``next`` parameter.

        Encoding matters when *next_path* carries its own query string:
        unencoded, everything after its first ``&`` would be parsed as extra
        parameters of the login URL and lost from the post-login redirect.
        """
        return f'{login_url}?{urlencode({"next": next_path}, safe="/")}'

    def _force_relogin(self, request, message: str):
        """Log the user out, queue *message* and redirect to the login page."""
        logout(request)
        self._warn(request, message)
        return redirect(self._login_url_with_next(reverse('login'), request.get_full_path()))

    def __call__(self, request):
        path = request.path or '/'
        user = getattr(request, 'user', None)
        if not getattr(user, 'is_authenticated', False) or self._is_exempt(path):
            return self.get_response(request)

        now_ts = int(timezone.now().timestamp())
        idle_timeout = max(60, settings.SESSION_IDLE_TIMEOUT_SECONDS)
        # A session without a recorded activity timestamp counts as active now.
        last_activity_ts = int(request.session.get('last_activity_ts') or now_ts)
        if now_ts - last_activity_ts > idle_timeout:
            return self._force_relogin(
                request,
                _('Ihre Sitzung ist wegen Inaktivität abgelaufen. Bitte melden Sie sich erneut an.'),
            )

        if request.method == 'POST' and path.startswith(tuple(self.SENSITIVE_POST_PREFIXES)):
            fresh_window = max(60, settings.SENSITIVE_ACTION_REAUTH_SECONDS)
            auth_fresh_ts = int(request.session.get('auth_fresh_ts') or last_activity_ts)
            if now_ts - auth_fresh_ts > fresh_window:
                return self._force_relogin(
                    request,
                    _('Bitte bestätigen Sie Ihre Identität erneut, bevor Sie diese sensible Aktion ausführen.'),
                )

        response = self.get_response(request)
        # The view may have logged the user out. Do not write activity
        # timestamps into the resulting anonymous session, where they would
        # be carried over into the next login.
        if getattr(getattr(request, 'user', None), 'is_authenticated', False):
            self._touch_session(request, now_ts)
        return response
class TrialModeMiddleware:
    """Block the application with a 403 page once the trial has expired.

    While trial mode is enabled and expired, every request is answered with
    ``workflows/trial_expired.html`` (status 403), except requests to one of
    ``EXEMPT_PREFIXES`` and requests from an authenticated user whose role
    key is ``ROLE_PLATFORM_OWNER``.
    """

    EXEMPT_PREFIXES = (
        '/healthz/',
        '/i18n/',
        '/accounts/login/',
        '/accounts/logout/',
        '/accounts/password_reset/',
        '/accounts/reset/',
        '/static/',
        '/media/',
    )

    def __init__(self, get_response):
        self.get_response = get_response

    def _bypasses_lock(self, request) -> bool:
        """Return True for exempt paths and for the platform owner."""
        current_path = request.path or '/'
        if current_path.startswith(tuple(self.EXEMPT_PREFIXES)):
            return True
        account = getattr(request, 'user', None)
        if not getattr(account, 'is_authenticated', False):
            return False
        # The role lookup only runs for authenticated users.
        return get_user_role_key(account) == ROLE_PLATFORM_OWNER

    def __call__(self, request):
        # The expiry check is only evaluated when trial mode is enabled.
        trial_locked = is_trial_mode_enabled() and is_trial_expired()
        if trial_locked and not self._bypasses_lock(request):
            return render(request, 'workflows/trial_expired.html', status=403)
        return self.get_response(request)