228 lines
8.4 KiB
Python
228 lines
8.4 KiB
Python
import uuid
|
|
|
|
from django.conf import settings
|
|
from django.contrib import messages
|
|
from django.contrib.auth import logout
|
|
from django.contrib.messages.api import MessageFailure
|
|
from django.core.cache import cache
|
|
from django.core.exceptions import PermissionDenied, SuspiciousOperation
|
|
from django.http import Http404, HttpResponse
|
|
from django.shortcuts import redirect, render
|
|
from django.urls import reverse
|
|
from django.utils import timezone
|
|
from django.utils.translation import gettext as _
|
|
|
|
from .branding import is_trial_expired, is_trial_mode_enabled
|
|
from .error_views import bad_request, not_found, permission_denied, server_error
|
|
from .logging_utils import clear_request_id, set_request_id
|
|
from .roles import ROLE_PLATFORM_OWNER, get_user_role_key
|
|
|
|
|
|
class FriendlyExceptionMiddleware:
    """Serve branded error pages for common client-error exceptions.

    The middleware is only active when ``settings.FORCE_BRANDED_ERROR_PAGES``
    is truthy; otherwise every request is passed through untouched.

    Handled exception types and the view each one is routed to:

    * ``Http404`` -> ``not_found``
    * ``PermissionDenied`` -> ``permission_denied``
    * ``SuspiciousOperation`` -> ``bad_request``
    """

    def __init__(self, get_response):
        """Store the next handler in the middleware chain."""
        self.get_response = get_response

    def _branded_response(self, request, exc):
        """Return the branded response for ``exc``, or ``None`` if unhandled."""
        if isinstance(exc, Http404):
            return not_found(request, exc)
        if isinstance(exc, PermissionDenied):
            return permission_denied(request, exc)
        if isinstance(exc, SuspiciousOperation):
            return bad_request(request, exc)
        return None

    def __call__(self, request):
        """Run the rest of the chain, translating handled exceptions."""
        if not settings.FORCE_BRANDED_ERROR_PAGES:
            return self.get_response(request)

        # Kept as a fallback for callers that invoke the middleware with a
        # bare callable; inside Django's handler chain, view exceptions are
        # converted to responses before they can propagate to this point.
        try:
            return self.get_response(request)
        except (Http404, PermissionDenied, SuspiciousOperation) as exc:
            return self._branded_response(request, exc)

    def process_exception(self, request, exception):
        """Middleware hook Django calls when a view raises an exception.

        Returns a branded response for the handled exception types when
        branded pages are forced. Returns ``None`` in every other case so
        Django's default exception handling continues unchanged.
        """
        if not settings.FORCE_BRANDED_ERROR_PAGES:
            return None
        return self._branded_response(request, exception)
|
|
|
|
|
|
class RequestIDMiddleware:
    """Attach a request ID to the request, the logging context and the response.

    The ID is taken from an inbound ``X-Request-ID`` or ``X-Correlation-ID``
    header when that value looks sane; otherwise a random UUID hex string is
    generated. The ID is stored on ``request.request_id``, handed to
    ``set_request_id`` for the duration of the request, and echoed back in
    the ``X-Request-ID`` response header.
    """

    HEADER_NAME = 'X-Request-ID'
    # WSGI META keys consulted for an inbound ID, in priority order.
    INBOUND_META_KEYS = ('HTTP_X_REQUEST_ID', 'HTTP_X_CORRELATION_ID')
    # Upper bound for a client-supplied ID; longer values are ignored.
    MAX_ID_LENGTH = 128

    def __init__(self, get_response):
        """Store the next handler in the middleware chain."""
        self.get_response = get_response

    @classmethod
    def _resolve_request_id(cls, meta) -> str:
        """Pick a usable request ID from ``meta`` or generate a new one.

        The inbound headers are client-controlled, so a value is accepted
        only if, after stripping surrounding whitespace, it is non-empty, at
        most ``MAX_ID_LENGTH`` characters long and made up solely of visible
        ASCII characters (no spaces or control characters). This keeps
        arbitrary client data out of log lines and the response header.
        """
        for key in cls.INBOUND_META_KEYS:
            candidate = str(meta.get(key) or '').strip()
            if not candidate or len(candidate) > cls.MAX_ID_LENGTH:
                continue
            if all('!' <= ch <= '~' for ch in candidate):
                return candidate
        return uuid.uuid4().hex

    def __call__(self, request):
        """Tag the request, run the chain, and echo the ID on the response."""
        request_id = self._resolve_request_id(request.META)
        request.request_id = request_id
        set_request_id(request_id)
        try:
            response = self.get_response(request)
        finally:
            # Always reset the logging context, even if the chain raised.
            clear_request_id()
        response[self.HEADER_NAME] = request_id
        return response
