from flask import current_app, render_template from flask_security import current_user from flask_mailman import EmailMessage from itsdangerous import URLSafeTimedSerializer import socket from common.models.user import Role from common.utils.nginx_utils import prefixed_url_for def confirm_token(token, expiration=3600): serializer = URLSafeTimedSerializer(current_app.config['SECRET_KEY']) try: email = serializer.loads(token, salt=current_app.config['SECURITY_PASSWORD_SALT'], max_age=expiration) except Exception as e: current_app.logger.error(f'Error confirming token: {e}') raise return email def send_email(to, subject, template): msg = EmailMessage(subject=subject, body=template, to=[to]) msg.content_subtype = "html" msg.send() def generate_reset_token(email): serializer = URLSafeTimedSerializer(current_app.config['SECRET_KEY']) return serializer.dumps(email, salt=current_app.config['SECURITY_PASSWORD_SALT']) def generate_confirmation_token(email): serializer = URLSafeTimedSerializer(current_app.config['SECRET_KEY']) return serializer.dumps(email, salt=current_app.config['SECURITY_PASSWORD_SALT']) def send_confirmation_email(user): if not test_smtp_connection(): raise Exception("Failed to connect to SMTP server") token = generate_confirmation_token(user.email) confirm_url = prefixed_url_for('security_bp.confirm_email', token=token, _external=True) html = render_template('email/activate.html', confirm_url=confirm_url) subject = "Please confirm your email" try: send_email(user.email, "Confirm your email", html) current_app.logger.info(f'Confirmation email sent to {user.email}') except Exception as e: current_app.logger.error(f'Failed to send confirmation email to {user.email}. Error: {str(e)}') raise def send_reset_email(user): token = generate_reset_token(user.email) reset_url = prefixed_url_for('security_bp.reset_password', token=token, _external=True) html = render_template('email/reset_password.html', reset_url=reset_url) subject = "Reset Your Password" try: send_email(user.email, "Reset Your Password", html) current_app.logger.info(f'Reset email sent to {user.email}') except Exception as e: current_app.logger.error(f'Failed to send reset email to {user.email}. Error: {str(e)}') raise def test_smtp_connection(): try: current_app.logger.info(f"Attempting to resolve google.com...") google_ip = socket.gethostbyname('google.com') current_app.logger.info(f"Successfully resolved google.com to {google_ip}") except Exception as e: current_app.logger.error(f"Failed to resolve google.com: {str(e)}") try: smtp_server = current_app.config['MAIL_SERVER'] current_app.logger.info(f"Attempting to resolve {smtp_server}...") smtp_ip = socket.gethostbyname(smtp_server) current_app.logger.info(f"Successfully resolved {smtp_server} to {smtp_ip}") except Exception as e: current_app.logger.error(f"Failed to resolve {smtp_server}: {str(e)}") try: smtp_server = current_app.config['MAIL_SERVER'] smtp_port = current_app.config['MAIL_PORT'] sock = socket.create_connection((smtp_server, smtp_port), timeout=10) sock.close() current_app.logger.info(f"Successfully connected to SMTP server {smtp_server}:{smtp_port}") return True except Exception as e: current_app.logger.error(f"Failed to connect to SMTP server: {str(e)}") return False def get_current_user_roles(): """Get the roles of the currently authenticated user. Returns: List of Role objects or empty list if no user is authenticated """ if current_user.is_authenticated: return current_user.roles return [] def current_user_has_role(role_name): """Check if the current user has the specified role. Args: role_name (str): Name of the role to check Returns: bool: True if user has the role, False otherwise """ if not current_user.is_authenticated: return False return any(role.name == role_name for role in current_user.roles) def current_user_roles(): """Get the roles of the currently authenticated user. Returns: List of Role objects or empty list if no user is authenticated """ if current_user.is_authenticated: return current_user.roles return [] def all_user_roles(): roles = [(role.id, role.name) for role in Role.query.all()]