From 64cf8df3a9d9a3fc1087d2787ab95c8a5a4d96d8 Mon Sep 17 00:00:00 2001 From: Josako Date: Thu, 1 Aug 2024 17:35:54 +0200 Subject: [PATCH] - Improvements to enable deployment in the cloud, mainly changing file access to Minio - Improvements on RAG logging, and some debugging in that area --- .gitignore | 1 + common/extensions.py | 2 + common/langchain/EveAIRetriever.py | 67 +++++- common/models/user.py | 1 - common/utils/minio_utils.py | 86 +++++++ common/utils/model_utils.py | 2 +- config/config.py | 90 +++++++- config/prompts/openai/gpt-4o-mini.yaml | 79 +++++++ docker/compose.yaml | 34 ++- eveai_app/__init__.py | 4 +- eveai_app/views/document_views.py | 46 ++-- eveai_app/views/user_views.py | 23 +- eveai_chat_workers/tasks.py | 23 +- eveai_workers/__init__.py | 3 +- eveai_workers/tasks.py | 300 ++++++++++++------------- nginx/public/chat_eveai_mini.html | 28 +++ nginx/public/chat_flow.html | 28 +++ requirements.txt | 4 + scripts/start_eveai_chat.sh | 2 +- 19 files changed, 617 insertions(+), 206 deletions(-) create mode 100644 common/utils/minio_utils.py create mode 100644 config/prompts/openai/gpt-4o-mini.yaml create mode 100644 nginx/public/chat_eveai_mini.html create mode 100644 nginx/public/chat_flow.html diff --git a/.gitignore b/.gitignore index 52d3159..0db0019 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ docker/db/postgresql/ docker/db/redis/ docker/logs/ docker/tenant_files/ +/docker/minio/ diff --git a/common/extensions.py b/common/extensions.py index a676f55..c727a0c 100644 --- a/common/extensions.py +++ b/common/extensions.py @@ -11,6 +11,7 @@ from flask_session import Session from flask_wtf import CSRFProtect from .utils.key_encryption import JosKMSClient +from .utils.minio_utils import MinioClient # Create extensions db = SQLAlchemy() @@ -26,3 +27,4 @@ jwt = JWTManager() session = Session() kms_client = JosKMSClient.from_service_account_json('config/gc_sa_eveai.json') +minio_client = MinioClient() diff --git 
a/common/langchain/EveAIRetriever.py b/common/langchain/EveAIRetriever.py index 258da0e..5496398 100644 --- a/common/langchain/EveAIRetriever.py +++ b/common/langchain/EveAIRetriever.py @@ -1,5 +1,5 @@ from langchain_core.retrievers import BaseRetriever -from sqlalchemy import func, and_, or_ +from sqlalchemy import func, and_, or_, desc from sqlalchemy.exc import SQLAlchemyError from pydantic import BaseModel, Field from typing import Any, Dict @@ -20,12 +20,56 @@ class EveAIRetriever(BaseRetriever): self.tenant_info = tenant_info def _get_relevant_documents(self, query: str): + + + current_app.logger.debug(f'Retrieving relevant documents for query: {query}') query_embedding = self._get_query_embedding(query) db_class = self.model_variables['embedding_db_model'] similarity_threshold = self.model_variables['similarity_threshold'] k = self.model_variables['k'] + if self.tenant_info['rag_tuning']: + try: + current_date = get_date_in_timezone(self.tenant_info['timezone']) + current_app.rag_tuning_logger.debug(f'Current date: {current_date}\n') + + # Debug query to show similarity for all valid documents (without chunk text) + debug_query = ( + db.session.query( + Document.id.label('document_id'), + DocumentVersion.id.label('version_id'), + db_class.id.label('embedding_id'), + (1 - db_class.embedding.cosine_distance(query_embedding)).label('similarity') + ) + .join(DocumentVersion, db_class.doc_vers_id == DocumentVersion.id) + .join(Document, DocumentVersion.doc_id == Document.id) + .filter( + or_(Document.valid_from.is_(None), func.date(Document.valid_from) <= current_date), + or_(Document.valid_to.is_(None), func.date(Document.valid_to) >= current_date) + ) + .order_by(desc('similarity')) + ) + + debug_results = debug_query.all() + + current_app.logger.debug("Debug: Similarity for all valid documents:") + for row in debug_results: + current_app.rag_tuning_logger.debug(f"Doc ID: {row.document_id}, " + f"Version ID: {row.version_id}, " + f"Embedding ID: 
{row.embedding_id}, " + f"Similarity: {row.similarity}") + current_app.rag_tuning_logger.debug(f'---------------------------------------\n') + except SQLAlchemyError as e: + current_app.logger.error(f'Error generating overview: {e}') + db.session.rollback() + + if self.tenant_info['rag_tuning']: + current_app.rag_tuning_logger.debug(f'Parameters for Retrieval of documents: \n') + current_app.rag_tuning_logger.debug(f'Similarity Threshold: {similarity_threshold}\n') + current_app.rag_tuning_logger.debug(f'K: {k}\n') + current_app.rag_tuning_logger.debug(f'---------------------------------------\n') + try: current_date = get_date_in_timezone(self.tenant_info['timezone']) # Subquery to find the latest version of each document @@ -40,24 +84,31 @@ class EveAIRetriever(BaseRetriever): # Main query to filter embeddings query_obj = ( db.session.query(db_class, - db_class.embedding.cosine_distance(query_embedding).label('distance')) + (1 - db_class.embedding.cosine_distance(query_embedding)).label('similarity')) .join(DocumentVersion, db_class.doc_vers_id == DocumentVersion.id) .join(Document, DocumentVersion.doc_id == Document.id) .join(subquery, DocumentVersion.id == subquery.c.latest_version_id) .filter( - or_(Document.valid_from.is_(None), Document.valid_from <= current_date), - or_(Document.valid_to.is_(None), Document.valid_to >= current_date), - db_class.embedding.cosine_distance(query_embedding) < similarity_threshold + or_(Document.valid_from.is_(None), func.date(Document.valid_from) <= current_date), + or_(Document.valid_to.is_(None), func.date(Document.valid_to) >= current_date), + (1 - db_class.embedding.cosine_distance(query_embedding)) > similarity_threshold ) - .order_by('distance') + .order_by(desc('similarity')) .limit(k) ) + if self.tenant_info['rag_tuning']: + current_app.rag_tuning_logger.debug(f'Query executed for Retrieval of documents: \n') + current_app.rag_tuning_logger.debug(f'{query_obj.statement}\n') + 
current_app.rag_tuning_logger.debug(f'---------------------------------------\n') + res = query_obj.all() if self.tenant_info['rag_tuning']: - current_app.rag_tuning_logger.debug(f'Retrieved {len(res)} relevant documents') - current_app.rag_tuning_logger.debug(f'---------------------------------------') + current_app.rag_tuning_logger.debug(f'Retrieved {len(res)} relevant documents \n') + current_app.rag_tuning_logger.debug(f'Data retrieved: \n') + current_app.rag_tuning_logger.debug(f'{res}\n') + current_app.rag_tuning_logger.debug(f'---------------------------------------\n') result = [] for doc in res: diff --git a/common/models/user.py b/common/models/user.py index f806dfe..69e17f2 100644 --- a/common/models/user.py +++ b/common/models/user.py @@ -82,7 +82,6 @@ class Tenant(db.Model): 'html_excluded_elements': self.html_excluded_elements, 'min_chunk_size': self.min_chunk_size, 'max_chunk_size': self.max_chunk_size, - 'es_k' 'es_k': self.es_k, 'es_similarity_threshold': self.es_similarity_threshold, 'chat_RAG_temperature': self.chat_RAG_temperature, diff --git a/common/utils/minio_utils.py b/common/utils/minio_utils.py new file mode 100644 index 0000000..bd4e2bb --- /dev/null +++ b/common/utils/minio_utils.py @@ -0,0 +1,86 @@ +from minio import Minio +from minio.error import S3Error +from flask import Flask +import io +from werkzeug.datastructures import FileStorage + +class MinioClient: + def __init__(self): + self.client = None + + def init_app(self, app: Flask): + self.client = Minio( + app.config['MINIO_ENDPOINT'], + access_key=app.config['MINIO_ACCESS_KEY'], + secret_key=app.config['MINIO_SECRET_KEY'], + secure=app.config.get('MINIO_USE_HTTPS', False) + ) + app.logger.info(f"MinIO client initialized with endpoint: {app.config['MINIO_ENDPOINT']}") + + def generate_bucket_name(self, tenant_id): + return f"tenant-{tenant_id}-bucket" + + def create_tenant_bucket(self, tenant_id): + bucket_name = self.generate_bucket_name(tenant_id) + try: + if not 
self.client.bucket_exists(bucket_name): + self.client.make_bucket(bucket_name) + return bucket_name + return bucket_name + except S3Error as err: + raise Exception(f"Error occurred while creating bucket: {err}") + + def generate_object_name(self, document_id, language, version_id, filename): + return f"{document_id}/{language}/{version_id}/{filename}" + + def upload_document_file(self, tenant_id, document_id, language, version_id, filename, file_data): + bucket_name = self.generate_bucket_name(tenant_id) + object_name = self.generate_object_name(document_id, language, version_id, filename) + + try: + if isinstance(file_data, FileStorage): + file_data = file_data.read() + elif isinstance(file_data, io.BytesIO): + file_data = file_data.getvalue() + elif isinstance(file_data, str): + file_data = file_data.encode('utf-8') + elif not isinstance(file_data, bytes): + raise TypeError('Unsupported file type. Expected FileStorage, BytesIO, str, or bytes.') + + self.client.put_object( + bucket_name, object_name, io.BytesIO(file_data), len(file_data) + ) + return True + except S3Error as err: + raise Exception(f"Error occurred while uploading file: {err}") + + def download_document_file(self, tenant_id, document_id, language, version_id, filename): + bucket_name = self.generate_bucket_name(tenant_id) + object_name = self.generate_object_name(document_id, language, version_id, filename) + try: + response = self.client.get_object(bucket_name, object_name) + return response.read() + except S3Error as err: + raise Exception(f"Error occurred while downloading file: {err}") + + def list_document_files(self, tenant_id, document_id, language=None, version_id=None): + bucket_name = self.generate_bucket_name(tenant_id) + prefix = f"{document_id}/" + if language: + prefix += f"{language}/" + if version_id: + prefix += f"{version_id}/" + try: + objects = self.client.list_objects(bucket_name, prefix=prefix, recursive=True) + return [obj.object_name for obj in objects] + except S3Error as
err: + raise Exception(f"Error occurred while listing files: {err}") + + def delete_document_file(self, tenant_id, document_id, language, version_id, filename): + bucket_name = self.generate_bucket_name(tenant_id) + object_name = self.generate_object_name(document_id, language, version_id, filename) + try: + self.client.remove_object(bucket_name, object_name) + return True + except S3Error as err: + raise Exception(f"Error occurred while deleting file: {err}") diff --git a/common/utils/model_utils.py b/common/utils/model_utils.py index 67d6851..9470685 100644 --- a/common/utils/model_utils.py +++ b/common/utils/model_utils.py @@ -141,7 +141,7 @@ def select_model_variables(tenant): default_headers=portkey_headers) tool_calling_supported = False match llm_model: - case 'gpt-4-turbo' | 'gpt-4o': + case 'gpt-4-turbo' | 'gpt-4o' | 'gpt-4o-mini': tool_calling_supported = True case _: raise Exception(f'Error setting model variables for tenant {tenant.id} ' diff --git a/config/config.py b/config/config.py index 8ad7d2b..7c2bd79 100644 --- a/config/config.py +++ b/config/config.py @@ -61,7 +61,7 @@ class Config(object): # supported LLMs SUPPORTED_EMBEDDINGS = ['openai.text-embedding-3-small', 'openai.text-embedding-3-large', 'mistral.mistral-embed'] - SUPPORTED_LLMS = ['openai.gpt-4o', 'anthropic.claude-3-5-sonnet'] + SUPPORTED_LLMS = ['openai.gpt-4o', 'anthropic.claude-3-5-sonnet', 'openai.gpt-4o-mini'] ANTHROPIC_LLM_VERSIONS = {'claude-3-5-sonnet': 'claude-3-5-sonnet-20240620', } @@ -71,6 +71,7 @@ class Config(object): # Annotation text chunk length ANNOTATION_TEXT_CHUNK_LENGTH = { 'openai.gpt-4o': 10000, + 'openai.gpt-4o-mini': 10000, 'anthropic.claude-3-5-sonnet': 8000 } @@ -184,12 +185,95 @@ class DevConfig(Config): # PATH settings ffmpeg_path = '/usr/bin/ffmpeg' + # MINIO + MINIO_ENDPOINT = 'minio:9000' + MINIO_ACCESS_KEY = 'minioadmin' + MINIO_SECRET_KEY = 'minioadmin' + class ProdConfig(Config): DEVELOPMENT = False DEBUG = False - # SQLALCHEMY_DATABASE_URI = 
environ.get('SQLALCHEMY_DATABASE_URI') or \ - # 'sqlite:///' + os.path.join(basedir, 'db.sqlite') + DEVELOPMENT = True + DEBUG = True + FLASK_DEBUG = True + PYCHARM_DEBUG = False + DB_HOST = environ.get('DB_HOST', 'bswnz4.stackhero-network.com') + DB_USER = environ.get('DB_USER', 'luke_skywalker') + DB_PASS = environ.get('DB_PASS', '2MK&1rHmWEydE2rFuJLq*ls%tdkPAk2') + DB_NAME = environ.get('DB_NAME', 'eveai') + DB_PORT = environ.get('DB_PORT', '5945') + + SQLALCHEMY_DATABASE_URI = f'postgresql+pg8000://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}' + SQLALCHEMY_BINDS = {'public': SQLALCHEMY_DATABASE_URI} + EXPLAIN_TEMPLATE_LOADING = False + + # Define the nginx prefix used for the specific apps + EVEAI_APP_LOCATION_PREFIX = '/admin' + EVEAI_CHAT_LOCATION_PREFIX = '/chat' + + # flask-mailman settings + MAIL_USERNAME = 'eveai_super@flow-it.net' + MAIL_PASSWORD = '$6xsWGbNtx$CFMQZqc*' + + # file upload settings + UPLOAD_FOLDER = '/app/tenant_files' + + REDIS_USER = 'admin' + REDIS_PASS = 'b32vtDtLriSY1fL2zGrZg8IZKI0g9ucsLtVNanRFAras6oZ51wjVNB1Y05uG7uEw' + REDIS_URL = '8bciqc.stackhero-network.com' + REDIS_PORT = '9961' + REDIS_BASE_URI = f'redis://{REDIS_USER}:{REDIS_PASS}@{REDIS_URL}:{REDIS_PORT}' + + # Celery settings + # eveai_app Redis Settings + CELERY_BROKER_URL = f'{REDIS_BASE_URI}/0' + CELERY_RESULT_BACKEND = f'{REDIS_BASE_URI}/0' + # eveai_chat Redis Settings + CELERY_BROKER_URL_CHAT = f'{REDIS_BASE_URI}/3' + CELERY_RESULT_BACKEND_CHAT = f'{REDIS_BASE_URI}/3' + + # Session settings + SESSION_REDIS = redis.from_url(f'{REDIS_BASE_URI}/2') + + # OpenAI API Keys + OPENAI_API_KEY = 'sk-proj-8R0jWzwjL7PeoPyMhJTZT3BlbkFJLb6HfRB2Hr9cEVFWEhU7' + + # Groq API Keys + GROQ_API_KEY = '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' + + # Anthropic API Keys + ANTHROPIC_API_KEY = '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' + + # Portkey API Keys + PORTKEY_API_KEY = 
'T2Dt4QTpgCvWxa1OftYCJtj7NcDZ' + + # Unstructured settings + UNSTRUCTURED_API_KEY = 'pDgCrXumYhM3CNvjvwV8msMldXC3uw' + UNSTRUCTURED_BASE_URL = 'https://flowitbv-16c4us0m.api.unstructuredapp.io' + UNSTRUCTURED_FULL_URL = 'https://flowitbv-16c4us0m.api.unstructuredapp.io/general/v0/general' + + # SocketIO settings + SOCKETIO_MESSAGE_QUEUE = f'{REDIS_BASE_URI}/1' + SOCKETIO_CORS_ALLOWED_ORIGINS = '*' + SOCKETIO_LOGGER = True + SOCKETIO_ENGINEIO_LOGGER = True + SOCKETIO_PING_TIMEOUT = 20000 + SOCKETIO_PING_INTERVAL = 25000 + SOCKETIO_MAX_IDLE_TIME = timedelta(minutes=60) # Changing this value ==> change maxConnectionDuration value in + # eveai-chat-widget.js + + # Google Cloud settings + GC_PROJECT_NAME = 'eveai-420711' + GC_LOCATION = 'europe-west1' + GC_KEY_RING = 'eveai-chat' + GC_CRYPTO_KEY = 'envelope-encryption-key' + + # JWT settings + JWT_SECRET_KEY = 'bsdMkmQ8ObfMD52yAFg4trrvjgjMhuIqg2fjDpD/JqvgY0ccCcmlsEnVFmR79WPiLKEA3i8a5zmejwLZKl4v9Q==' + + # PATH settings + ffmpeg_path = '/usr/bin/ffmpeg' config = { diff --git a/config/prompts/openai/gpt-4o-mini.yaml b/config/prompts/openai/gpt-4o-mini.yaml new file mode 100644 index 0000000..2413299 --- /dev/null +++ b/config/prompts/openai/gpt-4o-mini.yaml @@ -0,0 +1,79 @@ +html_parse: | + You are a top administrative assistant specialized in transforming given HTML into markdown formatted files. The generated files will be used to generate embeddings in a RAG-system. + + # Best practices are: + - Respect wordings and language(s) used in the HTML. + - The following items need to be considered: headings, paragraphs, listed items (numbered or not) and tables. Images can be neglected. + - Sub-headers can be used as lists. This is true when a header is followed by a series of sub-headers without content (paragraphs or listed items). Present those sub-headers as a list. + - Be careful of encoding of the text. Everything needs to be human readable. + + Process the file carefully, and take a stepped approach. 
The resulting markdown should be the result of the processing of the complete input html file. Answer with the pure markdown, without any other text. + + HTML is between triple backquotes. + + ```{html}``` + +pdf_parse: | + You are a top administrative aid specialized in transforming given PDF-files into markdown formatted files. The generated files will be used to generate embeddings in a RAG-system. + + # Best practices are: + - Respect wordings and language(s) used in the PDF. + - The following items need to be considered: headings, paragraphs, listed items (numbered or not) and tables. Images can be neglected. + - When headings are numbered, show the numbering and define the header level. + - A new item is started when a is found before a full line is reached. In order to know the number of characters in a line, please check the document and the context within the document (e.g. an image could limit the number of characters temporarily). + - Paragraphs are to be stripped of newlines so they become easily readable. + - Be careful of encoding of the text. Everything needs to be human readable. + + Process the file carefully, and take a stepped approach. The resulting markdown should be the result of the processing of the complete input pdf content. Answer with the pure markdown, without any other text. + + PDF content is between triple backquotes. + + ```{pdf_content}``` + +summary: | + Write a concise summary of the text in {language}. The text is delimited between triple backquotes. + ```{text}``` + +rag: | + Answer the question based on the following context, delimited between triple backquotes. + {tenant_context} + Use the following {language} in your communication, and cite the sources used. + If the question cannot be answered using the given context, say "I have insufficient information to answer this question." 
+ Context: + ```{context}``` + Question: + {question} + +history: | + You are a helpful assistant that details a question based on a previous context, + in such a way that the question is understandable without the previous context. + The context is a conversation history, with the HUMAN asking questions, the AI answering questions. + The history is delimited between triple backquotes. + You answer by stating the question in {language}. + History: + ```{history}``` + Question to be detailed: + {question} + +encyclopedia: | + You have a lot of background knowledge, and as such you are some kind of + 'encyclopedia' to explain general terminology. Only answer if you have a clear understanding of the question. + If not, say you do not have sufficient information to answer the question. Use the {language} in your communication. + Question: + {question} + +transcript: | + You are a top administrative assistant specialized in transforming given transcriptions into markdown formatted files. The generated files will be used to generate embeddings in a RAG-system. The transcriptions originate from podcast, videos and similar material. + + # Best practices and steps are: + - Respect wordings and language(s) used in the transcription. Main language is {language}. + - Sometimes, the transcript contains speech of several people participating in a conversation. Although these are not obvious from reading the file, try to detect when other people are speaking. + - Divide the transcript into several logical parts. Ensure questions and their answers are in the same logical part. + - annotate the text to identify these logical parts using headings in {language}. + - improve errors in the transcript given the context, but do not change the meaning and intentions of the transcription. + + Process the file carefully, and take a stepped approach. The resulting markdown should be the result of processing the complete input transcription. 
Answer with the pure markdown, without any other text. + + The transcript is between triple backquotes. + + ```{transcript}``` \ No newline at end of file diff --git a/docker/compose.yaml b/docker/compose.yaml index 1cad69b..093d139 100644 --- a/docker/compose.yaml +++ b/docker/compose.yaml @@ -15,6 +15,9 @@ x-common-variables: &common-variables DB_NAME: eveai FLASK_ENV: development FLASK_DEBUG: 1 + MINIO_ENDPOINT: minio:9000 + MINIO_ACCESS_KEY: minioadmin + MINIO_SECRET_KEY: minioadmin services: nginx: @@ -48,12 +51,13 @@ services: - ../scripts:/app/scripts - ../patched_packages:/app/patched_packages - ./logs:/app/logs - - ./tenant_files:/app/tenant_files depends_on: db: condition: service_healthy redis: condition: service_healthy + minio: + condition: service_healthy healthcheck: test: ["CMD", "curl", "-f", "http://localhost:5001/health"] interval: 10s @@ -76,12 +80,13 @@ services: - ../scripts:/app/scripts - ../patched_packages:/app/patched_packages - ./logs:/app/logs - - ./tenant_files:/app/tenant_files depends_on: db: condition: service_healthy redis: condition: service_healthy + minio: + condition: service_healthy # healthcheck: # test: [ "CMD", "curl", "-f", "http://localhost:5001/health" ] # interval: 10s @@ -174,7 +179,30 @@ services: interval: 10s timeout: 5s retries: 5 -#volumes: + + minio: + image: minio/minio + ports: + - "9000:9000" + - "9001:9001" + expose: + - 9000 + volumes: + - ./minio/data:/data + - ./minio/config:/root/.minio + environment: + MINIO_ROOT_USER: ${MINIO_ROOT_USER:-minioadmin} + MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD:-minioadmin} + command: server /data --console-address ":9001" + healthcheck: + test: [ "CMD", "mc", "ready", "local" ] + interval: 30s + timeout: 20s + retries: 3 + start_period: 30s + +volumes: + minio_data: # db-data: # redis-data: # tenant-files: diff --git a/eveai_app/__init__.py b/eveai_app/__init__.py index e82ea24..afc415b 100644 --- a/eveai_app/__init__.py +++ b/eveai_app/__init__.py @@ -6,7 +6,8 @@ from 
flask_security.signals import user_authenticated from werkzeug.middleware.proxy_fix import ProxyFix import logging.config -from common.extensions import db, migrate, bootstrap, security, mail, login_manager, cors, kms_client, csrf, session +from common.extensions import (db, migrate, bootstrap, security, mail, login_manager, cors, kms_client, csrf, session, + minio_client) from common.models.user import User, Role, Tenant, TenantDomain import common.models.interaction from config.logging_config import LOGGING @@ -102,6 +103,7 @@ def register_extensions(app): cors.init_app(app) kms_client.init_app(app) session.init_app(app) + minio_client.init_app(app) # Register Blueprints diff --git a/eveai_app/views/document_views.py b/eveai_app/views/document_views.py index b21bc1d..e2b9611 100644 --- a/eveai_app/views/document_views.py +++ b/eveai_app/views/document_views.py @@ -14,9 +14,10 @@ import requests from requests.exceptions import SSLError from urllib.parse import urlparse import io +from minio.error import S3Error from common.models.document import Document, DocumentVersion -from common.extensions import db +from common.extensions import db, minio_client from .document_forms import AddDocumentForm, AddURLForm, EditDocumentForm, EditDocumentVersionForm, AddYoutubeForm, \ AddURLsForm from common.utils.middleware import mw_before_request @@ -558,33 +559,34 @@ def upload_file_for_version(doc_vers, file, extension): doc_vers.file_name = doc_vers.calc_file_name() doc_vers.file_location = doc_vers.calc_file_location() - upload_path = os.path.join(current_app.config['UPLOAD_FOLDER'], doc_vers.file_location) - if not os.path.exists(upload_path): - os.makedirs(upload_path, exist_ok=True) - if isinstance(file, FileStorage): - file.save(os.path.join(upload_path, doc_vers.file_name)) - elif isinstance(file, io.BytesIO): - # It's a BytesIO object, handle accordingly - # Example: write content to a file manually - with open(os.path.join(upload_path, doc_vers.file_name), 'wb') as f: 
- f.write(file.getvalue()) - elif isinstance(file, str): - # It's a string, handle accordingly - with open(os.path.join(upload_path, doc_vers.file_name), 'w') as f: - f.write(file) - else: - raise TypeError('Unsupported file type.') + # Normally, the tenant bucket should exist. But let's be on the safe side if a migration took place. + tenant_id = session['tenant']['id'] + minio_client.create_tenant_bucket(tenant_id) try: + minio_client.upload_document_file( + tenant_id, + doc_vers.doc_id, + doc_vers.language, + doc_vers.id, + doc_vers.file_name, + file + ) db.session.commit() + current_app.logger.info(f'Successfully saved document to MinIO for tenant {tenant_id} for ' + f'document version {doc_vers.id} while uploading file.') + except S3Error as e: + db.session.rollback() + flash('Error saving document to MinIO.', 'error') + current_app.logger.error( + f'Error saving document to MinIO for tenant {tenant_id}: {e}') + raise except SQLAlchemyError as e: db.session.rollback() - flash('Error saving document.', 'error') + flash('Error saving document metadata.', 'error') current_app.logger.error( - f'Error saving document for tenant {session["tenant"]["id"]} while uploading file: {e}') - - current_app.logger.info(f'Succesfully saved document for tenant {session['tenant']['id']} for ' - f'document version {doc_vers.id} while uploading file.') + f'Error saving document metadata for tenant {tenant_id}: {e}') + raise def fetch_html(url): diff --git a/eveai_app/views/user_views.py b/eveai_app/views/user_views.py index bf6e548..d3ec1bd 100644 --- a/eveai_app/views/user_views.py +++ b/eveai_app/views/user_views.py @@ -8,7 +8,7 @@ from sqlalchemy.exc import SQLAlchemyError import ast from common.models.user import User, Tenant, Role, TenantDomain -from common.extensions import db, kms_client, security +from common.extensions import db, kms_client, security, minio_client from common.utils.security_utils import send_confirmation_email, send_reset_email from .user_forms import 
TenantForm, CreateUserForm, EditUserForm, TenantDomainForm from common.utils.database import Database @@ -61,12 +61,13 @@ def tenant(): # rag_tuning=form.rag_tuning.data) # Handle Embedding Variables - new_tenant.html_tags = form.html_tags.data.split(',') if form.html_tags.data else [] - new_tenant.html_end_tags = form.html_end_tags.data.split(',') if form.html_end_tags.data else [] - new_tenant.html_included_elements = form.html_included_elements.data.split( - ',') if form.html_included_elements.data else [] - new_tenant.html_excluded_elements = form.html_excluded_elements.data.split( - ',') if form.html_excluded_elements.data else [] + new_tenant.html_tags = [tag.strip() for tag in form.html_tags.data.split(',')] if form.html_tags.data else [] + new_tenant.html_end_tags = [tag.strip() for tag in form.html_end_tags.data.split(',')] \ + if form.html_end_tags.data else [] + new_tenant.html_included_elements = [tag.strip() for tag in form.html_included_elements.data.split(',')] \ + if form.html_included_elements.data else [] + new_tenant.html_excluded_elements = [tag.strip() for tag in form.html_excluded_elements.data.split(',')] \ + if form.html_excluded_elements.data else [] current_app.logger.debug(f'html_tags: {new_tenant.html_tags},' f'html_end_tags: {new_tenant.html_end_tags},' @@ -87,11 +88,17 @@ def tenant(): flash(f'Failed to add tenant to database. 
Error: {str(e)}') return render_template('user/tenant.html', form=form) - # Create schema for new tenant current_app.logger.info(f"Successfully created tenant {new_tenant.id} in Database") flash(f"Successfully created tenant {new_tenant.id} in Database") + + # Create schema for new tenant current_app.logger.info(f"Creating schema for tenant {new_tenant.id}") Database(new_tenant.id).create_tenant_schema() + + # Create MinIO bucket for new tenant + current_app.logger.info(f"Creating MinIO bucket for tenant {new_tenant.id}") + minio_client.create_tenant_bucket(new_tenant.id) + return redirect(prefixed_url_for('basic_bp.index')) else: form_validation_failed(request, form) diff --git a/eveai_chat_workers/tasks.py b/eveai_chat_workers/tasks.py index 003a1d0..cc68eee 100644 --- a/eveai_chat_workers/tasks.py +++ b/eveai_chat_workers/tasks.py @@ -81,6 +81,12 @@ def ask_question(tenant_id, question, language, session_id, user_timezone): current_app.logger.error(f'ask_question: Error initializing chat session in database: {e}') raise + if tenant.rag_tuning: + current_app.rag_tuning_logger.debug(f'Received question for tenant {tenant_id}:\n{question}. 
Processing...') + current_app.rag_tuning_logger.debug(f'Tenant Information: \n{tenant.to_dict()}') + current_app.rag_tuning_logger.debug(f'===================================================================') + current_app.rag_tuning_logger.debug(f'===================================================================') + result, interaction = answer_using_tenant_rag(question, language, tenant, chat_session) result['algorithm'] = current_app.config['INTERACTION_ALGORITHMS']['RAG_TENANT']['name'] result['interaction_id'] = interaction.id @@ -116,6 +122,9 @@ def answer_using_tenant_rag(question, language, tenant, chat_session): detailed_question = detail_question(question, language, model_variables, chat_session.session_id) current_app.logger.debug(f'Original question:\n {question}\n\nDetailed question: {detailed_question}') + if tenant.rag_tuning: + current_app.rag_tuning_logger.debug(f'Detailed Question for tenant {tenant.id}:\n{question}.') + current_app.rag_tuning_logger.debug(f'-------------------------------------------------------------------') new_interaction.detailed_question = detailed_question new_interaction.detailed_question_at = dt.now(tz.utc) @@ -126,6 +135,9 @@ def answer_using_tenant_rag(question, language, tenant, chat_session): full_template = replace_variable_in_template(language_template, "{tenant_context}", model_variables['rag_context']) rag_prompt = ChatPromptTemplate.from_template(full_template) setup_and_retrieval = RunnableParallel({"context": retriever, "question": RunnablePassthrough()}) + if tenant.rag_tuning: + current_app.rag_tuning_logger.debug(f'Full prompt for tenant {tenant.id}:\n{full_template}.') + current_app.rag_tuning_logger.debug(f'-------------------------------------------------------------------') new_interaction_embeddings = [] if not model_variables['cited_answer_cls']: # The model doesn't support structured feedback @@ -151,6 +163,11 @@ def answer_using_tenant_rag(question, language, tenant, chat_session): 
current_app.logger.debug(f'ask_question: result answer: {result['answer']}') current_app.logger.debug(f'ask_question: result citations: {result["citations"]}') current_app.logger.debug(f'ask_question: insufficient information: {result["insufficient_info"]}') + if tenant.rag_tuning: + current_app.rag_tuning_logger.debug(f'ask_question: result answer: {result['answer']}') + current_app.rag_tuning_logger.debug(f'ask_question: result citations: {result["citations"]}') + current_app.rag_tuning_logger.debug(f'ask_question: insufficient information: {result["insufficient_info"]}') + current_app.rag_tuning_logger.debug(f'-------------------------------------------------------------------') new_interaction.answer = result['answer'] # Filter out the existing Embedding IDs @@ -161,7 +178,11 @@ def answer_using_tenant_rag(question, language, tenant, chat_session): .all() ) existing_embedding_ids = [emb.id for emb in embeddings] - urls = [emb.document_version.url for emb in embeddings] + urls = list(set(emb.document_version.url for emb in embeddings)) + if tenant.rag_tuning: + current_app.rag_tuning_logger.debug(f'Referenced documents for answer for tenant {tenant.id}:\n') + current_app.rag_tuning_logger.debug(f'{urls}') + current_app.rag_tuning_logger.debug(f'-------------------------------------------------------------------') for emb_id in existing_embedding_ids: new_interaction_embedding = InteractionEmbedding(embedding_id=emb_id) diff --git a/eveai_workers/__init__.py b/eveai_workers/__init__.py index 95ef7e9..55db2f9 100644 --- a/eveai_workers/__init__.py +++ b/eveai_workers/__init__.py @@ -3,7 +3,7 @@ import logging.config from flask import Flask from common.utils.celery_utils import make_celery, init_celery -from common.extensions import db +from common.extensions import db, minio_client from config.logging_config import LOGGING @@ -33,6 +33,7 @@ def create_app(config_file=None): def register_extensions(app): db.init_app(app) + minio_client.init_app(app) app, celery = 
create_app() diff --git a/eveai_workers/tasks.py b/eveai_workers/tasks.py index 22467d6..0c76f7a 100644 --- a/eveai_workers/tasks.py +++ b/eveai_workers/tasks.py @@ -1,3 +1,4 @@ +import io import os from datetime import datetime as dt, timezone as tz import subprocess @@ -21,7 +22,7 @@ import PyPDF2 from pydub import AudioSegment import tempfile -from common.extensions import db +from common.extensions import db, minio_client from common.models.document import DocumentVersion, Embedding from common.models.user import Tenant from common.utils.celery_utils import current_celery @@ -32,11 +33,6 @@ from common.utils.os_utils import safe_remove, sync_folder @current_celery.task(name='create_embeddings', queue='embeddings') def create_embeddings(tenant_id, document_version_id): - # Setup Remote Debugging only if PYCHARM_DEBUG=True - if current_app.config['PYCHARM_DEBUG']: - import pydevd_pycharm - pydevd_pycharm.settrace('localhost', port=50170, stdoutToServer=True, stderrToServer=True) - current_app.logger.info(f'Creating embeddings for tenant {tenant_id} on document version {document_version_id}.') try: @@ -50,6 +46,7 @@ def create_embeddings(tenant_id, document_version_id): # Select variables to work with depending on tenant and model model_variables = select_model_variables(tenant) + current_app.logger.debug(f'Model variables: {model_variables}') # Retrieve document version to process document_version = DocumentVersion.query.get(document_version_id) @@ -107,33 +104,20 @@ def create_embeddings(tenant_id, document_version_id): def process_pdf(tenant, model_variables, document_version): - base_path = os.path.join(current_app.config['UPLOAD_FOLDER'], - document_version.file_location) - file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], - document_version.file_location, - document_version.file_name) - if os.path.exists(file_path): - pdf_text = '' - # Function to extract text from PDF and return as string - with open(file_path, 'rb') as file: - reader = 
PyPDF2.PdfReader(file) - for page_num in range(len(reader.pages)): - page = reader.pages[page_num] - pdf_text += page.extract_text() - else: - current_app.logger.error(f'The physical file for document version {document_version.id} ' - f'for tenant {tenant.id} ' - f'at {file_path} does not exist') - create_embeddings.update_state(state=states.FAILURE) - raise + file_data = minio_client.download_document_file(tenant.id, document_version.doc_id, document_version.language, + document_version.id, document_version.file_name) + + pdf_text = '' + pdf_reader = PyPDF2.PdfReader(io.BytesIO(file_data)) + for page in pdf_reader.pages: + pdf_text += page.extract_text() markdown = generate_markdown_from_pdf(tenant, model_variables, document_version, pdf_text) markdown_file_name = f'{document_version.id}.md' - output_file = os.path.join(base_path, markdown_file_name) - with open(output_file, 'w') as f: - f.write(markdown) + minio_client.upload_document_file(tenant.id, document_version.doc_id, document_version.language, document_version.id, + markdown_file_name, markdown.encode()) - potential_chunks = create_potential_chunks_for_markdown(base_path, markdown_file_name, tenant) + potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, markdown_file_name) chunks = combine_chunks_for_markdown(potential_chunks, model_variables['min_chunk_size'], model_variables['max_chunk_size']) @@ -175,43 +159,29 @@ def delete_embeddings_for_document_version(document_version): def process_html(tenant, model_variables, document_version): + file_data = minio_client.download_document_file(tenant.id, document_version.doc_id, document_version.language, + document_version.id, document_version.file_name) + html_content = file_data.decode('utf-8') + # The tags to be considered can be dependent on the tenant html_tags = model_variables['html_tags'] html_end_tags = model_variables['html_end_tags'] html_included_elements = model_variables['html_included_elements'] 
html_excluded_elements = model_variables['html_excluded_elements'] - base_path = os.path.join(current_app.config['UPLOAD_FOLDER'], - document_version.file_location) - - file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], - document_version.file_location, - document_version.file_name) - - if os.path.exists(file_path): - with open(file_path, 'rb') as f: - html_content = f.read() - else: - current_app.logger.error(f'The physical file for document version {document_version.id} ' - f'for tenant {tenant.id} ' - f'at {file_path} does not exist') - create_embeddings.update_state(state=states.FAILURE) - raise - extracted_html, title = parse_html(tenant, html_content, html_tags, included_elements=html_included_elements, excluded_elements=html_excluded_elements) + extracted_file_name = f'{document_version.id}-extracted.html' - output_file = os.path.join(base_path, extracted_file_name) - with open(output_file, 'w') as f: - f.write(extracted_html) + minio_client.upload_document_file(tenant.id, document_version.doc_id, document_version.language, document_version.id, + extracted_file_name, extracted_html.encode()) markdown = generate_markdown_from_html(tenant, model_variables, document_version, extracted_html) markdown_file_name = f'{document_version.id}.md' - output_file = os.path.join(base_path, markdown_file_name) - with open(output_file, 'w') as f: - f.write(markdown) + minio_client.upload_document_file(tenant.id, document_version.doc_id, document_version.language, document_version.id, + markdown_file_name, markdown.encode()) - potential_chunks = create_potential_chunks_for_markdown(base_path, markdown_file_name, tenant) + potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, markdown_file_name) chunks = combine_chunks_for_markdown(potential_chunks, model_variables['min_chunk_size'], model_variables['max_chunk_size']) @@ -222,7 +192,7 @@ def process_html(tenant, model_variables, document_version): else: document_version.system_context 
= (f'Title: {title}\n') - enriched_chunks = enrich_chunks(tenant, document_version, chunks) + enriched_chunks = enrich_chunks(tenant, document_version, title, chunks) embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks) try: @@ -241,16 +211,17 @@ def process_html(tenant, model_variables, document_version): f'on document version {document_version.id} :-)') -def enrich_chunks(tenant, document_version, chunks): +def enrich_chunks(tenant, document_version, title, chunks): current_app.logger.debug(f'Enriching chunks for tenant {tenant.id} ' f'on document version {document_version.id}') current_app.logger.debug(f'Nr of chunks: {len(chunks)}') chunk_total_context = (f'Filename: {document_version.file_name}\n' - f'User Context:{document_version.user_context}\n' + f'User Context:\n{document_version.user_context}\n\n' f'{document_version.system_context}\n\n') enriched_chunks = [] initial_chunk = (f'Filename: {document_version.file_name}\n' f'User Context:\n{document_version.user_context}\n\n' + f'Title: {title}\n' f'{chunks[0]}') enriched_chunks.append(initial_chunk) @@ -311,7 +282,7 @@ def summarize_chunk(tenant, model_variables, document_version, chunk): text_to_summarize = doc_creator.create_documents(chunk) try: - summary = chain.run(text_to_summarize) + summary = chain.invoke({"text": text_to_summarize}) current_app.logger.debug(f'Finished summarizing chunk for tenant {tenant.id} ' f'on document version {document_version.id}.') return summary @@ -391,23 +362,26 @@ def process_youtube(tenant, model_variables, document_version): markdown_file_name = f'{document_version.id}.md' # Remove existing files (in case of a re-processing of the file - safe_remove(os.path.join(base_path, download_file_name)) - safe_remove(os.path.join(base_path, compressed_file_name)) - safe_remove(os.path.join(base_path, transcription_file_name)) - safe_remove(os.path.join(base_path, markdown_file_name)) - sync_folder(base_path) + 
minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language, + document_version.id, download_file_name) + minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language, + document_version.id, compressed_file_name) + minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language, + document_version.id, transcription_file_name) + minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language, + document_version.id, markdown_file_name) - of, title, description, author = download_youtube(document_version.url, base_path, download_file_name, tenant) + of, title, description, author = download_youtube(document_version.url, tenant.id, document_version, + download_file_name) document_version.system_context = f'Title: {title}\nDescription: {description}\nAuthor: {author}' - compress_audio(base_path, download_file_name, compressed_file_name, tenant) - transcribe_audio(base_path, compressed_file_name, transcription_file_name, - document_version.language, tenant, model_variables) - annotate_transcription(base_path, transcription_file_name, markdown_file_name, - document_version.language, tenant, model_variables) + compress_audio(tenant.id, document_version, download_file_name, compressed_file_name) + transcribe_audio(tenant.id, document_version, compressed_file_name, transcription_file_name, model_variables) + annotate_transcription(tenant, document_version, transcription_file_name, markdown_file_name, model_variables) - potential_chunks = create_potential_chunks_for_markdown(base_path, markdown_file_name, tenant) + potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, markdown_file_name) actual_chunks = combine_chunks_for_markdown(potential_chunks, model_variables['min_chunk_size'], model_variables['max_chunk_size']) + enriched_chunks = enrich_chunks(tenant, document_version, actual_chunks) embeddings = 
embed_chunks(tenant, model_variables, document_version, enriched_chunks) @@ -427,83 +401,72 @@ def process_youtube(tenant, model_variables, document_version): f'on Youtube document version {document_version.id} :-)') -def download_youtube(url, file_location, file_name, tenant): +def download_youtube(url, tenant_id, document_version, file_name): try: - current_app.logger.info(f'Downloading YouTube video: {url} on location {file_location} for tenant: {tenant.id}') + current_app.logger.info(f'Downloading YouTube video: {url} for tenant: {tenant_id}') yt = YouTube(url) stream = yt.streams.get_audio_only() - output_file = stream.download(output_path=file_location, filename=file_name) - current_app.logger.info(f'Downloaded YouTube video: {url} on location {file_location} for tenant: {tenant.id}') - return output_file, yt.title, yt.description, yt.author + + with tempfile.NamedTemporaryFile(delete=False) as temp_file: + stream.download(output_path=temp_file.name) + with open(temp_file.name, 'rb') as f: + file_data = f.read() + + minio_client.upload_document_file(tenant_id, document_version.doc_id, document_version.language, document_version.id, + file_name, file_data) + + current_app.logger.info(f'Downloaded YouTube video: {url} for tenant: {tenant_id}') + return file_name, yt.title, yt.description, yt.author except Exception as e: - current_app.logger.error(f'Error downloading YouTube video: {url} on location {file_location} for ' - f'tenant: {tenant.id} with error: {e}') + current_app.logger.error(f'Error downloading YouTube video: {url} for tenant: {tenant_id} with error: {e}') raise -def compress_audio(file_location, input_file, output_file, tenant): +def compress_audio(tenant_id, document_version, input_file, output_file): try: - current_app.logger.info(f'Compressing audio on {file_location} for tenant: {tenant.id}') + current_app.logger.info(f'Compressing audio for tenant: {tenant_id}') - # Run the compression script - result = subprocess.run( - 
['scripts/compress.sh', '-d', file_location, '-i', input_file, '-o', output_file], - capture_output=True, - text=True - ) + input_data = minio_client.download_document_file(tenant_id, document_version.doc_id, document_version.language, + document_version.id, input_file) - if result.returncode != 0: - raise Exception(f"Compression failed: {result.stderr}") + with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_input: + temp_input.write(input_data) + temp_input.flush() - output_file_path = os.path.join(file_location, output_file) + with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as temp_output: + result = subprocess.run( + ['ffmpeg', '-i', temp_input.name, '-b:a', '64k', '-f', 'mp3', temp_output.name], + capture_output=True, + text=True + ) - # Additional check for file stability - previous_size = -1 - stable_count = 0 - max_attempts = 12 # 1 minute total wait time + if result.returncode != 0: + raise Exception(f"Compression failed: {result.stderr}") - for _ in range(max_attempts): - if os.path.exists(output_file_path): - current_size = os.path.getsize(output_file_path) - if current_size == previous_size: - stable_count += 1 - if stable_count >= 3: # File size hasn't changed for 3 checks - break - else: - stable_count = 0 - previous_size = current_size - gevent.sleep(5) + with open(temp_output.name, 'rb') as f: + compressed_data = f.read() - if stable_count < 3: - raise Exception("File size did not stabilize within the expected time") + minio_client.upload_document_file(tenant_id, document_version.doc_id, document_version.language, document_version.id, + output_file, compressed_data) - current_app.logger.info(f'Compressed audio for {file_location} for tenant: {tenant.id}') - return output_file_path + current_app.logger.info(f'Compressed audio for tenant: {tenant_id}') except Exception as e: - current_app.logger.error(f'Error compressing audio on {file_location} for tenant: {tenant.id} with error: {e}') + current_app.logger.error(f'Error 
compressing audio for tenant: {tenant_id} with error: {e}') raise -def transcribe_audio(file_location, input_file, output_file, language, tenant, model_variables): +def transcribe_audio(tenant_id, document_version, input_file, output_file, model_variables): try: - current_app.logger.info(f'Transcribing audio on {file_location} for tenant: {tenant.id}') + current_app.logger.info(f'Transcribing audio for tenant: {tenant_id}') client = model_variables['transcription_client'] model = model_variables['transcription_model'] - input_file_path = os.path.join(file_location, input_file) - output_file_path = os.path.join(file_location, output_file) - # Wait for the input file to exist - count = 0 - while not os.path.exists(input_file_path) and count < 10: - gevent.sleep(1) - current_app.logger.debug(f'Waiting for {input_file_path} to exist... Count: {count}') - count += 1 + # Download the audio file from MinIO + audio_data = minio_client.download_document_file(tenant_id, document_version.doc_id, document_version.language, + document_version.id, input_file) - if not os.path.exists(input_file_path): - raise FileNotFoundError(f"Input file {input_file_path} not found after waiting.") - - # Load the audio file - audio = AudioSegment.from_file(input_file_path) + # Load the audio data into pydub + audio = AudioSegment.from_mp3(io.BytesIO(audio_data)) # Define segment length (e.g., 10 minutes) segment_length = 10 * 60 * 1000 # 10 minutes in milliseconds @@ -512,14 +475,16 @@ def transcribe_audio(file_location, input_file, output_file, language, tenant, m # Split audio into segments and transcribe each for i, chunk in enumerate(audio[::segment_length]): - current_app.logger.debug(f'Transcribing chunk {i} of {len(audio) // segment_length} ') + current_app.logger.debug(f'Transcribing chunk {i + 1} of {len(audio) // segment_length + 1}') + with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_audio: chunk.export(temp_audio.name, format="mp3") + with open(temp_audio.name, 
'rb') as audio_segment: transcription = client.audio.transcriptions.create( file=audio_segment, model=model, - language=language, + language=document_version.language, response_format='verbose_json', ) @@ -530,20 +495,25 @@ def transcribe_audio(file_location, input_file, output_file, language, tenant, m # Combine all transcriptions full_transcription = " ".join(transcriptions) - # Write the full transcription to the output file - with open(output_file_path, 'w') as f: - f.write(full_transcription) + # Upload the full transcription to MinIO + minio_client.upload_document_file( + tenant_id, + document_version.doc_id, + document_version.language, + document_version.id, + output_file, + full_transcription.encode('utf-8') + ) - current_app.logger.info(f'Transcribed audio for {file_location} for tenant: {tenant.id}') + current_app.logger.info(f'Transcribed audio for tenant: {tenant_id}') except Exception as e: - current_app.logger.error(f'Error transcribing audio for {file_location} for tenant: {tenant.id}, ' - f'with error: {e}') + current_app.logger.error(f'Error transcribing audio for tenant: {tenant_id}, with error: {e}') raise -def annotate_transcription(file_location, input_file, output_file, language, tenant, model_variables): +def annotate_transcription(tenant, document_version, input_file, output_file, model_variables): try: - current_app.logger.debug(f'Annotating transcription on {file_location} for tenant {tenant.id}') + current_app.logger.debug(f'Annotating transcription for tenant {tenant.id}') char_splitter = CharacterTextSplitter(separator='.', chunk_size=model_variables['annotation_chunk_length'], @@ -552,18 +522,21 @@ def annotate_transcription(file_location, input_file, output_file, language, ten headers_to_split_on = [ ("#", "Header 1"), ("##", "Header 2"), - # ("###", "Header 3"), ] markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on, strip_headers=False) llm = model_variables['llm'] template = model_variables['transcript_template'] - 
language_template = create_language_template(template, language) + language_template = create_language_template(template, document_version.language) transcript_prompt = ChatPromptTemplate.from_template(language_template) setup = RunnablePassthrough() output_parser = StrOutputParser() - with open(os.path.join(file_location, input_file), 'r') as f: - transcript = f.read() + + # Download the transcription file from MinIO + transcript_data = minio_client.download_document_file(tenant.id, document_version.doc_id, + document_version.language, document_version.id, + input_file) + transcript = transcript_data.decode('utf-8') chain = setup | transcript_prompt | llm | output_parser @@ -598,38 +571,53 @@ def annotate_transcription(file_location, input_file, output_file, language, ten markdown_chunks.pop() all_markdown_chunks += markdown_chunks - all_markdown_chunks += [last_markdown_chunk] annotated_transcript = '\n'.join(all_markdown_chunks) - with open(os.path.join(file_location, output_file), 'w') as f: - f.write(annotated_transcript) + # Upload the annotated transcript to MinIO + minio_client.upload_document_file( + tenant.id, + document_version.doc_id, + document_version.language, + document_version.id, + output_file, + annotated_transcript.encode('utf-8') + ) - current_app.logger.info(f'Annotated transcription for {file_location} for tenant {tenant.id}') + current_app.logger.info(f'Annotated transcription for tenant {tenant.id}') except Exception as e: - current_app.logger.error(f'Error annotating transcription for {file_location} for tenant {tenant.id}, ' - f'with error: {e}') + current_app.logger.error(f'Error annotating transcription for tenant {tenant.id}, with error: {e}') raise -def create_potential_chunks_for_markdown(base_path, input_file, tenant): - current_app.logger.info(f'Creating potential chunks for {base_path} for tenant {tenant.id}') - markdown = '' - with open(os.path.join(base_path, input_file), 'r') as f: - markdown = f.read() +def 
create_potential_chunks_for_markdown(tenant_id, document_version, input_file): + try: + current_app.logger.info(f'Creating potential chunks for tenant {tenant_id}') - headers_to_split_on = [ - ("#", "Header 1"), - ("##", "Header 2"), - # ("###", "Header 3"), - ] + # Download the markdown file from MinIO + markdown_data = minio_client.download_document_file(tenant_id, + document_version.doc_id, + document_version.language, + document_version.id, + input_file + ) + markdown = markdown_data.decode('utf-8') - markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on, strip_headers=False) - md_header_splits = markdown_splitter.split_text(markdown) - potential_chunks = [doc.page_content for doc in md_header_splits] + headers_to_split_on = [ + ("#", "Header 1"), + ("##", "Header 2"), + ] - return potential_chunks + markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on, strip_headers=False) + md_header_splits = markdown_splitter.split_text(markdown) + potential_chunks = [doc.page_content for doc in md_header_splits] + + current_app.logger.debug(f'Created {len(potential_chunks)} potential chunks for tenant {tenant_id}') + return potential_chunks + except Exception as e: + current_app.logger.error(f'Error creating potential chunks for tenant {tenant_id}, with error: {e}') + raise def combine_chunks_for_markdown(potential_chunks, min_chars, max_chars): diff --git a/nginx/public/chat_eveai_mini.html b/nginx/public/chat_eveai_mini.html new file mode 100644 index 0000000..98b072b --- /dev/null +++ b/nginx/public/chat_eveai_mini.html @@ -0,0 +1,28 @@ + + + + + + Chat Client EveAI Mini + + + + + + + + +
+ + + \ No newline at end of file diff --git a/nginx/public/chat_flow.html b/nginx/public/chat_flow.html new file mode 100644 index 0000000..a38e493 --- /dev/null +++ b/nginx/public/chat_flow.html @@ -0,0 +1,28 @@ + + + + + + Chat Client AE + + + + + + + + +
+ + + \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 0fd70a4..e48ad06 100644 --- a/requirements.txt +++ b/requirements.txt @@ -76,3 +76,7 @@ groq~=0.9.0 pydub~=0.25.1 argparse~=1.4.0 portkey_ai~=1.7.0 + +minio~=7.2.7 +Werkzeug~=3.0.3 +itsdangerous~=2.2.0 \ No newline at end of file diff --git a/scripts/start_eveai_chat.sh b/scripts/start_eveai_chat.sh index 2f1fc61..789410a 100755 --- a/scripts/start_eveai_chat.sh +++ b/scripts/start_eveai_chat.sh @@ -7,7 +7,7 @@ export PYTHONPATH="$PROJECT_DIR/patched_packages:$PYTHONPATH:$PROJECT_DIR" # In # Set flask environment variables #export FLASK_ENV=development # Use 'production' as appropriate #export FLASK_DEBUG=1 # Use 0 for production -print "Starting EveAI Chat" +echo "Starting EveAI Chat" # Start Flask app gunicorn -w 4 -k geventwebsocket.gunicorn.workers.GeventWebSocketWorker -b 0.0.0.0:5002 scripts.run_eveai_chat:app