|
|
|
|
|
|
class RateLimitMiddleware:
    """Cache-backed rate limiting for POST requests to sensitive paths.

    Only POST requests are considered, and only when
    ``settings.RATE_LIMIT_ENABLED`` is truthy. A request path is matched
    against three prefix groups (login, password reset, admin actions), each
    with its own limit and window taken from settings. Requests over the
    limit receive a plain-text 429 response with a ``Retry-After`` header.
    """

    LOGIN_PATHS = ('/accounts/login/', '/accounts/login/totp/')
    PASSWORD_RESET_PATHS = ('/accounts/password_reset/',)
    # Keep this list path-prefix based so new platform actions get protected
    # without having to wire every single view into a second permission layer.
    ADMIN_SENSITIVE_PREFIXES = (
        '/admin-tools/nextcloud/toggle/',
        '/admin-tools/email-mode/toggle/',
        '/admin-tools/integrations/save',
        '/admin-tools/welcome-emails/',
        '/admin-tools/branding/save/',
        '/admin-tools/company/save/',
        '/admin-tools/trial/save/',
        '/admin-tools/apps/save/',
        '/admin-tools/users/',
        '/admin-tools/backups/',
        '/requests/delete/',
        '/requests/retry/',
    )

    def __init__(self, get_response):
        """Store the next handler in the middleware chain."""
        self.get_response = get_response

    def _client_identifier(self, request) -> str:
        """Return the key that identifies the caller for rate limiting.

        Authenticated users are keyed by primary key (``user:<pk>``).
        Everyone else is keyed by the first ``X-Forwarded-For`` entry, then
        ``REMOTE_ADDR``, then the literal ``unknown`` (``ip:<value>``).
        """
        user = getattr(request, 'user', None)
        if getattr(user, 'is_authenticated', False):
            return f'user:{user.pk}'
        # NOTE(review): X-Forwarded-For is client-controlled unless a trusted
        # proxy overwrites it; confirm the deployment strips or replaces this
        # header, otherwise the per-IP limit can be sidestepped.
        forwarded = (request.META.get('HTTP_X_FORWARDED_FOR') or '').split(',')[0].strip()
        remote = forwarded or request.META.get('REMOTE_ADDR') or 'unknown'
        return f'ip:{remote}'

    def _match_rule(self, path: str):
        """Return ``(scope, limit, window)`` for ``path``, or ``None``.

        Groups are checked in order: login, password reset, admin actions.
        """
        if any(path.startswith(prefix) for prefix in self.LOGIN_PATHS):
            return ('login', settings.RATE_LIMIT_LOGIN_LIMIT, settings.RATE_LIMIT_LOGIN_WINDOW)
        if any(path.startswith(prefix) for prefix in self.PASSWORD_RESET_PATHS):
            return ('password_reset', settings.RATE_LIMIT_PASSWORD_RESET_LIMIT, settings.RATE_LIMIT_PASSWORD_RESET_WINDOW)
        if any(path.startswith(prefix) for prefix in self.ADMIN_SENSITIVE_PREFIXES):
            return ('admin_post', settings.RATE_LIMIT_ADMIN_ACTION_LIMIT, settings.RATE_LIMIT_ADMIN_ACTION_WINDOW)
        return None

    def _increment(self, cache_key: str, window) -> int:
        """Count this request against ``cache_key`` and return the new total."""
        if cache.add(cache_key, 1, timeout=window):
            return 1
        try:
            return cache.incr(cache_key)
        except ValueError:
            # The key expired between add() and incr(); incr() raises
            # ValueError for a missing key. Start a fresh window instead of
            # letting the request fail with a server error.
            cache.set(cache_key, 1, timeout=window)
            return 1

    def _retry_after(self, cache_key: str, window) -> int:
        """Return the number of seconds to advertise in ``Retry-After``.

        Uses the backend's ``ttl()`` when it exists and returns a number;
        otherwise falls back to the full window. The result is at least 1.
        """
        ttl = getattr(cache, 'ttl', None)
        remaining = ttl(cache_key) if callable(ttl) else None
        if not isinstance(remaining, (int, float)):
            # No ttl() support, or ttl() reported no usable value (e.g. None).
            remaining = window
        return max(1, int(remaining))

    def __call__(self, request):
        """Apply the matching rate-limit rule, or pass the request through."""
        if not settings.RATE_LIMIT_ENABLED or request.method != 'POST':
            return self.get_response(request)

        rule = self._match_rule(request.path or '/')
        if not rule:
            return self.get_response(request)

        scope, limit, window = rule
        identifier = self._client_identifier(request)
        cache_key = f'ratelimit:{scope}:{identifier}'
        current = self._increment(cache_key, window)
        if current > limit:
            response = HttpResponse(
                _('Zu viele Anfragen. Bitte versuchen Sie es in wenigen Minuten erneut.'),
                status=429,
                content_type='text/plain; charset=utf-8',
            )
            response['Retry-After'] = str(self._retry_after(cache_key, window))
            return response

        return self.get_response(request)
