139 lines
4.6 KiB
Python
139 lines
4.6 KiB
Python
from flask import current_app, render_template
|
|
from flask_security import current_user
|
|
from flask_mailman import EmailMessage
|
|
from itsdangerous import URLSafeTimedSerializer
|
|
import socket
|
|
|
|
from common.models.user import Role
|
|
from common.utils.nginx_utils import prefixed_url_for
|
|
|
|
|
|
def confirm_token(token, expiration=3600):
    """Validate a signed token and return the value stored inside it.

    The token is checked with the application's ``SECRET_KEY`` and
    ``SECURITY_PASSWORD_SALT``, the same pair the token generators in this
    module use when serializing a user's email address.

    Args:
        token: The serialized token string to validate.
        expiration: Value passed to the serializer as ``max_age``
            (default 3600).

    Returns:
        The deserialized payload of the token.

    Raises:
        Exception: Any error raised while loading the token is logged and
            then re-raised unchanged.
    """
    config = current_app.config
    signer = URLSafeTimedSerializer(config['SECRET_KEY'])
    try:
        # The salt lookup stays inside the try so a missing config key is
        # logged the same way as an invalid token.
        return signer.loads(
            token,
            salt=config['SECURITY_PASSWORD_SALT'],
            max_age=expiration,
        )
    except Exception as e:
        current_app.logger.error(f'Error confirming token: {e}')
        raise
|
|
|
|
|
|
def send_email(to, subject, template):
    """Send a single HTML email.

    Args:
        to: Recipient address; wrapped in a one-element recipient list.
        subject: Subject line of the message.
        template: The message body. It is sent with the ``html`` content
            subtype, so callers pass already-rendered HTML.
    """
    message = EmailMessage(subject=subject, body=template, to=[to])
    # Mark the body as HTML rather than the default plain text.
    message.content_subtype = "html"
    message.send()
|
|
|
|
|
|
def generate_reset_token(email):
    """Serialize *email* into a signed token for the password-reset flow.

    Args:
        email: The value to embed in the token.

    Returns:
        The token produced by ``URLSafeTimedSerializer.dumps``.
    """
    # NOTE(review): this uses the same SECRET_KEY and SECURITY_PASSWORD_SALT
    # as generate_confirmation_token, so both token kinds are produced
    # identically - confirm that sharing the salt is intended.
    config = current_app.config
    signer = URLSafeTimedSerializer(config['SECRET_KEY'])
    reset_token = signer.dumps(email, salt=config['SECURITY_PASSWORD_SALT'])
    return reset_token
|
|
|
|
|
|
def generate_confirmation_token(email):
    """Serialize *email* into a signed token for the email-confirmation flow.

    Args:
        email: The value to embed in the token.

    Returns:
        The token produced by ``URLSafeTimedSerializer.dumps``, signed with
        the application's ``SECRET_KEY`` and ``SECURITY_PASSWORD_SALT``.
    """
    config = current_app.config
    signer = URLSafeTimedSerializer(config['SECRET_KEY'])
    confirmation_token = signer.dumps(email, salt=config['SECURITY_PASSWORD_SALT'])
    return confirmation_token
|
|
|
|
|
|
def send_confirmation_email(user):
    """Email *user* a link for confirming their email address.

    The SMTP server is probed first with ``test_smtp_connection``; only when
    that succeeds is a confirmation token generated for ``user.email``,
    embedded in an external URL for ``security_bp.confirm_email``, rendered
    through ``email/activate.html`` and sent.

    Args:
        user: Object exposing an ``email`` attribute.

    Raises:
        ConnectionError: If the SMTP connectivity check fails.
        Exception: Any error raised while sending is logged and re-raised.
    """
    if not test_smtp_connection():
        # ConnectionError is a subclass of Exception, so callers that catch
        # Exception keep working while new callers can be more specific.
        raise ConnectionError("Failed to connect to SMTP server")

    token = generate_confirmation_token(user.email)
    confirm_url = prefixed_url_for('security_bp.confirm_email', token=token, _external=True)
    html = render_template('email/activate.html', confirm_url=confirm_url)
    # Single source of truth for the subject line that is actually sent.
    subject = "Confirm your email"

    try:
        send_email(user.email, subject, html)
        current_app.logger.info(f'Confirmation email sent to {user.email}')
    except Exception as e:
        current_app.logger.error(f'Failed to send confirmation email to {user.email}. Error: {e}')
        raise
