chore: initial snapshot of tubco people portal
This commit is contained in:
0
backend/workflows/management/commands/__init__.py
Normal file
0
backend/workflows/management/commands/__init__.py
Normal file
242
backend/workflows/management/commands/run_staging_e2e_check.py
Normal file
242
backend/workflows/management/commands/run_staging_e2e_check.py
Normal file
@@ -0,0 +1,242 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from datetime import timedelta
|
||||
from pathlib import Path
|
||||
|
||||
import requests
|
||||
from django.conf import settings
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
from django.utils import timezone
|
||||
|
||||
from workflows.models import EmployeeProfile, OffboardingRequest, OnboardingRequest
|
||||
from workflows.tasks import process_offboarding_request, process_onboarding_request
|
||||
|
||||
|
||||
@dataclass
|
||||
class CheckResult:
|
||||
name: str
|
||||
status: str
|
||||
detail: str
|
||||
|
||||
|
||||
def _mailhog_messages(api_url: str) -> list[dict]:
|
||||
response = requests.get(api_url, timeout=15)
|
||||
response.raise_for_status()
|
||||
payload = response.json()
|
||||
items = payload.get('items', [])
|
||||
if isinstance(items, list):
|
||||
return items
|
||||
return []
|
||||
|
||||
|
||||
def _extract_subject(msg: dict) -> str:
|
||||
headers = ((msg or {}).get('Content') or {}).get('Headers') or {}
|
||||
subject = headers.get('Subject') or ['']
|
||||
if isinstance(subject, list) and subject:
|
||||
return str(subject[0])
|
||||
return str(subject)
|
||||
|
||||
|
||||
def _nextcloud_file_exists(remote_filename: str) -> bool:
|
||||
if not settings.NEXTCLOUD_BASE_URL or not settings.NEXTCLOUD_DIRECTORY:
|
||||
return False
|
||||
|
||||
remote_url = f"{settings.NEXTCLOUD_BASE_URL}/{settings.NEXTCLOUD_DIRECTORY}/{remote_filename}"
|
||||
auth = (settings.NEXTCLOUD_USERNAME, settings.NEXTCLOUD_PASSWORD)
|
||||
|
||||
try:
|
||||
head = requests.head(remote_url, auth=auth, timeout=20, allow_redirects=True)
|
||||
if head.status_code in (200, 204):
|
||||
return True
|
||||
if head.status_code not in (405, 501):
|
||||
return False
|
||||
except requests.RequestException:
|
||||
return False
|
||||
|
||||
try:
|
||||
get_resp = requests.get(
|
||||
remote_url,
|
||||
auth=auth,
|
||||
timeout=20,
|
||||
headers={'Range': 'bytes=0-0'},
|
||||
stream=True,
|
||||
allow_redirects=True,
|
||||
)
|
||||
return get_resp.status_code in (200, 206)
|
||||
except requests.RequestException:
|
||||
return False
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = (
|
||||
'Run a real end-to-end staging verification: onboarding + offboarding processing, '
|
||||
'PDF generation, email evidence (optional MailHog), and Nextcloud verification.'
|
||||
)
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument(
|
||||
'--email-check',
|
||||
choices=['auto', 'mailhog', 'none'],
|
||||
default='auto',
|
||||
help='Email verification mode. auto tries MailHog if reachable.',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--mailhog-api-url',
|
||||
default='http://mailhog:8025/api/v2/messages',
|
||||
help='MailHog API URL used when email-check is auto/mailhog.',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--skip-nextcloud',
|
||||
action='store_true',
|
||||
help='Skip Nextcloud existence checks.',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--cleanup',
|
||||
action='store_true',
|
||||
help='Delete generated E2E DB rows and PDFs after checks.',
|
||||
)
|
||||
|
||||
def handle(self, *args, **options):
|
||||
run_id = timezone.now().strftime('%Y%m%d%H%M%S')
|
||||
employee_name = f'E2E Check {run_id}'
|
||||
work_email = f'e2e.{run_id}@tub.co'
|
||||
requester_email = 'e2e.requester@tub.co'
|
||||
|
||||
created_onboarding: OnboardingRequest | None = None
|
||||
created_offboarding: OffboardingRequest | None = None
|
||||
check_results: list[CheckResult] = []
|
||||
email_before: list[dict] = []
|
||||
email_after: list[dict] = []
|
||||
use_mailhog = False
|
||||
|
||||
email_mode = options['email_check']
|
||||
mailhog_api_url = options['mailhog_api_url']
|
||||
|
||||
if email_mode in ('auto', 'mailhog'):
|
||||
try:
|
||||
email_before = _mailhog_messages(mailhog_api_url)
|
||||
use_mailhog = True
|
||||
check_results.append(CheckResult('mailhog_access', 'PASS', f'MailHog reachable: {mailhog_api_url}'))
|
||||
except Exception as exc:
|
||||
if email_mode == 'mailhog':
|
||||
raise CommandError(f'MailHog is required but not reachable: {exc}')
|
||||
check_results.append(CheckResult('mailhog_access', 'WARN', f'MailHog not reachable, skipping email evidence: {exc}'))
|
||||
|
||||
try:
|
||||
created_onboarding = OnboardingRequest.objects.create(
|
||||
full_name=employee_name,
|
||||
gender='herr',
|
||||
job_title='QA Engineer',
|
||||
department='IT-Service',
|
||||
work_email=work_email,
|
||||
contract_start=timezone.localdate() + timedelta(days=7),
|
||||
employment_type='unbefristet',
|
||||
order_business_cards=True,
|
||||
business_card_name=employee_name,
|
||||
business_card_title='QA Engineer',
|
||||
business_card_email=work_email,
|
||||
business_card_phone='030 44720210',
|
||||
needed_devices='Laptop\nSchlüssel',
|
||||
needed_accesses='HR Works',
|
||||
needed_software='Nextcloud',
|
||||
needed_resources='Drucker Euref',
|
||||
group_mailboxes_required=False,
|
||||
onboarded_by_email=requester_email,
|
||||
agreement='accepted',
|
||||
additional_notes=f'E2E run {run_id}',
|
||||
)
|
||||
process_onboarding_request(created_onboarding.id)
|
||||
created_onboarding.refresh_from_db()
|
||||
|
||||
onboarding_pdf = Path(created_onboarding.generated_pdf_path)
|
||||
if created_onboarding.generated_pdf_path and onboarding_pdf.exists() and onboarding_pdf.stat().st_size > 0:
|
||||
check_results.append(CheckResult('onboarding_pdf', 'PASS', str(onboarding_pdf)))
|
||||
else:
|
||||
check_results.append(CheckResult('onboarding_pdf', 'FAIL', 'Onboarding PDF missing or empty'))
|
||||
|
||||
profile_exists = EmployeeProfile.objects.filter(work_email=work_email).exists()
|
||||
check_results.append(
|
||||
CheckResult('employee_profile', 'PASS' if profile_exists else 'FAIL', f'Profile for {work_email}')
|
||||
)
|
||||
|
||||
created_offboarding = OffboardingRequest.objects.create(
|
||||
full_name=employee_name,
|
||||
work_email=work_email,
|
||||
department='IT-Service',
|
||||
job_title='QA Engineer',
|
||||
last_working_day=timezone.localdate() + timedelta(days=30),
|
||||
notes=f'E2E run {run_id}',
|
||||
requested_by_email=requester_email,
|
||||
)
|
||||
process_offboarding_request(created_offboarding.id)
|
||||
created_offboarding.refresh_from_db()
|
||||
|
||||
offboarding_pdf = Path(created_offboarding.generated_pdf_path)
|
||||
if created_offboarding.generated_pdf_path and offboarding_pdf.exists() and offboarding_pdf.stat().st_size > 0:
|
||||
check_results.append(CheckResult('offboarding_pdf', 'PASS', str(offboarding_pdf)))
|
||||
else:
|
||||
check_results.append(CheckResult('offboarding_pdf', 'FAIL', 'Offboarding PDF missing or empty'))
|
||||
|
||||
if not options['skip_nextcloud']:
|
||||
if settings.NEXTCLOUD_ENABLED:
|
||||
on_name = onboarding_pdf.name if created_onboarding.generated_pdf_path else ''
|
||||
off_name = offboarding_pdf.name if created_offboarding.generated_pdf_path else ''
|
||||
on_ok = bool(on_name) and _nextcloud_file_exists(on_name)
|
||||
off_ok = bool(off_name) and _nextcloud_file_exists(off_name)
|
||||
check_results.append(
|
||||
CheckResult('nextcloud_onboarding_pdf', 'PASS' if on_ok else 'FAIL', on_name or 'no filename')
|
||||
)
|
||||
check_results.append(
|
||||
CheckResult('nextcloud_offboarding_pdf', 'PASS' if off_ok else 'FAIL', off_name or 'no filename')
|
||||
)
|
||||
else:
|
||||
check_results.append(CheckResult('nextcloud', 'WARN', 'NEXTCLOUD_ENABLED=0, skipped'))
|
||||
|
||||
if use_mailhog:
|
||||
email_after = _mailhog_messages(mailhog_api_url)
|
||||
before_size = len(email_before)
|
||||
new_msgs = email_after[before_size:] if len(email_after) >= before_size else email_after
|
||||
matching = [m for m in new_msgs if run_id in _extract_subject(m)]
|
||||
if len(matching) >= 8:
|
||||
check_results.append(
|
||||
CheckResult('email_evidence', 'PASS', f'Found {len(matching)} MailHog messages containing run id {run_id}')
|
||||
)
|
||||
else:
|
||||
check_results.append(
|
||||
CheckResult('email_evidence', 'FAIL', f'Expected >=8 matching messages, found {len(matching)}')
|
||||
)
|
||||
else:
|
||||
check_results.append(CheckResult('email_evidence', 'WARN', 'MailHog disabled/unavailable, not verified'))
|
||||
|
||||
finally:
|
||||
if options['cleanup']:
|
||||
for path in (
|
||||
Path(created_onboarding.generated_pdf_path) if created_onboarding and created_onboarding.generated_pdf_path else None,
|
||||
Path(created_offboarding.generated_pdf_path) if created_offboarding and created_offboarding.generated_pdf_path else None,
|
||||
):
|
||||
if path and path.exists():
|
||||
path.unlink(missing_ok=True)
|
||||
|
||||
if created_offboarding:
|
||||
created_offboarding.delete()
|
||||
if created_onboarding:
|
||||
created_onboarding.delete()
|
||||
EmployeeProfile.objects.filter(work_email=work_email).delete()
|
||||
|
||||
self.stdout.write('')
|
||||
self.stdout.write(self.style.NOTICE(f'E2E staging check run id: {run_id}'))
|
||||
for item in check_results:
|
||||
if item.status == 'PASS':
|
||||
style = self.style.SUCCESS
|
||||
elif item.status == 'WARN':
|
||||
style = self.style.WARNING
|
||||
else:
|
||||
style = self.style.ERROR
|
||||
self.stdout.write(style(f'[{item.status}] {item.name}: {item.detail}'))
|
||||
|
||||
failures = [r for r in check_results if r.status == 'FAIL']
|
||||
if failures:
|
||||
raise CommandError(f'E2E staging check failed with {len(failures)} failing check(s).')
|
||||
|
||||
self.stdout.write(self.style.SUCCESS('All mandatory staging checks passed.'))
|
||||
Reference in New Issue
Block a user