|
|
|
|
|
|
class AuthSessionHardeningMiddleware:
    """Enforce an idle timeout and fresh authentication for sensitive POSTs.

    For authenticated users on non-exempt paths the middleware:

    * logs the user out and redirects to the login page when the session has
      been idle longer than ``settings.SESSION_IDLE_TIMEOUT_SECONDS``
      (minimum 60 seconds);
    * does the same for sensitive POST requests when the session's
      ``auth_fresh_ts`` is older than
      ``settings.SENSITIVE_ACTION_REAUTH_SECONDS`` (minimum 60 seconds);
    * otherwise runs the request and then records the activity timestamp in
      the session.
    """

    EXEMPT_PREFIXES = (
        '/healthz/',
        '/i18n/',
        '/accounts/login/',
        '/accounts/logout/',
        '/accounts/password_reset/',
        '/accounts/reset/',
        '/static/',
        '/media/',
    )
    SENSITIVE_POST_PREFIXES = (
        '/admin-tools/users/',
        '/admin-tools/backups/',
        '/admin-tools/trial/save/',
        '/admin-tools/apps/save/',
        '/admin-tools/branding/save/',
        '/admin-tools/company/save/',
        '/admin-tools/integrations/save',
        '/requests/delete/',
    )

    def __init__(self, get_response):
        """Store the next handler in the middleware chain."""
        self.get_response = get_response

    def _is_exempt(self, path: str) -> bool:
        """Return True when ``path`` starts with one of ``EXEMPT_PREFIXES``."""
        return any(path.startswith(prefix) for prefix in self.EXEMPT_PREFIXES)

    def _touch_session(self, request, now_ts: int) -> None:
        """Record activity; set ``auth_fresh_ts`` only if it is not yet set."""
        request.session['last_activity_ts'] = now_ts
        request.session.setdefault('auth_fresh_ts', now_ts)

    def _warn(self, request, message: str) -> None:
        """Queue a warning message, ignoring a missing messages backend."""
        try:
            messages.warning(request, message)
        except MessageFailure:
            return

    @staticmethod
    def _build_login_url(login_url: str, next_path: str) -> str:
        """Return ``login_url`` with ``next_path`` as an encoded ``next`` value.

        The path is percent-encoded (keeping ``/`` readable) so that a query
        string on the original request, e.g. ``/x/?a=1&b=2``, stays inside
        the ``next`` parameter instead of adding parameters to the login URL.
        """
        from urllib.parse import quote

        encoded_next = quote(next_path, safe='/')
        return f'{login_url}?next={encoded_next}'

    def _redirect_to_login(self, request):
        """Redirect to the login view, returning to the current URL afterwards."""
        return redirect(self._build_login_url(reverse('login'), request.get_full_path()))

    def __call__(self, request):
        """Apply the idle-timeout and re-authentication checks to ``request``."""
        path = request.path or '/'
        user = getattr(request, 'user', None)
        if not getattr(user, 'is_authenticated', False) or self._is_exempt(path):
            return self.get_response(request)

        now_ts = int(timezone.now().timestamp())
        idle_timeout = max(60, settings.SESSION_IDLE_TIMEOUT_SECONDS)
        # A session without a recorded activity time counts as active now.
        last_activity_ts = int(request.session.get('last_activity_ts') or now_ts)
        if now_ts - last_activity_ts > idle_timeout:
            logout(request)
            self._warn(request, _('Ihre Sitzung ist wegen Inaktivität abgelaufen. Bitte melden Sie sich erneut an.'))
            return self._redirect_to_login(request)

        is_sensitive_post = request.method == 'POST' and any(
            path.startswith(prefix) for prefix in self.SENSITIVE_POST_PREFIXES
        )
        if request.method == 'POST' and path == '/account/':
            # The account page multiplexes several forms; only the TOTP
            # disable/regenerate forms are treated as sensitive.
            account_form = (request.POST.get('account_form') or '').strip()
            if account_form in {'totp_disable', 'totp_regenerate_codes'}:
                is_sensitive_post = True

        if is_sensitive_post:
            fresh_window = max(60, settings.SENSITIVE_ACTION_REAUTH_SECONDS)
            # Fall back to the last activity time when no freshness stamp exists.
            auth_fresh_ts = int(request.session.get('auth_fresh_ts') or last_activity_ts)
            if now_ts - auth_fresh_ts > fresh_window:
                logout(request)
                self._warn(request, _('Bitte bestätigen Sie Ihre Identität erneut, bevor Sie diese sensible Aktion ausführen.'))
                return self._redirect_to_login(request)

        response = self.get_response(request)
        self._touch_session(request, now_ts)
        return response
