import logging
import time
from pathlib import Path

import requests
from django.conf import settings

from .branding import should_restrict_trial_integrations
from .models import WorkflowConfig

logger = logging.getLogger(__name__)


def _active_workflow_config() -> WorkflowConfig | None:
    """Return the workflow configuration whose overrides should be applied.

    Prefers the newest row named ``'Default'`` (highest id); if there is none,
    falls back to the oldest row of any name (lowest id). Returns ``None``
    when the table is empty.
    """
    return (
        WorkflowConfig.objects.filter(name='Default').order_by('-id').first()
        or WorkflowConfig.objects.order_by('id').first()
    )


def is_nextcloud_enabled() -> bool:
    """Tell whether Nextcloud uploads are currently enabled.

    Trial restrictions always disable the integration. Otherwise an explicit
    (non-``None``) override on the active workflow config wins over
    ``settings.NEXTCLOUD_ENABLED``.
    """
    if should_restrict_trial_integrations():
        return False
    config = _active_workflow_config()
    if config and config.nextcloud_enabled_override is not None:
        return bool(config.nextcloud_enabled_override)
    return bool(settings.NEXTCLOUD_ENABLED)


def is_email_test_mode() -> bool:
    """Tell whether e-mail test mode is active.

    Trial restrictions always force test mode on. Otherwise an explicit
    (non-``None``) override on the active workflow config wins over
    ``settings.EMAIL_TEST_MODE``.
    """
    if should_restrict_trial_integrations():
        return True
    config = _active_workflow_config()
    if config and config.email_test_mode_override is not None:
        return bool(config.email_test_mode_override)
    return bool(settings.EMAIL_TEST_MODE)


def get_email_test_redirect() -> str:
    """Return ``settings.EMAIL_TEST_REDIRECT`` unchanged."""
    return settings.EMAIL_TEST_REDIRECT


def _config_override(config: WorkflowConfig | None, field: str, *, strip: bool = True) -> str:
    """Read a string override from *config*, tolerating ``None`` values.

    Returns ``''`` when there is no config or the field is empty/``None``, so
    callers can chain ``or <settings fallback>``. With ``strip=True`` the value
    is whitespace-stripped first, which makes a blank override count as unset.
    """
    if not config:
        return ''
    # No getattr default on purpose: a missing model field is a programming
    # error and should still surface as AttributeError.
    value = getattr(config, field) or ''
    return value.strip() if strip else value


def get_nextcloud_settings() -> dict[str, str]:
    """Resolve the effective Nextcloud connection settings.

    Each value comes from the active workflow config override when that
    override is non-empty, otherwise from the matching ``settings.NEXTCLOUD_*``
    value. The password override is used verbatim (not stripped); the other
    overrides are whitespace-stripped before use.

    Returns:
        A dict with keys ``base_url`` (no trailing slash), ``username``,
        ``password`` and ``directory`` (no leading/trailing slash). Missing
        values are returned as empty strings.
    """
    config = _active_workflow_config()
    # `or` keeps the settings lookup lazy: it is only touched when the
    # override is unset.
    base_url = _config_override(config, 'nextcloud_base_url_override') or settings.NEXTCLOUD_BASE_URL
    username = _config_override(config, 'nextcloud_username_override') or settings.NEXTCLOUD_USERNAME
    password = (
        _config_override(config, 'nextcloud_password_override', strip=False)
        or settings.NEXTCLOUD_PASSWORD
    )
    directory = _config_override(config, 'nextcloud_directory_override') or settings.NEXTCLOUD_DIRECTORY
    return {
        'base_url': (base_url or '').rstrip('/'),
        'username': username or '',
        'password': password or '',
        'directory': (directory or '').strip('/'),
    }


def _nextcloud_remote_url(base_url: str, directory: str, remote_path: str) -> str:
    """Join *base_url*, *directory* and *remote_path* into one URL.

    Empty or slash-only parts are dropped and surrounding slashes are removed
    from the rest. If both parts are empty the result is ``f"{base_url}/"``.
    The parts are not URL-encoded here.
    """
    cleaned_parts = [
        part.strip('/')
        for part in (directory, remote_path)
        if part and part.strip('/')
    ]
    return f"{base_url}/{'/'.join(cleaned_parts)}"


def _ensure_nextcloud_directory(base_url: str, directory: str, auth: tuple[str, str], timeout: int) -> bool:
    """Create *directory* on the server one path segment at a time via MKCOL.

    Returns:
        ``True`` when every segment was answered with an accepted status,
        ``False`` when *directory* is empty, a request raised
        ``requests.RequestException``, or any other status came back.
    """
    if not directory:
        return False
    current_parts: list[str] = []
    for part in (segment for segment in directory.split('/') if segment):
        current_parts.append(part)
        current_path = '/'.join(current_parts)
        try:
            response = requests.request(
                'MKCOL',
                f"{base_url}/{current_path}",
                auth=auth,
                timeout=timeout,
            )
        except requests.RequestException as exc:
            logger.warning('Nextcloud directory ensure error for %s: %s', current_path, exc)
            return False
        # 201 = created; 405 is what WebDAV servers answer when the collection
        # already exists; 301 is likewise treated as "nothing to do".
        if response.status_code in (201, 301, 405):
            continue
        logger.warning(
            'Nextcloud directory ensure failed with status %s for %s',
            response.status_code,
            current_path,
        )
        return False
    return True


