- Introduction of Partner Admin role in combination with 'Management Partner' type.

This commit is contained in:
Josako
2025-04-09 09:40:59 +02:00
parent c2c3b01b28
commit f43e79376c
17 changed files with 368 additions and 111 deletions

View File

@@ -144,9 +144,9 @@ class TenantDomain(db.Model):
# Versioning Information
created_at = db.Column(db.DateTime, nullable=False, server_default=db.func.now())
created_by = db.Column(db.Integer, db.ForeignKey(User.id), nullable=False)
created_by = db.Column(db.Integer, db.ForeignKey('public.user.id'), nullable=False)
updated_at = db.Column(db.DateTime, nullable=False, server_default=db.func.now(), onupdate=db.func.now())
updated_by = db.Column(db.Integer, db.ForeignKey(User.id))
updated_by = db.Column(db.Integer, db.ForeignKey('public.user.id'))
def __repr__(self):
return f"<TenantDomain {self.id}: {self.domain}>"
@@ -202,13 +202,25 @@ class Partner(db.Model):
tenant = db.relationship('Tenant', backref=db.backref('partner', uselist=False))
def to_dict(self):
services_info = []
for service in self.services:
services_info.append({
'id': service.id,
'name': service.name,
'description': service.description,
'type': service.type,
'type_version': service.type_version,
'active': service.active,
'configuration': service.configuration
})
return {
'id': self.id,
'tenant_id': self.tenant_id,
'code': self.code,
'logo_url': self.logo_url,
'active': self.active,
'name': self.tenant.name
'name': self.tenant.name,
'services': services_info
}

View File

View File

@@ -0,0 +1,44 @@
from flask import session
from common.models.user import Partner, Role
# common/services/user_service.py
from common.utils.eveai_exceptions import EveAIRoleAssignmentException
from common.utils.security_utils import current_user_has_role, all_user_roles
class UserService:
@staticmethod
def get_assignable_roles():
"""Retrieves roles that can be assigned to a user depending on the current user logged in,
and the active tenant for the session"""
current_tenant_id = session.get('tenant').get('id', None)
effective_role_names = []
if current_tenant_id:
if current_user_has_role("Super User"):
if current_tenant_id == 1:
effective_role_names.append("Super User")
if session.get('partner'):
effective_role_names.append("Partner Admin")
effective_role_names.append("Tenant Admin")
if current_user_has_role("Tenant Admin"):
effective_role_names.append("Tenant Admin")
if current_user_has_role("Partner Admin"):
effective_role_names.append("Tenant Admin")
if session.get('partner'):
if session.get('partner').get('tenant_id') == current_tenant_id:
effective_role_names.append("Partner Admin")
effective_role_names = list(set(effective_role_names))
effective_roles = [(role.id, role.name) for role in
Role.query.filter(Role.name.in_(effective_role_names)).all()]
return effective_roles
else:
return []
@staticmethod
def validate_role_assignments(role_ids):
"""Validate a set of role assignments, raising exception for first invalid role"""
assignable_roles = UserService.get_assignable_roles()
assignable_role_ids = {role[0] for role in assignable_roles}
role_id_set = set(role_ids)
return role_id_set.issubset(assignable_role_ids)

View File

@@ -147,3 +147,10 @@ class EveAIDoublePartner(EveAIException):
message = f"Tenant with ID '{tenant_id}' is already defined as a Partner."
super().__init__(message, status_code, payload)
class EveAIRoleAssignmentException(EveAIException):
"""Exception raised when a role cannot be assigned due to business rules"""
def __init__(self, message, status_code=403, payload=None):
super().__init__(message, status_code, payload)

View File

@@ -1,7 +1,7 @@
from flask import session, current_app
from sqlalchemy import and_
from common.models.user import Tenant
from common.models.user import Tenant, Partner
from common.models.entitlements import License
from common.utils.database import Database
from common.utils.eveai_exceptions import EveAITenantNotFound, EveAITenantInvalid, EveAINoActiveLicense
@@ -13,13 +13,19 @@ def set_tenant_session_data(sender, user, **kwargs):
tenant = Tenant.query.filter_by(id=user.tenant_id).first()
session['tenant'] = tenant.to_dict()
session['default_language'] = tenant.default_language
session['default_llm_model'] = tenant.llm_model
partner = Partner.query.filter_by(tenant_id=user.tenant_id).first()
if partner:
session['partner'] = partner.to_dict()
else:
# Remove partner from session if it exists
session.pop('partner', None)
def clear_tenant_session_data(sender, user, **kwargs):
session.pop('tenant', None)
session.pop('default_language', None)
session.pop('default_llm_model', None)
session.pop('partner', None)
def is_valid_tenant(tenant_id):
@@ -40,4 +46,4 @@ def is_valid_tenant(tenant_id):
if not active_license:
raise EveAINoActiveLicense(tenant_id)
return True
return True

View File

@@ -1,8 +1,10 @@
from flask import current_app, render_template
from flask_security import current_user
from flask_mailman import EmailMessage
from itsdangerous import URLSafeTimedSerializer
import socket
from common.models.user import Role
from common.utils.nginx_utils import prefixed_url_for
@@ -93,3 +95,44 @@ def test_smtp_connection():
except Exception as e:
current_app.logger.error(f"Failed to connect to SMTP server: {str(e)}")
return False
def get_current_user_roles():
"""Get the roles of the currently authenticated user.
Returns:
List of Role objects or empty list if no user is authenticated
"""
if current_user.is_authenticated:
return current_user.roles
return []
def current_user_has_role(role_name):
"""Check if the current user has the specified role.
Args:
role_name (str): Name of the role to check
Returns:
bool: True if user has the role, False otherwise
"""
if not current_user.is_authenticated:
return False
return any(role.name == role_name for role in current_user.roles)
def current_user_roles():
"""Get the roles of the currently authenticated user.
Returns:
List of Role objects or empty list if no user is authenticated
"""
if current_user.is_authenticated:
return current_user.roles
return []
def all_user_roles():
roles = [(role.id, role.name) for role in Role.query.all()]