from __future__ import annotations from dataclasses import dataclass from datetime import datetime as dt, timezone as tz from typing import List, Optional, Tuple, Dict from flask import current_app, request, session from flask_security import current_user from sqlalchemy import desc from sqlalchemy.exc import SQLAlchemyError, IntegrityError from common.extensions import db from common.models.user import TenantConsent, ConsentVersion, ConsentStatus, PartnerService, PartnerTenant, Tenant @dataclass class TypeStatus: consent_type: str status: ConsentStatus active_version: Optional[str] last_version: Optional[str] class ConsentServices: @staticmethod def get_required_consent_types() -> List[str]: return list(current_app.config.get("CONSENT_TYPES", [])) @staticmethod def get_active_consent_version(consent_type: str) -> Optional[ConsentVersion]: try: # Active version: the one with consent_valid_to IS NULL, latest for this type return (ConsentVersion.query .filter_by(consent_type=consent_type, consent_valid_to=None) .order_by(desc(ConsentVersion.consent_valid_from)) .first()) except SQLAlchemyError as e: current_app.logger.error(f"DB error in get_active_consent_version({consent_type}): {e}") return None @staticmethod def get_tenant_last_consent(tenant_id: int, consent_type: str) -> Optional[TenantConsent]: try: return (TenantConsent.query .filter_by(tenant_id=tenant_id, consent_type=consent_type) .order_by(desc(TenantConsent.id)) .first()) except SQLAlchemyError as e: current_app.logger.error(f"DB error in get_tenant_last_consent({tenant_id}, {consent_type}): {e}") return None @staticmethod def evaluate_type_status(tenant_id: int, consent_type: str) -> TypeStatus: active = ConsentServices.get_active_consent_version(consent_type) if not active: current_app.logger.error(f"No active ConsentVersion found for type {consent_type}") return TypeStatus(consent_type, ConsentStatus.UNKNOWN_CONSENT_VERSION, None, None) last = ConsentServices.get_tenant_last_consent(tenant_id, consent_type) if not last: return TypeStatus(consent_type, ConsentStatus.NOT_CONSENTED, active.consent_version, None) # If last consent equals active → CONSENTED if last.consent_version == active.consent_version: return TypeStatus(consent_type, ConsentStatus.CONSENTED, active.consent_version, last.consent_version) # Else: last refers to an older version; check its ConsentVersion to see grace period prev_cv = ConsentVersion.query.filter_by(consent_type=consent_type, consent_version=last.consent_version).first() if not prev_cv: current_app.logger.error(f"Tenant {tenant_id} references unknown ConsentVersion {last.consent_version} for {consent_type}") return TypeStatus(consent_type, ConsentStatus.UNKNOWN_CONSENT_VERSION, active.consent_version, last.consent_version) if prev_cv.consent_valid_to: now = dt.now(tz.utc) if prev_cv.consent_valid_to >= now: # Within transition window return TypeStatus(consent_type, ConsentStatus.RENEWAL_REQUIRED, active.consent_version, last.consent_version) else: return TypeStatus(consent_type, ConsentStatus.NOT_CONSENTED, active.consent_version, last.consent_version) else: # Should not happen if a newer active exists; treat as unknown config current_app.logger.error(f"Previous ConsentVersion without valid_to while a newer active exists for {consent_type}") return TypeStatus(consent_type, ConsentStatus.UNKNOWN_CONSENT_VERSION, active.consent_version, last.consent_version) @staticmethod def aggregate_status(type_statuses: List[TypeStatus]) -> ConsentStatus: # Priority: UNKNOWN > NOT_CONSENTED > RENEWAL_REQUIRED > CONSENTED priorities = { ConsentStatus.UNKNOWN_CONSENT_VERSION: 4, ConsentStatus.NOT_CONSENTED: 3, ConsentStatus.RENEWAL_REQUIRED: 2, ConsentStatus.CONSENTED: 1, } if not type_statuses: return ConsentStatus.CONSENTED worst = max(type_statuses, key=lambda ts: priorities.get(ts.status, 0)) return worst.status @staticmethod def get_consent_status(tenant_id: int) -> ConsentStatus: statuses = [ConsentServices.evaluate_type_status(tenant_id, ct) for ct in ConsentServices.get_required_consent_types()] return ConsentServices.aggregate_status(statuses) @staticmethod def _is_tenant_admin_for(tenant_id: int) -> bool: try: return current_user.is_authenticated and current_user.has_roles('Tenant Admin') and getattr(current_user, 'tenant_id', None) == tenant_id except Exception: return False @staticmethod def _is_management_partner_for(tenant_id: int) -> Tuple[bool, Optional[int], Optional[int]]: """Return (allowed, partner_id, partner_service_id) for management partner context.""" try: if not (current_user.is_authenticated and current_user.has_roles('Partner Admin')): return False, None, None # Check PartnerTenant relationship via MANAGEMENT_SERVICE ps = PartnerService.query.filter_by(type='MANAGEMENT_SERVICE').all() if not ps: return False, None, None ps_ids = [p.id for p in ps] pt = PartnerTenant.query.filter_by(tenant_id=tenant_id).filter(PartnerTenant.partner_service_id.in_(ps_ids)).first() if not pt: return False, None, None the_ps = PartnerService.query.get(pt.partner_service_id) return True, the_ps.partner_id if the_ps else None, the_ps.id if the_ps else None except Exception as e: current_app.logger.error(f"Error in _is_management_partner_for: {e}") return False, None, None @staticmethod def can_consent_on_behalf(tenant_id: int) -> Tuple[bool, str, Optional[int], Optional[int]]: # Returns: allowed, mode('tenant_admin'|'management_partner'), partner_id, partner_service_id if ConsentServices._is_tenant_admin_for(tenant_id): return True, 'tenant_admin', None, None allowed, partner_id, partner_service_id = ConsentServices._is_management_partner_for(tenant_id) if allowed: return True, 'management_partner', partner_id, partner_service_id return False, 'none', None, None @staticmethod def _resolve_consent_content(consent_type: str, version: str) -> Dict: """Resolve canonical file ref and hash for a consent document. Uses configurable base dir, type subpaths, and patch-dir strategy. Defaults: - base: 'content' - map: {'Data Privacy Agreement':'dpa','Terms & Conditions':'terms'} - strategy: 'major_minor' -> a.b.c => a.b/a.b.c.md - ext: '.md' """ import hashlib from pathlib import Path cfg = current_app.config if current_app else {} base_dir = cfg.get('CONSENT_CONTENT_BASE_DIR', 'content') type_paths = cfg.get('CONSENT_TYPE_PATHS', { 'Data Privacy Agreement': 'dpa', 'Terms & Conditions': 'terms', }) strategy = cfg.get('CONSENT_PATCH_DIR_STRATEGY', 'major_minor') ext = cfg.get('CONSENT_MARKDOWN_EXT', '.md') type_dir = type_paths.get(consent_type, consent_type.lower().replace(' ', '_')) subpath = '' filename = f"{version}{ext}" try: parts = version.split('.') if strategy == 'major_minor' and len(parts) >= 2: subpath = f"{parts[0]}.{parts[1]}" filename = f"{parts[0]}.{parts[1]}.{parts[2] if len(parts)>2 else '0'}{ext}" # Build canonical path if subpath: canonical_ref = f"{base_dir}/{type_dir}/{subpath}/{filename}" else: canonical_ref = f"{base_dir}/{type_dir}/{filename}" except Exception: canonical_ref = f"{base_dir}/{type_dir}/{version}{ext}" # Read file and hash content_hash = '' try: # project root = parent of app package root = Path(current_app.root_path).parent if current_app else Path('.') fpath = root / canonical_ref content_bytes = fpath.read_bytes() if fpath.exists() else b'' content_hash = hashlib.sha256(content_bytes).hexdigest() if content_bytes else '' except Exception: content_hash = '' return { 'canonical_document_ref': canonical_ref, 'content_hash': content_hash, } @staticmethod def record_consent(tenant_id: int, consent_type: str) -> TenantConsent: # Validate type if consent_type not in ConsentServices.get_required_consent_types(): raise ValueError(f"Unknown consent type: {consent_type}") active = ConsentServices.get_active_consent_version(consent_type) if not active: raise RuntimeError(f"No active ConsentVersion for type {consent_type}") allowed, mode, partner_id, partner_service_id = ConsentServices.can_consent_on_behalf(tenant_id) if not allowed: raise PermissionError("Not authorized to record consent for this tenant") # Idempotency: if already consented for active version, return existing existing = (TenantConsent.query .filter_by(tenant_id=tenant_id, consent_type=consent_type, consent_version=active.consent_version) .first()) if existing: return existing # Build consent_data with audit info ip = request.headers.get('X-Forwarded-For', '').split(',')[0].strip() or request.remote_addr or '' ua = request.headers.get('User-Agent', '') locale = session.get('locale') or request.accept_languages.best or '' content_meta = ConsentServices._resolve_consent_content(consent_type, active.consent_version) consent_data = { 'source_ip': ip, 'user_agent': ua, 'locale': locale, **content_meta, } tc = TenantConsent( tenant_id=tenant_id, partner_id=partner_id, partner_service_id=partner_service_id, user_id=getattr(current_user, 'id', None) or 0, consent_type=consent_type, consent_version=active.consent_version, consent_data=consent_data, ) try: db.session.add(tc) db.session.commit() current_app.logger.info(f"Consent recorded: tenant={tenant_id}, type={consent_type}, version={active.consent_version}, mode={mode}, user={getattr(current_user, 'id', None)}") return tc except IntegrityError as e: db.session.rollback() # In case of race, fetch existing current_app.logger.warning(f"IntegrityError on consent insert, falling back: {e}") existing = (TenantConsent.query .filter_by(tenant_id=tenant_id, consent_type=consent_type, consent_version=active.consent_version) .first()) if existing: return existing raise except SQLAlchemyError as e: db.session.rollback() current_app.logger.error(f"DB error in record_consent: {e}") raise