import ast
import os
import random
import secrets
import time
from base64 import b64decode, b64encode

from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
from flask import Flask
from google.cloud import kms_v1


def generate_api_key(prefix="EveAI-Chat"):
    """Return a new API key of the form ``<prefix>-NNNN-NNNN-NNNN-NNNN-NNNN``.

    Each ``NNNN`` group is a four-digit number in the range 1000..9999.

    The digits are drawn from the ``secrets`` module rather than ``random``:
    an API key is a credential, and ``random`` is a predictable PRNG that
    must not be used to generate secrets.
    """
    # randbelow(9000) yields 0..8999, so adding 1000 gives 1000..9999 inclusive.
    parts = [str(secrets.randbelow(9000) + 1000) for _ in range(5)]
    return f"{prefix}-{'-'.join(parts)}"


class JosKMSClient(kms_v1.KeyManagementServiceClient):
    """Google Cloud KMS client that envelope-encrypts API keys.

    A fresh 256-bit data-encryption key (DEK) protects each API key with
    AES-GCM; the DEK itself is encrypted by the KMS crypto key configured in
    ``init_app`` (the key-encryption key).
    """

    def __init__(self, *args, **kwargs):
        """Create the underlying KMS client; key settings stay unset until ``init_app``."""
        super().__init__(*args, **kwargs)
        self.key_name = None
        self.crypto_key = None
        self.key_ring = None
        self.location = None
        self.project_id = None

    def init_app(self, app: Flask):
        """Read the KMS settings from ``app.config`` and build the key resource name.

        Config keys read: ``GC_PROJECT_NAME``, ``GC_LOCATION``, ``GC_KEY_RING``
        and ``GC_CRYPTO_KEY``. The resolved values are logged at INFO level and
        ``GOOGLE_CLOUD_PROJECT`` is exported to the process environment.

        Raises:
            TypeError: if ``GC_PROJECT_NAME`` is not configured, because
                ``os.environ`` only accepts string values.
        """
        self.project_id = app.config.get('GC_PROJECT_NAME')
        self.location = app.config.get('GC_LOCATION')
        self.key_ring = app.config.get('GC_KEY_RING')
        self.crypto_key = app.config.get('GC_CRYPTO_KEY')
        self.key_name = self.crypto_key_path(
            self.project_id, self.location, self.key_ring, self.crypto_key
        )

        app.logger.info(f'Project ID: {self.project_id}')
        app.logger.info(f'Location: {self.location}')
        app.logger.info(f'Key Ring: {self.key_ring}')
        app.logger.info(f'Crypto Key: {self.crypto_key}')
        app.logger.info(f'Key Name: {self.key_name}')
        # The inner quotes must differ from the enclosing ones: reusing the
        # same quote character inside a replacement field is a SyntaxError
        # on every Python version before 3.12.
        app.logger.info(
            f'Service Account Key Path: {os.getenv("GOOGLE_APPLICATION_CREDENTIALS")}'
        )

        os.environ["GOOGLE_CLOUD_PROJECT"] = self.project_id

    def encrypt_api_key(self, api_key):
        """Encrypt ``api_key`` with a one-off DEK and wrap the DEK with KMS.

        Args:
            api_key: the plaintext API key (``str``).

        Returns:
            A dict with ``key_version`` (the ``name`` reported by the KMS
            encrypt response) and the base64-encoded text fields
            ``encrypted_dek``, ``nonce``, ``tag`` and ``ciphertext``.
        """
        dek = get_random_bytes(32)  # AES 256-bit key
        cipher = AES.new(dek, AES.MODE_GCM)
        ciphertext, tag = cipher.encrypt_and_digest(api_key.encode())

        # Wrap the DEK with the KMS key. The plaintext DEK is never stored,
        # logged or printed; only its encrypted form leaves this method.
        encrypt_response = self.encrypt(
            request={'name': self.key_name, 'plaintext': dek}
        )
        encrypted_dek = encrypt_response.ciphertext

        return {
            # Recorded so the payload documents which key version wrapped it.
            'key_version': encrypt_response.name,
            'encrypted_dek': b64encode(encrypted_dek).decode('utf-8'),
            'nonce': b64encode(cipher.nonce).decode('utf-8'),
            'tag': b64encode(tag).decode('utf-8'),
            'ciphertext': b64encode(ciphertext).decode('utf-8'),
        }

    def decrypt_api_key(self, encrypted_data):
        """Recover the plaintext API key from an ``encrypt_api_key`` payload.

        Args:
            encrypted_data: the dict produced by ``encrypt_api_key``, or its
                ``repr`` as a string (parsed with ``ast.literal_eval``, which
                only accepts Python literals and never executes code).

        Returns:
            The decrypted API key as ``str``, or ``None`` when KMS fails to
            unwrap the DEK (the error is printed).

        Raises:
            ValueError: if the GCM tag does not verify, i.e. the ciphertext,
                nonce or tag has been altered or the wrong DEK was recovered.
        """
        if isinstance(encrypted_data, str):
            encrypted_data = ast.literal_eval(encrypted_data)

        encrypted_dek = b64decode(encrypted_data['encrypted_dek'].encode('utf-8'))
        nonce = b64decode(encrypted_data['nonce'].encode('utf-8'))
        tag = b64decode(encrypted_data['tag'].encode('utf-8'))
        ciphertext = b64decode(encrypted_data['ciphertext'].encode('utf-8'))

        # Symmetric KMS decryption is addressed to the CryptoKey, not to a
        # specific version: the service picks the right version from the
        # ciphertext. The stored 'key_version' is therefore informational.
        try:
            decrypt_response = self.decrypt(
                request={'name': self.key_name, 'ciphertext': encrypted_dek}
            )
        except Exception as e:
            # Kept as best-effort for existing callers, which check for None.
            print(f"Failed to decrypt DEK: {e}")
            return None
        dek = decrypt_response.plaintext

        cipher = AES.new(dek, AES.MODE_GCM, nonce=nonce)
        api_key = cipher.decrypt_and_verify(ciphertext, tag)
        return api_key.decode()

    def check_kms_access_and_latency(self):
        """Placeholder for a KMS connectivity/latency probe; currently a no-op.

        The intended implementation is kept below, disabled.
        """
        # key_name = self.crypto_key_path(
        #     self.project_id, self.location, self.key_ring, self.crypto_key
        # )
        #
        # start_time = time.time()
        # try:
        #     response = self.get_crypto_key(name=key_name)
        #     end_time = time.time()
        #     print(f"Response Time: {end_time - start_time} seconds")
        #     print("Access to KMS is successful.")
        # except Exception as e:
        #     print(f"Failed to access KMS: {e}")
        pass