|
|
|
|
|
|
def send_reset_email(user):
    """Email *user* a link for resetting their password.

    A reset token is generated for ``user.email``, embedded in an external
    URL for ``security_bp.reset_password``, rendered through
    ``email/reset_password.html`` and sent.

    Args:
        user: Object exposing an ``email`` attribute.

    Raises:
        Exception: Any error raised while sending is logged and re-raised.
    """
    recipient = user.email
    subject = "Reset Your Password"

    reset_url = prefixed_url_for(
        'security_bp.reset_password',
        token=generate_reset_token(recipient),
        _external=True,
    )
    html = render_template('email/reset_password.html', reset_url=reset_url)

    try:
        send_email(user.email, subject, html)
        current_app.logger.info(f'Reset email sent to {user.email}')
    except Exception as e:
        current_app.logger.error(f'Failed to send reset email to {user.email}. Error: {e}')
        raise
|
|
|
|
|
|
def test_smtp_connection():
    """Check whether the configured SMTP server accepts TCP connections.

    Two DNS lookups (``google.com`` and the configured ``MAIL_SERVER``) are
    performed purely for diagnostic logging; their outcome never changes the
    return value. The result is decided by opening, and immediately closing,
    a TCP connection to ``MAIL_SERVER``:``MAIL_PORT`` with a 10 second
    timeout.

    Returns:
        bool: True if the TCP connection succeeded, False if the mail
        settings are missing from the configuration or the connection
        attempt failed.
    """
    logger = current_app.logger

    # Diagnostic only: tells a general DNS outage apart from a problem that
    # is specific to the mail host. Failures are logged and ignored.
    try:
        logger.info("Attempting to resolve google.com...")
        google_ip = socket.gethostbyname('google.com')
        logger.info(f"Successfully resolved google.com to {google_ip}")
    except Exception as e:
        logger.error(f"Failed to resolve google.com: {e}")

    # Read the settings once, before they are needed in any log message, so
    # a missing key yields False instead of an unbound-variable error inside
    # an exception handler.
    try:
        smtp_server = current_app.config['MAIL_SERVER']
        smtp_port = current_app.config['MAIL_PORT']
    except KeyError as e:
        logger.error(f"Failed to connect to SMTP server: {e}")
        return False

    # Diagnostic only: resolve the mail host by name.
    try:
        logger.info(f"Attempting to resolve {smtp_server}...")
        smtp_ip = socket.gethostbyname(smtp_server)
        logger.info(f"Successfully resolved {smtp_server} to {smtp_ip}")
    except Exception as e:
        logger.error(f"Failed to resolve {smtp_server}: {e}")

    # The actual check: can a TCP connection be opened?
    try:
        # The context manager closes the socket even if closing is
        # interrupted by an error.
        with socket.create_connection((smtp_server, smtp_port), timeout=10):
            pass
        logger.info(f"Successfully connected to SMTP server {smtp_server}:{smtp_port}")
        return True
    except Exception as e:
        logger.error(f"Failed to connect to SMTP server: {e}")
        return False
|
|
|
|
|
|
def get_current_user_roles():
    """Return the roles attached to the current user.

    Returns:
        ``current_user.roles`` when the current user is authenticated,
        otherwise an empty list.
    """
    return current_user.roles if current_user.is_authenticated else []
|
|
|
|
|
|
def current_user_has_role(role_name):
    """Tell whether the current user holds a role with the given name.

    Args:
        role_name (str): Name compared against each role's ``name``.

    Returns:
        bool: True if the current user is authenticated and one of their
        roles has a matching name, False otherwise.
    """
    if current_user.is_authenticated:
        for role in current_user.roles:
            if role.name == role_name:
                return True
    return False
|
|
|
|
|
|
def current_user_roles():
    """Return the roles attached to the current user.

    Returns:
        ``current_user.roles`` when the current user is authenticated,
        otherwise an empty list.
    """
    if not current_user.is_authenticated:
        return []
    return current_user.roles
|
|
|
|
|
|
def all_user_roles():
|
|
roles = [(role.id, role.name) for role in Role.query.all()]
|