|
|
|
|
|
|
class TrialModeMiddleware:
    """Block access once the trial has expired.

    While trial mode is enabled and the trial has expired, every request is
    answered with the ``workflows/trial_expired.html`` template (HTTP 403),
    except for requests to an exempt path prefix and requests made by an
    authenticated user whose role key is ``ROLE_PLATFORM_OWNER``.
    """

    EXEMPT_PREFIXES = (
        '/healthz/',
        '/i18n/',
        '/accounts/login/',
        '/accounts/logout/',
        '/accounts/password_reset/',
        '/accounts/reset/',
        '/static/',
        '/media/',
    )

    def __init__(self, get_response):
        """Store the next handler in the middleware chain."""
        self.get_response = get_response

    def __call__(self, request):
        """Pass the request through or render the trial-expired page."""
        # Short-circuits exactly like the two separate negated checks would:
        # the expiry check only runs when trial mode is enabled.
        lockout_active = is_trial_mode_enabled() and is_trial_expired()
        if not lockout_active:
            return self.get_response(request)

        requested_path = request.path or '/'
        if requested_path.startswith(self.EXEMPT_PREFIXES):
            return self.get_response(request)

        current_user = getattr(request, 'user', None)
        signed_in = getattr(current_user, 'is_authenticated', False)
        if signed_in and get_user_role_key(current_user) == ROLE_PLATFORM_OWNER:
            # Platform owners keep access after the trial has expired.
            return self.get_response(request)

        return render(request, 'workflows/trial_expired.html', status=403)
|