def upload_to_nextcloud(
    local_file: Path,
    remote_filename: str,
    *,
    directory_override: str | None = None,
    require_enabled: bool = True,
) -> bool:
    """Upload *local_file* to Nextcloud with a PUT request, retrying on failure.

    Args:
        local_file: File to upload; it is re-opened for every attempt.
        remote_filename: Target name. Only its final path component is used,
            so callers cannot write outside the target directory.
        directory_override: Remote directory to use instead of the configured
            one. ``None`` means "use the configured directory".
        require_enabled: When true, do nothing unless ``is_nextcloud_enabled()``.

    Returns:
        ``True`` on HTTP 200/201/204. ``False`` when the integration is
        disabled, the base URL or directory is empty, the directory could not
        be ensured, or all attempts failed.

    Raises:
        OSError: if *local_file* cannot be opened (only
            ``requests.RequestException`` is caught here).
    """
    if require_enabled and not is_nextcloud_enabled():
        return False

    nc = get_nextcloud_settings()
    base_url = nc['base_url']
    directory_source = nc['directory'] if directory_override is None else directory_override
    directory = (directory_source or '').strip('/')
    if not base_url or not directory:
        return False

    safe_remote_name = Path(remote_filename).name
    remote_url = _nextcloud_remote_url(base_url, directory, safe_remote_name)
    retries = max(0, int(getattr(settings, 'NEXTCLOUD_UPLOAD_RETRIES', 2)))
    timeout = max(5, int(getattr(settings, 'NEXTCLOUD_UPLOAD_TIMEOUT_SECONDS', 30)))
    auth = (nc['username'], nc['password'])

    if not _ensure_nextcloud_directory(base_url, directory, auth, timeout):
        return False

    for attempt in range(retries + 1):
        try:
            with local_file.open('rb') as handle:
                response = requests.put(
                    remote_url,
                    data=handle,
                    auth=auth,
                    timeout=timeout,
                )
            if response.status_code in (200, 201, 204):
                return True
            logger.warning(
                'Nextcloud upload failed with status %s (attempt %s/%s) for %s',
                response.status_code,
                attempt + 1,
                retries + 1,
                safe_remote_name,
            )
        except requests.RequestException as exc:
            logger.warning(
                'Nextcloud upload error (attempt %s/%s) for %s: %s',
                attempt + 1,
                retries + 1,
                safe_remote_name,
                exc,
            )
        if attempt < retries:
            # Linearly growing pause between attempts: 0.6s, 1.2s, ...
            time.sleep(0.6 * (attempt + 1))
    return False


def delete_from_nextcloud(remote_path: str, *, directory_override: str | None = None) -> bool:
    """Delete *remote_path* from Nextcloud with a DELETE request.

    Args:
        remote_path: Path of the resource, relative to the directory.
        directory_override: Remote directory to use instead of the configured
            one. ``None`` means "use the configured directory"; an empty
            string means *remote_path* is relative to the base URL.

    Returns:
        ``True`` on HTTP 200/204, and on 404 (already gone). ``False`` when
        the base URL is empty, the configured directory is empty and no
        override was given, the target is refused (see below), the request
        raised ``requests.RequestException``, or any other status came back.

    A blank *remote_path* is refused unless the caller explicitly named a
    non-empty ``directory_override``: otherwise the URL would resolve to the
    whole configured directory, or to the server root, and wipe it.
    """
    nc = get_nextcloud_settings()
    base_url = nc['base_url']
    directory_source = nc['directory'] if directory_override is None else directory_override
    directory = (directory_source or '').strip('/')
    if not base_url:
        return False
    if directory_override is None and not directory:
        return False

    has_target = bool((remote_path or '').strip('/'))
    if not has_target and (directory_override is None or not directory):
        logger.warning('Nextcloud delete refused: empty remote path')
        return False

    timeout = max(5, int(getattr(settings, 'NEXTCLOUD_UPLOAD_TIMEOUT_SECONDS', 30)))
    try:
        response = requests.delete(
            _nextcloud_remote_url(base_url, directory, remote_path),
            auth=(nc['username'], nc['password']),
            timeout=timeout,
        )
    except requests.RequestException as exc:
        # Same contract as the upload helpers: network errors become a logged
        # False instead of escaping from a bool-returning function.
        logger.warning('Nextcloud delete error for %s: %s', remote_path, exc)
        return False

    if response.status_code in (200, 204, 404):
        return True
    logger.warning(
        'Nextcloud delete failed with status %s for %s',
        response.status_code,
        remote_path,
    )
    return False