- Consent giving UI introduced

- Possibility to view the document version the consent is given to
- Blocking functionality is no valid consent
This commit is contained in:
Josako
2025-10-15 18:35:28 +02:00
parent 3ea3a06de6
commit eeb76d57b7
22 changed files with 803 additions and 126 deletions

View File

@@ -1,5 +1,6 @@
from common.services.user.user_services import UserServices
from common.services.user.partner_services import PartnerServices
from common.services.user.tenant_services import TenantServices
from common.services.user.consent_service import ConsentService
__all__ = ['UserServices', 'PartnerServices', 'TenantServices']
__all__ = ['UserServices', 'PartnerServices', 'TenantServices', 'ConsentService']

View File

@@ -0,0 +1,254 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime as dt, timezone as tz
from typing import List, Optional, Tuple, Dict
from flask import current_app, request, session
from flask_security import current_user
from sqlalchemy import desc
from sqlalchemy.exc import SQLAlchemyError, IntegrityError
from common.extensions import db
from common.models.user import TenantConsent, ConsentVersion, ConsentStatus, PartnerService, PartnerTenant, Tenant
@dataclass
class TypeStatus:
consent_type: str
status: ConsentStatus
active_version: Optional[str]
last_version: Optional[str]
class ConsentService:
@staticmethod
def get_required_consent_types() -> List[str]:
return list(current_app.config.get("CONSENT_TYPES", []))
@staticmethod
def get_active_consent_version(consent_type: str) -> Optional[ConsentVersion]:
try:
# Active version: the one with consent_valid_to IS NULL, latest for this type
return (ConsentVersion.query
.filter_by(consent_type=consent_type, consent_valid_to=None)
.order_by(desc(ConsentVersion.consent_valid_from))
.first())
except SQLAlchemyError as e:
current_app.logger.error(f"DB error in get_active_consent_version({consent_type}): {e}")
return None
@staticmethod
def get_tenant_last_consent(tenant_id: int, consent_type: str) -> Optional[TenantConsent]:
try:
return (TenantConsent.query
.filter_by(tenant_id=tenant_id, consent_type=consent_type)
.order_by(desc(TenantConsent.id))
.first())
except SQLAlchemyError as e:
current_app.logger.error(f"DB error in get_tenant_last_consent({tenant_id}, {consent_type}): {e}")
return None
@staticmethod
def evaluate_type_status(tenant_id: int, consent_type: str) -> TypeStatus:
active = ConsentService.get_active_consent_version(consent_type)
if not active:
current_app.logger.error(f"No active ConsentVersion found for type {consent_type}")
return TypeStatus(consent_type, ConsentStatus.UNKNOWN_CONSENT_VERSION, None, None)
last = ConsentService.get_tenant_last_consent(tenant_id, consent_type)
if not last:
return TypeStatus(consent_type, ConsentStatus.NOT_CONSENTED, active.consent_version, None)
# If last consent equals active → CONSENTED
if last.consent_version == active.consent_version:
return TypeStatus(consent_type, ConsentStatus.CONSENTED, active.consent_version, last.consent_version)
# Else: last refers to an older version; check its ConsentVersion to see grace period
prev_cv = ConsentVersion.query.filter_by(consent_type=consent_type,
consent_version=last.consent_version).first()
if not prev_cv:
current_app.logger.error(f"Tenant {tenant_id} references unknown ConsentVersion {last.consent_version} for {consent_type}")
return TypeStatus(consent_type, ConsentStatus.UNKNOWN_CONSENT_VERSION, active.consent_version, last.consent_version)
if prev_cv.consent_valid_to:
now = dt.now(tz.utc)
if prev_cv.consent_valid_to >= now:
# Within transition window
return TypeStatus(consent_type, ConsentStatus.RENEWAL_REQUIRED, active.consent_version, last.consent_version)
else:
return TypeStatus(consent_type, ConsentStatus.NOT_CONSENTED, active.consent_version, last.consent_version)
else:
# Should not happen if a newer active exists; treat as unknown config
current_app.logger.error(f"Previous ConsentVersion without valid_to while a newer active exists for {consent_type}")
return TypeStatus(consent_type, ConsentStatus.UNKNOWN_CONSENT_VERSION, active.consent_version, last.consent_version)
@staticmethod
def aggregate_status(type_statuses: List[TypeStatus]) -> ConsentStatus:
# Priority: UNKNOWN > NOT_CONSENTED > RENEWAL_REQUIRED > CONSENTED
priorities = {
ConsentStatus.UNKNOWN_CONSENT_VERSION: 4,
ConsentStatus.NOT_CONSENTED: 3,
ConsentStatus.RENEWAL_REQUIRED: 2,
ConsentStatus.CONSENTED: 1,
}
if not type_statuses:
return ConsentStatus.CONSENTED
worst = max(type_statuses, key=lambda ts: priorities.get(ts.status, 0))
return worst.status
@staticmethod
def get_consent_status(tenant_id: int) -> ConsentStatus:
statuses = [ConsentService.evaluate_type_status(tenant_id, ct) for ct in ConsentService.get_required_consent_types()]
return ConsentService.aggregate_status(statuses)
@staticmethod
def _is_tenant_admin_for(tenant_id: int) -> bool:
try:
return current_user.is_authenticated and current_user.has_roles('Tenant Admin') and getattr(current_user, 'tenant_id', None) == tenant_id
except Exception:
return False
@staticmethod
def _is_management_partner_for(tenant_id: int) -> Tuple[bool, Optional[int], Optional[int]]:
"""Return (allowed, partner_id, partner_service_id) for management partner context."""
try:
if not (current_user.is_authenticated and current_user.has_roles('Partner Admin')):
return False, None, None
# Check PartnerTenant relationship via MANAGEMENT_SERVICE
ps = PartnerService.query.filter_by(type='MANAGEMENT_SERVICE').all()
if not ps:
return False, None, None
ps_ids = [p.id for p in ps]
pt = PartnerTenant.query.filter_by(tenant_id=tenant_id).filter(PartnerTenant.partner_service_id.in_(ps_ids)).first()
if not pt:
return False, None, None
the_ps = PartnerService.query.get(pt.partner_service_id)
return True, the_ps.partner_id if the_ps else None, the_ps.id if the_ps else None
except Exception as e:
current_app.logger.error(f"Error in _is_management_partner_for: {e}")
return False, None, None
@staticmethod
def can_consent_on_behalf(tenant_id: int) -> Tuple[bool, str, Optional[int], Optional[int]]:
# Returns: allowed, mode('tenant_admin'|'management_partner'), partner_id, partner_service_id
if ConsentService._is_tenant_admin_for(tenant_id):
return True, 'tenant_admin', None, None
allowed, partner_id, partner_service_id = ConsentService._is_management_partner_for(tenant_id)
if allowed:
return True, 'management_partner', partner_id, partner_service_id
return False, 'none', None, None
@staticmethod
def _resolve_consent_content(consent_type: str, version: str) -> Dict:
"""Resolve canonical file ref and hash for a consent document.
Uses configurable base dir, type subpaths, and patch-dir strategy.
Defaults:
- base: 'content'
- map: {'Data Privacy Agreement':'dpa','Terms & Conditions':'terms'}
- strategy: 'major_minor' -> a.b.c => a.b/a.b.c.md
- ext: '.md'
"""
import hashlib
from pathlib import Path
cfg = current_app.config if current_app else {}
base_dir = cfg.get('CONSENT_CONTENT_BASE_DIR', 'content')
type_paths = cfg.get('CONSENT_TYPE_PATHS', {
'Data Privacy Agreement': 'dpa',
'Terms & Conditions': 'terms',
})
strategy = cfg.get('CONSENT_PATCH_DIR_STRATEGY', 'major_minor')
ext = cfg.get('CONSENT_MARKDOWN_EXT', '.md')
type_dir = type_paths.get(consent_type, consent_type.lower().replace(' ', '_'))
subpath = ''
filename = f"{version}{ext}"
try:
parts = version.split('.')
if strategy == 'major_minor' and len(parts) >= 2:
subpath = f"{parts[0]}.{parts[1]}"
filename = f"{parts[0]}.{parts[1]}.{parts[2] if len(parts)>2 else '0'}{ext}"
# Build canonical path
if subpath:
canonical_ref = f"{base_dir}/{type_dir}/{subpath}/{filename}"
else:
canonical_ref = f"{base_dir}/{type_dir}/{filename}"
except Exception:
canonical_ref = f"{base_dir}/{type_dir}/{version}{ext}"
# Read file and hash
content_hash = ''
try:
# project root = parent of app package
root = Path(current_app.root_path).parent if current_app else Path('.')
fpath = root / canonical_ref
content_bytes = fpath.read_bytes() if fpath.exists() else b''
content_hash = hashlib.sha256(content_bytes).hexdigest() if content_bytes else ''
except Exception:
content_hash = ''
return {
'canonical_document_ref': canonical_ref,
'content_hash': content_hash,
}
@staticmethod
def record_consent(tenant_id: int, consent_type: str) -> TenantConsent:
# Validate type
if consent_type not in ConsentService.get_required_consent_types():
raise ValueError(f"Unknown consent type: {consent_type}")
active = ConsentService.get_active_consent_version(consent_type)
if not active:
raise RuntimeError(f"No active ConsentVersion for type {consent_type}")
allowed, mode, partner_id, partner_service_id = ConsentService.can_consent_on_behalf(tenant_id)
if not allowed:
raise PermissionError("Not authorized to record consent for this tenant")
# Idempotency: if already consented for active version, return existing
existing = (TenantConsent.query
.filter_by(tenant_id=tenant_id, consent_type=consent_type, consent_version=active.consent_version)
.first())
if existing:
return existing
# Build consent_data with audit info
ip = request.headers.get('X-Forwarded-For', '').split(',')[0].strip() or request.remote_addr or ''
ua = request.headers.get('User-Agent', '')
locale = session.get('locale') or request.accept_languages.best or ''
content_meta = ConsentService._resolve_consent_content(consent_type, active.consent_version)
consent_data = {
'source_ip': ip,
'user_agent': ua,
'locale': locale,
**content_meta,
}
tc = TenantConsent(
tenant_id=tenant_id,
partner_id=partner_id,
partner_service_id=partner_service_id,
user_id=getattr(current_user, 'id', None) or 0,
consent_type=consent_type,
consent_version=active.consent_version,
consent_data=consent_data,
)
try:
db.session.add(tc)
db.session.commit()
current_app.logger.info(f"Consent recorded: tenant={tenant_id}, type={consent_type}, version={active.consent_version}, mode={mode}, user={getattr(current_user, 'id', None)}")
return tc
except IntegrityError as e:
db.session.rollback()
# In case of race, fetch existing
current_app.logger.warning(f"IntegrityError on consent insert, falling back: {e}")
existing = (TenantConsent.query
.filter_by(tenant_id=tenant_id, consent_type=consent_type, consent_version=active.consent_version)
.first())
if existing:
return existing
raise
except SQLAlchemyError as e:
db.session.rollback()
current_app.logger.error(f"DB error in record_consent: {e}")
raise

