108 lines
4.2 KiB
Python
108 lines
4.2 KiB
Python
from google.cloud import kms_v1
|
|
from base64 import b64encode, b64decode
|
|
from Crypto.Cipher import AES
|
|
from Crypto.Random import get_random_bytes
|
|
import random
|
|
import time
|
|
from flask import Flask
|
|
import os
|
|
|
|
|
|
def generate_api_key(prefix="EveAI-Chat"):
    """Return a new API key of the form ``<prefix>-NNNN-NNNN-NNNN-NNNN-NNNN``.

    Each ``NNNN`` group is a four-digit number in the range 1000-9999.
    The groups are drawn from the operating system's CSPRNG via ``secrets``
    because the result is used as a credential; the ``random`` module is
    predictable and must not be used for that purpose.

    Args:
        prefix: Text placed in front of the five numeric groups.

    Returns:
        The generated key as a string.
    """
    # Local import keeps this function self-contained.
    import secrets

    # randbelow(9000) yields 0..8999, so adding 1000 gives 1000..9999 -- the
    # same range the key format has always used.
    groups = [str(1000 + secrets.randbelow(9000)) for _ in range(5)]
    return f"{prefix}-{'-'.join(groups)}"
|
|
|
|
|
|
class JosKMSClient(kms_v1.KeyManagementServiceClient):
    """Cloud KMS client that envelope-encrypts API keys.

    Each API key is encrypted locally with a fresh AES-256-GCM data
    encryption key (DEK); the DEK itself is then encrypted by the Cloud KMS
    crypto key configured through ``init_app``.
    """

    def __init__(self, *args, **kwargs):
        """Create the KMS client.

        All arguments are forwarded to ``KeyManagementServiceClient``. The
        key-related attributes stay ``None`` until ``init_app`` is called.
        """
        super().__init__(*args, **kwargs)
        self.key_name = None
        self.crypto_key = None
        self.key_ring = None
        self.location = None
        self.project_id = None

    def init_app(self, app: Flask) -> None:
        """Load the KMS key settings from a Flask application's config.

        Reads ``GC_PROJECT_NAME``, ``GC_LOCATION``, ``GC_KEY_RING`` and
        ``GC_CRYPTO_KEY`` from ``app.config``, derives the full crypto key
        resource name, logs the resulting settings and exports the project
        id through the ``GOOGLE_CLOUD_PROJECT`` environment variable.

        Args:
            app: The Flask application providing ``config`` and ``logger``.

        Raises:
            TypeError: If ``GC_PROJECT_NAME`` is not configured, because
                environment variable values must be strings.
        """
        config = app.config
        self.project_id = config.get('GC_PROJECT_NAME')
        self.location = config.get('GC_LOCATION')
        self.key_ring = config.get('GC_KEY_RING')
        self.crypto_key = config.get('GC_CRYPTO_KEY')
        self.key_name = self.crypto_key_path(
            self.project_id, self.location, self.key_ring, self.crypto_key
        )

        app.logger.info(f'Project ID: {self.project_id}')
        app.logger.info(f'Location: {self.location}')
        app.logger.info(f'Key Ring: {self.key_ring}')
        app.logger.info(f'Crypto Key: {self.crypto_key}')
        app.logger.info(f'Key Name: {self.key_name}')

        # The lookup is done outside the f-string: reusing the enclosing
        # quote character inside an f-string expression is a SyntaxError on
        # every Python version before 3.12.
        credentials_path = os.getenv('GOOGLE_APPLICATION_CREDENTIALS')
        app.logger.info(f'Service Account Key Path: {credentials_path}')

        os.environ["GOOGLE_CLOUD_PROJECT"] = self.project_id

    def encrypt_api_key(self, api_key) -> dict:
        """Envelope-encrypt an API key.

        A random 32-byte DEK encrypts the key with AES-GCM; the DEK is then
        encrypted by the configured Cloud KMS crypto key.

        Args:
            api_key: The plaintext API key (a ``str``).

        Returns:
            A dict of strings with the keys ``key_version`` (the ``name``
            reported by the KMS encrypt response), ``encrypted_dek``,
            ``nonce``, ``tag`` and ``ciphertext``. All values except
            ``key_version`` are base64-encoded.
        """
        dek = get_random_bytes(32)  # AES 256-bit key
        cipher = AES.new(dek, AES.MODE_GCM)
        ciphertext, tag = cipher.encrypt_and_digest(api_key.encode())

        # Wrap the DEK with the Cloud KMS key; only the wrapped form is
        # returned to the caller.
        encrypt_response = self.encrypt(
            request={'name': self.key_name, 'plaintext': dek}
        )

        return {
            # Record which key version KMS reported for this encryption.
            'key_version': encrypt_response.name,
            'encrypted_dek': b64encode(encrypt_response.ciphertext).decode('utf-8'),
            'nonce': b64encode(cipher.nonce).decode('utf-8'),
            'tag': b64encode(tag).decode('utf-8'),
            'ciphertext': b64encode(ciphertext).decode('utf-8'),
        }

    def decrypt_api_key(self, encrypted_data):
        """Decrypt an API key produced by ``encrypt_api_key``.

        The wrapped DEK is sent to KMS under the configured crypto key name
        (``self.key_name``). The ``key_version`` entry of ``encrypted_data``
        is not needed for this call and is therefore not read.
        NOTE(review): this relies on Cloud KMS choosing the matching key
        version for symmetric decryption -- confirm against the KMS docs.

        Args:
            encrypted_data: Mapping with base64 string values under
                ``encrypted_dek``, ``nonce``, ``tag`` and ``ciphertext``.

        Returns:
            The plaintext API key, or ``None`` when the KMS call that
            unwraps the DEK fails.

        Raises:
            KeyError: If a required entry is missing from ``encrypted_data``.
            ValueError: Propagated from ``decrypt_and_verify`` when the
                ciphertext or tag fails authentication.
        """
        # Local import keeps this method self-contained.
        import logging

        encrypted_dek = b64decode(encrypted_data['encrypted_dek'].encode('utf-8'))
        nonce = b64decode(encrypted_data['nonce'].encode('utf-8'))
        tag = b64decode(encrypted_data['tag'].encode('utf-8'))
        ciphertext = b64decode(encrypted_data['ciphertext'].encode('utf-8'))

        # Unwrap the DEK. Failures stay best-effort (return None) as before,
        # but are now logged with a traceback instead of printed to stdout.
        try:
            decrypt_response = self.decrypt(
                request={'name': self.key_name, 'ciphertext': encrypted_dek}
            )
        except Exception:
            logging.getLogger(__name__).exception("Failed to decrypt DEK")
            return None

        dek = decrypt_response.plaintext
        cipher = AES.new(dek, AES.MODE_GCM, nonce=nonce)
        api_key = cipher.decrypt_and_verify(ciphertext, tag)
        return api_key.decode()

    def check_kms_access_and_latency(self):
        """Placeholder for a KMS connectivity check; currently does nothing.

        Returns:
            ``None``.
        """
        # TODO: the previous (disabled) implementation timed a
        # ``get_crypto_key`` call on the configured key and printed the
        # elapsed time; restore it with proper logging if the check is needed.
        return None
|