snapshot: preserve backup UX, remote target setup, and docs updates
This commit is contained in:
@@ -10,15 +10,19 @@ from .models import WorkflowConfig
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _active_workflow_config() -> WorkflowConfig | None:
|
||||
return WorkflowConfig.objects.filter(name='Default').order_by('-id').first() or WorkflowConfig.objects.order_by('id').first()
|
||||
|
||||
|
||||
def is_nextcloud_enabled() -> bool:
|
||||
config = WorkflowConfig.objects.order_by('id').first()
|
||||
config = _active_workflow_config()
|
||||
if config and config.nextcloud_enabled_override is not None:
|
||||
return bool(config.nextcloud_enabled_override)
|
||||
return bool(settings.NEXTCLOUD_ENABLED)
|
||||
|
||||
|
||||
def is_email_test_mode() -> bool:
|
||||
config = WorkflowConfig.objects.order_by('id').first()
|
||||
config = _active_workflow_config()
|
||||
if config and config.email_test_mode_override is not None:
|
||||
return bool(config.email_test_mode_override)
|
||||
return bool(settings.EMAIL_TEST_MODE)
|
||||
@@ -29,7 +33,7 @@ def get_email_test_redirect() -> str:
|
||||
|
||||
|
||||
def get_nextcloud_settings() -> dict[str, str]:
|
||||
config = WorkflowConfig.objects.order_by('id').first()
|
||||
config = _active_workflow_config()
|
||||
base_url = (
|
||||
config.nextcloud_base_url_override.strip()
|
||||
if config and config.nextcloud_base_url_override.strip()
|
||||
@@ -58,20 +62,49 @@ def get_nextcloud_settings() -> dict[str, str]:
|
||||
}
|
||||
|
||||
|
||||
def upload_to_nextcloud(local_file: Path, remote_filename: str) -> bool:
|
||||
if not is_nextcloud_enabled():
|
||||
def _nextcloud_remote_url(base_url: str, directory: str, remote_path: str) -> str:
|
||||
cleaned_parts = [part.strip('/') for part in [directory, remote_path] if part and part.strip('/')]
|
||||
return f"{base_url}/{'/'.join(cleaned_parts)}"
|
||||
|
||||
|
||||
def _ensure_nextcloud_directory(base_url: str, directory: str, auth: tuple[str, str], timeout: int) -> bool:
|
||||
if not directory:
|
||||
return False
|
||||
current_parts: list[str] = []
|
||||
for part in [p for p in directory.split('/') if p]:
|
||||
current_parts.append(part)
|
||||
response = requests.request(
|
||||
'MKCOL',
|
||||
f"{base_url}/{'/'.join(current_parts)}",
|
||||
auth=auth,
|
||||
timeout=timeout,
|
||||
)
|
||||
if response.status_code in (201, 301, 405):
|
||||
continue
|
||||
logger.warning('Nextcloud directory ensure failed with status %s for %s', response.status_code, '/'.join(current_parts))
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def upload_to_nextcloud(local_file: Path, remote_filename: str, *, directory_override: str | None = None, require_enabled: bool = True) -> bool:
|
||||
if require_enabled and not is_nextcloud_enabled():
|
||||
return False
|
||||
|
||||
nc = get_nextcloud_settings()
|
||||
base_url = nc['base_url']
|
||||
directory = nc['directory']
|
||||
directory_source = nc['directory'] if directory_override is None else directory_override
|
||||
directory = (directory_source or '').strip('/')
|
||||
if not base_url or not directory:
|
||||
return False
|
||||
|
||||
safe_remote_name = Path(remote_filename).name
|
||||
remote_url = f"{base_url}/{directory}/{safe_remote_name}"
|
||||
remote_url = _nextcloud_remote_url(base_url, directory, safe_remote_name)
|
||||
retries = max(0, int(getattr(settings, 'NEXTCLOUD_UPLOAD_RETRIES', 2)))
|
||||
timeout = max(5, int(getattr(settings, 'NEXTCLOUD_UPLOAD_TIMEOUT_SECONDS', 30)))
|
||||
auth = (nc['username'], nc['password'])
|
||||
|
||||
if not _ensure_nextcloud_directory(base_url, directory, auth, timeout):
|
||||
return False
|
||||
|
||||
for attempt in range(retries + 1):
|
||||
try:
|
||||
@@ -79,7 +112,7 @@ def upload_to_nextcloud(local_file: Path, remote_filename: str) -> bool:
|
||||
response = requests.put(
|
||||
remote_url,
|
||||
data=handle,
|
||||
auth=(nc['username'], nc['password']),
|
||||
auth=auth,
|
||||
timeout=timeout,
|
||||
)
|
||||
if response.status_code in (200, 201, 204):
|
||||
@@ -102,3 +135,21 @@ def upload_to_nextcloud(local_file: Path, remote_filename: str) -> bool:
|
||||
if attempt < retries:
|
||||
time.sleep(0.6 * (attempt + 1))
|
||||
return False
|
||||
|
||||
|
||||
def delete_from_nextcloud(remote_path: str, *, directory_override: str | None = None) -> bool:
|
||||
nc = get_nextcloud_settings()
|
||||
base_url = nc['base_url']
|
||||
directory_source = nc['directory'] if directory_override is None else directory_override
|
||||
directory = (directory_source or '').strip('/')
|
||||
if not base_url:
|
||||
return False
|
||||
if directory_override is None and not directory:
|
||||
return False
|
||||
timeout = max(5, int(getattr(settings, 'NEXTCLOUD_UPLOAD_TIMEOUT_SECONDS', 30)))
|
||||
response = requests.delete(
|
||||
_nextcloud_remote_url(base_url, directory, remote_path),
|
||||
auth=(nc['username'], nc['password']),
|
||||
timeout=timeout,
|
||||
)
|
||||
return response.status_code in (200, 204, 404)
|
||||
|
||||
Reference in New Issue
Block a user