View File

@@ -177,52 +177,6 @@ class TenantServices:
@staticmethod
def get_consent_status(tenant_id: int) -> ConsentStatus:
cts = current_app.config.get("CONSENT_TYPES")
status = ConsentStatus.CONSENTED
for ct in cts:
consent = (TenantConsent.query.filter_by(tenant_id=tenant_id, consent_type=ct)
.order_by(desc(TenantConsent.id))
.first())
if not consent:
status = ConsentStatus.NOT_CONSENTED
break
cv = ConsentVersion.query.filter_by(consent_type=ct, consent_version=consent.consent_version).first()
if not cv:
current_app.logger.error(f"Consent version {consent.consent_version} not found checking tenant {tenant_id}")
status = ConsentStatus.UNKNOWN_CONSENT_VERSION
break
if cv.consent_valid_to:
if cv.consent_valid_to.date() >= dt.now(tz.utc).date():
status = ConsentStatus.RENEWAL_REQUIRED
break
else:
status = ConsentStatus.NOT_CONSENTED
break
return status
@staticmethod
def get_consent_status_details(tenant_id: int) -> Dict[str, str]:
cts = current_app.config.get("CONSENT_TYPES")
details = {}
for ct in cts:
ct = cv.consent_type
consent = (TenantConsent.query.filter_by(tenant_id=tenant_id, consent_type=ct)
.order_by(desc(TenantConsent.id))
.first())
if not consent:
details[ct] = {
'status': str(ConsentStatus.NOT_CONSENTED),
'version': str(cv.consent_version)
}
continue
if cv.consent_valid_to.date >= dt.now(tz.utc).date():
details[ct] = {
'status': str(ConsentStatus.RENEWAL_REQUIRED),
'version': str(cv.consent_version)
}
details[ct] = {
'status': str(ConsentStatus.CONSENTED),
'version': str(cv.consent_version)
}
# Delegate to centralized ConsentService to ensure consistent logic
from common.services.user.consent_service import ConsentService
return ConsentService.get_consent_status(tenant_id)