Files
workdock-platform/backend/workflows/services.py
2026-03-26 14:43:10 +01:00

161 lines
5.5 KiB
Python

from pathlib import Path
import logging
import time
import requests
from django.conf import settings
from .branding import should_restrict_trial_integrations
from .models import WorkflowConfig
logger = logging.getLogger(__name__)
def _active_workflow_config() -> WorkflowConfig | None:
return WorkflowConfig.objects.filter(name='Default').order_by('-id').first() or WorkflowConfig.objects.order_by('id').first()
def is_nextcloud_enabled() -> bool:
if should_restrict_trial_integrations():
return False
config = _active_workflow_config()
if config and config.nextcloud_enabled_override is not None:
return bool(config.nextcloud_enabled_override)
return bool(settings.NEXTCLOUD_ENABLED)
def is_email_test_mode() -> bool:
if should_restrict_trial_integrations():
return True
config = _active_workflow_config()
if config and config.email_test_mode_override is not None:
return bool(config.email_test_mode_override)
return bool(settings.EMAIL_TEST_MODE)
def get_email_test_redirect() -> str:
return settings.EMAIL_TEST_REDIRECT
def get_nextcloud_settings() -> dict[str, str]:
config = _active_workflow_config()
base_url = (
config.nextcloud_base_url_override.strip()
if config and config.nextcloud_base_url_override.strip()
else settings.NEXTCLOUD_BASE_URL
)
username = (
config.nextcloud_username_override.strip()
if config and config.nextcloud_username_override.strip()
else settings.NEXTCLOUD_USERNAME
)
password = (
config.nextcloud_password_override
if config and config.nextcloud_password_override
else settings.NEXTCLOUD_PASSWORD
)
directory = (
config.nextcloud_directory_override.strip()
if config and config.nextcloud_directory_override.strip()
else settings.NEXTCLOUD_DIRECTORY
)
return {
'base_url': (base_url or '').rstrip('/'),
'username': username or '',
'password': password or '',
'directory': (directory or '').strip('/'),
}
def _nextcloud_remote_url(base_url: str, directory: str, remote_path: str) -> str:
cleaned_parts = [part.strip('/') for part in [directory, remote_path] if part and part.strip('/')]
return f"{base_url}/{'/'.join(cleaned_parts)}"
def _ensure_nextcloud_directory(base_url: str, directory: str, auth: tuple[str, str], timeout: int) -> bool:
if not directory:
return False
current_parts: list[str] = []
for part in [p for p in directory.split('/') if p]:
current_parts.append(part)
response = requests.request(
'MKCOL',
f"{base_url}/{'/'.join(current_parts)}",
auth=auth,
timeout=timeout,
)
if response.status_code in (201, 301, 405):
continue
logger.warning('Nextcloud directory ensure failed with status %s for %s', response.status_code, '/'.join(current_parts))
return False
return True
def upload_to_nextcloud(local_file: Path, remote_filename: str, *, directory_override: str | None = None, require_enabled: bool = True) -> bool:
if require_enabled and not is_nextcloud_enabled():
return False
nc = get_nextcloud_settings()
base_url = nc['base_url']
directory_source = nc['directory'] if directory_override is None else directory_override
directory = (directory_source or '').strip('/')
if not base_url or not directory:
return False
safe_remote_name = Path(remote_filename).name
remote_url = _nextcloud_remote_url(base_url, directory, safe_remote_name)
retries = max(0, int(getattr(settings, 'NEXTCLOUD_UPLOAD_RETRIES', 2)))
timeout = max(5, int(getattr(settings, 'NEXTCLOUD_UPLOAD_TIMEOUT_SECONDS', 30)))
auth = (nc['username'], nc['password'])
if not _ensure_nextcloud_directory(base_url, directory, auth, timeout):
return False
for attempt in range(retries + 1):
try:
with local_file.open('rb') as handle:
response = requests.put(
remote_url,
data=handle,
auth=auth,
timeout=timeout,
)
if response.status_code in (200, 201, 204):
return True
logger.warning(
'Nextcloud upload failed with status %s (attempt %s/%s) for %s',
response.status_code,
attempt + 1,
retries + 1,
safe_remote_name,
)
except requests.RequestException as exc:
logger.warning(
'Nextcloud upload error (attempt %s/%s) for %s: %s',
attempt + 1,
retries + 1,
safe_remote_name,
exc,
)
if attempt < retries:
time.sleep(0.6 * (attempt + 1))
return False
def delete_from_nextcloud(remote_path: str, *, directory_override: str | None = None) -> bool:
nc = get_nextcloud_settings()
base_url = nc['base_url']
directory_source = nc['directory'] if directory_override is None else directory_override
directory = (directory_source or '').strip('/')
if not base_url:
return False
if directory_override is None and not directory:
return False
timeout = max(5, int(getattr(settings, 'NEXTCLOUD_UPLOAD_TIMEOUT_SECONDS', 30)))
response = requests.delete(
_nextcloud_remote_url(base_url, directory, remote_path),
auth=(nc['username'], nc['password']),
timeout=timeout,
)
return response.status_code in (200, 204, 404)