Implement chat API key generation, and create a tenant_overview

This commit is contained in:
Josako
2024-05-16 23:22:26 +02:00
parent 8c6d9bf5ca
commit 883988dbab
12 changed files with 387 additions and 52 deletions

View File

@@ -7,6 +7,7 @@ from flask_login import LoginManager
from flask_cors import CORS
from flask_socketio import SocketIO
from .utils.key_encryption import JosKMSClient
# Create extensions
db = SQLAlchemy()
@@ -17,3 +18,4 @@ mail = Mail()
login_manager = LoginManager()
cors = CORS()
socketio = SocketIO()
kms_client = JosKMSClient()

View File

@@ -37,7 +37,7 @@ class Tenant(db.Model):
license_start_date = db.Column(db.Date, nullable=True)
license_end_date = db.Column(db.Date, nullable=True)
allowed_monthly_interactions = db.Column(db.Integer, nullable=True)
encrypted_api_key = db.Column(db.String(500), nullable=True)
encrypted_chat_api_key = db.Column(db.String(500), nullable=True)
# Relations
users = db.relationship('User', backref='tenant')

View File

@@ -2,50 +2,68 @@ from google.cloud import kms
from base64 import b64encode, b64decode
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
from flask import current_app
client = kms.KeyManagementServiceClient()
key_name = client.crypto_key_path('your-project-id', 'your-key-ring', 'your-crypto-key')
import random
from flask import Flask
def encrypt_api_key(api_key):
"""Encrypts the API key using the latest version of the KEK."""
dek = get_random_bytes(32) # AES 256-bit key
cipher = AES.new(dek, AES.MODE_GCM)
ciphertext, tag = cipher.encrypt_and_digest(api_key.encode())
# Encrypt the DEK using the latest version of the Google Cloud KMS key
encrypt_response = client.encrypt(
request={'name': key_name, 'plaintext': dek}
)
encrypted_dek = encrypt_response.ciphertext
# Store the version of the key used
key_version = encrypt_response.name
return {
'key_version': key_version,
'encrypted_dek': b64encode(encrypted_dek).decode('utf-8'),
'nonce': b64encode(cipher.nonce).decode('utf-8'),
'tag': b64encode(tag).decode('utf-8'),
'ciphertext': b64encode(ciphertext).decode('utf-8')
}
def generate_api_key(prefix="EveAI-Chat"):
parts = [str(random.randint(1000, 9999)) for _ in range(5)]
return f"{prefix}-{'-'.join(parts)}"
def decrypt_api_key(encrypted_data):
"""Decrypts the API key using the specified key version."""
key_version = encrypted_data['key_version']
encrypted_dek = b64decode(encrypted_data['encrypted_dek'])
nonce = b64decode(encrypted_data['nonce'])
tag = b64decode(encrypted_data['tag'])
ciphertext = b64decode(encrypted_data['ciphertext'])
class JosKMSClient(kms.KeyManagementServiceClient):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.key_name = None
self.crypto_key = None
self.key_ring = None
self.location = None
self.project_id = None
# Decrypt the DEK using the specified version of the Google Cloud KMS key
decrypt_response = client.decrypt(
request={'name': key_version, 'ciphertext': encrypted_dek}
)
dek = decrypt_response.plaintext
def init_app(self, app: Flask):
self.project_id = app.config.get('GC_PROJECT_NAME')
self.location = app.config.get('GC_LOCATION')
self.key_ring = app.config.get('GC_KEY_RING')
self.crypto_key = app.config.get('GC_CRYPTO_KEY')
self.key_name = self.crypto_key_path(self.project_id, self.location, self.key_ring, self.crypto_key)
cipher = AES.new(dek, AES.MODE_GCM, nonce=nonce)
api_key = cipher.decrypt_and_verify(ciphertext, tag)
return api_key.decode()
def encrypt_api_key(self, api_key):
"""Encrypts the API key using the latest version of the KEK."""
dek = get_random_bytes(32) # AES 256-bit key
cipher = AES.new(dek, AES.MODE_GCM)
ciphertext, tag = cipher.encrypt_and_digest(api_key.encode())
# Encrypt the DEK using the latest version of the Google Cloud KMS key
encrypt_response = self.encrypt(
request={'name': self.key_name, 'plaintext': dek}
)
encrypted_dek = encrypt_response.ciphertext
# Store the version of the key used
key_version = encrypt_response.name
return {
'key_version': key_version,
'encrypted_dek': b64encode(encrypted_dek).decode('utf-8'),
'nonce': b64encode(cipher.nonce).decode('utf-8'),
'tag': b64encode(tag).decode('utf-8'),
'ciphertext': b64encode(ciphertext).decode('utf-8')
}
def decrypt_api_key(self, encrypted_data):
"""Decrypts the API key using the specified key version."""
key_version = encrypted_data['key_version']
encrypted_dek = b64decode(encrypted_data['encrypted_dek'])
nonce = b64decode(encrypted_data['nonce'])
tag = b64decode(encrypted_data['tag'])
ciphertext = b64decode(encrypted_data['ciphertext'])
# Decrypt the DEK using the specified version of the Google Cloud KMS key
decrypt_response = self.decrypt(
request={'name': key_version, 'ciphertext': encrypted_dek}
)
dek = decrypt_response.plaintext
cipher = AES.new(dek, AES.MODE_GCM, nonce=nonce)
api_key = cipher.decrypt_and_verify(ciphertext, tag)
return api_key.decode()